VictoriaMetrics/lib/logstorage/pipe.go

213 lines
6.6 KiB
Go
Raw Normal View History

2024-04-25 22:19:58 +00:00
package logstorage
import (
"fmt"
)
type pipe interface {
2024-04-26 21:47:50 +00:00
// String returns string representation of the pipe.
2024-04-25 22:19:58 +00:00
String() string
2024-04-26 21:47:50 +00:00
2024-05-09 00:52:28 +00:00
// updateNeededFields must update neededFields and unneededFields with fields it needs and not needs at the input.
updateNeededFields(neededFields, unneededFields fieldsSet)
2024-05-04 22:28:01 +00:00
2024-04-26 21:47:50 +00:00
// newPipeProcessor must return new pipeProcessor for the given ppBase.
//
// workersCount is the number of goroutine workers, which will call writeBlock() method.
//
// If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds.
// It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds.
//
2024-04-27 20:08:03 +00:00
// The returned pipeProcessor may call cancel() at any time in order to notify worker goroutines to stop sending new data to pipeProcessor.
2024-04-26 21:47:50 +00:00
newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor
2024-05-25 12:37:26 +00:00
// optimize must optimize the pipe
optimize()
// hasFilterInWithQuery must return true of pipe contains 'in(subquery)' filter (recursively).
hasFilterInWithQuery() bool
// initFilterInValues must return new pipe with the initialized values for 'in(subquery)' filters (recursively).
//
// It is OK to return the pipe itself if it doesn't contain 'in(subquery)' filters.
initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error)
2024-04-26 21:47:50 +00:00
}
// pipeProcessor must process a single pipe.
type pipeProcessor interface {
// writeBlock must write the given block of data to the given pipeProcessor.
//
2024-04-27 20:08:03 +00:00
// writeBlock is called concurrently from worker goroutines.
// The workerID is the id of the worker goroutine, which calls the writeBlock.
2024-04-26 21:47:50 +00:00
// It is in the range 0 ... workersCount-1 .
//
2024-04-30 23:19:22 +00:00
// It is OK to modify br contents inside writeBlock. The caller mustn't rely on br contents after writeBlock call.
// It is forbidden to hold references to br after returning from writeBlock, since the caller may re-use it.
2024-04-27 20:08:03 +00:00
//
// If any error occurs at writeBlock, then cancel() must be called by pipeProcessor in order to notify worker goroutines
// to stop sending new data. The occurred error must be returned from flush().
//
// cancel() may be called also when the pipeProcessor decides to stop accepting new data, even if there is no any error.
2024-04-30 21:03:34 +00:00
writeBlock(workerID uint, br *blockResult)
2024-04-26 21:47:50 +00:00
// flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor.
//
2024-04-27 20:08:03 +00:00
// flush is called after all the worker goroutines are stopped.
//
// It is guaranteed that flush() is called for every pipeProcessor returned from pipe.newPipeProcessor().
flush() error
2024-04-26 21:47:50 +00:00
}
2024-04-30 21:03:34 +00:00
// defaultPipeProcessor is a function type used as a pipeProcessor:
// its writeBlock method calls the function itself, while its flush method always returns nil.
type defaultPipeProcessor func(workerID uint, br *blockResult)
2024-04-26 21:47:50 +00:00
2024-04-30 21:03:34 +00:00
func newDefaultPipeProcessor(writeBlock func(workerID uint, br *blockResult)) pipeProcessor {
2024-04-26 21:47:50 +00:00
return defaultPipeProcessor(writeBlock)
}
2024-04-30 21:03:34 +00:00
func (dpp defaultPipeProcessor) writeBlock(workerID uint, br *blockResult) {
dpp(workerID, br)
2024-04-26 21:47:50 +00:00
}
2024-04-27 20:08:03 +00:00
func (dpp defaultPipeProcessor) flush() error {
return nil
2024-04-25 22:19:58 +00:00
}
func parsePipes(lex *lexer) ([]pipe, error) {
var pipes []pipe
2024-05-21 19:18:05 +00:00
for {
2024-05-19 21:27:52 +00:00
p, err := parsePipe(lex)
if err != nil {
return nil, err
2024-04-25 22:19:58 +00:00
}
2024-05-19 21:27:52 +00:00
pipes = append(pipes, p)
2024-05-21 19:18:05 +00:00
switch {
case lex.isKeyword("|"):
lex.nextToken()
case lex.isKeyword(")", ""):
return pipes, nil
}
2024-04-25 22:19:58 +00:00
}
}
2024-05-19 21:27:52 +00:00
func parsePipe(lex *lexer) (pipe, error) {
switch {
case lex.isKeyword("copy", "cp"):
pc, err := parsePipeCopy(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
}
return pc, nil
case lex.isKeyword("delete", "del", "rm"):
pd, err := parsePipeDelete(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
}
return pd, nil
case lex.isKeyword("extract"):
pe, err := parsePipeExtract(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err)
}
return pe, nil
case lex.isKeyword("field_names"):
pf, err := parsePipeFieldNames(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
}
return pf, nil
2024-05-22 13:29:18 +00:00
case lex.isKeyword("fields", "keep"):
2024-05-19 21:27:52 +00:00
pf, err := parsePipeFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("filter"):
pf, err := parsePipeFilter(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)
}
return pf, nil
2024-05-22 13:29:18 +00:00
case lex.isKeyword("format"):
pf, err := parsePipeFormat(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'format' pipe: %w", err)
}
return pf, nil
2024-05-19 21:27:52 +00:00
case lex.isKeyword("limit", "head"):
pl, err := parsePipeLimit(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err)
}
return pl, nil
case lex.isKeyword("offset", "skip"):
ps, err := parsePipeOffset(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err)
}
return ps, nil
2024-05-25 17:24:21 +00:00
case lex.isKeyword("pack_json"):
pp, err := parsePackJSON(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'pack_json' pipe: %w", err)
}
return pp, nil
2024-05-19 21:27:52 +00:00
case lex.isKeyword("rename", "mv"):
pr, err := parsePipeRename(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err)
}
return pr, nil
2024-05-24 22:22:14 +00:00
case lex.isKeyword("replace"):
pr, err := parsePipeReplace(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'replace' pipe: %w", err)
}
return pr, nil
2024-05-25 12:37:26 +00:00
case lex.isKeyword("replace_regexp"):
pr, err := parsePipeReplaceRegexp(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'replace_regexp' pipe: %w", err)
}
return pr, nil
2024-05-19 21:27:52 +00:00
case lex.isKeyword("sort"):
ps, err := parsePipeSort(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
}
return ps, nil
case lex.isKeyword("stats"):
ps, err := parsePipeStats(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
}
return ps, nil
case lex.isKeyword("uniq"):
pu, err := parsePipeUniq(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err)
}
return pu, nil
case lex.isKeyword("unpack_json"):
pu, err := parsePipeUnpackJSON(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'unpack_json' pipe: %w", err)
}
return pu, nil
2024-05-20 01:52:16 +00:00
case lex.isKeyword("unpack_logfmt"):
pu, err := parsePipeUnpackLogfmt(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err)
}
return pu, nil
2024-05-25 16:26:07 +00:00
case lex.isKeyword("unroll"):
pu, err := parsePipeUnroll(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'unroll' pipe: %w", err)
}
return pu, nil
2024-05-19 21:27:52 +00:00
default:
return nil, fmt.Errorf("unexpected pipe %q", lex.token)
}
}