diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index 15331e191..c607453e0 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -107,7 +107,7 @@ func MustAddRows(lr *logstorage.LogRows) { } // RunQuery runs the given q and calls writeBlock for the returned data blocks -func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn)) error { +func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock logstorage.WriteBlockFunc) error { return strg.RunQuery(ctx, tenantIDs, q, writeBlock) } diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 0f90b3c16..2d341f277 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +* FEATURE: add ability to put arbitrary [queries](https://docs.victoriametrics.com/victorialogs/logsql/#query-syntax) inside [`in()` filter](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter). * FEATURE: add support for post-filtering of query results with [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe). * FEATURE: allow applying individual [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) per each [stats function](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters). * FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them. 
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index e8f80966d..032d03d24 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -652,16 +652,16 @@ log.level:in("error", "fatal") It works very fast for long lists passed to `in()`. -The future VictoriaLogs versions will allow passing arbitrary [queries](#query-syntax) into `in()` filter. -For example, the following query selects all the logs for the last hour for users, who visited pages with `admin` [word](#word) in the `path` +It is possible to pass arbitrary [query](#query-syntax) inside `in(...)` filter in order to match against the results of this query. +The query inside `in(...)` must end with [`fields`](#fields-pipe) pipe containing a single field name, so VictoriaLogs could +fetch results from this field. For example, the following query selects all the logs for the last 5 minutes for users, +who visited pages with `admin` [word](#word) in the `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) during the last day: ```logsql -_time:1h AND user_id:in(_time:1d AND path:admin | fields user_id) +_time:5m AND user_id:in(_time:1d AND path:admin | fields user_id) ``` -See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details. - See also: - [Exact filter](#exact-filter) diff --git a/docs/VictoriaLogs/Roadmap.md b/docs/VictoriaLogs/Roadmap.md index 1dc515730..4210307b8 100644 --- a/docs/VictoriaLogs/Roadmap.md +++ b/docs/VictoriaLogs/Roadmap.md @@ -37,7 +37,6 @@ The following functionality is planned in the future versions of VictoriaLogs: - Add missing functionality to [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html): - [Stream context](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#stream-context). - [Transformation functions](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#transformations). 
- - The ability to use subqueries inside [in()](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#multi-exact-filter) function. - Live tailing for [LogsQL filters](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#filters) aka `tail -f`. - Web UI with the following abilities: - Explore the ingested logs ([partially done](https://docs.victoriametrics.com/VictoriaLogs/querying/#web-ui)). diff --git a/lib/logstorage/filter.go b/lib/logstorage/filter.go index c27c4ca26..4b8024bff 100644 --- a/lib/logstorage/filter.go +++ b/lib/logstorage/filter.go @@ -14,3 +14,95 @@ type filter interface { // applyToBlockResult must update bm according to the filter applied to the given br block applyToBlockResult(br *blockResult, bm *bitmap) } + +// visitFilter sequentially calls visitFunc for filters inside f. +// +// It stops calling visitFunc on the remaining filters as soon as visitFunc returns true. +// It returns the result of the last visitFunc call. +func visitFilter(f filter, visitFunc func(f filter) bool) bool { + switch t := f.(type) { + case *filterAnd: + return visitFilters(t.filters, visitFunc) + case *filterOr: + return visitFilters(t.filters, visitFunc) + case *filterNot: + return visitFilter(t.f, visitFunc) + default: + return visitFunc(f) + } +} + +// visitFilters calls visitFunc per each filter in filters. +// +// It stops calling visitFunc on the remaining filters as soon as visitFunc returns true. +// It returns the result of the last visitFunc call. +func visitFilters(filters []filter, visitFunc func(f filter) bool) bool { + for _, f := range filters { + if visitFilter(f, visitFunc) { + return true + } + } + return false +} + +// copyFilter recursively copies f filters with the help of copyFunc if visitFunc returns true for them. +// +// It doesn't copy other filters by returning them as is. 
+func copyFilter(f filter, visitFunc func(f filter) bool, copyFunc func(f filter) (filter, error)) (filter, error) {
+	switch t := f.(type) {
+	case *filterAnd:
+		filters, err := copyFilters(t.filters, visitFunc, copyFunc)
+		if err != nil {
+			return nil, err
+		}
+		fa := &filterAnd{
+			filters: filters,
+		}
+		return fa, nil
+	case *filterOr:
+		filters, err := copyFilters(t.filters, visitFunc, copyFunc)
+		if err != nil {
+			return nil, err
+		}
+		fo := &filterOr{
+			filters: filters,
+		}
+		return fo, nil
+	case *filterNot:
+		f, err := copyFilter(t.f, visitFunc, copyFunc)
+		if err != nil {
+			return nil, err
+		}
+		fn := &filterNot{
+			f: f,
+		}
+		return fn, nil
+	default:
+		if !visitFunc(t) {
+			// Nothing to copy
+			return t, nil
+		}
+		return copyFunc(t)
+	}
+}
+
+// copyFilters recursively copies filters with the help of copyFunc if visitFunc returns true for them.
+//
+// It doesn't copy other filters by returning them as is.
+func copyFilters(filters []filter, visitFunc func(f filter) bool, copyFunc func(f filter) (filter, error)) ([]filter, error) {
+	if !visitFilters(filters, visitFunc) {
+		// Nothing to copy
+		return filters, nil
+	}
+
+	// Copy filters.
+	filtersNew := make([]filter, len(filters))
+	for i, f := range filters {
+		fNew, err := copyFilter(f, visitFunc, copyFunc)
+		if err != nil {
+			return nil, err
+		}
+		filtersNew[i] = fNew
+	}
+	return filtersNew, nil
+}
diff --git a/lib/logstorage/filter_in.go b/lib/logstorage/filter_in.go
index c150fbee3..b1d7b4821 100644
--- a/lib/logstorage/filter_in.go
+++ b/lib/logstorage/filter_in.go
@@ -18,6 +18,15 @@ type filterIn struct {
 	fieldName string
 	values    []string
 
+	// needExecuteQuery is set to true if q must be executed for populating values before filter execution.
+	needExecuteQuery bool
+
+	// If q is non-nil, then values must be populated from q before filter execution.
+	q *Query
+
+	// qFieldName must be set to field name for obtaining values from if q is non-nil.
+ qFieldName string + tokenSetsOnce sync.Once tokenSets [][]string @@ -47,12 +56,18 @@ type filterIn struct { } func (fi *filterIn) String() string { - values := fi.values - a := make([]string, len(values)) - for i, value := range values { - a[i] = quoteTokenIfNeeded(value) + args := "" + if fi.q != nil { + args = fi.q.String() + } else { + values := fi.values + a := make([]string, len(values)) + for i, value := range values { + a[i] = quoteTokenIfNeeded(value) + } + args = strings.Join(a, ",") } - return fmt.Sprintf("%sin(%s)", quoteFieldNameIfNeeded(fi.fieldName), strings.Join(a, ",")) + return fmt.Sprintf("%sin(%s)", quoteFieldNameIfNeeded(fi.fieldName), args) } func (fi *filterIn) updateNeededFields(neededFields fieldsSet) { diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index eb4b1c5ac..d6f6e5b78 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -38,6 +38,20 @@ type lexer struct { currentTimestamp int64 } +type lexerState struct { + lex lexer +} + +func (lex *lexer) backupState() *lexerState { + return &lexerState{ + lex: *lex, + } +} + +func (lex *lexer) restoreState(ls *lexerState) { + *lex = ls.lex +} + // newLexer returns new lexer for the given s. // // The lex.token points to the first token in s. @@ -251,6 +265,30 @@ func (q *Query) Optimize() { q.pipes = append(q.pipes[:0], q.pipes[1:]...) } } + + // Optimize 'in(query)' filters + optimizeFilterIn(q.f) + for _, p := range q.pipes { + switch t := p.(type) { + case *pipeStats: + for _, f := range t.funcs { + if f.iff != nil { + optimizeFilterIn(f.iff) + } + } + } + } +} + +func optimizeFilterIn(f filter) { + visitFunc := func(f filter) bool { + fi, ok := f.(*filterIn) + if ok && fi.q != nil { + fi.q.Optimize() + } + return false + } + _ = visitFilter(f, visitFunc) } func optimizeSortOffsetPipes(pipes []pipe) []pipe { @@ -355,7 +393,17 @@ func (q *Query) getNeededColumns() ([]string, []string) { // ParseQuery parses s. 
func ParseQuery(s string) (*Query, error) { lex := newLexer(s) + q, err := parseQuery(lex) + if err != nil { + return nil, err + } + if !lex.isEnd() { + return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]; tail: [%s]", lex.context(), lex.s) + } + return q, nil +} +func parseQuery(lex *lexer) (*Query, error) { f, err := parseFilter(lex) if err != nil { return nil, fmt.Errorf("%w; context: [%s]", err, lex.context()) @@ -370,10 +418,6 @@ func ParseQuery(s string) (*Query, error) { } q.pipes = pipes - if !lex.isEnd() { - return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]; tail: [%s]", lex.context(), lex.s) - } - return q, nil } @@ -538,7 +582,7 @@ func getCompoundFuncArg(lex *lexer) string { rawArg := lex.rawToken lex.nextToken() suffix := "" - for !lex.isSkippedSpace && !lex.isKeyword("*", ",", ")", "") { + for !lex.isSkippedSpace && !lex.isKeyword("*", ",", ")", "|", "") { suffix += lex.rawToken lex.nextToken() } @@ -759,13 +803,72 @@ func tryParseIPv4CIDR(s string) (uint32, uint32, bool) { } func parseFilterIn(lex *lexer, fieldName string) (filter, error) { - return parseFuncArgs(lex, fieldName, func(args []string) (filter, error) { - f := &filterIn{ + if !lex.isKeyword("in") { + return nil, fmt.Errorf("expecting 'in' keyword") + } + + // Try parsing in(arg1, ..., argN) at first + lexState := lex.backupState() + fi, err := parseFuncArgs(lex, fieldName, func(args []string) (filter, error) { + fi := &filterIn{ fieldName: fieldName, values: args, } - return f, nil + return fi, nil }) + if err == nil { + return fi, nil + } + + // Parse in(query | fields someField) then + lex.restoreState(lexState) + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing '(' after 'in'") + } + lex.nextToken() + + q, err := parseQuery(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse query inside 'in(...)': %w", err) + } + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("missing ')' after 'in(%s)'", q) + } + lex.nextToken() + 
+ qFieldName, err := getFieldNameFromPipes(q.pipes) + if err != nil { + return nil, fmt.Errorf("cannot determine field name for values in 'in(%s)': %w", q, err) + } + fi = &filterIn{ + fieldName: fieldName, + needExecuteQuery: true, + q: q, + qFieldName: qFieldName, + } + return fi, nil +} + +func getFieldNameFromPipes(pipes []pipe) (string, error) { + if len(pipes) == 0 { + return "", fmt.Errorf("missing 'fields' or 'uniq' pipes at the end of query") + } + switch t := pipes[len(pipes)-1].(type) { + case *pipeFields: + if t.containsStar || len(t.fields) != 1 { + return "", fmt.Errorf("'%s' pipe must contain only a single non-star field name", t) + } + return t.fields[0], nil + case *pipeUniq: + if len(t.byFields) != 1 { + return "", fmt.Errorf("'%s' pipe must contain only a single non-star field name", t) + } + return t.byFields[0], nil + default: + return "", fmt.Errorf("missing 'fields' or 'uniq' pipe at the end of query") + } } func parseFilterSequence(lex *lexer, fieldName string) (filter, error) { diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index f42d5e47f..340a6a5ed 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -277,6 +277,10 @@ func TestParseFilterIn(t *testing.T) { f(`:in("foo bar,baz")`, ``, []string{"foo bar,baz"}) f(`ip:in(1.2.3.4, 5.6.7.8, 9.10.11.12)`, `ip`, []string{"1.2.3.4", "5.6.7.8", "9.10.11.12"}) f(`foo-bar:in(foo,bar-baz.aa"bb","c,)d")`, `foo-bar`, []string{"foo", `bar-baz.aa"bb"`, "c,)d"}) + + // verify `in(query)` - it shouldn't set values + f(`in(x|fields foo)`, ``, nil) + f(`a:in(* | fields bar)`, `a`, nil) } func TestParseFilterIPv4Range(t *testing.T) { @@ -687,8 +691,8 @@ func TestParseQuerySuccess(t *testing.T) { f("exact(foo*)", `exact(foo*)`) f("exact('foo bar),|baz')", `exact("foo bar),|baz")`) f("exact('foo bar),|baz'*)", `exact("foo bar),|baz"*)`) - f(`exact(foo|b:ar)`, `exact("foo|b:ar")`) - f(`foo:exact(foo|b:ar*)`, `foo:exact("foo|b:ar"*)`) + f(`exact(foo/b:ar)`, 
`exact("foo/b:ar")`) + f(`foo:exact(foo/b:ar*)`, `foo:exact("foo/b:ar"*)`) // i filter f("i(foo)", `i(foo)`) @@ -696,14 +700,19 @@ func TestParseQuerySuccess(t *testing.T) { f("i(`foo`* )", `i(foo*)`) f("i(' foo ) bar')", `i(" foo ) bar")`) f("i('foo bar'*)", `i("foo bar"*)`) - f(`foo:i(foo:bar-baz|aa+bb)`, `foo:i("foo:bar-baz|aa+bb")`) + f(`foo:i(foo:bar-baz/aa+bb)`, `foo:i("foo:bar-baz/aa+bb")`) - // in filter + // in filter with values f(`in()`, `in()`) f(`in(foo)`, `in(foo)`) f(`in(foo, bar)`, `in(foo,bar)`) f(`in("foo bar", baz)`, `in("foo bar",baz)`) - f(`foo:in(foo-bar|baz)`, `foo:in("foo-bar|baz")`) + f(`foo:in(foo-bar/baz)`, `foo:in("foo-bar/baz")`) + + // in filter with query + f(`in(err|fields x)`, `in(err | fields x)`) + f(`ip:in(foo and user:in(admin, moderator)|fields ip)`, `ip:in(foo user:in(admin,moderator) | fields ip)`) + f(`x:in(_time:5m y:in(*|fields z) | stats by (q) count() rows|fields q)`, `x:in(_time:5m y:in(* | fields z) | stats by (q) count(*) as rows | fields q)`) // ipv4_range filter f(`ipv4_range(1.2.3.4, "5.6.7.8")`, `ipv4_range(1.2.3.4, 5.6.7.8)`) @@ -743,7 +752,7 @@ func TestParseQuerySuccess(t *testing.T) { // re filter f("re('foo|ba(r.+)')", `re("foo|ba(r.+)")`) f("re(foo)", `re("foo")`) - f(`foo:re(foo-bar|baz.)`, `foo:re("foo-bar|baz.")`) + f(`foo:re(foo-bar/baz.)`, `foo:re("foo-bar/baz.")`) // seq filter f(`seq()`, `seq()`) @@ -945,6 +954,7 @@ func TestParseQuerySuccess(t *testing.T) { count() if (is_admin:true or _msg:"foo bar"*) as foo, sum(duration) if (host:in('foo.com', 'bar.com') and path:/foobar) as bar`, `* | stats by (_time:1d offset -2h, f2) count(*) if (is_admin:true or "foo bar"*) as foo, sum(duration) if (host:in(foo.com,bar.com) path:"/foobar") as bar`) + f(`* | stats count(x) if (error ip:in(_time:1d | fields ip)) rows`, `* | stats count(x) if (error ip:in(_time:1d | fields ip)) as rows`) f(`* | stats count() if () rows`, `* | stats count(*) if () as rows`) // sort pipe @@ -1116,6 +1126,10 @@ func 
TestParseQueryFailure(t *testing.T) { f(`in(foo, "bar baz"*, abc)`) f(`in(foo bar)`) f(`in(foo, bar`) + f(`in(foo|bar)`) + f(`in(|foo`) + f(`in(x | limit 10)`) + f(`in(x | fields a,b)`) // invalid ipv4_range f(`ipv4_range(`) diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 310718f06..4afd0415e 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -2,12 +2,14 @@ package logstorage import ( "context" + "fmt" "math" "slices" "sort" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // genericSearchOptions contain options used for search. @@ -60,8 +62,21 @@ type searchOptions struct { needAllColumns bool } +// WriteBlockFunc must write a block with the given timestamps and columns. +// +// WriteBlockFunc cannot hold references to timestamps and columns after returning. +type WriteBlockFunc func(workerID uint, timestamps []int64, columns []BlockColumn) + // RunQuery runs the given q and calls writeBlock for results. 
-func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) error { +func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error { + qNew, err := s.initFilterInValues(ctx, tenantIDs, q) + if err != nil { + return err + } + return s.runQuery(ctx, tenantIDs, qNew, writeBlock) +} + +func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error { neededColumnNames, unneededColumnNames := q.getNeededColumns() so := &genericSearchOptions{ tenantIDs: tenantIDs, @@ -71,8 +86,6 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, needAllColumns: slices.Contains(neededColumnNames, "*"), } - workersCount := cgroup.AvailableCPUs() - pp := newDefaultPipeProcessor(func(workerID uint, br *blockResult) { brs := getBlockRows() csDst := brs.cs @@ -90,6 +103,8 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, putBlockRows(brs) }) + workersCount := cgroup.AvailableCPUs() + ppMain := pp stopCh := ctx.Done() cancels := make([]func(), len(q.pipes)) @@ -121,6 +136,159 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, return errFlush } +// GetUniqueValuesForColumn returns unique values for the column with the given columnName returned by q for the given tenantIDs. +// +// If limit > 0, then up to limit unique values are returned. The values are returned in arbitrary order because of performance reasons. +// The caller may sort the returned values if needed. +func (s *Storage) GetUniqueValuesForColumn(ctx context.Context, tenantIDs []TenantID, q *Query, columnName string, limit uint64) ([]string, error) { + // add 'uniq columnName' to the end of q.pipes + if !endsWithPipeUniqSingleColumn(q.pipes, columnName) { + pipes := append([]pipe{}, q.pipes...) 
+ pipes = append(pipes, &pipeUniq{ + byFields: []string{columnName}, + limit: limit, + }) + q = &Query{ + f: q.f, + pipes: pipes, + } + } + + var values []string + var valuesLock sync.Mutex + writeBlock := func(workerID uint, timestamps []int64, columns []BlockColumn) { + if len(columns) != 1 { + logger.Panicf("BUG: expecting only a single column; got %d columns", len(columns)) + } + valuesLock.Lock() + values = append(values, columns[0].Values...) + valuesLock.Unlock() + } + + err := s.runQuery(ctx, tenantIDs, q, writeBlock) + if err != nil { + return nil, err + } + + return values, nil +} + +func endsWithPipeUniqSingleColumn(pipes []pipe, columnName string) bool { + if len(pipes) == 0 { + return false + } + pu, ok := pipes[len(pipes)-1].(*pipeUniq) + if !ok { + return false + } + return len(pu.byFields) == 1 && pu.byFields[0] == columnName +} + +func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) { + if !hasFilterInWithQueryForFilter(q.f) && !hasFilterInWithQueryForPipes(q.pipes) { + return q, nil + } + + getUniqueValues := func(q *Query, columnName string) ([]string, error) { + return s.GetUniqueValuesForColumn(ctx, tenantIDs, q, columnName, 0) + } + cache := make(map[string][]string) + fNew, err := initFilterInValuesForFilter(cache, q.f, getUniqueValues) + if err != nil { + return nil, err + } + pipesNew, err := initFilterInValuesForPipes(cache, q.pipes, getUniqueValues) + if err != nil { + return nil, err + } + qNew := &Query{ + f: fNew, + pipes: pipesNew, + } + return qNew, nil +} + +func hasFilterInWithQueryForFilter(f filter) bool { + visitFunc := func(f filter) bool { + fi, ok := f.(*filterIn) + return ok && fi.needExecuteQuery + } + return visitFilter(f, visitFunc) +} + +func hasFilterInWithQueryForPipes(pipes []pipe) bool { + for _, p := range pipes { + ps, ok := p.(*pipeStats) + if !ok { + continue + } + for _, f := range ps.funcs { + if f.iff != nil && hasFilterInWithQueryForFilter(f.iff) { + return 
true + } + } + } + return false +} + +type getUniqueValuesFunc func(q *Query, columnName string) ([]string, error) + +func initFilterInValuesForFilter(cache map[string][]string, f filter, getUniqueValuesFunc getUniqueValuesFunc) (filter, error) { + visitFunc := func(f filter) bool { + fi, ok := f.(*filterIn) + return ok && fi.needExecuteQuery + } + copyFunc := func(f filter) (filter, error) { + fi := f.(*filterIn) + + qStr := fi.q.String() + values, ok := cache[qStr] + if !ok { + vs, err := getUniqueValuesFunc(fi.q, fi.qFieldName) + if err != nil { + return nil, fmt.Errorf("cannot obtain unique values for %s: %w", fi, err) + } + cache[qStr] = vs + values = vs + } + + fiNew := &filterIn{ + fieldName: fi.fieldName, + q: fi.q, + values: values, + } + return fiNew, nil + } + return copyFilter(f, visitFunc, copyFunc) +} + +func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getUniqueValuesFunc getUniqueValuesFunc) ([]pipe, error) { + pipesNew := make([]pipe, len(pipes)) + for i, p := range pipes { + switch t := p.(type) { + case *pipeStats: + funcsNew := make([]pipeStatsFunc, len(t.funcs)) + for j, f := range t.funcs { + if f.iff != nil { + fNew, err := initFilterInValuesForFilter(cache, f.iff, getUniqueValuesFunc) + if err != nil { + return nil, err + } + f.iff = fNew + } + funcsNew[j] = f + } + pipesNew[i] = &pipeStats{ + byFields: t.byFields, + funcs: funcsNew, + } + default: + pipesNew[i] = p + } + } + return pipesNew, nil +} + type blockRows struct { cs []BlockColumn } @@ -169,7 +337,7 @@ type searchResultFunc func(workerID uint, br *blockResult) // search searches for the matching rows according to so. // -// It calls processBlockResult for each found matching block. +// It calls processBlockResult for each matching block. 
func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) { // Spin up workers var wgWorkers sync.WaitGroup @@ -294,60 +462,32 @@ func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *gene } func hasStreamFilters(f filter) bool { - switch t := f.(type) { - case *filterAnd: - return hasStreamFiltersInList(t.filters) - case *filterOr: - return hasStreamFiltersInList(t.filters) - case *filterNot: - return hasStreamFilters(t.f) - case *filterStream: - return true - default: - return false + visitFunc := func(f filter) bool { + _, ok := f.(*filterStream) + return ok } -} - -func hasStreamFiltersInList(filters []filter) bool { - for _, f := range filters { - if hasStreamFilters(f) { - return true - } - } - return false + return visitFilter(f, visitFunc) } func initStreamFilters(tenantIDs []TenantID, idb *indexdb, f filter) filter { - switch t := f.(type) { - case *filterAnd: - return &filterAnd{ - filters: initStreamFiltersList(tenantIDs, idb, t.filters), - } - case *filterOr: - return &filterOr{ - filters: initStreamFiltersList(tenantIDs, idb, t.filters), - } - case *filterNot: - return &filterNot{ - f: initStreamFilters(tenantIDs, idb, t.f), - } - case *filterStream: - return &filterStream{ - f: t.f, + visitFunc := func(f filter) bool { + _, ok := f.(*filterStream) + return ok + } + copyFunc := func(f filter) (filter, error) { + fs := f.(*filterStream) + fsNew := &filterStream{ + f: fs.f, tenantIDs: tenantIDs, idb: idb, } - default: - return t + return fsNew, nil } -} - -func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter) []filter { - result := make([]filter, len(filters)) - for i, f := range filters { - result[i] = initStreamFilters(tenantIDs, idb, f) + f, err := copyFilter(f, visitFunc, copyFunc) + if err != nil { + logger.Panicf("BUG: unexpected error: %s", err) } - return result + return f } func (ddb *datadb) search(so *searchOptions, workCh 
chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index d2da8dea1..7e2dd9461 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -78,6 +78,14 @@ func TestStorageRunQuery(t *testing.T) { } s.debugFlush() + mustRunQuery := func(tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) { + t.Helper() + err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock) + if err != nil { + t.Fatalf("unexpected error returned from the query %s: %s", q, err) + } + } + // run tests on the storage data t.Run("missing-tenant", func(_ *testing.T) { q := mustParseQuery(`"log message"`) @@ -89,7 +97,7 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) }) t.Run("missing-message-text", func(_ *testing.T) { q := mustParseQuery(`foobar`) @@ -101,7 +109,7 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) }) t.Run("matching-tenant-id", func(t *testing.T) { q := mustParseQuery(`tenant.id:*`) @@ -135,7 +143,7 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -149,7 +157,7 @@ func TestStorageRunQuery(t *testing.T) { writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { 
rowsCountTotal.Add(uint32(len(timestamps))) } - checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)) + mustRunQuery(allTenantIDs, q, writeBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -162,7 +170,7 @@ func TestStorageRunQuery(t *testing.T) { writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { rowsCountTotal.Add(uint32(len(timestamps))) } - checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)) + mustRunQuery(allTenantIDs, q, writeBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -174,7 +182,7 @@ func TestStorageRunQuery(t *testing.T) { writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } - checkErr(t, s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)) + mustRunQuery(allTenantIDs, q, writeBlock) }) t.Run("matching-stream-id", func(t *testing.T) { for i := 0; i < streamsPerTenant; i++ { @@ -208,7 +216,7 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) expectedRowsCount := blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -227,7 +235,7 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * 2 if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -247,7 +255,7 @@ func TestStorageRunQuery(t *testing.T) { 
rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -267,7 +275,7 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) expectedRowsCount := blocksPerStream if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -286,7 +294,7 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) }) t.Run("missing-time-range", func(_ *testing.T) { minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9 @@ -300,7 +308,7 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - checkErr(t, s.RunQuery(context.Background(), tenantIDs, q, writeBlock)) + mustRunQuery(tenantIDs, q, writeBlock) }) // Close the storage and delete its data @@ -308,13 +316,6 @@ func TestStorageRunQuery(t *testing.T) { fs.MustRemoveAll(path) } -func checkErr(t *testing.T, err error) { - t.Helper() - if err != nil { - t.Fatalf("unexpected err: %s", err) - } -} - func mustParseQuery(query string) *Query { q, err := ParseQuery(query) if err != nil {