From 619dff68618ecb7a1bd4f7b79b2dae47656b7ca4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 1 May 2024 02:08:37 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/LogsQL.md | 13 +++- lib/logstorage/block_result.go | 103 +++++++++++++++++++++++++++++++ lib/logstorage/parser_test.go | 9 +++ lib/logstorage/pipe_stats.go | 6 ++ lib/logstorage/stats_min.go | 107 +++++++++++++++++++++++++++++++++ 5 files changed, 235 insertions(+), 3 deletions(-) create mode 100644 lib/logstorage/stats_min.go diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 1a6d22dd0..03108799b 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1092,14 +1092,21 @@ LogsQL supports calculating the following stats: - Sum for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values. Non-numeric values are ignored. Examples: - `error | stats sum(duration) duration_total` - returns the sum of `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word). - - `GET | stats by (path) sum(response_size)` - returns the sum of `response_size` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values + - `GET | stats by (path) sum(response_size) response_size_sum` - returns the sum of `response_size` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `GET` [word](#word), grouped by `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. - The maximum value across the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Non-numeric values are ignored. Examples: - `error | stats max(duration) duration_max` - returns the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word). - - `GET | stats by (path) max(response_size)` - returns the maximum value for the `response_size` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + - `GET | stats by (path) max(response_size) max_response_size` - returns the maximum value for the `response_size` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `GET` [word](#word), grouped + by `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. + +- The minimum value across the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Non-numeric values are ignored. Examples: + - `error | stats min(duration) duration_min` - returns the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word). + - `GET | stats by (path) min(response_size) min_response_size` - returns the minimum value for the `response_size` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `GET` [word](#word), grouped by `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. @@ -1117,7 +1124,7 @@ error | stats by (namespace) LogsQL will support calculating the following additional stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) and fields created by [transformations](#transformations): -- The min, max, avg, and sum for the given field. +- The avg for the given field. - The median and [percentile](https://en.wikipedia.org/wiki/Percentile) for the given field. It will be possible specifying an optional condition [filter](#post-filters) when calculating the stats. diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 1d249ad9c..6c92cc13b 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -838,6 +838,109 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 { } } +func (c *blockResultColumn) getMinValue(br *blockResult) float64 { + if c.isConst { + v := c.encodedValues[0] + f, ok := tryParseFloat64(v) + if !ok { + return nan + } + return f + } + if c.isTime { + return nan + } + + switch c.valueType { + case valueTypeString: + min := math.Inf(1) + f := float64(0) + ok := false + values := c.encodedValues + for i := range values { + if i == 0 || values[i-1] != values[i] { + f, ok = tryParseFloat64(values[i]) + } + if ok && f < min { + min = f + } + } + if math.IsInf(min, 1) { + return nan + } + return min + case valueTypeDict: + a := encoding.GetFloat64s(len(c.dictValues)) + dictValuesFloat := a.A + for i, v := range c.dictValues { + f, ok := tryParseFloat64(v) + if !ok { + f = nan + } + dictValuesFloat[i] = f + } + min := math.Inf(1) + for _, v := range c.encodedValues { + dictIdx := v[0] + f := dictValuesFloat[dictIdx] + if f < min { + min = f + } + } + encoding.PutFloat64s(a) + if math.IsInf(min, 1) { + return nan + } + return min + case valueTypeUint8: + min := math.Inf(1) + for _, v := range c.encodedValues { + f := float64(v[0]) + if f < min { + min = f + } + } + return min + case valueTypeUint16: + min := math.Inf(1) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + f := float64(encoding.UnmarshalUint16(b)) + if f < min { + min = f + } + } + return min + case valueTypeUint32: + min := math.Inf(1) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + f := float64(encoding.UnmarshalUint32(b)) + if f < min { + min = f + } + } + return min + case valueTypeUint64: + min := math.Inf(1) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + f := float64(encoding.UnmarshalUint64(b)) + if f < min { + min = f + } + } + return min + case valueTypeIPv4: + return nan + case valueTypeTimestampISO8601: + return nan + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return nan + } +} + func (c *blockResultColumn) sumValues(br *blockResult) float64 { if c.isConst { v := c.encodedValues[0] diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 1ff8a3976..87e5f11cb 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -846,6 +846,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats Max(foo) bar`, `* | stats max(foo) as bar`) f(`* | stats BY(x, y, ) MAX(foo,bar,) bar`, `* | stats by (x, y) max(foo, bar) as bar`) + // stats pipe min + f(`* | stats Min(foo) bar`, `* | stats min(foo) as bar`) + f(`* | stats BY(x, y, ) MIN(foo,bar,) bar`, `* | stats by (x, y) min(foo, bar) as bar`) + // stats pipe uniq f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`) f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`) @@ -1108,6 +1112,11 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats max()`) f(`foo | stats max() as abc`) + // invalid stats min + f(`foo | stats min`) + f(`foo | stats min()`) + f(`foo | stats min() as abc`) + // invalid stats uniq f(`foo | stats uniq`) f(`foo | stats uniq()`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 03e0ced86..e22fcd60a 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -440,6 +440,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) { return nil, "", fmt.Errorf("cannot parse 'max' func: %w", err) } sf = sms + case lex.isKeyword("min"): + sms, err := parseStatsMin(lex) + if err != nil { + return nil, "", fmt.Errorf("cannot parse 'min' func: %w", err) + } + sf = sms default: return nil, "", fmt.Errorf("unknown stats func %q", lex.token) } diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go new file mode 100644 index 000000000..67852de7d --- /dev/null +++ b/lib/logstorage/stats_min.go @@ -0,0 +1,107 @@ +package logstorage + +import ( + "fmt" + "slices" + "strconv" + "unsafe" +) + +type statsMin struct { + fields []string + containsStar bool +} + +func (sm *statsMin) String() string { + return "min(" + fieldNamesString(sm.fields) + ")" +} + +func (sm *statsMin) neededFields() []string { + return sm.fields +} + +func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { + smp := &statsMinProcessor{ + sm: sm, + } + return smp, int(unsafe.Sizeof(*smp)) +} + +type statsMinProcessor struct { + sm *statsMin + + min float64 +} + +func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { + if smp.sm.containsStar { + // Find the minimum value across all the columns + for _, c := range br.getColumns() { + f := c.getMinValue(br) + if f < smp.min { + smp.min = f + } + } + return 0 + } + + // Find the minimum value across the requested columns + for _, field := range smp.sm.fields { + c := br.getColumnByName(field) + f := c.getMinValue(br) + if f < smp.min { + smp.min = f + } + } + return 0 +} + +func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + if smp.sm.containsStar { + // Find the minimum value across all the fields for the given row + for _, c := range br.getColumns() { + f := c.getFloatValueAtRow(rowIdx) + if f < smp.min { + smp.min = f + } + } + return 0 + } + + // Find the minimum value across the requested fields for the given row + for _, field := range smp.sm.fields { + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if f < smp.min { + smp.min = f + } + } + return 0 +} + +func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsMinProcessor) + if src.min < smp.min { + smp.min = src.min + } +} + +func (smp *statsMinProcessor) finalizeStats() string { + return strconv.FormatFloat(smp.min, 'g', -1, 64) +} + +func parseStatsMin(lex *lexer) (*statsMin, error) { + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'min' args: %w", err) + } + if len(fields) == 0 { + return nil, fmt.Errorf("'min' must contain at least one arg") + } + sm := &statsMin{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + } + return sm, nil +}