diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index ef0ce27db..fc5832f00 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -320,10 +320,10 @@ func (q *Query) Optimize() { switch t := p.(type) { case *pipeStats: for _, f := range t.funcs { - if f.iff != nil { - optimizeFilterIn(f.iff) - } + optimizeFilterIn(f.iff) } + case *pipeExtract: + optimizeFilterIn(t.iff) } } } @@ -345,6 +345,10 @@ func removeStarFilters(f filter) filter { } func optimizeFilterIn(f filter) { + if f == nil { + return + } + visitFunc := func(f filter) bool { fi, ok := f.(*filterIn) if ok && fi.q != nil { diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 85323cc7e..aee6f897f 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -65,7 +65,10 @@ func parsePipes(lex *lexer) ([]pipe, error) { var pipes []pipe for !lex.isKeyword(")", "") { if !lex.isKeyword("|") { - return nil, fmt.Errorf("expecting '|'; got %q", lex.token) + if len(pipes) == 0 { + return nil, fmt.Errorf("expecting '|' after the query filters; got %q", lex.token) + } + return nil, fmt.Errorf("expecting '|' after [%s] pipe; got %q", pipes[len(pipes)-1], lex.token) } lex.nextToken() p, err := parsePipe(lex) diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 46bef0415..f2d42827e 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -205,7 +205,7 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) { if lex.isKeyword("if") { iff, err := parseIfFilter(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'if' filter for %s: %w", pe, err) + return nil, err } pe.iff = iff diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go index 8f3437c1b..6635bc096 100644 --- a/lib/logstorage/pipe_extract_test.go +++ b/lib/logstorage/pipe_extract_test.go @@ -4,6 +4,35 @@ import ( "testing" ) +func TestParsePipeExtractSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`extract "foo"`) + f(`extract from x "foo"`) + f(`extract from x "foo" if (y:in(a:foo bar | uniq by (qwe) limit 10))`) +} + +func TestParsePipeExtractFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`extract`) + f(`extract from`) + f(`extract if (x:y)`) + f(`extract if (x:y) "a"`) + f(`extract "a" if`) + f(`extract "a" if (foo`) + f(`extract "a" if "foo"`) + f(`extract "a"`) + f(`extract ""`) + f(`extract "<*>foo<_>bar"`) +} + func TestPipeExtract(t *testing.T) { f := func(pipeStr string, rows, rowsExpected [][]Field) { t.Helper() @@ -211,3 +240,28 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) { f("extract from x 'x'", "f2,foo,x,y", "", "f2,x,y", "") f("extract from x 'x' if (a:b foo:q)", "f2,foo,x,y", "", "a,f2,foo,x,y", "") } + +func expectParsePipeFailure(t *testing.T, pipeStr string) { + t.Helper() + + lex := newLexer(pipeStr) + p, err := parsePipe(lex) + if err == nil { + t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p) + } +} + +func expectParsePipeSuccess(t *testing.T, pipeStr string) { + t.Helper() + + lex := newLexer(pipeStr) + p, err := parsePipe(lex) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", pipeStr, err) + } + + pipeStrResult := p.String() + if pipeStrResult != pipeStr { + t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr) + } +} diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 1397a1e60..b46113197 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -526,7 +526,7 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) { if lex.isKeyword("if") { iff, err := parseIfFilter(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'if' filter for %s: %w", sf, err) + return nil, err } f.iff = iff @@ -537,7 +537,7 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) { resultName, err := parseResultName(lex) if err != nil { - return nil, fmt.Errorf("cannot parse result name for %s: %w", sf, err) + return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err) } f.resultName = resultName diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 894c738f2..5e79f888f 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -260,6 +260,9 @@ func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, } func hasFilterInWithQueryForFilter(f filter) bool { + if f == nil { + return false + } visitFunc := func(f filter) bool { fi, ok := f.(*filterIn) return ok && fi.needExecuteQuery @@ -269,12 +272,15 @@ func hasFilterInWithQueryForFilter(f filter) bool { func hasFilterInWithQueryForPipes(pipes []pipe) bool { for _, p := range pipes { - ps, ok := p.(*pipeStats) - if !ok { - continue - } - for _, f := range ps.funcs { - if f.iff != nil && hasFilterInWithQueryForFilter(f.iff) { + switch t := p.(type) { + case *pipeStats: + for _, f := range t.funcs { + if hasFilterInWithQueryForFilter(f.iff) { + return true + } + } + case *pipeExtract: + if hasFilterInWithQueryForFilter(t.iff) { return true } } @@ -333,6 +339,14 @@ func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFiel byFields: t.byFields, funcs: funcsNew, } + case *pipeExtract: + fNew, err := initFilterInValuesForFilter(cache, t.iff, getFieldValuesFunc) + if err != nil { + return nil, err + } + pe := *t + pe.iff = fNew + pipesNew[i] = &pe default: pipesNew[i] = p }