From c4b68956fe041850152e6456909525f5f57909c1 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 21 May 2024 23:33:18 +0200 Subject: [PATCH] wip --- lib/logstorage/pipe_stats.go | 6 + lib/logstorage/stats_fields_max.go | 236 +++++++++++++++++++ lib/logstorage/stats_fields_max_test.go | 286 ++++++++++++++++++++++++ 3 files changed, 528 insertions(+) create mode 100644 lib/logstorage/stats_fields_max.go create mode 100644 lib/logstorage/stats_fields_max_test.go diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 03d6cc17f..0ae44335f 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -577,6 +577,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) { return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err) } return sus, nil + case lex.isKeyword("fields_max"): + sms, err := parseStatsFieldsMax(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'fields_max' func: %w", err) + } + return sms, nil case lex.isKeyword("fields_min"): sms, err := parseStatsFieldsMin(lex) if err != nil { diff --git a/lib/logstorage/stats_fields_max.go b/lib/logstorage/stats_fields_max.go new file mode 100644 index 000000000..309152abc --- /dev/null +++ b/lib/logstorage/stats_fields_max.go @@ -0,0 +1,236 @@ +package logstorage + +import ( + "fmt" + "math" + "slices" + "strings" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +type statsFieldsMax struct { + srcField string + + resultFields []string +} + +func (sm *statsFieldsMax) String() string { + s := "fields_max(" + quoteTokenIfNeeded(sm.srcField) + if len(sm.resultFields) > 0 { + s += ", " + fieldNamesString(sm.resultFields) + } + s += ")" + return s +} + +func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) { + neededFields.add(sm.srcField) + neededFields.addFields(sm.resultFields) +} + +func (sm *statsFieldsMax) newStatsProcessor() (statsProcessor, int) { + smp := &statsFieldsMaxProcessor{ + sm: sm, + } + return smp, int(unsafe.Sizeof(*smp)) +} + +type statsFieldsMaxProcessor struct { + sm *statsFieldsMax + + max string + + fields []Field +} + +func (smp *statsFieldsMaxProcessor) updateStatsForAllRows(br *blockResult) int { + stateSizeIncrease := 0 + + c := br.getColumnByName(smp.sm.srcField) + if c.isConst { + v := c.valuesEncoded[0] + stateSizeIncrease += smp.updateState(v, br, 0) + return stateSizeIncrease + } + if c.isTime { + bb := bbPool.Get() + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[0]) + v := bytesutil.ToUnsafeString(bb.B) + stateSizeIncrease += smp.updateState(v, br, 0) + bbPool.Put(bb) + return stateSizeIncrease + } + + needUpdateState := false + switch c.valueType { + case valueTypeString: + needUpdateState = true + case valueTypeDict: + for _, v := range c.dictValues { + if smp.needUpdateStateString(v) { + needUpdateState = true + break + } + } + case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: + bb := bbPool.Get() + bb.B = marshalUint64String(bb.B[:0], c.maxValue) + needUpdateState = smp.needUpdateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeFloat64: + f := math.Float64frombits(c.maxValue) + bb := bbPool.Get() + bb.B = marshalFloat64String(bb.B[:0], f) + needUpdateState = smp.needUpdateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeIPv4: + bb := bbPool.Get() + bb.B = marshalIPv4String(bb.B[:0], uint32(c.maxValue)) + needUpdateState = smp.needUpdateStateBytes(bb.B) + bbPool.Put(bb) + case 
valueTypeTimestampISO8601: + bb := bbPool.Get() + bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.maxValue)) + needUpdateState = smp.needUpdateStateBytes(bb.B) + bbPool.Put(bb) + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + } + + if needUpdateState { + values := c.getValues(br) + for i, v := range values { + stateSizeIncrease += smp.updateState(v, br, i) + } + } + + return stateSizeIncrease +} + +func (smp *statsFieldsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + stateSizeIncrease := 0 + + c := br.getColumnByName(smp.sm.srcField) + if c.isConst { + v := c.valuesEncoded[0] + stateSizeIncrease += smp.updateState(v, br, rowIdx) + return stateSizeIncrease + } + if c.isTime { + bb := bbPool.Get() + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[rowIdx]) + v := bytesutil.ToUnsafeString(bb.B) + stateSizeIncrease += smp.updateState(v, br, rowIdx) + bbPool.Put(bb) + return stateSizeIncrease + } + + v := c.getValueAtRow(br, rowIdx) + stateSizeIncrease += smp.updateState(v, br, rowIdx) + + return stateSizeIncrease +} + +func (smp *statsFieldsMaxProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsFieldsMaxProcessor) + if smp.needUpdateStateString(src.max) { + smp.max = src.max + smp.fields = src.fields + } +} + +func (smp *statsFieldsMaxProcessor) needUpdateStateBytes(b []byte) bool { + v := bytesutil.ToUnsafeString(b) + return smp.needUpdateStateString(v) +} + +func (smp *statsFieldsMaxProcessor) needUpdateStateString(v string) bool { + if v == "" { + return false + } + return smp.max == "" || lessString(smp.max, v) +} + +func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowIdx int) int { + stateSizeIncrease := 0 + + if !smp.needUpdateStateString(v) { + // There is no need in updating state + return stateSizeIncrease + } + + stateSizeIncrease -= len(smp.max) + stateSizeIncrease += len(v) + smp.max = strings.Clone(v) + + fields := smp.fields + for _, f := range fields { + stateSizeIncrease -= len(f.Name) + len(f.Value) + } + + clear(fields) + fields = fields[:0] + if len(smp.sm.resultFields) == 0 { + cs := br.getColumns() + for _, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + }) + stateSizeIncrease += len(c.name) + len(v) + } + } else { + for _, field := range smp.sm.resultFields { + c := br.getColumnByName(field) + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + }) + stateSizeIncrease += len(c.name) + len(v) + } + } + smp.fields = fields + + return stateSizeIncrease +} + +func (smp *statsFieldsMaxProcessor) finalizeStats() string { + bb := bbPool.Get() + bb.B = marshalFieldsToJSON(bb.B, smp.fields) + result := string(bb.B) + bbPool.Put(bb) + + return result +} + +func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) { + if !lex.isKeyword("fields_max") { + return nil, fmt.Errorf("unexpected func; got %q; want 'fields_max'", lex.token) + } + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'fields_max' args: %w", err) + } + + if len(fields) == 0 { + return nil, fmt.Errorf("missing first arg for 'fields_max' func - source field") + } + + srcField := fields[0] + resultFields := fields[1:] + if slices.Contains(resultFields, "*") { + resultFields = nil + } + + sm := &statsFieldsMax{ + srcField: srcField, + resultFields: resultFields, + } + return sm, 
nil +} diff --git a/lib/logstorage/stats_fields_max_test.go b/lib/logstorage/stats_fields_max_test.go new file mode 100644 index 000000000..6f1a59ce5 --- /dev/null +++ b/lib/logstorage/stats_fields_max_test.go @@ -0,0 +1,286 @@ +package logstorage + +import ( + "testing" +) + +func TestParseStatsFieldsMaxSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncSuccess(t, pipeStr) + } + + f(`fields_max(foo)`) + f(`fields_max(foo, bar)`) + f(`fields_max(foo, bar, baz)`) +} + +func TestParseStatsFieldsMaxFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncFailure(t, pipeStr) + } + + f(`fields_max`) + f(`fields_max()`) + f(`fields_max(x) bar`) +} + +func TestStatsFieldsMax(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("stats fields_max(a) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", `{"a":"3","b":"54"}`}, + }, + }) + + f("stats fields_max(foo) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", `{}`}, + }, + }) + + f("stats fields_max(b, a) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + {"c", "1232"}, + }, + }, [][]Field{ + { + {"x", `{"a":"3"}`}, + }, + }) + + f("stats fields_max(b, a, x, b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + {"c", "1232"}, + }, + }, [][]Field{ + { + {"x", `{"a":"3","x":"","b":"54"}`}, + }, + }) + + f("stats fields_max(a) if (b:*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", `{"a":"3","b":"54"}`}, + }, + }) + + f("stats by (b) fields_max(a) if (b:*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `-12.34`}, + {"b", "3"}, + }, + { + {"a", `3`}, + {"c", `54`}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"x", `{"_msg":"abc","a":"2","b":"3"}`}, + }, + { + {"b", ""}, + {"x", `{}`}, + }, + }) + + f("stats by (a) fields_max(b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `5`}, + }, + { + {"a", `3`}, + {"b", `7`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{"_msg":"abc","a":"1","b":"3"}`}, + }, + { + {"a", "3"}, + {"x", `{"a":"3","b":"7"}`}, + }, + }) + + f("stats by (a) fields_max(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"c", `foo`}, + }, + { + {"a", `3`}, + {"b", `7`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{}`}, + }, + { + {"a", "3"}, + {"x", `{"a":"3","c":"foo"}`}, + }, + }) + + f("stats by (a) fields_max(b, c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `34`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "3"}, + }, + { + {"a", `3`}, + {"b", `5`}, + {"c", "foo"}, + }, + { + {"a", `3`}, + {"b", `7`}, + {"c", "bar"}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{"c":""}`}, + }, + { + {"a", "3"}, + {"x", 
`{"c":"bar"}`}, + }, + }) + + f("stats by (a, b) fields_max(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "foo"}, + }, + { + {"a", `3`}, + {"b", `5`}, + {"c", "4"}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"b", "3"}, + {"x", `{}`}, + }, + { + {"a", "1"}, + {"b", ""}, + {"x", `{"_msg":"def","a":"1","c":"foo","b":""}`}, + }, + { + {"a", "3"}, + {"b", "5"}, + {"x", `{"a":"3","b":"5","c":"4"}`}, + }, + }) +}