VictoriaMetrics/app/vmselect/graphite/aggr_state.go

725 lines
15 KiB
Go
Raw Normal View History

package graphite
import (
"fmt"
"math"
"strings"
"github.com/valyala/histogram"
)
// aggrStateFuncs maps an aggregate function name to the constructor of the
// aggrState implementing it. Every constructor takes the number of points per
// series and returns a fresh, empty state.
//
// Keys are stored without the "Series" suffix; newAggrState strips that suffix
// from the requested name before looking it up here.
//
// Several names are aliases sharing one constructor: "average"/"avg",
// "sum"/"total", "range"/"rangeOf" and "last"/"current".
var aggrStateFuncs = map[string]func(int) aggrState{
"average": newAggrStateAvg,
"avg": newAggrStateAvg,
"avg_zero": newAggrStateAvgZero,
"median": newAggrStateMedian,
"sum": newAggrStateSum,
"total": newAggrStateSum,
"min": newAggrStateMin,
"max": newAggrStateMax,
"diff": newAggrStateDiff,
"pow": newAggrStatePow,
"stddev": newAggrStateStddev,
"count": newAggrStateCount,
"range": newAggrStateRange,
"rangeOf": newAggrStateRange,
"multiply": newAggrStateMultiply,
"first": newAggrStateFirst,
"last": newAggrStateLast,
"current": newAggrStateLast,
}
// aggrState is an incremental, point-wise aggregator over multiple series.
//
// Each series is fed in with Update; the aggregated series is then obtained
// with Finalize.
type aggrState interface {
// Update folds the points of a single series into the state.
// The implementations in this file panic if len(values) differs from the
// number of points the state was created with, and skip NaN points.
Update(values []float64)
// Finalize returns the aggregated value for every point.
// xFilesFactor is used by most implementations to decide how many of the
// updated series must have had a non-NaN value at a point for that point
// to get a non-NaN result; an implementation may ignore it
// (aggrStateAvgZero does).
Finalize(xFilesFactor float64) []float64
}
// newAggrState returns an empty aggregation state for the given function name.
//
// pointsLen is the number of points every series passed to Update must have.
// funcName may carry an optional "Series" suffix (e.g. "sumSeries"), which is
// removed before the name is looked up in aggrStateFuncs.
//
// An error mentioning the original funcName is returned when no constructor
// is registered for it.
func newAggrState(pointsLen int, funcName string) (aggrState, error) {
	name := strings.TrimSuffix(funcName, "Series")
	if newState, ok := aggrStateFuncs[name]; ok && newState != nil {
		return newState(pointsLen), nil
	}
	return nil, fmt.Errorf("unsupported aggregate function %q", funcName)
}
// aggrStateAvg computes the point-wise average of the non-NaN values across
// all series passed to Update.
type aggrStateAvg struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// sums[i] is the sum of the non-NaN values seen at point i.
	sums []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateAvg returns an empty averaging state for series of pointsLen points.
func newAggrStateAvg(pointsLen int) aggrState {
	return &aggrStateAvg{
		pointsLen: pointsLen,
		sums:      make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateAvg) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		as.sums[i] += v
		as.counts[i]++
	}
	as.seriesTotal++
}

// Finalize returns the average for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateAvg) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.sums[i] / float64(count)
		}
		values[i] = v
	}
	return values
}
// aggrStateAvgZero computes the point-wise average across all series where a
// NaN point contributes nothing to the sum but its series still counts in the
// divisor, i.e. NaN is effectively treated as zero.
type aggrStateAvgZero struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// sums[i] is the sum of the non-NaN values seen at point i.
	sums []float64
	// seriesTotal is the number of Update calls; it is the divisor in Finalize.
	seriesTotal int
}

