mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
177 lines
4.2 KiB
Go
177 lines
4.2 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"math"
|
|
"strings"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// statsMin holds the parsed arguments of the `min(...)` stats function.
type statsMin struct {
	// fields lists the field names passed to min(). When it is empty,
	// the processor scans every column of the block instead.
	fields []string
}
|
|
|
|
func (sm *statsMin) String() string {
|
|
return "min(" + statsFuncFieldsToString(sm.fields) + ")"
|
|
}
|
|
|
|
func (sm *statsMin) updateNeededFields(neededFields fieldsSet) {
|
|
updateNeededFieldsForStatsFunc(neededFields, sm.fields)
|
|
}
|
|
|
|
func (sm *statsMin) newStatsProcessor(a *chunkedAllocator) statsProcessor {
|
|
return a.newStatsMinProcessor()
|
|
}
|
|
|
|
// statsMinProcessor is the running state of a min() calculation.
type statsMinProcessor struct {
	// min is the smallest value recorded so far.
	// It is meaningful only when hasItems is set.
	min string

	// hasItems reports whether at least one value has been recorded in min.
	hasItems bool
}
|
|
|
|
func (smp *statsMinProcessor) updateStatsForAllRows(sf statsFunc, br *blockResult) int {
|
|
sm := sf.(*statsMin)
|
|
minLen := len(smp.min)
|
|
|
|
fields := sm.fields
|
|
if len(fields) == 0 {
|
|
// Find the minimum value across all the columns
|
|
for _, c := range br.getColumns() {
|
|
smp.updateStateForColumn(br, c)
|
|
}
|
|
} else {
|
|
// Find the minimum value across the requested columns
|
|
for _, field := range fields {
|
|
c := br.getColumnByName(field)
|
|
smp.updateStateForColumn(br, c)
|
|
}
|
|
}
|
|
|
|
return len(smp.min) - minLen
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStatsForRow(sf statsFunc, br *blockResult, rowIdx int) int {
|
|
sm := sf.(*statsMin)
|
|
minLen := len(smp.min)
|
|
|
|
fields := sm.fields
|
|
if len(fields) == 0 {
|
|
// Find the minimum value across all the fields for the given row
|
|
for _, c := range br.getColumns() {
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
smp.updateStateString(v)
|
|
}
|
|
} else {
|
|
// Find the minimum value across the requested fields for the given row
|
|
for _, field := range fields {
|
|
c := br.getColumnByName(field)
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
smp.updateStateString(v)
|
|
}
|
|
}
|
|
|
|
return minLen - len(smp.min)
|
|
}
|
|
|
|
func (smp *statsMinProcessor) mergeState(_ *chunkedAllocator, _ statsFunc, sfp statsProcessor) {
|
|
src := sfp.(*statsMinProcessor)
|
|
if src.hasItems {
|
|
smp.updateStateString(src.min)
|
|
}
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) {
|
|
if c.isTime {
|
|
timestamp, ok := TryParseTimestampRFC3339Nano(smp.min)
|
|
if !ok {
|
|
timestamp = (1 << 63) - 1
|
|
}
|
|
minTimestamp := br.getMinTimestamp(timestamp)
|
|
if minTimestamp >= timestamp {
|
|
return
|
|
}
|
|
|
|
bb := bbPool.Get()
|
|
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp)
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
|
|
return
|
|
}
|
|
if c.isConst {
|
|
// Special case for const column
|
|
v := c.valuesEncoded[0]
|
|
smp.updateStateString(v)
|
|
return
|
|
}
|
|
|
|
switch c.valueType {
|
|
case valueTypeString:
|
|
for _, v := range c.getValuesEncoded(br) {
|
|
smp.updateStateString(v)
|
|
}
|
|
case valueTypeDict:
|
|
c.forEachDictValue(br, func(v string) {
|
|
smp.updateStateString(v)
|
|
})
|
|
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
|
|
bb := bbPool.Get()
|
|
bb.B = marshalUint64String(bb.B[:0], c.minValue)
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
case valueTypeInt64:
|
|
bb := bbPool.Get()
|
|
bb.B = marshalInt64String(bb.B[:0], int64(c.minValue))
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
case valueTypeFloat64:
|
|
f := math.Float64frombits(c.minValue)
|
|
bb := bbPool.Get()
|
|
bb.B = marshalFloat64String(bb.B[:0], f)
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
case valueTypeIPv4:
|
|
bb := bbPool.Get()
|
|
bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue))
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
case valueTypeTimestampISO8601:
|
|
bb := bbPool.Get()
|
|
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue))
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
default:
|
|
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
|
}
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStateBytes(b []byte) {
|
|
v := bytesutil.ToUnsafeString(b)
|
|
smp.updateStateString(v)
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStateString(v string) {
|
|
if !smp.hasItems {
|
|
smp.min = strings.Clone(v)
|
|
smp.hasItems = true
|
|
return
|
|
}
|
|
if lessString(v, smp.min) {
|
|
smp.min = strings.Clone(v)
|
|
}
|
|
}
|
|
|
|
func (smp *statsMinProcessor) finalizeStats(_ statsFunc, dst []byte, _ <-chan struct{}) []byte {
|
|
return append(dst, smp.min...)
|
|
}
|
|
|
|
func parseStatsMin(lex *lexer) (*statsMin, error) {
|
|
fields, err := parseStatsFuncFields(lex, "min")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sm := &statsMin{
|
|
fields: fields,
|
|
}
|
|
return sm, nil
|
|
}
|