diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 91d9c4486..09ea8ec6f 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -83,17 +83,17 @@ func parsePipes(lex *lexer) ([]pipe, error) { } pipes = append(pipes, pf) case lex.isKeyword("stats"): - ps, err := parseStatsPipe(lex) + ps, err := parsePipeStats(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) } pipes = append(pipes, ps) case lex.isKeyword("head"): - hp, err := parseHeadPipe(lex) + ph, err := parsePipeHead(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err) } - pipes = append(pipes, hp) + pipes = append(pipes, ph) case lex.isKeyword("skip"): ps, err := parseSkipPipe(lex) if err != nil { @@ -555,7 +555,7 @@ func (ps *pipeStats) neededFields() []string { return neededFields } -func parseStatsPipe(lex *lexer) (*pipeStats, error) { +func parsePipeStats(lex *lexer) (*pipeStats, error) { if !lex.mustNextToken() { return nil, fmt.Errorf("missing stats config") } @@ -626,37 +626,37 @@ func parseResultName(lex *lexer) (string, error) { return resultName, nil } -type headPipe struct { +type pipeHead struct { n uint64 } -func (hp *headPipe) String() string { - return fmt.Sprintf("head %d", hp.n) +func (ph *pipeHead) String() string { + return fmt.Sprintf("head %d", ph.n) } -func (hp *headPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { - if hp.n == 0 { - // Special case - notify the caller to stop writing data to the returned headPipeProcessor +func (ph *pipeHead) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + if ph.n == 0 { + // Special case - notify the caller to stop writing data to the returned pipeHeadProcessor cancel() } - return &headPipeProcessor{ - hp: hp, + return &pipeHeadProcessor{ + ph: ph, cancel: cancel, ppBase: ppBase, } } -type headPipeProcessor struct { - hp *headPipe +type pipeHeadProcessor struct { + ph *pipeHead cancel func() ppBase pipeProcessor rowsProcessed atomic.Uint64 } -func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { +func (hpp *pipeHeadProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { rowsProcessed := hpp.rowsProcessed.Add(uint64(len(timestamps))) - if rowsProcessed <= hpp.hp.n { + if rowsProcessed <= hpp.ph.n { // Fast path - write all the rows to ppBase. hpp.ppBase.writeBlock(workerID, timestamps, columns) return @@ -664,13 +664,13 @@ func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu // Slow path - overflow. Write the remaining rows if needed. rowsProcessed -= uint64(len(timestamps)) - if rowsProcessed >= hpp.hp.n { + if rowsProcessed >= hpp.ph.n { // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. return } // Write remaining rows. - rowsRemaining := hpp.hp.n - rowsProcessed + rowsRemaining := hpp.ph.n - rowsProcessed cs := make([]BlockColumn, len(columns)) for i, c := range columns { cDst := &cs[i] @@ -684,11 +684,11 @@ func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu hpp.cancel() } -func (hpp *headPipeProcessor) flush() error { +func (hpp *pipeHeadProcessor) flush() error { return nil } -func parseHeadPipe(lex *lexer) (*headPipe, error) { +func parsePipeHead(lex *lexer) (*pipeHead, error) { if !lex.mustNextToken() { return nil, fmt.Errorf("missing the number of head rows to return") } @@ -697,10 +697,10 @@ func parseHeadPipe(lex *lexer) (*headPipe, error) { return nil, fmt.Errorf("cannot parse the number of head rows to return %q: %w", lex.token, err) } lex.nextToken() - hp := &headPipe{ + ph := &pipeHead{ n: n, } - return hp, nil + return ph, nil } type skipPipe struct {