mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
02f30898e1
commit
c4b68956fe
3 changed files with 528 additions and 0 deletions
|
@ -577,6 +577,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
|
|||
return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err)
|
||||
}
|
||||
return sus, nil
|
||||
case lex.isKeyword("fields_max"):
|
||||
sms, err := parseStatsFieldsMax(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'fields_max' func: %w", err)
|
||||
}
|
||||
return sms, nil
|
||||
case lex.isKeyword("fields_min"):
|
||||
sms, err := parseStatsFieldsMin(lex)
|
||||
if err != nil {
|
||||
|
|
236
lib/logstorage/stats_fields_max.go
Normal file
236
lib/logstorage/stats_fields_max.go
Normal file
|
@ -0,0 +1,236 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
type statsFieldsMax struct {
|
||||
srcField string
|
||||
|
||||
resultFields []string
|
||||
}
|
||||
|
||||
func (sm *statsFieldsMax) String() string {
|
||||
s := "fields_max(" + quoteTokenIfNeeded(sm.srcField)
|
||||
if len(sm.resultFields) > 0 {
|
||||
s += ", " + fieldNamesString(sm.resultFields)
|
||||
}
|
||||
s += ")"
|
||||
return s
|
||||
}
|
||||
|
||||
func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) {
|
||||
neededFields.add(sm.srcField)
|
||||
neededFields.addFields(sm.resultFields)
|
||||
}
|
||||
|
||||
func (sm *statsFieldsMax) newStatsProcessor() (statsProcessor, int) {
|
||||
smp := &statsFieldsMaxProcessor{
|
||||
sm: sm,
|
||||
}
|
||||
return smp, int(unsafe.Sizeof(*smp))
|
||||
}
|
||||
|
||||
type statsFieldsMaxProcessor struct {
|
||||
sm *statsFieldsMax
|
||||
|
||||
max string
|
||||
|
||||
fields []Field
|
||||
}
|
||||
|
||||
func (smp *statsFieldsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
|
||||
stateSizeIncrease := 0
|
||||
|
||||
c := br.getColumnByName(smp.sm.srcField)
|
||||
if c.isConst {
|
||||
v := c.valuesEncoded[0]
|
||||
stateSizeIncrease += smp.updateState(v, br, 0)
|
||||
return stateSizeIncrease
|
||||
}
|
||||
if c.isTime {
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[0])
|
||||
v := bytesutil.ToUnsafeString(bb.B)
|
||||
stateSizeIncrease += smp.updateState(v, br, 0)
|
||||
bbPool.Put(bb)
|
||||
return stateSizeIncrease
|
||||
}
|
||||
|
||||
needUpdateState := false
|
||||
switch c.valueType {
|
||||
case valueTypeString:
|
||||
needUpdateState = true
|
||||
case valueTypeDict:
|
||||
for _, v := range c.dictValues {
|
||||
if smp.needUpdateStateString(v) {
|
||||
needUpdateState = true
|
||||
break
|
||||
}
|
||||
}
|
||||
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalUint64String(bb.B[:0], c.maxValue)
|
||||
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
||||
bbPool.Put(bb)
|
||||
case valueTypeFloat64:
|
||||
f := math.Float64frombits(c.maxValue)
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalFloat64String(bb.B[:0], f)
|
||||
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
||||
bbPool.Put(bb)
|
||||
case valueTypeIPv4:
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalIPv4String(bb.B[:0], uint32(c.maxValue))
|
||||
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
||||
bbPool.Put(bb)
|
||||
case valueTypeTimestampISO8601:
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.maxValue))
|
||||
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
||||
bbPool.Put(bb)
|
||||
default:
|
||||
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
||||
}
|
||||
|
||||
if needUpdateState {
|
||||
values := c.getValues(br)
|
||||
for i, v := range values {
|
||||
stateSizeIncrease += smp.updateState(v, br, i)
|
||||
}
|
||||
}
|
||||
|
||||
return stateSizeIncrease
|
||||
}
|
||||
|
||||
func (smp *statsFieldsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
|
||||
stateSizeIncrease := 0
|
||||
|
||||
c := br.getColumnByName(smp.sm.srcField)
|
||||
if c.isConst {
|
||||
v := c.valuesEncoded[0]
|
||||
stateSizeIncrease += smp.updateState(v, br, rowIdx)
|
||||
return stateSizeIncrease
|
||||
}
|
||||
if c.isTime {
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], br.timestamps[rowIdx])
|
||||
v := bytesutil.ToUnsafeString(bb.B)
|
||||
stateSizeIncrease += smp.updateState(v, br, rowIdx)
|
||||
bbPool.Put(bb)
|
||||
return stateSizeIncrease
|
||||
}
|
||||
|
||||
v := c.getValueAtRow(br, rowIdx)
|
||||
stateSizeIncrease += smp.updateState(v, br, rowIdx)
|
||||
|
||||
return stateSizeIncrease
|
||||
}
|
||||
|
||||
func (smp *statsFieldsMaxProcessor) mergeState(sfp statsProcessor) {
|
||||
src := sfp.(*statsFieldsMaxProcessor)
|
||||
if smp.needUpdateStateString(src.max) {
|
||||
smp.max = src.max
|
||||
smp.fields = src.fields
|
||||
}
|
||||
}
|
||||
|
||||
func (smp *statsFieldsMaxProcessor) needUpdateStateBytes(b []byte) bool {
|
||||
v := bytesutil.ToUnsafeString(b)
|
||||
return smp.needUpdateStateString(v)
|
||||
}
|
||||
|
||||
func (smp *statsFieldsMaxProcessor) needUpdateStateString(v string) bool {
|
||||
if v == "" {
|
||||
return false
|
||||
}
|
||||
return smp.max == "" || lessString(smp.max, v)
|
||||
}
|
||||
|
||||
func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowIdx int) int {
|
||||
stateSizeIncrease := 0
|
||||
|
||||
if !smp.needUpdateStateString(v) {
|
||||
// There is no need in updating state
|
||||
return stateSizeIncrease
|
||||
}
|
||||
|
||||
stateSizeIncrease -= len(smp.max)
|
||||
stateSizeIncrease += len(v)
|
||||
smp.max = strings.Clone(v)
|
||||
|
||||
fields := smp.fields
|
||||
for _, f := range fields {
|
||||
stateSizeIncrease -= len(f.Name) + len(f.Value)
|
||||
}
|
||||
|
||||
clear(fields)
|
||||
fields = fields[:0]
|
||||
if len(smp.sm.resultFields) == 0 {
|
||||
cs := br.getColumns()
|
||||
for _, c := range cs {
|
||||
v := c.getValueAtRow(br, rowIdx)
|
||||
fields = append(fields, Field{
|
||||
Name: strings.Clone(c.name),
|
||||
Value: strings.Clone(v),
|
||||
})
|
||||
stateSizeIncrease += len(c.name) + len(v)
|
||||
}
|
||||
} else {
|
||||
for _, field := range smp.sm.resultFields {
|
||||
c := br.getColumnByName(field)
|
||||
v := c.getValueAtRow(br, rowIdx)
|
||||
fields = append(fields, Field{
|
||||
Name: strings.Clone(c.name),
|
||||
Value: strings.Clone(v),
|
||||
})
|
||||
stateSizeIncrease += len(c.name) + len(v)
|
||||
}
|
||||
}
|
||||
smp.fields = fields
|
||||
|
||||
return stateSizeIncrease
|
||||
}
|
||||
|
||||
func (smp *statsFieldsMaxProcessor) finalizeStats() string {
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalFieldsToJSON(bb.B, smp.fields)
|
||||
result := string(bb.B)
|
||||
bbPool.Put(bb)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) {
|
||||
if !lex.isKeyword("fields_max") {
|
||||
return nil, fmt.Errorf("unexpected func; got %q; want 'fields_max'", lex.token)
|
||||
}
|
||||
lex.nextToken()
|
||||
fields, err := parseFieldNamesInParens(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'fields_max' args: %w", err)
|
||||
}
|
||||
|
||||
if len(fields) == 0 {
|
||||
return nil, fmt.Errorf("missing first arg for 'fields_max' func - source field")
|
||||
}
|
||||
|
||||
srcField := fields[0]
|
||||
resultFields := fields[1:]
|
||||
if slices.Contains(resultFields, "*") {
|
||||
resultFields = nil
|
||||
}
|
||||
|
||||
sm := &statsFieldsMax{
|
||||
srcField: srcField,
|
||||
resultFields: resultFields,
|
||||
}
|
||||
return sm, nil
|
||||
}
|
286
lib/logstorage/stats_fields_max_test.go
Normal file
286
lib/logstorage/stats_fields_max_test.go
Normal file
|
@ -0,0 +1,286 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseStatsFieldsMaxSuccess(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParseStatsFuncSuccess(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`fields_max(foo)`)
|
||||
f(`fields_max(foo, bar)`)
|
||||
f(`fields_max(foo, bar, baz)`)
|
||||
}
|
||||
|
||||
func TestParseStatsFieldsMaxFailure(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParseStatsFuncFailure(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`fields_max`)
|
||||
f(`fields_max()`)
|
||||
f(`fields_max(x) bar`)
|
||||
}
|
||||
|
||||
func TestStatsFieldsMax(t *testing.T) {
|
||||
f := func(pipeStr string, rows, rowsExpected [][]Field) {
|
||||
t.Helper()
|
||||
expectPipeResults(t, pipeStr, rows, rowsExpected)
|
||||
}
|
||||
|
||||
f("stats fields_max(a) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `2`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `54`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `{"a":"3","b":"54"}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats fields_max(foo) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `2`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `54`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `{}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats fields_max(b, a) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `2`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `54`},
|
||||
{"c", "1232"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `{"a":"3"}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats fields_max(b, a, x, b) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `2`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `54`},
|
||||
{"c", "1232"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `{"a":"3","x":"","b":"54"}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats fields_max(a) if (b:*) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `2`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `54`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `{"a":"3","b":"54"}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats by (b) fields_max(a) if (b:*) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `2`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `-12.34`},
|
||||
{"b", "3"},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"c", `54`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"b", "3"},
|
||||
{"x", `{"_msg":"abc","a":"2","b":"3"}`},
|
||||
},
|
||||
{
|
||||
{"b", ""},
|
||||
{"x", `{}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats by (a) fields_max(b) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `1`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `5`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `7`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"a", "1"},
|
||||
{"x", `{"_msg":"abc","a":"1","b":"3"}`},
|
||||
},
|
||||
{
|
||||
{"a", "3"},
|
||||
{"x", `{"a":"3","b":"7"}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats by (a) fields_max(c) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `1`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"c", `foo`},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `7`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"a", "1"},
|
||||
{"x", `{}`},
|
||||
},
|
||||
{
|
||||
{"a", "3"},
|
||||
{"x", `{"a":"3","c":"foo"}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats by (a) fields_max(b, c) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `1`},
|
||||
{"b", `34`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
{"c", "3"},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `5`},
|
||||
{"c", "foo"},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `7`},
|
||||
{"c", "bar"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"a", "1"},
|
||||
{"x", `{"c":""}`},
|
||||
},
|
||||
{
|
||||
{"a", "3"},
|
||||
{"x", `{"c":"bar"}`},
|
||||
},
|
||||
})
|
||||
|
||||
f("stats by (a, b) fields_max(c) as x", [][]Field{
|
||||
{
|
||||
{"_msg", `abc`},
|
||||
{"a", `1`},
|
||||
{"b", `3`},
|
||||
},
|
||||
{
|
||||
{"_msg", `def`},
|
||||
{"a", `1`},
|
||||
{"c", "foo"},
|
||||
},
|
||||
{
|
||||
{"a", `3`},
|
||||
{"b", `5`},
|
||||
{"c", "4"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"a", "1"},
|
||||
{"b", "3"},
|
||||
{"x", `{}`},
|
||||
},
|
||||
{
|
||||
{"a", "1"},
|
||||
{"b", ""},
|
||||
{"x", `{"_msg":"def","a":"1","c":"foo","b":""}`},
|
||||
},
|
||||
{
|
||||
{"a", "3"},
|
||||
{"b", "5"},
|
||||
{"x", `{"a":"3","b":"5","c":"4"}`},
|
||||
},
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue