This commit is contained in:
Aliaksandr Valialkin 2024-05-21 18:56:35 +02:00
parent 1705ec8611
commit c21cc91552
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
8 changed files with 596 additions and 53 deletions

View file

@ -1842,5 +1842,12 @@ func visitValuesReadonly(bs *blockSearch, ch *columnHeader, bm *bitmap, f func(v
})
}
func getCanonicalColumnName(columnName string) string {
if columnName == "" {
return "_msg"
}
return columnName
}
var nan = math.NaN()
var inf = math.Inf(1)

View file

@ -882,6 +882,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats min(*) x`, `* | stats min(*) as x`)
f(`* | stats min(foo,*,bar) x`, `* | stats min(*) as x`)
// stats pipe fields_min
f(`* | stats fields_Min(foo) bar`, `* | stats fields_min(foo) as bar`)
f(`* | stats BY(x, y, ) fields_MIN(foo,bar,) bar`, `* | stats by (x, y) fields_min(foo, bar) as bar`)
// stats pipe avg
f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`)
f(`* | stats BY(x, y, ) AVG(foo,bar,) bar`, `* | stats by (x, y) avg(foo, bar) as bar`)
@ -1315,6 +1319,10 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats min`)
f(`foo | stats min()`)
// invalid stats min
f(`foo | stats fields_min`)
f(`foo | stats fields_min()`)
// invalid stats avg
f(`foo | stats avg`)
f(`foo | stats avg()`)

View file

@ -553,6 +553,12 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
func parseStatsFunc(lex *lexer) (statsFunc, error) {
switch {
case lex.isKeyword("avg"):
sas, err := parseStatsAvg(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'avg' func: %w", err)
}
return sas, nil
case lex.isKeyword("count"):
scs, err := parseStatsCount(lex)
if err != nil {
@ -571,30 +577,48 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err)
}
return sus, nil
case lex.isKeyword("sum"):
sss, err := parseStatsSum(lex)
case lex.isKeyword("fields_min"):
sms, err := parseStatsFieldsMin(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum' func: %w", err)
return nil, fmt.Errorf("cannot parse 'fields_min' func: %w", err)
}
return sss, nil
return sms, nil
case lex.isKeyword("max"):
sms, err := parseStatsMax(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'max' func: %w", err)
}
return sms, nil
case lex.isKeyword("median"):
sms, err := parseStatsMedian(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'median' func: %w", err)
}
return sms, nil
case lex.isKeyword("min"):
sms, err := parseStatsMin(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'min' func: %w", err)
}
return sms, nil
case lex.isKeyword("avg"):
sas, err := parseStatsAvg(lex)
case lex.isKeyword("quantile"):
sqs, err := parseStatsQuantile(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'avg' func: %w", err)
return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err)
}
return sas, nil
return sqs, nil
case lex.isKeyword("sum"):
sss, err := parseStatsSum(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum' func: %w", err)
}
return sss, nil
case lex.isKeyword("sum_len"):
sss, err := parseStatsSumLen(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum_len' func: %w", err)
}
return sss, nil
case lex.isKeyword("uniq_values"):
sus, err := parseStatsUniqValues(lex)
if err != nil {
@ -607,24 +631,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
return nil, fmt.Errorf("cannot parse 'values' func: %w", err)
}
return svs, nil
case lex.isKeyword("sum_len"):
sss, err := parseStatsSumLen(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum_len' func: %w", err)
}
return sss, nil
case lex.isKeyword("quantile"):
sqs, err := parseStatsQuantile(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err)
}
return sqs, nil
case lex.isKeyword("median"):
sms, err := parseStatsMedian(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'median' func: %w", err)
}
return sms, nil
default:
return nil, fmt.Errorf("unknown stats func %q", lex.token)
}

View file

@ -2,6 +2,7 @@ package logstorage
import (
"fmt"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@ -24,8 +25,8 @@ func (f *Field) Reset() {
// String returns string representation of f.
func (f *Field) String() string {
name := getCanonicalColumnName(f.Name)
return fmt.Sprintf("%q:%q", name, f.Value)
x := f.marshalToJSON(nil)
return string(x)
}
func (f *Field) marshal(dst []byte) []byte {
@ -56,6 +57,27 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
return src, nil
}
func (f *Field) marshalToJSON(dst []byte) []byte {
dst = strconv.AppendQuote(dst, f.Name)
dst = append(dst, ':')
dst = strconv.AppendQuote(dst, f.Value)
return dst
}
func marshalFieldsToJSON(dst []byte, fields []Field) []byte {
dst = append(dst, '{')
if len(fields) > 0 {
dst = fields[0].marshalToJSON(dst)
fields = fields[1:]
for i := range fields {
dst = append(dst, ',')
dst = fields[i].marshalToJSON(dst)
}
}
dst = append(dst, '}')
return dst
}
func appendFields(a *arena, dst, src []Field) []Field {
for _, f := range src {
dst = append(dst, Field{
@ -126,10 +148,3 @@ func (rs *rows) mergeRows(timestampsA, timestampsB []int64, fieldsA, fieldsB [][
rs.appendRows(timestampsA, fieldsA)
}
}
func getCanonicalColumnName(columnName string) string {
if columnName == "" {
return "_msg"
}
return columnName
}

View file

@ -0,0 +1,236 @@
package logstorage
import (
"fmt"
"math"
"slices"
"strings"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type statsFieldsMin struct {
srcField string
resultFields []string
}
func (sm *statsFieldsMin) String() string {
s := "fields_min(" + quoteTokenIfNeeded(sm.srcField)
if len(sm.resultFields) > 0 {
s += ", " + fieldNamesString(sm.resultFields)
}
s += ")"
return s
}
func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) {
neededFields.add(sm.srcField)
neededFields.addFields(sm.resultFields)
}
func (sm *statsFieldsMin) newStatsProcessor() (statsProcessor, int) {
smp := &statsFieldsMinProcessor{
sm: sm,
}
return smp, int(unsafe.Sizeof(*smp))
}
type statsFieldsMinProcessor struct {
sm *statsFieldsMin
min string
fields []Field
}
func (smp *statsFieldsMinProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField)
if c.isConst {
v := c.valuesEncoded[0]
stateSizeIncrease += smp.updateState(v, br, 0)
return stateSizeIncrease
}
if c.isTime {
bb := bbPool.Get()
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[0])
v := bytesutil.ToUnsafeString(bb.B)
stateSizeIncrease += smp.updateState(v, br, 0)
bbPool.Put(bb)
return stateSizeIncrease
}
needUpdateState := false
switch c.valueType {
case valueTypeString:
needUpdateState = true
case valueTypeDict:
for _, v := range c.dictValues {
if smp.needUpdateStateString(v) {
needUpdateState = true
break
}
}
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.minValue)
needUpdateState = smp.needUpdateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeFloat64:
f := math.Float64frombits(c.minValue)
bb := bbPool.Get()
bb.B = marshalFloat64String(bb.B[:0], f)
needUpdateState = smp.needUpdateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeIPv4:
bb := bbPool.Get()
bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue))
needUpdateState = smp.needUpdateStateBytes(bb.B)
bbPool.Put(bb)
case valueTypeTimestampISO8601:
bb := bbPool.Get()
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue))
needUpdateState = smp.needUpdateStateBytes(bb.B)
bbPool.Put(bb)
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
}
if needUpdateState {
values := c.getValues(br)
for i, v := range values {
stateSizeIncrease += smp.updateState(v, br, i)
}
}
return stateSizeIncrease
}
func (smp *statsFieldsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField)
if c.isConst {
v := c.valuesEncoded[0]
stateSizeIncrease += smp.updateState(v, br, rowIdx)
return stateSizeIncrease
}
if c.isTime {
bb := bbPool.Get()
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[rowIdx])
v := bytesutil.ToUnsafeString(bb.B)
stateSizeIncrease += smp.updateState(v, br, rowIdx)
bbPool.Put(bb)
return stateSizeIncrease
}
v := c.getValueAtRow(br, rowIdx)
stateSizeIncrease += smp.updateState(v, br, rowIdx)
return stateSizeIncrease
}
func (smp *statsFieldsMinProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsFieldsMinProcessor)
if smp.needUpdateStateString(src.min) {
smp.min = src.min
smp.fields = src.fields
}
}
func (smp *statsFieldsMinProcessor) needUpdateStateBytes(b []byte) bool {
v := bytesutil.ToUnsafeString(b)
return smp.needUpdateStateString(v)
}
func (smp *statsFieldsMinProcessor) needUpdateStateString(v string) bool {
if v == "" {
return false
}
return smp.min == "" || lessString(v, smp.min)
}
func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowIdx int) int {
stateSizeIncrease := 0
if !smp.needUpdateStateString(v) {
// There is no need in updating state
return stateSizeIncrease
}
stateSizeIncrease -= len(smp.min)
stateSizeIncrease += len(v)
smp.min = strings.Clone(v)
fields := smp.fields
for _, f := range fields {
stateSizeIncrease -= len(f.Name) + len(f.Value)
}
clear(fields)
fields = fields[:0]
if len(smp.sm.resultFields) == 0 {
cs := br.getColumns()
for _, c := range cs {
v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
})
stateSizeIncrease += len(c.name) + len(v)
}
} else {
for _, field := range smp.sm.resultFields {
c := br.getColumnByName(field)
v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
})
stateSizeIncrease += len(c.name) + len(v)
}
}
smp.fields = fields
return stateSizeIncrease
}
func (smp *statsFieldsMinProcessor) finalizeStats() string {
bb := bbPool.Get()
bb.B = marshalFieldsToJSON(bb.B, smp.fields)
result := string(bb.B)
bbPool.Put(bb)
return result
}
func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) {
if !lex.isKeyword("fields_min") {
return nil, fmt.Errorf("unexpected func; got %q; want 'fields_min'", lex.token)
}
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_min' args: %w", err)
}
if len(fields) == 0 {
return nil, fmt.Errorf("missing first arg for 'fields_min' func - source field")
}
srcField := fields[0]
resultFields := fields[1:]
if slices.Contains(resultFields, "*") {
resultFields = nil
}
sm := &statsFieldsMin{
srcField: srcField,
resultFields: resultFields,
}
return sm, nil
}

View file

@ -0,0 +1,285 @@
package logstorage
import (
"testing"
)
func TestParseStatsFieldsMinSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncSuccess(t, pipeStr)
}
f(`fields_min(foo)`)
f(`fields_min(foo, bar)`)
f(`fields_min(foo, bar, baz)`)
}
func TestParseStatsFieldsMinFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncFailure(t, pipeStr)
}
f(`fields_min`)
f(`fields_min()`)
f(`fields_min(x) bar`)
}
func TestStatsFieldsMin(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("stats fields_min(a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"b", `54`},
},
}, [][]Field{
{
{"x", `{"_msg":"def","a":"1"}`},
},
})
f("stats fields_min(foo) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"b", `54`},
},
}, [][]Field{
{
{"x", `{}`},
},
})
f("stats fields_min(b, a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"b", `54`},
{"c", "1232"},
},
}, [][]Field{
{
{"x", `{"a":"2"}`},
},
})
f("stats fields_min(b, a, x, b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"b", `54`},
{"c", "1232"},
},
}, [][]Field{
{
{"x", `{"a":"2","x":"","b":"3"}`},
},
})
f("stats fields_min(a) if (b:*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"b", `54`},
},
}, [][]Field{
{
{"x", `{"_msg":"abc","a":"2","b":"3"}`},
},
})
f("stats by (b) fields_min(a) if (b:*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `-12.34`},
{"b", "3"},
},
{
{"a", `3`},
{"c", `54`},
},
}, [][]Field{
{
{"b", "3"},
{"x", `{"_msg":"def","a":"-12.34","b":"3"}`},
},
{
{"b", ""},
{"x", `{}`},
},
})
f("stats by (a) fields_min(b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"b", `5`},
},
{
{"a", `3`},
{"b", `7`},
},
}, [][]Field{
{
{"a", "1"},
{"x", `{"_msg":"abc","a":"1","b":"3"}`},
},
{
{"a", "3"},
{"x", `{"a":"3","b":"5"}`},
},
})
f("stats by (a) fields_min(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"c", `foo`},
},
{
{"a", `3`},
{"b", `7`},
},
}, [][]Field{
{
{"a", "1"},
{"x", `{}`},
},
{
{"a", "3"},
{"x", `{"a":"3","c":"foo"}`},
},
})
f("stats by (a) fields_min(b, c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `34`},
},
{
{"_msg", `def`},
{"a", `1`},
{"c", "3"},
},
{
{"a", `3`},
{"b", `5`},
{"c", "foo"},
},
{
{"a", `3`},
{"b", `7`},
},
}, [][]Field{
{
{"a", "1"},
{"x", `{"c":""}`},
},
{
{"a", "3"},
{"x", `{"c":"foo"}`},
},
})
f("stats by (a, b) fields_min(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
{"c", "foo"},
},
{
{"a", `3`},
{"b", `5`},
{"c", "4"},
},
}, [][]Field{
{
{"a", "1"},
{"b", "3"},
{"x", `{}`},
},
{
{"a", "1"},
{"b", ""},
{"x", `{"_msg":"def","a":"1","c":"foo","b":""}`},
},
{
{"a", "3"},
{"b", "5"},
{"x", `{"a":"3","b":"5","c":"4"}`},
},
})
}

View file

@ -34,7 +34,6 @@ type statsMaxProcessor struct {
sm *statsMax
max string
hasMax bool
}
func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
@ -79,10 +78,8 @@ func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsMaxProcessor)
if src.hasMax {
smp.updateStateString(src.max)
}
}
func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) {
if len(br.timestamps) == 0 {
@ -157,17 +154,13 @@ func (smp *statsMaxProcessor) updateStateString(v string) {
if v == "" {
// Skip empty strings
}
if smp.hasMax && !lessString(smp.max, v) {
if smp.max != "" && !lessString(smp.max, v) {
return
}
smp.max = strings.Clone(v)
smp.hasMax = true
}
func (smp *statsMaxProcessor) finalizeStats() string {
if !smp.hasMax {
return ""
}
return smp.max
}

View file

@ -34,7 +34,6 @@ type statsMinProcessor struct {
sm *statsMin
min string
hasMin bool
}
func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
@ -79,10 +78,8 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
func (smp *statsMinProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsMinProcessor)
if src.hasMin {
smp.updateStateString(src.min)
}
}
func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) {
if len(br.timestamps) == 0 {
@ -158,17 +155,13 @@ func (smp *statsMinProcessor) updateStateString(v string) {
// Skip empty strings
return
}
if smp.hasMin && !lessString(v, smp.min) {
if smp.min != "" && !lessString(v, smp.min) {
return
}
smp.min = strings.Clone(v)
smp.hasMin = true
}
func (smp *statsMinProcessor) finalizeStats() string {
if !smp.hasMin {
return ""
}
return smp.min
}