// newAggrStateAvgZero returns an empty avg_zero state for series of pointsLen points.
func newAggrStateAvgZero(pointsLen int) aggrState {
	st := new(aggrStateAvgZero)
	st.pointsLen = pointsLen
	st.sums = make([]float64, pointsLen)
	return st
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateAvgZero) Update(values []float64) {
	if got, want := len(values), as.pointsLen; got != want {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for idx := range values {
		if x := values[idx]; !math.IsNaN(x) {
			as.sums[idx] += x
		}
	}
	as.seriesTotal++
}

// Finalize returns, for every point, the accumulated sum divided by the
// number of series passed to Update. The xFilesFactor argument is ignored.
//
// All points are NaN if Update has never been called.
func (as *aggrStateAvgZero) Finalize(_ float64) []float64 {
	result := make([]float64, as.pointsLen)
	if as.seriesTotal <= 0 {
		// Nothing to divide by: every point is undefined.
		for idx := range as.sums {
			result[idx] = nan
		}
		return result
	}
	divisor := float64(as.seriesTotal)
	for idx, total := range as.sums {
		result[idx] = total / divisor
	}
	return result
}
// newAggrStateMedian returns a state computing the point-wise median,
// implemented as the 50th percentile.
func newAggrStateMedian(pointsLen int) aggrState {
	return newAggrStatePercentile(pointsLen, 50)
}

// aggrStatePercentile computes a point-wise percentile of the non-NaN values
// across all series passed to Update, using one histogram per point.
type aggrStatePercentile struct {
	// phi is the requested quantile in the range [0..1] (percentile / 100).
	phi float64
	// pointsLen is the required length of every series.
	pointsLen int
	// hs[i] collects the non-NaN values seen at point i.
	hs []*histogram.Fast
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStatePercentile returns an empty state computing the n-th percentile
// (n is expressed in percent, e.g. 50 for the median) for series of pointsLen
// points.
func newAggrStatePercentile(pointsLen int, n float64) aggrState {
	hs := make([]*histogram.Fast, pointsLen)
	for i := range hs {
		hs[i] = histogram.NewFast()
	}
	return &aggrStatePercentile{
		phi:       n / 100,
		pointsLen: pointsLen,
		hs:        hs,
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStatePercentile) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		as.hs[i].Update(v)
		as.counts[i]++
	}
	as.seriesTotal++
}

// Finalize returns the configured quantile for every point, as reported by
// the per-point histogram.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStatePercentile) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.hs[i].Quantile(as.phi)
		}
		values[i] = v
	}
	return values
}
// aggrStateSum computes the point-wise sum of the non-NaN values across all
// series passed to Update.
type aggrStateSum struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// sums[i] is the sum of the non-NaN values seen at point i.
	sums []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateSum returns an empty summing state for series of pointsLen points.
func newAggrStateSum(pointsLen int) aggrState {
	return &aggrStateSum{
		pointsLen: pointsLen,
		sums:      make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateSum) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		as.sums[i] += v
		as.counts[i]++
	}
	as.seriesTotal++
}

// Finalize returns the sum for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateSum) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.sums[i]
		}
		values[i] = v
	}
	return values
}
// aggrStateMin computes the point-wise minimum of the non-NaN values across
// all series passed to Update.
type aggrStateMin struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// mins[i] is the smallest non-NaN value seen at point i; it is meaningful
	// only when counts[i] > 0.
	mins []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateMin returns an empty minimum state for series of pointsLen points.
func newAggrStateMin(pointsLen int) aggrState {
	return &aggrStateMin{
		pointsLen: pointsLen,
		mins:      make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateMin) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		as.counts[i]++
		// The first value seen at a point always seeds the minimum.
		if as.counts[i] == 1 || v < as.mins[i] {
			as.mins[i] = v
		}
	}
	as.seriesTotal++
}

// Finalize returns the minimum for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateMin) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.mins[i]
		}
		values[i] = v
	}
	return values
}
// aggrStateMax computes the point-wise maximum of the non-NaN values across
// all series passed to Update.
type aggrStateMax struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// maxs[i] is the largest non-NaN value seen at point i; it is meaningful
	// only when counts[i] > 0.
	maxs []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateMax returns an empty maximum state for series of pointsLen points.
func newAggrStateMax(pointsLen int) aggrState {
	return &aggrStateMax{
		pointsLen: pointsLen,
		maxs:      make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateMax) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		as.counts[i]++
		// The first value seen at a point always seeds the maximum.
		if as.counts[i] == 1 || v > as.maxs[i] {
			as.maxs[i] = v
		}
	}
	as.seriesTotal++
}

