Aliaksandr Valialkin 2024-05-30 16:03:00 +02:00
parent d2fa0f0c51
commit bbb2ba0214
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 417 additions and 67 deletions

@@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add [`row_any`](https://docs.victoriametrics.com/victorialogs/logsql/#row_any-stats) function for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). This function returns a sample log entry per every calculated [group of results](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields).
* FEATURE: add `default` operator to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). It allows replacing `NaN` results with the given default value; see the sketch after this list.
* FEATURE: allow omitting the result name in [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) expressions. In this case the result name is automatically set to the string representation of the corresponding math expression. For example, `_time:5m | math duration / 1000` is equivalent to `_time:5m | math (duration / 1000) as "duration / 1000"`.
* FEATURE: allow omitting the result name in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). In this case the result name is automatically set to the string representation of the corresponding [stats function expression](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). For example, `_time:5m | count(*)` is now a valid [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/). It is equivalent to `_time:5m | stats count(*) as "count(*)"`.
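For illustration, a minimal sketch of the `default` operator from the list above, assuming it is applied as a binary `expr default value` operator inside the `math` pipe (the `duration` and `requests` field names are hypothetical):
```logsql
_time:5m | math (duration / requests) default 0 as avg_duration
```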

@@ -1969,6 +1969,12 @@ The `by` keyword can be skipped in `stats ...` pipe. For example, the following
_time:5m | stats (host, path) count() logs_total, count_uniq(ip) ips_total
```
See also:
- [`row_min`](#row_min-stats)
- [`row_max`](#row_max-stats)
- [`row_any`](#row_any-stats)
#### Stats by time buckets
The following syntax can be used for calculating stats grouped by time buckets:
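For example, a query of the following shape (a sketch assuming the `field:step` bucketing syntax) counts logs per 1-minute bucket over the last 5 minutes:
```logsql
_time:5m | stats by (_time:1m) count() logs_total
```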
@@ -2299,12 +2305,13 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
- [`count`](#count-stats) returns the number of log entries.
- [`count_empty`](#count_empty-stats) returns the number of logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`count_uniq`](#count_uniq-stats) returns the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`row_max`](#row_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field.
- [`row_min`](#row_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field.
- [`max`](#max-stats) returns the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`median`](#median-stats) returns the [median](https://en.wikipedia.org/wiki/Median) value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`min`](#min-stats) returns the minimum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`quantile`](#quantile-stats) returns the given quantile for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`row_any`](#row_any-stats) returns a sample [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) per each selected [stats group](#stats-by-fields).
- [`row_max`](#row_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field.
- [`row_min`](#row_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field.
- [`sum`](#sum-stats) returns the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`sum_len`](#sum_len-stats) returns the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -2412,59 +2419,6 @@ See also:
- [`uniq_values`](#uniq_values-stats)
- [`count`](#count-stats)
### row_max stats
`row_max(field)` [stats pipe function](#stats-pipe-functions) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the maximum value for the given `field`. The log entry is returned as a JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns the log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes:
```logsql
_time:5m | stats row_max(duration) as log_with_max_duration
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only specific fields are needed from the returned log entry, they can be enumerated inside `row_max(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes:
```logsql
_time:5m | stats row_max(duration, _time, path, duration) as time_and_ip_with_max_duration
```
See also:
- [`max`](#max-stats)
- [`row_min`](#row_min-stats)
### row_min stats
`row_min(field)` [stats pipe function](#stats-pipe-functions) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the minimum value for the given `field`. The log entry is returned as a JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns the log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes:
```logsql
_time:5m | stats row_min(duration) as log_with_min_duration
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only specific fields are needed from the returned log entry, they can be enumerated inside `row_min(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes:
```logsql
_time:5m | stats row_min(duration, _time, path, duration) as time_and_ip_with_min_duration
```
See also:
- [`min`](#min-stats)
- [`row_max`](#row_max-stats)
### max stats
`max(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) returns the maximum value across
@@ -2547,6 +2501,86 @@ See also:
- [`median`](#median-stats)
- [`avg`](#avg-stats)
### row_any stats
`row_any()` [stats pipe function](#stats-pipe-functions) returns an arbitrary [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
(aka a sample) per each selected [stats group](#stats-by-fields). The log entry is returned as a JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns a sample log entry per each [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
across logs for the last 5 minutes:
```logsql
_time:5m | stats by (_stream) row_any() as sample_row
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
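For example, the following sketch unpacks a sample captured by `row_any()` back into regular fields (the `sample_row` result name is illustrative):
```logsql
_time:5m | stats by (_stream) row_any() as sample_row | unpack_json from sample_row
```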
If only specific fields are needed, they can be enumerated inside `row_any(...)`.
For example, the following query returns only `_time` and `path` fields from a sample log entry for logs over the last 5 minutes:
```logsql
_time:5m | stats row_any(_time, path) as time_and_path_sample
```
See also:
- [`row_max`](#row_max-stats)
- [`row_min`](#row_min-stats)
### row_max stats
`row_max(field)` [stats pipe function](#stats-pipe-functions) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the maximum value for the given `field`. The log entry is returned as a JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns the log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes:
```logsql
_time:5m | stats row_max(duration) as log_with_max_duration
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only specific fields are needed from the returned log entry, they can be enumerated inside `row_max(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes:
```logsql
_time:5m | stats row_max(duration, _time, path, duration) as time_and_path_with_max_duration
```
See also:
- [`max`](#max-stats)
- [`row_min`](#row_min-stats)
- [`row_any`](#row_any-stats)
### row_min stats
`row_min(field)` [stats pipe function](#stats-pipe-functions) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the minimum value for the given `field`. The log entry is returned as a JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns the log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes:
```logsql
_time:5m | stats row_min(duration) as log_with_min_duration
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only specific fields are needed from the returned log entry, they can be enumerated inside `row_min(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes:
```logsql
_time:5m | stats row_min(duration, _time, path, duration) as time_and_path_with_min_duration
```
See also:
- [`min`](#min-stats)
- [`row_max`](#row_max-stats)
- [`row_any`](#row_any-stats)
### sum stats
`sum(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) calculates the sum of numeric values across

@@ -631,18 +631,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
		return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err)
	}
	return sus, nil
case lex.isKeyword("row_max"):
	sms, err := parseStatsRowMax(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'row_max' func: %w", err)
	}
	return sms, nil
case lex.isKeyword("row_min"):
	sms, err := parseStatsRowMin(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'row_min' func: %w", err)
	}
	return sms, nil
case lex.isKeyword("max"):
	sms, err := parseStatsMax(lex)
	if err != nil {
@@ -667,6 +655,24 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
		return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err)
	}
	return sqs, nil
case lex.isKeyword("row_any"):
	sas, err := parseStatsRowAny(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'row_any' func: %w", err)
	}
	return sas, nil
case lex.isKeyword("row_max"):
	sms, err := parseStatsRowMax(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'row_max' func: %w", err)
	}
	return sms, nil
case lex.isKeyword("row_min"):
	sms, err := parseStatsRowMin(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'row_min' func: %w", err)
	}
	return sms, nil
case lex.isKeyword("sum"):
	sss, err := parseStatsSum(lex)
	if err != nil {

@@ -0,0 +1,127 @@
package logstorage

import (
	"fmt"
	"slices"
	"strings"
	"unsafe"
)
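
// statsRowAny holds the parsed config of the `row_any(...)` stats function.
// An empty fields list means "capture all fields of the sampled log entry".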
type statsRowAny struct {
	fields []string
}

func (sa *statsRowAny) String() string {
	return "row_any(" + statsFuncFieldsToString(sa.fields) + ")"
}

func (sa *statsRowAny) updateNeededFields(neededFields fieldsSet) {
	if len(sa.fields) == 0 {
		neededFields.add("*")
	} else {
		neededFields.addFields(sa.fields)
	}
}

func (sa *statsRowAny) newStatsProcessor() (statsProcessor, int) {
	sap := &statsRowAnyProcessor{
		sa: sa,
	}
	return sap, int(unsafe.Sizeof(*sap))
}
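
// statsRowAnyProcessor holds per-group state: captured flips to true once a
// sample row has been stored in fields, so all later rows in the group are skipped.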
type statsRowAnyProcessor struct {
	sa *statsRowAny

	captured bool
	fields   []Field
}

func (sap *statsRowAnyProcessor) updateStatsForAllRows(br *blockResult) int {
	if len(br.timestamps) == 0 {
		return 0
	}
	if sap.captured {
		return 0
	}
	sap.captured = true

	return sap.updateState(br, 0)
}

func (sap *statsRowAnyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
	if sap.captured {
		return 0
	}
	sap.captured = true

	return sap.updateState(br, rowIdx)
}
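
// mergeState merges states produced by concurrent workers; an already captured
// sample is kept, otherwise the state of the merged-in worker is adopted.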
func (sap *statsRowAnyProcessor) mergeState(sfp statsProcessor) {
	src := sfp.(*statsRowAnyProcessor)
	if !sap.captured {
		sap.captured = src.captured
		sap.fields = src.fields
	}
}
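
// updateState copies the requested fields from the row at rowIdx. Names and
// values are cloned, since blockResult buffers are reused across blocks; the
// returned value is the state size increase in bytes for memory accounting.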
func (sap *statsRowAnyProcessor) updateState(br *blockResult, rowIdx int) int {
	stateSizeIncrease := 0
	fields := sap.fields
	fetchFields := sap.sa.fields
	if len(fetchFields) == 0 {
		cs := br.getColumns()
		for _, c := range cs {
			v := c.getValueAtRow(br, rowIdx)
			fields = append(fields, Field{
				Name:  strings.Clone(c.name),
				Value: strings.Clone(v),
			})
			stateSizeIncrease += len(c.name) + len(v)
		}
	} else {
		for _, field := range fetchFields {
			c := br.getColumnByName(field)
			v := c.getValueAtRow(br, rowIdx)
			fields = append(fields, Field{
				Name:  strings.Clone(c.name),
				Value: strings.Clone(v),
			})
			stateSizeIncrease += len(c.name) + len(v)
		}
	}
	sap.fields = fields

	return stateSizeIncrease
}
func (sap *statsRowAnyProcessor) finalizeStats() string {
	bb := bbPool.Get()
	bb.B = marshalFieldsToJSON(bb.B, sap.fields)
	result := string(bb.B)
	bbPool.Put(bb)

	return result
}
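
// parseStatsRowAny parses the `row_any(...)` func; a `*` arg is normalized to
// an empty fields list, which means "capture all fields".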
func parseStatsRowAny(lex *lexer) (*statsRowAny, error) {
	if !lex.isKeyword("row_any") {
		return nil, fmt.Errorf("unexpected func; got %q; want 'row_any'", lex.token)
	}
	lex.nextToken()
	fields, err := parseFieldNamesInParens(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'row_any' args: %w", err)
	}
	if slices.Contains(fields, "*") {
		fields = nil
	}
	sa := &statsRowAny{
		fields: fields,
	}
	return sa, nil
}

@@ -0,0 +1,182 @@
package logstorage

import (
	"testing"
)
func TestParseStatsRowAnySuccess(t *testing.T) {
	f := func(pipeStr string) {
		t.Helper()
		expectParseStatsFuncSuccess(t, pipeStr)
	}

	f(`row_any(*)`)
	f(`row_any(foo)`)
	f(`row_any(foo, bar)`)
}
func TestParseStatsRowAnyFailure(t *testing.T) {
	f := func(pipeStr string) {
		t.Helper()
		expectParseStatsFuncFailure(t, pipeStr)
	}

	f(`row_any`)
	f(`row_any(x) bar`)
}
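
// TestStatsRowAny feeds the input rows (first arg set) through the given pipe
// and compares the result against the expected rows via expectPipeResults.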
func TestStatsRowAny(t *testing.T) {
	f := func(pipeStr string, rows, rowsExpected [][]Field) {
		t.Helper()
		expectPipeResults(t, pipeStr, rows, rowsExpected)
	}

	f("row_any()", [][]Field{
		{
			{"_msg", `abc`},
			{"a", `2`},
			{"b", `3`},
		},
	}, [][]Field{
		{
			{"row_any(*)", `{"_msg":"abc","a":"2","b":"3"}`},
		},
	})

	f("stats row_any(a) as x", [][]Field{
		{
			{"_msg", `abc`},
			{"a", `2`},
			{"b", `3`},
		},
	}, [][]Field{
		{
			{"x", `{"a":"2"}`},
		},
	})

	f("stats row_any(a, x, b) as x", [][]Field{
		{
			{"_msg", `abc`},
			{"a", `2`},
			{"b", `3`},
		},
	}, [][]Field{
		{
			{"x", `{"a":"2","x":"","b":"3"}`},
		},
	})

	f("stats row_any(a) if (b:'') as x", [][]Field{
		{
			{"_msg", `abc`},
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"_msg", `def`},
			{"a", `1`},
		},
	}, [][]Field{
		{
			{"x", `{"a":"1"}`},
		},
	})

	f("stats by (b) row_any(a) if (b:*) as x", [][]Field{
		{
			{"_msg", `abc`},
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", `3`},
			{"c", `54`},
		},
	}, [][]Field{
		{
			{"b", "3"},
			{"x", `{"a":"2"}`},
		},
		{
			{"b", ""},
			{"x", `{}`},
		},
	})

	f("stats by (a) row_any(b) as x", [][]Field{
		{
			{"_msg", `abc`},
			{"a", `1`},
			{"b", `3`},
		},
		{
			{"a", `3`},
			{"b", `5`},
		},
	}, [][]Field{
		{
			{"a", "1"},
			{"x", `{"b":"3"}`},
		},
		{
			{"a", "3"},
			{"x", `{"b":"5"}`},
		},
	})

	f("stats by (a) row_any(c) as x", [][]Field{
		{
			{"_msg", `abc`},
			{"a", `1`},
			{"b", `3`},
		},
		{
			{"a", `3`},
			{"c", `foo`},
		},
	}, [][]Field{
		{
			{"a", "1"},
			{"x", `{"c":""}`},
		},
		{
			{"a", "3"},
			{"x", `{"c":"foo"}`},
		},
	})

	f("stats by (a, b) row_any(c) as x", [][]Field{
		{
			{"_msg", `abc`},
			{"a", `1`},
			{"b", `3`},
		},
		{
			{"_msg", `def`},
			{"a", `1`},
			{"c", "foo"},
		},
		{
			{"a", `3`},
			{"b", `5`},
			{"c", "4"},
		},
	}, [][]Field{
		{
			{"a", "1"},
			{"b", "3"},
			{"x", `{"c":""}`},
		},
		{
			{"a", "1"},
			{"b", ""},
			{"x", `{"c":"foo"}`},
		},
		{
			{"a", "3"},
			{"b", "5"},
			{"x", `{"c":"4"}`},
		},
	})
}