2024-05-22 19:01:20 +00:00
|
|
|
package logstorage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"math"
|
|
|
|
"slices"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
)
|
|
|
|
|
2024-05-30 14:19:23 +00:00
|
|
|
// statsRowMax implements the `row_max(srcField, fetchFields...)` stats function.
//
// It picks the row with the biggest value of srcField and reports the
// requested fields of that row.
type statsRowMax struct {
	// srcField is the field whose value is maximized across rows.
	srcField string

	// fetchFields lists the fields to report from the selected row.
	// An empty list means that every field of the selected row is reported.
	fetchFields []string
}
|
|
|
|
|
2024-05-30 14:19:23 +00:00
|
|
|
func (sm *statsRowMax) String() string {
|
|
|
|
s := "row_max(" + quoteTokenIfNeeded(sm.srcField)
|
2024-05-22 19:01:20 +00:00
|
|
|
if len(sm.fetchFields) > 0 {
|
|
|
|
s += ", " + fieldNamesString(sm.fetchFields)
|
|
|
|
}
|
|
|
|
s += ")"
|
|
|
|
return s
|
|
|
|
}
|
|
|
|
|
2024-05-30 14:19:23 +00:00
|
|
|
func (sm *statsRowMax) updateNeededFields(neededFields fieldsSet) {
|
2024-05-22 19:01:20 +00:00
|
|
|
if len(sm.fetchFields) == 0 {
|
|
|
|
neededFields.add("*")
|
|
|
|
} else {
|
|
|
|
neededFields.addFields(sm.fetchFields)
|
|
|
|
}
|
|
|
|
neededFields.add(sm.srcField)
|
|
|
|
}
|
|
|
|
|
2024-12-22 01:09:36 +00:00
|
|
|
func (sm *statsRowMax) newStatsProcessor(a *chunkedAllocator) statsProcessor {
|
2025-01-13 03:48:19 +00:00
|
|
|
return a.newStatsRowMaxProcessor()
|
2024-05-22 19:01:20 +00:00
|
|
|
}
|
|
|
|
|
2024-05-30 14:19:23 +00:00
|
|
|
type statsRowMaxProcessor struct {
|
2024-05-22 19:01:20 +00:00
|
|
|
max string
|
|
|
|
|
|
|
|
fields []Field
|
|
|
|
}
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
func (smp *statsRowMaxProcessor) updateStatsForAllRows(sf statsFunc, br *blockResult) int {
|
|
|
|
sm := sf.(*statsRowMax)
|
2024-05-22 19:01:20 +00:00
|
|
|
stateSizeIncrease := 0
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
c := br.getColumnByName(sm.srcField)
|
2024-05-22 19:01:20 +00:00
|
|
|
if c.isConst {
|
|
|
|
v := c.valuesEncoded[0]
|
2025-01-13 03:48:19 +00:00
|
|
|
stateSizeIncrease += smp.updateState(sm, v, br, 0)
|
2024-05-22 19:01:20 +00:00
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
if c.isTime {
|
2024-09-26 20:22:21 +00:00
|
|
|
timestamp, ok := TryParseTimestampRFC3339Nano(smp.max)
|
|
|
|
if !ok {
|
|
|
|
timestamp = -1 << 63
|
|
|
|
}
|
|
|
|
maxTimestamp := br.getMaxTimestamp(timestamp)
|
|
|
|
if maxTimestamp <= timestamp {
|
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
|
2024-05-22 19:01:20 +00:00
|
|
|
bb := bbPool.Get()
|
2024-09-25 14:16:53 +00:00
|
|
|
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp)
|
2024-05-22 19:01:20 +00:00
|
|
|
v := bytesutil.ToUnsafeString(bb.B)
|
2025-01-13 03:48:19 +00:00
|
|
|
stateSizeIncrease += smp.updateState(sm, v, br, 0)
|
2024-05-22 19:01:20 +00:00
|
|
|
bbPool.Put(bb)
|
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
|
|
|
|
needUpdateState := false
|
|
|
|
switch c.valueType {
|
|
|
|
case valueTypeString:
|
|
|
|
needUpdateState = true
|
|
|
|
case valueTypeDict:
|
2024-11-08 22:17:38 +00:00
|
|
|
c.forEachDictValue(br, func(v string) {
|
|
|
|
if !needUpdateState && smp.needUpdateStateString(v) {
|
2024-05-22 19:01:20 +00:00
|
|
|
needUpdateState = true
|
|
|
|
}
|
2024-11-08 22:17:38 +00:00
|
|
|
})
|
2024-05-22 19:01:20 +00:00
|
|
|
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
|
|
|
|
bb := bbPool.Get()
|
|
|
|
bb.B = marshalUint64String(bb.B[:0], c.maxValue)
|
|
|
|
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
|
|
|
bbPool.Put(bb)
|
2025-01-11 22:15:00 +00:00
|
|
|
case valueTypeInt64:
|
|
|
|
bb := bbPool.Get()
|
|
|
|
bb.B = marshalInt64String(bb.B[:0], int64(c.maxValue))
|
|
|
|
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
|
|
|
bbPool.Put(bb)
|
2024-05-22 19:01:20 +00:00
|
|
|
case valueTypeFloat64:
|
|
|
|
f := math.Float64frombits(c.maxValue)
|
|
|
|
bb := bbPool.Get()
|
|
|
|
bb.B = marshalFloat64String(bb.B[:0], f)
|
|
|
|
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
|
|
|
bbPool.Put(bb)
|
|
|
|
case valueTypeIPv4:
|
|
|
|
bb := bbPool.Get()
|
|
|
|
bb.B = marshalIPv4String(bb.B[:0], uint32(c.maxValue))
|
|
|
|
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
|
|
|
bbPool.Put(bb)
|
|
|
|
case valueTypeTimestampISO8601:
|
|
|
|
bb := bbPool.Get()
|
|
|
|
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.maxValue))
|
|
|
|
needUpdateState = smp.needUpdateStateBytes(bb.B)
|
|
|
|
bbPool.Put(bb)
|
|
|
|
default:
|
|
|
|
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
|
|
|
}
|
|
|
|
|
|
|
|
if needUpdateState {
|
|
|
|
values := c.getValues(br)
|
|
|
|
for i, v := range values {
|
2025-01-13 03:48:19 +00:00
|
|
|
stateSizeIncrease += smp.updateState(sm, v, br, i)
|
2024-05-22 19:01:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
func (smp *statsRowMaxProcessor) updateStatsForRow(sf statsFunc, br *blockResult, rowIdx int) int {
|
|
|
|
sm := sf.(*statsRowMax)
|
2024-05-22 19:01:20 +00:00
|
|
|
stateSizeIncrease := 0
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
c := br.getColumnByName(sm.srcField)
|
2024-05-22 19:01:20 +00:00
|
|
|
if c.isConst {
|
|
|
|
v := c.valuesEncoded[0]
|
2025-01-13 03:48:19 +00:00
|
|
|
stateSizeIncrease += smp.updateState(sm, v, br, rowIdx)
|
2024-05-22 19:01:20 +00:00
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
if c.isTime {
|
2024-09-25 14:16:53 +00:00
|
|
|
timestamps := br.getTimestamps()
|
2024-05-22 19:01:20 +00:00
|
|
|
bb := bbPool.Get()
|
2024-09-25 14:16:53 +00:00
|
|
|
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], timestamps[rowIdx])
|
2024-05-22 19:01:20 +00:00
|
|
|
v := bytesutil.ToUnsafeString(bb.B)
|
2025-01-13 03:48:19 +00:00
|
|
|
stateSizeIncrease += smp.updateState(sm, v, br, rowIdx)
|
2024-05-22 19:01:20 +00:00
|
|
|
bbPool.Put(bb)
|
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
|
|
|
|
v := c.getValueAtRow(br, rowIdx)
|
2025-01-13 03:48:19 +00:00
|
|
|
stateSizeIncrease += smp.updateState(sm, v, br, rowIdx)
|
2024-05-22 19:01:20 +00:00
|
|
|
|
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
|
2025-02-14 14:30:03 +00:00
|
|
|
func (smp *statsRowMaxProcessor) mergeState(_ *chunkedAllocator, _ statsFunc, sfp statsProcessor) {
|
2024-05-30 14:19:23 +00:00
|
|
|
src := sfp.(*statsRowMaxProcessor)
|
2024-05-22 19:01:20 +00:00
|
|
|
if smp.needUpdateStateString(src.max) {
|
|
|
|
smp.max = src.max
|
|
|
|
smp.fields = src.fields
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-05-30 14:19:23 +00:00
|
|
|
func (smp *statsRowMaxProcessor) needUpdateStateBytes(b []byte) bool {
|
2024-05-22 19:01:20 +00:00
|
|
|
v := bytesutil.ToUnsafeString(b)
|
|
|
|
return smp.needUpdateStateString(v)
|
|
|
|
}
|
|
|
|
|
2024-05-30 14:19:23 +00:00
|
|
|
func (smp *statsRowMaxProcessor) needUpdateStateString(v string) bool {
|
2024-05-22 19:01:20 +00:00
|
|
|
if v == "" {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return smp.max == "" || lessString(smp.max, v)
|
|
|
|
}
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
func (smp *statsRowMaxProcessor) updateState(sm *statsRowMax, v string, br *blockResult, rowIdx int) int {
|
2024-05-22 19:01:20 +00:00
|
|
|
stateSizeIncrease := 0
|
|
|
|
|
|
|
|
if !smp.needUpdateStateString(v) {
|
|
|
|
// There is no need in updating state
|
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
|
|
|
|
stateSizeIncrease -= len(smp.max)
|
|
|
|
stateSizeIncrease += len(v)
|
|
|
|
smp.max = strings.Clone(v)
|
|
|
|
|
|
|
|
fields := smp.fields
|
|
|
|
for _, f := range fields {
|
|
|
|
stateSizeIncrease -= len(f.Name) + len(f.Value)
|
|
|
|
}
|
|
|
|
|
|
|
|
clear(fields)
|
|
|
|
fields = fields[:0]
|
2025-01-13 03:48:19 +00:00
|
|
|
fetchFields := sm.fetchFields
|
2024-05-22 19:01:20 +00:00
|
|
|
if len(fetchFields) == 0 {
|
|
|
|
cs := br.getColumns()
|
|
|
|
for _, c := range cs {
|
|
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
|
|
fields = append(fields, Field{
|
|
|
|
Name: strings.Clone(c.name),
|
|
|
|
Value: strings.Clone(v),
|
|
|
|
})
|
|
|
|
stateSizeIncrease += len(c.name) + len(v)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
for _, field := range fetchFields {
|
|
|
|
c := br.getColumnByName(field)
|
|
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
|
|
fields = append(fields, Field{
|
|
|
|
Name: strings.Clone(c.name),
|
|
|
|
Value: strings.Clone(v),
|
|
|
|
})
|
|
|
|
stateSizeIncrease += len(c.name) + len(v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
smp.fields = fields
|
|
|
|
|
|
|
|
return stateSizeIncrease
|
|
|
|
}
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
func (smp *statsRowMaxProcessor) finalizeStats(_ statsFunc, dst []byte, _ <-chan struct{}) []byte {
|
2024-12-17 14:16:03 +00:00
|
|
|
return MarshalFieldsToJSON(dst, smp.fields)
|
2024-05-22 19:01:20 +00:00
|
|
|
}
|
|
|
|
|
2024-05-30 14:19:23 +00:00
|
|
|
func parseStatsRowMax(lex *lexer) (*statsRowMax, error) {
|
|
|
|
if !lex.isKeyword("row_max") {
|
|
|
|
return nil, fmt.Errorf("unexpected func; got %q; want 'row_max'", lex.token)
|
2024-05-22 19:01:20 +00:00
|
|
|
}
|
|
|
|
lex.nextToken()
|
|
|
|
fields, err := parseFieldNamesInParens(lex)
|
|
|
|
if err != nil {
|
2024-05-30 14:19:23 +00:00
|
|
|
return nil, fmt.Errorf("cannot parse 'row_max' args: %w", err)
|
2024-05-22 19:01:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if len(fields) == 0 {
|
2024-05-30 14:19:23 +00:00
|
|
|
return nil, fmt.Errorf("missing first arg for 'row_max' func - source field")
|
2024-05-22 19:01:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
srcField := fields[0]
|
|
|
|
fetchFields := fields[1:]
|
|
|
|
if slices.Contains(fetchFields, "*") {
|
|
|
|
fetchFields = nil
|
|
|
|
}
|
|
|
|
|
2024-05-30 14:19:23 +00:00
|
|
|
sm := &statsRowMax{
|
2024-05-22 19:01:20 +00:00
|
|
|
srcField: srcField,
|
|
|
|
fetchFields: fetchFields,
|
|
|
|
}
|
|
|
|
return sm, nil
|
|
|
|
}
|