VictoriaMetrics/lib/logstorage/stats_quantile.go
2024-05-22 21:01:20 +02:00

294 lines
5.7 KiB
Go

package logstorage
import (
"fmt"
"math"
"slices"
"strconv"
"unsafe"
"github.com/valyala/fastrand"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type statsQuantile struct {
fields []string
phi float64
phiStr string
}
func (sq *statsQuantile) String() string {
s := "quantile(" + sq.phiStr
if len(sq.fields) > 0 {
s += ", " + fieldNamesString(sq.fields)
}
s += ")"
return s
}
func (sq *statsQuantile) updateNeededFields(neededFields fieldsSet) {
updateNeededFieldsForStatsFunc(neededFields, sq.fields)
}
func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) {
sqp := &statsQuantileProcessor{
sq: sq,
}
return sqp, int(unsafe.Sizeof(*sqp))
}
type statsQuantileProcessor struct {
sq *statsQuantile
h histogram
}
func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0
fields := sqp.sq.fields
if len(fields) == 0 {
for _, c := range br.getColumns() {
stateSizeIncrease += sqp.updateStateForColumn(br, c)
}
} else {
for _, field := range fields {
c := br.getColumnByName(field)
stateSizeIncrease += sqp.updateStateForColumn(br, c)
}
}
return stateSizeIncrease
}
func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
h := &sqp.h
stateSizeIncrease := 0
fields := sqp.sq.fields
if len(fields) == 0 {
for _, c := range br.getColumns() {
f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok {
stateSizeIncrease += h.update(f)
}
}
} else {
for _, field := range fields {
c := br.getColumnByName(field)
f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok {
stateSizeIncrease += h.update(f)
}
}
}
return stateSizeIncrease
}
func (sqp *statsQuantileProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) int {
h := &sqp.h
stateSizeIncrease := 0
if c.isConst {
f, ok := tryParseFloat64(c.valuesEncoded[0])
if ok {
for range br.timestamps {
stateSizeIncrease += h.update(f)
}
}
return stateSizeIncrease
}
if c.isTime {
return 0
}
switch c.valueType {
case valueTypeString:
for _, v := range c.getValues(br) {
f, ok := tryParseFloat64(v)
if ok {
stateSizeIncrease += h.update(f)
}
}
case valueTypeDict:
dictValues := c.dictValues
a := encoding.GetFloat64s(len(dictValues))
for i, v := range dictValues {
f, ok := tryParseFloat64(v)
if !ok {
f = nan
}
a.A[i] = f
}
for _, v := range c.getValuesEncoded(br) {
idx := v[0]
f := a.A[idx]
if !math.IsNaN(f) {
h.update(f)
}
}
encoding.PutFloat64s(a)
case valueTypeUint8:
for _, v := range c.getValuesEncoded(br) {
n := unmarshalUint8(v)
h.update(float64(n))
}
case valueTypeUint16:
for _, v := range c.getValuesEncoded(br) {
n := unmarshalUint16(v)
h.update(float64(n))
}
case valueTypeUint32:
for _, v := range c.getValuesEncoded(br) {
n := unmarshalUint32(v)
h.update(float64(n))
}
case valueTypeUint64:
for _, v := range c.getValuesEncoded(br) {
n := unmarshalUint64(v)
h.update(float64(n))
}
case valueTypeFloat64:
for _, v := range c.getValuesEncoded(br) {
f := unmarshalFloat64(v)
if !math.IsNaN(f) {
h.update(f)
}
}
case valueTypeIPv4:
case valueTypeTimestampISO8601:
default:
logger.Panicf("BUG: unexpected valueType=%d", c.valueType)
}
return stateSizeIncrease
}
func (sqp *statsQuantileProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsQuantileProcessor)
sqp.h.mergeState(&src.h)
}
func (sqp *statsQuantileProcessor) finalizeStats() string {
q := sqp.h.quantile(sqp.sq.phi)
return strconv.FormatFloat(q, 'f', -1, 64)
}
func parseStatsQuantile(lex *lexer) (*statsQuantile, error) {
if !lex.isKeyword("quantile") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "quantile")
}
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'quantile' args: %w", err)
}
if len(fields) < 1 {
return nil, fmt.Errorf("'quantile' must have at least phi arg")
}
// Parse phi
phiStr := fields[0]
phi, ok := tryParseFloat64(phiStr)
if !ok {
return nil, fmt.Errorf("phi arg in 'quantile' must be floating point number; got %q", phiStr)
}
if phi < 0 || phi > 1 {
return nil, fmt.Errorf("phi arg in 'quantile' must be in the range [0..1]; got %q", phiStr)
}
// Parse fields
fields = fields[1:]
if slices.Contains(fields, "*") {
fields = nil
}
sq := &statsQuantile{
fields: fields,
phi: phi,
phiStr: phiStr,
}
return sq, nil
}
type histogram struct {
a []float64
min float64
max float64
count uint64
rng fastrand.RNG
}
func (h *histogram) update(f float64) int {
if h.count == 0 || f < h.min {
h.min = f
}
if h.count == 0 || f > h.max {
h.max = f
}
h.count++
if len(h.a) < maxHistogramSamples {
h.a = append(h.a, f)
return int(unsafe.Sizeof(f))
}
if n := h.rng.Uint32n(uint32(h.count)); n < uint32(len(h.a)) {
h.a[n] = f
}
return 0
}
const maxHistogramSamples = 100_000
func (h *histogram) mergeState(src *histogram) {
if src.count == 0 {
// Nothing to merge
return
}
if h.count == 0 {
h.a = append(h.a, src.a...)
h.min = src.min
h.max = src.max
h.count = src.count
return
}
h.a = append(h.a, src.a...)
if src.min < h.min {
h.min = src.min
}
if src.max > h.max {
h.max = src.max
}
h.count += src.count
}
func (h *histogram) quantile(phi float64) float64 {
if len(h.a) == 0 {
return nan
}
if len(h.a) == 1 {
return h.a[0]
}
if phi <= 0 {
return h.min
}
if phi >= 1 {
return h.max
}
slices.Sort(h.a)
idx := int(phi * float64(len(h.a)))
if idx == len(h.a) {
return h.max
}
return h.a[idx]
}