mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
724 lines
15 KiB
Go
724 lines
15 KiB
Go
package graphite
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"strings"
|
|
|
|
"github.com/valyala/histogram"
|
|
)
|
|
|
|
var aggrStateFuncs = map[string]func(int) aggrState{
|
|
"average": newAggrStateAvg,
|
|
"avg": newAggrStateAvg,
|
|
"avg_zero": newAggrStateAvgZero,
|
|
"median": newAggrStateMedian,
|
|
"sum": newAggrStateSum,
|
|
"total": newAggrStateSum,
|
|
"min": newAggrStateMin,
|
|
"max": newAggrStateMax,
|
|
"diff": newAggrStateDiff,
|
|
"pow": newAggrStatePow,
|
|
"stddev": newAggrStateStddev,
|
|
"count": newAggrStateCount,
|
|
"range": newAggrStateRange,
|
|
"rangeOf": newAggrStateRange,
|
|
"multiply": newAggrStateMultiply,
|
|
"first": newAggrStateFirst,
|
|
"last": newAggrStateLast,
|
|
"current": newAggrStateLast,
|
|
}
|
|
|
|
type aggrState interface {
|
|
Update(values []float64)
|
|
Finalize(xFilesFactor float64) []float64
|
|
}
|
|
|
|
func newAggrState(pointsLen int, funcName string) (aggrState, error) {
|
|
s := strings.TrimSuffix(funcName, "Series")
|
|
asf := aggrStateFuncs[s]
|
|
if asf == nil {
|
|
return nil, fmt.Errorf("unsupported aggregate function %q", funcName)
|
|
}
|
|
return asf(pointsLen), nil
|
|
}
|
|
|
|
type aggrStateAvg struct {
|
|
pointsLen int
|
|
sums []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateAvg(pointsLen int) aggrState {
|
|
return &aggrStateAvg{
|
|
pointsLen: pointsLen,
|
|
sums: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateAvg) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
sums := as.sums
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
sums[i] += v
|
|
counts[i]++
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateAvg) Finalize(xFilesFactor float64) []float64 {
|
|
sums := as.sums
|
|
counts := as.counts
|
|
values := make([]float64, as.pointsLen)
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = sums[i] / float64(count)
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateAvgZero struct {
|
|
pointsLen int
|
|
sums []float64
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateAvgZero(pointsLen int) aggrState {
|
|
return &aggrStateAvgZero{
|
|
pointsLen: pointsLen,
|
|
sums: make([]float64, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateAvgZero) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
sums := as.sums
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
sums[i] += v
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateAvgZero) Finalize(_ float64) []float64 {
|
|
sums := as.sums
|
|
values := make([]float64, as.pointsLen)
|
|
count := float64(as.seriesTotal)
|
|
for i, sum := range sums {
|
|
v := nan
|
|
if count > 0 {
|
|
v = sum / count
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
func newAggrStateMedian(pointsLen int) aggrState {
|
|
return newAggrStatePercentile(pointsLen, 50)
|
|
}
|
|
|
|
type aggrStatePercentile struct {
|
|
phi float64
|
|
pointsLen int
|
|
hs []*histogram.Fast
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStatePercentile(pointsLen int, n float64) aggrState {
|
|
hs := make([]*histogram.Fast, pointsLen)
|
|
for i := 0; i < pointsLen; i++ {
|
|
hs[i] = histogram.NewFast()
|
|
}
|
|
return &aggrStatePercentile{
|
|
phi: n / 100,
|
|
pointsLen: pointsLen,
|
|
hs: hs,
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStatePercentile) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
hs := as.hs
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
hs[i].Update(v)
|
|
counts[i]++
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStatePercentile) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
hs := as.hs
|
|
for i, count := range as.counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = hs[i].Quantile(as.phi)
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateSum struct {
|
|
pointsLen int
|
|
sums []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateSum(pointsLen int) aggrState {
|
|
return &aggrStateSum{
|
|
pointsLen: pointsLen,
|
|
sums: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateSum) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
sums := as.sums
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
sums[i] += v
|
|
counts[i]++
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateSum) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
sums := as.sums
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = sums[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateMin struct {
|
|
pointsLen int
|
|
mins []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateMin(pointsLen int) aggrState {
|
|
return &aggrStateMin{
|
|
pointsLen: pointsLen,
|
|
mins: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateMin) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
mins := as.mins
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
counts[i]++
|
|
if counts[i] == 1 {
|
|
mins[i] = v
|
|
} else if v < mins[i] {
|
|
mins[i] = v
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateMin) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
mins := as.mins
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = mins[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateMax struct {
|
|
pointsLen int
|
|
maxs []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateMax(pointsLen int) aggrState {
|
|
return &aggrStateMax{
|
|
pointsLen: pointsLen,
|
|
maxs: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateMax) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
maxs := as.maxs
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
counts[i]++
|
|
if counts[i] == 1 {
|
|
maxs[i] = v
|
|
} else if v > maxs[i] {
|
|
maxs[i] = v
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateMax) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
maxs := as.maxs
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = maxs[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateDiff struct {
|
|
pointsLen int
|
|
vs []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateDiff(pointsLen int) aggrState {
|
|
return &aggrStateDiff{
|
|
pointsLen: pointsLen,
|
|
vs: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateDiff) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
vs := as.vs
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
if counts[i] == 0 {
|
|
vs[i] = v
|
|
} else {
|
|
vs[i] -= v
|
|
}
|
|
counts[i]++
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateDiff) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
vs := as.vs
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = vs[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStatePow struct {
|
|
pointsLen int
|
|
vs []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStatePow(pointsLen int) aggrState {
|
|
return &aggrStatePow{
|
|
pointsLen: pointsLen,
|
|
vs: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStatePow) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
vs := as.vs
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
if counts[i] == 0 {
|
|
vs[i] = v
|
|
} else {
|
|
vs[i] = math.Pow(vs[i], v)
|
|
}
|
|
counts[i]++
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStatePow) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
vs := as.vs
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = vs[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateStddev struct {
|
|
pointsLen int
|
|
means []float64
|
|
m2s []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateStddev(pointsLen int) aggrState {
|
|
return &aggrStateStddev{
|
|
pointsLen: pointsLen,
|
|
means: make([]float64, pointsLen),
|
|
m2s: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateStddev) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
means := as.means
|
|
m2s := as.m2s
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
// See https://en.m.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
|
|
count := counts[i]
|
|
mean := means[i]
|
|
count++
|
|
delta := v - mean
|
|
mean += delta / float64(count)
|
|
delta2 := v - mean
|
|
means[i] = mean
|
|
m2s[i] += delta * delta2
|
|
counts[i] = count
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateStddev) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
m2s := as.m2s
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = math.Sqrt(m2s[i] / float64(count))
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateCount struct {
|
|
pointsLen int
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateCount(pointsLen int) aggrState {
|
|
return &aggrStateCount{
|
|
pointsLen: pointsLen,
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateCount) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
counts[i]++
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateCount) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = float64(count)
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateRange struct {
|
|
pointsLen int
|
|
mins []float64
|
|
maxs []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateRange(pointsLen int) aggrState {
|
|
return &aggrStateRange{
|
|
pointsLen: pointsLen,
|
|
mins: make([]float64, pointsLen),
|
|
maxs: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateRange) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
mins := as.mins
|
|
maxs := as.maxs
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
counts[i]++
|
|
if counts[i] == 1 {
|
|
mins[i] = v
|
|
maxs[i] = v
|
|
} else if v < mins[i] {
|
|
mins[i] = v
|
|
} else if v > maxs[i] {
|
|
maxs[i] = v
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateRange) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
mins := as.mins
|
|
maxs := as.maxs
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = maxs[i] - mins[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateMultiply struct {
|
|
pointsLen int
|
|
ms []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateMultiply(pointsLen int) aggrState {
|
|
return &aggrStateMultiply{
|
|
pointsLen: pointsLen,
|
|
ms: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateMultiply) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
ms := as.ms
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
counts[i]++
|
|
if counts[i] == 1 {
|
|
ms[i] = v
|
|
} else {
|
|
ms[i] *= v
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateMultiply) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
ms := as.ms
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = ms[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateFirst struct {
|
|
pointsLen int
|
|
vs []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateFirst(pointsLen int) aggrState {
|
|
return &aggrStateFirst{
|
|
pointsLen: pointsLen,
|
|
vs: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateFirst) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
vs := as.vs
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
counts[i]++
|
|
if counts[i] == 1 {
|
|
vs[i] = v
|
|
}
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateFirst) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
vs := as.vs
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = vs[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|
|
|
|
type aggrStateLast struct {
|
|
pointsLen int
|
|
vs []float64
|
|
counts []int
|
|
seriesTotal int
|
|
}
|
|
|
|
func newAggrStateLast(pointsLen int) aggrState {
|
|
return &aggrStateLast{
|
|
pointsLen: pointsLen,
|
|
vs: make([]float64, pointsLen),
|
|
counts: make([]int, pointsLen),
|
|
}
|
|
}
|
|
|
|
func (as *aggrStateLast) Update(values []float64) {
|
|
if len(values) != as.pointsLen {
|
|
panic(fmt.Errorf("BUG: unexpected number of points in values; got %d; want %d", len(values), as.pointsLen))
|
|
}
|
|
vs := as.vs
|
|
counts := as.counts
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
vs[i] = v
|
|
counts[i]++
|
|
}
|
|
as.seriesTotal++
|
|
}
|
|
|
|
func (as *aggrStateLast) Finalize(xFilesFactor float64) []float64 {
|
|
xff := int(xFilesFactor * float64(as.seriesTotal))
|
|
values := make([]float64, as.pointsLen)
|
|
vs := as.vs
|
|
counts := as.counts
|
|
for i, count := range counts {
|
|
v := nan
|
|
if count > 0 && count >= xff {
|
|
v = vs[i]
|
|
}
|
|
values[i] = v
|
|
}
|
|
return values
|
|
}
|