VictoriaMetrics/lib/logstorage/stats_min.go
Aliaksandr Valialkin 639b3091b5
wip
2024-05-15 15:46:42 +02:00

244 lines
5.4 KiB
Go

package logstorage
import (
"math"
"slices"
"strings"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// statsMin holds the parsed arguments of a `min(...)` stats function.
type statsMin struct {
// fields lists the field names passed to min(), as returned by the parser.
fields []string
// containsStar is true when fields contains "*"; the processor then scans
// every column of a block instead of only the named ones.
containsStar bool
}
// String renders the function call as "min(<fields>)", where <fields> is the
// fieldNamesString representation of sm.fields.
func (sm *statsMin) String() string {
	args := fieldNamesString(sm.fields)
	return "min(" + args + ")"
}
// neededFields returns the field names min() was called with.
//
// The returned slice is sm.fields itself, not a copy.
func (sm *statsMin) neededFields() []string {
	names := sm.fields
	return names
}
// newStatsProcessor creates an empty processor bound to sm.
//
// The second result is unsafe.Sizeof of the processor struct, i.e. the size
// of its fixed part before any minimum value has been stored.
func (sm *statsMin) newStatsProcessor() (statsProcessor, int) {
	p := new(statsMinProcessor)
	p.sm = sm

	size := int(unsafe.Sizeof(*p))
	return p, size
}
// statsMinProcessor keeps the running state of a single min() calculation.
type statsMinProcessor struct {
// sm is the min() definition this processor was created for.
sm *statsMin
// min is the smallest value recorded so far (ordered by lessString).
// It is meaningful only when hasMin is true.
min string
// hasMin is set once the first value has been recorded in min.
hasMin bool
}
// updateStatsForAllRows folds all the rows of br into the running minimum.
//
// When min() was called with `*`, every column of br takes part; otherwise
// only the columns named in sm.fields do. The result is the change of
// len(smp.min) caused by this call (new length minus old length).
func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
	prevLen := len(smp.min)

	if !smp.sm.containsStar {
		// Only the explicitly requested columns take part.
		for _, name := range smp.sm.fields {
			smp.updateStateForColumn(br, br.getColumnByName(name))
		}
		return len(smp.min) - prevLen
	}

	// `*` was requested, so every column of the block takes part.
	for _, col := range br.getColumns() {
		smp.updateStateForColumn(br, col)
	}
	return len(smp.min) - prevLen
}
// updateStatsForRow folds the values of the row rowIdx of br into the running
// minimum.
//
// When min() was called with `*`, the value of every column at rowIdx takes
// part; otherwise only the columns named in sm.fields do.
//
// It returns the change of len(smp.min) caused by this call, computed as
// new length minus old length - the same sign convention as
// updateStatsForAllRows (presumably consumed by the caller for state-size
// accounting).
func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
	minLen := len(smp.min)

	if smp.sm.containsStar {
		// Find the minimum value across all the fields for the given row
		for _, c := range br.getColumns() {
			v := c.getValueAtRow(br, rowIdx)
			smp.updateStateString(v)
		}
	} else {
		// Find the minimum value across the requested fields for the given row
		for _, field := range smp.sm.fields {
			c := br.getColumnByName(field)
			v := c.getValueAtRow(br, rowIdx)
			smp.updateStateString(v)
		}
	}

	// The operands used to be swapped here, which reported a negative delta
	// whenever the stored minimum grew and a positive one when it shrank.
	return len(smp.min) - minLen
}
// mergeState folds the minimum collected by sfp into smp.
//
// sfp must be a *statsMinProcessor; the type assertion panics otherwise.
// A source that has not recorded any value yet leaves smp untouched.
func (smp *statsMinProcessor) mergeState(sfp statsProcessor) {
	other := sfp.(*statsMinProcessor)
	if !other.hasMin {
		return
	}
	smp.updateStateString(other.min)
}
// updateStateForColumn folds all the values of column c from br into the
// running minimum.
//
// Nothing is done when br.timestamps is empty. For the time column, const
// columns and every typed (numeric, IPv4, ISO8601 timestamp) column only a
// single candidate is compared against the state: the smallest value of the
// column in its native representation, converted to its string form. String
// columns are compared value by value.
//
// It panics via logger.Panicf on an unknown c.valueType.
func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) {
	if len(br.timestamps) == 0 {
		return
	}

	if c.isTime {
		// The time column is served from br.timestamps.
		ts := br.timestamps
		lowest := ts[0]
		for i := 1; i < len(ts); i++ {
			if ts[i] < lowest {
				lowest = ts[i]
			}
		}

		buf := bbPool.Get()
		buf.B = marshalTimestampRFC3339NanoString(buf.B[:0], lowest)
		smp.updateStateBytes(buf.B)
		bbPool.Put(buf)
		return
	}

	if c.isConst {
		// A const column is represented by its first encoded value.
		smp.updateStateString(c.encodedValues[0])
		return
	}

	switch c.valueType {
	case valueTypeString:
		for _, s := range c.encodedValues {
			smp.updateStateString(s)
		}

	case valueTypeDict:
		// Every dictionary entry is considered here.
		// NOTE(review): this assumes each entry of c.dictValues occurs in at
		// least one row of br - confirm against the callers.
		for _, s := range c.dictValues {
			smp.updateStateString(s)
		}

	case valueTypeUint8:
		lowest := unmarshalUint8(c.encodedValues[0])
		for _, ev := range c.encodedValues[1:] {
			if n := unmarshalUint8(ev); n < lowest {
				lowest = n
			}
		}

		buf := bbPool.Get()
		buf.B = marshalUint8String(buf.B[:0], lowest)
		smp.updateStateBytes(buf.B)
		bbPool.Put(buf)

	case valueTypeUint16:
		lowest := unmarshalUint16(c.encodedValues[0])
		for _, ev := range c.encodedValues[1:] {
			if n := unmarshalUint16(ev); n < lowest {
				lowest = n
			}
		}

		buf := bbPool.Get()
		buf.B = marshalUint16String(buf.B[:0], lowest)
		smp.updateStateBytes(buf.B)
		bbPool.Put(buf)

	case valueTypeUint32:
		lowest := unmarshalUint32(c.encodedValues[0])
		for _, ev := range c.encodedValues[1:] {
			if n := unmarshalUint32(ev); n < lowest {
				lowest = n
			}
		}

		buf := bbPool.Get()
		buf.B = marshalUint32String(buf.B[:0], lowest)
		smp.updateStateBytes(buf.B)
		bbPool.Put(buf)

	case valueTypeUint64:
		lowest := unmarshalUint64(c.encodedValues[0])
		for _, ev := range c.encodedValues[1:] {
			if n := unmarshalUint64(ev); n < lowest {
				lowest = n
			}
		}

		buf := bbPool.Get()
		buf.B = marshalUint64String(buf.B[:0], lowest)
		smp.updateStateBytes(buf.B)
		bbPool.Put(buf)

	case valueTypeFloat64:
		lowest := unmarshalFloat64(c.encodedValues[0])
		for _, ev := range c.encodedValues[1:] {
			// A NaN candidate is always replaced by the next value, since
			// `f < NaN` is never true on its own.
			if f := unmarshalFloat64(ev); math.IsNaN(lowest) || f < lowest {
				lowest = f
			}
		}

		buf := bbPool.Get()
		buf.B = marshalFloat64String(buf.B[:0], lowest)
		smp.updateStateBytes(buf.B)
		bbPool.Put(buf)

	case valueTypeIPv4:
		lowest := unmarshalIPv4(c.encodedValues[0])
		for _, ev := range c.encodedValues[1:] {
			if ip := unmarshalIPv4(ev); ip < lowest {
				lowest = ip
			}
		}

		buf := bbPool.Get()
		buf.B = marshalIPv4String(buf.B[:0], lowest)
		smp.updateStateBytes(buf.B)
		bbPool.Put(buf)

	case valueTypeTimestampISO8601:
		lowest := unmarshalTimestampISO8601(c.encodedValues[0])
		for _, ev := range c.encodedValues[1:] {
			if t := unmarshalTimestampISO8601(ev); t < lowest {
				lowest = t
			}
		}

		buf := bbPool.Get()
		buf.B = marshalTimestampISO8601String(buf.B[:0], lowest)
		smp.updateStateBytes(buf.B)
		bbPool.Put(buf)

	default:
		logger.Panicf("BUG: unknown valueType=%d", c.valueType)
	}
}
// updateStateBytes is the []byte counterpart of updateStateString.
//
// b is converted with bytesutil.ToUnsafeString and handed to
// updateStateString, which clones the value before storing it, so the state
// never retains b.
func (smp *statsMinProcessor) updateStateBytes(b []byte) {
	smp.updateStateString(bytesutil.ToUnsafeString(b))
}
// updateStateString records v as the new minimum when no value has been
// recorded yet or when lessString reports v as smaller than the current one.
//
// The stored value is a clone of v, so v may alias a buffer that is reused
// afterwards.
func (smp *statsMinProcessor) updateStateString(v string) {
	if !smp.hasMin || lessString(v, smp.min) {
		smp.min = strings.Clone(v)
		smp.hasMin = true
	}
}
// finalizeStats returns the recorded minimum, or "NaN" when no value has
// been recorded.
func (smp *statsMinProcessor) finalizeStats() string {
	if smp.hasMin {
		return smp.min
	}
	return "NaN"
}
// parseStatsMin parses a `min(...)` stats function from lex.
//
// The field list is read with parseFieldNamesForStatsFunc; its error, if any,
// is returned unchanged. containsStar is precomputed from the presence of
// "*" among the parsed fields.
func parseStatsMin(lex *lexer) (*statsMin, error) {
	names, err := parseFieldNamesForStatsFunc(lex, "min")
	if err != nil {
		return nil, err
	}

	hasStar := slices.Contains(names, "*")
	return &statsMin{
		fields:       names,
		containsStar: hasStar,
	}, nil
}