mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
app/vmselect/promql: use linear regression in deriv
func like Prometheus does
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/73
This commit is contained in:
parent
f334908c22
commit
98eafdbd58
2 changed files with 52 additions and 33 deletions
|
@ -19,13 +19,13 @@ var rollupFuncs = map[string]newRollupFunc{
|
||||||
// See funcs accepting range-vector on https://prometheus.io/docs/prometheus/latest/querying/functions/ .
|
// See funcs accepting range-vector on https://prometheus.io/docs/prometheus/latest/querying/functions/ .
|
||||||
"changes": newRollupFuncOneArg(rollupChanges),
|
"changes": newRollupFuncOneArg(rollupChanges),
|
||||||
"delta": newRollupFuncOneArg(rollupDelta),
|
"delta": newRollupFuncOneArg(rollupDelta),
|
||||||
"deriv": newRollupFuncOneArg(rollupDeriv),
|
"deriv": newRollupFuncOneArg(rollupDerivSlow),
|
||||||
"holt_winters": newRollupHoltWinters,
|
"holt_winters": newRollupHoltWinters,
|
||||||
"idelta": newRollupFuncOneArg(rollupIdelta),
|
"idelta": newRollupFuncOneArg(rollupIdelta),
|
||||||
"increase": newRollupFuncOneArg(rollupDelta), // + rollupFuncsRemoveCounterResets
|
"increase": newRollupFuncOneArg(rollupDelta), // + rollupFuncsRemoveCounterResets
|
||||||
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
|
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
|
||||||
"predict_linear": newRollupPredictLinear,
|
"predict_linear": newRollupPredictLinear,
|
||||||
"rate": newRollupFuncOneArg(rollupDeriv), // + rollupFuncsRemoveCounterResets
|
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
|
||||||
"resets": newRollupFuncOneArg(rollupResets),
|
"resets": newRollupFuncOneArg(rollupResets),
|
||||||
"avg_over_time": newRollupFuncOneArg(rollupAvg),
|
"avg_over_time": newRollupFuncOneArg(rollupAvg),
|
||||||
"min_over_time": newRollupFuncOneArg(rollupMin),
|
"min_over_time": newRollupFuncOneArg(rollupMin),
|
||||||
|
@ -341,41 +341,53 @@ func newRollupPredictLinear(args []interface{}) (rollupFunc, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rf := func(rfa *rollupFuncArg) float64 {
|
rf := func(rfa *rollupFuncArg) float64 {
|
||||||
// There is no need in handling NaNs here, since they must be cleanup up
|
v, k := linearRegression(rfa)
|
||||||
// before calling rollup funcs.
|
if math.IsNaN(v) {
|
||||||
values := rfa.values
|
|
||||||
timestamps := rfa.timestamps
|
|
||||||
if len(values) == 0 {
|
|
||||||
return nan
|
return nan
|
||||||
}
|
}
|
||||||
|
|
||||||
// See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example
|
|
||||||
// TODO: determine whether this shit really works.
|
|
||||||
tFirst := rfa.prevTimestamp
|
|
||||||
vSum := rfa.prevValue
|
|
||||||
if math.IsNaN(rfa.prevValue) {
|
|
||||||
tFirst = timestamps[0]
|
|
||||||
vSum = 0
|
|
||||||
}
|
|
||||||
tSum := float64(0)
|
|
||||||
tvSum := float64(0)
|
|
||||||
ttSum := float64(0)
|
|
||||||
for i, v := range values {
|
|
||||||
dt := float64(timestamps[i]-tFirst) * 1e-3
|
|
||||||
vSum += v
|
|
||||||
tSum += dt
|
|
||||||
tvSum += dt * v
|
|
||||||
ttSum += dt * dt
|
|
||||||
}
|
|
||||||
n := float64(len(values))
|
|
||||||
k := (n*tvSum - tSum*vSum) / (n*ttSum - tSum*tSum)
|
|
||||||
v := (vSum - k*tSum) / n
|
|
||||||
sec := secs[rfa.idx]
|
sec := secs[rfa.idx]
|
||||||
return v + k*sec
|
return v + k*sec
|
||||||
}
|
}
|
||||||
return rf, nil
|
return rf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func linearRegression(rfa *rollupFuncArg) (float64, float64) {
|
||||||
|
// There is no need in handling NaNs here, since they must be cleanup up
|
||||||
|
// before calling rollup funcs.
|
||||||
|
values := rfa.values
|
||||||
|
timestamps := rfa.timestamps
|
||||||
|
if len(values) == 0 {
|
||||||
|
return nan, nan
|
||||||
|
}
|
||||||
|
|
||||||
|
// See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example
|
||||||
|
tFirst := rfa.prevTimestamp
|
||||||
|
vSum := rfa.prevValue
|
||||||
|
n := 1.0
|
||||||
|
if math.IsNaN(rfa.prevValue) {
|
||||||
|
tFirst = timestamps[0]
|
||||||
|
vSum = 0
|
||||||
|
n = 0
|
||||||
|
}
|
||||||
|
tSum := float64(0)
|
||||||
|
tvSum := float64(0)
|
||||||
|
ttSum := float64(0)
|
||||||
|
for i, v := range values {
|
||||||
|
dt := float64(timestamps[i]-tFirst) * 1e-3
|
||||||
|
vSum += v
|
||||||
|
tSum += dt
|
||||||
|
tvSum += dt * v
|
||||||
|
ttSum += dt * dt
|
||||||
|
}
|
||||||
|
n += float64(len(values))
|
||||||
|
if n == 1 {
|
||||||
|
return vSum, 0
|
||||||
|
}
|
||||||
|
k := (n*tvSum - tSum*vSum) / (n*ttSum - tSum*tSum)
|
||||||
|
v := (vSum - k*tSum) / n
|
||||||
|
return v, k
|
||||||
|
}
|
||||||
|
|
||||||
func newRollupQuantile(args []interface{}) (rollupFunc, error) {
|
func newRollupQuantile(args []interface{}) (rollupFunc, error) {
|
||||||
if err := expectRollupArgsNum(args, 2); err != nil {
|
if err := expectRollupArgsNum(args, 2); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -539,7 +551,14 @@ func rollupIdelta(rfa *rollupFuncArg) float64 {
|
||||||
return lastValue - values[len(values)-1]
|
return lastValue - values[len(values)-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
func rollupDeriv(rfa *rollupFuncArg) float64 {
|
func rollupDerivSlow(rfa *rollupFuncArg) float64 {
|
||||||
|
// Use linear regression like Prometheus does.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/73
|
||||||
|
_, k := linearRegression(rfa)
|
||||||
|
return k
|
||||||
|
}
|
||||||
|
|
||||||
|
func rollupDerivFast(rfa *rollupFuncArg) float64 {
|
||||||
// There is no need in handling NaNs here, since they must be cleanup up
|
// There is no need in handling NaNs here, since they must be cleanup up
|
||||||
// before calling rollup funcs.
|
// before calling rollup funcs.
|
||||||
values := rfa.values
|
values := rfa.values
|
||||||
|
|
|
@ -192,7 +192,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
|
||||||
f("default_rollup", 34)
|
f("default_rollup", 34)
|
||||||
f("changes", 10)
|
f("changes", 10)
|
||||||
f("delta", -89)
|
f("delta", -89)
|
||||||
f("deriv", -712)
|
f("deriv", -266.85860231406065)
|
||||||
f("idelta", 0)
|
f("idelta", 0)
|
||||||
f("increase", 275)
|
f("increase", 275)
|
||||||
f("irate", 0)
|
f("irate", 0)
|
||||||
|
@ -543,7 +543,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
||||||
})
|
})
|
||||||
t.Run("deriv", func(t *testing.T) {
|
t.Run("deriv", func(t *testing.T) {
|
||||||
rc := rollupConfig{
|
rc := rollupConfig{
|
||||||
Func: rollupDeriv,
|
Func: rollupDerivSlow,
|
||||||
Start: 0,
|
Start: 0,
|
||||||
End: 160,
|
End: 160,
|
||||||
Step: 40,
|
Step: 40,
|
||||||
|
@ -551,7 +551,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
||||||
}
|
}
|
||||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||||
values := rc.Do(nil, testValues, testTimestamps)
|
values := rc.Do(nil, testValues, testTimestamps)
|
||||||
valuesExpected := []float64{nan, -3290.3225806451615, -204.54545454545456, 550, 0}
|
valuesExpected := []float64{nan, -2879.310344827587, 558.0608793686592, 422.84569138276544, 0}
|
||||||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue