diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 5a3348dce..5162c5f57 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1842,5 +1842,12 @@ func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(v }) } +func getCanonicalColumnName(columnName string) string { + if columnName == "" { + return "_msg" + } + return columnName +} + var nan = math.NaN() var inf = math.Inf(1) diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 7b986feee..1eb73ab75 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -882,6 +882,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats min(*) x`, `* | stats min(*) as x`) f(`* | stats min(foo,*,bar) x`, `* | stats min(*) as x`) + // stats pipe fields_min + f(`* | stats fields_Min(foo) bar`, `* | stats fields_min(foo) as bar`) + f(`* | stats BY(x, y, ) fields_MIN(foo,bar,) bar`, `* | stats by (x, y) fields_min(foo, bar) as bar`) + // stats pipe avg f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`) f(`* | stats BY(x, y, ) AVG(foo,bar,) bar`, `* | stats by (x, y) avg(foo, bar) as bar`) @@ -1315,6 +1319,10 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats min`) f(`foo | stats min()`) + // invalid stats min + f(`foo | stats fields_min`) + f(`foo | stats fields_min()`) + // invalid stats avg f(`foo | stats avg`) f(`foo | stats avg()`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 20dd69f32..03d6cc17f 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -553,6 +553,12 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) { func parseStatsFunc(lex *lexer) (statsFunc, error) { switch { + case lex.isKeyword("avg"): + sas, err := parseStatsAvg(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'avg' func: %w", err) + } + return sas, nil case lex.isKeyword("count"): scs, err := parseStatsCount(lex) if err != nil { @@ -571,30 +577,48 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) { return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err) } return sus, nil - case lex.isKeyword("sum"): - sss, err := parseStatsSum(lex) + case lex.isKeyword("fields_min"): + sms, err := parseStatsFieldsMin(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'sum' func: %w", err) + return nil, fmt.Errorf("cannot parse 'fields_min' func: %w", err) } - return sss, nil + return sms, nil case lex.isKeyword("max"): sms, err := parseStatsMax(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'max' func: %w", err) } return sms, nil + case lex.isKeyword("median"): + sms, err := parseStatsMedian(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'median' func: %w", err) + } + return sms, nil case lex.isKeyword("min"): sms, err := parseStatsMin(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'min' func: %w", err) } return sms, nil - case lex.isKeyword("avg"): - sas, err := parseStatsAvg(lex) + case lex.isKeyword("quantile"): + sqs, err := parseStatsQuantile(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'avg' func: %w", err) + return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err) } - return sas, nil + return sqs, nil + case lex.isKeyword("sum"): + sss, err := parseStatsSum(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'sum' func: %w", err) + } + return sss, nil + case lex.isKeyword("sum_len"): + sss, err := parseStatsSumLen(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'sum_len' func: %w", err) + } + return sss, nil case lex.isKeyword("uniq_values"): sus, err := parseStatsUniqValues(lex) if err != nil { @@ -607,24 +631,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) { return nil, fmt.Errorf("cannot parse 'values' func: %w", err) } return svs, nil - case lex.isKeyword("sum_len"): - sss, err := parseStatsSumLen(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'sum_len' func: %w", err) - } - return sss, nil - case lex.isKeyword("quantile"): - sqs, err := parseStatsQuantile(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err) - } - return sqs, nil - case lex.isKeyword("median"): - sms, err := parseStatsMedian(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'median' func: %w", err) - } - return sms, nil default: return nil, fmt.Errorf("unknown stats func %q", lex.token) } diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index fe7c268fd..192205f9c 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "strconv" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" @@ -24,8 +25,8 @@ func (f *Field) Reset() { // String returns string representation of f. func (f *Field) String() string { - name := getCanonicalColumnName(f.Name) - return fmt.Sprintf("%q:%q", name, f.Value) + x := f.marshalToJSON(nil) + return string(x) } func (f *Field) marshal(dst []byte) []byte { @@ -56,6 +57,27 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) { return src, nil } +func (f *Field) marshalToJSON(dst []byte) []byte { + dst = strconv.AppendQuote(dst, f.Name) + dst = append(dst, ':') + dst = strconv.AppendQuote(dst, f.Value) + return dst +} + +func marshalFieldsToJSON(dst []byte, fields []Field) []byte { + dst = append(dst, '{') + if len(fields) > 0 { + dst = fields[0].marshalToJSON(dst) + fields = fields[1:] + for i := range fields { + dst = append(dst, ',') + dst = fields[i].marshalToJSON(dst) + } + } + dst = append(dst, '}') + return dst +} + func appendFields(a *arena, dst, src []Field) []Field { for _, f := range src { dst = append(dst, Field{ @@ -126,10 +148,3 @@ func (rs *rows) mergeRows(timestampsA, timestampsB []int64, fieldsA, fieldsB [][ rs.appendRows(timestampsA, fieldsA) } } - -func getCanonicalColumnName(columnName string) string { - if columnName == "" { - return "_msg" - } - return columnName -} diff --git a/lib/logstorage/stats_fields_min.go b/lib/logstorage/stats_fields_min.go new file mode 100644 index 000000000..287b134b8 --- /dev/null +++ b/lib/logstorage/stats_fields_min.go @@ -0,0 +1,236 @@ +package logstorage + +import ( + "fmt" + "math" + "slices" + "strings" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +type statsFieldsMin struct { + srcField string + + resultFields []string +} + +func (sm *statsFieldsMin) String() string { + s := "fields_min(" + quoteTokenIfNeeded(sm.srcField) + if len(sm.resultFields) > 0 { + s += ", " + fieldNamesString(sm.resultFields) + } + s += ")" + return s +} + +func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) { + neededFields.add(sm.srcField) + neededFields.addFields(sm.resultFields) +} + +func (sm *statsFieldsMin) newStatsProcessor() (statsProcessor, int) { + smp := &statsFieldsMinProcessor{ + sm: sm, + } + return smp, int(unsafe.Sizeof(*smp)) +} + +type statsFieldsMinProcessor struct { + sm *statsFieldsMin + + min string + + fields []Field +} + +func (smp *statsFieldsMinProcessor) updateStatsForAllRows(br *blockResult) int { + stateSizeIncrease := 0 + + c := br.getColumnByName(smp.sm.srcField) + if c.isConst { + v := c.valuesEncoded[0] + stateSizeIncrease += smp.updateState(v, br, 0) + return stateSizeIncrease + } + if c.isTime { + bb := bbPool.Get() + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[0]) + v := bytesutil.ToUnsafeString(bb.B) + stateSizeIncrease += smp.updateState(v, br, 0) + bbPool.Put(bb) + return stateSizeIncrease + } + + needUpdateState := false + switch c.valueType { + case valueTypeString: + needUpdateState = true + case valueTypeDict: + for _, v := range c.dictValues { + if smp.needUpdateStateString(v) { + needUpdateState = true + break + } + } + case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: + bb := bbPool.Get() + bb.B = marshalUint64String(bb.B[:0], c.minValue) + needUpdateState = smp.needUpdateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeFloat64: + f := math.Float64frombits(c.minValue) + bb := bbPool.Get() + bb.B = marshalFloat64String(bb.B[:0], f) + needUpdateState = smp.needUpdateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeIPv4: + bb := bbPool.Get() + bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue)) + needUpdateState = smp.needUpdateStateBytes(bb.B) + bbPool.Put(bb) + case valueTypeTimestampISO8601: + bb := bbPool.Get() + bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue)) + needUpdateState = smp.needUpdateStateBytes(bb.B) + bbPool.Put(bb) + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + } + + if needUpdateState { + values := c.getValues(br) + for i, v := range values { + stateSizeIncrease += smp.updateState(v, br, i) + } + } + + return stateSizeIncrease +} + +func (smp *statsFieldsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + stateSizeIncrease := 0 + + c := br.getColumnByName(smp.sm.srcField) + if c.isConst { + v := c.valuesEncoded[0] + stateSizeIncrease += smp.updateState(v, br, rowIdx) + return stateSizeIncrease + } + if c.isTime { + bb := bbPool.Get() + bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[rowIdx]) + v := bytesutil.ToUnsafeString(bb.B) + stateSizeIncrease += smp.updateState(v, br, rowIdx) + bbPool.Put(bb) + return stateSizeIncrease + } + + v := c.getValueAtRow(br, rowIdx) + stateSizeIncrease += smp.updateState(v, br, rowIdx) + + return stateSizeIncrease +} + +func (smp *statsFieldsMinProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsFieldsMinProcessor) + if smp.needUpdateStateString(src.min) { + smp.min = src.min + smp.fields = src.fields + } +} + +func (smp *statsFieldsMinProcessor) needUpdateStateBytes(b []byte) bool { + v := bytesutil.ToUnsafeString(b) + return smp.needUpdateStateString(v) +} + +func (smp *statsFieldsMinProcessor) needUpdateStateString(v string) bool { + if v == "" { + return false + } + return smp.min == "" || lessString(v, smp.min) +} + +func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowIdx int) int { + stateSizeIncrease := 0 + + if !smp.needUpdateStateString(v) { + // There is no need in updating state + return stateSizeIncrease + } + + stateSizeIncrease -= len(smp.min) + stateSizeIncrease += len(v) + smp.min = strings.Clone(v) + + fields := smp.fields + for _, f := range fields { + stateSizeIncrease -= len(f.Name) + len(f.Value) + } + + clear(fields) + fields = fields[:0] + if len(smp.sm.resultFields) == 0 { + cs := br.getColumns() + for _, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + }) + stateSizeIncrease += len(c.name) + len(v) + } + } else { + for _, field := range smp.sm.resultFields { + c := br.getColumnByName(field) + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: strings.Clone(c.name), + Value: strings.Clone(v), + }) + stateSizeIncrease += len(c.name) + len(v) + } + } + smp.fields = fields + + return stateSizeIncrease +} + +func (smp *statsFieldsMinProcessor) finalizeStats() string { + bb := bbPool.Get() + bb.B = marshalFieldsToJSON(bb.B, smp.fields) + result := string(bb.B) + bbPool.Put(bb) + + return result +} + +func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) { + if !lex.isKeyword("fields_min") { + return nil, fmt.Errorf("unexpected func; got %q; want 'fields_min'", lex.token) + } + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'fields_min' args: %w", err) + } + + if len(fields) == 0 { + return nil, fmt.Errorf("missing first arg for 'fields_min' func - source field") + } + + srcField := fields[0] + resultFields := fields[1:] + if slices.Contains(resultFields, "*") { + resultFields = nil + } + + sm := &statsFieldsMin{ + srcField: srcField, + resultFields: resultFields, + } + return sm, nil +} diff --git a/lib/logstorage/stats_fields_min_test.go b/lib/logstorage/stats_fields_min_test.go new file mode 100644 index 000000000..f45d3a139 --- /dev/null +++ b/lib/logstorage/stats_fields_min_test.go @@ -0,0 +1,285 @@ +package logstorage + +import ( + "testing" +) + +func TestParseStatsFieldsMinSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncSuccess(t, pipeStr) + } + + f(`fields_min(foo)`) + f(`fields_min(foo, bar)`) + f(`fields_min(foo, bar, baz)`) +} + +func TestParseStatsFieldsMinFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncFailure(t, pipeStr) + } + + f(`fields_min`) + f(`fields_min()`) + f(`fields_min(x) bar`) +} + +func TestStatsFieldsMin(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("stats fields_min(a) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", `{"_msg":"def","a":"1"}`}, + }, + }) + + f("stats fields_min(foo) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", `{}`}, + }, + }) + + f("stats fields_min(b, a) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + {"c", "1232"}, + }, + }, [][]Field{ + { + {"x", `{"a":"2"}`}, + }, + }) + + f("stats fields_min(b, a, x, b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + {"c", "1232"}, + }, + }, [][]Field{ + { + {"x", `{"a":"2","x":"","b":"3"}`}, + }, + }) + + f("stats fields_min(a) if (b:*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", `{"_msg":"abc","a":"2","b":"3"}`}, + }, + }) + + f("stats by (b) fields_min(a) if (b:*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `-12.34`}, + {"b", "3"}, + }, + { + {"a", `3`}, + {"c", `54`}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"x", `{"_msg":"def","a":"-12.34","b":"3"}`}, + }, + { + {"b", ""}, + {"x", `{}`}, + }, + }) + + f("stats by (a) fields_min(b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `5`}, + }, + { + {"a", `3`}, + {"b", `7`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{"_msg":"abc","a":"1","b":"3"}`}, + }, + { + {"a", "3"}, + {"x", `{"a":"3","b":"5"}`}, + }, + }) + + f("stats by (a) fields_min(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"c", `foo`}, + }, + { + {"a", `3`}, + {"b", `7`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{}`}, + }, + { + {"a", "3"}, + {"x", `{"a":"3","c":"foo"}`}, + }, + }) + + f("stats by (a) fields_min(b, c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `34`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "3"}, + }, + { + {"a", `3`}, + {"b", `5`}, + {"c", "foo"}, + }, + { + {"a", `3`}, + {"b", `7`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", `{"c":""}`}, + }, + { + {"a", "3"}, + {"x", `{"c":"foo"}`}, + }, + }) + + f("stats by (a, b) fields_min(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "foo"}, + }, + { + {"a", `3`}, + {"b", `5`}, + {"c", "4"}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"b", "3"}, + {"x", `{}`}, + }, + { + {"a", "1"}, + {"b", ""}, + {"x", `{"_msg":"def","a":"1","c":"foo","b":""}`}, + }, + { + {"a", "3"}, + {"b", "5"}, + {"x", `{"a":"3","b":"5","c":"4"}`}, + }, + }) +} diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index 66a63a16e..f56fa6763 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -34,7 +34,6 @@ type statsMaxProcessor struct { sm *statsMax max string - hasMax bool } func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { @@ -79,9 +78,7 @@ func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { src := sfp.(*statsMaxProcessor) - if src.hasMax { - smp.updateStateString(src.max) - } + smp.updateStateString(src.max) } func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { @@ -157,17 +154,13 @@ func (smp *statsMaxProcessor) updateStateString(v string) { if v == "" { // Skip empty strings } - if smp.hasMax && !lessString(smp.max, v) { + if smp.max != "" && !lessString(smp.max, v) { return } smp.max = strings.Clone(v) - smp.hasMax = true } func (smp *statsMaxProcessor) finalizeStats() string { - if !smp.hasMax { - return "" - } return smp.max } diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 285b4eaa4..56d4ae711 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -34,7 +34,6 @@ type statsMinProcessor struct { sm *statsMin min string - hasMin bool } func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { @@ -79,9 +78,7 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { src := sfp.(*statsMinProcessor) - if src.hasMin { - smp.updateStateString(src.min) - } + smp.updateStateString(src.min) } func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { @@ -158,17 +155,13 @@ func (smp *statsMinProcessor) updateStateString(v string) { // Skip empty strings return } - if smp.hasMin && !lessString(v, smp.min) { + if smp.min != "" && !lessString(v, smp.min) { return } smp.min = strings.Clone(v) - smp.hasMin = true } func (smp *statsMinProcessor) finalizeStats() string { - if !smp.hasMin { - return "" - } return smp.min }