From de7450b7e06bcf683467cc32ccc40fe2de23f3d7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 24 Jun 2024 23:27:12 +0200 Subject: [PATCH] lib/logstorage: work-in-progress --- .../docker/docker-compose-victorialogs.yml | 2 +- .../filebeat-docker/docker-compose.yml | 2 +- .../filebeat-syslog/docker-compose.yml | 2 +- .../fluentbit-docker/docker-compose.yml | 2 +- .../victorialogs/logstash/docker-compose.yml | 2 +- .../victorialogs/promtail/docker-compose.yml | 2 +- .../vector-docker/docker-compose.yml | 2 +- deployment/logs-benchmark/docker-compose.yml | 2 +- docs/VictoriaLogs/CHANGELOG.md | 7 ++ docs/VictoriaLogs/LogsQL.md | 14 +++ docs/VictoriaLogs/QuickStart.md | 6 +- lib/logstorage/filter_stream_id.go | 76 +++++++++++-- lib/logstorage/filter_stream_id_test.go | 24 +++- lib/logstorage/parser.go | 107 ++++++++++++++---- lib/logstorage/parser_test.go | 15 ++- lib/logstorage/storage_search.go | 88 ++++++++++---- lib/logstorage/storage_search_test.go | 63 ++++++----- 17 files changed, 320 insertions(+), 96 deletions(-) diff --git a/deployment/docker/docker-compose-victorialogs.yml b/deployment/docker/docker-compose-victorialogs.yml index 4a3bf71c8..e66357485 100644 --- a/deployment/docker/docker-compose-victorialogs.yml +++ b/deployment/docker/docker-compose-victorialogs.yml @@ -42,7 +42,7 @@ services: # storing logs and serving read queries. victorialogs: container_name: victorialogs - image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs command: - "--storageDataPath=/vlogs" - "--httpListenAddr=:9428" diff --git a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml index 0d6ecb786..5ad66071f 100644 --- a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: - -beat.uri=http://filebeat-victorialogs:5066 victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs volumes: - victorialogs-filebeat-docker-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml index 16e201f19..52ab8e792 100644 --- a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml @@ -13,7 +13,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs volumes: - victorialogs-filebeat-syslog-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml index 5a99a957c..4abf41df7 100644 --- a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml @@ -11,7 +11,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs volumes: - victorialogs-fluentbit-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/logstash/docker-compose.yml b/deployment/docker/victorialogs/logstash/docker-compose.yml index 1d318c23a..5950cd5d2 100644 --- a/deployment/docker/victorialogs/logstash/docker-compose.yml +++ b/deployment/docker/victorialogs/logstash/docker-compose.yml @@ -14,7 +14,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs volumes: - victorialogs-logstash-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/promtail/docker-compose.yml b/deployment/docker/victorialogs/promtail/docker-compose.yml index 7e915ecff..2e8f7a0d3 100644 --- a/deployment/docker/victorialogs/promtail/docker-compose.yml +++ b/deployment/docker/victorialogs/promtail/docker-compose.yml @@ -12,7 +12,7 @@ services: - "5140:5140" vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs volumes: - victorialogs-promtail-docker:/vlogs ports: diff --git a/deployment/docker/victorialogs/vector-docker/docker-compose.yml b/deployment/docker/victorialogs/vector-docker/docker-compose.yml index 35d992e54..8736d2c10 100644 --- a/deployment/docker/victorialogs/vector-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/vector-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: condition: service_healthy victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs volumes: - victorialogs-vector-docker-vl:/vlogs ports: diff --git a/deployment/logs-benchmark/docker-compose.yml b/deployment/logs-benchmark/docker-compose.yml index 8b9af085c..a22d67cd2 100644 --- a/deployment/logs-benchmark/docker-compose.yml +++ b/deployment/logs-benchmark/docker-compose.yml @@ -3,7 +3,7 @@ version: '3' services: # Run `make package-victoria-logs` to build victoria-logs image vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs volumes: - vlogs:/vlogs ports: diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index aa00ae504..be915da9c 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,13 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +## [v0.22.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.22.0-victorialogs) + +Released at 2024-06-24 + +* FEATURE: allow specifying multiple `_stream_id` values in [`_stream_id` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter) via `_stream_id:in(id1, ..., idN)` syntax. +* FEATURE: allow specifying subquery for searching for `_stream_id` values inside [`_stream_id` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter). For example, `_stream_id:in(_time:5m error | fields _stream_id)` returns logs for [logs streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with the `error` word across logs for the last 5 minutes. + ## [v0.21.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.21.0-victorialogs) Released at 2024-06-20 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 3355349a8..b2bff6996 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -479,6 +479,20 @@ query selects logs for the given stream for the last hour: _time:1h _stream_id:0000007b000001c850d9950ea6196b1a4812081265faa1c7 ``` +The `_stream_id` filter supports specifying multiple `_stream_id` values via `_stream_id:in(...)` syntax. For example: + +```logsql +_stream_id:in(0000007b000001c850d9950ea6196b1a4812081265faa1c7, 1230007b456701c850d9950ea6196b1a4812081265fff2a9) +``` + +It is also possible specifying subquery inside `in(...)`, which selects the needed `_stream_id` values. For example, the following query returns +logs for [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) containing `error` [word](#word) +in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) during the last 5 minutes: + +```logsql +_stream_id:in(_time:5m error | fields _stream_id) +``` + See also: - [stream filter](#stream-filter) diff --git a/docs/VictoriaLogs/QuickStart.md b/docs/VictoriaLogs/QuickStart.md index 804575a43..16e792969 100644 --- a/docs/VictoriaLogs/QuickStart.md +++ b/docs/VictoriaLogs/QuickStart.md @@ -36,8 +36,8 @@ Just download archive for the needed Operating system and architecture, unpack i For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it: ```sh -curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.21.0-victorialogs/victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz -tar xzf victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz +curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.22.0-victorialogs/victoria-logs-linux-amd64-v0.22.0-victorialogs.tar.gz +tar xzf victoria-logs-linux-amd64-v0.22.0-victorialogs.tar.gz ./victoria-logs-prod ``` @@ -61,7 +61,7 @@ Here is the command to run VictoriaLogs in a Docker container: ```sh docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \ - docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs + docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs ``` See also: diff --git a/lib/logstorage/filter_stream_id.go b/lib/logstorage/filter_stream_id.go index 307d63f0b..47cc4b90a 100644 --- a/lib/logstorage/filter_stream_id.go +++ b/lib/logstorage/filter_stream_id.go @@ -1,27 +1,76 @@ package logstorage import ( + "strings" + "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // filterStreamID is the filter for `_stream_id:id` type filterStreamID struct { - streamIDStr string + streamIDs []streamID + + // needeExecuteQuery is set to true if q must be executed for populating streamIDs before filter execution. + needExecuteQuery bool + + // If q is non-nil, then streamIDs must be populated from q before filter execution. + q *Query + + // qFieldName must be set to field name for obtaining values from if q is non-nil. + qFieldName string + + streamIDsMap map[string]struct{} + streamIDsMapOnce sync.Once } func (fs *filterStreamID) String() string { - return "_stream_id:" + quoteTokenIfNeeded(fs.streamIDStr) + if fs.q != nil { + return "_stream_id:in(" + fs.q.String() + ")" + } + + streamIDs := fs.streamIDs + if len(streamIDs) == 1 { + return "_stream_id:" + string(streamIDs[0].marshalString(nil)) + } + + a := make([]string, len(streamIDs)) + for i, streamID := range streamIDs { + a[i] = string(streamID.marshalString(nil)) + } + return "_stream_id:in(" + strings.Join(a, ",") + ")" } func (fs *filterStreamID) updateNeededFields(neededFields fieldsSet) { neededFields.add("_stream_id") } +func (fs *filterStreamID) getStreamIDsMap() map[string]struct{} { + fs.streamIDsMapOnce.Do(fs.initStreamIDsMap) + return fs.streamIDsMap +} + +func (fs *filterStreamID) initStreamIDsMap() { + m := make(map[string]struct{}, len(fs.streamIDs)) + for _, streamID := range fs.streamIDs { + k := streamID.marshalString(nil) + m[string(k)] = struct{}{} + } + fs.streamIDsMap = m +} + func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) { + m := fs.getStreamIDsMap() + + if len(m) == 0 { + bm.resetBits() + return + } + c := br.getColumnByName("_stream_id") if c.isConst { v := c.valuesEncoded[0] - if fs.streamIDStr != v { + if _, ok := m[v]; !ok { bm.resetBits() } return @@ -36,16 +85,18 @@ func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) { values := c.getValues(br) bm.forEachSetBit(func(idx int) bool { v := values[idx] - return fs.streamIDStr == v + _, ok := m[v] + return ok }) case valueTypeDict: bb := bbPool.Get() for _, v := range c.dictValues { - c := byte(0) - if fs.streamIDStr == v { - c = 1 + ch := byte(0) + _, ok := m[v] + if ok { + ch = 1 } - bb.B = append(bb.B, c) + bb.B = append(bb.B, ch) } valuesEncoded := c.getValuesEncoded(br) bm.forEachSetBit(func(idx int) bool { @@ -73,10 +124,17 @@ func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) { } func (fs *filterStreamID) applyToBlockSearch(bs *blockSearch, bm *bitmap) { + m := fs.getStreamIDsMap() + if len(m) == 0 { + bm.resetBits() + return + } + bb := bbPool.Get() bb.B = bs.bsw.bh.streamID.marshalString(bb.B) - ok := fs.streamIDStr == string(bb.B) + _, ok := m[string(bb.B)] bbPool.Put(bb) + if !ok { bm.resetBits() return diff --git a/lib/logstorage/filter_stream_id_test.go b/lib/logstorage/filter_stream_id_test.go index dc8b9600a..78fc357fc 100644 --- a/lib/logstorage/filter_stream_id_test.go +++ b/lib/logstorage/filter_stream_id_test.go @@ -12,19 +12,37 @@ func TestFilterStreamID(t *testing.T) { t.Parallel() // match + var sid1 streamID + if !sid1.tryUnmarshalFromString("0000007b000001c8302bc96e02e54e5524b3a68ec271e55e") { + t.Fatalf("cannot unmarshal _stream_id") + } ft := &filterStreamID{ - streamIDStr: "0000007b000001c8302bc96e02e54e5524b3a68ec271e55e", + streamIDs: []streamID{sid1}, } testFilterMatchForStreamID(t, ft, []int{0, 3, 6, 9}) + var sid2 streamID + if !sid2.tryUnmarshalFromString("0000007b000001c850d9950ea6196b1a4812081265faa1c7") { + t.Fatalf("cannot unmarshal _stream_id") + } ft = &filterStreamID{ - streamIDStr: "0000007b000001c850d9950ea6196b1a4812081265faa1c7", + streamIDs: []streamID{sid2}, } testFilterMatchForStreamID(t, ft, []int{1, 4, 7}) + ft = &filterStreamID{ + streamIDs: []streamID{sid1, sid2}, + } + testFilterMatchForStreamID(t, ft, []int{0, 1, 3, 4, 6, 7, 9}) + // mismatch ft = &filterStreamID{ - streamIDStr: "abc", + streamIDs: nil, + } + testFilterMatchForStreamID(t, ft, nil) + + ft = &filterStreamID{ + streamIDs: []streamID{{}}, } testFilterMatchForStreamID(t, ft, nil) } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 8a6f5b0fc..becb9bdc8 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -3,7 +3,6 @@ package logstorage import ( "fmt" "math" - "sort" "strconv" "strings" "time" @@ -235,46 +234,38 @@ func (q *Query) String() string { return s } -func (q *Query) getSortedStreamIDs() []streamID { +func (q *Query) getStreamIDs() []streamID { switch t := q.f.(type) { case *filterAnd: for _, f := range t.filters { - streamIDs, ok := getSortedStreamIDsFromFilterOr(f) + streamIDs, ok := getStreamIDsFromFilterOr(f) if ok { return streamIDs } } return nil default: - streamIDs, _ := getSortedStreamIDsFromFilterOr(q.f) + streamIDs, _ := getStreamIDsFromFilterOr(q.f) return streamIDs } } -func getSortedStreamIDsFromFilterOr(f filter) ([]streamID, bool) { +func getStreamIDsFromFilterOr(f filter) ([]streamID, bool) { switch t := f.(type) { case *filterOr: + streamIDsFilters := 0 var streamIDs []streamID for _, f := range t.filters { fs, ok := f.(*filterStreamID) if !ok { return nil, false } - var sid streamID - if sid.tryUnmarshalFromString(fs.streamIDStr) { - streamIDs = append(streamIDs, sid) - } + streamIDsFilters++ + streamIDs = append(streamIDs, fs.streamIDs...) } - sort.Slice(streamIDs, func(i, j int) bool { - return streamIDs[i].less(&streamIDs[j]) - }) - return streamIDs, len(streamIDs) > 0 + return streamIDs, streamIDsFilters > 0 case *filterStreamID: - var sid streamID - if !sid.tryUnmarshalFromString(t.streamIDStr) { - return nil, true - } - return []streamID{sid}, true + return t.streamIDs, true default: return nil, false } @@ -1830,17 +1821,89 @@ func stripTimezoneSuffix(s string) string { return s[:len(s)-len(tz)] } -func parseFilterStreamID(lex *lexer) (*filterStreamID, error) { - s, err := getCompoundToken(lex) +func parseFilterStreamID(lex *lexer) (filter, error) { + if lex.isKeyword("in") { + return parseFilterStreamIDIn(lex) + } + + sid, err := parseStreamID(lex) if err != nil { - return nil, err + return nil, fmt.Errorf("cannot parse _stream_id: %w", err) } fs := &filterStreamID{ - streamIDStr: s, + streamIDs: []streamID{sid}, } return fs, nil } +func parseFilterStreamIDIn(lex *lexer) (filter, error) { + if !lex.isKeyword("in") { + return nil, fmt.Errorf("unexpected token %q; expecting 'in'", lex.token) + } + + // Try parsing in(arg1, ..., argN) at first + lexState := lex.backupState() + fs, err := parseFuncArgs(lex, "", func(args []string) (filter, error) { + streamIDs := make([]streamID, len(args)) + for i, arg := range args { + if !streamIDs[i].tryUnmarshalFromString(arg) { + return nil, fmt.Errorf("cannot unmarshal _stream_id from %q", arg) + } + } + fs := &filterStreamID{ + streamIDs: streamIDs, + } + return fs, nil + }) + if err == nil { + return fs, nil + } + + // Try parsing in(query) + lex.restoreState(lexState) + lex.nextToken() + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing '(' after 'in'") + } + lex.nextToken() + + q, err := parseQuery(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse query inside 'in(...)': %w", err) + } + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("missing ')' after 'in(%s)'", q) + } + lex.nextToken() + + qFieldName, err := getFieldNameFromPipes(q.pipes) + if err != nil { + return nil, fmt.Errorf("cannot determine field name for values in 'in(%s)': %w", q, err) + } + + fs = &filterStreamID{ + needExecuteQuery: true, + q: q, + qFieldName: qFieldName, + } + return fs, nil +} + +func parseStreamID(lex *lexer) (streamID, error) { + var sid streamID + + s, err := getCompoundToken(lex) + if err != nil { + return sid, err + } + + if !sid.tryUnmarshalFromString(s) { + return sid, fmt.Errorf("cannot unmarshal _stream_id from %q", s) + } + return sid, nil +} + func parseFilterStream(lex *lexer) (*filterStream, error) { sf, err := parseStreamFilter(lex) if err != nil { diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 357b26ded..db9b5306f 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -697,8 +697,13 @@ func TestParseQuerySuccess(t *testing.T) { f(`"" or foo:"" and not bar:""`, `"" or foo:"" !bar:""`) // _stream_id filter - f(`_stream_id:foo`, `_stream_id:foo`) - f(`_stream_id:foo-bar/b:az`, `_stream_id:"foo-bar/b:az"`) + f(`_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`) + f(`_stream_id:"0000007b000001c8302bc96e02e54e5524b3a68ec271e55e"`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`) + f(`_stream_id:in()`, `_stream_id:in()`) + f(`_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e)`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`) + f(`_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e, "0000007b000001c850d9950ea6196b1a4812081265faa1c7")`, + `_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e,0000007b000001c850d9950ea6196b1a4812081265faa1c7)`) + f(`_stream_id:in(_time:5m | fields _stream_id)`, `_stream_id:in(_time:5m | fields _stream_id)`) // _stream filters f(`_stream:{}`, `_stream:{}`) @@ -1243,7 +1248,11 @@ func TestParseQueryFailure(t *testing.T) { f("`foo") // invalid _stream_id filters - f("_stream_id:(foo)") + f("_stream_id:foo") + f("_stream_id:()") + f("_stream_id:in(foo)") + f("_stream_id:in(foo | bar)") + f("_stream_id:in(* | stats by (x) count() y)") // invalid _stream filters f("_stream:") diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index c5c9123e8..923c391d7 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -105,7 +105,11 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, } func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error { - streamIDs := q.getSortedStreamIDs() + streamIDs := q.getStreamIDs() + sort.Slice(streamIDs, func(i, j int) bool { + return streamIDs[i].less(&streamIDs[j]) + }) + neededColumnNames, unneededColumnNames := q.getNeededColumns() so := &genericSearchOptions{ tenantIDs: tenantIDs, @@ -427,8 +431,14 @@ func hasFilterInWithQueryForFilter(f filter) bool { return false } visitFunc := func(f filter) bool { - fi, ok := f.(*filterIn) - return ok && fi.needExecuteQuery + switch t := f.(type) { + case *filterIn: + return t.needExecuteQuery + case *filterStreamID: + return t.needExecuteQuery + default: + return false + } } return visitFilter(f, visitFunc) } @@ -465,33 +475,71 @@ func initFilterInValuesForFilter(cache map[string][]string, f filter, getFieldVa } visitFunc := func(f filter) bool { - fi, ok := f.(*filterIn) - return ok && fi.needExecuteQuery + switch t := f.(type) { + case *filterIn: + return t.needExecuteQuery + case *filterStreamID: + return t.needExecuteQuery + default: + return false + } } copyFunc := func(f filter) (filter, error) { - fi := f.(*filterIn) - - qStr := fi.q.String() - values, ok := cache[qStr] - if !ok { - vs, err := getFieldValuesFunc(fi.q, fi.qFieldName) + switch t := f.(type) { + case *filterIn: + values, err := getValuesForQuery(t.q, t.qFieldName, cache, getFieldValuesFunc) if err != nil { - return nil, fmt.Errorf("cannot obtain unique values for %s: %w", fi, err) + return nil, fmt.Errorf("cannot obtain unique values for %s: %w", t, err) } - cache[qStr] = vs - values = vs - } - fiNew := &filterIn{ - fieldName: fi.fieldName, - q: fi.q, - values: values, + fiNew := &filterIn{ + fieldName: t.fieldName, + q: t.q, + values: values, + } + return fiNew, nil + case *filterStreamID: + values, err := getValuesForQuery(t.q, t.qFieldName, cache, getFieldValuesFunc) + if err != nil { + return nil, fmt.Errorf("cannot obtain unique values for %s: %w", t, err) + } + + // convert values to streamID list + streamIDs := make([]streamID, 0, len(values)) + for _, v := range values { + var sid streamID + if sid.tryUnmarshalFromString(v) { + streamIDs = append(streamIDs, sid) + } + } + + fsNew := &filterStreamID{ + streamIDs: streamIDs, + q: t.q, + } + return fsNew, nil + default: + return f, nil } - return fiNew, nil } return copyFilter(f, visitFunc, copyFunc) } +func getValuesForQuery(q *Query, qFieldName string, cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) ([]string, error) { + qStr := q.String() + values, ok := cache[qStr] + if ok { + return values, nil + } + + vs, err := getFieldValuesFunc(q, qFieldName) + if err != nil { + return nil, err + } + cache[qStr] = vs + return vs, nil +} + func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFieldValuesFunc getFieldValuesFunc) ([]pipe, error) { pipesNew := make([]pipe, len(pipes)) for i, p := range pipes { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index eea4dd249..3235eee1b 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -82,7 +82,7 @@ func TestStorageRunQuery(t *testing.T) { } s.debugFlush() - mustRunQuery := func(tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) { + mustRunQuery := func(t *testing.T, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) { t.Helper() err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock) if err != nil { @@ -91,7 +91,7 @@ func TestStorageRunQuery(t *testing.T) { } // run tests on the storage data - t.Run("missing-tenant", func(_ *testing.T) { + t.Run("missing-tenant", func(t *testing.T) { q := mustParseQuery(`"log message"`) tenantID := TenantID{ AccountID: 0, @@ -101,9 +101,9 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) }) - t.Run("missing-message-text", func(_ *testing.T) { + t.Run("missing-message-text", func(t *testing.T) { q := mustParseQuery(`foobar`) tenantID := TenantID{ AccountID: 1, @@ -113,7 +113,7 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) }) t.Run("matching-tenant-id", func(t *testing.T) { q := mustParseQuery(`tenant.id:*`) @@ -147,7 +147,7 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -161,7 +161,7 @@ func TestStorageRunQuery(t *testing.T) { writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { rowsCountTotal.Add(uint32(len(timestamps))) } - mustRunQuery(allTenantIDs, q, writeBlock) + mustRunQuery(t, allTenantIDs, q, writeBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -174,19 +174,19 @@ func TestStorageRunQuery(t *testing.T) { writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { rowsCountTotal.Add(uint32(len(timestamps))) } - mustRunQuery(allTenantIDs, q, writeBlock) + mustRunQuery(t, allTenantIDs, q, writeBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } }) - t.Run("stream-filter-mismatch", func(_ *testing.T) { + t.Run("stream-filter-mismatch", func(t *testing.T) { q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`) writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } - mustRunQuery(allTenantIDs, q, writeBlock) + mustRunQuery(t, allTenantIDs, q, writeBlock) }) t.Run("matching-stream-id", func(t *testing.T) { for i := 0; i < streamsPerTenant; i++ { @@ -220,7 +220,7 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) expectedRowsCount := blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -239,7 +239,7 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * 2 if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -259,7 +259,7 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -279,14 +279,14 @@ func TestStorageRunQuery(t *testing.T) { rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) expectedRowsCount := blocksPerStream if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) - t.Run("matching-stream-id-missing-time-range", func(_ *testing.T) { + t.Run("matching-stream-id-missing-time-range", func(t *testing.T) { minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9 maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9 q := mustParseQuery(fmt.Sprintf(`_stream:{job="foobar",instance="host-1:234"} _time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9)) @@ -298,9 +298,9 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) }) - t.Run("missing-time-range", func(_ *testing.T) { + t.Run("missing-time-range", func(t *testing.T) { minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9 maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9 q := mustParseQuery(fmt.Sprintf(`_time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9)) @@ -312,7 +312,7 @@ func TestStorageRunQuery(t *testing.T) { panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - mustRunQuery(tenantIDs, q, writeBlock) + mustRunQuery(t, tenantIDs, q, writeBlock) }) t.Run("field_names-all", func(t *testing.T) { q := mustParseQuery("*") @@ -493,7 +493,7 @@ func TestStorageRunQuery(t *testing.T) { resultRowsLock.Unlock() } } - mustRunQuery(allTenantIDs, q, writeBlock) + mustRunQuery(t, allTenantIDs, q, writeBlock) assertRowsEqual(t, resultRows, rowsExpected) } @@ -505,6 +505,13 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) + t.Run("_stream_id-filter", func(t *testing.T) { + f(t, `_stream_id:in(tenant.id:2 | fields _stream_id) | stats count() rows`, [][]Field{ + { + {"rows", "105"}, + }, + }) + }) t.Run("in-filter-with-subquery-match", func(t *testing.T) { f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{ { @@ -545,7 +552,7 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) - t.Run("pipe-extract", func(*testing.T) { + t.Run("pipe-extract", func(t *testing.T) { f(t, `* | extract "host-:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{ { {"host", "0"}, @@ -561,7 +568,7 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) - t.Run("pipe-extract-if-filter-with-subquery", func(*testing.T) { + t.Run("pipe-extract-if-filter-with-subquery", func(t *testing.T) { f(t, `* | extract if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id)) "host-:" from instance @@ -590,7 +597,7 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) - t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(*testing.T) { + t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(t *testing.T) { f(t, `* | extract if (tenant.id:in(tenant.id:3 | fields tenant.id)) "host-:" from instance @@ -611,7 +618,7 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) - t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(*testing.T) { + t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(t *testing.T) { f(t, `* | extract if (tenant.id:in(tenant.id:3 | fields tenant.id)) "host-:" from instance @@ -717,7 +724,7 @@ func TestStorageSearch(t *testing.T) { } } - t.Run("missing-tenant-smaller-than-existing", func(_ *testing.T) { + t.Run("missing-tenant-smaller-than-existing", func(t *testing.T) { tenantID := TenantID{ AccountID: 0, ProjectID: 0, @@ -735,7 +742,7 @@ func TestStorageSearch(t *testing.T) { } s.search(workersCount, so, nil, processBlock) }) - t.Run("missing-tenant-bigger-than-existing", func(_ *testing.T) { + t.Run("missing-tenant-bigger-than-existing", func(t *testing.T) { tenantID := TenantID{ AccountID: tenantsCount + 1, ProjectID: 0, @@ -753,7 +760,7 @@ func TestStorageSearch(t *testing.T) { } s.search(workersCount, so, nil, processBlock) }) - t.Run("missing-tenant-middle", func(_ *testing.T) { + t.Run("missing-tenant-middle", func(t *testing.T) { tenantID := TenantID{ AccountID: 1, ProjectID: 0, @@ -817,7 +824,7 @@ func TestStorageSearch(t *testing.T) { t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } }) - t.Run("stream-filter-mismatch", func(_ *testing.T) { + t.Run("stream-filter-mismatch", func(t *testing.T) { sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-.+:2345"}`) minTimestamp := baseTimestamp maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream @@ -943,7 +950,7 @@ func TestStorageSearch(t *testing.T) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) - t.Run("matching-stream-id-missing-time-range", func(_ *testing.T) { + t.Run("matching-stream-id-missing-time-range", func(t *testing.T) { sf := mustNewTestStreamFilter(`{job="foobar",instance="host-1:234"}`) tenantID := TenantID{ AccountID: 1,