VictoriaMetrics/lib/logstorage/pipe.go

100 lines
3.4 KiB
Go
Raw Normal View History

2024-04-25 22:19:58 +00:00
package logstorage
import (
"fmt"
)
type pipe interface {
2024-04-26 21:47:50 +00:00
// String returns string representation of the pipe.
2024-04-25 22:19:58 +00:00
String() string
2024-04-26 21:47:50 +00:00
// newPipeProcessor must return new pipeProcessor for the given ppBase.
//
// workersCount is the number of goroutine workers, which will call writeBlock() method.
//
// If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds.
// It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds.
//
2024-04-27 20:08:03 +00:00
// The returned pipeProcessor may call cancel() at any time in order to notify worker goroutines to stop sending new data to pipeProcessor.
2024-04-26 21:47:50 +00:00
newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor
}
// pipeProcessor must process a single pipe.
type pipeProcessor interface {
// writeBlock must write the given block of data to the given pipeProcessor.
//
2024-04-27 20:08:03 +00:00
// writeBlock is called concurrently from worker goroutines.
// The workerID is the id of the worker goroutine, which calls the writeBlock.
2024-04-26 21:47:50 +00:00
// It is in the range 0 ... workersCount-1 .
//
// It is forbidden to hold references to columns after returning from writeBlock, since the caller re-uses columns.
2024-04-27 20:08:03 +00:00
//
// If any error occurs at writeBlock, then cancel() must be called by pipeProcessor in order to notify worker goroutines
// to stop sending new data. The occurred error must be returned from flush().
//
// cancel() may be called also when the pipeProcessor decides to stop accepting new data, even if there is no any error.
2024-04-26 21:47:50 +00:00
writeBlock(workerID uint, timestamps []int64, columns []BlockColumn)
// flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor.
//
2024-04-27 20:08:03 +00:00
// flush is called after all the worker goroutines are stopped.
//
// It is guaranteed that flush() is called for every pipeProcessor returned from pipe.newPipeProcessor().
flush() error
2024-04-26 21:47:50 +00:00
}
// defaultPipeProcessor lets a plain writeBlock callback be used as a pipeProcessor.
//
// The function value itself is the callback invoked for every written block.
type defaultPipeProcessor func(workerID uint, timestamps []int64, columns []BlockColumn)
// newDefaultPipeProcessor wraps the given writeBlock callback into a pipeProcessor.
//
// Every writeBlock call on the returned pipeProcessor is forwarded to the callback.
func newDefaultPipeProcessor(writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) pipeProcessor {
	// The callback has the same underlying type as defaultPipeProcessor,
	// so it can be assigned directly without an explicit conversion.
	var processor defaultPipeProcessor = writeBlock
	return processor
}
// writeBlock implements pipeProcessor.writeBlock by passing all the arguments
// unchanged to the wrapped callback.
func (dpp defaultPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
	var callback func(uint, []int64, []BlockColumn) = dpp
	callback(workerID, timestamps, columns)
}
2024-04-27 20:08:03 +00:00
func (dpp defaultPipeProcessor) flush() error {
return nil
2024-04-25 22:19:58 +00:00
}
func parsePipes(lex *lexer) ([]pipe, error) {
var pipes []pipe
2024-04-26 23:53:32 +00:00
for !lex.isKeyword(")", "") {
2024-04-25 22:19:58 +00:00
if !lex.isKeyword("|") {
return nil, fmt.Errorf("expecting '|'")
}
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing token after '|'")
}
switch {
case lex.isKeyword("fields"):
2024-04-29 01:27:46 +00:00
pf, err := parsePipeFields(lex)
2024-04-25 22:19:58 +00:00
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
}
2024-04-29 01:27:46 +00:00
pipes = append(pipes, pf)
2024-04-25 22:19:58 +00:00
case lex.isKeyword("stats"):
2024-04-29 01:32:13 +00:00
ps, err := parsePipeStats(lex)
2024-04-25 22:19:58 +00:00
if err != nil {
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
}
2024-04-29 01:30:25 +00:00
pipes = append(pipes, ps)
2024-04-27 00:50:19 +00:00
case lex.isKeyword("head"):
2024-04-29 01:32:13 +00:00
ph, err := parsePipeHead(lex)
2024-04-27 00:50:19 +00:00
if err != nil {
return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err)
}
2024-04-29 01:32:13 +00:00
pipes = append(pipes, ph)
2024-04-27 01:14:00 +00:00
case lex.isKeyword("skip"):
2024-04-29 01:32:49 +00:00
ps, err := parsePipeSkip(lex)
2024-04-27 01:14:00 +00:00
if err != nil {
return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err)
}
2024-04-29 01:30:25 +00:00
pipes = append(pipes, ps)
2024-04-25 22:19:58 +00:00
default:
return nil, fmt.Errorf("unexpected pipe %q", lex.token)
}
}
return pipes, nil
}