This commit is contained in:
Aliaksandr Valialkin 2024-05-30 15:16:34 +02:00
parent c75c8d7953
commit fb9018ddaa
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
8 changed files with 118 additions and 118 deletions

View file

@ -109,7 +109,7 @@ Released at 2024-05-22
* FEATURE: add ability to extract fields with [`extract` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-extract). * FEATURE: add ability to extract fields with [`extract` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-extract).
* FEATURE: add ability to unpack JSON fields with [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_json). * FEATURE: add ability to unpack JSON fields with [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_json).
* FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt). * FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt).
* FEATURE: add [`fields_min`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_min-stats) and [`fields_max`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field. * FEATURE: add [`row_min`](https://docs.victoriametrics.com/victorialogs/logsql/#row_min-stats) and [`row_max`](https://docs.victoriametrics.com/victorialogs/logsql/#row_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field.
* FEATURE: add `/select/logsql/streams` HTTP endpoint for returning [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) for details. * FEATURE: add `/select/logsql/streams` HTTP endpoint for returning [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) for details.
* FEATURE: add `/select/logsql/stream_field_names` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-names) for details. * FEATURE: add `/select/logsql/stream_field_names` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-names) for details.
* FEATURE: add `/select/logsql/stream_field_values` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field values for the given label from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values) for details. * FEATURE: add `/select/logsql/stream_field_values` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field values for the given label from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values) for details.

View file

