From 7c4c8d45d6ad253158a1c81f7350241283731ccd Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Apr 2024 03:32:49 +0200 Subject: [PATCH] wip --- lib/logstorage/pipes.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 09ea8ec6f..79a10a236 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -95,7 +95,7 @@ func parsePipes(lex *lexer) ([]pipe, error) { } pipes = append(pipes, ph) case lex.isKeyword("skip"): - ps, err := parseSkipPipe(lex) + ps, err := parsePipeSkip(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err) } @@ -703,29 +703,29 @@ func parsePipeHead(lex *lexer) (*pipeHead, error) { return ph, nil } -type skipPipe struct { +type pipeSkip struct { n uint64 } -func (ps *skipPipe) String() string { +func (ps *pipeSkip) String() string { return fmt.Sprintf("skip %d", ps.n) } -func (ps *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - return &skipPipeProcessor{ +func (ps *pipeSkip) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + return &pipeSkipProcessor{ ps: ps, ppBase: ppBase, } } -type skipPipeProcessor struct { - ps *skipPipe +type pipeSkipProcessor struct { + ps *pipeSkip ppBase pipeProcessor rowsProcessed atomic.Uint64 } -func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { +func (spp *pipeSkipProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { rowsProcessed := spp.rowsProcessed.Add(uint64(len(timestamps))) if rowsProcessed <= spp.ps.n { return @@ -748,11 +748,11 @@ func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu spp.ppBase.writeBlock(workerID, timestamps, cs) } -func (spp *skipPipeProcessor) flush() error { +func (spp *pipeSkipProcessor) flush() error { return nil } -func parseSkipPipe(lex *lexer) (*skipPipe, error) { +func parsePipeSkip(lex *lexer) (*pipeSkip, error) { if !lex.mustNextToken() { return nil, fmt.Errorf("missing the number of rows to skip") } @@ -761,7 +761,7 @@ func parseSkipPipe(lex *lexer) (*skipPipe, error) { return nil, fmt.Errorf("cannot parse the number of rows to skip %q: %w", lex.token, err) } lex.nextToken() - ps := &skipPipe{ + ps := &pipeSkip{ n: n, } return ps, nil