mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
![Aliaksandr Valialkin](/assets/img/avatar_default.png)
This allows reducing the state of every statsProcessor by removing pointer to the corresponding statsFunc. For example, this reduces statsCountProcessor size by 2x.
181 lines
4.2 KiB
Go
181 lines
4.2 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"math"
|
|
"strings"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
type statsMin struct {
|
|
fields []string
|
|
}
|
|
|
|
func (sm *statsMin) String() string {
|
|
return "min(" + statsFuncFieldsToString(sm.fields) + ")"
|
|
}
|
|
|
|
func (sm *statsMin) updateNeededFields(neededFields fieldsSet) {
|
|
updateNeededFieldsForStatsFunc(neededFields, sm.fields)
|
|
}
|
|
|
|
func (sm *statsMin) newStatsProcessor(a *chunkedAllocator) statsProcessor {
|
|
return a.newStatsMinProcessor()
|
|
}
|
|
|
|
type statsMinProcessor struct {
|
|
min string
|
|
hasItems bool
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStatsForAllRows(sf statsFunc, br *blockResult) int {
|
|
sm := sf.(*statsMin)
|
|
minLen := len(smp.min)
|
|
|
|
fields := sm.fields
|
|
if len(fields) == 0 {
|
|
// Find the minimum value across all the columns
|
|
for _, c := range br.getColumns() {
|
|
smp.updateStateForColumn(br, c)
|
|
}
|
|
} else {
|
|
// Find the minimum value across the requested columns
|
|
for _, field := range fields {
|
|
c := br.getColumnByName(field)
|
|
smp.updateStateForColumn(br, c)
|
|
}
|
|
}
|
|
|
|
return len(smp.min) - minLen
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStatsForRow(sf statsFunc, br *blockResult, rowIdx int) int {
|
|
sm := sf.(*statsMin)
|
|
minLen := len(smp.min)
|
|
|
|
fields := sm.fields
|
|
if len(fields) == 0 {
|
|
// Find the minimum value across all the fields for the given row
|
|
for _, c := range br.getColumns() {
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
smp.updateStateString(v)
|
|
}
|
|
} else {
|
|
// Find the minimum value across the requested fields for the given row
|
|
for _, field := range fields {
|
|
c := br.getColumnByName(field)
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
smp.updateStateString(v)
|
|
}
|
|
}
|
|
|
|
return minLen - len(smp.min)
|
|
}
|
|
|
|
func (smp *statsMinProcessor) mergeState(_ statsFunc, sfp statsProcessor) {
|
|
src := sfp.(*statsMinProcessor)
|
|
if src.hasItems {
|
|
smp.updateStateString(src.min)
|
|
}
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) {
|
|
if br.rowsLen == 0 {
|
|
return
|
|
}
|
|
|
|
if c.isTime {
|
|
timestamp, ok := TryParseTimestampRFC3339Nano(smp.min)
|
|
if !ok {
|
|
timestamp = (1 << 63) - 1
|
|
}
|
|
minTimestamp := br.getMinTimestamp(timestamp)
|
|
if minTimestamp >= timestamp {
|
|
return
|
|
}
|
|
|
|
bb := bbPool.Get()
|
|
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp)
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
|
|
return
|
|
}
|
|
if c.isConst {
|
|
// Special case for const column
|
|
v := c.valuesEncoded[0]
|
|
smp.updateStateString(v)
|
|
return
|
|
}
|
|
|
|
switch c.valueType {
|
|
case valueTypeString:
|
|
for _, v := range c.getValuesEncoded(br) {
|
|
smp.updateStateString(v)
|
|
}
|
|
case valueTypeDict:
|
|
c.forEachDictValue(br, func(v string) {
|
|
smp.updateStateString(v)
|
|
})
|
|
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
|
|
bb := bbPool.Get()
|
|
bb.B = marshalUint64String(bb.B[:0], c.minValue)
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
case valueTypeInt64:
|
|
bb := bbPool.Get()
|
|
bb.B = marshalInt64String(bb.B[:0], int64(c.minValue))
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
case valueTypeFloat64:
|
|
f := math.Float64frombits(c.minValue)
|
|
bb := bbPool.Get()
|
|
bb.B = marshalFloat64String(bb.B[:0], f)
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
case valueTypeIPv4:
|
|
bb := bbPool.Get()
|
|
bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue))
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
case valueTypeTimestampISO8601:
|
|
bb := bbPool.Get()
|
|
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue))
|
|
smp.updateStateBytes(bb.B)
|
|
bbPool.Put(bb)
|
|
default:
|
|
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
|
}
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStateBytes(b []byte) {
|
|
v := bytesutil.ToUnsafeString(b)
|
|
smp.updateStateString(v)
|
|
}
|
|
|
|
func (smp *statsMinProcessor) updateStateString(v string) {
|
|
if !smp.hasItems {
|
|
smp.min = strings.Clone(v)
|
|
smp.hasItems = true
|
|
return
|
|
}
|
|
if lessString(v, smp.min) {
|
|
smp.min = strings.Clone(v)
|
|
}
|
|
}
|
|
|
|
func (smp *statsMinProcessor) finalizeStats(_ statsFunc, dst []byte, _ <-chan struct{}) []byte {
|
|
return append(dst, smp.min...)
|
|
}
|
|
|
|
func parseStatsMin(lex *lexer) (*statsMin, error) {
|
|
fields, err := parseStatsFuncFields(lex, "min")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sm := &statsMin{
|
|
fields: fields,
|
|
}
|
|
return sm, nil
|
|
}
|