// Finalize returns the maximum for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateMax) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.maxs[i]
		}
		values[i] = v
	}
	return values
}
// aggrStateDiff computes, for every point, the first non-NaN value seen minus
// all subsequent non-NaN values, in the order the series are passed to Update.
type aggrStateDiff struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// vs[i] is the running difference at point i; it is meaningful only when
	// counts[i] > 0.
	vs []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateDiff returns an empty diff state for series of pointsLen points.
func newAggrStateDiff(pointsLen int) aggrState {
	return &aggrStateDiff{
		pointsLen: pointsLen,
		vs:        make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateDiff) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		if as.counts[i] == 0 {
			// The first value is the minuend; later values are subtracted from it.
			as.vs[i] = v
		} else {
			as.vs[i] -= v
		}
		as.counts[i]++
	}
	as.seriesTotal++
}

// Finalize returns the running difference for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateDiff) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.vs[i]
		}
		values[i] = v
	}
	return values
}
// aggrStatePow computes, for every point, the first non-NaN value seen raised
// successively to the power of every subsequent non-NaN value, in the order
// the series are passed to Update.
type aggrStatePow struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// vs[i] is the running power at point i; it is meaningful only when
	// counts[i] > 0.
	vs []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStatePow returns an empty pow state for series of pointsLen points.
func newAggrStatePow(pointsLen int) aggrState {
	return &aggrStatePow{
		pointsLen: pointsLen,
		vs:        make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStatePow) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		if as.counts[i] == 0 {
			// The first value is the base; later values are exponents.
			as.vs[i] = v
		} else {
			as.vs[i] = math.Pow(as.vs[i], v)
		}
		as.counts[i]++
	}
	as.seriesTotal++
}

// Finalize returns the running power for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStatePow) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.vs[i]
		}
		values[i] = v
	}
	return values
}
// aggrStateStddev computes the point-wise standard deviation of the non-NaN
// values across all series passed to Update. The squared deviations are
// divided by the number of values (not by number-1).
type aggrStateStddev struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// means[i] is the running mean of the non-NaN values seen at point i.
	means []float64
	// m2s[i] is the running sum of squared deviations from the mean at point i.
	m2s []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateStddev returns an empty stddev state for series of pointsLen points.
func newAggrStateStddev(pointsLen int) aggrState {
	return &aggrStateStddev{
		pointsLen: pointsLen,
		means:     make([]float64, pointsLen),
		m2s:       make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateStddev) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		// See https://en.m.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
		count := as.counts[i] + 1
		mean := as.means[i]
		delta := v - mean
		mean += delta / float64(count)
		// delta2 intentionally uses the updated mean.
		delta2 := v - mean
		as.means[i] = mean
		as.m2s[i] += delta * delta2
		as.counts[i] = count
	}
	as.seriesTotal++
}

// Finalize returns the standard deviation for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateStddev) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = math.Sqrt(as.m2s[i] / float64(count))
		}
		values[i] = v
	}
	return values
}
// aggrStateCount computes, for every point, the number of series passed to
// Update that had a non-NaN value at that point.
type aggrStateCount struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateCount returns an empty counting state for series of pointsLen points.
func newAggrStateCount(pointsLen int) aggrState {
	return &aggrStateCount{
		pointsLen: pointsLen,
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateCount) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if !math.IsNaN(v) {
			as.counts[i]++
		}
	}
	as.seriesTotal++
}

// Finalize returns the non-NaN count for every point. A point with no values
// yields NaN rather than 0.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateCount) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = float64(count)
		}
		values[i] = v
	}
	return values
}
// aggrStateRange computes, for every point, the difference between the largest
// and the smallest non-NaN value across all series passed to Update.
type aggrStateRange struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// mins[i] and maxs[i] are the smallest and largest non-NaN values seen at
	// point i; they are meaningful only when counts[i] > 0.
	mins []float64
	maxs []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateRange returns an empty range state for series of pointsLen points.
