From b30e80b0714de9aac6e216cfd6795d212089f690 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 30 May 2024 16:19:23 +0200
Subject: [PATCH] lib/logstorage: work-in-progress

---
 docs/VictoriaLogs/CHANGELOG.md        |  14 +-
 docs/VictoriaLogs/LogsQL.md           | 189 +++++++++++-------
 docs/VictoriaLogs/keyConcepts.md      |  26 ++-
 lib/logstorage/fields_set.go          |   4 +
 lib/logstorage/filter_and.go          |  73 +++----
 lib/logstorage/filter_in.go           |  23 ++-
 lib/logstorage/filter_in_test.go      |   2 +-
 lib/logstorage/filter_or.go           |  63 +++---
 lib/logstorage/parser_test.go         | 102 +++++++---
 lib/logstorage/pipe_delete.go         |   2 -
 lib/logstorage/pipe_extract.go        |   7 +
 lib/logstorage/pipe_extract_regexp.go |   7 +
 lib/logstorage/pipe_field_values.go   |   5 +
 lib/logstorage/pipe_fields.go         |   1 +
 lib/logstorage/pipe_format.go         |   7 +
 lib/logstorage/pipe_math.go           |  91 +++++++--
 lib/logstorage/pipe_math_test.go      |  91 ++++++++-
 lib/logstorage/pipe_sort.go           |   4 +
 lib/logstorage/pipe_stats.go          |  75 ++++---
 lib/logstorage/pipe_stats_test.go     |   6 +-
 lib/logstorage/pipe_unpack.go         |   7 +
 lib/logstorage/pipe_unroll.go         |  18 +-
 lib/logstorage/pipe_unroll_test.go    |  10 +-
 lib/logstorage/pipe_update.go         |   7 +
 lib/logstorage/stats_row_any.go       | 127 ++++++++++++
 lib/logstorage/stats_row_any_test.go  | 182 +++++++++++++++++
 .../{stats_fields_max.go => stats_row_max.go} |  44 ++--
 ...elds_max_test.go => stats_row_max_test.go} |  38 ++--
 .../{stats_fields_min.go => stats_row_min.go} |  44 ++--
 ...elds_min_test.go => stats_row_min_test.go} |  38 ++--
 30 files changed, 988 insertions(+), 319 deletions(-)
 create mode 100644 lib/logstorage/stats_row_any.go
 create mode 100644 lib/logstorage/stats_row_any_test.go
 rename lib/logstorage/{stats_fields_max.go => stats_row_max.go} (77%)
 rename lib/logstorage/{stats_fields_max_test.go => stats_row_max_test.go} (79%)
 rename lib/logstorage/{stats_fields_min.go => stats_row_min.go} (77%)
 rename lib/logstorage/{stats_fields_min_test.go => stats_row_min_test.go} (79%)

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index d1e2f2240..0f0350148 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,18 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta

 ## tip

+## [v0.15.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.15.0-victorialogs)
+
+Released at 2024-05-30
+
+* FEATURE: add [`row_any`](https://docs.victoriametrics.com/victorialogs/logsql/#row_any-stats) function for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). This function returns a sample log entry for every calculated [group of results](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields).
+* FEATURE: add `default` operator to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). It allows overriding `NaN` results with the given default value.
+* FEATURE: add `exp()` and `ln()` functions to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
+* FEATURE: allow omitting the result name in [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) expressions. In this case the result name is automatically set to the string representation of the corresponding math expression. For example, `_time:5m | math duration / 1000` is equivalent to `_time:5m | math (duration / 1000) as "duration / 1000"`.
+* FEATURE: allow omitting the result name in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe).
In this case the result name is automatically set to the string representation of the corresponding [stats function expression](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). For example, `_time:5m | count(*)` is now a valid [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/). It is equivalent to `_time:5m | stats count(*) as "count(*)"`.
+
+* BUGFIX: properly calculate the number of matching rows in `* | field_values x | stats count() rows` and in `* | unroll (x) | stats count() rows` queries.
+
 ## [v0.14.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.14.0-victorialogs)

 Released at 2024-05-29
@@ -103,7 +115,7 @@ Released at 2024-05-22
 * FEATURE: add ability to extract fields with [`extract` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-extract).
 * FEATURE: add ability to unpack JSON fields with [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_json).
 * FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt).
-* FEATURE: add [`fields_min`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_min-stats) and [`fields_max`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field.
+* FEATURE: add [`row_min`](https://docs.victoriametrics.com/victorialogs/logsql/#row_min-stats) and [`row_max`](https://docs.victoriametrics.com/victorialogs/logsql/#row_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field.
 * FEATURE: add `/select/logsql/streams` HTTP endpoint for returning [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) for details.
 * FEATURE: add `/select/logsql/stream_field_names` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-names) for details.
 * FEATURE: add `/select/logsql/stream_field_values` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field values for the given label from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values) for details.
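To make the new v0.15.0 `math` pipe features above concrete, here is a hedged illustration combining them in a single query (a sketch only; the `errors`, `requests` and `duration` field names are made-up examples, not part of this patch):

```logsql
_time:5m | math
  (errors / requests default 0) as error_rate,
  duration / 1000
```

The first expression falls back to `0` whenever the division yields `NaN`; the second one omits the result name, so per the new naming rule the result is stored in the `"duration / 1000"` field.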
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index a0f9c3c73..d71e6479b 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1594,6 +1594,19 @@ See also:

 ### math pipe

 `| math ...` [pipe](#pipes) performs mathematical calculations over numeric values stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+It has the following format:
+
+```
+| math
+  expr1 as resultName1,
+  ...
+  exprN as resultNameN
+```
+
+Where `exprX` is one of the supported math expressions mentioned below, while `resultNameX` is the name of the field to store the calculated result to.
+The `as` keyword is optional. The result name can be omitted. In this case the result is stored in a field with the name equal to the string representation
+of the corresponding math expression.
+
 For example, the following query divides `duration_msecs` field value by 1000, then rounds it to integer and stores the result in the `duration_secs` field:

 ```logsql
@@ -1608,7 +1621,10 @@ The following mathematical operations are supported by `math` pipe:
 - `arg1 / arg2` - divides `arg1` by `arg2`
 - `arg1 % arg2` - returns the remainder of the division of `arg1` by `arg2`
 - `arg1 ^ arg2` - returns the power of `arg1` by `arg2`
+- `arg1 default arg2` - returns `arg2` if `arg1` is `NaN`.
 - `abs(arg)` - returns an absolute value for the given `arg`
+- `exp(arg)` - raises [`e`](https://en.wikipedia.org/wiki/E_(mathematical_constant)) to the power of `arg`.
+- `ln(arg)` - returns the [natural logarithm](https://en.wikipedia.org/wiki/Natural_logarithm) of the given `arg`.
 - `max(arg1, ..., argN)` - returns the maximum value among the given `arg1`, ..., `argN`
 - `min(arg1, ..., argN)` - returns the minimum value among the given `arg1`, ..., `argN`
 - `round(arg)` - returns rounded to integer value for the given `arg`. The `round()` accepts optional `nearest` arg, which allows rounding the number to the given `nearest` multiple.
@@ -1617,18 +1633,10 @@ The following mathematical operations are supported by `math` pipe:
 Every `argX` argument in every mathematical operation can contain one of the following values:

 - The name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, `errors_total / requests_total`.
+  If the log field contains a value that cannot be parsed into a [supported numeric value](#numeric-values), then it is replaced with `NaN`.
 - Any [supported numeric value](#numeric-values). For example, `response_size_bytes / 1MiB`.
 - Another mathematical expression. Optionally, it may be put inside `(...)`. For example, `(a + b) * c`.

-Multiple distinct results can be calculated in a single `math ...` pipe - just separate them with `,`. For example, the following query calculates the error rate
-and the number of successful requests from `errors`, `warnings` and `requests` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
-
-```logsql
-_time:5m | math
-  (errors / requests) as error_rate,
-  (requests - (errors + warnings)) as success_requests
-```
-
 See also:

 - [`stats` pipe](#stats-pipe)
@@ -1885,7 +1893,7 @@ See also:
 uses [`count` stats function](#count-stats) for calculating the number of logs for the last 5 minutes:

 ```logsql
-_time:5m | stats count() logs_total
+_time:5m | stats count() as logs_total
 ```

 `| stats ...` pipe has the following basic format:
@@ -1909,12 +1917,19 @@ For example, the following query calculates the following stats for logs over th
 _time:5m | stats count() logs_total, count_uniq(_stream) streams_total
 ```

-It is allowed to omit `stats` prefix for convenience. So the following query is equivalent to the previous one:
+It is allowed to omit the `stats` prefix for convenience. So the following query is equivalent to the previous one:

 ```logsql
 _time:5m | count() logs_total, count_uniq(_stream) streams_total
 ```

+It is also allowed to omit the result name. In this case the result name equals the string representation of the used [stats function](#stats-pipe-functions).
+For example, the following query returns the same stats as the previous one, but uses `count()` and `count_uniq(_stream)` as the names of the returned fields:
+
+```logsql
+_time:5m | count(), count_uniq(_stream)
+```
+
 See also:

 - [stats by fields](#stats-by-fields)
@@ -1954,6 +1969,12 @@ The `by` keyword can be skipped in `stats ...` pipe. For example, the following
 _time:5m | stats (host, path) count() logs_total, count_uniq(ip) ips_total
 ```

+See also:
+
+- [`row_min`](#row_min-stats)
+- [`row_max`](#row_max-stats)
+- [`row_any`](#row_any-stats)
+
 #### Stats by time buckets

 The following syntax can be used for calculating stats grouped by time buckets:
@@ -2284,12 +2305,13 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
 - [`count`](#count-stats) returns the number of log entries.
 - [`count_empty`](#count_empty-stats) returns the number of logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`count_uniq`](#count_uniq-stats) returns the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
-- [`fields_max`](#fields_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field.
-- [`fields_min`](#fields_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field.
 - [`max`](#max-stats) returns the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`median`](#median-stats) returns the [median](https://en.wikipedia.org/wiki/Median) value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`min`](#min-stats) returns the minimum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`quantile`](#quantile-stats) returns the given quantile for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`row_any`](#row_any-stats) returns a sample [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for each selected [stats group](#stats-by-fields).
+- [`row_max`](#row_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field.
+- [`row_min`](#row_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field.
 - [`sum`](#sum-stats) returns the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`sum_len`](#sum_len-stats) returns the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -2397,59 +2419,6 @@ See also:

 - [`uniq_values`](#uniq_values-stats)
 - [`count`](#count-stats)

-### fields_max stats
-
-`fields_max(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
-with the maximum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
-
-For example, the following query returns log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
-across logs for the last 5 minutes:
-
-```logsql
-_time:5m | stats fields_max(duration) as log_with_max_duration
-```
-
-Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
-
-If only the specific fields are needed from the returned log entry, then they can be enumerated inside `fields_max(...)`.
-For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes:
-
-```logsql
-_time:5m | stats fields_max(duration, _time, path, duration) as time_and_ip_with_max_duration
-```
-
-See also:
-
-- [`max`](#max-stats)
-- [`fields_min`](#fields_min-stats)
-
-
-### fields_min stats
-
-`fields_min(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
-with the minimum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
-
-For example, the following query returns log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
-across logs for the last 5 minutes:
-
-```logsql
-_time:5m | stats fields_min(duration) as log_with_min_duration
-```
-
-Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
-
-If only the specific fields are needed from the returned log entry, then they can be enumerated inside `fields_max(...)`.
-For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes:
-
-```logsql
-_time:5m | stats fields_min(duration, _time, path, duration) as time_and_ip_with_min_duration
-```
-
-See also:
-
-- [`min`](#min-stats)
-- [`fields_max`](#fields_max-stats)
-
 ### max stats

 `max(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) returns the maximum value across
@@ -2462,11 +2431,11 @@ over logs for the last 5 minutes:
 _time:5m | stats max(duration) max_duration
 ```

-[`fields_max`](#fields_max-stats) function can be used for obtaining other fields with the maximum duration.
+[`row_max`](#row_max-stats) function can be used for obtaining other fields with the maximum duration.

 See also:

-- [`fields_max`](#fields_max-stats)
+- [`row_max`](#row_max-stats)
 - [`min`](#min-stats)
 - [`quantile`](#quantile-stats)
 - [`avg`](#avg-stats)
@@ -2500,11 +2469,11 @@ over logs for the last 5 minutes:
 _time:5m | stats min(duration) min_duration
 ```

-[`fields_min`](#fields_min-stats) function can be used for obtaining other fields with the minimum duration.
+[`row_min`](#row_min-stats) function can be used for obtaining other fields with the minimum duration.

 See also:

-- [`fields_min`](#fields_min-stats)
+- [`row_min`](#row_min-stats)
 - [`max`](#max-stats)
 - [`quantile`](#quantile-stats)
 - [`avg`](#avg-stats)
@@ -2532,6 +2501,86 @@ See also:
 - [`median`](#median-stats)
 - [`avg`](#avg-stats)

+### row_any stats
+
+`row_any()` [stats pipe function](#stats-pipe-functions) returns an arbitrary [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+(aka a sample) for each selected [stats group](#stats-by-fields). The log entry is returned as a JSON-encoded dictionary with all the fields from the original log.
+
+For example, the following query returns a sample log entry for each [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
+across logs for the last 5 minutes:
+
+```logsql
+_time:5m | stats by (_stream) row_any() as sample_row
+```
+
+Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
+
+If only the specific fields are needed, then they can be enumerated inside `row_any(...)`.
+For example, the following query returns only `_time` and `path` fields from a sample log entry for logs over the last 5 minutes:
+
+```logsql
+_time:5m | stats row_any(_time, path) as time_and_path_sample
+```
+
+See also:
+
+- [`row_max`](#row_max-stats)
+- [`row_min`](#row_min-stats)
+
+### row_max stats
+
+`row_max(field)` [stats pipe function](#stats-pipe-functions) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+with the maximum value for the given `field`. The log entry is returned as a JSON-encoded dictionary with all the fields from the original log.
+
+For example, the following query returns the log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+across logs for the last 5 minutes:
+
+```logsql
+_time:5m | stats row_max(duration) as log_with_max_duration
+```
+
+Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
+
+If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`.
+For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes:
+
+```logsql
+_time:5m | stats row_max(duration, _time, path, duration) as time_and_path_with_max_duration
+```
+
+See also:
+
+- [`max`](#max-stats)
+- [`row_min`](#row_min-stats)
+- [`row_any`](#row_any-stats)
+
+### row_min stats
+
+`row_min(field)` [stats pipe function](#stats-pipe-functions) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+with the minimum value for the given `field`. The log entry is returned as a JSON-encoded dictionary with all the fields from the original log.
+
+For example, the following query returns the log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+across logs for the last 5 minutes:
+
+```logsql
+_time:5m | stats row_min(duration) as log_with_min_duration
+```
+
+Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
+
+If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_min(...)`.
+For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes:
+
+```logsql
+_time:5m | stats row_min(duration, _time, path, duration) as time_and_path_with_min_duration
+```
+
+See also:
+
+- [`min`](#min-stats)
+- [`row_max`](#row_max-stats)
+- [`row_any`](#row_any-stats)
+
 ### sum stats

 `sum(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) calculates the sum of numeric values across
diff --git a/docs/VictoriaLogs/keyConcepts.md b/docs/VictoriaLogs/keyConcepts.md
index c2cacfd08..44d25aa2e 100644
--- a/docs/VictoriaLogs/keyConcepts.md
+++ b/docs/VictoriaLogs/keyConcepts.md
@@ -17,7 +17,7 @@ aliases:
 [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) works with both structured and unstructured logs.
 Every log entry must contain at least [log message field](#message-field) plus arbitrary number of additional `key=value` fields.
-A single log entry can be expressed as a single-level [JSON](https://www.json.org/json-en.html) object with string keys and values.
+A single log entry can be expressed as a single-level [JSON](https://www.json.org/json-en.html) object with string keys and string values.
 For example:

 ```json
 {
 }
 ```

+Empty values are treated the same as non-existing values.
For example, the following log entries are equivalent,
+since they have only one identical non-empty field, [`_msg`](#message-field):
+
+```json
+{
+  "_msg": "foo bar",
+  "some_field": "",
+  "another_field": ""
+}
+```
+
+```json
+{
+  "_msg": "foo bar",
+  "third_field": ""
+}
+```
+
+```json
+{
+  "_msg": "foo bar"
+}
+```
+
 VictoriaLogs automatically transforms multi-level JSON (aka nested JSON) into single-level JSON
 during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/) according to the following rules:
diff --git a/lib/logstorage/fields_set.go b/lib/logstorage/fields_set.go
index faf52ce9d..cb1194f6e 100644
--- a/lib/logstorage/fields_set.go
+++ b/lib/logstorage/fields_set.go
@@ -28,6 +28,10 @@ func (fs fieldsSet) clone() fieldsSet {
 	return fsNew
 }

+func (fs fieldsSet) isEmpty() bool {
+	return len(fs) == 0
+}
+
 func (fs fieldsSet) getAll() []string {
 	a := make([]string, 0, len(fs))
 	for f := range fs {
diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go
index 08bf8736a..59d623b2e 100644
--- a/lib/logstorage/filter_and.go
+++ b/lib/logstorage/filter_and.go
@@ -117,43 +117,48 @@ func (fa *filterAnd) initByFieldTokens() {
 	byFieldFilters := make(map[string]int)
 	var fieldNames []string

-	for _, f := range fa.filters {
-		fieldName := ""
-		var tokens []string
-
-		switch t := f.(type) {
-		case *filterExact:
-			fieldName = t.fieldName
-			tokens = t.getTokens()
-		case *filterExactPrefix:
-			fieldName = t.fieldName
-			tokens = t.getTokens()
-		case *filterPhrase:
-			fieldName = t.fieldName
-			tokens = t.getTokens()
-		case *filterPrefix:
-			fieldName = t.fieldName
-			tokens = t.getTokens()
-		case *filterRegexp:
-			fieldName = t.fieldName
-			tokens = t.getTokens()
-		case *filterSequence:
-			fieldName = t.fieldName
-			tokens = t.getTokens()
-		}
-
+	mergeFieldTokens := func(fieldName string, tokens []string) {
 		fieldName = getCanonicalColumnName(fieldName)
 		byFieldFilters[fieldName]++
+		if len(tokens) == 0 {
+			return
+		}

-		if len(tokens) > 0 {
-			mTokens, ok := m[fieldName]
-			if !ok {
-				fieldNames = append(fieldNames, fieldName)
-				mTokens = make(map[string]struct{})
-				m[fieldName] = mTokens
-			}
-			for _, token := range tokens {
-				mTokens[token] = struct{}{}
+		mTokens, ok := m[fieldName]
+		if !ok {
+			fieldNames = append(fieldNames, fieldName)
+			mTokens = make(map[string]struct{})
+			m[fieldName] = mTokens
+		}
+		for _, token := range tokens {
+			mTokens[token] = struct{}{}
+		}
+	}
+
+	for _, f := range fa.filters {
+		switch t := f.(type) {
+		case *filterExact:
+			tokens := t.getTokens()
+			mergeFieldTokens(t.fieldName, tokens)
+		case *filterExactPrefix:
+			tokens := t.getTokens()
+			mergeFieldTokens(t.fieldName, tokens)
+		case *filterPhrase:
+			tokens := t.getTokens()
+			mergeFieldTokens(t.fieldName, tokens)
+		case *filterPrefix:
+			tokens := t.getTokens()
+			mergeFieldTokens(t.fieldName, tokens)
+		case *filterRegexp:
+			tokens := t.getTokens()
+			mergeFieldTokens(t.fieldName, tokens)
+		case *filterSequence:
+			tokens := t.getTokens()
+			mergeFieldTokens(t.fieldName, tokens)
+		case *filterOr:
+			bfts := t.getByFieldTokens()
+			for _, bft := range bfts {
+				mergeFieldTokens(bft.field, bft.tokens)
 			}
 		}
 	}
diff --git a/lib/logstorage/filter_in.go b/lib/logstorage/filter_in.go
index 742a0fb40..eb39f41f6 100644
--- a/lib/logstorage/filter_in.go
+++ b/lib/logstorage/filter_in.go
@@ -426,10 +426,12 @@ func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[str

 func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, commonTokens []string, tokenSets
[][]string) bool { if len(commonTokens) > 0 { - return matchBloomFilterAllTokens(bs, ch, commonTokens) + if !matchBloomFilterAllTokens(bs, ch, commonTokens) { + return false + } } if len(tokenSets) == 0 { - return false + return len(commonTokens) > 0 } if len(tokenSets) > maxTokenSetsToInit || uint64(len(tokenSets)) > 10*bs.bsw.bh.rowsCount { // It is faster to match every row in the block against all the values @@ -471,7 +473,22 @@ func getCommonTokensAndTokenSets(values []string) ([]string, [][]string) { if len(commonTokens) == 0 { return nil, tokenSets } - return commonTokens, nil + + // remove commonTokens from tokenSets + for i, tokens := range tokenSets { + dstTokens := tokens[:0] + for _, token := range tokens { + if !slices.Contains(commonTokens, token) { + dstTokens = append(dstTokens, token) + } + } + if len(dstTokens) == 0 { + return commonTokens, nil + } + tokenSets[i] = dstTokens + } + + return commonTokens, tokenSets } func getCommonTokens(tokenSets [][]string) []string { diff --git a/lib/logstorage/filter_in_test.go b/lib/logstorage/filter_in_test.go index 29f1e35ff..80c76c616 100644 --- a/lib/logstorage/filter_in_test.go +++ b/lib/logstorage/filter_in_test.go @@ -716,6 +716,6 @@ func TestGetCommonTokensAndTokenSets(t *testing.T) { f([]string{"foo", "foo"}, []string{"foo"}, nil) f([]string{"foo", "bar", "bar", "foo"}, nil, [][]string{{"foo"}, {"bar"}, {"bar"}, {"foo"}}) f([]string{"foo", "foo bar", "bar foo"}, []string{"foo"}, nil) - f([]string{"a foo bar", "bar abc foo", "foo abc a bar"}, []string{"bar", "foo"}, nil) + f([]string{"a foo bar", "bar abc foo", "foo abc a bar"}, []string{"bar", "foo"}, [][]string{{"a"}, {"abc"}, {"a", "abc"}}) f([]string{"a xfoo bar", "xbar abc foo", "foo abc a bar"}, nil, [][]string{{"a", "bar", "xfoo"}, {"abc", "foo", "xbar"}, {"a", "abc", "bar", "foo"}}) } diff --git a/lib/logstorage/filter_or.go b/lib/logstorage/filter_or.go index 256337a0c..040c74680 100644 --- a/lib/logstorage/filter_or.go +++ b/lib/logstorage/filter_or.go @@ -130,39 +130,44 @@ func (fo *filterOr) initByFieldTokens() { byFieldFilters := make(map[string]int) var fieldNames []string - for _, f := range fo.filters { - fieldName := "" - var tokens []string - - switch t := f.(type) { - case *filterExact: - fieldName = t.fieldName - tokens = t.getTokens() - case *filterExactPrefix: - fieldName = t.fieldName - tokens = t.getTokens() - case *filterPhrase: - fieldName = t.fieldName - tokens = t.getTokens() - case *filterPrefix: - fieldName = t.fieldName - tokens = t.getTokens() - case *filterRegexp: - fieldName = t.fieldName - tokens = t.getTokens() - case *filterSequence: - fieldName = t.fieldName - tokens = t.getTokens() - } - + mergeFieldTokens := func(fieldName string, tokens []string) { fieldName = getCanonicalColumnName(fieldName) byFieldFilters[fieldName]++ + if len(tokens) == 0 { + return + } - if len(tokens) > 0 { - if _, ok := m[fieldName]; !ok { - fieldNames = append(fieldNames, fieldName) + if _, ok := m[fieldName]; !ok { + fieldNames = append(fieldNames, fieldName) + } + m[fieldName] = append(m[fieldName], tokens) + } + + for _, f := range fo.filters { + switch t := f.(type) { + case *filterExact: + tokens := t.getTokens() + mergeFieldTokens(t.fieldName, tokens) + case *filterExactPrefix: + tokens := t.getTokens() + mergeFieldTokens(t.fieldName, tokens) + case *filterPhrase: + tokens := t.getTokens() + mergeFieldTokens(t.fieldName, tokens) + case *filterPrefix: + tokens := t.getTokens() + mergeFieldTokens(t.fieldName, tokens) + case *filterRegexp: + tokens := 
t.getTokens() + mergeFieldTokens(t.fieldName, tokens) + case *filterSequence: + tokens := t.getTokens() + mergeFieldTokens(t.fieldName, tokens) + case *filterAnd: + bfts := t.getByFieldTokens() + for _, bft := range bfts { + mergeFieldTokens(bft.field, bft.tokens) } - m[fieldName] = append(m[fieldName], tokens) } } diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 01f89c5fe..702db4678 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -909,15 +909,18 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats count(foo,*,bar) x`, `* | stats count(*) as x`) f(`* | stats count('') foo`, `* | stats count(_msg) as foo`) f(`* | stats count(foo) ''`, `* | stats count(foo) as _msg`) + f(`* | count()`, `* | stats count(*) as "count(*)"`) // stats pipe count_empty f(`* | stats count_empty() x`, `* | stats count_empty(*) as x`) - f(`* | stats by (x, y) count_empty(a,b,c) x`, `* | stats by (x, y) count_empty(a, b, c) as x`) + f(`* | stats by (x, y) count_empty(a,b,c) z`, `* | stats by (x, y) count_empty(a, b, c) as z`) + f(`* | count_empty()`, `* | stats count_empty(*) as "count_empty(*)"`) // stats pipe sum f(`* | stats Sum(foo) bar`, `* | stats sum(foo) as bar`) f(`* | stats BY(x, y, ) SUM(foo,bar,) bar`, `* | stats by (x, y) sum(foo, bar) as bar`) f(`* | stats sum() x`, `* | stats sum(*) as x`) + f(`* | sum()`, `* | stats sum(*) as "sum(*)"`) f(`* | stats sum(*) x`, `* | stats sum(*) as x`) f(`* | stats sum(foo,*,bar) x`, `* | stats sum(*) as x`) @@ -925,6 +928,7 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats Max(foo) bar`, `* | stats max(foo) as bar`) f(`* | stats BY(x, y, ) MAX(foo,bar,) bar`, `* | stats by (x, y) max(foo, bar) as bar`) f(`* | stats max() x`, `* | stats max(*) as x`) + f(`* | max()`, `* | stats max(*) as "max(*)"`) f(`* | stats max(*) x`, `* | stats max(*) as x`) f(`* | stats max(foo,*,bar) x`, `* | stats max(*) as x`) @@ -932,22 +936,26 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats Min(foo) bar`, `* | stats min(foo) as bar`) f(`* | stats BY(x, y, ) MIN(foo,bar,) bar`, `* | stats by (x, y) min(foo, bar) as bar`) f(`* | stats min() x`, `* | stats min(*) as x`) + f(`* | min()`, `* | stats min(*) as "min(*)"`) f(`* | stats min(*) x`, `* | stats min(*) as x`) f(`* | stats min(foo,*,bar) x`, `* | stats min(*) as x`) - // stats pipe fields_min - f(`* | stats fields_Min(foo) bar`, `* | stats fields_min(foo) as bar`) - f(`* | stats BY(x, y, ) fields_MIN(foo,bar,) bar`, `* | stats by (x, y) fields_min(foo, bar) as bar`) + // stats pipe row_min + f(`* | stats row_Min(foo) bar`, `* | stats row_min(foo) as bar`) + f(`* | row_Min(foo)`, `* | stats row_min(foo) as "row_min(foo)"`) + f(`* | stats BY(x, y, ) row_MIN(foo,bar,) bar`, `* | stats by (x, y) row_min(foo, bar) as bar`) // stats pipe avg f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`) f(`* | stats BY(x, y, ) AVG(foo,bar,) bar`, `* | stats by (x, y) avg(foo, bar) as bar`) f(`* | stats avg() x`, `* | stats avg(*) as x`) + f(`* | avg()`, `* | stats avg(*) as "avg(*)"`) f(`* | stats avg(*) x`, `* | stats avg(*) as x`) f(`* | stats avg(foo,*,bar) x`, `* | stats avg(*) as x`) // stats pipe count_uniq f(`* | stats count_uniq(foo) bar`, `* | stats count_uniq(foo) as bar`) + f(`* | count_uniq(foo)`, `* | stats count_uniq(foo) as "count_uniq(foo)"`) f(`* | stats by(x, y) count_uniq(foo,bar) LiMit 10 As baz`, `* | stats by (x, y) count_uniq(foo, bar) limit 10 as baz`) f(`* | stats by(x) count_uniq(*) z`, `* | stats by (x) count_uniq(*) as z`) f(`* | 
stats by(x) count_uniq() z`, `* | stats by (x) count_uniq(*) as z`) @@ -955,6 +963,7 @@ func TestParseQuerySuccess(t *testing.T) { // stats pipe uniq_values f(`* | stats uniq_values(foo) bar`, `* | stats uniq_values(foo) as bar`) + f(`* | uniq_values(foo)`, `* | stats uniq_values(foo) as "uniq_values(foo)"`) f(`* | stats uniq_values(foo) limit 10 bar`, `* | stats uniq_values(foo) limit 10 as bar`) f(`* | stats by(x, y) uniq_values(foo, bar) as baz`, `* | stats by (x, y) uniq_values(foo, bar) as baz`) f(`* | stats by(x) uniq_values(*) y`, `* | stats by (x) uniq_values(*) as y`) @@ -963,6 +972,7 @@ func TestParseQuerySuccess(t *testing.T) { // stats pipe values f(`* | stats values(foo) bar`, `* | stats values(foo) as bar`) + f(`* | values(foo)`, `* | stats values(foo) as "values(foo)"`) f(`* | stats values(foo) limit 10 bar`, `* | stats values(foo) limit 10 as bar`) f(`* | stats by(x, y) values(foo, bar) as baz`, `* | stats by (x, y) values(foo, bar) as baz`) f(`* | stats by(x) values(*) y`, `* | stats by (x) values(*) as y`) @@ -973,6 +983,7 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats Sum_len(foo) bar`, `* | stats sum_len(foo) as bar`) f(`* | stats BY(x, y, ) SUM_Len(foo,bar,) bar`, `* | stats by (x, y) sum_len(foo, bar) as bar`) f(`* | stats sum_len() x`, `* | stats sum_len(*) as x`) + f(`* | sum_len()`, `* | stats sum_len(*) as "sum_len(*)"`) f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`) f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`) @@ -981,12 +992,14 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`) f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`) f(`* | stats quantile(0.99) bar`, `* | stats quantile(0.99) as bar`) + f(`* | quantile(0.99)`, `* | stats quantile(0.99) as "quantile(0.99)"`) f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99) as bar`) // stats pipe median f(`* | stats Median(foo) bar`, `* | stats median(foo) as bar`) f(`* | stats BY(x, y, ) MEDIAN(foo,bar,) bar`, `* | stats by (x, y) median(foo, bar) as bar`) f(`* | stats median() x`, `* | stats median(*) as x`) + f(`* | median()`, `* | stats median(*) as "median(*)"`) f(`* | stats median(*) x`, `* | stats median(*) as x`) f(`* | stats median(foo,*,bar) x`, `* | stats median(*) as x`) @@ -995,7 +1008,7 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`) // stats pipe with grouping buckets - f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count(*) as foo`) + f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as bar`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count(*) as bar`) f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count(*) as foo`) f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count(*) as foo`) f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`) @@ -1357,7 +1370,6 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats count(`) f(`foo | stats count bar`) f(`foo | stats count(bar`) - f(`foo | stats count(bar)`) f(`foo | stats count() as`) f(`foo | stats count() as |`) @@ -1368,27 +1380,21 @@ 
func TestParseQueryFailure(t *testing.T) { // invalid stats sum f(`foo | stats sum`) - f(`foo | stats sum()`) // invalid stats max f(`foo | stats max`) - f(`foo | stats max()`) // invalid stats min f(`foo | stats min`) - f(`foo | stats min()`) // invalid stats min - f(`foo | stats fields_min`) - f(`foo | stats fields_min()`) + f(`foo | stats row_min`) // invalid stats avg f(`foo | stats avg`) - f(`foo | stats avg()`) // invalid stats count_uniq f(`foo | stats count_uniq`) - f(`foo | stats count_uniq()`) f(`foo | stats count_uniq() limit`) f(`foo | stats count_uniq() limit foo`) f(`foo | stats count_uniq() limit 0.5`) @@ -1396,7 +1402,6 @@ func TestParseQueryFailure(t *testing.T) { // invalid stats uniq_values f(`foo | stats uniq_values`) - f(`foo | stats uniq_values()`) f(`foo | stats uniq_values() limit`) f(`foo | stats uniq_values(a) limit foo`) f(`foo | stats uniq_values(a) limit 0.5`) @@ -1404,7 +1409,6 @@ func TestParseQueryFailure(t *testing.T) { // invalid stats values f(`foo | stats values`) - f(`foo | stats values()`) f(`foo | stats values() limit`) f(`foo | stats values(a) limit foo`) f(`foo | stats values(a) limit 0.5`) @@ -1412,7 +1416,6 @@ func TestParseQueryFailure(t *testing.T) { // invalid stats sum_len f(`foo | stats sum_len`) - f(`foo | stats sum_len()`) // invalid stats quantile f(`foo | stats quantile`) @@ -1436,6 +1439,12 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats by(bar,`) f(`foo | stats by(bar)`) + // duplicate stats result names + f(`foo | stats min() x, max() x`) + + // stats result names identical to by fields + f(`foo | stats by (x) count() x`) + // invalid sort pipe f(`foo | sort bar`) f(`foo | sort by`) @@ -1513,10 +1522,10 @@ func TestQueryGetNeededColumns(t *testing.T) { unneededColumns := strings.Join(unneeded, ",") if neededColumns != neededColumnsExpected { - t.Fatalf("unexpected neededColumns; got %q; want %q", neededColumns, neededColumnsExpected) + t.Fatalf("unexpected neededColumns for [%s]; got %q; want %q", s, neededColumns, neededColumnsExpected) } if unneededColumns != unneededColumnsExpected { - t.Fatalf("unexpected unneededColumns; got %q; want %q", unneededColumns, unneededColumnsExpected) + t.Fatalf("unexpected unneededColumns for [%s]; got %q; want %q", s, unneededColumns, unneededColumnsExpected) } } @@ -1542,7 +1551,6 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | fields f1, f2 | mv f1 f3, f4 f5`, `f1,f2`, ``) f(`* | fields f1, f2 | mv f2 f3, f4 f5`, `f1,f2`, ``) f(`* | fields f1, f2 | mv f2 f3, f4 f1`, `f2`, ``) - f(`* | fields f1, f2 | stats count() r1`, ``, ``) f(`* | fields f1, f2 | stats count_uniq() r1`, `f1,f2`, ``) f(`* | fields f1, f2 | stats count(f1) r1`, `f1`, ``) f(`* | fields f1, f2 | stats count(f1,f2,f3) r1`, `f1,f2`, ``) @@ -1553,7 +1561,7 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | fields f1, f2 | sort by(f3)`, `f1,f2`, ``) f(`* | fields f1, f2 | sort by(f1,f3)`, `f1,f2`, ``) f(`* | fields f1, f2 | sort by(f3) | stats count() r1`, ``, ``) - f(`* | fields f1, f2 | sort by(f1) | stats count() r1`, `f1`, ``) + f(`* | fields f1, f2 | sort by(f1) | stats count() r1`, ``, ``) f(`* | fields f1, f2 | sort by(f1) | stats count(f2,f3) r1`, `f1,f2`, ``) f(`* | fields f1, f2 | sort by(f3) | fields f2`, `f2`, ``) f(`* | fields f1, f2 | sort by(f1,f3) | fields f2`, `f1,f2`, ``) @@ -1619,12 +1627,12 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | stats count_uniq() q`, `*`, ``) f(`* | stats count_uniq(*) q`, `*`, ``) f(`* | stats count_uniq(x) q`, `x`, ``) - f(`* | stats fields_max(a) q`, 
`*`, ``) - f(`* | stats fields_max(a, *) q`, `*`, ``) - f(`* | stats fields_max(a, x) q`, `a,x`, ``) - f(`* | stats fields_min(a) q`, `*`, ``) - f(`* | stats fields_min(a, *) q`, `*`, ``) - f(`* | stats fields_min(a, x) q`, `a,x`, ``) + f(`* | stats row_max(a) q`, `*`, ``) + f(`* | stats row_max(a, *) q`, `*`, ``) + f(`* | stats row_max(a, x) q`, `a,x`, ``) + f(`* | stats row_min(a) q`, `*`, ``) + f(`* | stats row_min(a, *) q`, `*`, ``) + f(`* | stats row_min(a, x) q`, `a,x`, ``) f(`* | stats min() q`, `*`, ``) f(`* | stats min(*) q`, `*`, ``) f(`* | stats min(x) q`, `x`, ``) @@ -1776,11 +1784,51 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | rm f1, f2 | mv f2 f3 | sort by(f1)`, `*`, `f1,f2,f3`) f(`* | rm f1, f2 | fields f3`, `f3`, ``) f(`* | rm f1, f2 | fields f1,f3`, `f3`, ``) - f(`* | rm f1, f2 | stats count() f1`, ``, ``) f(`* | rm f1, f2 | stats count(f3) r1`, `f3`, ``) f(`* | rm f1, f2 | stats count(f1) r1`, ``, ``) f(`* | rm f1, f2 | stats count(f1,f3) r1`, `f3`, ``) f(`* | rm f1, f2 | stats by(f1) count(f2) r1`, ``, ``) f(`* | rm f1, f2 | stats by(f3) count(f2) r1`, `f3`, ``) f(`* | rm f1, f2 | stats by(f3) count(f4) r1`, `f3,f4`, ``) + + // Verify that fields are correctly tracked before count(*) + f(`* | copy a b, c d | count() r1`, ``, ``) + f(`* | delete a, b | count() r1`, ``, ``) + f(`* | extract "bar" from x | count() r1`, ``, ``) + f(`* | extract if (q:w p:a) "bar" from x | count() r1`, `p,q`, ``) + f(`* | extract_regexp "(?P.*)bar" from x | count() r1`, ``, ``) + f(`* | extract_regexp if (q:w p:a) "(?P.*)bar" from x | count() r1`, `p,q`, ``) + f(`* | field_names | count() r1`, `*`, `_time`) + f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``) + f(`* | fields a, b | count() r1`, ``, ``) + f(`* | field_values a | count() r1`, `a`, ``) + f(`* | limit 10 | filter a:b c:d | count() r1`, `a,c`, ``) + f(`* | limit 10 | count() r1`, ``, ``) + f(`* | format "" as c | count() r1`, ``, ``) + f(`* | format if (q:w p:a) "" as c | count() r1`, `p,q`, ``) + f(`* | math (a + b) as c, d * 2 as x | count() r1`, ``, ``) + f(`* | offset 10 | count() r1`, ``, ``) + f(`* | pack_json | count() r1`, ``, ``) + f(`* | pack_json fields(a,b) | count() r1`, ``, ``) + f(`* | rename a b, c d | count() r1`, ``, ``) + f(`* | replace ("a", "b") at x | count() r1`, ``, ``) + f(`* | replace if (q:w p:a) ("a", "b") at x | count() r1`, `p,q`, ``) + f(`* | replace_regexp ("a", "b") at x | count() r1`, ``, ``) + f(`* | replace_regexp if (q:w p:a) ("a", "b") at x | count() r1`, `p,q`, ``) + f(`* | sort by (a,b) | count() r1`, ``, ``) + f(`* | stats count_uniq(a, b) as c | count() r1`, ``, ``) + f(`* | stats count_uniq(a, b) if (q:w p:a) as c | count() r1`, ``, ``) + f(`* | stats by (a1,a2) count_uniq(a, b) as c | count() r1`, `a1,a2`, ``) + f(`* | stats by (a1,a2) count_uniq(a, b) if (q:w p:a) as c | count() r1`, `a1,a2`, ``) + f(`* | uniq by (a, b) | count() r1`, `a,b`, ``) + f(`* | unpack_json from x | count() r1`, ``, ``) + f(`* | unpack_json from x fields (a,b) | count() r1`, ``, ``) + f(`* | unpack_json if (q:w p:a) from x | count() r1`, `p,q`, ``) + f(`* | unpack_json if (q:w p:a) from x fields(a,b) | count() r1`, `p,q`, ``) + f(`* | unpack_logfmt from x | count() r1`, ``, ``) + f(`* | unpack_logfmt from x fields (a,b) | count() r1`, ``, ``) + f(`* | unpack_logfmt if (q:w p:a) from x | count() r1`, `p,q`, ``) + f(`* | unpack_logfmt if (q:w p:a) from x fields(a,b) | count() r1`, `p,q`, ``) + f(`* | unroll (a, b) | count() r1`, `a,b`, ``) + f(`* | unroll if (q:w p:a) (a, b) | count() r1`, 
`a,b,p,q`, ``) } diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 543b769fb..9f6b22e1e 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -24,10 +24,8 @@ func (pd *pipeDelete) String() string { func (pd *pipeDelete) updateNeededFields(neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { - // update only unneeded fields unneededFields.addFields(pd.fields) } else { - // update only needed fields neededFields.removeFields(pd.fields) } } diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index a0cfe16f4..99d21fad5 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -60,6 +60,13 @@ func (pe *pipeExtract) initFilterInValues(cache map[string][]string, getFieldVal } func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.isEmpty() { + if pe.iff != nil { + neededFields.addFields(pe.iff.neededFields) + } + return + } + if neededFields.contains("*") { unneededFieldsOrig := unneededFields.clone() needFromField := false diff --git a/lib/logstorage/pipe_extract_regexp.go b/lib/logstorage/pipe_extract_regexp.go index 5a5c4017a..0df49dd7d 100644 --- a/lib/logstorage/pipe_extract_regexp.go +++ b/lib/logstorage/pipe_extract_regexp.go @@ -62,6 +62,13 @@ func (pe *pipeExtractRegexp) initFilterInValues(cache map[string][]string, getFi } func (pe *pipeExtractRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.isEmpty() { + if pe.iff != nil { + neededFields.addFields(pe.iff.neededFields) + } + return + } + if neededFields.contains("*") { unneededFieldsOrig := unneededFields.clone() needFromField := false diff --git a/lib/logstorage/pipe_field_values.go b/lib/logstorage/pipe_field_values.go index d9c1f57ac..b56111f55 100644 --- a/lib/logstorage/pipe_field_values.go +++ b/lib/logstorage/pipe_field_values.go @@ -22,6 +22,11 @@ func (pf *pipeFieldValues) String() string { } func (pf *pipeFieldValues) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.isEmpty() { + neededFields.add(pf.field) + return + } + if neededFields.contains("*") { neededFields.reset() if !unneededFields.contains(pf.field) { diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go index 1724897df..0c08d57a9 100644 --- a/lib/logstorage/pipe_fields.go +++ b/lib/logstorage/pipe_fields.go @@ -29,6 +29,7 @@ func (pf *pipeFields) updateNeededFields(neededFields, unneededFields fieldsSet) if pf.containsStar { return } + if neededFields.contains("*") { // subtract unneeded fields from pf.fields neededFields.reset() diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index 7146f99ed..aa91a0677 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -43,6 +43,13 @@ func (pf *pipeFormat) String() string { } func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.isEmpty() { + if pf.iff != nil { + neededFields.addFields(pf.iff.neededFields) + } + return + } + if neededFields.contains("*") { if !unneededFields.contains(pf.resultField) { if !pf.keepOriginalFields && !pf.skipEmptyResults { diff --git a/lib/logstorage/pipe_math.go b/lib/logstorage/pipe_math.go index 47f7ed73a..11bd5d5cd 100644 --- a/lib/logstorage/pipe_math.go +++ b/lib/logstorage/pipe_math.go @@ -161,6 +161,10 @@ var mathBinaryOps = map[string]mathBinaryOp{ priority: 3, f: mathFuncMinus, }, + "default": { + priority: 10, + f: 
mathFuncDefault, + }, } type mathBinaryOp struct { @@ -175,26 +179,19 @@ func (pm *pipeMath) updateNeededFields(neededFields, unneededFields fieldsSet) { if !unneededFields.contains(e.resultField) { unneededFields.add(e.resultField) - entryFields := e.getNeededFields() - unneededFields.removeFields(entryFields) + fs := newFieldsSet() + e.expr.updateNeededFields(fs) + unneededFields.removeFields(fs.getAll()) } } else { if neededFields.contains(e.resultField) { neededFields.remove(e.resultField) - - entryFields := e.getNeededFields() - neededFields.addFields(entryFields) + e.expr.updateNeededFields(neededFields) } } } } -func (me *mathEntry) getNeededFields() []string { - neededFields := newFieldsSet() - me.expr.updateNeededFields(neededFields) - return neededFields.getAll() -} - func (me *mathExpr) updateNeededFields(neededFields fieldsSet) { if me.isConst { return @@ -387,14 +384,20 @@ func parseMathEntry(lex *lexer) (*mathEntry, error) { return nil, err } - // skip optional 'as' - if lex.isKeyword("as") { - lex.nextToken() - } + resultField := "" + if lex.isKeyword(",", "|", ")", "") { + resultField = me.String() + } else { + if lex.isKeyword("as") { + // skip optional 'as' + lex.nextToken() + } - resultField, err := parseFieldName(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse result name for [%s]: %w", me, err) + fieldName, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name for [%s]: %w", me, err) + } + resultField = fieldName } e := &mathEntry{ @@ -476,6 +479,10 @@ func parseMathExprOperand(lex *lexer) (*mathExpr, error) { switch { case lex.isKeyword("abs"): return parseMathExprAbs(lex) + case lex.isKeyword("exp"): + return parseMathExprExp(lex) + case lex.isKeyword("ln"): + return parseMathExprLn(lex) case lex.isKeyword("max"): return parseMathExprMax(lex) case lex.isKeyword("min"): @@ -506,6 +513,28 @@ func parseMathExprAbs(lex *lexer) (*mathExpr, error) { return me, nil } +func parseMathExprExp(lex *lexer) (*mathExpr, error) { + me, err := parseMathExprGenericFunc(lex, "exp", mathFuncExp) + if err != nil { + return nil, err + } + if len(me.args) != 1 { + return nil, fmt.Errorf("'exp' function accepts only one arg; got %d args: [%s]", len(me.args), me) + } + return me, nil +} + +func parseMathExprLn(lex *lexer) (*mathExpr, error) { + me, err := parseMathExprGenericFunc(lex, "ln", mathFuncLn) + if err != nil { + return nil, err + } + if len(me.args) != 1 { + return nil, fmt.Errorf("'ln' function accepts only one arg; got %d args: [%s]", len(me.args), me) + } + return me, nil +} + func parseMathExprMax(lex *lexer) (*mathExpr, error) { me, err := parseMathExprGenericFunc(lex, "max", mathFuncMax) if err != nil { @@ -707,6 +736,18 @@ func mathFuncPow(result []float64, args [][]float64) { } } +func mathFuncDefault(result []float64, args [][]float64) { + values := args[0] + defaultValues := args[1] + for i := range result { + f := values[i] + if math.IsNaN(f) { + f = defaultValues[i] + } + result[i] = f + } +} + func mathFuncAbs(result []float64, args [][]float64) { arg := args[0] for i := range result { @@ -714,6 +755,20 @@ func mathFuncAbs(result []float64, args [][]float64) { } } +func mathFuncExp(result []float64, args [][]float64) { + arg := args[0] + for i := range result { + result[i] = math.Exp(arg[i]) + } +} + +func mathFuncLn(result []float64, args [][]float64) { + arg := args[0] + for i := range result { + result[i] = math.Log(arg[i]) + } +} + func mathFuncUnaryMinus(result []float64, args [][]float64) { arg := args[0] 
for i := range result { diff --git a/lib/logstorage/pipe_math_test.go b/lib/logstorage/pipe_math_test.go index 66a9da24f..7795263ac 100644 --- a/lib/logstorage/pipe_math_test.go +++ b/lib/logstorage/pipe_math_test.go @@ -22,6 +22,8 @@ func TestParsePipeMathSuccess(t *testing.T) { f(`math min(3, foo, (1 + bar) / baz) as a, max(a, b) as b, (abs(c) + 5) as d`) f(`math round(foo) as x`) f(`math round(foo, 0.1) as y`) + f(`math (a / b default 10) as z`) + f(`math (ln(a) + exp(b)) as x`) } func TestParsePipeMathFailure(t *testing.T) { @@ -31,7 +33,6 @@ func TestParsePipeMathFailure(t *testing.T) { } f(`math`) - f(`math x`) f(`math x as`) f(`math abs() as x`) f(`math abs(a, b) as x`) @@ -63,6 +64,94 @@ func TestPipeMath(t *testing.T) { }, }) + f("math a / b default c", [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + }, + { + {"a", "0"}, + {"b", "0"}, + {"c", "3"}, + }, + { + {"a", "3"}, + {"b", "2"}, + }, + { + {"a", "3"}, + {"b", "foo"}, + }, + }, [][]Field{ + { + {"a", "v1"}, + {"b", "2"}, + {"c", "3"}, + {"a / b default c", "3"}, + }, + { + {"a", "0"}, + {"b", "0"}, + {"c", "3"}, + {"a / b default c", "3"}, + }, + { + {"a", "3"}, + {"b", "2"}, + {"a / b default c", "1.5"}, + }, + { + {"a", "3"}, + {"b", "foo"}, + {"a / b default c", "NaN"}, + }, + }) + + f("math round(exp(a), 0.01), round(ln(a), 0.01)", [][]Field{ + { + {"a", "v1"}, + }, + { + {"a", "0"}, + }, + { + {"a", "1"}, + }, + { + {"a", "2"}, + }, + { + {"a", "3"}, + }, + }, [][]Field{ + { + {"a", "v1"}, + {"round(exp(a), 0.01)", "NaN"}, + {"round(ln(a), 0.01)", "NaN"}, + }, + { + {"a", "0"}, + {"round(exp(a), 0.01)", "1"}, + {"round(ln(a), 0.01)", "NaN"}, + }, + { + {"a", "1"}, + {"round(exp(a), 0.01)", "2.72"}, + {"round(ln(a), 0.01)", "0"}, + }, + { + {"a", "2"}, + {"round(exp(a), 0.01)", "7.39"}, + {"round(ln(a), 0.01)", "0.69"}, + }, + { + {"a", "3"}, + {"round(exp(a), 0.01)", "20.09"}, + {"round(ln(a), 0.01)", "1.1"}, + }, + }) + f("math 1 as a", [][]Field{ { {"a", "v1"}, diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 0b6d3cbbd..1bcee3670 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -56,6 +56,10 @@ func (ps *pipeSort) String() string { } func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.isEmpty() { + return + } + if len(ps.byFields) == 0 { neededFields.add("*") unneededFields.reset() diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 31506bfa3..b60c51ac5 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -545,9 +545,17 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) { ps.byFields = bfs } + seenByFields := make(map[string]*byStatsField, len(ps.byFields)) + for _, bf := range ps.byFields { + seenByFields[bf.name] = bf + } + + seenResultNames := make(map[string]statsFunc) + var funcs []pipeStatsFunc for { var f pipeStatsFunc + sf, err := parseStatsFunc(lex) if err != nil { return nil, err @@ -557,15 +565,31 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) { if lex.isKeyword("if") { iff, err := parseIfFilter(lex) if err != nil { - return nil, err + return nil, fmt.Errorf("cannot parse 'if' filter for [%s]: %w", sf, err) } f.iff = iff } - resultName, err := parseResultName(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err) + resultName := "" + if lex.isKeyword(",", "|", ")", "") { + resultName = sf.String() + } else { + if lex.isKeyword("as") { + 
lex.nextToken()
+		}
+		fieldName, err := parseFieldName(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err)
+		}
+		resultName = fieldName
 	}

+	if bf := seenByFields[resultName]; bf != nil {
+		return nil, fmt.Errorf("the %q is used as 'by' field [%s], so it cannot be used as a result name for [%s]", resultName, bf, sf)
+	}
+	if sfPrev := seenResultNames[resultName]; sfPrev != nil {
+		return nil, fmt.Errorf("cannot use identical result name %q for [%s] and [%s]", resultName, sfPrev, sf)
+	}
+	seenResultNames[resultName] = sf
 	f.resultName = resultName

 	funcs = append(funcs, f)
@@ -575,7 +599,7 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
 		return &ps, nil
 	}
 	if !lex.isKeyword(",") {
-		return nil, fmt.Errorf("unexpected token %q; want ',', '|' or ')'", lex.token)
+		return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", lex.token, sf)
 	}
 	lex.nextToken()
 }
@@ -607,18 +631,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
 		return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err)
 	}
 	return sus, nil
-	case lex.isKeyword("fields_max"):
-		sms, err := parseStatsFieldsMax(lex)
-		if err != nil {
-			return nil, fmt.Errorf("cannot parse 'fields_max' func: %w", err)
-		}
-		return sms, nil
-	case lex.isKeyword("fields_min"):
-		sms, err := parseStatsFieldsMin(lex)
-		if err != nil {
-			return nil, fmt.Errorf("cannot parse 'fields_min' func: %w", err)
-		}
-		return sms, nil
 	case lex.isKeyword("max"):
 		sms, err := parseStatsMax(lex)
 		if err != nil {
@@ -643,6 +655,24 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
 		return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err)
 	}
 	return sqs, nil
+	case lex.isKeyword("row_any"):
+		sas, err := parseStatsRowAny(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'row_any' func: %w", err)
+		}
+		return sas, nil
+	case lex.isKeyword("row_max"):
+		sms, err := parseStatsRowMax(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'row_max' func: %w", err)
+		}
+		return sms, nil
+	case lex.isKeyword("row_min"):
+		sms, err := parseStatsRowMin(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'row_min' func: %w", err)
+		}
+		return sms, nil
 	case lex.isKeyword("sum"):
 		sss, err := parseStatsSum(lex)
 		if err != nil {
@@ -672,17 +702,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
 	}
 }

-func parseResultName(lex *lexer) (string, error) {
-	if lex.isKeyword("as") {
-		lex.nextToken()
-	}
-	resultName, err := parseFieldName(lex)
-	if err != nil {
-		return "", err
-	}
-	return resultName, nil
-}
-
 var zeroByStatsField = &byStatsField{}

 // byStatsField represents 'by (...)' part of the pipeStats.
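For reference, a minimal sketch of how the new result-name logic above behaves from the caller's perspective. This is an editorial illustration, not part of the patch; it assumes the package-level `ParseQuery` helper already used by the parser tests, and the expected outputs follow the `parser_test.go` cases above:

```go
package logstorage

import "fmt"

// exampleStatsResultNames sketches the new stats pipe parsing rules:
// an omitted result name defaults to the string representation of the
// stats function, while duplicate result names and result names that
// clash with 'by (...)' fields are rejected.
func exampleStatsResultNames() {
	// Omitted result names get auto-generated names.
	q, err := ParseQuery(`* | stats by (x) count(), count_uniq(y)`)
	fmt.Println(q, err)
	// expected: * | stats by (x) count(*) as "count(*)", count_uniq(y) as "count_uniq(y)" <nil>

	// Duplicate result names are rejected.
	_, err = ParseQuery(`* | stats min() a, max() a`)
	fmt.Println(err != nil) // expected: true

	// A result name identical to a 'by' field is rejected.
	_, err = ParseQuery(`* | stats by (x) count() x`)
	fmt.Println(err != nil) // expected: true
}
```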
diff --git a/lib/logstorage/pipe_stats_test.go b/lib/logstorage/pipe_stats_test.go
index 5363518db..f6cab5fcf 100644
--- a/lib/logstorage/pipe_stats_test.go
+++ b/lib/logstorage/pipe_stats_test.go
@@ -39,8 +39,8 @@ func TestPipeStats(t *testing.T) {
 expectPipeResults(t, pipeStr, rows, rowsExpected)
 }

- // missing 'stats' keyword
- f("count(*) as rows", [][]Field{
+ // missing 'stats' keyword and result name
+ f("count(*)", [][]Field{
 {
 {"_msg", `abc`},
 {"a", `2`},
@@ -56,7 +56,7 @@ func TestPipeStats(t *testing.T) {
 },
 }, [][]Field{
 {
- {"rows", "3"},
+ {`count(*)`, "3"},
 },
 })

diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go
index fde04b369..fe2e36d78 100644
--- a/lib/logstorage/pipe_unpack.go
+++ b/lib/logstorage/pipe_unpack.go
@@ -7,6 +7,13 @@ import (
 )

 func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter, neededFields, unneededFields fieldsSet) {
+ if neededFields.isEmpty() {
+ if iff != nil {
+ neededFields.addFields(iff.neededFields)
+ }
+ return
+ }
+
 if neededFields.contains("*") {
 unneededFieldsOrig := unneededFields.clone()
 unneededFieldsCount := 0
diff --git a/lib/logstorage/pipe_unroll.go b/lib/logstorage/pipe_unroll.go
index 2a2978f0b..97117b259 100644
--- a/lib/logstorage/pipe_unroll.go
+++ b/lib/logstorage/pipe_unroll.go
@@ -52,25 +52,15 @@ func (pu *pipeUnroll) initFilterInValues(cache map[string][]string, getFieldValu

 func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) {
 if neededFields.contains("*") {
- unneededFieldsCount := 0
- for _, f := range pu.fields {
- if unneededFields.contains(f) {
- unneededFieldsCount++
- }
- }
- if unneededFieldsCount < len(pu.fields) && pu.iff != nil {
+ if pu.iff != nil {
 unneededFields.removeFields(pu.iff.neededFields)
 }
+ unneededFields.removeFields(pu.fields)
 } else {
- needIfFields := false
- for _, f := range pu.fields {
- if neededFields.contains(f) {
- needIfFields = true
- }
- }
- if needIfFields && pu.iff != nil {
+ if pu.iff != nil {
 neededFields.addFields(pu.iff.neededFields)
 }
+ neededFields.addFields(pu.fields)
 }
 }
diff --git a/lib/logstorage/pipe_unroll_test.go b/lib/logstorage/pipe_unroll_test.go
index f30dff562..75ad91eaa 100644
--- a/lib/logstorage/pipe_unroll_test.go
+++ b/lib/logstorage/pipe_unroll_test.go
@@ -225,13 +225,13 @@ func TestPipeUnrollUpdateNeededFields(t *testing.T) {
 f("unroll if (f1:b) (x)", "*", "f1,f2", "*", "f2")

 // all the needed fields, unneeded fields intersect with src
- f("unroll (x)", "*", "f2,x", "*", "f2,x")
- f("unroll if (a:b) (x)", "*", "f2,x", "*", "f2,x")
- f("unroll if (f2:b) (x)", "*", "f2,x", "*", "f2,x")
+ f("unroll (x)", "*", "f2,x", "*", "f2")
+ f("unroll if (a:b) (x)", "*", "f2,x", "*", "f2")
+ f("unroll if (f2:b) (x)", "*", "f2,x", "*", "")

 // needed fields do not intersect with src
- f("unroll (x)", "f1,f2", "", "f1,f2", "")
- f("unroll if (a:b) (x)", "f1,f2", "", "f1,f2", "")
+ f("unroll (x)", "f1,f2", "", "f1,f2,x", "")
+ f("unroll if (a:b) (x)", "f1,f2", "", "a,f1,f2,x", "")

 // needed fields intersect with src
 f("unroll (x)", "f2,x", "", "f2,x", "")
diff --git a/lib/logstorage/pipe_update.go b/lib/logstorage/pipe_update.go
index 718c2e37b..2684bd9a5 100644
--- a/lib/logstorage/pipe_update.go
+++ b/lib/logstorage/pipe_update.go
@@ -5,6 +5,13 @@ import (
 )

 func updateNeededFieldsForUpdatePipe(neededFields, unneededFields fieldsSet, field string, iff *ifFilter) {
+ if neededFields.isEmpty() {
+ if iff != nil {
+
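// nothing is needed downstream, but the optional 'if (...)' condition still reads its fields
+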
neededFields.addFields(iff.neededFields) + } + return + } + if neededFields.contains("*") { if !unneededFields.contains(field) && iff != nil { unneededFields.removeFields(iff.neededFields) diff --git a/lib/logstorage/stats_row_any.go b/lib/logstorage/stats_row_any.go new file mode 100644 index 000000000..6707040cc --- /dev/null +++ b/lib/logstorage/stats_row_any.go @@ -0,0 +1,127 @@ +package logstorage + +import ( + "fmt" + "slices" + "strings" + "unsafe" +) + +type statsRowAny struct { + fields []string +} + +func (sa *statsRowAny) String() string { + return "row_any(" + statsFuncFieldsToString(sa.fields) + ")" +} + +func (sa *statsRowAny) updateNeededFields(neededFields fieldsSet) { + if len(sa.fields) == 0 { + neededFields.add("*") + } else { + neededFields.addFields(sa.fields) + } +} + +func (sa *statsRowAny) newStatsProcessor() (statsProcessor, int) { + sap := &statsRowAnyProcessor{ + sa: sa, + } + return sap, int(unsafe.Sizeof(*sap)) +} + +type statsRowAnyProcessor struct { + sa *statsRowAny + + captured bool + + fields []Field +} + +func (sap *statsRowAnyProcessor) updateStatsForAllRows(br *blockResult) int { + if len(br.timestamps) == 0 { + return 0 + } + if sap.captured { + return 0 + } + sap.captured = true + + return sap.updateState(br, 0) +} + +func (sap *statsRowAnyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + if sap.captured { + return 0 + } + sap.captured = true + + return sap.updateState(br, rowIdx) +} + +func (sap *statsRowAnyProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsRowAnyProcessor) + if !sap.captured { + sap.captured = src.captured + sap.fields = src.fields + } +} + +func (sap *statsRowAnyProcessor) updateState(br *blockResult, rowIdx int) int { + stateSizeIncrease := 0 + fields := sap.fields + fetchFields := sap.sa.fields + if len(fetchFields) == 0 { + cs := br.getColumns() + for _, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + }) + stateSizeIncrease += len(c.name) + len(v) + } + } else { + for _, field := range fetchFields { + c := br.getColumnByName(field) + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + }) + stateSizeIncrease += len(c.name) + len(v) + } + } + sap.fields = fields + + return stateSizeIncrease +} + +func (sap *statsRowAnyProcessor) finalizeStats() string { + bb := bbPool.Get() + bb.B = marshalFieldsToJSON(bb.B, sap.fields) + result := string(bb.B) + bbPool.Put(bb) + + return result +} + +func parseStatsRowAny(lex *lexer) (*statsRowAny, error) { + if !lex.isKeyword("row_any") { + return nil, fmt.Errorf("unexpected func; got %q; want 'row_any'", lex.token) + } + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'row_any' args: %w", err) + } + + if slices.Contains(fields, "*") { + fields = nil + } + + sa := &statsRowAny{ + fields: fields, + } + return sa, nil +} diff --git a/lib/logstorage/stats_row_any_test.go b/lib/logstorage/stats_row_any_test.go new file mode 100644 index 000000000..0312ce276 --- /dev/null +++ b/lib/logstorage/stats_row_any_test.go @@ -0,0 +1,182 @@ +package logstorage + +import ( + "testing" +) + +func TestParseStatsRowAnySuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncSuccess(t, pipeStr) + } + + f(`row_any(*)`) + f(`row_any(foo)`) + f(`row_any(foo, bar)`) +} + +func TestParseStatsRowAnyFailure(t *testing.T) { + 
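// each pipe string below must be rejected by the stats func parser
+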
f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncFailure(t, pipeStr) + } + + f(`row_any`) + f(`row_any(x) bar`) +} + +func TestStatsRowAny(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("row_any()", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + }, [][]Field{ + { + {"row_any(*)", `{"_msg":"abc","a":"2","b":"3"}`}, + }, + }) + + f("stats row_any(a) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + }, [][]Field{ + { + {"x", `{"a":"2"}`}, + }, + }) + + f("stats row_any(a, x, b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + }, [][]Field{ + { + {"x", `{"a":"2","x":"","b":"3"}`}, + }, + }) + + f("stats row_any(a) if (b:'') as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + }, [][]Field{ + { + {"x", `{"a":"1"}`}, + }, + }) + + f("stats by (b) row_any(a) if (b:*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", `3`}, + {"c", `54`}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"x", `{"a":"2"}`}, + }, + { + {"b", ""}, + {"x", `{}`}, + }, + }) + + f("stats by (a) row_any(b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"a", `3`}, + {"b", `5`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{"b":"3"}`}, + }, + { + {"a", "3"}, + {"x", `{"b":"5"}`}, + }, + }) + + f("stats by (a) row_any(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"a", `3`}, + {"c", `foo`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{"c":""}`}, + }, + { + {"a", "3"}, + {"x", `{"c":"foo"}`}, + }, + }) + + f("stats by (a, b) row_any(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "foo"}, + }, + { + {"a", `3`}, + {"b", `5`}, + {"c", "4"}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"b", "3"}, + {"x", `{"c":""}`}, + }, + { + {"a", "1"}, + {"b", ""}, + {"x", `{"c":"foo"}`}, + }, + { + {"a", "3"}, + {"b", "5"}, + {"x", `{"c":"4"}`}, + }, + }) +} diff --git a/lib/logstorage/stats_fields_max.go b/lib/logstorage/stats_row_max.go similarity index 77% rename from lib/logstorage/stats_fields_max.go rename to lib/logstorage/stats_row_max.go index 599cf48b0..8f53f0f65 100644 --- a/lib/logstorage/stats_fields_max.go +++ b/lib/logstorage/stats_row_max.go @@ -11,14 +11,14 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -type statsFieldsMax struct { +type statsRowMax struct { srcField string fetchFields []string } -func (sm *statsFieldsMax) String() string { - s := "fields_max(" + quoteTokenIfNeeded(sm.srcField) +func (sm *statsRowMax) String() string { + s := "row_max(" + quoteTokenIfNeeded(sm.srcField) if len(sm.fetchFields) > 0 { s += ", " + fieldNamesString(sm.fetchFields) } @@ -26,7 +26,7 @@ func (sm *statsFieldsMax) String() string { return s } -func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) { +func (sm *statsRowMax) updateNeededFields(neededFields fieldsSet) { if len(sm.fetchFields) == 0 { neededFields.add("*") } else { @@ -35,22 +35,22 @@ func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) { neededFields.add(sm.srcField) } -func (sm *statsFieldsMax) newStatsProcessor() (statsProcessor, int) { - smp := &statsFieldsMaxProcessor{ +func (sm *statsRowMax) newStatsProcessor() (statsProcessor, int) { + smp := &statsRowMaxProcessor{ sm: sm, } return smp, 
int(unsafe.Sizeof(*smp)) } -type statsFieldsMaxProcessor struct { - sm *statsFieldsMax +type statsRowMaxProcessor struct { + sm *statsRowMax max string fields []Field } -func (smp *statsFieldsMaxProcessor) updateStatsForAllRows(br *blockResult) int { +func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int { stateSizeIncrease := 0 c := br.getColumnByName(smp.sm.srcField) @@ -114,7 +114,7 @@ func (smp *statsFieldsMaxProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } -func (smp *statsFieldsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { +func (smp *statsRowMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { stateSizeIncrease := 0 c := br.getColumnByName(smp.sm.srcField) @@ -138,27 +138,27 @@ func (smp *statsFieldsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx in return stateSizeIncrease } -func (smp *statsFieldsMaxProcessor) mergeState(sfp statsProcessor) { - src := sfp.(*statsFieldsMaxProcessor) +func (smp *statsRowMaxProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsRowMaxProcessor) if smp.needUpdateStateString(src.max) { smp.max = src.max smp.fields = src.fields } } -func (smp *statsFieldsMaxProcessor) needUpdateStateBytes(b []byte) bool { +func (smp *statsRowMaxProcessor) needUpdateStateBytes(b []byte) bool { v := bytesutil.ToUnsafeString(b) return smp.needUpdateStateString(v) } -func (smp *statsFieldsMaxProcessor) needUpdateStateString(v string) bool { +func (smp *statsRowMaxProcessor) needUpdateStateString(v string) bool { if v == "" { return false } return smp.max == "" || lessString(smp.max, v) } -func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowIdx int) int { +func (smp *statsRowMaxProcessor) updateState(v string, br *blockResult, rowIdx int) int { stateSizeIncrease := 0 if !smp.needUpdateStateString(v) { @@ -204,7 +204,7 @@ func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowId return stateSizeIncrease } -func (smp *statsFieldsMaxProcessor) finalizeStats() string { +func (smp *statsRowMaxProcessor) finalizeStats() string { bb := bbPool.Get() bb.B = marshalFieldsToJSON(bb.B, smp.fields) result := string(bb.B) @@ -213,18 +213,18 @@ func (smp *statsFieldsMaxProcessor) finalizeStats() string { return result } -func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) { - if !lex.isKeyword("fields_max") { - return nil, fmt.Errorf("unexpected func; got %q; want 'fields_max'", lex.token) +func parseStatsRowMax(lex *lexer) (*statsRowMax, error) { + if !lex.isKeyword("row_max") { + return nil, fmt.Errorf("unexpected func; got %q; want 'row_max'", lex.token) } lex.nextToken() fields, err := parseFieldNamesInParens(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'fields_max' args: %w", err) + return nil, fmt.Errorf("cannot parse 'row_max' args: %w", err) } if len(fields) == 0 { - return nil, fmt.Errorf("missing first arg for 'fields_max' func - source field") + return nil, fmt.Errorf("missing first arg for 'row_max' func - source field") } srcField := fields[0] @@ -233,7 +233,7 @@ func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) { fetchFields = nil } - sm := &statsFieldsMax{ + sm := &statsRowMax{ srcField: srcField, fetchFields: fetchFields, } diff --git a/lib/logstorage/stats_fields_max_test.go b/lib/logstorage/stats_row_max_test.go similarity index 79% rename from lib/logstorage/stats_fields_max_test.go rename to lib/logstorage/stats_row_max_test.go index de4e6e5e5..39fd82564 100644 --- 
a/lib/logstorage/stats_fields_max_test.go +++ b/lib/logstorage/stats_row_max_test.go @@ -4,35 +4,35 @@ import ( "testing" ) -func TestParseStatsFieldsMaxSuccess(t *testing.T) { +func TestParseStatsRowMaxSuccess(t *testing.T) { f := func(pipeStr string) { t.Helper() expectParseStatsFuncSuccess(t, pipeStr) } - f(`fields_max(foo)`) - f(`fields_max(foo, bar)`) - f(`fields_max(foo, bar, baz)`) + f(`row_max(foo)`) + f(`row_max(foo, bar)`) + f(`row_max(foo, bar, baz)`) } -func TestParseStatsFieldsMaxFailure(t *testing.T) { +func TestParseStatsRowMaxFailure(t *testing.T) { f := func(pipeStr string) { t.Helper() expectParseStatsFuncFailure(t, pipeStr) } - f(`fields_max`) - f(`fields_max()`) - f(`fields_max(x) bar`) + f(`row_max`) + f(`row_max()`) + f(`row_max(x) bar`) } -func TestStatsFieldsMax(t *testing.T) { +func TestStatsRowMax(t *testing.T) { f := func(pipeStr string, rows, rowsExpected [][]Field) { t.Helper() expectPipeResults(t, pipeStr, rows, rowsExpected) } - f("stats fields_max(a) as x", [][]Field{ + f("stats row_max(a) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -52,7 +52,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats fields_max(foo) as x", [][]Field{ + f("stats row_max(foo) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -72,7 +72,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats fields_max(b, a) as x", [][]Field{ + f("stats row_max(b, a) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -93,7 +93,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats fields_max(b, a, x, b) as x", [][]Field{ + f("stats row_max(b, a, x, b) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -114,7 +114,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats fields_max(a) if (b:*) as x", [][]Field{ + f("stats row_max(a) if (b:*) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -134,7 +134,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats by (b) fields_max(a) if (b:*) as x", [][]Field{ + f("stats by (b) row_max(a) if (b:*) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -160,7 +160,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats by (a) fields_max(b) as x", [][]Field{ + f("stats by (a) row_max(b) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`}, @@ -189,7 +189,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats by (a) fields_max(c) as x", [][]Field{ + f("stats by (a) row_max(c) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`}, @@ -218,7 +218,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats by (a) fields_max(b, c) as x", [][]Field{ + f("stats by (a) row_max(b, c) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`}, @@ -250,7 +250,7 @@ func TestStatsFieldsMax(t *testing.T) { }, }) - f("stats by (a, b) fields_max(c) as x", [][]Field{ + f("stats by (a, b) row_max(c) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`}, diff --git a/lib/logstorage/stats_fields_min.go b/lib/logstorage/stats_row_min.go similarity index 77% rename from lib/logstorage/stats_fields_min.go rename to lib/logstorage/stats_row_min.go index e57b466e8..9aa69681a 100644 --- a/lib/logstorage/stats_fields_min.go +++ b/lib/logstorage/stats_row_min.go @@ -11,14 +11,14 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -type statsFieldsMin struct { +type statsRowMin struct { srcField string fetchFields []string } -func (sm *statsFieldsMin) String() string { - s := "fields_min(" + quoteTokenIfNeeded(sm.srcField) +func (sm *statsRowMin) String() string { + s := "row_min(" + quoteTokenIfNeeded(sm.srcField) if len(sm.fetchFields) > 0 
{ s += ", " + fieldNamesString(sm.fetchFields) } @@ -26,7 +26,7 @@ func (sm *statsFieldsMin) String() string { return s } -func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) { +func (sm *statsRowMin) updateNeededFields(neededFields fieldsSet) { if len(sm.fetchFields) == 0 { neededFields.add("*") } else { @@ -35,22 +35,22 @@ func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) { neededFields.add(sm.srcField) } -func (sm *statsFieldsMin) newStatsProcessor() (statsProcessor, int) { - smp := &statsFieldsMinProcessor{ +func (sm *statsRowMin) newStatsProcessor() (statsProcessor, int) { + smp := &statsRowMinProcessor{ sm: sm, } return smp, int(unsafe.Sizeof(*smp)) } -type statsFieldsMinProcessor struct { - sm *statsFieldsMin +type statsRowMinProcessor struct { + sm *statsRowMin min string fields []Field } -func (smp *statsFieldsMinProcessor) updateStatsForAllRows(br *blockResult) int { +func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int { stateSizeIncrease := 0 c := br.getColumnByName(smp.sm.srcField) @@ -114,7 +114,7 @@ func (smp *statsFieldsMinProcessor) updateStatsForAllRows(br *blockResult) int { return stateSizeIncrease } -func (smp *statsFieldsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { +func (smp *statsRowMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { stateSizeIncrease := 0 c := br.getColumnByName(smp.sm.srcField) @@ -138,27 +138,27 @@ func (smp *statsFieldsMinProcessor) updateStatsForRow(br *blockResult, rowIdx in return stateSizeIncrease } -func (smp *statsFieldsMinProcessor) mergeState(sfp statsProcessor) { - src := sfp.(*statsFieldsMinProcessor) +func (smp *statsRowMinProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsRowMinProcessor) if smp.needUpdateStateString(src.min) { smp.min = src.min smp.fields = src.fields } } -func (smp *statsFieldsMinProcessor) needUpdateStateBytes(b []byte) bool { +func (smp *statsRowMinProcessor) needUpdateStateBytes(b []byte) bool { v := bytesutil.ToUnsafeString(b) return smp.needUpdateStateString(v) } -func (smp *statsFieldsMinProcessor) needUpdateStateString(v string) bool { +func (smp *statsRowMinProcessor) needUpdateStateString(v string) bool { if v == "" { return false } return smp.min == "" || lessString(v, smp.min) } -func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowIdx int) int { +func (smp *statsRowMinProcessor) updateState(v string, br *blockResult, rowIdx int) int { stateSizeIncrease := 0 if !smp.needUpdateStateString(v) { @@ -204,7 +204,7 @@ func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowId return stateSizeIncrease } -func (smp *statsFieldsMinProcessor) finalizeStats() string { +func (smp *statsRowMinProcessor) finalizeStats() string { bb := bbPool.Get() bb.B = marshalFieldsToJSON(bb.B, smp.fields) result := string(bb.B) @@ -213,18 +213,18 @@ func (smp *statsFieldsMinProcessor) finalizeStats() string { return result } -func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) { - if !lex.isKeyword("fields_min") { - return nil, fmt.Errorf("unexpected func; got %q; want 'fields_min'", lex.token) +func parseStatsRowMin(lex *lexer) (*statsRowMin, error) { + if !lex.isKeyword("row_min") { + return nil, fmt.Errorf("unexpected func; got %q; want 'row_min'", lex.token) } lex.nextToken() fields, err := parseFieldNamesInParens(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'fields_min' args: %w", err) + return nil, fmt.Errorf("cannot parse 'row_min' args: %w", err) 
} if len(fields) == 0 { - return nil, fmt.Errorf("missing first arg for 'fields_min' func - source field") + return nil, fmt.Errorf("missing first arg for 'row_min' func - source field") } srcField := fields[0] @@ -233,7 +233,7 @@ func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) { fetchFields = nil } - sm := &statsFieldsMin{ + sm := &statsRowMin{ srcField: srcField, fetchFields: fetchFields, } diff --git a/lib/logstorage/stats_fields_min_test.go b/lib/logstorage/stats_row_min_test.go similarity index 79% rename from lib/logstorage/stats_fields_min_test.go rename to lib/logstorage/stats_row_min_test.go index dbbdb8d27..67225c7f6 100644 --- a/lib/logstorage/stats_fields_min_test.go +++ b/lib/logstorage/stats_row_min_test.go @@ -4,35 +4,35 @@ import ( "testing" ) -func TestParseStatsFieldsMinSuccess(t *testing.T) { +func TestParseStatsRowMinSuccess(t *testing.T) { f := func(pipeStr string) { t.Helper() expectParseStatsFuncSuccess(t, pipeStr) } - f(`fields_min(foo)`) - f(`fields_min(foo, bar)`) - f(`fields_min(foo, bar, baz)`) + f(`row_min(foo)`) + f(`row_min(foo, bar)`) + f(`row_min(foo, bar, baz)`) } -func TestParseStatsFieldsMinFailure(t *testing.T) { +func TestParseStatsRowMinFailure(t *testing.T) { f := func(pipeStr string) { t.Helper() expectParseStatsFuncFailure(t, pipeStr) } - f(`fields_min`) - f(`fields_min()`) - f(`fields_min(x) bar`) + f(`row_min`) + f(`row_min()`) + f(`row_min(x) bar`) } -func TestStatsFieldsMin(t *testing.T) { +func TestStatsRowMin(t *testing.T) { f := func(pipeStr string, rows, rowsExpected [][]Field) { t.Helper() expectPipeResults(t, pipeStr, rows, rowsExpected) } - f("stats fields_min(a) as x", [][]Field{ + f("stats row_min(a) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -52,7 +52,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats fields_min(foo) as x", [][]Field{ + f("stats row_min(foo) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -72,7 +72,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats fields_min(b, a) as x", [][]Field{ + f("stats row_min(b, a) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -93,7 +93,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats fields_min(b, a, x, b) as x", [][]Field{ + f("stats row_min(b, a, x, b) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -114,7 +114,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats fields_min(a) if (b:*) as x", [][]Field{ + f("stats row_min(a) if (b:*) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -134,7 +134,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats by (b) fields_min(a) if (b:*) as x", [][]Field{ + f("stats by (b) row_min(a) if (b:*) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -160,7 +160,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats by (a) fields_min(b) as x", [][]Field{ + f("stats by (a) row_min(b) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`}, @@ -189,7 +189,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats by (a) fields_min(c) as x", [][]Field{ + f("stats by (a) row_min(c) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`}, @@ -218,7 +218,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats by (a) fields_min(b, c) as x", [][]Field{ + f("stats by (a) row_min(b, c) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`}, @@ -249,7 +249,7 @@ func TestStatsFieldsMin(t *testing.T) { }, }) - f("stats by (a, b) fields_min(c) as x", [][]Field{ + f("stats by (a, b) row_min(c) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`},