From 0aafca29be5522450314ee2db520e976cd5d2779 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 28 May 2024 19:29:41 +0200 Subject: [PATCH] lib/logstorage: work-in-progress --- docs/VictoriaLogs/CHANGELOG.md | 12 + docs/VictoriaLogs/LogsQL.md | 164 ++++- lib/logstorage/filter_and.go | 144 ++-- lib/logstorage/filter_in.go | 102 ++- lib/logstorage/filter_in_test.go | 31 + lib/logstorage/filter_or.go | 112 +++ lib/logstorage/filter_string_range.go | 8 +- lib/logstorage/parser.go | 88 ++- lib/logstorage/parser_test.go | 43 +- lib/logstorage/pipe.go | 42 +- lib/logstorage/pipe_delete.go | 4 +- lib/logstorage/pipe_extract.go | 20 +- lib/logstorage/pipe_extract_regexp.go | 334 +++++++++ lib/logstorage/pipe_extract_regexp_test.go | 329 +++++++++ lib/logstorage/pipe_extract_test.go | 28 - lib/logstorage/pipe_field_values.go | 93 +++ lib/logstorage/pipe_field_values_test.go | 148 ++++ lib/logstorage/pipe_filter.go | 10 +- lib/logstorage/pipe_filter_test.go | 21 + lib/logstorage/pipe_format.go | 12 +- lib/logstorage/pipe_limit.go | 17 +- lib/logstorage/pipe_limit_test.go | 12 +- lib/logstorage/pipe_math.go | 776 +++++++++++++++++++++ lib/logstorage/pipe_math_test.go | 233 +++++++ lib/logstorage/pipe_replace.go | 10 +- lib/logstorage/pipe_replace_regexp.go | 10 +- lib/logstorage/pipe_stats.go | 11 +- lib/logstorage/pipe_stats_test.go | 90 +++ lib/logstorage/pipe_unpack.go | 10 +- lib/logstorage/pipe_utils_test.go | 28 + lib/logstorage/storage_search.go | 6 +- lib/logstorage/storage_search_test.go | 152 ++++ 32 files changed, 2911 insertions(+), 189 deletions(-) create mode 100644 lib/logstorage/pipe_extract_regexp.go create mode 100644 lib/logstorage/pipe_extract_regexp_test.go create mode 100644 lib/logstorage/pipe_field_values.go create mode 100644 lib/logstorage/pipe_field_values_test.go create mode 100644 lib/logstorage/pipe_math.go create mode 100644 lib/logstorage/pipe_math_test.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md 
index b569f6195..66418ffe4 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,18 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs) + +Released at 2024-05-28 + +* FEATURE: add [`extract_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe) for extracting arbitrary substrings from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax). +* FEATURE: add [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) for mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +* FEATURE: add [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe), which returns unique values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +* FEATURE: allow omitting `stats` prefix in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). For example, `_time:5m | count() rows` is a valid query now. It is equivalent to `_time:5m | stats count() as rows`. +* FEATURE: allow omitting `filter` prefix in [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the filter doesn't clash with [pipe names](https://docs.victoriametrics.com/victorialogs/logsql/#pipes). For example, `_time:5m | stats by (host) count() rows | rows:>1000` is a valid query now. It is equivalent to `_time:5m | stats by (host) count() rows | filter rows:>1000`. +* FEATURE: allow [`head` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) without number. For example, `error | head`. In this case 10 last values are returned as `head` Unix command does by default.
+* FEATURE: allow using [comparison filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filters) with strings. For example, `some_text_field:>="foo"` matches [log entries](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with `some_text_field` field values bigger or equal to `foo`. + ## [v0.12.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.1-victorialogs) Released at 2024-05-26 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 8b6e17467..72317d1a4 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -255,6 +255,7 @@ The list of LogsQL filters: - [Phrase filter](#phrase-filter) - matches logs with the given phrase - [Prefix filter](#prefix-filter) - matches logs with the given word prefix or phrase prefix - [Substring filter](#substring-filter) - matches logs with the given substring +- [Range comparison filter](#range-comparison-filter) - matches logs with field values in the provided range - [Empty value filter](#empty-value-filter) - matches logs without the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - [Any value filter](#any-value-filter) - matches logs with the given non-empty [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - [Exact filter](#exact-filter) - matches logs with the exact value @@ -576,6 +577,26 @@ See also: - [Regexp filter](#regexp-filter) +### Range comparison filter + +LogsQL supports `field:>X`, `field:>=X`, `field:<X` and `field:<=X` filters, where `field` is the name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) and `X` is either [numeric value](#numeric-values) or a string. For example, the following query returns logs with `response_size` field values bigger than `10KiB`: + +```logsql +response_size:>10KiB +``` + +The following query returns logs with `username` field containing string values smaller than `John`: + +```logsql +username:<"John" +``` + +See also: + +- [String range filter](#string-range-filter) +- [Range filter](#range-filter) + ### Empty value filter Sometimes it is needed to find log entries without the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -906,18 +927,12 @@ for searching for log entries with request durations exceeding 4.2 seconds: request.duration:range(4.2, Inf) ``` -This query can be shortened to: +This query can be shortened by using [range comparison filter](#range-comparison-filter): ```logsql request.duration:>4.2 ``` -The following query returns logs with request durations smaller or equal to 1.5 seconds: - -```logsql -request.duration:<=1.5 - - The lower and the upper bounds of the `range(lower, upper)` are excluded by default. If they must be included, then substitute the corresponding parentheses with square brackets. For example: @@ -941,6 +956,7 @@ Performance tips: See also: +- [Range comparison filter](#range-comparison-filter) - [IPv4 range filter](#ipv4-range-filter) - [String range filter](#string-range-filter) - [Length range filter](#length-range-filter) @@ -1012,6 +1028,7 @@ For example, the `user.name:string_range(C, E)` would match `user.name` fields, See also: +- [Range comparison filter](#range-comparison-filter) - [Range filter](#range-filter) - [IPv4 range filter](#ipv4-range-filter) - [Length range filter](#length-range-filter) @@ -1135,11 +1152,14 @@ LogsQL supports the following pipes: - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`extract`](#extract-pipe) extracts the sepcified text into the given log fields. +- [`extract_regexp`](#extract_regexp-pipe) extracts the specified text into the given log fields via [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax). - [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`field_values`](#field_values-pipe) returns all the values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`filter`](#filter-pipe) applies additional [filters](#filters) to results. - [`format`](#format-pipe) formats ouptut field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`limit`](#limit-pipe) limits the number selected logs. +- [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`offset`](#offset-pipe) skips the given number of selected logs. - [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object. - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1188,7 +1208,7 @@ For example, the following query deletes `host` and `app` fields from the logs o _time:5m | delete host, app ``` -`del` and `rm` keywords can be used instead of `delete` for convenience. For example, `_time:5m | del host` is equivalent to `_time:5m | rm host` and `_time:5m | delete host`. +`drop`, `del` and `rm` keywords can be used instead of `delete` for convenience. For example, `_time:5m | drop host` is equivalent to `_time:5m | delete host`. 
See also: @@ -1251,6 +1271,7 @@ See also: - [Conditional extract](#conditional-extract) - [`unpack_json` pipe](#unpack_json-pipe) - [`unpack_logfmt` pipe](#unpack_logfmt-pipe) +- [`math` pipe](#math-pipe) #### Format for extract pipe pattern @@ -1334,6 +1355,34 @@ For example, the following query is equivalent to the previous one: _time:5m | extract "ip=<ip> " keep_original_fields ``` +### extract_regexp pipe + +`| extract_regexp "pattern" from field_name` [pipe](#pipes) extracts substrings from the [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +according to the provided `pattern`, and stores them into field names according to the named fields inside the `pattern`. +The `pattern` must contain [RE2 regular expression](https://github.com/google/re2/wiki/Syntax) with named fields (aka capturing groups) in the form `(?P<capture_field_name>...)`. +Matching substrings are stored to the given `capture_field_name` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +For example, the following query extracts ipv4 addresses from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +and puts them into `ip` field for logs over the last 5 minutes: + +```logsql +_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)" from _msg +``` + +The `from _msg` part can be omitted if the data extraction is performed from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). +So the following query is equivalent to the previous one: + +```logsql +_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)" +``` + +Performance tip: it is recommended using [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance.
+ +See also: + +- [`extract` pipe](#extract-pipe) +- [`replace_regexp` pipe](#replace_regexp-pipe) +- [`unpack_json` pipe](#unpack_json-pipe) + ### field_names pipe `| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) @@ -1348,8 +1397,33 @@ Field names are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in or See also: +- [`field_values` pipe](#field_values-pipe) - [`uniq` pipe](#uniq-pipe) +### field_values pipe + +`| field_values field_name` [pipe](#pipes) returns all the values for the given [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +with the number of logs per each value. +For example, the following query returns all the values with the number of matching logs for the field `level` over logs for the last 5 minutes: + +```logsql +_time:5m | field_values level +``` + +It is possible limiting the number of returned values by adding `limit N` to the end of the `field_values ...`. For example, the following query returns +up to 10 values for the field `user_id` over logs for the last 5 minutes: + +```logsql +_time:5m | field_values user_id limit 10 +``` + +If the limit is reached, then the set of returned values is random. Also the number of matching logs per each returned value is zeroed for performance reasons. + +See also: + +- [`field_names` pipe](#field_names-pipe) +- [`uniq` pipe](#uniq-pipe) + ### fields pipe By default all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) are returned in the response. @@ -1374,8 +1448,7 @@ See also: ### filter pipe -Sometimes it is needed to apply additional filters on the calculated results. This can be done with `| filter ...` [pipe](#pipes). -The `filter` pipe can contain arbitrary [filters](#filters). +The `| filter ...` [pipe](#pipes) allows filtering the selected log entries with arbitrary [filters](#filters).
For example, the following query returns `host` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values if the number of log messages with the `error` [word](#word) for them over the last hour exceeds `1_000`: @@ -1384,6 +1457,13 @@ if the number of log messages with the `error` [word](#word) for them over the l _time:1h error | stats by (host) count() logs_count | filter logs_count:> 1_000 ``` +It is allowed to omit `filter` prefix if the used filters do not clash with [pipe names](#pipes). +So the following query is equivalent to the previous one: + +```logsql +_time:1h error | stats by (host) count() logs_count | logs_count:> 1_000 +``` + See also: - [`stats` pipe](#stats-pipe) @@ -1463,6 +1543,12 @@ _time:5m | limit 100 `head` keyword can be used instead of `limit` for convenience. For example, `_time:5m | head 100` is equivalent to `_time:5m | limit 100`. +The `N` in `head N` can be omitted - in this case up to 10 matching logs are returned: + +```logsql +error | head +``` + By default rows are selected in arbitrary order because of performance reasons, so the query above can return different sets of logs every time it is executed. [`sort` pipe](#sort-pipe) can be used for making sure the logs are in the same order before applying `limit ...` to them. @@ -1471,6 +1557,50 @@ See also: - [`sort` pipe](#sort-pipe) - [`offset` pipe](#offset-pipe) +### math pipe + +`| math ...` [pipe](#pipes) performs mathematical calculations over numeric values stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). 
+For example, the following query divides `duration_msecs` field value by 1000, then rounds them to integer and stores the result in the `duration_secs` field: + +```logsql +_time:5m | math round(duration_msecs / 1000) as duration_secs +``` + +The following mathematical operations are supported by `math` pipe: + +- `arg1 + arg2` - returns the sum of `arg1` and `arg2` +- `arg1 - arg2` - returns the difference between `arg1` and `arg2` +- `arg1 * arg2` - multiplies `arg1` by `arg2` +- `arg1 / arg2` - divides `arg1` by `arg2` +- `arg1 % arg2` - returns the remainder of the division of `arg1` by `arg2` +- `arg1 ^ arg2` - returns the power of `arg1` by `arg2` +- `abs(arg)` - returns an absolute values for the given `arg` +- `max(arg1, ..., argN)` - returns the maximum value among the given `arg1`, ..., `argN` +- `min(arg1, ..., argN)` - returns the minimum value among the given `arg1`, ..., `argN` +- `round(arg)` - returns rounded to integer value for the given `arg`. The `round()` accepts optional `nearest` arg, which allows rounding the number to the given `nearest` multiple. + For example, `round(temperature, 0.1)` rounds `temperature` field to one decimal digit after the point. + +Every `argX` argument in every mathematical operation can contain one of the following values: + +- The name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, `errors_total / requests_total`. +- Any [supported numeric value](#numeric-values). For example, `response_size_bytes / 1MiB`. +- Another mathematical expression. Optionally, it may be put inside `(...)`. For example, `(a + b) * c`. + +Multiple distinct results can be calculated in a single `math ...` pipe - just separate them with `,`. 
For example, the following query calculates the error rate +and the number of successful requests from `errors`, `warnings` and `requests` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model): + +```logsql +_time:5m | math + (errors / requests) as error_rate, + (requests - (errors + warnings)) as success_requests +``` + +See also: + +- [`stats` pipe](#stats-pipe) +- [`extract` pipe](#extract-pipe) + + ### offset pipe If some selected logs must be skipped after [`sort`](#sort-pipe), then `| offset N` [pipe](#pipes) can be used, where `N` can contain any [supported integer numeric value](#numeric-values). @@ -1738,6 +1868,12 @@ For example, the following query calculates the following stats for logs over th _time:5m | stats count() logs_total, count_uniq(_stream) streams_total ``` +It is allowed to omit `stats` prefix for convenience. So the following query is equivalent to the previous one: + +```logsql +_time:5m | count() logs_total, count_uniq(_stream) streams_total +``` + See also: - [stats by fields](#stats-by-fields) @@ -1747,6 +1883,7 @@ See also: - [stats by IPv4 buckets](#stats-by-ipv4-buckets) - [stats with additional filters](#stats-with-additional-filters) - [stats pipe functions](#stats-pipe-functions) +- [`math` pipe](#math-pipe) - [`sort` pipe](#sort-pipe) @@ -2462,12 +2599,7 @@ LogsQL supports the following transformations on the log entries selected with [ - Creating a new field from existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) according to the provided format. See [`format` pipe](#format-pipe). - Replacing substrings in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [`replace` pipe](#replace-pipe) and [`replace_regexp` pipe](#replace_regexp-pipe) docs. 
- -LogsQL will support the following transformations in the future: - -- Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - -See the [Roadmap](https://docs.victoriametrics.com/victorialogs/roadmap/) for details. +- Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [`math` pipe](#math-pipe). It is also possible to perform various transformations on the [selected log entries](#filters) at client side with `jq`, `awk`, `cut`, etc. Unix commands according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line). diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go index caea8ae6f..08bf8736a 100644 --- a/lib/logstorage/filter_and.go +++ b/lib/logstorage/filter_and.go @@ -13,8 +13,13 @@ import ( type filterAnd struct { filters []filter - msgTokensOnce sync.Once - msgTokens []string + byFieldTokensOnce sync.Once + byFieldTokens []fieldTokens +} + +type fieldTokens struct { + field string + tokens []string } func (fa *filterAnd) String() string { @@ -22,8 +27,7 @@ func (fa *filterAnd) String() string { a := make([]string, len(filters)) for i, f := range filters { s := f.String() - switch f.(type) { - case *filterOr: + if _, ok := f.(*filterOr); ok { s = "(" + s + ")" } a[i] = s @@ -49,8 +53,8 @@ func (fa *filterAnd) applyToBlockResult(br *blockResult, bm *bitmap) { } func (fa *filterAnd) applyToBlockSearch(bs *blockSearch, bm *bitmap) { - if !fa.matchMessageBloomFilter(bs) { - // Fast path - fa doesn't match _msg bloom filter. + if !fa.matchBloomFilters(bs) { + // Fast path - fa doesn't match bloom filters. 
bm.resetBits() return } @@ -66,60 +70,114 @@ func (fa *filterAnd) applyToBlockSearch(bs *blockSearch, bm *bitmap) { } } -func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool { - tokens := fa.getMessageTokens() - if len(tokens) == 0 { +func (fa *filterAnd) matchBloomFilters(bs *blockSearch) bool { + byFieldTokens := fa.getByFieldTokens() + if len(byFieldTokens) == 0 { return true } - v := bs.csh.getConstColumnValue("_msg") - if v != "" { - return matchStringByAllTokens(v, tokens) + for _, fieldTokens := range byFieldTokens { + fieldName := fieldTokens.field + tokens := fieldTokens.tokens + + v := bs.csh.getConstColumnValue(fieldName) + if v != "" { + if !matchStringByAllTokens(v, tokens) { + return false + } + continue + } + + ch := bs.csh.getColumnHeader(fieldName) + if ch == nil { + return false + } + + if ch.valueType == valueTypeDict { + if !matchDictValuesByAllTokens(ch.valuesDict.values, tokens) { + return false + } + continue + } + if !matchBloomFilterAllTokens(bs, ch, tokens) { + return false + } } - ch := bs.csh.getColumnHeader("_msg") - if ch == nil { - return false - } - - if ch.valueType == valueTypeDict { - return matchDictValuesByAllTokens(ch.valuesDict.values, tokens) - } - return matchBloomFilterAllTokens(bs, ch, tokens) + return true } -func (fa *filterAnd) getMessageTokens() []string { - fa.msgTokensOnce.Do(fa.initMsgTokens) - return fa.msgTokens +func (fa *filterAnd) getByFieldTokens() []fieldTokens { + fa.byFieldTokensOnce.Do(fa.initByFieldTokens) + return fa.byFieldTokens } -func (fa *filterAnd) initMsgTokens() { - var a []string +func (fa *filterAnd) initByFieldTokens() { + m := make(map[string]map[string]struct{}) + byFieldFilters := make(map[string]int) + var fieldNames []string + for _, f := range fa.filters { + fieldName := "" + var tokens []string + switch t := f.(type) { - case *filterPhrase: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) 
- } - case *filterSequence: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) - } case *filterExact: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) - } + fieldName = t.fieldName + tokens = t.getTokens() case *filterExactPrefix: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) - } + fieldName = t.fieldName + tokens = t.getTokens() + case *filterPhrase: + fieldName = t.fieldName + tokens = t.getTokens() case *filterPrefix: - if isMsgFieldName(t.fieldName) { - a = append(a, t.getTokens()...) + fieldName = t.fieldName + tokens = t.getTokens() + case *filterRegexp: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterSequence: + fieldName = t.fieldName + tokens = t.getTokens() + } + + fieldName = getCanonicalColumnName(fieldName) + byFieldFilters[fieldName]++ + + if len(tokens) > 0 { + mTokens, ok := m[fieldName] + if !ok { + fieldNames = append(fieldNames, fieldName) + mTokens = make(map[string]struct{}) + m[fieldName] = mTokens + } + for _, token := range tokens { + mTokens[token] = struct{}{} } } } - fa.msgTokens = a + + var byFieldTokens []fieldTokens + for _, fieldName := range fieldNames { + if byFieldFilters[fieldName] < 2 { + // It is faster to perform bloom filter match inline when visiting the corresponding column + continue + } + + mTokens := m[fieldName] + tokens := make([]string, 0, len(mTokens)) + for token := range mTokens { + tokens = append(tokens, token) + } + + byFieldTokens = append(byFieldTokens, fieldTokens{ + field: fieldName, + tokens: tokens, + }) + } + + fa.byFieldTokens = byFieldTokens } func matchStringByAllTokens(v string, tokens []string) bool { diff --git a/lib/logstorage/filter_in.go b/lib/logstorage/filter_in.go index b1d7b4821..742a0fb40 100644 --- a/lib/logstorage/filter_in.go +++ b/lib/logstorage/filter_in.go @@ -3,6 +3,7 @@ package logstorage import ( "fmt" "math" + "slices" "strings" "sync" @@ -27,8 +28,9 @@ type filterIn struct { // qFieldName must be set to 
field name for obtaining values from if q is non-nil. qFieldName string - tokenSetsOnce sync.Once - tokenSets [][]string + tokensOnce sync.Once + commonTokens []string + tokenSets [][]string stringValuesOnce sync.Once stringValues map[string]struct{} @@ -74,28 +76,15 @@ func (fi *filterIn) updateNeededFields(neededFields fieldsSet) { neededFields.add(fi.fieldName) } -func (fi *filterIn) getTokenSets() [][]string { - fi.tokenSetsOnce.Do(fi.initTokenSets) - return fi.tokenSets +func (fi *filterIn) getTokens() ([]string, [][]string) { + fi.tokensOnce.Do(fi.initTokens) + return fi.commonTokens, fi.tokenSets } -// It is faster to match every row in the block instead of checking too big number of tokenSets against bloom filter. -const maxTokenSetsToInit = 1000 +func (fi *filterIn) initTokens() { + commonTokens, tokenSets := getCommonTokensAndTokenSets(fi.values) -func (fi *filterIn) initTokenSets() { - values := fi.values - tokenSetsLen := len(values) - if tokenSetsLen > maxTokenSetsToInit { - tokenSetsLen = maxTokenSetsToInit - } - tokenSets := make([][]string, 0, tokenSetsLen+1) - for _, v := range values { - tokens := tokenizeStrings(nil, []string{v}) - tokenSets = append(tokenSets, tokens) - if len(tokens) > maxTokenSetsToInit { - break - } - } + fi.commonTokens = commonTokens fi.tokenSets = tokenSets } @@ -385,47 +374,47 @@ func (fi *filterIn) applyToBlockSearch(bs *blockSearch, bm *bitmap) { return } - tokenSets := fi.getTokenSets() + commonTokens, tokenSets := fi.getTokens() switch ch.valueType { case valueTypeString: stringValues := fi.getStringValues() - matchAnyValue(bs, ch, bm, stringValues, tokenSets) + matchAnyValue(bs, ch, bm, stringValues, commonTokens, tokenSets) case valueTypeDict: stringValues := fi.getStringValues() matchValuesDictByAnyValue(bs, ch, bm, stringValues) case valueTypeUint8: binValues := fi.getUint8Values() - matchAnyValue(bs, ch, bm, binValues, tokenSets) + matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets) case 
valueTypeUint16: binValues := fi.getUint16Values() - matchAnyValue(bs, ch, bm, binValues, tokenSets) + matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets) case valueTypeUint32: binValues := fi.getUint32Values() - matchAnyValue(bs, ch, bm, binValues, tokenSets) + matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets) case valueTypeUint64: binValues := fi.getUint64Values() - matchAnyValue(bs, ch, bm, binValues, tokenSets) + matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets) case valueTypeFloat64: binValues := fi.getFloat64Values() - matchAnyValue(bs, ch, bm, binValues, tokenSets) + matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets) case valueTypeIPv4: binValues := fi.getIPv4Values() - matchAnyValue(bs, ch, bm, binValues, tokenSets) + matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets) case valueTypeTimestampISO8601: binValues := fi.getTimestampISO8601Values() - matchAnyValue(bs, ch, bm, binValues, tokenSets) + matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets) default: logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType) } } -func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, tokenSets [][]string) { +func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, commonTokens []string, tokenSets [][]string) { if len(values) == 0 { bm.resetBits() return } - if !matchBloomFilterAnyTokenSet(bs, ch, tokenSets) { + if !matchBloomFilterAnyTokenSet(bs, ch, commonTokens, tokenSets) { bm.resetBits() return } @@ -435,7 +424,10 @@ func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[str }) } -func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets [][]string) bool { +func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, commonTokens []string, tokenSets [][]string) bool { + if len(commonTokens) > 0 { + return matchBloomFilterAllTokens(bs, ch, commonTokens) + } if 
len(tokenSets) == 0 { return false } @@ -453,6 +445,9 @@ func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets [] return false } +// It is faster to match every row in the block instead of checking too big number of tokenSets against bloom filter. +const maxTokenSetsToInit = 1000 + func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}) { bb := bbPool.Get() for _, v := range ch.valuesDict.values { @@ -465,3 +460,44 @@ func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, va matchEncodedValuesDict(bs, ch, bm, bb.B) bbPool.Put(bb) } + +func getCommonTokensAndTokenSets(values []string) ([]string, [][]string) { + tokenSets := make([][]string, len(values)) + for i, v := range values { + tokenSets[i] = tokenizeStrings(nil, []string{v}) + } + + commonTokens := getCommonTokens(tokenSets) + if len(commonTokens) == 0 { + return nil, tokenSets + } + return commonTokens, nil +} + +func getCommonTokens(tokenSets [][]string) []string { + if len(tokenSets) == 0 { + return nil + } + + m := make(map[string]struct{}, len(tokenSets[0])) + for _, token := range tokenSets[0] { + m[token] = struct{}{} + } + + for _, tokens := range tokenSets[1:] { + if len(m) == 0 { + return nil + } + for token := range m { + if !slices.Contains(tokens, token) { + delete(m, token) + } + } + } + + tokens := make([]string, 0, len(m)) + for token := range m { + tokens = append(tokens, token) + } + return tokens +} diff --git a/lib/logstorage/filter_in_test.go b/lib/logstorage/filter_in_test.go index ffe8e944b..29f1e35ff 100644 --- a/lib/logstorage/filter_in_test.go +++ b/lib/logstorage/filter_in_test.go @@ -1,6 +1,8 @@ package logstorage import ( + "reflect" + "slices" "testing" ) @@ -688,3 +690,32 @@ func TestFilterIn(t *testing.T) { testFilterMatchForColumns(t, columns, fi, "_msg", nil) }) } + +func TestGetCommonTokensAndTokenSets(t *testing.T) { + f := func(values []string, commonTokensExpected []string, 
tokenSetsExpected [][]string) { + t.Helper() + + commonTokens, tokenSets := getCommonTokensAndTokenSets(values) + slices.Sort(commonTokens) + + if !reflect.DeepEqual(commonTokens, commonTokensExpected) { + t.Fatalf("unexpected commonTokens for values=%q\ngot\n%q\nwant\n%q", values, commonTokens, commonTokensExpected) + } + + for i, tokens := range tokenSets { + slices.Sort(tokens) + tokensExpected := tokenSetsExpected[i] + if !reflect.DeepEqual(tokens, tokensExpected) { + t.Fatalf("unexpected tokens for value=%q\ngot\n%q\nwant\n%q", values[i], tokens, tokensExpected) + } + } + } + + f(nil, nil, nil) + f([]string{"foo"}, []string{"foo"}, nil) + f([]string{"foo", "foo"}, []string{"foo"}, nil) + f([]string{"foo", "bar", "bar", "foo"}, nil, [][]string{{"foo"}, {"bar"}, {"bar"}, {"foo"}}) + f([]string{"foo", "foo bar", "bar foo"}, []string{"foo"}, nil) + f([]string{"a foo bar", "bar abc foo", "foo abc a bar"}, []string{"bar", "foo"}, nil) + f([]string{"a xfoo bar", "xbar abc foo", "foo abc a bar"}, nil, [][]string{{"a", "bar", "xfoo"}, {"abc", "foo", "xbar"}, {"a", "abc", "bar", "foo"}}) +} diff --git a/lib/logstorage/filter_or.go b/lib/logstorage/filter_or.go index 5349c4370..256337a0c 100644 --- a/lib/logstorage/filter_or.go +++ b/lib/logstorage/filter_or.go @@ -2,6 +2,7 @@ package logstorage import ( "strings" + "sync" ) // filterOr contains filters joined by OR operator. @@ -9,6 +10,9 @@ import ( // It is epxressed as `f1 OR f2 ... OR fN` in LogsQL. type filterOr struct { filters []filter + + byFieldTokensOnce sync.Once + byFieldTokens []fieldTokens } func (fo *filterOr) String() string { @@ -51,6 +55,12 @@ func (fo *filterOr) applyToBlockResult(br *blockResult, bm *bitmap) { } func (fo *filterOr) applyToBlockSearch(bs *blockSearch, bm *bitmap) { + if !fo.matchBloomFilters(bs) { + // Fast path - fo doesn't match bloom filters. 
+ bm.resetBits() + return + } + bmResult := getBitmap(bm.bitsLen) bmTmp := getBitmap(bm.bitsLen) for _, f := range fo.filters { @@ -72,3 +82,105 @@ func (fo *filterOr) applyToBlockSearch(bs *blockSearch, bm *bitmap) { bm.copyFrom(bmResult) putBitmap(bmResult) } + +func (fo *filterOr) matchBloomFilters(bs *blockSearch) bool { + byFieldTokens := fo.getByFieldTokens() + if len(byFieldTokens) == 0 { + return true + } + + for _, fieldTokens := range byFieldTokens { + fieldName := fieldTokens.field + tokens := fieldTokens.tokens + + v := bs.csh.getConstColumnValue(fieldName) + if v != "" { + if matchStringByAllTokens(v, tokens) { + return true + } + continue + } + + ch := bs.csh.getColumnHeader(fieldName) + if ch == nil { + continue + } + + if ch.valueType == valueTypeDict { + if matchDictValuesByAllTokens(ch.valuesDict.values, tokens) { + return true + } + continue + } + if matchBloomFilterAllTokens(bs, ch, tokens) { + return true + } + } + + return false +} + +func (fo *filterOr) getByFieldTokens() []fieldTokens { + fo.byFieldTokensOnce.Do(fo.initByFieldTokens) + return fo.byFieldTokens +} + +func (fo *filterOr) initByFieldTokens() { + m := make(map[string][][]string) + byFieldFilters := make(map[string]int) + var fieldNames []string + + for _, f := range fo.filters { + fieldName := "" + var tokens []string + + switch t := f.(type) { + case *filterExact: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterExactPrefix: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterPhrase: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterPrefix: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterRegexp: + fieldName = t.fieldName + tokens = t.getTokens() + case *filterSequence: + fieldName = t.fieldName + tokens = t.getTokens() + } + + fieldName = getCanonicalColumnName(fieldName) + byFieldFilters[fieldName]++ + + if len(tokens) > 0 { + if _, ok := m[fieldName]; !ok { + fieldNames = append(fieldNames, fieldName) + } + 
m[fieldName] = append(m[fieldName], tokens) + } + } + + var byFieldTokens []fieldTokens + for _, fieldName := range fieldNames { + if byFieldFilters[fieldName] < 2 { + // It is faster to perform bloom filter match inline when visiting the corresponding column + continue + } + + commonTokens := getCommonTokens(m[fieldName]) + if len(commonTokens) > 0 { + byFieldTokens = append(byFieldTokens, fieldTokens{ + field: fieldName, + tokens: commonTokens, + }) + } + } + + fo.byFieldTokens = byFieldTokens +} diff --git a/lib/logstorage/filter_string_range.go b/lib/logstorage/filter_string_range.go index 4ab081b60..095159715 100644 --- a/lib/logstorage/filter_string_range.go +++ b/lib/logstorage/filter_string_range.go @@ -1,11 +1,11 @@ package logstorage import ( - "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) +var maxStringRangeValue = string([]byte{255, 255, 255, 255}) + // filterStringRange matches tie given string range [minValue..maxValue) // // Note that the minValue is included in the range, while the maxValue isn't included in the range. 
@@ -16,10 +16,12 @@ type filterStringRange struct { fieldName string minValue string maxValue string + + stringRepr string } func (fr *filterStringRange) String() string { - return fmt.Sprintf("%sstring_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), quoteTokenIfNeeded(fr.minValue), quoteTokenIfNeeded(fr.maxValue)) + return quoteFieldNameIfNeeded(fr.fieldName) + fr.stringRepr } func (fr *filterStringRange) updateNeededFields(neededFields fieldsSet) { diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 0220d9655..87e4ecc00 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -74,6 +74,11 @@ func (lex *lexer) isQuotedToken() bool { return lex.token != lex.rawToken } +func (lex *lexer) isNumber() bool { + s := lex.rawToken + lex.s + return isNumberPrefix(s) +} + func (lex *lexer) isPrevToken(tokens ...string) bool { for _, token := range tokens { if token == lex.prevToken { @@ -247,7 +252,7 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) { s := fmt.Sprintf("stats by (%s) count() hits", byFieldsStr) lex := newLexer(s) - ps, err := parsePipeStats(lex) + ps, err := parsePipeStats(lex, true) if err != nil { logger.Panicf("BUG: unexpected error when parsing [%s]: %s", s, err) } @@ -855,6 +860,8 @@ func parseFilterStringRange(lex *lexer, fieldName string) (filter, error) { fieldName: fieldName, minValue: args[0], maxValue: args[1], + + stringRepr: fmt.Sprintf("string_range(%s, %s)", quoteTokenIfNeeded(args[0]), quoteTokenIfNeeded(args[1])), } return fr, nil }) @@ -1091,6 +1098,15 @@ func parseFilterGT(lex *lexer, fieldName string) (filter, error) { op = ">=" } + if !lex.isNumber() { + lexState := lex.backupState() + fr := tryParseFilterGTString(lex, fieldName, op, includeMinValue) + if fr != nil { + return fr, nil + } + lex.restoreState(lexState) + } + minValue, fStr, err := parseFloat64(lex) if err != nil { return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err) @@ -1120,6 +1136,15 @@ func 
parseFilterLT(lex *lexer, fieldName string) (filter, error) { op = "<=" } + if !lex.isNumber() { + lexState := lex.backupState() + fr := tryParseFilterLTString(lex, fieldName, op, includeMaxValue) + if fr != nil { + return fr, nil + } + lex.restoreState(lexState) + } + maxValue, fStr, err := parseFloat64(lex) if err != nil { return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err) @@ -1138,6 +1163,43 @@ func parseFilterLT(lex *lexer, fieldName string) (filter, error) { return fr, nil } +func tryParseFilterGTString(lex *lexer, fieldName, op string, includeMinValue bool) filter { + minValueOrig, err := getCompoundToken(lex) + if err != nil { + return nil + } + minValue := minValueOrig + if !includeMinValue { + minValue = string(append([]byte(minValue), 0)) + } + fr := &filterStringRange{ + fieldName: fieldName, + minValue: minValue, + maxValue: maxStringRangeValue, + + stringRepr: op + quoteStringTokenIfNeeded(minValueOrig), + } + return fr +} + +func tryParseFilterLTString(lex *lexer, fieldName, op string, includeMaxValue bool) filter { + maxValueOrig, err := getCompoundToken(lex) + if err != nil { + return nil + } + maxValue := maxValueOrig + if includeMaxValue { + maxValue = string(append([]byte(maxValue), 0)) + } + fr := &filterStringRange{ + fieldName: fieldName, + maxValue: maxValue, + + stringRepr: op + quoteStringTokenIfNeeded(maxValueOrig), + } + return fr +} + func parseFilterRange(lex *lexer, fieldName string) (filter, error) { funcName := lex.token lex.nextToken() @@ -1495,6 +1557,13 @@ func parseTime(lex *lexer) (int64, string, error) { return int64(math.Round(t*1e3)) * 1e6, s, nil } +func quoteStringTokenIfNeeded(s string) string { + if !needQuoteStringToken(s) { + return s + } + return strconv.Quote(s) +} + func quoteTokenIfNeeded(s string) string { if !needQuoteToken(s) { return s @@ -1502,6 +1571,23 @@ func quoteTokenIfNeeded(s string) string { return strconv.Quote(s) } +func needQuoteStringToken(s string) bool { + return 
isNumberPrefix(s) || needQuoteToken(s) +} + +func isNumberPrefix(s string) bool { + if len(s) == 0 { + return false + } + if s[0] == '-' || s[0] == '+' { + s = s[1:] + if len(s) == 0 { + return false + } + } + return s[0] >= '0' && s[0] <= '9' +} + func needQuoteToken(s string) bool { sLower := strings.ToLower(s) if _, ok := reservedKeywords[sLower]; ok { diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 4b80b66b8..01f89c5fe 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -353,6 +353,10 @@ func TestParseFilterStringRange(t *testing.T) { f("string_range(foo, bar)", ``, "foo", "bar") f(`abc:string_range("foo,bar", "baz) !")`, `abc`, `foo,bar`, `baz) !`) + f(">foo", ``, "foo\x00", maxStringRangeValue) + f("x:>=foo", `x`, "foo", maxStringRangeValue) + f("x:=10.43`, `foo`, 10.43, inf) f(`foo: >= -10.43`, `foo`, -10.43, inf) - f(`foo:<10.43`, `foo`, -inf, nextafter(10.43, -inf)) + f(`foo:<10.43K`, `foo`, -inf, nextafter(10_430, -inf)) f(`foo: < -10.43`, `foo`, -inf, nextafter(-10.43, -inf)) - f(`foo:<=10.43`, `foo`, -inf, 10.43) + f(`foo:<=10.43ms`, `foo`, -inf, 10_430_000) f(`foo: <= 10.43`, `foo`, -inf, 10.43) } @@ -590,6 +594,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`NOT foo AND bar OR baz`, `!foo bar or baz`) f(`NOT (foo AND bar) OR baz`, `!(foo bar) or baz`) f(`foo OR bar AND baz`, `foo or bar baz`) + f(`foo bar or baz xyz`, `foo bar or baz xyz`) + f(`foo (bar or baz) xyz`, `foo (bar or baz) xyz`) + f(`foo or bar baz or xyz`, `foo or bar baz or xyz`) + f(`(foo or bar) (baz or xyz)`, `(foo or bar) (baz or xyz)`) f(`(foo OR bar) AND baz`, `(foo or bar) baz`) // parens @@ -802,6 +810,12 @@ func TestParseQuerySuccess(t *testing.T) { // string_range filter f(`string_range(foo, bar)`, `string_range(foo, bar)`) f(`foo:string_range("foo, bar", baz)`, `foo:string_range("foo, bar", baz)`) + f(`foo:>bar`, `foo:>bar`) + f(`foo:>"1234"`, `foo:>"1234"`) + f(`>="abc"`, `>=abc`) + f(`foo:10`, `* | stats by 
(host) count(*) as rows | filter rows:>10`) + f(`* | (host) count() rows, count() if (error) errors | rows:>10`, `* | stats by (host) count(*) as rows, count(*) if (error) as errors | filter rows:>10`) } func TestParseQueryFailure(t *testing.T) { @@ -1082,7 +1102,7 @@ func TestParseQueryFailure(t *testing.T) { f("") f("|") f("foo|") - f("foo|bar") + f("foo|bar(") f("foo and") f("foo OR ") f("not") @@ -1151,7 +1171,7 @@ func TestParseQueryFailure(t *testing.T) { f(`very long query with error aaa ffdfd fdfdfd fdfd:( ffdfdfdfdfd`) // query with unexpected tail - f(`foo | bar`) + f(`foo | bar(`) // unexpected comma f(`foo,bar`) @@ -1264,6 +1284,7 @@ func TestParseQueryFailure(t *testing.T) { f(`string_range(foo, bar`) f(`string_range(foo)`) f(`string_range(foo, bar, baz)`) + f(`>(`) // missing filter f(`| fields *`) @@ -1271,9 +1292,9 @@ func TestParseQueryFailure(t *testing.T) { // missing pipe keyword f(`foo |`) - // unknown pipe keyword - f(`foo | bar`) - f(`foo | fields bar | baz`) + // invlaid pipe + f(`foo | bar(`) + f(`foo | fields bar | baz(`) // missing field in fields pipe f(`foo | fields`) @@ -1313,10 +1334,6 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | delete foo,`) f(`foo | delete foo,,`) - // missing limit and head pipe value - f(`foo | limit`) - f(`foo | head`) - // invalid limit pipe value f(`foo | limit bar`) f(`foo | limit -123`) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index d01fdc4be..32a72d0dc 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -86,6 +86,8 @@ func parsePipes(lex *lexer) ([]pipe, error) { lex.nextToken() case lex.isKeyword(")", ""): return pipes, nil + default: + return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|' or ')'", pipes[len(pipes)-1], lex.token) } } } @@ -98,7 +100,7 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err) } return pc, nil - case lex.isKeyword("delete", "del", "rm"): + case 
lex.isKeyword("delete", "del", "rm", "drop"): pd, err := parsePipeDelete(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) @@ -110,12 +112,24 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err) } return pe, nil + case lex.isKeyword("extract_regexp"): + pe, err := parsePipeExtractRegexp(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'extract_regexp' pipe: %w", err) + } + return pe, nil case lex.isKeyword("field_names"): pf, err := parsePipeFieldNames(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err) } return pf, nil + case lex.isKeyword("field_values"): + pf, err := parsePipeFieldValues(lex) + if err != nil { + return nil, fmt.Errorf("cannot pase 'field_values' pipe: %w", err) + } + return pf, nil case lex.isKeyword("fields", "keep"): pf, err := parsePipeFields(lex) if err != nil { @@ -123,7 +137,7 @@ func parsePipe(lex *lexer) (pipe, error) { } return pf, nil case lex.isKeyword("filter"): - pf, err := parsePipeFilter(lex) + pf, err := parsePipeFilter(lex, true) if err != nil { return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err) } @@ -140,6 +154,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err) } return pl, nil + case lex.isKeyword("math"): + pm, err := parsePipeMath(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'math' pipe: %w", err) + } + return pm, nil case lex.isKeyword("offset", "skip"): ps, err := parsePipeOffset(lex) if err != nil { @@ -177,7 +197,7 @@ func parsePipe(lex *lexer) (pipe, error) { } return ps, nil case lex.isKeyword("stats"): - ps, err := parsePipeStats(lex) + ps, err := parsePipeStats(lex, true) if err != nil { return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) } @@ -207,6 +227,22 @@ func parsePipe(lex *lexer) (pipe, error) { } return pu, nil default: + lexState := lex.backupState() + + // Try parsing stats 
pipe without 'stats' keyword + ps, err := parsePipeStats(lex, false) + if err == nil { + return ps, nil + } + lex.restoreState(lexState) + + // Try parsing filter pipe without 'filter' keyword + pf, err := parsePipeFilter(lex, false) + if err == nil { + return pf, nil + } + lex.restoreState(lexState) + return nil, fmt.Errorf("unexpected pipe %q", lex.token) } } diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 9f2ca5a68..543b769fb 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -70,8 +70,8 @@ func (pdp *pipeDeleteProcessor) flush() error { } func parsePipeDelete(lex *lexer) (*pipeDelete, error) { - if !lex.isKeyword("delete", "del", "rm") { - return nil, fmt.Errorf("expecting 'delete', 'del' or 'rm'; got %q", lex.token) + if !lex.isKeyword("delete", "del", "rm", "drop") { + return nil, fmt.Errorf("expecting 'delete', 'del', 'rm' or 'drop'; got %q", lex.token) } var fields []string diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index e5c950592..a0cfe16f4 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -64,13 +64,14 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet unneededFieldsOrig := unneededFields.clone() needFromField := false for _, step := range pe.ptn.steps { - if step.field != "" { - if !unneededFieldsOrig.contains(step.field) { - needFromField = true - } - if !pe.keepOriginalFields && !pe.skipEmptyResults { - unneededFields.add(step.field) - } + if step.field == "" { + continue + } + if !unneededFieldsOrig.contains(step.field) { + needFromField = true + } + if !pe.keepOriginalFields && !pe.skipEmptyResults { + unneededFields.add(step.field) } } if needFromField { @@ -85,7 +86,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet neededFieldsOrig := neededFields.clone() needFromField := false for _, step := range pe.ptn.steps { - if step.field != "" && 
neededFieldsOrig.contains(step.field) { + if step.field == "" { + continue + } + if neededFieldsOrig.contains(step.field) { needFromField = true if !pe.keepOriginalFields && !pe.skipEmptyResults { neededFields.remove(step.field) diff --git a/lib/logstorage/pipe_extract_regexp.go b/lib/logstorage/pipe_extract_regexp.go new file mode 100644 index 000000000..5a5c4017a --- /dev/null +++ b/lib/logstorage/pipe_extract_regexp.go @@ -0,0 +1,334 @@ +package logstorage + +import ( + "fmt" + "regexp" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" +) + +// pipeExtractRegexp processes '| extract_regexp ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe +type pipeExtractRegexp struct { + fromField string + + re *regexp.Regexp + reFields []string + + keepOriginalFields bool + skipEmptyResults bool + + // iff is an optional filter for skipping the extract func + iff *ifFilter +} + +func (pe *pipeExtractRegexp) String() string { + s := "extract_regexp" + if pe.iff != nil { + s += " " + pe.iff.String() + } + reStr := pe.re.String() + s += " " + quoteTokenIfNeeded(reStr) + if !isMsgFieldName(pe.fromField) { + s += " from " + quoteTokenIfNeeded(pe.fromField) + } + if pe.keepOriginalFields { + s += " keep_original_fields" + } + if pe.skipEmptyResults { + s += " skip_empty_results" + } + return s +} + +func (pe *pipeExtractRegexp) optimize() { + pe.iff.optimizeFilterIn() +} + +func (pe *pipeExtractRegexp) hasFilterInWithQuery() bool { + return pe.iff.hasFilterInWithQuery() +} + +func (pe *pipeExtractRegexp) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pe.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + peNew := *pe + peNew.iff = iffNew + return &peNew, nil +} + +func (pe *pipeExtractRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + 
unneededFieldsOrig := unneededFields.clone() + needFromField := false + for _, f := range pe.reFields { + if f == "" { + continue + } + if !unneededFieldsOrig.contains(f) { + needFromField = true + } + if !pe.keepOriginalFields && !pe.skipEmptyResults { + unneededFields.add(f) + } + } + if needFromField { + unneededFields.remove(pe.fromField) + if pe.iff != nil { + unneededFields.removeFields(pe.iff.neededFields) + } + } else { + unneededFields.add(pe.fromField) + } + } else { + neededFieldsOrig := neededFields.clone() + needFromField := false + for _, f := range pe.reFields { + if f == "" { + continue + } + if neededFieldsOrig.contains(f) { + needFromField = true + if !pe.keepOriginalFields && !pe.skipEmptyResults { + neededFields.remove(f) + } + } + } + if needFromField { + neededFields.add(pe.fromField) + if pe.iff != nil { + neededFields.addFields(pe.iff.neededFields) + } + } + } +} + +func (pe *pipeExtractRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + return &pipeExtractRegexpProcessor{ + pe: pe, + ppNext: ppNext, + + shards: make([]pipeExtractRegexpProcessorShard, workersCount), + } +} + +type pipeExtractRegexpProcessor struct { + pe *pipeExtractRegexp + ppNext pipeProcessor + + shards []pipeExtractRegexpProcessorShard +} + +type pipeExtractRegexpProcessorShard struct { + pipeExtractRegexpProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . 
+ _ [128 - unsafe.Sizeof(pipeExtractRegexpProcessorShardNopad{})%128]byte +} + +func (shard *pipeExtractRegexpProcessorShard) apply(re *regexp.Regexp, v string) { + shard.fields = slicesutil.SetLength(shard.fields, len(shard.rcs)) + fields := shard.fields + clear(fields) + + locs := re.FindStringSubmatchIndex(v) + if locs == nil { + return + } + + for i := range fields { + start := locs[2*i] + if start < 0 { + // mismatch + continue + } + end := locs[2*i+1] + fields[i] = v[start:end] + } +} + +type pipeExtractRegexpProcessorShardNopad struct { + bm bitmap + + resultColumns []*blockResultColumn + resultValues []string + + rcs []resultColumn + a arena + + fields []string +} + +func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + pe := pep.pe + shard := &pep.shards[workerID] + + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + if iff := pe.iff; iff != nil { + iff.f.applyToBlockResult(br, bm) + if bm.isZero() { + pep.ppNext.writeBlock(workerID, br) + return + } + } + + reFields := pe.reFields + + shard.rcs = slicesutil.SetLength(shard.rcs, len(reFields)) + rcs := shard.rcs + for i := range reFields { + rcs[i].name = reFields[i] + } + + c := br.getColumnByName(pe.fromField) + values := c.getValues(br) + + shard.resultColumns = slicesutil.SetLength(shard.resultColumns, len(rcs)) + resultColumns := shard.resultColumns + for i := range resultColumns { + if reFields[i] != "" { + resultColumns[i] = br.getColumnByName(rcs[i].name) + } + } + + shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs)) + resultValues := shard.resultValues + + hadUpdates := false + vPrev := "" + for rowIdx, v := range values { + if bm.isSetBit(rowIdx) { + if !hadUpdates || vPrev != v { + vPrev = v + hadUpdates = true + + shard.apply(pe.re, v) + + for i, v := range shard.fields { + if reFields[i] == "" { + continue + } + if v == "" && pe.skipEmptyResults || pe.keepOriginalFields { + c := 
resultColumns[i] + if vOrig := c.getValueAtRow(br, rowIdx); vOrig != "" { + v = vOrig + } + } else { + v = shard.a.copyString(v) + } + resultValues[i] = v + } + } + } else { + for i, c := range resultColumns { + if reFields[i] != "" { + resultValues[i] = c.getValueAtRow(br, rowIdx) + } + } + } + + for i, v := range resultValues { + if reFields[i] != "" { + rcs[i].addValue(v) + } + } + } + + for i := range rcs { + if reFields[i] != "" { + br.addResultColumn(&rcs[i]) + } + } + pep.ppNext.writeBlock(workerID, br) + + for i := range rcs { + rcs[i].reset() + } + shard.a.reset() +} + +func (pep *pipeExtractRegexpProcessor) flush() error { + return nil +} + +func parsePipeExtractRegexp(lex *lexer) (*pipeExtractRegexp, error) { + if !lex.isKeyword("extract_regexp") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "extract_regexp") + } + lex.nextToken() + + // parse optional if (...) + var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + + // parse pattern + patternStr, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot read 'pattern': %w", err) + } + re, err := regexp.Compile(patternStr) + if err != nil { + return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", patternStr, err) + } + reFields := re.SubexpNames() + + hasNamedFields := false + for _, f := range reFields { + if f != "" { + hasNamedFields = true + break + } + } + if !hasNamedFields { + return nil, fmt.Errorf("the 'pattern' %q must contain at least a single named group in the form (?P...)", patternStr) + } + + // parse optional 'from ...' 
part + fromField := "_msg" + if lex.isKeyword("from") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'from' field name: %w", err) + } + fromField = f + } + + keepOriginalFields := false + skipEmptyResults := false + switch { + case lex.isKeyword("keep_original_fields"): + lex.nextToken() + keepOriginalFields = true + case lex.isKeyword("skip_empty_results"): + lex.nextToken() + skipEmptyResults = true + } + + pe := &pipeExtractRegexp{ + fromField: fromField, + re: re, + reFields: reFields, + keepOriginalFields: keepOriginalFields, + skipEmptyResults: skipEmptyResults, + iff: iff, + } + + return pe, nil +} diff --git a/lib/logstorage/pipe_extract_regexp_test.go b/lib/logstorage/pipe_extract_regexp_test.go new file mode 100644 index 000000000..e4fc7ced4 --- /dev/null +++ b/lib/logstorage/pipe_extract_regexp_test.go @@ -0,0 +1,329 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeExtractRegexpSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`extract_regexp "foo(?P.*)"`) + f(`extract_regexp "foo(?P.*)" skip_empty_results`) + f(`extract_regexp "foo(?P.*)" keep_original_fields`) + f(`extract_regexp "foo(?P.*)" from x`) + f(`extract_regexp "foo(?P.*)" from x skip_empty_results`) + f(`extract_regexp "foo(?P.*)" from x keep_original_fields`) + f(`extract_regexp if (x:y) "foo(?P.*)" from baz`) + f(`extract_regexp if (x:y) "foo(?P.*)" from baz skip_empty_results`) + f(`extract_regexp if (x:y) "foo(?P.*)" from baz keep_original_fields`) +} + +func TestParsePipeExtractRegexpFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`extract_regexp`) + f(`extract_regexp keep_original_fields`) + f(`extract_regexp skip_empty_results`) + f(`extract_regexp from`) + f(`extract_regexp from x`) + f(`extract_regexp from x "y(?P.*)"`) + f(`extract_regexp if (x:y)`) + f(`extract_regexp 
"a(?P.*)" if (x:y)`) + f(`extract_regexp "a"`) + f(`extract_regexp "(foo)"`) +} + +func TestPipeExtractRegexp(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // skip empty results + f(`extract_regexp "baz=(?P.*) a=(?P.*)" skip_empty_results`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=`}, + {"aa", "foobar"}, + {"abc", "ippl"}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=`}, + {"aa", "foobar"}, + {"abc", `"x y=z"`}, + }, + }) + + // no skip empty results + f(`extract_regexp "baz=(?P.*) a=(?P.*)"`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=`}, + {"aa", "foobar"}, + {"abc", "ippl"}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=`}, + {"aa", ""}, + {"abc", `"x y=z"`}, + }, + }) + + // keep original fields + f(`extract_regexp "baz=(?P.*) a=(?P.*)" keep_original_fields`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"aa", "foobar"}, + {"abc", ""}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", `"x y=z"`}, + {"aa", "foobar"}, + }, + }) + + // no keep original fields + f(`extract_regexp "baz=(?P.*) a=(?P.*)"`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"aa", "foobar"}, + {"abc", ""}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", `"x y=z"`}, + {"aa", "b"}, + }, + }) + + // single row, extract from _msg + f(`extract_regexp "baz=(?P.*) a=(?P.*)"`, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"abc", `"x y=z"`}, + {"aa", "b"}, + }, + }) + + // single row, extract from _msg into _msg + f(`extract_regexp "msg=(?P<_msg>.*)"`, [][]Field{ + { + {"_msg", `msg=bar`}, + }, + }, [][]Field{ + { + {"_msg", "bar"}, + }, + }) + + // single row, extract from non-existing field + f(`extract_regexp "foo=(?P.*)" from x`, [][]Field{ + { + {"_msg", `foo=bar`}, + }, + }, [][]Field{ + { + {"_msg", 
`foo=bar`}, + {"bar", ""}, + }, + }) + + // single row, pattern mismatch + f(`extract_regexp "foo=(?P.*)" from x`, [][]Field{ + { + {"x", `foobar`}, + }, + }, [][]Field{ + { + {"x", `foobar`}, + {"bar", ""}, + }, + }) + + f(`extract_regexp "foo=(?P.*) baz=(?P.*)" from x`, [][]Field{ + { + {"x", `a foo="a\"b\\c" cde baz=aa`}, + }, + }, [][]Field{ + { + {"x", `a foo="a\"b\\c" cde baz=aa`}, + {"bar", `"a\"b\\c" cde`}, + {"xx", "aa"}, + }, + }) + + // single row, overwirte existing column + f(`extract_regexp "foo=(?P.*) baz=(?P.*)" from x`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `cc`}, + {"xx", `aa b`}, + }, + }) + + // single row, if match + f(`extract_regexp if (x:baz) "foo=(?P.*) baz=(?P.*)" from "x"`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `cc`}, + {"xx", `aa b`}, + }, + }) + + // single row, if mismatch + f(`extract_regexp if (bar:"") "foo=(?P.*) baz=(?P.*)" from 'x'`, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", "abc"}, + }, + }, [][]Field{ + { + {"x", `a foo=cc baz=aa b`}, + {"bar", `abc`}, + }, + }) + + // multiple rows with distinct set of labels + f(`extract_regexp if (!ip:keep) "ip=(?P([0-9]+[.]){3}[0-9]+) "`, [][]Field{ + { + {"foo", "bar"}, + {"_msg", "request from ip=1.2.3.4 xxx"}, + {"f3", "y"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=5.4.3.1 abcd"}, + {"ip", "keep"}, + {"a", "b"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=34.32.11.94 abcd"}, + {"ip", "ppp"}, + {"a", "b"}, + }, + { + {"foo", "klkfs"}, + {"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"}, + {"ip", "bbbsd"}, + {"a", "klo2i"}, + }, + }, [][]Field{ + { + {"foo", "bar"}, + {"_msg", "request from ip=1.2.3.4 xxx"}, + {"f3", "y"}, + {"ip", "1.2.3.4"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=5.4.3.1 abcd"}, + {"ip", "keep"}, + {"a", "b"}, + }, + { + {"foo", "aaa"}, + {"_msg", "ip=34.32.11.94 abcd"}, + {"ip", 
"34.32.11.94"}, + {"a", "b"}, + }, + { + {"foo", "klkfs"}, + {"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"}, + {"ip", ""}, + {"a", "klo2i"}, + }, + }) +} + +func TestPipeExtractRegexpUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("extract_regexp '(?P.*)' from x", "*", "", "*", "foo") + f("extract_regexp if (foo:bar) '(?P.*)' from x", "*", "", "*", "") + f("extract_regexp if (foo:bar) '(?P.*)' from x keep_original_fields", "*", "", "*", "") + f("extract_regexp if (foo:bar) '(?P.*)' from x skip_empty_results", "*", "", "*", "") + + // unneeded fields do not intersect with pattern and output fields + f("extract_regexp '(?P.*)' from x", "*", "f1,f2", "*", "f1,f2,foo") + f("extract_regexp '(?P.*)' from x keep_original_fields", "*", "f1,f2", "*", "f1,f2") + f("extract_regexp '(?P.*)' from x skip_empty_results", "*", "f1,f2", "*", "f1,f2") + f("extract_regexp if (f1:x) '(?P.*)' from x", "*", "f1,f2", "*", "f2,foo") + f("extract_regexp if (f1:x) '(?P.*)' from x keep_original_fields", "*", "f1,f2", "*", "f2") + f("extract_regexp if (f1:x) '(?P.*)' from x skip_empty_results", "*", "f1,f2", "*", "f2") + f("extract_regexp if (foo:bar f1:x) '(?P.*)' from x", "*", "f1,f2", "*", "f2") + + // unneeded fields intersect with pattern + f("extract_regexp '(?P.*)' from x", "*", "f2,x", "*", "f2,foo") + f("extract_regexp '(?P.*)' from x keep_original_fields", "*", "f2,x", "*", "f2") + f("extract_regexp '(?P.*)' from x skip_empty_results", "*", "f2,x", "*", "f2") + f("extract_regexp if (f1:abc) '(?P.*)' from x", "*", "f2,x", "*", "f2,foo") + f("extract_regexp if (f2:abc) '(?P.*)' from x", "*", "f2,x", "*", "foo") + + // unneeded fields intersect with output fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "*", "f2,foo", "*", "bar,f2,foo") 
+ f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "*", "f2,foo", "*", "f2,foo") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "*", "f2,foo", "*", "f2,foo") + f("extract_regexp if (f1:abc) '(?P.*)x(?P.*)' from x", "*", "f2,foo", "*", "bar,f2,foo") + f("extract_regexp if (f2:abc foo:w) '(?P.*)x(?P.*)' from x", "*", "f2,foo", "*", "bar") + f("extract_regexp if (f2:abc foo:w) '(?P.*)x(?P.*)' from x keep_original_fields", "*", "f2,foo", "*", "") + f("extract_regexp if (f2:abc foo:w) '(?P.*)x(?P.*)' from x skip_empty_results", "*", "f2,foo", "*", "") + + // unneeded fields intersect with all the output fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + f("extract_regexp if (a:b f2:q x:y foo:w) '(?P.*)x(?P.*)' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + f("extract_regexp if (a:b f2:q x:y foo:w) '(?P.*)x(?P.*)' from x keep_original_fields", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + f("extract_regexp if (a:b f2:q x:y foo:w) '(?P.*)x(?P.*)' from x skip_empty_results", "*", "f2,foo,bar", "*", "bar,f2,foo,x") + + // needed fields do not intersect with pattern and output fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "f1,f2", "", "f1,f2", "") + f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "f1,f2", "", "f1,f2", "") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "f1,f2", "", "f1,f2", "") + f("extract_regexp if (a:b) '(?P.*)x(?P.*)' from x", "f1,f2", "", "f1,f2", "") + f("extract_regexp if (f1:b) '(?P.*)x(?P.*)' from x", "f1,f2", "", "f1,f2", "") + + // needed fields intersect with pattern field + f("extract_regexp '(?P.*)x(?P.*)' from x", "f2,x", "", "f2,x", "") + f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "f2,x", "", "f2,x", "") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "f2,x", "", "f2,x", "") + f("extract_regexp if (a:b) '(?P.*)x(?P.*)' from x", "f2,x", "", "f2,x", "") + + // needed fields intersect with output 
fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "f2,foo", "", "f2,x", "") + f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "f2,foo", "", "foo,f2,x", "") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "f2,foo", "", "foo,f2,x", "") + f("extract_regexp if (a:b) '(?P.*)x(?P.*)' from x", "f2,foo", "", "a,f2,x", "") + + // needed fields intersect with pattern and output fields + f("extract_regexp '(?P.*)x(?P.*)' from x", "f2,foo,x,y", "", "f2,x,y", "") + f("extract_regexp '(?P.*)x(?P.*)' from x keep_original_fields", "f2,foo,x,y", "", "foo,f2,x,y", "") + f("extract_regexp '(?P.*)x(?P.*)' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "") + f("extract_regexp if (a:b foo:q) '(?P.*)x(?P.*)' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "") +} diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go index 08c94c520..740420513 100644 --- a/lib/logstorage/pipe_extract_test.go +++ b/lib/logstorage/pipe_extract_test.go @@ -353,31 +353,3 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) { f("extract 'x' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "") f("extract if (a:b foo:q) 'x' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "") } - -func expectParsePipeFailure(t *testing.T, pipeStr string) { - t.Helper() - - lex := newLexer(pipeStr) - p, err := parsePipe(lex) - if err == nil && lex.isEnd() { - t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p) - } -} - -func expectParsePipeSuccess(t *testing.T, pipeStr string) { - t.Helper() - - lex := newLexer(pipeStr) - p, err := parsePipe(lex) - if err != nil { - t.Fatalf("cannot parse [%s]: %s", pipeStr, err) - } - if !lex.isEnd() { - t.Fatalf("unexpected tail after parsing [%s]: [%s]", pipeStr, lex.s) - } - - pipeStrResult := p.String() - if pipeStrResult != pipeStr { - t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr) - } -} diff --git 
a/lib/logstorage/pipe_field_values.go b/lib/logstorage/pipe_field_values.go new file mode 100644 index 000000000..d9c1f57ac --- /dev/null +++ b/lib/logstorage/pipe_field_values.go @@ -0,0 +1,93 @@ +package logstorage + +import ( + "fmt" +) + +// pipeFieldValues processes '| field_values ...' queries. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe +type pipeFieldValues struct { + field string + + limit uint64 +} + +func (pf *pipeFieldValues) String() string { + s := "field_values " + quoteTokenIfNeeded(pf.field) + if pf.limit > 0 { + s += fmt.Sprintf(" limit %d", pf.limit) + } + return s +} + +func (pf *pipeFieldValues) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + neededFields.reset() + if !unneededFields.contains(pf.field) { + neededFields.add(pf.field) + } + unneededFields.reset() + } else { + neededFieldsOrig := neededFields.clone() + neededFields.reset() + if neededFieldsOrig.contains(pf.field) { + neededFields.add(pf.field) + } + } +} + +func (pf *pipeFieldValues) optimize() { + // nothing to do +} + +func (pf *pipeFieldValues) hasFilterInWithQuery() bool { + return false +} + +func (pf *pipeFieldValues) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pf, nil +} + +func (pf *pipeFieldValues) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { + hitsFieldName := "hits" + if hitsFieldName == pf.field { + hitsFieldName = "hitss" + } + pu := &pipeUniq{ + byFields: []string{pf.field}, + hitsFieldName: hitsFieldName, + limit: pf.limit, + } + return pu.newPipeProcessor(workersCount, stopCh, cancel, ppNext) +} + +func parsePipeFieldValues(lex *lexer) (*pipeFieldValues, error) { + if !lex.isKeyword("field_values") { + return nil, fmt.Errorf("expecting 'field_values'; got %q", lex.token) + } + lex.nextToken() + + field, err := parseFieldName(lex) + if err != nil { + return nil, 
fmt.Errorf("cannot parse field name for 'field_values': %w", err) + } + + limit := uint64(0) + if lex.isKeyword("limit") { + lex.nextToken() + n, ok := tryParseUint64(lex.token) + if !ok { + return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token) + } + lex.nextToken() + limit = n + } + + pf := &pipeFieldValues{ + field: field, + limit: limit, + } + + return pf, nil +} diff --git a/lib/logstorage/pipe_field_values_test.go b/lib/logstorage/pipe_field_values_test.go new file mode 100644 index 000000000..26b0419b6 --- /dev/null +++ b/lib/logstorage/pipe_field_values_test.go @@ -0,0 +1,148 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeFieldValuesSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`field_values x`) + f(`field_values x limit 10`) +} + +func TestParsePipeFieldValuesFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`field_values`) + f(`field_values a b`) + f(`field_values a limit`) + f(`field_values limit N`) +} + +func TestPipeFieldValues(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("field_values a", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"hits", "3"}, + }, + }) + + f("field_values b", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"hits", "2"}, + }, + { + {"b", "54"}, + {"hits", "1"}, + }, + }) + + f("field_values c", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"c", ""}, + {"hits", "2"}, + }, + { + {"c", "d"}, + {"hits", "1"}, 
+ }, + }) + + f("field_values d", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"d", ""}, + {"hits", "3"}, + }, + }) +} + +func TestPipeFieldValuesUpdateNeededFields(t *testing.T) { + f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("field_values x", "*", "", "x", "") + + // all the needed fields, unneeded fields do not intersect with src + f("field_values x", "*", "f1,f2", "x", "") + + // all the needed fields, unneeded fields intersect with src + f("field_values x", "*", "f1,x", "", "") + + // needed fields do not intersect with src + f("field_values x", "f1,f2", "", "", "") + + // needed fields intersect with src + f("field_values x", "f1,x", "", "x", "") +} diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go index daba8f5bc..c1f418f60 100644 --- a/lib/logstorage/pipe_filter.go +++ b/lib/logstorage/pipe_filter.go @@ -108,11 +108,13 @@ func (pfp *pipeFilterProcessor) flush() error { return nil } -func parsePipeFilter(lex *lexer) (*pipeFilter, error) { - if !lex.isKeyword("filter") { - return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token) +func parsePipeFilter(lex *lexer, needFilterKeyword bool) (*pipeFilter, error) { + if needFilterKeyword { + if !lex.isKeyword("filter") { + return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token) + } + lex.nextToken() } - lex.nextToken() f, err := parseFilter(lex) if err != nil { diff --git a/lib/logstorage/pipe_filter_test.go b/lib/logstorage/pipe_filter_test.go index 0c3183019..dc244ffb0 100644 --- a/lib/logstorage/pipe_filter_test.go +++ b/lib/logstorage/pipe_filter_test.go @@ -32,6 +32,14 @@ func TestPipeFilter(t *testing.T) { expectPipeResults(t, pipeStr, rows, 
rowsExpected) } + // filter mismatch, missing 'filter' prefix + f("abc", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{}) + // filter mismatch f("filter abc", [][]Field{ { @@ -40,6 +48,19 @@ func TestPipeFilter(t *testing.T) { }, }, [][]Field{}) + // filter match, missing 'filter' prefix + f("foo", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }) + // filter match f("filter foo", [][]Field{ { diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index de580110e..7146f99ed 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -4,6 +4,8 @@ import ( "fmt" "strconv" "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeFormat processes '| format ...' pipe. @@ -169,8 +171,8 @@ func (pfp *pipeFormatProcessor) flush() error { } func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) string { - bb := bbPool.Get() - b := bb.B + b := shard.a.b + bLen := len(b) for _, step := range pf.steps { b = append(b, step.prefix...) 
if step.field != "" { @@ -183,11 +185,9 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult } } } - bb.B = b + shard.a.b = b - v := shard.a.copyBytesToString(b) - bbPool.Put(bb) - return v + return bytesutil.ToUnsafeString(b[bLen:]) } func parsePipeFormat(lex *lexer) (*pipeFormat, error) { diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go index f480cb46c..41f98fded 100644 --- a/lib/logstorage/pipe_limit.go +++ b/lib/logstorage/pipe_limit.go @@ -88,15 +88,20 @@ func parsePipeLimit(lex *lexer) (*pipeLimit, error) { if !lex.isKeyword("limit", "head") { return nil, fmt.Errorf("expecting 'limit' or 'head'; got %q", lex.token) } + lex.nextToken() - lex.nextToken() - n, err := parseUint(lex.token) - if err != nil { - return nil, fmt.Errorf("cannot parse rows limit from %q: %w", lex.token, err) + limit := uint64(10) + if !lex.isKeyword("|", ")", "") { + n, err := parseUint(lex.token) + if err != nil { + return nil, fmt.Errorf("cannot parse rows limit from %q: %w", lex.token, err) + } + lex.nextToken() + limit = n } - lex.nextToken() + pl := &pipeLimit{ - limit: n, + limit: limit, } return pl, nil } diff --git a/lib/logstorage/pipe_limit_test.go b/lib/logstorage/pipe_limit_test.go index bc7afa4fd..23b176ee1 100644 --- a/lib/logstorage/pipe_limit_test.go +++ b/lib/logstorage/pipe_limit_test.go @@ -20,7 +20,6 @@ func TestParsePipeLimitFailure(t *testing.T) { expectParsePipeFailure(t, pipeStr) } - f(`limit`) f(`limit -10`) f(`limit foo`) } @@ -30,6 +29,17 @@ func TestPipeLimit(t *testing.T) { t.Helper() expectPipeResults(t, pipeStr, rows, rowsExpected) } + f("limit", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }) f("limit 100", [][]Field{ { diff --git a/lib/logstorage/pipe_math.go b/lib/logstorage/pipe_math.go new file mode 100644 index 000000000..47f7ed73a --- /dev/null +++ b/lib/logstorage/pipe_math.go @@ -0,0 +1,776 @@ 
+package logstorage + +import ( + "fmt" + "math" + "strings" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" +) + +// pipeMath processes '| math ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe +type pipeMath struct { + entries []*mathEntry +} + +type mathEntry struct { + // The calculated expr result is stored in resultField. + resultField string + + // expr is the expression to calculate. + expr *mathExpr +} + +type mathExpr struct { + // if isConst is set, then the given mathExpr returns the given constValue. + isConst bool + constValue float64 + + // constValueStr is the original string representation of constValue. + // + // It is used in String() method for returning the original representation of the given constValue. + constValueStr string + + // if fieldName isn't empty, then the given mathExpr fetches numeric values from the given fieldName. + fieldName string + + // args are args for the given mathExpr. + args []*mathExpr + + // op is the operation name (aka function name) for the given mathExpr. + op string + + // f is the function for calculating results for the given mathExpr. + f mathFunc + + // whether the mathExpr was wrapped in parens. + wrappedInParens bool +} + +// mathFunc must fill result with calculated results based on the given args. 
+type mathFunc func(result []float64, args [][]float64) + +func (pm *pipeMath) String() string { + s := "math" + a := make([]string, len(pm.entries)) + for i, e := range pm.entries { + a[i] = e.String() + } + s += " " + strings.Join(a, ", ") + return s +} + +func (me *mathEntry) String() string { + s := me.expr.String() + if isMathBinaryOp(me.expr.op) { + s = "(" + s + ")" + } + s += " as " + quoteTokenIfNeeded(me.resultField) + return s +} + +func (me *mathExpr) String() string { + if me.isConst { + return me.constValueStr + } + if me.fieldName != "" { + return quoteTokenIfNeeded(me.fieldName) + } + + args := me.args + if isMathBinaryOp(me.op) { + opPriority := getMathBinaryOpPriority(me.op) + left := me.args[0] + right := me.args[1] + leftStr := left.String() + rightStr := right.String() + if isMathBinaryOp(left.op) && getMathBinaryOpPriority(left.op) > opPriority { + leftStr = "(" + leftStr + ")" + } + if isMathBinaryOp(right.op) && getMathBinaryOpPriority(right.op) > opPriority { + rightStr = "(" + rightStr + ")" + } + return fmt.Sprintf("%s %s %s", leftStr, me.op, rightStr) + } + + if me.op == "unary_minus" { + argStr := args[0].String() + if isMathBinaryOp(args[0].op) { + argStr = "(" + argStr + ")" + } + return "-" + argStr + } + + a := make([]string, len(args)) + for i, arg := range args { + a[i] = arg.String() + } + argsStr := strings.Join(a, ", ") + return fmt.Sprintf("%s(%s)", me.op, argsStr) +} + +func isMathBinaryOp(op string) bool { + _, ok := mathBinaryOps[op] + return ok +} + +func getMathBinaryOpPriority(op string) int { + bo, ok := mathBinaryOps[op] + if !ok { + logger.Panicf("BUG: unexpected binary op: %q", op) + } + return bo.priority +} + +func getMathFuncForBinaryOp(op string) (mathFunc, error) { + bo, ok := mathBinaryOps[op] + if !ok { + return nil, fmt.Errorf("unsupported binary operation: %q", op) + } + return bo.f, nil +} + +var mathBinaryOps = map[string]mathBinaryOp{ + "^": { + priority: 1, + f: mathFuncPow, + }, + "*": { + priority: 2, 
+ f: mathFuncMul, + }, + "/": { + priority: 2, + f: mathFuncDiv, + }, + "%": { + priority: 2, + f: mathFuncMod, + }, + "+": { + priority: 3, + f: mathFuncPlus, + }, + "-": { + priority: 3, + f: mathFuncMinus, + }, +} + +type mathBinaryOp struct { + priority int + f mathFunc +} + +func (pm *pipeMath) updateNeededFields(neededFields, unneededFields fieldsSet) { + for i := len(pm.entries) - 1; i >= 0; i-- { + e := pm.entries[i] + if neededFields.contains("*") { + if !unneededFields.contains(e.resultField) { + unneededFields.add(e.resultField) + + entryFields := e.getNeededFields() + unneededFields.removeFields(entryFields) + } + } else { + if neededFields.contains(e.resultField) { + neededFields.remove(e.resultField) + + entryFields := e.getNeededFields() + neededFields.addFields(entryFields) + } + } + } +} + +func (me *mathEntry) getNeededFields() []string { + neededFields := newFieldsSet() + me.expr.updateNeededFields(neededFields) + return neededFields.getAll() +} + +func (me *mathExpr) updateNeededFields(neededFields fieldsSet) { + if me.isConst { + return + } + if me.fieldName != "" { + neededFields.add(me.fieldName) + return + } + for _, arg := range me.args { + arg.updateNeededFields(neededFields) + } +} + +func (pm *pipeMath) optimize() { + // nothing to do +} + +func (pm *pipeMath) hasFilterInWithQuery() bool { + return false +} + +func (pm *pipeMath) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pm, nil +} + +func (pm *pipeMath) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + pmp := &pipeMathProcessor{ + pm: pm, + ppNext: ppNext, + + shards: make([]pipeMathProcessorShard, workersCount), + } + return pmp +} + +type pipeMathProcessor struct { + pm *pipeMath + ppNext pipeProcessor + + shards []pipeMathProcessorShard +} + +type pipeMathProcessorShard struct { + pipeMathProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 
mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeMathProcessorShardNopad{})%128]byte +} + +type pipeMathProcessorShardNopad struct { + // a holds all the data for rcs. + a arena + + // rcs is used for storing calculated results before they are written to ppNext. + rcs []resultColumn + + // rs is storage for temporary results + rs [][]float64 + + // rsBuf is backing storage for rs slices + rsBuf []float64 +} + +func (shard *pipeMathProcessorShard) executeMathEntry(e *mathEntry, rc *resultColumn, br *blockResult) { + clear(shard.rs) + shard.rs = shard.rs[:0] + shard.rsBuf = shard.rsBuf[:0] + + shard.executeExpr(e.expr, br) + r := shard.rs[0] + + b := shard.a.b + for _, f := range r { + bLen := len(b) + b = marshalFloat64String(b, f) + v := bytesutil.ToUnsafeString(b[bLen:]) + rc.addValue(v) + } + shard.a.b = b +} + +func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) { + rIdx := len(shard.rs) + shard.rs = slicesutil.SetLength(shard.rs, len(shard.rs)+1) + + shard.rsBuf = slicesutil.SetLength(shard.rsBuf, len(shard.rsBuf)+len(br.timestamps)) + shard.rs[rIdx] = shard.rsBuf[len(shard.rsBuf)-len(br.timestamps):] + + if me.isConst { + r := shard.rs[rIdx] + for i := range br.timestamps { + r[i] = me.constValue + } + return + } + if me.fieldName != "" { + c := br.getColumnByName(me.fieldName) + values := c.getValues(br) + r := shard.rs[rIdx] + var f float64 + for i, v := range values { + if i == 0 || v != values[i-1] { + var ok bool + f, ok = tryParseFloat64(v) + if !ok { + f = nan + } + } + r[i] = f + } + return + } + + rsBufLen := len(shard.rsBuf) + for _, arg := range me.args { + shard.executeExpr(arg, br) + } + + result := shard.rs[rIdx] + args := shard.rs[rIdx+1:] + me.f(result, args) + + shard.rs = shard.rs[:rIdx+1] + shard.rsBuf = shard.rsBuf[:rsBufLen] +} + +func (pmp *pipeMathProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &pmp.shards[workerID] + entries := 
pmp.pm.entries + + shard.rcs = slicesutil.SetLength(shard.rcs, len(entries)) + rcs := shard.rcs + for i, e := range entries { + rc := &rcs[i] + rc.name = e.resultField + shard.executeMathEntry(e, rc, br) + br.addResultColumn(rc) + } + + pmp.ppNext.writeBlock(workerID, br) + + for i := range rcs { + rcs[i].resetValues() + } + shard.a.reset() +} + +func (pmp *pipeMathProcessor) flush() error { + return nil +} + +func parsePipeMath(lex *lexer) (*pipeMath, error) { + if !lex.isKeyword("math") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "math") + } + lex.nextToken() + + var mes []*mathEntry + for { + me, err := parseMathEntry(lex) + if err != nil { + return nil, err + } + mes = append(mes, me) + + switch { + case lex.isKeyword(","): + lex.nextToken() + case lex.isKeyword("|", ")", ""): + if len(mes) == 0 { + return nil, fmt.Errorf("missing 'math' expressions") + } + pm := &pipeMath{ + entries: mes, + } + return pm, nil + default: + return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|' or ')'", mes[len(mes)-1], lex.token) + } + } +} + +func parseMathEntry(lex *lexer) (*mathEntry, error) { + me, err := parseMathExpr(lex) + if err != nil { + return nil, err + } + + // skip optional 'as' + if lex.isKeyword("as") { + lex.nextToken() + } + + resultField, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name for [%s]: %w", me, err) + } + + e := &mathEntry{ + resultField: resultField, + expr: me, + } + return e, nil +} + +func parseMathExpr(lex *lexer) (*mathExpr, error) { + // parse left operand + left, err := parseMathExprOperand(lex) + if err != nil { + return nil, err + } + + for { + if !isMathBinaryOp(lex.token) { + // There is no right operand + return left, nil + } + + // parse operator + op := lex.token + lex.nextToken() + + f, err := getMathFuncForBinaryOp(op) + if err != nil { + return nil, fmt.Errorf("cannot parse operator after [%s]: %w", left, err) + } + + 
// parse right operand + right, err := parseMathExprOperand(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse operand after [%s %s]: %w", left, op, err) + } + + me := &mathExpr{ + args: []*mathExpr{left, right}, + op: op, + f: f, + } + + // balance operands according to their priority + if !left.wrappedInParens && isMathBinaryOp(left.op) && getMathBinaryOpPriority(left.op) > getMathBinaryOpPriority(op) { + me.args[0] = left.args[1] + left.args[1] = me + me = left + } + + left = me + } +} + +func parseMathExprInParens(lex *lexer) (*mathExpr, error) { + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing '('") + } + lex.nextToken() + + me, err := parseMathExpr(lex) + if err != nil { + return nil, err + } + me.wrappedInParens = true + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("missing ')'; got %q instead", lex.token) + } + lex.nextToken() + return me, nil +} + +func parseMathExprOperand(lex *lexer) (*mathExpr, error) { + if lex.isKeyword("(") { + return parseMathExprInParens(lex) + } + + switch { + case lex.isKeyword("abs"): + return parseMathExprAbs(lex) + case lex.isKeyword("max"): + return parseMathExprMax(lex) + case lex.isKeyword("min"): + return parseMathExprMin(lex) + case lex.isKeyword("round"): + return parseMathExprRound(lex) + case lex.isKeyword("-"): + return parseMathExprUnaryMinus(lex) + case lex.isKeyword("+"): + // just skip unary plus + lex.nextToken() + return parseMathExprOperand(lex) + case lex.isNumber(): + return parseMathExprConstNumber(lex) + default: + return parseMathExprFieldName(lex) + } +} + +func parseMathExprAbs(lex *lexer) (*mathExpr, error) { + me, err := parseMathExprGenericFunc(lex, "abs", mathFuncAbs) + if err != nil { + return nil, err + } + if len(me.args) != 1 { + return nil, fmt.Errorf("'abs' function accepts only one arg; got %d args: [%s]", len(me.args), me) + } + return me, nil +} + +func parseMathExprMax(lex *lexer) (*mathExpr, error) { + me, err := parseMathExprGenericFunc(lex, "max", 
mathFuncMax) + if err != nil { + return nil, err + } + if len(me.args) < 2 { + return nil, fmt.Errorf("'max' function needs at least 2 args; got %d args: [%s]", len(me.args), me) + } + return me, nil +} + +func parseMathExprMin(lex *lexer) (*mathExpr, error) { + me, err := parseMathExprGenericFunc(lex, "min", mathFuncMin) + if err != nil { + return nil, err + } + if len(me.args) < 2 { + return nil, fmt.Errorf("'min' function needs at least 2 args; got %d args: [%s]", len(me.args), me) + } + return me, nil +} + +func parseMathExprRound(lex *lexer) (*mathExpr, error) { + me, err := parseMathExprGenericFunc(lex, "round", mathFuncRound) + if err != nil { + return nil, err + } + if len(me.args) != 1 && len(me.args) != 2 { + return nil, fmt.Errorf("'round' function needs 1 or 2 args; got %d args: [%s]", len(me.args), me) + } + return me, nil +} + +func parseMathExprGenericFunc(lex *lexer, funcName string, f mathFunc) (*mathExpr, error) { + if !lex.isKeyword(funcName) { + return nil, fmt.Errorf("missing %q keyword", funcName) + } + lex.nextToken() + + args, err := parseMathFuncArgs(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse args for %q function: %w", funcName, err) + } + if len(args) == 0 { + return nil, fmt.Errorf("%q function needs at least one org", funcName) + } + me := &mathExpr{ + args: args, + op: funcName, + f: f, + } + return me, nil +} + +func parseMathFuncArgs(lex *lexer) ([]*mathExpr, error) { + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing '('") + } + lex.nextToken() + + var args []*mathExpr + for { + if lex.isKeyword(")") { + lex.nextToken() + return args, nil + } + + me, err := parseMathExpr(lex) + if err != nil { + return nil, err + } + args = append(args, me) + + switch { + case lex.isKeyword(")"): + case lex.isKeyword(","): + lex.nextToken() + default: + return nil, fmt.Errorf("unexpected token after [%s]: %q; want ',' or ')'", me, lex.token) + } + } +} + +func parseMathExprUnaryMinus(lex *lexer) (*mathExpr, error) { + if 
!lex.isKeyword("-") { + return nil, fmt.Errorf("missing '-'") + } + lex.nextToken() + + expr, err := parseMathExprOperand(lex) + if err != nil { + return nil, err + } + me := &mathExpr{ + args: []*mathExpr{expr}, + op: "unary_minus", + f: mathFuncUnaryMinus, + } + return me, nil +} + +func parseMathExprConstNumber(lex *lexer) (*mathExpr, error) { + if !lex.isNumber() { + return nil, fmt.Errorf("cannot parse number from %q", lex.token) + } + numStr, err := getCompoundMathToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse number: %w", err) + } + f, ok := tryParseNumber(numStr) + if !ok { + return nil, fmt.Errorf("cannot parse number from %q", numStr) + } + me := &mathExpr{ + isConst: true, + constValue: f, + constValueStr: numStr, + } + return me, nil +} + +func parseMathExprFieldName(lex *lexer) (*mathExpr, error) { + fieldName, err := getCompoundMathToken(lex) + if err != nil { + return nil, err + } + fieldName = getCanonicalColumnName(fieldName) + me := &mathExpr{ + fieldName: fieldName, + } + return me, nil +} + +func getCompoundMathToken(lex *lexer) (string, error) { + stopTokens := []string{"=", "+", "-", "*", "/", "%", "^", ",", ")", "|", ""} + if lex.isKeyword(stopTokens...) { + return "", fmt.Errorf("compound token cannot start with '%s'", lex.token) + } + + s := lex.token + rawS := lex.rawToken + lex.nextToken() + suffix := "" + for !lex.isSkippedSpace && !lex.isKeyword(stopTokens...) 
{ + s += lex.token + lex.nextToken() + } + if suffix == "" { + return s, nil + } + return rawS + suffix, nil +} + +func mathFuncPlus(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + result[i] = a[i] + b[i] + } +} + +func mathFuncMinus(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + result[i] = a[i] - b[i] + } +} + +func mathFuncMul(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + result[i] = a[i] * b[i] + } +} + +func mathFuncDiv(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + result[i] = a[i] / b[i] + } +} + +func mathFuncMod(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + result[i] = math.Mod(a[i], b[i]) + } +} + +func mathFuncPow(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + result[i] = math.Pow(a[i], b[i]) + } +} + +func mathFuncAbs(result []float64, args [][]float64) { + arg := args[0] + for i := range result { + result[i] = math.Abs(arg[i]) + } +} + +func mathFuncUnaryMinus(result []float64, args [][]float64) { + arg := args[0] + for i := range result { + result[i] = -arg[i] + } +} + +func mathFuncMax(result []float64, args [][]float64) { + for i := range result { + f := nan + for _, arg := range args { + if math.IsNaN(f) || arg[i] > f { + f = arg[i] + } + } + result[i] = f + } +} + +func mathFuncMin(result []float64, args [][]float64) { + for i := range result { + f := nan + for _, arg := range args { + if math.IsNaN(f) || arg[i] < f { + f = arg[i] + } + } + result[i] = f + } +} + +func mathFuncRound(result []float64, args [][]float64) { + arg := args[0] + if len(args) == 1 { + // Round to integer + for i := range result { + result[i] = math.Round(arg[i]) + } + return + } + + // Round to nearest + nearest := args[1] + var f float64 + for i := range result { + if i == 0 || 
arg[i-1] != arg[i] || nearest[i-1] != nearest[i] { + f = round(arg[i], nearest[i]) + } + result[i] = f + } +} + +func round(f, nearest float64) float64 { + _, e := decimal.FromFloat(nearest) + p10 := math.Pow10(int(-e)) + f += 0.5 * math.Copysign(nearest, f) + f -= math.Mod(f, nearest) + f, _ = math.Modf(f * p10) + return f / p10 +} diff --git a/lib/logstorage/pipe_math_test.go b/lib/logstorage/pipe_math_test.go new file mode 100644 index 000000000..66a9da24f --- /dev/null +++ b/lib/logstorage/pipe_math_test.go @@ -0,0 +1,233 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeMathSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`math b as a`) + f(`math -123 as a`) + f(`math 12.345KB as a`) + f(`math (-2 + 2) as a`) + f(`math x as a, z as y`) + f(`math (foo / bar + baz * abc % -45ms) as a`) + f(`math (foo / (bar + baz) * abc ^ 2) as a`) + f(`math (foo / ((bar + baz) * abc) ^ -2) as a`) + f(`math (foo + bar / baz - abc) as a`) + f(`math min(3, foo, (1 + bar) / baz) as a, max(a, b) as b, (abs(c) + 5) as d`) + f(`math round(foo) as x`) + f(`math round(foo, 0.1) as y`) +} + +func TestParsePipeMathFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`math`) + f(`math x`) + f(`math x as`) + f(`math abs() as x`) + f(`math abs(a, b) as x`) + f(`math min() as x`) + f(`math min(a) as x`) + f(`math max() as x`) + f(`math max(a) as x`) + f(`math round() as x`) + f(`math round(a, b, c) as x`) +} + +func TestPipeMath(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("math b+1 as a, a*2 as b, b-10.5+c as c", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, [][]Field{ + { + {"a", "3"}, + {"b", "6"}, + {"c", "-1.5"}, + }, + }) + + f("math 1 as a", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, 
[][]Field{ + { + {"a", "1"}, + {"b", "2"}, + {"c", "3"}, + }, + }) + + f("math 10 * 5 - 3 a", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, [][]Field{ + { + {"a", "47"}, + {"b", "2"}, + {"c", "3"}, + }, + }) + + f("math -1.5K as a", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, [][]Field{ + { + {"a", "-1500"}, + {"b", "2"}, + {"c", "3"}, + }, + }) + + f("math b as a", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "2"}, + {"c", "3"}, + }, + }) + + f("math a as a", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, [][]Field{ + { + {"a", "NaN"}, + {"b", "2"}, + {"c", "3"}, + }, + }) + + f("math 2*c + b as x", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + {"x", "8"}, + }, + }) + + f("math abs(-min(a,b)) as min, round(max(40*b/30,c)) as max", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + }, [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + {"min", "2"}, + {"max", "3"}, + }, + }) + + f("math round((2*c + (b%c))/(c-b)^(b-1), -0.001) as a", [][]Field{ + { + {"a", "v"}, + {"b", "2"}, + {"c", "3"}, + }, + { + {"a", "x"}, + {"b", "3"}, + {"c", "5"}, + }, + { + {"b", "3"}, + {"c", "6"}, + }, + }, [][]Field{ + { + {"a", "8"}, + {"b", "2"}, + {"c", "3"}, + }, + { + {"a", "3.25"}, + {"b", "3"}, + {"c", "5"}, + }, + { + {"a", "1.667"}, + {"b", "3"}, + {"c", "6"}, + }, + }) +} + +func TestPipeMathUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("math (x + 1) as y", "*", "", "*", "y") + + // all the needed fields, unneeded fields do not intersect with src and dst + f("math (x + 1) as y", "*", "f1,f2", "*", 
"f1,f2,y") + + // all the needed fields, unneeded fields intersect with src + f("math (x + 1) as y", "*", "f1,x", "*", "f1,y") + + // all the needed fields, unneeded fields intersect with dst + f("math (x + 1) as y", "*", "f1,y", "*", "f1,y") + + // all the needed fields, unneeded fields intersect with src and dst + f("math (x + 1) as y", "*", "f1,x,y", "*", "f1,x,y") + + // needed fields do not intersect with src and dst + f("math (x + 1) as y", "f1,f2", "", "f1,f2", "") + + // needed fields intersect with src + f("math (x + 1) as y", "f1,x", "", "f1,x", "") + + // needed fields intersect with dst + f("math (x + 1) as y", "f1,y", "", "f1,x", "") + + // needed fields intersect with src and dst + f("math (x + 1) as y", "f1,x,y", "", "f1,x", "") +} diff --git a/lib/logstorage/pipe_replace.go b/lib/logstorage/pipe_replace.go index 13d66c5a8..10e36169d 100644 --- a/lib/logstorage/pipe_replace.go +++ b/lib/logstorage/pipe_replace.go @@ -3,6 +3,8 @@ package logstorage import ( "fmt" "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeReplace processes '| replace ...' pipe. 
@@ -59,11 +61,9 @@ func (pr *pipeReplace) initFilterInValues(cache map[string][]string, getFieldVal func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { updateFunc := func(a *arena, v string) string { - bb := bbPool.Get() - bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit) - result := a.copyBytesToString(bb.B) - bbPool.Put(bb) - return result + bLen := len(a.b) + a.b = appendReplace(a.b, v, pr.oldSubstr, pr.newSubstr, pr.limit) + return bytesutil.ToUnsafeString(a.b[bLen:]) } return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff) diff --git a/lib/logstorage/pipe_replace_regexp.go b/lib/logstorage/pipe_replace_regexp.go index 24aa5418c..43c951f58 100644 --- a/lib/logstorage/pipe_replace_regexp.go +++ b/lib/logstorage/pipe_replace_regexp.go @@ -3,6 +3,8 @@ package logstorage import ( "fmt" "regexp" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeReplaceRegexp processes '| replace_regexp ...' pipe. 
@@ -59,11 +61,9 @@ func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFi func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { updateFunc := func(a *arena, v string) string { - bb := bbPool.Get() - bb.B = appendReplaceRegexp(bb.B[:0], v, pr.re, pr.replacement, pr.limit) - result := a.copyBytesToString(bb.B) - bbPool.Put(bb) - return result + bLen := len(a.b) + a.b = appendReplaceRegexp(a.b, v, pr.re, pr.replacement, pr.limit) + return bytesutil.ToUnsafeString(a.b[bLen:]) } return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 90fdb0c73..ab3852e95 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -537,13 +537,14 @@ func (psp *pipeStatsProcessor) flush() error { return nil } -func parsePipeStats(lex *lexer) (*pipeStats, error) { - if !lex.isKeyword("stats") { - return nil, fmt.Errorf("expecting 'stats'; got %q", lex.token) +func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) { + if needStatsKeyword { + if !lex.isKeyword("stats") { + return nil, fmt.Errorf("expecting 'stats'; got %q", lex.token) + } + lex.nextToken() } - lex.nextToken() - var ps pipeStats if lex.isKeyword("by", "(") { if lex.isKeyword("by") { diff --git a/lib/logstorage/pipe_stats_test.go b/lib/logstorage/pipe_stats_test.go index 0d2cdd4c9..5363518db 100644 --- a/lib/logstorage/pipe_stats_test.go +++ b/lib/logstorage/pipe_stats_test.go @@ -39,6 +39,70 @@ func TestPipeStats(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + // missing 'stats' keyword + f("count(*) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"rows", "3"}, + }, + }) + + // missing 'stats' keyword + f("count() as rows, count() 
if (a:2) rows2", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"rows", "3"}, + {"rows2", "2"}, + }, + }) + + f("stats count() as rows, count() if (a:2) rows2", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"rows", "3"}, + {"rows2", "2"}, + }, + }) + f("stats count(*) as rows", [][]Field{ { {"_msg", `abc`}, @@ -141,6 +205,32 @@ func TestPipeStats(t *testing.T) { }, }) + // missing 'stats' keyword + f("by (a) count(*) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"rows", "1"}, + }, + { + {"a", "2"}, + {"rows", "2"}, + }, + }) + f("stats by (a) count(*) as rows", [][]Field{ { {"_msg", `abc`}, diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index b7861e03e..fde04b369 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -80,10 +80,12 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) { nameCopy := "" fieldPrefix := uctx.fieldPrefix if fieldPrefix != "" { - nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name)) - copy(nameBuf, fieldPrefix) - copy(nameBuf[len(fieldPrefix):], name) - nameCopy = bytesutil.ToUnsafeString(nameBuf) + b := uctx.a.b + bLen := len(b) + b = append(b, fieldPrefix...) + b = append(b, name...) 
+ uctx.a.b = b + nameCopy = bytesutil.ToUnsafeString(b[bLen:]) } else { nameCopy = uctx.a.copyString(name) } diff --git a/lib/logstorage/pipe_utils_test.go b/lib/logstorage/pipe_utils_test.go index af6c3b9fd..418f235bc 100644 --- a/lib/logstorage/pipe_utils_test.go +++ b/lib/logstorage/pipe_utils_test.go @@ -8,6 +8,34 @@ import ( "testing" ) +func expectParsePipeFailure(t *testing.T, pipeStr string) { + t.Helper() + + lex := newLexer(pipeStr) + p, err := parsePipe(lex) + if err == nil && lex.isEnd() { + t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p) + } +} + +func expectParsePipeSuccess(t *testing.T, pipeStr string) { + t.Helper() + + lex := newLexer(pipeStr) + p, err := parsePipe(lex) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", pipeStr, err) + } + if !lex.isEnd() { + t.Fatalf("unexpected tail after parsing [%s]: [%s]", pipeStr, lex.s) + } + + pipeStrResult := p.String() + if pipeStrResult != pipeStr { + t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr) + } +} + func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) { t.Helper() diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index f7e43b89b..bf681fc0e 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -229,12 +229,12 @@ func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]ValueWithHits, error) { pipes := append([]pipe{}, q.pipes...) 
quotedFieldName := quoteTokenIfNeeded(fieldName) - pipeStr := fmt.Sprintf("uniq by (%s) with hits limit %d", quotedFieldName, limit) + pipeStr := fmt.Sprintf("field_values %s limit %d", quotedFieldName, limit) lex := newLexer(pipeStr) - pu, err := parsePipeUniq(lex) + pu, err := parsePipeFieldValues(lex) if err != nil { - logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err) + logger.Panicf("BUG: unexpected error when parsing 'field_values' pipe at [%s]: %s", pipeStr, err) } if !lex.isEnd() { diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index f664a6f63..488c46c48 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -3,6 +3,7 @@ package logstorage import ( "context" "fmt" + "reflect" "strings" "sync" "sync/atomic" @@ -311,6 +312,157 @@ func TestStorageRunQuery(t *testing.T) { tenantIDs := []TenantID{tenantID} mustRunQuery(tenantIDs, q, writeBlock) }) + t.Run("field_names-all", func(t *testing.T) { + q := mustParseQuery("*") + names, err := s.GetFieldNames(context.Background(), allTenantIDs, q) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {"_msg", 1155}, + {"_stream", 1155}, + {"_time", 1155}, + {"instance", 1155}, + {"job", 1155}, + {"source-file", 1155}, + {"stream-id", 1155}, + {"tenant.id", 1155}, + } + if !reflect.DeepEqual(names, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected) + } + }) + t.Run("field_names-some", func(t *testing.T) { + q := mustParseQuery(`_stream:{instance=~"host-1:.+"}`) + names, err := s.GetFieldNames(context.Background(), allTenantIDs, q) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {"_msg", 385}, + {"_stream", 385}, + {"_time", 385}, + {"instance", 385}, + {"job", 385}, + {"source-file", 385}, + {"stream-id", 385}, + {"tenant.id", 385}, + } + if 
!reflect.DeepEqual(names, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected) + } + }) + t.Run("field_values-nolimit", func(t *testing.T) { + q := mustParseQuery("*") + values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`{instance="host-0:234",job="foobar"}`, 385}, + {`{instance="host-1:234",job="foobar"}`, 385}, + {`{instance="host-2:234",job="foobar"}`, 385}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("field_values-limit", func(t *testing.T) { + q := mustParseQuery("*") + values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`{instance="host-0:234",job="foobar"}`, 0}, + {`{instance="host-1:234",job="foobar"}`, 0}, + {`{instance="host-2:234",job="foobar"}`, 0}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("field_values-limit", func(t *testing.T) { + q := mustParseQuery("instance:='host-1:234'") + values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`{instance="host-1:234",job="foobar"}`, 385}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("stream_field_names", func(t *testing.T) { + q := mustParseQuery("*") + names, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {"instance", 1155}, + {"job", 1155}, + } + 
if !reflect.DeepEqual(names, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected) + } + }) + t.Run("stream_field_values-nolimit", func(t *testing.T) { + q := mustParseQuery("*") + values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`host-0:234`, 385}, + {`host-1:234`, 385}, + {`host-2:234`, 385}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("stream_field_values-limit", func(t *testing.T) { + q := mustParseQuery("*") + values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 3) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`host-0:234`, 385}, + {`host-1:234`, 385}, + {`host-2:234`, 385}, + } + if !reflect.DeepEqual(values, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected) + } + }) + t.Run("streams", func(t *testing.T) { + q := mustParseQuery("*") + names, err := s.GetStreams(context.Background(), allTenantIDs, q, 0) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + resultExpected := []ValueWithHits{ + {`{instance="host-0:234",job="foobar"}`, 385}, + {`{instance="host-1:234",job="foobar"}`, 385}, + {`{instance="host-2:234",job="foobar"}`, 385}, + } + if !reflect.DeepEqual(names, resultExpected) { + t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected) + } + }) // Run more complex tests f := func(t *testing.T, query string, rowsExpected [][]Field) {