@ -2299,8 +2299,8 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
- [`count`](#count-stats) returns the number of log entries. - [`count`](#count-stats) returns the number of log entries.
- [`count_empty`](#count_empty-stats) returns the number logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`count_empty`](#count_empty-stats) returns the number logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`count_uniq`](#count_uniq-stats) returns the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`count_uniq`](#count_uniq-stats) returns the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`fields_max`](#fields_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field. - [`row_max`](#row_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field.
- [`fields_min`](#fields_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field. - [`row_min`](#row_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field.
- [`max`](#max-stats) returns the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`max`](#max-stats) returns the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`median`](#median-stats) returns the [median](https://en.wikipedia.org/wiki/Median) value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`median`](#median-stats) returns the [median](https://en.wikipedia.org/wiki/Median) value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`min`](#min-stats) returns the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`min`](#min-stats) returns the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -2412,58 +2412,58 @@ See also:
- [`uniq_values`](#uniq_values-stats) - [`uniq_values`](#uniq_values-stats)
- [`count`](#count-stats) - [`count`](#count-stats)
### fields_max stats ### row_max stats
`fields_max(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) `row_max(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the maximum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log. with the maximum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) For example, the following query returns log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes: across logs for the last 5 minutes:
```logsql ```logsql
_time:5m | stats fields_max(duration) as log_with_max_duration _time:5m | stats row_max(duration) as log_with_max_duration
``` ```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes. Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only the specific fields are needed from the returned log entry, then they can be enumerated inside `fields_max(...)`. If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes: For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes:
```logsql ```logsql
_time:5m | stats fields_max(duration, _time, path, duration) as time_and_ip_with_max_duration _time:5m | stats row_max(duration, _time, path, duration) as time_and_ip_with_max_duration
``` ```
See also: See also:
- [`max`](#max-stats) - [`max`](#max-stats)
- [`fields_min`](#fields_min-stats) - [`row_min`](#row_min-stats)
### fields_min stats ### row_min stats
`fields_min(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) `row_min(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the minimum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log. with the minimum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) For example, the following query returns log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes: across logs for the last 5 minutes:
```logsql ```logsql
_time:5m | stats fields_min(duration) as log_with_min_duration _time:5m | stats row_min(duration) as log_with_min_duration
``` ```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes. Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only the specific fields are needed from the returned log entry, then they can be enumerated inside `fields_max(...)`. If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes: For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes:
```logsql ```logsql
_time:5m | stats fields_min(duration, _time, path, duration) as time_and_ip_with_min_duration _time:5m | stats row_min(duration, _time, path, duration) as time_and_ip_with_min_duration
``` ```
See also: See also:
- [`min`](#min-stats) - [`min`](#min-stats)
- [`fields_max`](#fields_max-stats) - [`row_max`](#row_max-stats)
### max stats ### max stats
@ -2477,11 +2477,11 @@ over logs for the last 5 minutes:
_time:5m | stats max(duration) max_duration _time:5m | stats max(duration) max_duration
``` ```
[`fields_max`](#fields_max-stats) function can be used for obtaining other fields with the maximum duration. [`row_max`](#row_max-stats) function can be used for obtaining other fields with the maximum duration.
See also: See also:
- [`fields_max`](#fields_max-stats) - [`row_max`](#row_max-stats)
- [`min`](#min-stats) - [`min`](#min-stats)
- [`quantile`](#quantile-stats) - [`quantile`](#quantile-stats)
- [`avg`](#avg-stats) - [`avg`](#avg-stats)
@ -2515,11 +2515,11 @@ over logs for the last 5 minutes:
_time:5m | stats min(duration) min_duration _time:5m | stats min(duration) min_duration
``` ```
[`fields_min`](#fields_min-stats) function can be used for obtaining other fields with the minimum duration. [`row_min`](#row_min-stats) function can be used for obtaining other fields with the minimum duration.
See also: See also:
- [`fields_min`](#fields_min-stats) - [`row_min`](#row_min-stats)
- [`max`](#max-stats) - [`max`](#max-stats)
- [`quantile`](#quantile-stats) - [`quantile`](#quantile-stats)
- [`avg`](#avg-stats) - [`avg`](#avg-stats)

View file

@ -940,10 +940,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats min(*) x`, `* | stats min(*) as x`) f(`* | stats min(*) x`, `* | stats min(*) as x`)
f(`* | stats min(foo,*,bar) x`, `* | stats min(*) as x`) f(`* | stats min(foo,*,bar) x`, `* | stats min(*) as x`)
// stats pipe fields_min // stats pipe row_min
f(`* | stats fields_Min(foo) bar`, `* | stats fields_min(foo) as bar`) f(`* | stats fields_Min(foo) bar`, `* | stats row_min(foo) as bar`)
f(`* | fields_Min(foo)`, `* | stats fields_min(foo) as "fields_min(foo)"`) f(`* | fields_Min(foo)`, `* | stats row_min(foo) as "row_min(foo)"`)
f(`* | stats BY(x, y, ) fields_MIN(foo,bar,) bar`, `* | stats by (x, y) fields_min(foo, bar) as bar`) f(`* | stats BY(x, y, ) fields_MIN(foo,bar,) bar`, `* | stats by (x, y) row_min(foo, bar) as bar`)
// stats pipe avg // stats pipe avg
f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`) f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`)
@ -1388,7 +1388,7 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats min`) f(`foo | stats min`)
// invalid stats min // invalid stats min
f(`foo | stats fields_min`) f(`foo | stats row_min`)
// invalid stats avg // invalid stats avg
f(`foo | stats avg`) f(`foo | stats avg`)
@ -1627,12 +1627,12 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | stats count_uniq() q`, `*`, ``) f(`* | stats count_uniq() q`, `*`, ``)
f(`* | stats count_uniq(*) q`, `*`, ``) f(`* | stats count_uniq(*) q`, `*`, ``)
f(`* | stats count_uniq(x) q`, `x`, ``) f(`* | stats count_uniq(x) q`, `x`, ``)
f(`* | stats fields_max(a) q`, `*`, ``) f(`* | stats row_max(a) q`, `*`, ``)
f(`* | stats fields_max(a, *) q`, `*`, ``) f(`* | stats row_max(a, *) q`, `*`, ``)
f(`* | stats fields_max(a, x) q`, `a,x`, ``) f(`* | stats row_max(a, x) q`, `a,x`, ``)
f(`* | stats fields_min(a) q`, `*`, ``) f(`* | stats row_min(a) q`, `*`, ``)
f(`* | stats fields_min(a, *) q`, `*`, ``) f(`* | stats row_min(a, *) q`, `*`, ``)
f(`* | stats fields_min(a, x) q`, `a,x`, ``) f(`* | stats row_min(a, x) q`, `a,x`, ``)
f(`* | stats min() q`, `*`, ``) f(`* | stats min() q`, `*`, ``)
f(`* | stats min(*) q`, `*`, ``) f(`* | stats min(*) q`, `*`, ``)
f(`* | stats min(x) q`, `x`, ``) f(`* | stats min(x) q`, `x`, ``)

View file

@ -631,16 +631,16 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err) return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err)
} }
return sus, nil return sus, nil
case lex.isKeyword("fields_max"): case lex.isKeyword("row_max"):
sms, err := parseStatsFieldsMax(lex) sms, err := parseStatsRowMax(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_max' func: %w", err) return nil, fmt.Errorf("cannot parse 'row_max' func: %w", err)
} }
return sms, nil return sms, nil
case lex.isKeyword("fields_min"): case lex.isKeyword("row_min"):
sms, err := parseStatsFieldsMin(lex) sms, err := parseStatsRowMin(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_min' func: %w", err) return nil, fmt.Errorf("cannot parse 'row_min' func: %w", err)
} }
return sms, nil return sms, nil
case lex.isKeyword("max"): case lex.isKeyword("max"):

View file

@ -11,14 +11,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
type statsFieldsMax struct { type statsRowMax struct {
srcField string srcField string
fetchFields []string fetchFields []string
} }
func (sm *statsFieldsMax) String() string { func (sm *statsRowMax) String() string {
s := "fields_max(" + quoteTokenIfNeeded(sm.srcField) s := "row_max(" + quoteTokenIfNeeded(sm.srcField)
if len(sm.fetchFields) > 0 { if len(sm.fetchFields) > 0 {
s += ", " + fieldNamesString(sm.fetchFields) s += ", " + fieldNamesString(sm.fetchFields)
} }
@ -26,7 +26,7 @@ func (sm *statsFieldsMax) String() string {
return s return s
} }
func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) { func (sm *statsRowMax) updateNeededFields(neededFields fieldsSet) {
if len(sm.fetchFields) == 0 { if len(sm.fetchFields) == 0 {
neededFields.add("*") neededFields.add("*")
} else { } else {
@ -35,22 +35,22 @@ func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) {
neededFields.add(sm.srcField) neededFields.add(sm.srcField)
} }
func (sm *statsFieldsMax) newStatsProcessor() (statsProcessor, int) { func (sm *statsRowMax) newStatsProcessor() (statsProcessor, int) {
smp := &statsFieldsMaxProcessor{ smp := &statsRowMaxProcessor{
sm: sm, sm: sm,
} }
return smp, int(unsafe.Sizeof(*smp)) return smp, int(unsafe.Sizeof(*smp))
} }
type statsFieldsMaxProcessor struct { type statsRowMaxProcessor struct {
sm *statsFieldsMax sm *statsRowMax
max string max string
fields []Field fields []Field
} }
func (smp *statsFieldsMaxProcessor) updateStatsForAllRows(br *blockResult) int { func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField) c := br.getColumnByName(smp.sm.srcField)
@ -114,7 +114,7 @@ func (smp *statsFieldsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
return stateSizeIncrease return stateSizeIncrease
} }
func (smp *statsFieldsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (smp *statsRowMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField) c := br.getColumnByName(smp.sm.srcField)
@ -138,27 +138,27 @@ func (smp *statsFieldsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx in
return stateSizeIncrease return stateSizeIncrease
} }
func (smp *statsFieldsMaxProcessor) mergeState(sfp statsProcessor) { func (smp *statsRowMaxProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsFieldsMaxProcessor) src := sfp.(*statsRowMaxProcessor)
if smp.needUpdateStateString(src.max) { if smp.needUpdateStateString(src.max) {
smp.max = src.max smp.max = src.max
smp.fields = src.fields smp.fields = src.fields
} }
} }
func (smp *statsFieldsMaxProcessor) needUpdateStateBytes(b []byte) bool { func (smp *statsRowMaxProcessor) needUpdateStateBytes(b []byte) bool {
v := bytesutil.ToUnsafeString(b) v := bytesutil.ToUnsafeString(b)
return smp.needUpdateStateString(v) return smp.needUpdateStateString(v)
} }
func (smp *statsFieldsMaxProcessor) needUpdateStateString(v string) bool { func (smp *statsRowMaxProcessor) needUpdateStateString(v string) bool {
if v == "" { if v == "" {
return false return false
} }
return smp.max == "" || lessString(smp.max, v) return smp.max == "" || lessString(smp.max, v)
} }
func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowIdx int) int { func (smp *statsRowMaxProcessor) updateState(v string, br *blockResult, rowIdx int) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
if !smp.needUpdateStateString(v) { if !smp.needUpdateStateString(v) {
@ -204,7 +204,7 @@ func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowId
return stateSizeIncrease return stateSizeIncrease
} }
func (smp *statsFieldsMaxProcessor) finalizeStats() string { func (smp *statsRowMaxProcessor) finalizeStats() string {
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalFieldsToJSON(bb.B, smp.fields) bb.B = marshalFieldsToJSON(bb.B, smp.fields)
result := string(bb.B) result := string(bb.B)
@ -213,18 +213,18 @@ func (smp *statsFieldsMaxProcessor) finalizeStats() string {
return result return result
} }
func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) { func parseStatsRowMax(lex *lexer) (*statsRowMax, error) {
if !lex.isKeyword("fields_max") { if !lex.isKeyword("row_max") {
return nil, fmt.Errorf("unexpected func; got %q; want 'fields_max'", lex.token) return nil, fmt.Errorf("unexpected func; got %q; want 'row_max'", lex.token)
} }
lex.nextToken() lex.nextToken()
fields, err := parseFieldNamesInParens(lex) fields, err := parseFieldNamesInParens(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_max' args: %w", err) return nil, fmt.Errorf("cannot parse 'row_max' args: %w", err)
} }
if len(fields) == 0 { if len(fields) == 0 {
return nil, fmt.Errorf("missing first arg for 'fields_max' func - source field") return nil, fmt.Errorf("missing first arg for 'row_max' func - source field")
} }
srcField := fields[0] srcField := fields[0]
@ -233,7 +233,7 @@ func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) {
fetchFields = nil fetchFields = nil
} }
sm := &statsFieldsMax{ sm := &statsRowMax{
srcField: srcField, srcField: srcField,
fetchFields: fetchFields, fetchFields: fetchFields,
} }

View file

@ -4,35 +4,35 @@ import (
"testing" "testing"
) )
func TestParseStatsFieldsMaxSuccess(t *testing.T) { func TestParseStatsRowMaxSuccess(t *testing.T) {
f := func(pipeStr string) { f := func(pipeStr string) {
t.Helper() t.Helper()
expectParseStatsFuncSuccess(t, pipeStr) expectParseStatsFuncSuccess(t, pipeStr)
} }
f(`fields_max(foo)`) f(`row_max(foo)`)
f(`fields_max(foo, bar)`) f(`row_max(foo, bar)`)
f(`fields_max(foo, bar, baz)`) f(`row_max(foo, bar, baz)`)
} }
func TestParseStatsFieldsMaxFailure(t *testing.T) { func TestParseStatsRowMaxFailure(t *testing.T) {
f := func(pipeStr string) { f := func(pipeStr string) {
t.Helper() t.Helper()
expectParseStatsFuncFailure(t, pipeStr) expectParseStatsFuncFailure(t, pipeStr)
} }
f(`fields_max`) f(`row_max`)
f(`fields_max()`) f(`row_max()`)
f(`fields_max(x) bar`) f(`row_max(x) bar`)
} }
func TestStatsFieldsMax(t *testing.T) { func TestStatsRowMax(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) { f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper() t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected) expectPipeResults(t, pipeStr, rows, rowsExpected)
} }
f("stats fields_max(a) as x", [][]Field{ f("stats row_max(a) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -52,7 +52,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats fields_max(foo) as x", [][]Field{ f("stats row_max(foo) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -72,7 +72,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats fields_max(b, a) as x", [][]Field{ f("stats row_max(b, a) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -93,7 +93,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats fields_max(b, a, x, b) as x", [][]Field{ f("stats row_max(b, a, x, b) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -114,7 +114,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats fields_max(a) if (b:*) as x", [][]Field{ f("stats row_max(a) if (b:*) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -134,7 +134,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats by (b) fields_max(a) if (b:*) as x", [][]Field{ f("stats by (b) row_max(a) if (b:*) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -160,7 +160,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats by (a) fields_max(b) as x", [][]Field{ f("stats by (a) row_max(b) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},
@ -189,7 +189,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats by (a) fields_max(c) as x", [][]Field{ f("stats by (a) row_max(c) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},
@ -218,7 +218,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats by (a) fields_max(b, c) as x", [][]Field{ f("stats by (a) row_max(b, c) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},
@ -250,7 +250,7 @@ func TestStatsFieldsMax(t *testing.T) {
}, },
}) })
f("stats by (a, b) fields_max(c) as x", [][]Field{ f("stats by (a, b) row_max(c) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},

View file

@ -11,14 +11,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
type statsFieldsMin struct { type statsRowMin struct {
srcField string srcField string
fetchFields []string fetchFields []string
} }
func (sm *statsFieldsMin) String() string { func (sm *statsRowMin) String() string {
s := "fields_min(" + quoteTokenIfNeeded(sm.srcField) s := "row_min(" + quoteTokenIfNeeded(sm.srcField)
if len(sm.fetchFields) > 0 { if len(sm.fetchFields) > 0 {
s += ", " + fieldNamesString(sm.fetchFields) s += ", " + fieldNamesString(sm.fetchFields)
} }
@ -26,7 +26,7 @@ func (sm *statsFieldsMin) String() string {
return s return s
} }
func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) { func (sm *statsRowMin) updateNeededFields(neededFields fieldsSet) {
if len(sm.fetchFields) == 0 { if len(sm.fetchFields) == 0 {
neededFields.add("*") neededFields.add("*")
} else { } else {
@ -35,22 +35,22 @@ func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) {
neededFields.add(sm.srcField) neededFields.add(sm.srcField)
} }
func (sm *statsFieldsMin) newStatsProcessor() (statsProcessor, int) { func (sm *statsRowMin) newStatsProcessor() (statsProcessor, int) {
smp := &statsFieldsMinProcessor{ smp := &statsRowMinProcessor{
sm: sm, sm: sm,
} }
return smp, int(unsafe.Sizeof(*smp)) return smp, int(unsafe.Sizeof(*smp))
} }
type statsFieldsMinProcessor struct { type statsRowMinProcessor struct {
sm *statsFieldsMin sm *statsRowMin
min string min string
fields []Field fields []Field
} }
func (smp *statsFieldsMinProcessor) updateStatsForAllRows(br *blockResult) int { func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField) c := br.getColumnByName(smp.sm.srcField)
@ -114,7 +114,7 @@ func (smp *statsFieldsMinProcessor) updateStatsForAllRows(br *blockResult) int {
return stateSizeIncrease return stateSizeIncrease
} }
func (smp *statsFieldsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (smp *statsRowMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField) c := br.getColumnByName(smp.sm.srcField)
@ -138,27 +138,27 @@ func (smp *statsFieldsMinProcessor) updateStatsForRow(br *blockResult, rowIdx in
return stateSizeIncrease return stateSizeIncrease
} }
func (smp *statsFieldsMinProcessor) mergeState(sfp statsProcessor) { func (smp *statsRowMinProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsFieldsMinProcessor) src := sfp.(*statsRowMinProcessor)
if smp.needUpdateStateString(src.min) { if smp.needUpdateStateString(src.min) {
smp.min = src.min smp.min = src.min
smp.fields = src.fields smp.fields = src.fields
} }
} }
func (smp *statsFieldsMinProcessor) needUpdateStateBytes(b []byte) bool { func (smp *statsRowMinProcessor) needUpdateStateBytes(b []byte) bool {
v := bytesutil.ToUnsafeString(b) v := bytesutil.ToUnsafeString(b)
return smp.needUpdateStateString(v) return smp.needUpdateStateString(v)
} }
func (smp *statsFieldsMinProcessor) needUpdateStateString(v string) bool { func (smp *statsRowMinProcessor) needUpdateStateString(v string) bool {
if v == "" { if v == "" {
return false return false
} }
return smp.min == "" || lessString(v, smp.min) return smp.min == "" || lessString(v, smp.min)
} }
func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowIdx int) int { func (smp *statsRowMinProcessor) updateState(v string, br *blockResult, rowIdx int) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
if !smp.needUpdateStateString(v) { if !smp.needUpdateStateString(v) {
@ -204,7 +204,7 @@ func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowId
return stateSizeIncrease return stateSizeIncrease
} }
func (smp *statsFieldsMinProcessor) finalizeStats() string { func (smp *statsRowMinProcessor) finalizeStats() string {
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalFieldsToJSON(bb.B, smp.fields) bb.B = marshalFieldsToJSON(bb.B, smp.fields)
result := string(bb.B) result := string(bb.B)
@ -213,18 +213,18 @@ func (smp *statsFieldsMinProcessor) finalizeStats() string {
return result return result
} }
func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) { func parseStatsRowMin(lex *lexer) (*statsRowMin, error) {
if !lex.isKeyword("fields_min") { if !lex.isKeyword("row_min") {
return nil, fmt.Errorf("unexpected func; got %q; want 'fields_min'", lex.token) return nil, fmt.Errorf("unexpected func; got %q; want 'row_min'", lex.token)
} }
lex.nextToken() lex.nextToken()
fields, err := parseFieldNamesInParens(lex) fields, err := parseFieldNamesInParens(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_min' args: %w", err) return nil, fmt.Errorf("cannot parse 'row_min' args: %w", err)
} }
if len(fields) == 0 { if len(fields) == 0 {
return nil, fmt.Errorf("missing first arg for 'fields_min' func - source field") return nil, fmt.Errorf("missing first arg for 'row_min' func - source field")
} }
srcField := fields[0] srcField := fields[0]
@ -233,7 +233,7 @@ func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) {
fetchFields = nil fetchFields = nil
} }
sm := &statsFieldsMin{ sm := &statsRowMin{
srcField: srcField, srcField: srcField,
fetchFields: fetchFields, fetchFields: fetchFields,
} }

View file

@ -4,35 +4,35 @@ import (
"testing" "testing"
) )
func TestParseStatsFieldsMinSuccess(t *testing.T) { func TestParseStatsRowMinSuccess(t *testing.T) {
f := func(pipeStr string) { f := func(pipeStr string) {
t.Helper() t.Helper()
expectParseStatsFuncSuccess(t, pipeStr) expectParseStatsFuncSuccess(t, pipeStr)
} }
f(`fields_min(foo)`) f(`row_min(foo)`)
f(`fields_min(foo, bar)`) f(`row_min(foo, bar)`)
f(`fields_min(foo, bar, baz)`) f(`row_min(foo, bar, baz)`)
} }
func TestParseStatsFieldsMinFailure(t *testing.T) { func TestParseStatsRowMinFailure(t *testing.T) {
f := func(pipeStr string) { f := func(pipeStr string) {
t.Helper() t.Helper()
expectParseStatsFuncFailure(t, pipeStr) expectParseStatsFuncFailure(t, pipeStr)
} }
f(`fields_min`) f(`row_min`)
f(`fields_min()`) f(`row_min()`)
f(`fields_min(x) bar`) f(`row_min(x) bar`)
} }
func TestStatsFieldsMin(t *testing.T) { func TestStatsRowMin(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) { f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper() t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected) expectPipeResults(t, pipeStr, rows, rowsExpected)
} }
f("stats fields_min(a) as x", [][]Field{ f("stats row_min(a) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -52,7 +52,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats fields_min(foo) as x", [][]Field{ f("stats row_min(foo) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -72,7 +72,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats fields_min(b, a) as x", [][]Field{ f("stats row_min(b, a) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -93,7 +93,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats fields_min(b, a, x, b) as x", [][]Field{ f("stats row_min(b, a, x, b) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -114,7 +114,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats fields_min(a) if (b:*) as x", [][]Field{ f("stats row_min(a) if (b:*) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -134,7 +134,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats by (b) fields_min(a) if (b:*) as x", [][]Field{ f("stats by (b) row_min(a) if (b:*) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -160,7 +160,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats by (a) fields_min(b) as x", [][]Field{ f("stats by (a) row_min(b) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},
@ -189,7 +189,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats by (a) fields_min(c) as x", [][]Field{ f("stats by (a) row_min(c) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},
@ -218,7 +218,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats by (a) fields_min(b, c) as x", [][]Field{ f("stats by (a) row_min(b, c) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},
@ -249,7 +249,7 @@ func TestStatsFieldsMin(t *testing.T) {
}, },
}) })
f("stats by (a, b) fields_min(c) as x", [][]Field{ f("stats by (a, b) row_min(c) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},