func newAggrStateRange(pointsLen int) aggrState {
	return &aggrStateRange{
		pointsLen: pointsLen,
		mins:      make([]float64, pointsLen),
		maxs:      make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateRange) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		as.counts[i]++
		switch {
		case as.counts[i] == 1:
			// The first value seen at a point seeds both bounds.
			as.mins[i] = v
			as.maxs[i] = v
		case v < as.mins[i]:
			as.mins[i] = v
		case v > as.maxs[i]:
			as.maxs[i] = v
		}
	}
	as.seriesTotal++
}

// Finalize returns max-min for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateRange) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.maxs[i] - as.mins[i]
		}
		values[i] = v
	}
	return values
}
// aggrStateMultiply computes the point-wise product of the non-NaN values
// across all series passed to Update.
type aggrStateMultiply struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// ms[i] is the running product at point i; it is meaningful only when
	// counts[i] > 0.
	ms []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateMultiply returns an empty product state for series of pointsLen points.
func newAggrStateMultiply(pointsLen int) aggrState {
	return &aggrStateMultiply{
		pointsLen: pointsLen,
		ms:        make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateMultiply) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		as.counts[i]++
		if as.counts[i] == 1 {
			// The first value seeds the product (ms starts zeroed, not at 1).
			as.ms[i] = v
		} else {
			as.ms[i] *= v
		}
	}
	as.seriesTotal++
}

// Finalize returns the product for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateMultiply) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.ms[i]
		}
		values[i] = v
	}
	return values
}
// aggrStateFirst keeps, for every point, the first non-NaN value seen, in the
// order the series are passed to Update.
type aggrStateFirst struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// vs[i] is the first non-NaN value seen at point i; it is meaningful only
	// when counts[i] > 0.
	vs []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateFirst returns an empty "first" state for series of pointsLen points.
func newAggrStateFirst(pointsLen int) aggrState {
	return &aggrStateFirst{
		pointsLen: pointsLen,
		vs:        make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateFirst) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		// Only the first value is stored; later ones just bump the counter,
		// which Finalize needs for the xFilesFactor check.
		if as.counts[i] == 0 {
			as.vs[i] = v
		}
		as.counts[i]++
	}
	as.seriesTotal++
}

// Finalize returns the first seen value for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateFirst) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.vs[i]
		}
		values[i] = v
	}
	return values
}
// aggrStateLast keeps, for every point, the last non-NaN value seen, in the
// order the series are passed to Update.
type aggrStateLast struct {
	// pointsLen is the required length of every series.
	pointsLen int
	// vs[i] is the most recent non-NaN value seen at point i; it is meaningful
	// only when counts[i] > 0.
	vs []float64
	// counts[i] is the number of non-NaN values seen at point i.
	counts []int
	// seriesTotal is the number of Update calls.
	seriesTotal int
}

// newAggrStateLast returns an empty "last" state for series of pointsLen points.
func newAggrStateLast(pointsLen int) aggrState {
	return &aggrStateLast{
		pointsLen: pointsLen,
		vs:        make([]float64, pointsLen),
		counts:    make([]int, pointsLen),
	}
}

// Update adds one series to the state, skipping NaN points.
//
// It panics if len(values) differs from the configured number of points.
func (as *aggrStateLast) Update(values []float64) {
	if len(values) != as.pointsLen {
		panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
	}
	for i, v := range values {
		if math.IsNaN(v) {
			continue
		}
		// Every non-NaN value overwrites the previous one.
		as.vs[i] = v
		as.counts[i]++
	}
	as.seriesTotal++
}

// Finalize returns the last seen value for every point.
//
// A point gets a non-NaN result only if at least one series had a value there
// and the fraction of series with a value is at least xFilesFactor. The ratio
// is compared directly instead of truncating xFilesFactor*seriesTotal to an
// int, which would accept points with too few values (e.g. 1 of 3 series at
// xFilesFactor=0.5).
func (as *aggrStateLast) Finalize(xFilesFactor float64) []float64 {
	values := make([]float64, as.pointsLen)
	total := float64(as.seriesTotal)
	for i, count := range as.counts {
		v := nan
		// count > 0 implies total > 0, so the division is safe.
		if count > 0 && float64(count)/total >= xFilesFactor {
			v = as.vs[i]
		}
		values[i] = v
	}
	return values
}