mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: adjust the provided window only for range functions with dt in denominator
This should fix range function calculations such as `changes(m[d])` where `d` is smaller than the scrape interval. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/72
This commit is contained in:
parent
ec57e59154
commit
0eac538fc8
3 changed files with 111 additions and 49 deletions
|
@ -591,13 +591,14 @@ func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64
|
|||
}
|
||||
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
|
||||
return &rollupConfig{
|
||||
TagValue: tagValue,
|
||||
Func: rf,
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
Window: window,
|
||||
Timestamps: sharedTimestamps,
|
||||
TagValue: tagValue,
|
||||
Func: rf,
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
Window: window,
|
||||
MayAdjustWindow: rollupFuncsMayAdjustWindow[name],
|
||||
Timestamps: sharedTimestamps,
|
||||
}
|
||||
}
|
||||
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
|
||||
|
|
|
@ -50,6 +50,13 @@ var rollupFuncs = map[string]newRollupFunc{
|
|||
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
||||
}
|
||||
|
||||
var rollupFuncsMayAdjustWindow = map[string]bool{
|
||||
"deriv": true,
|
||||
"deriv_fast": true,
|
||||
"irate": true,
|
||||
"rate": true,
|
||||
}
|
||||
|
||||
var rollupFuncsRemoveCounterResets = map[string]bool{
|
||||
"increase": true,
|
||||
"irate": true,
|
||||
|
@ -121,6 +128,13 @@ type rollupConfig struct {
|
|||
Step int64
|
||||
Window int64
|
||||
|
||||
// Whether window may be adjusted to 2 x interval between data points.
|
||||
// This is needed for functions which have dt in the denominator
|
||||
// such as rate, deriv, etc.
|
||||
// Without the adjustement their value would jump in unexpected directions
|
||||
// when using window smaller than 2 x scrape_interval.
|
||||
MayAdjustWindow bool
|
||||
|
||||
Timestamps []int64
|
||||
}
|
||||
|
||||
|
@ -163,7 +177,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
|
|||
if window <= 0 {
|
||||
window = rc.Step
|
||||
}
|
||||
if window < maxPrevInterval {
|
||||
if rc.MayAdjustWindow && window < maxPrevInterval {
|
||||
window = maxPrevInterval
|
||||
}
|
||||
rfa := getRollupFuncArg()
|
||||
|
@ -300,7 +314,7 @@ func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return rfa.prevValue
|
||||
}
|
||||
sf := sfs[rfa.idx]
|
||||
if sf <= 0 || sf >= 1 {
|
||||
|
@ -358,21 +372,21 @@ func linearRegression(rfa *rollupFuncArg) (float64, float64) {
|
|||
values := rfa.values
|
||||
timestamps := rfa.timestamps
|
||||
if len(values) == 0 {
|
||||
return nan, nan
|
||||
return rfa.prevValue, 0
|
||||
}
|
||||
|
||||
// See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example
|
||||
tFirst := rfa.prevTimestamp
|
||||
vSum := rfa.prevValue
|
||||
tSum := float64(0)
|
||||
tvSum := float64(0)
|
||||
ttSum := float64(0)
|
||||
n := 1.0
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
tFirst = timestamps[0]
|
||||
vSum = 0
|
||||
n = 0
|
||||
}
|
||||
tSum := float64(0)
|
||||
tvSum := float64(0)
|
||||
ttSum := float64(0)
|
||||
for i, v := range values {
|
||||
dt := float64(timestamps[i]-tFirst) * 1e-3
|
||||
vSum += v
|
||||
|
@ -402,7 +416,11 @@ func newRollupQuantile(args []interface{}) (rollupFunc, error) {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return rfa.prevValue
|
||||
}
|
||||
if len(values) == 1 {
|
||||
// Fast path - only a single value.
|
||||
return values[0]
|
||||
}
|
||||
hf := histogram.GetFast()
|
||||
for _, v := range values {
|
||||
|
@ -424,7 +442,7 @@ func rollupAvg(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return rfa.prevValue
|
||||
}
|
||||
var sum float64
|
||||
for _, v := range values {
|
||||
|
@ -438,7 +456,7 @@ func rollupMin(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return rfa.prevValue
|
||||
}
|
||||
minValue := values[0]
|
||||
for _, v := range values {
|
||||
|
@ -454,7 +472,7 @@ func rollupMax(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return rfa.prevValue
|
||||
}
|
||||
maxValue := values[0]
|
||||
for _, v := range values {
|
||||
|
@ -470,7 +488,7 @@ func rollupSum(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return rfa.prevValue
|
||||
}
|
||||
var sum float64
|
||||
for _, v := range values {
|
||||
|
@ -484,7 +502,10 @@ func rollupCount(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
return nan
|
||||
}
|
||||
return 0
|
||||
}
|
||||
return float64(len(values))
|
||||
}
|
||||
|
@ -501,7 +522,14 @@ func rollupStdvar(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
return nan
|
||||
}
|
||||
return 0
|
||||
}
|
||||
if len(values) == 1 {
|
||||
// Fast path.
|
||||
return values[0]
|
||||
}
|
||||
var avg float64
|
||||
var count float64
|
||||
|
@ -528,7 +556,7 @@ func rollupDelta(rfa *rollupFuncArg) float64 {
|
|||
values = values[1:]
|
||||
}
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return 0
|
||||
}
|
||||
return values[len(values)-1] - prevValue
|
||||
}
|
||||
|
@ -538,14 +566,17 @@ func rollupIdelta(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
return nan
|
||||
}
|
||||
return 0
|
||||
}
|
||||
lastValue := values[len(values)-1]
|
||||
values = values[:len(values)-1]
|
||||
if len(values) == 0 {
|
||||
prevValue := rfa.prevValue
|
||||
if math.IsNaN(prevValue) {
|
||||
return nan
|
||||
return 0
|
||||
}
|
||||
return lastValue - prevValue
|
||||
}
|
||||
|
@ -576,7 +607,7 @@ func rollupDerivFast(rfa *rollupFuncArg) float64 {
|
|||
timestamps = timestamps[1:]
|
||||
}
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return 0
|
||||
}
|
||||
vEnd := values[len(values)-1]
|
||||
tEnd := timestamps[len(timestamps)-1]
|
||||
|
@ -591,7 +622,10 @@ func rollupIderiv(rfa *rollupFuncArg) float64 {
|
|||
values := rfa.values
|
||||
timestamps := rfa.timestamps
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
return nan
|
||||
}
|
||||
return 0
|
||||
}
|
||||
vEnd := values[len(values)-1]
|
||||
tEnd := timestamps[len(timestamps)-1]
|
||||
|
@ -601,7 +635,7 @@ func rollupIderiv(rfa *rollupFuncArg) float64 {
|
|||
prevTimestamp := rfa.prevTimestamp
|
||||
if len(values) == 0 {
|
||||
if math.IsNaN(prevValue) {
|
||||
return nan
|
||||
return 0
|
||||
}
|
||||
} else {
|
||||
prevValue = values[len(values)-1]
|
||||
|
@ -616,13 +650,15 @@ func rollupChanges(rfa *rollupFuncArg) float64 {
|
|||
// There is no need in handling NaNs here, since they must be cleaned up
|
||||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
}
|
||||
n := 0
|
||||
prevValue := rfa.prevValue
|
||||
n := 0
|
||||
if math.IsNaN(prevValue) {
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
}
|
||||
prevValue = values[0]
|
||||
values = values[1:]
|
||||
n++
|
||||
}
|
||||
for _, v := range values {
|
||||
if v != prevValue {
|
||||
|
@ -638,7 +674,10 @@ func rollupResets(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
return nan
|
||||
}
|
||||
return 0
|
||||
}
|
||||
prevValue := rfa.prevValue
|
||||
if math.IsNaN(prevValue) {
|
||||
|
@ -646,7 +685,7 @@ func rollupResets(rfa *rollupFuncArg) float64 {
|
|||
values = values[1:]
|
||||
}
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return 0
|
||||
}
|
||||
n := 0
|
||||
for _, v := range values {
|
||||
|
@ -681,7 +720,7 @@ func rollupLast(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return rfa.prevValue
|
||||
}
|
||||
return values[len(values)-1]
|
||||
}
|
||||
|
@ -691,7 +730,10 @@ func rollupDistinct(rfa *rollupFuncArg) float64 {
|
|||
// before calling rollup funcs.
|
||||
values := rfa.values
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
return nan
|
||||
}
|
||||
return 0
|
||||
}
|
||||
m := make(map[float64]struct{})
|
||||
for _, v := range values {
|
||||
|
@ -708,7 +750,10 @@ func rollupIntegrate(rfa *rollupFuncArg) float64 {
|
|||
values := rfa.values
|
||||
timestamps := rfa.timestamps
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
return nan
|
||||
}
|
||||
return 0
|
||||
}
|
||||
prevValue := rfa.prevValue
|
||||
if math.IsNaN(prevValue) {
|
||||
|
@ -718,7 +763,7 @@ func rollupIntegrate(rfa *rollupFuncArg) float64 {
|
|||
timestamps = timestamps[1:]
|
||||
}
|
||||
if len(values) == 0 {
|
||||
return nan
|
||||
return 0
|
||||
}
|
||||
|
||||
var sum float64
|
||||
|
|
|
@ -190,7 +190,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
|
|||
}
|
||||
|
||||
f("default_rollup", 34)
|
||||
f("changes", 10)
|
||||
f("changes", 11)
|
||||
f("delta", -89)
|
||||
f("deriv", -266.85860231406065)
|
||||
f("deriv_fast", -712)
|
||||
|
@ -209,6 +209,8 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
|
|||
f("first_over_time", 123)
|
||||
f("last_over_time", 34)
|
||||
f("integrate", 61.0275)
|
||||
f("distinct_over_time", 8)
|
||||
f("ideriv", 0)
|
||||
}
|
||||
|
||||
func TestRollupNewRollupFuncError(t *testing.T) {
|
||||
|
@ -268,14 +270,14 @@ func TestRollupNoWindowNoPoints(t *testing.T) {
|
|||
rc := rollupConfig{
|
||||
Func: rollupDelta,
|
||||
Start: 120,
|
||||
End: 144,
|
||||
End: 148,
|
||||
Step: 4,
|
||||
Window: 0,
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{2, 2, 2, 0, 0, 0, nan}
|
||||
timestampsExpected := []int64{120, 124, 128, 132, 136, 140, 144}
|
||||
valuesExpected := []float64{2, 0, 0, 0, 0, 0, 0, nan}
|
||||
timestampsExpected := []int64{120, 124, 128, 132, 136, 140, 144, 148}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
}
|
||||
|
@ -305,7 +307,7 @@ func TestRollupWindowNoPoints(t *testing.T) {
|
|||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{34, 34, nan, nan}
|
||||
valuesExpected := []float64{34, nan, nan, nan}
|
||||
timestampsExpected := []int64{141, 151, 161, 171}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
|
@ -316,14 +318,14 @@ func TestRollupNoWindowPartialPoints(t *testing.T) {
|
|||
rc := rollupConfig{
|
||||
Func: rollupFirst,
|
||||
Start: 0,
|
||||
End: 20,
|
||||
End: 25,
|
||||
Step: 5,
|
||||
Window: 0,
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 123, 123, 123, 123}
|
||||
timestampsExpected := []int64{0, 5, 10, 15, 20}
|
||||
valuesExpected := []float64{nan, 123, 123, 123, 34, 34}
|
||||
timestampsExpected := []int64{0, 5, 10, 15, 20, 25}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
t.Run("afterEnd", func(t *testing.T) {
|
||||
|
@ -395,7 +397,7 @@ func TestRollupWindowPartialPoints(t *testing.T) {
|
|||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 54, 44, nan}
|
||||
valuesExpected := []float64{nan, 54, 44, 34}
|
||||
timestampsExpected := []int64{0, 50, 100, 150}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
|
@ -496,7 +498,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
|||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 33, -87, 0}
|
||||
valuesExpected := []float64{0, 33, -87, 0}
|
||||
timestampsExpected := []int64{10, 50, 90, 130}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
|
@ -510,10 +512,24 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
|||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 3, 4, 3, 0}
|
||||
valuesExpected := []float64{nan, 4, 4, 3, 0}
|
||||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
t.Run("changes_small_window", func(t *testing.T) {
|
||||
rc := rollupConfig{
|
||||
Func: rollupChanges,
|
||||
Start: 0,
|
||||
End: 45,
|
||||
Step: 9,
|
||||
Window: 9,
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 1, 1, 1, 1, 0}
|
||||
timestampsExpected := []int64{0, 9, 18, 27, 36, 45}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
t.Run("resets", func(t *testing.T) {
|
||||
rc := rollupConfig{
|
||||
Func: rollupResets,
|
||||
|
@ -552,7 +568,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
|||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, -2879.310344827587, 558.0608793686592, 422.84569138276544, 0}
|
||||
valuesExpected := []float64{0, -2879.310344827587, 558.0608793686592, 422.84569138276544, 0}
|
||||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
|
@ -580,7 +596,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
|||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 39.81519810323691, 32.080952292598795, 5.2493385826745405, 0}
|
||||
valuesExpected := []float64{nan, 39.81519810323691, 32.080952292598795, 5.2493385826745405, 5.830951894845301}
|
||||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
|
|
Loading…
Reference in a new issue