This commit is contained in:
Aliaksandr Valialkin 2024-04-29 03:32:49 +02:00
parent 16352baba9
commit 7c4c8d45d6
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -95,7 +95,7 @@ func parsePipes(lex *lexer) ([]pipe, error) {
} }
pipes = append(pipes, ph) pipes = append(pipes, ph)
case lex.isKeyword("skip"): case lex.isKeyword("skip"):
ps, err := parseSkipPipe(lex) ps, err := parsePipeSkip(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err)
} }
@ -703,29 +703,29 @@ func parsePipeHead(lex *lexer) (*pipeHead, error) {
return ph, nil return ph, nil
} }
type skipPipe struct { type pipeSkip struct {
n uint64 n uint64
} }
func (ps *skipPipe) String() string { func (ps *pipeSkip) String() string {
return fmt.Sprintf("skip %d", ps.n) return fmt.Sprintf("skip %d", ps.n)
} }
func (ps *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (ps *pipeSkip) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
return &skipPipeProcessor{ return &pipeSkipProcessor{
ps: ps, ps: ps,
ppBase: ppBase, ppBase: ppBase,
} }
} }
type skipPipeProcessor struct { type pipeSkipProcessor struct {
ps *skipPipe ps *pipeSkip
ppBase pipeProcessor ppBase pipeProcessor
rowsProcessed atomic.Uint64 rowsProcessed atomic.Uint64
} }
func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { func (spp *pipeSkipProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
rowsProcessed := spp.rowsProcessed.Add(uint64(len(timestamps))) rowsProcessed := spp.rowsProcessed.Add(uint64(len(timestamps)))
if rowsProcessed <= spp.ps.n { if rowsProcessed <= spp.ps.n {
return return
@ -748,11 +748,11 @@ func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu
spp.ppBase.writeBlock(workerID, timestamps, cs) spp.ppBase.writeBlock(workerID, timestamps, cs)
} }
func (spp *skipPipeProcessor) flush() error { func (spp *pipeSkipProcessor) flush() error {
return nil return nil
} }
func parseSkipPipe(lex *lexer) (*skipPipe, error) { func parsePipeSkip(lex *lexer) (*pipeSkip, error) {
if !lex.mustNextToken() { if !lex.mustNextToken() {
return nil, fmt.Errorf("missing the number of rows to skip") return nil, fmt.Errorf("missing the number of rows to skip")
} }
@ -761,7 +761,7 @@ func parseSkipPipe(lex *lexer) (*skipPipe, error) {
return nil, fmt.Errorf("cannot parse the number of rows to skip %q: %w", lex.token, err) return nil, fmt.Errorf("cannot parse the number of rows to skip %q: %w", lex.token, err)
} }
lex.nextToken() lex.nextToken()
ps := &skipPipe{ ps := &pipeSkip{
n: n, n: n,
} }
return ps, nil return ps, nil