This commit is contained in:
Aliaksandr Valialkin 2024-04-29 03:32:13 +02:00
parent fa8c611361
commit 16352baba9
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -83,17 +83,17 @@ func parsePipes(lex *lexer) ([]pipe, error) {
} }
pipes = append(pipes, pf) pipes = append(pipes, pf)
case lex.isKeyword("stats"): case lex.isKeyword("stats"):
ps, err := parseStatsPipe(lex) ps, err := parsePipeStats(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
} }
pipes = append(pipes, ps) pipes = append(pipes, ps)
case lex.isKeyword("head"): case lex.isKeyword("head"):
hp, err := parseHeadPipe(lex) ph, err := parsePipeHead(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err)
} }
pipes = append(pipes, hp) pipes = append(pipes, ph)
case lex.isKeyword("skip"): case lex.isKeyword("skip"):
ps, err := parseSkipPipe(lex) ps, err := parseSkipPipe(lex)
if err != nil { if err != nil {
@ -555,7 +555,7 @@ func (ps *pipeStats) neededFields() []string {
return neededFields return neededFields
} }
func parseStatsPipe(lex *lexer) (*pipeStats, error) { func parsePipeStats(lex *lexer) (*pipeStats, error) {
if !lex.mustNextToken() { if !lex.mustNextToken() {
return nil, fmt.Errorf("missing stats config") return nil, fmt.Errorf("missing stats config")
} }
@ -626,37 +626,37 @@ func parseResultName(lex *lexer) (string, error) {
return resultName, nil return resultName, nil
} }
type headPipe struct { type pipeHead struct {
n uint64 n uint64
} }
func (hp *headPipe) String() string { func (ph *pipeHead) String() string {
return fmt.Sprintf("head %d", hp.n) return fmt.Sprintf("head %d", ph.n)
} }
func (hp *headPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { func (ph *pipeHead) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
if hp.n == 0 { if ph.n == 0 {
// Special case - notify the caller to stop writing data to the returned headPipeProcessor // Special case - notify the caller to stop writing data to the returned pipeHeadProcessor
cancel() cancel()
} }
return &headPipeProcessor{ return &pipeHeadProcessor{
hp: hp, ph: ph,
cancel: cancel, cancel: cancel,
ppBase: ppBase, ppBase: ppBase,
} }
} }
type headPipeProcessor struct { type pipeHeadProcessor struct {
hp *headPipe ph *pipeHead
cancel func() cancel func()
ppBase pipeProcessor ppBase pipeProcessor
rowsProcessed atomic.Uint64 rowsProcessed atomic.Uint64
} }
func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { func (hpp *pipeHeadProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
rowsProcessed := hpp.rowsProcessed.Add(uint64(len(timestamps))) rowsProcessed := hpp.rowsProcessed.Add(uint64(len(timestamps)))
if rowsProcessed <= hpp.hp.n { if rowsProcessed <= hpp.ph.n {
// Fast path - write all the rows to ppBase. // Fast path - write all the rows to ppBase.
hpp.ppBase.writeBlock(workerID, timestamps, columns) hpp.ppBase.writeBlock(workerID, timestamps, columns)
return return
@ -664,13 +664,13 @@ func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu
// Slow path - overflow. Write the remaining rows if needed. // Slow path - overflow. Write the remaining rows if needed.
rowsProcessed -= uint64(len(timestamps)) rowsProcessed -= uint64(len(timestamps))
if rowsProcessed >= hpp.hp.n { if rowsProcessed >= hpp.ph.n {
// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
return return
} }
// Write remaining rows. // Write remaining rows.
rowsRemaining := hpp.hp.n - rowsProcessed rowsRemaining := hpp.ph.n - rowsProcessed
cs := make([]BlockColumn, len(columns)) cs := make([]BlockColumn, len(columns))
for i, c := range columns { for i, c := range columns {
cDst := &cs[i] cDst := &cs[i]
@ -684,11 +684,11 @@ func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu
hpp.cancel() hpp.cancel()
} }
func (hpp *headPipeProcessor) flush() error { func (hpp *pipeHeadProcessor) flush() error {
return nil return nil
} }
func parseHeadPipe(lex *lexer) (*headPipe, error) { func parsePipeHead(lex *lexer) (*pipeHead, error) {
if !lex.mustNextToken() { if !lex.mustNextToken() {
return nil, fmt.Errorf("missing the number of head rows to return") return nil, fmt.Errorf("missing the number of head rows to return")
} }
@ -697,10 +697,10 @@ func parseHeadPipe(lex *lexer) (*headPipe, error) {
return nil, fmt.Errorf("cannot parse the number of head rows to return %q: %w", lex.token, err) return nil, fmt.Errorf("cannot parse the number of head rows to return %q: %w", lex.token, err)
} }
lex.nextToken() lex.nextToken()
hp := &headPipe{ ph := &pipeHead{
n: n, n: n,
} }
return hp, nil return ph, nil
} }
type skipPipe struct { type skipPipe struct {