From d62bac5609e3b0b9ff62110978f7b0e12a5a1f68 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 1 May 2024 01:58:35 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/LogsQL.md | 7 ++ lib/logstorage/block_result.go | 141 ++++++++++++++++++++++++++++++--- lib/logstorage/parser_test.go | 9 +++ lib/logstorage/pipe_stats.go | 6 ++ lib/logstorage/stats_max.go | 107 +++++++++++++++++++++++++ 5 files changed, 257 insertions(+), 13 deletions(-) create mode 100644 lib/logstorage/stats_max.go diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 550b159e4..1a6d22dd0 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1096,6 +1096,13 @@ LogsQL supports calculating the following stats: across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `GET` [word](#word), grouped by `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. +- The maximum value across the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Non-numeric values are ignored. Examples: + - `error | stats max(duration) duration_max` - returns the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word). + - `GET | stats by (path) max(response_size)` - returns the maximum value for the `response_size` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) + across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `GET` [word](#word), grouped + by `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. + Stats calculations can be combined. For example, the following query calculates the number of log messages with the `error` [word](#word), the number of unique values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) and the sum of `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), grouped by `namespace` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model): diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index c93a09f20..1d249ad9c 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -686,20 +686,29 @@ func (c *blockResultColumn) getValues(br *blockResult) []string { func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 { if c.isConst { v := c.encodedValues[0] - f, _ := tryParseFloat64(v) + f, ok := tryParseFloat64(v) + if !ok { + return nan + } return f } if c.isTime { - return 0 + return nan } switch c.valueType { case valueTypeString: - f, _ := tryParseFloat64(c.encodedValues[rowIdx]) + f, ok := tryParseFloat64(c.encodedValues[rowIdx]) + if !ok { + return nan + } return f case valueTypeDict: dictIdx := c.encodedValues[rowIdx][0] - f, _ := tryParseFloat64(c.dictValues[dictIdx]) + f, ok := tryParseFloat64(c.dictValues[dictIdx]) + if !ok { + return nan + } return f case valueTypeUint8: return float64(c.encodedValues[rowIdx][0]) @@ -717,20 +726,123 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 { n := encoding.UnmarshalUint64(b) return math.Float64frombits(n) case valueTypeIPv4: - return 0 + return nan case valueTypeTimestampISO8601: - return 0 + return nan default: logger.Panicf("BUG: unknown valueType=%d", c.valueType) - return 0 + return nan + } +} + +func (c *blockResultColumn) getMaxValue(br *blockResult) float64 { + if c.isConst { + v := c.encodedValues[0] + f, ok := tryParseFloat64(v) + if !ok { + return nan + } + return f + } + if c.isTime { + return nan + } + + switch c.valueType { + case valueTypeString: + max := math.Inf(-1) + f := float64(0) + ok := false + values := c.encodedValues + for i := range values { + if i == 0 || values[i-1] != values[i] { + f, ok = tryParseFloat64(values[i]) + } + if ok && f > max { + max = f + } + } + if math.IsInf(max, -1) { + return nan + } + return max + case valueTypeDict: + a := encoding.GetFloat64s(len(c.dictValues)) + dictValuesFloat := a.A + for i, v := range c.dictValues { + f, ok := tryParseFloat64(v) + if !ok { + f = nan + } + dictValuesFloat[i] = f + } + max := math.Inf(-1) + for _, v := range c.encodedValues { + dictIdx := v[0] + f := dictValuesFloat[dictIdx] + if f > max { + max = f + } + } + encoding.PutFloat64s(a) + if math.IsInf(max, -1) { + return nan + } + return max + case valueTypeUint8: + max := math.Inf(-1) + for _, v := range c.encodedValues { + f := float64(v[0]) + if f > max { + max = f + } + } + return max + case valueTypeUint16: + max := math.Inf(-1) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + f := float64(encoding.UnmarshalUint16(b)) + if f > max { + max = f + } + } + return max + case valueTypeUint32: + max := math.Inf(-1) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + f := float64(encoding.UnmarshalUint32(b)) + if f > max { + max = f + } + } + return max + case valueTypeUint64: + max := math.Inf(-1) + for _, v := range c.encodedValues { + b := bytesutil.ToUnsafeBytes(v) + f := float64(encoding.UnmarshalUint64(b)) + if f > max { + max = f + } + } + return max + case valueTypeIPv4: + return nan + case valueTypeTimestampISO8601: + return nan + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return nan } } func (c *blockResultColumn) sumValues(br *blockResult) float64 { if c.isConst { v := c.encodedValues[0] - f, _ := tryParseFloat64(v) - if f == 0 || math.IsNaN(f) { + f, ok := tryParseFloat64(v) + if !ok { return 0 } return f * float64(len(br.timestamps)) @@ -743,12 +855,13 @@ func (c *blockResultColumn) sumValues(br *blockResult) float64 { case valueTypeString: sum := float64(0) f := float64(0) + ok := false values := c.encodedValues for i := range values { if i == 0 || values[i-1] != values[i] { - f, _ = tryParseFloat64(values[i]) + f, ok = tryParseFloat64(values[i]) } - if !math.IsNaN(f) { + if ok { sum += f } } @@ -757,8 +870,8 @@ func (c *blockResultColumn) sumValues(br *blockResult) float64 { a := encoding.GetFloat64s(len(c.dictValues)) dictValuesFloat := a.A for i, v := range c.dictValues { - f, _ := tryParseFloat64(v) - if math.IsNaN(f) { + f, ok := tryParseFloat64(v) + if !ok { f = 0 } dictValuesFloat[i] = f @@ -817,3 +930,5 @@ func (c *blockResultColumn) sumValues(br *blockResult) float64 { return 0 } } + +var nan = math.NaN() diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 9089fb837..1ff8a3976 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -842,6 +842,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats Sum(foo) bar`, `* | stats sum(foo) as bar`) f(`* | stats BY(x, y, ) SUM(foo,bar,) bar`, `* | stats by (x, y) sum(foo, bar) as bar`) + // stats pipe max + f(`* | stats Max(foo) bar`, `* | stats max(foo) as bar`) + f(`* | stats BY(x, y, ) MAX(foo,bar,) bar`, `* | stats by (x, y) max(foo, bar) as bar`) + // stats pipe uniq f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`) f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`) @@ -1099,6 +1103,11 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats sum()`) f(`foo | stats sum() as abc`) + // invalid stats max + f(`foo | stats max`) + f(`foo | stats max()`) + f(`foo | stats max() as abc`) + // invalid stats uniq f(`foo | stats uniq`) f(`foo | stats uniq()`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index b4e598155..03e0ced86 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -434,6 +434,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) { return nil, "", fmt.Errorf("cannot parse 'sum' func: %w", err) } sf = sfs + case lex.isKeyword("max"): + sms, err := parseStatsMax(lex) + if err != nil { + return nil, "", fmt.Errorf("cannot parse 'max' func: %w", err) + } + sf = sms default: return nil, "", fmt.Errorf("unknown stats func %q", lex.token) } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go new file mode 100644 index 000000000..ce57f9577 --- /dev/null +++ b/lib/logstorage/stats_max.go @@ -0,0 +1,107 @@ +package logstorage + +import ( + "fmt" + "slices" + "strconv" + "unsafe" +) + +type statsMax struct { + fields []string + containsStar bool +} + +func (sm *statsMax) String() string { + return "max(" + fieldNamesString(sm.fields) + ")" +} + +func (sm *statsMax) neededFields() []string { + return sm.fields +} + +func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { + smp := &statsMaxProcessor{ + sm: sm, + } + return smp, int(unsafe.Sizeof(*smp)) +} + +type statsMaxProcessor struct { + sm *statsMax + + max float64 +} + +func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { + if smp.sm.containsStar { + // Find the maximum value across all the columns + for _, c := range br.getColumns() { + f := c.getMaxValue(br) + if f > smp.max { + smp.max = f + } + } + return 0 + } + + // Find the maximum value across the requested columns + for _, field := range smp.sm.fields { + c := br.getColumnByName(field) + f := c.getMaxValue(br) + if f > smp.max { + smp.max = f + } + } + return 0 +} + +func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + if smp.sm.containsStar { + // Find the maximum value across all the fields for the given row + for _, c := range br.getColumns() { + f := c.getFloatValueAtRow(rowIdx) + if f > smp.max { + smp.max = f + } + } + return 0 + } + + // Find the maximum value across the requested fields for the given row + for _, field := range smp.sm.fields { + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if f > smp.max { + smp.max = f + } + } + return 0 +} + +func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsMaxProcessor) + if src.max > smp.max { + smp.max = src.max + } +} + +func (smp *statsMaxProcessor) finalizeStats() string { + return strconv.FormatFloat(smp.max, 'g', -1, 64) +} + +func parseStatsMax(lex *lexer) (*statsMax, error) { + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'max' args: %w", err) + } + if len(fields) == 0 { + return nil, fmt.Errorf("'max' must contain at least one arg") + } + sm := &statsMax{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + } + return sm, nil +}