VictoriaMetrics/app/vmselect/promql/rollup.go

861 lines
21 KiB
Go

package promql
import (
"fmt"
"math"
"sort"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/valyala/histogram"
)
var rollupFuncs = map[string]newRollupFunc{
"default_rollup": newRollupFuncOneArg(rollupDefault), // default rollup func
// Standard rollup funcs from PromQL.
// See funcs accepting range-vector on https://prometheus.io/docs/prometheus/latest/querying/functions/ .
"changes": newRollupFuncOneArg(rollupChanges),
"delta": newRollupFuncOneArg(rollupDelta),
"deriv": newRollupFuncOneArg(rollupDerivSlow),
"deriv_fast": newRollupFuncOneArg(rollupDerivFast),
"holt_winters": newRollupHoltWinters,
"idelta": newRollupFuncOneArg(rollupIdelta),
"increase": newRollupFuncOneArg(rollupDelta), // + rollupFuncsRemoveCounterResets
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
"predict_linear": newRollupPredictLinear,
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
"resets": newRollupFuncOneArg(rollupResets),
"avg_over_time": newRollupFuncOneArg(rollupAvg),
"min_over_time": newRollupFuncOneArg(rollupMin),
"max_over_time": newRollupFuncOneArg(rollupMax),
"sum_over_time": newRollupFuncOneArg(rollupSum),
"count_over_time": newRollupFuncOneArg(rollupCount),
"quantile_over_time": newRollupQuantile,
"stddev_over_time": newRollupFuncOneArg(rollupStddev),
"stdvar_over_time": newRollupFuncOneArg(rollupStdvar),
// Additional rollup funcs.
"sum2_over_time": newRollupFuncOneArg(rollupSum2),
"geomean_over_time": newRollupFuncOneArg(rollupGeomean),
"first_over_time": newRollupFuncOneArg(rollupFirst),
"last_over_time": newRollupFuncOneArg(rollupLast),
"distinct_over_time": newRollupFuncOneArg(rollupDistinct),
"integrate": newRollupFuncOneArg(rollupIntegrate),
"ideriv": newRollupFuncOneArg(rollupIderiv),
"rollup": newRollupFuncOneArg(rollupFake),
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_deriv": newRollupFuncOneArg(rollupFake),
"rollup_delta": newRollupFuncOneArg(rollupFake),
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_candlestick": newRollupFuncOneArg(rollupFake),
}
var rollupFuncsMayAdjustWindow = map[string]bool{
"deriv": true,
"deriv_fast": true,
"irate": true,
"rate": true,
}
var rollupFuncsRemoveCounterResets = map[string]bool{
"increase": true,
"irate": true,
"rate": true,
"rollup_rate": true,
"rollup_increase": true,
}
var rollupFuncsKeepMetricGroup = map[string]bool{
"default_rollup": true,
"avg_over_time": true,
"min_over_time": true,
"max_over_time": true,
"quantile_over_time": true,
"rollup": true,
"geomean_over_time": true,
}
func getRollupArgIdx(funcName string) int {
funcName = strings.ToLower(funcName)
if rollupFuncs[funcName] == nil {
logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName)
}
if funcName == "quantile_over_time" {
return 1
}
return 0
}
func getRollupFunc(funcName string) newRollupFunc {
funcName = strings.ToLower(funcName)
return rollupFuncs[funcName]
}
func isRollupFunc(funcName string) bool {
return getRollupFunc(funcName) != nil
}
type rollupFuncArg struct {
prevValue float64
prevTimestamp int64
values []float64
timestamps []int64
idx int
step int64
}
func (rfa *rollupFuncArg) reset() {
rfa.prevValue = 0
rfa.prevTimestamp = 0
rfa.values = nil
rfa.timestamps = nil
rfa.idx = 0
rfa.step = 0
}
// rollupFunc must return rollup value for the given rfa.
//
// prevValue may be nan, values and timestamps may be empty.
type rollupFunc func(rfa *rollupFuncArg) float64
type rollupConfig struct {
// This tag value must be added to "rollup" tag if non-empty.
TagValue string
Func rollupFunc
Start int64
End int64
Step int64
Window int64
// Whether window may be adjusted to 2 x interval between data points.
// This is needed for functions which have dt in the denominator
// such as rate, deriv, etc.
// Without the adjustement their value would jump in unexpected directions
// when using window smaller than 2 x scrape_interval.
MayAdjustWindow bool
Timestamps []int64
}
var (
nan = math.NaN()
inf = math.Inf(1)
)
// The maximum interval without previous rows.
const maxSilenceInterval = 5 * 60 * 1000
// Do calculates rollups for the given timestamps and values, appends
// them to dstValues and returns results.
//
// rc.Timestamps are used as timestamps for dstValues.
//
// timestamps must cover time range [rc.Start - rc.Window - maxSilenceInterval ... rc.End + rc.Step].
//
// Cannot be called from concurrent goroutines.
func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []int64) []float64 {
// Sanity checks.
if rc.Step <= 0 {
logger.Panicf("BUG: Step must be bigger than 0; got %d", rc.Step)
}
if rc.Start > rc.End {
logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", rc.Start, rc.End)
}
if rc.Window < 0 {
logger.Panicf("BUG: Window must be non-negative; got %d", rc.Window)
}
if err := ValidateMaxPointsPerTimeseries(rc.Start, rc.End, rc.Step); err != nil {
logger.Panicf("BUG: %s; this must be validated before the call to rollupConfig.Do", err)
}
// Extend dstValues in order to remove mallocs below.
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
maxPrevInterval := getMaxPrevInterval(timestamps)
window := rc.Window
if window <= 0 {
window = rc.Step
}
if rc.MayAdjustWindow && window < maxPrevInterval {
window = maxPrevInterval
}
rfa := getRollupFuncArg()
rfa.idx = 0
rfa.step = rc.Step
i := 0
j := 0
for _, tEnd := range rc.Timestamps {
tStart := tEnd - window
n := sort.Search(len(timestamps)-i, func(n int) bool {
return timestamps[i+n] > tStart
})
i += n
if j < i {
j = i
}
n = sort.Search(len(timestamps)-j, func(n int) bool {
return timestamps[j+n] > tEnd
})
j += n
rfa.prevValue = nan
rfa.prevTimestamp = tStart - maxPrevInterval
if i > 0 && timestamps[i-1] > rfa.prevTimestamp {
rfa.prevValue = values[i-1]
rfa.prevTimestamp = timestamps[i-1]
}
rfa.values = values[i:j]
rfa.timestamps = timestamps[i:j]
value := rc.Func(rfa)
rfa.idx++
dstValues = append(dstValues, value)
}
putRollupFuncArg(rfa)
return dstValues
}
func getMaxPrevInterval(timestamps []int64) int64 {
if len(timestamps) < 2 {
return int64(maxSilenceInterval)
}
d := (timestamps[len(timestamps)-1] - timestamps[0]) / int64(len(timestamps)-1)
if d <= 0 {
return 1
}
// Slightly increase d in order to handle possible jitter in scrape interval.
return d + (d / 16)
}
func removeCounterResets(values []float64) {
// There is no need in handling NaNs here, since they are impossible
// on values from vmstorage.
if len(values) == 0 {
return
}
var correction float64
prevValue := values[0]
for i, v := range values {
d := v - prevValue
if d < 0 {
if (-d * 8) < prevValue {
// This is likely jitter from `Prometheus HA pairs`.
// Just substitute v with prevValue.
v = prevValue
} else {
correction += prevValue
}
}
prevValue = v
values[i] = v + correction
}
}
func deltaValues(values []float64) {
// There is no need in handling NaNs here, since they are impossible
// on values from vmstorage.
if len(values) == 0 {
return
}
prevValue := values[0]
for i, v := range values[1:] {
values[i] = v - prevValue
prevValue = v
}
values[len(values)-1] = nan
}
func derivValues(values []float64, timestamps []int64) {
// There is no need in handling NaNs here, since they are impossible
// on values from vmstorage.
if len(values) == 0 {
return
}
prevValue := values[0]
prevTs := timestamps[0]
for i, v := range values[1:] {
ts := timestamps[i+1]
dt := float64(ts-prevTs) * 1e-3
values[i] = (v - prevValue) / dt
prevValue = v
prevTs = ts
}
values[len(values)-1] = nan
}
type newRollupFunc func(args []interface{}) (rollupFunc, error)
func newRollupFuncOneArg(rf rollupFunc) newRollupFunc {
return func(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 1); err != nil {
return nil, err
}
return rf, nil
}
}
func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 3); err != nil {
return nil, err
}
sfs, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
tfs, err := getScalar(args[2], 2)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
sf := sfs[rfa.idx]
if sf <= 0 || sf >= 1 {
return nan
}
tf := tfs[rfa.idx]
if tf <= 0 || tf >= 1 {
return nan
}
// See https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing .
// TODO: determine whether this shit really works.
s0 := rfa.prevValue
if math.IsNaN(s0) {
s0 = values[0]
values = values[1:]
if len(values) == 0 {
return s0
}
}
b0 := values[0] - s0
for _, v := range values {
s1 := sf*v + (1-sf)*(s0+b0)
b1 := tf*(s1-s0) + (1-tf)*b0
s0 = s1
b0 = b1
}
return s0
}
return rf, nil
}
func newRollupPredictLinear(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
secs, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
v, k := linearRegression(rfa)
if math.IsNaN(v) {
return nan
}
sec := secs[rfa.idx]
return v + k*sec
}
return rf, nil
}
func linearRegression(rfa *rollupFuncArg) (float64, float64) {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) == 0 {
return rfa.prevValue, 0
}
// See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example
tFirst := rfa.prevTimestamp
vSum := rfa.prevValue
tSum := float64(0)
tvSum := float64(0)
ttSum := float64(0)
n := 1.0
if math.IsNaN(rfa.prevValue) {
tFirst = timestamps[0]
vSum = 0
n = 0
}
for i, v := range values {
dt := float64(timestamps[i]-tFirst) * 1e-3
vSum += v
tSum += dt
tvSum += dt * v
ttSum += dt * dt
}
n += float64(len(values))
if n == 1 {
return vSum, 0
}
k := (n*tvSum - tSum*vSum) / (n*ttSum - tSum*tSum)
v := (vSum - k*tSum) / n
// Adjust v to the last timestamp on the given time range.
v += k * (float64(timestamps[len(timestamps)-1]-tFirst) * 1e-3)
return v, k
}
func newRollupQuantile(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
if len(values) == 1 {
// Fast path - only a single value.
return values[0]
}
hf := histogram.GetFast()
for _, v := range values {
hf.Update(v)
}
phi := phis[rfa.idx]
qv := hf.Quantile(phi)
histogram.PutFast(hf)
return qv
}
return rf, nil
}
func rollupAvg(rfa *rollupFuncArg) float64 {
// Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation,
// since it is slower and has no significant benefits in precision.
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
var sum float64
for _, v := range values {
sum += v
}
return sum / float64(len(values))
}
func rollupMin(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
minValue := values[0]
for _, v := range values {
if v < minValue {
minValue = v
}
}
return minValue
}
func rollupMax(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
maxValue := values[0]
for _, v := range values {
if v > maxValue {
maxValue = v
}
}
return maxValue
}
func rollupSum(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
var sum float64
for _, v := range values {
sum += v
}
return sum
}
func rollupSum2(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue * rfa.prevValue
}
var sum2 float64
for _, v := range values {
sum2 += v * v
}
return sum2
}
func rollupGeomean(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
p := 1.0
for _, v := range values {
p *= v
}
return math.Pow(p, 1/float64(len(values)))
}
func rollupCount(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
return float64(len(values))
}
func rollupStddev(rfa *rollupFuncArg) float64 {
stdvar := rollupStdvar(rfa)
return math.Sqrt(stdvar)
}
func rollupStdvar(rfa *rollupFuncArg) float64 {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
if len(values) == 1 {
// Fast path.
return values[0]
}
var avg float64
var count float64
var q float64
for _, v := range values {
count++
avgNew := avg + (v-avg)/count
q += (v - avg) * (v - avgNew)
avg = avgNew
}
return q / count
}
func rollupDelta(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
values = values[1:]
}
if len(values) == 0 {
return 0
}
return values[len(values)-1] - prevValue
}
func rollupIdelta(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
lastValue := values[len(values)-1]
values = values[:len(values)-1]
if len(values) == 0 {
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
return 0
}
return lastValue - prevValue
}
return lastValue - values[len(values)-1]
}
func rollupDerivSlow(rfa *rollupFuncArg) float64 {
// Use linear regression like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/73
_, k := linearRegression(rfa)
return k
}
func rollupDerivFast(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
prevValue := rfa.prevValue
prevTimestamp := rfa.prevTimestamp
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
prevTimestamp = timestamps[0]
values = values[1:]
timestamps = timestamps[1:]
}
if len(values) == 0 {
return 0
}
vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1]
dv := vEnd - prevValue
dt := float64(tEnd-prevTimestamp) * 1e-3
return dv / dt
}
func rollupIderiv(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1]
values = values[:len(values)-1]
timestamps = timestamps[:len(timestamps)-1]
prevValue := rfa.prevValue
prevTimestamp := rfa.prevTimestamp
if len(values) == 0 {
if math.IsNaN(prevValue) {
return 0
}
} else {
prevValue = values[len(values)-1]
prevTimestamp = timestamps[len(timestamps)-1]
}
dv := vEnd - prevValue
dt := tEnd - prevTimestamp
return dv / (float64(dt) / 1000)
}
func rollupChanges(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
n := 0
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
values = values[1:]
n++
}
for _, v := range values {
if v != prevValue {
n++
prevValue = v
}
}
return float64(n)
}
func rollupResets(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
prevValue = values[0]
values = values[1:]
}
if len(values) == 0 {
return 0
}
n := 0
for _, v := range values {
if v < prevValue {
n++
}
prevValue = v
}
return float64(n)
}
func rollupFirst(rfa *rollupFuncArg) float64 {
// See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness
v := rfa.prevValue
if !math.IsNaN(v) {
return v
}
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan
}
return values[0]
}
var rollupDefault = rollupLast
func rollupLast(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
return values[len(values)-1]
}
func rollupDistinct(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
m := make(map[float64]struct{})
for _, v := range values {
m[v] = struct{}{}
}
return float64(len(m))
}
func rollupIntegrate(rfa *rollupFuncArg) float64 {
prevTimestamp := rfa.prevTimestamp
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
prevValue = values[0]
prevTimestamp = timestamps[0]
values = values[1:]
timestamps = timestamps[1:]
}
if len(values) == 0 {
return 0
}
var sum float64
for i, v := range values {
timestamp := timestamps[i]
dt := float64(timestamp-prevTimestamp) * 1e-3
sum += 0.5 * (v + prevValue) * dt
}
return sum
}
func rollupFake(rfa *rollupFuncArg) float64 {
logger.Panicf("BUG: rollupFake shouldn't be called")
return 0
}
func getScalar(arg interface{}, argNum int) ([]float64, error) {
ts, ok := arg.([]*timeseries)
if !ok {
return nil, fmt.Errorf(`unexpected type for arg #%d; got %T; want %T`, argNum+1, arg, ts)
}
if len(ts) != 1 {
return nil, fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(ts))
}
return ts[0].Values, nil
}
func getString(tss []*timeseries, argNum int) (string, error) {
if len(tss) != 1 {
return "", fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(tss))
}
ts := tss[0]
for _, v := range ts.Values {
if !math.IsNaN(v) {
return "", fmt.Errorf(`arg #%d contains non-string timeseries`, argNum+1)
}
}
return string(ts.MetricName.MetricGroup), nil
}
func expectRollupArgsNum(args []interface{}, expectedNum int) error {
if len(args) == expectedNum {
return nil
}
return fmt.Errorf(`unexpected number of args; got %d; want %d`, len(args), expectedNum)
}
func getRollupFuncArg() *rollupFuncArg {
v := rfaPool.Get()
if v == nil {
return &rollupFuncArg{}
}
return v.(*rollupFuncArg)
}
func putRollupFuncArg(rfa *rollupFuncArg) {
rfa.reset()
rfaPool.Put(rfa)
}
var rfaPool sync.Pool