mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
66b2987f49
This eliminates possible bugs related to forgotten Query.Optimize() calls. This also allows removing optimize() function from pipe interface. While at it, drop filterNoop inside filterAnd.
351 lines
10 KiB
Go
351 lines
10 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
)
|
|
|
|
type pipe interface {
|
|
// String returns string representation of the pipe.
|
|
String() string
|
|
|
|
// canLiveTail must return true if the given pipe can be used in live tailing
|
|
//
|
|
// See https://docs.victoriametrics.com/victorialogs/querying/#live-tailing
|
|
canLiveTail() bool
|
|
|
|
// updateNeededFields must update neededFields and unneededFields with fields it needs and not needs at the input.
|
|
updateNeededFields(neededFields, unneededFields fieldsSet)
|
|
|
|
// newPipeProcessor must return new pipeProcessor, which writes data to the given ppNext.
|
|
//
|
|
// workersCount is the number of goroutine workers, which will call writeBlock() method.
|
|
//
|
|
// If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds.
|
|
// It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds.
|
|
//
|
|
// The returned pipeProcessor may call cancel() at any time in order to notify the caller to stop sending new data to it.
|
|
newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor
|
|
|
|
// hasFilterInWithQuery must return true of pipe contains 'in(subquery)' filter (recursively).
|
|
hasFilterInWithQuery() bool
|
|
|
|
// initFilterInValues must return new pipe with the initialized values for 'in(subquery)' filters (recursively).
|
|
//
|
|
// It is OK to return the pipe itself if it doesn't contain 'in(subquery)' filters.
|
|
initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error)
|
|
}
|
|
|
|
// pipeProcessor must process a single pipe.
|
|
type pipeProcessor interface {
|
|
// writeBlock must write the given block of data to the given pipeProcessor.
|
|
//
|
|
// writeBlock is called concurrently from worker goroutines.
|
|
// The workerID is the id of the worker goroutine, which calls the writeBlock.
|
|
// It is in the range 0 ... workersCount-1 .
|
|
//
|
|
// It is OK to modify br contents inside writeBlock. The caller mustn't rely on br contents after writeBlock call.
|
|
// It is forbidden to hold references to br after returning from writeBlock, since the caller may re-use it.
|
|
//
|
|
// If any error occurs at writeBlock, then cancel() must be called by pipeProcessor in order to notify worker goroutines
|
|
// to stop sending new data. The occurred error must be returned from flush().
|
|
//
|
|
// cancel() may be called also when the pipeProcessor decides to stop accepting new data, even if there is no any error.
|
|
writeBlock(workerID uint, br *blockResult)
|
|
|
|
// flush must flush all the data accumulated in the pipeProcessor to the next pipeProcessor.
|
|
//
|
|
// flush is called after all the worker goroutines are stopped.
|
|
//
|
|
// It is guaranteed that flush() is called for every pipeProcessor returned from pipe.newPipeProcessor().
|
|
flush() error
|
|
}
|
|
|
|
type defaultPipeProcessor func(workerID uint, br *blockResult)
|
|
|
|
func newDefaultPipeProcessor(writeBlock func(workerID uint, br *blockResult)) pipeProcessor {
|
|
return defaultPipeProcessor(writeBlock)
|
|
}
|
|
|
|
func (dpp defaultPipeProcessor) writeBlock(workerID uint, br *blockResult) {
|
|
dpp(workerID, br)
|
|
}
|
|
|
|
func (dpp defaultPipeProcessor) flush() error {
|
|
return nil
|
|
}
|
|
|
|
func parsePipes(lex *lexer) ([]pipe, error) {
|
|
var pipes []pipe
|
|
for {
|
|
p, err := parsePipe(lex)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pipes = append(pipes, p)
|
|
|
|
switch {
|
|
case lex.isKeyword("|"):
|
|
lex.nextToken()
|
|
case lex.isKeyword(")", ""):
|
|
return pipes, nil
|
|
default:
|
|
return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|' or ')'", pipes[len(pipes)-1], lex.token)
|
|
}
|
|
}
|
|
}
|
|
|
|
func parsePipe(lex *lexer) (pipe, error) {
|
|
switch {
|
|
case lex.isKeyword("block_stats"):
|
|
ps, err := parsePipeBlockStats(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'block_stats' pipe: %w", err)
|
|
}
|
|
return ps, nil
|
|
case lex.isKeyword("blocks_count"):
|
|
pc, err := parsePipeBlocksCount(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'blocks_count' pipe: %w", err)
|
|
}
|
|
return pc, nil
|
|
case lex.isKeyword("copy", "cp"):
|
|
pc, err := parsePipeCopy(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
|
|
}
|
|
return pc, nil
|
|
case lex.isKeyword("delete", "del", "rm", "drop"):
|
|
pd, err := parsePipeDelete(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
|
|
}
|
|
return pd, nil
|
|
case lex.isKeyword("drop_empty_fields"):
|
|
pd, err := parsePipeDropEmptyFields(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'drop_empty_fields' pipe: %w", err)
|
|
}
|
|
return pd, nil
|
|
case lex.isKeyword("extract"):
|
|
pe, err := parsePipeExtract(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err)
|
|
}
|
|
return pe, nil
|
|
case lex.isKeyword("extract_regexp"):
|
|
pe, err := parsePipeExtractRegexp(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'extract_regexp' pipe: %w", err)
|
|
}
|
|
return pe, nil
|
|
case lex.isKeyword("field_names"):
|
|
pf, err := parsePipeFieldNames(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
|
|
}
|
|
return pf, nil
|
|
case lex.isKeyword("field_values"):
|
|
pf, err := parsePipeFieldValues(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot pase 'field_values' pipe: %w", err)
|
|
}
|
|
return pf, nil
|
|
case lex.isKeyword("fields", "keep"):
|
|
pf, err := parsePipeFields(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
|
|
}
|
|
return pf, nil
|
|
case lex.isKeyword("filter", "where"):
|
|
pf, err := parsePipeFilter(lex, true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)
|
|
}
|
|
return pf, nil
|
|
case lex.isKeyword("format"):
|
|
pf, err := parsePipeFormat(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'format' pipe: %w", err)
|
|
}
|
|
return pf, nil
|
|
case lex.isKeyword("join"):
|
|
pj, err := parsePipeJoin(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'join' pipe: %w", err)
|
|
}
|
|
return pj, nil
|
|
case lex.isKeyword("len"):
|
|
pl, err := parsePipeLen(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'len' pipe: %w", err)
|
|
}
|
|
return pl, nil
|
|
case lex.isKeyword("limit", "head"):
|
|
pl, err := parsePipeLimit(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err)
|
|
}
|
|
return pl, nil
|
|
case lex.isKeyword("math", "eval"):
|
|
pm, err := parsePipeMath(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'math' pipe: %w", err)
|
|
}
|
|
return pm, nil
|
|
case lex.isKeyword("offset", "skip"):
|
|
ps, err := parsePipeOffset(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err)
|
|
}
|
|
return ps, nil
|
|
case lex.isKeyword("pack_json"):
|
|
pp, err := parsePackJSON(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'pack_json' pipe: %w", err)
|
|
}
|
|
return pp, nil
|
|
case lex.isKeyword("pack_logfmt"):
|
|
pp, err := parsePackLogfmt(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'pack_logfmt' pipe: %w", err)
|
|
}
|
|
return pp, nil
|
|
case lex.isKeyword("rename", "mv"):
|
|
pr, err := parsePipeRename(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err)
|
|
}
|
|
return pr, nil
|
|
case lex.isKeyword("replace"):
|
|
pr, err := parsePipeReplace(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'replace' pipe: %w", err)
|
|
}
|
|
return pr, nil
|
|
case lex.isKeyword("replace_regexp"):
|
|
pr, err := parsePipeReplaceRegexp(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'replace_regexp' pipe: %w", err)
|
|
}
|
|
return pr, nil
|
|
case lex.isKeyword("sort"), lex.isKeyword("order"):
|
|
ps, err := parsePipeSort(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
|
|
}
|
|
return ps, nil
|
|
case lex.isKeyword("stats"):
|
|
ps, err := parsePipeStats(lex, true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
|
|
}
|
|
return ps, nil
|
|
case lex.isKeyword("stream_context"):
|
|
pc, err := parsePipeStreamContext(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'stream_context' pipe: %w", err)
|
|
}
|
|
return pc, nil
|
|
case lex.isKeyword("top"):
|
|
pt, err := parsePipeTop(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'top' pipe: %w", err)
|
|
}
|
|
return pt, nil
|
|
case lex.isKeyword("uniq"):
|
|
pu, err := parsePipeUniq(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err)
|
|
}
|
|
return pu, nil
|
|
case lex.isKeyword("unpack_json"):
|
|
pu, err := parsePipeUnpackJSON(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'unpack_json' pipe: %w", err)
|
|
}
|
|
return pu, nil
|
|
case lex.isKeyword("unpack_logfmt"):
|
|
pu, err := parsePipeUnpackLogfmt(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err)
|
|
}
|
|
return pu, nil
|
|
case lex.isKeyword("unpack_syslog"):
|
|
pu, err := parsePipeUnpackSyslog(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'unpack_syslog' pipe: %w", err)
|
|
}
|
|
return pu, nil
|
|
case lex.isKeyword("unroll"):
|
|
pu, err := parsePipeUnroll(lex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse 'unroll' pipe: %w", err)
|
|
}
|
|
return pu, nil
|
|
default:
|
|
lexState := lex.backupState()
|
|
|
|
// Try parsing stats pipe without 'stats' keyword
|
|
ps, err := parsePipeStats(lex, false)
|
|
if err == nil {
|
|
return ps, nil
|
|
}
|
|
lex.restoreState(lexState)
|
|
|
|
// Try parsing filter pipe without 'filter' keyword
|
|
pf, err := parsePipeFilter(lex, false)
|
|
if err == nil {
|
|
return pf, nil
|
|
}
|
|
lex.restoreState(lexState)
|
|
|
|
return nil, fmt.Errorf("unexpected pipe %q", lex.token)
|
|
}
|
|
}
|
|
|
|
var pipeNames = func() map[string]struct{} {
|
|
a := []string{
|
|
"block_stats",
|
|
"blocks_count",
|
|
"copy", "cp",
|
|
"delete", "del", "rm", "drop",
|
|
"drop_empty_fields",
|
|
"extract",
|
|
"extract_regexp",
|
|
"field_names",
|
|
"field_values",
|
|
"fields", "keep",
|
|
"filter", "where",
|
|
"format",
|
|
"join",
|
|
"len",
|
|
"limit", "head",
|
|
"math", "eval",
|
|
"offset", "skip",
|
|
"pack_json",
|
|
"pack_logmft",
|
|
"rename", "mv",
|
|
"replace",
|
|
"replace_regexp",
|
|
"sort", "order",
|
|
"stats", "by",
|
|
"stream_context",
|
|
"top",
|
|
"uniq",
|
|
"unpack_json",
|
|
"unpack_logfmt",
|
|
"unpack_syslog",
|
|
"unroll",
|
|
}
|
|
|
|
m := make(map[string]struct{}, len(a))
|
|
for _, s := range a {
|
|
m[s] = struct{}{}
|
|
}
|
|
|
|
// add stats names here, since they can be used without the initial `stats` keyword
|
|
for _, s := range statsNames {
|
|
m[s] = struct{}{}
|
|
}
|
|
return m
|
|
}()
|