diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 13b5209a8..a2bb0fe10 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`row_any`](https://docs.victoriametrics.com/victorialogs/logsql/#row_any-stats) function for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). This function returns a sample log entry per every calculated [group of results](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields). * FEATURE: add `default` operator to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). It allows setting `NaN` result to the given default value. * FEATURE: allow omitting result name in [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) expresions. In this case the result name is automatically set to string representation of the corresponding math expression. For example, `_time:5m | math duration / 1000` is equivalent to `_time:5m | math (duration / 1000) as "duration / 1000"`. * FEATURE: allow omitting result name in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). In this case the result name is automatically set to string representation of the corresponding [stats function expression](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). For example, `_time:5m | count(*)` is valid [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) now. It is equivalent to `_time:5m | stats count(*) as "count(*)"`. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 8d970f41a..d71e6479b 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1969,6 +1969,12 @@ The `by` keyword can be skipped in `stats ...` pipe. For example, the following _time:5m | stats (host, path) count() logs_total, count_uniq(ip) ips_total ``` +See also: + +- [`row_min`](#row_min-stats) +- [`row_max`](#row_max-stats) +- [`row_any`](#row_any-stats) + #### Stats by time buckets The following syntax can be used for calculating stats grouped by time buckets: @@ -2299,12 +2305,13 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe): - [`count`](#count-stats) returns the number of log entries. - [`count_empty`](#count_empty-stats) returns the number logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`count_uniq`](#count_uniq-stats) returns the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). -- [`row_max`](#row_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field. -- [`row_min`](#row_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field. - [`max`](#max-stats) returns the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`median`](#median-stats) returns the [median](https://en.wikipedia.org/wiki/Median) value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`min`](#min-stats) returns the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`quantile`](#quantile-stats) returns the given quantile for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`row_any`](#row_any-stats) returns a sample [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) per each selected [stats group](#stats-by-fields). +- [`row_max`](#row_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field. +- [`row_min`](#row_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field. - [`sum`](#sum-stats) returns the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`sum_len`](#sum_len-stats) returns the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -2412,59 +2419,6 @@ See also: - [`uniq_values`](#uniq_values-stats) - [`count`](#count-stats) -### row_max stats - -`row_max(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -with the maximum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log. - -For example, the following query returns log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -across logs for the last 5 minutes: - -```logsql -_time:5m | stats row_max(duration) as log_with_max_duration -``` - -Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes. - -If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`. -For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes: - -```logsql -_time:5m | stats row_max(duration, _time, path, duration) as time_and_ip_with_max_duration -``` - -See also: - -- [`max`](#max-stats) -- [`row_min`](#row_min-stats) - - -### row_min stats - -`row_min(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -with the minimum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log. - -For example, the following query returns log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -across logs for the last 5 minutes: - -```logsql -_time:5m | stats row_min(duration) as log_with_min_duration -``` - -Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes. - -If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`. -For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes: - -```logsql -_time:5m | stats row_min(duration, _time, path, duration) as time_and_ip_with_min_duration -``` - -See also: - -- [`min`](#min-stats) -- [`row_max`](#row_max-stats) - ### max stats `max(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) returns the maximum value across @@ -2547,6 +2501,86 @@ See also: - [`median`](#median-stats) - [`avg`](#avg-stats) +### row_any stats + +`row_any()` [stats pipe function](#stats-pipe-functions) returns arbitrary [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +(aka sample) per each selected [stats group](#stats-by-fields). Log entry is returned as JSON-encoded dictionary with all the fields from the original log. + +For example, the following query returns a sample log entry per each [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) +across logs for the last 5 minutes: + +```logsql +_time:5m | stats by (_stream) row_any() as sample_row +``` + +Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes. + +If only the specific fields are needed, then they can be enumerated inside `row_any(...)`. +For example, the following query returns only `_time`, `path` and `duration` fields from a sample log entry for logs over the last 5 minutes: + +```logsql +_time:5m | stats row_any(_time, path) as time_and_path_sample +``` + +See also: + +- [`row_max`](#row_max-stats) +- [`row_min`](#row_min-stats) + +### row_max stats + +`row_max(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +with the maximum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log. + +For example, the following query returns log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +across logs for the last 5 minutes: + +```logsql +_time:5m | stats row_max(duration) as log_with_max_duration +``` + +Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes. + +If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`. +For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes: + +```logsql +_time:5m | stats row_max(duration, _time, path, duration) as time_and_path_with_max_duration +``` + +See also: + +- [`max`](#max-stats) +- [`row_min`](#row_min-stats) +- [`row_any`](#row_any-stats) + +### row_min stats + +`row_min(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +with the minimum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log. + +For example, the following query returns log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +across logs for the last 5 minutes: + +```logsql +_time:5m | stats row_min(duration) as log_with_min_duration +``` + +Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes. + +If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`. +For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes: + +```logsql +_time:5m | stats row_min(duration, _time, path, duration) as time_and_path_with_min_duration +``` + +See also: + +- [`min`](#min-stats) +- [`row_max`](#row_max-stats) +- [`row_any`](#row_any-stats) + ### sum stats `sum(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) calculates the sum of numeric values across diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 7f9248d8f..b60c51ac5 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -631,18 +631,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) { return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err) } return sus, nil - case lex.isKeyword("row_max"): - sms, err := parseStatsRowMax(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'row_max' func: %w", err) - } - return sms, nil - case lex.isKeyword("row_min"): - sms, err := parseStatsRowMin(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'row_min' func: %w", err) - } - return sms, nil case lex.isKeyword("max"): sms, err := parseStatsMax(lex) if err != nil { @@ -667,6 +655,24 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) { return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err) } return sqs, nil + case lex.isKeyword("row_any"): + sas, err := parseStatsRowAny(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'row_any' func: %w", err) + } + return sas, nil + case lex.isKeyword("row_max"): + sms, err := parseStatsRowMax(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'row_max' func: %w", err) + } + return sms, nil + case lex.isKeyword("row_min"): + sms, err := parseStatsRowMin(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'row_min' func: %w", err) + } + return sms, nil case lex.isKeyword("sum"): sss, err := parseStatsSum(lex) if err != nil { diff --git a/lib/logstorage/stats_row_any.go b/lib/logstorage/stats_row_any.go new file mode 100644 index 000000000..6707040cc --- /dev/null +++ b/lib/logstorage/stats_row_any.go @@ -0,0 +1,127 @@ +package logstorage + +import ( + "fmt" + "slices" + "strings" + "unsafe" +) + +type statsRowAny struct { + fields []string +} + +func (sa *statsRowAny) String() string { + return "row_any(" + statsFuncFieldsToString(sa.fields) + ")" +} + +func (sa *statsRowAny) updateNeededFields(neededFields fieldsSet) { + if len(sa.fields) == 0 { + neededFields.add("*") + } else { + neededFields.addFields(sa.fields) + } +} + +func (sa *statsRowAny) newStatsProcessor() (statsProcessor, int) { + sap := &statsRowAnyProcessor{ + sa: sa, + } + return sap, int(unsafe.Sizeof(*sap)) +} + +type statsRowAnyProcessor struct { + sa *statsRowAny + + captured bool + + fields []Field +} + +func (sap *statsRowAnyProcessor) updateStatsForAllRows(br *blockResult) int { + if len(br.timestamps) == 0 { + return 0 + } + if sap.captured { + return 0 + } + sap.captured = true + + return sap.updateState(br, 0) +} + +func (sap *statsRowAnyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + if sap.captured { + return 0 + } + sap.captured = true + + return sap.updateState(br, rowIdx) +} + +func (sap *statsRowAnyProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsRowAnyProcessor) + if !sap.captured { + sap.captured = src.captured + sap.fields = src.fields + } +} + +func (sap *statsRowAnyProcessor) updateState(br *blockResult, rowIdx int) int { + stateSizeIncrease := 0 + fields := sap.fields + fetchFields := sap.sa.fields + if len(fetchFields) == 0 { + cs := br.getColumns() + for _, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + }) + stateSizeIncrease += len(c.name) + len(v) + } + } else { + for _, field := range fetchFields { + c := br.getColumnByName(field) + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + }) + stateSizeIncrease += len(c.name) + len(v) + } + } + sap.fields = fields + + return stateSizeIncrease +} + +func (sap *statsRowAnyProcessor) finalizeStats() string { + bb := bbPool.Get() + bb.B = marshalFieldsToJSON(bb.B, sap.fields) + result := string(bb.B) + bbPool.Put(bb) + + return result +} + +func parseStatsRowAny(lex *lexer) (*statsRowAny, error) { + if !lex.isKeyword("row_any") { + return nil, fmt.Errorf("unexpected func; got %q; want 'row_any'", lex.token) + } + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'row_any' args: %w", err) + } + + if slices.Contains(fields, "*") { + fields = nil + } + + sa := &statsRowAny{ + fields: fields, + } + return sa, nil +} diff --git a/lib/logstorage/stats_row_any_test.go b/lib/logstorage/stats_row_any_test.go new file mode 100644 index 000000000..0312ce276 --- /dev/null +++ b/lib/logstorage/stats_row_any_test.go @@ -0,0 +1,182 @@ +package logstorage + +import ( + "testing" +) + +func TestParseStatsRowAnySuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncSuccess(t, pipeStr) + } + + f(`row_any(*)`) + f(`row_any(foo)`) + f(`row_any(foo, bar)`) +} + +func TestParseStatsRowAnyFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncFailure(t, pipeStr) + } + + f(`row_any`) + f(`row_any(x) bar`) +} + +func TestStatsRowAny(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("row_any()", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + }, [][]Field{ + { + {"row_any(*)", `{"_msg":"abc","a":"2","b":"3"}`}, + }, + }) + + f("stats row_any(a) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + }, [][]Field{ + { + {"x", `{"a":"2"}`}, + }, + }) + + f("stats row_any(a, x, b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + }, [][]Field{ + { + {"x", `{"a":"2","x":"","b":"3"}`}, + }, + }) + + f("stats row_any(a) if (b:'') as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + }, [][]Field{ + { + {"x", `{"a":"1"}`}, + }, + }) + + f("stats by (b) row_any(a) if (b:*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", `3`}, + {"c", `54`}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"x", `{"a":"2"}`}, + }, + { + {"b", ""}, + {"x", `{}`}, + }, + }) + + f("stats by (a) row_any(b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"a", `3`}, + {"b", `5`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{"b":"3"}`}, + }, + { + {"a", "3"}, + {"x", `{"b":"5"}`}, + }, + }) + + f("stats by (a) row_any(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"a", `3`}, + {"c", `foo`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{"c":""}`}, + }, + { + {"a", "3"}, + {"x", `{"c":"foo"}`}, + }, + }) + + f("stats by (a, b) row_any(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "foo"}, + }, + { + {"a", `3`}, + {"b", `5`}, + {"c", "4"}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"b", "3"}, + {"x", `{"c":""}`}, + }, + { + {"a", "1"}, + {"b", ""}, + {"x", `{"c":"foo"}`}, + }, + { + {"a", "3"}, + {"b", "5"}, + {"x", `{"c":"4"}`}, + }, + }) +}