app/vmselect/promql: adjust the provided window only for range functions with dt in denominator

This should fix range function calculations such as `changes(m[d])` where `d` is smaller
than the scrape interval.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/72
This commit is contained in:
Aliaksandr Valialkin 2019-06-23 19:06:16 +03:00
parent ee13256f74
commit 61926bae01
3 changed files with 111 additions and 49 deletions

View file

@ -584,13 +584,14 @@ func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64
} }
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig { newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
return &rollupConfig{ return &rollupConfig{
TagValue: tagValue, TagValue: tagValue,
Func: rf, Func: rf,
Start: start, Start: start,
End: end, End: end,
Step: step, Step: step,
Window: window, Window: window,
Timestamps: sharedTimestamps, MayAdjustWindow: rollupFuncsMayAdjustWindow[name],
Timestamps: sharedTimestamps,
} }
} }
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig { appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {

View file

@ -50,6 +50,13 @@ var rollupFuncs = map[string]newRollupFunc{
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
} }
var rollupFuncsMayAdjustWindow = map[string]bool{
"deriv": true,
"deriv_fast": true,
"irate": true,
"rate": true,
}
var rollupFuncsRemoveCounterResets = map[string]bool{ var rollupFuncsRemoveCounterResets = map[string]bool{
"increase": true, "increase": true,
"irate": true, "irate": true,
@ -121,6 +128,13 @@ type rollupConfig struct {
Step int64 Step int64
Window int64 Window int64
// Whether window may be adjusted to 2 x interval between data points.
// This is needed for functions which have dt in the denominator
// such as rate, deriv, etc.
// Without the adjustement their value would jump in unexpected directions
// when using window smaller than 2 x scrape_interval.
MayAdjustWindow bool
Timestamps []int64 Timestamps []int64
} }
@ -163,7 +177,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
if window <= 0 { if window <= 0 {
window = rc.Step window = rc.Step
} }
if window < maxPrevInterval { if rc.MayAdjustWindow && window < maxPrevInterval {
window = maxPrevInterval window = maxPrevInterval
} }
rfa := getRollupFuncArg() rfa := getRollupFuncArg()
@ -300,7 +314,7 @@ func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan return rfa.prevValue
} }
sf := sfs[rfa.idx] sf := sfs[rfa.idx]
if sf <= 0 || sf >= 1 { if sf <= 0 || sf >= 1 {
@ -358,21 +372,21 @@ func linearRegression(rfa *rollupFuncArg) (float64, float64) {
values := rfa.values values := rfa.values
timestamps := rfa.timestamps timestamps := rfa.timestamps
if len(values) == 0 { if len(values) == 0 {
return nan, nan return rfa.prevValue, 0
} }
// See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example // See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example
tFirst := rfa.prevTimestamp tFirst := rfa.prevTimestamp
vSum := rfa.prevValue vSum := rfa.prevValue
tSum := float64(0)
tvSum := float64(0)
ttSum := float64(0)
n := 1.0 n := 1.0
if math.IsNaN(rfa.prevValue) { if math.IsNaN(rfa.prevValue) {
tFirst = timestamps[0] tFirst = timestamps[0]
vSum = 0 vSum = 0
n = 0 n = 0
} }
tSum := float64(0)
tvSum := float64(0)
ttSum := float64(0)
for i, v := range values { for i, v := range values {
dt := float64(timestamps[i]-tFirst) * 1e-3 dt := float64(timestamps[i]-tFirst) * 1e-3
vSum += v vSum += v
@ -402,7 +416,11 @@ func newRollupQuantile(args []interface{}) (rollupFunc, error) {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan return rfa.prevValue
}
if len(values) == 1 {
// Fast path - only a single value.
return values[0]
} }
hf := histogram.GetFast() hf := histogram.GetFast()
for _, v := range values { for _, v := range values {
@ -424,7 +442,7 @@ func rollupAvg(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan return rfa.prevValue
} }
var sum float64 var sum float64
for _, v := range values { for _, v := range values {
@ -438,7 +456,7 @@ func rollupMin(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan return rfa.prevValue
} }
minValue := values[0] minValue := values[0]
for _, v := range values { for _, v := range values {
@ -454,7 +472,7 @@ func rollupMax(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan return rfa.prevValue
} }
maxValue := values[0] maxValue := values[0]
for _, v := range values { for _, v := range values {
@ -470,7 +488,7 @@ func rollupSum(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan return rfa.prevValue
} }
var sum float64 var sum float64
for _, v := range values { for _, v := range values {
@ -484,7 +502,10 @@ func rollupCount(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
} }
return float64(len(values)) return float64(len(values))
} }
@ -501,7 +522,14 @@ func rollupStdvar(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
if len(values) == 1 {
// Fast path.
return values[0]
} }
var avg float64 var avg float64
var count float64 var count float64
@ -528,7 +556,7 @@ func rollupDelta(rfa *rollupFuncArg) float64 {
values = values[1:] values = values[1:]
} }
if len(values) == 0 { if len(values) == 0 {
return nan return 0
} }
return values[len(values)-1] - prevValue return values[len(values)-1] - prevValue
} }
@ -538,14 +566,17 @@ func rollupIdelta(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
} }
lastValue := values[len(values)-1] lastValue := values[len(values)-1]
values = values[:len(values)-1] values = values[:len(values)-1]
if len(values) == 0 { if len(values) == 0 {
prevValue := rfa.prevValue prevValue := rfa.prevValue
if math.IsNaN(prevValue) { if math.IsNaN(prevValue) {
return nan return 0
} }
return lastValue - prevValue return lastValue - prevValue
} }
@ -576,7 +607,7 @@ func rollupDerivFast(rfa *rollupFuncArg) float64 {
timestamps = timestamps[1:] timestamps = timestamps[1:]
} }
if len(values) == 0 { if len(values) == 0 {
return nan return 0
} }
vEnd := values[len(values)-1] vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1] tEnd := timestamps[len(timestamps)-1]
@ -591,7 +622,10 @@ func rollupIderiv(rfa *rollupFuncArg) float64 {
values := rfa.values values := rfa.values
timestamps := rfa.timestamps timestamps := rfa.timestamps
if len(values) == 0 { if len(values) == 0 {
return nan if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
} }
vEnd := values[len(values)-1] vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1] tEnd := timestamps[len(timestamps)-1]
@ -601,7 +635,7 @@ func rollupIderiv(rfa *rollupFuncArg) float64 {
prevTimestamp := rfa.prevTimestamp prevTimestamp := rfa.prevTimestamp
if len(values) == 0 { if len(values) == 0 {
if math.IsNaN(prevValue) { if math.IsNaN(prevValue) {
return nan return 0
} }
} else { } else {
prevValue = values[len(values)-1] prevValue = values[len(values)-1]
@ -616,13 +650,15 @@ func rollupChanges(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up // There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 {
return nan
}
n := 0
prevValue := rfa.prevValue prevValue := rfa.prevValue
n := 0
if math.IsNaN(prevValue) { if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0] prevValue = values[0]
values = values[1:]
n++
} }
for _, v := range values { for _, v := range values {
if v != prevValue { if v != prevValue {
@ -638,7 +674,10 @@ func rollupResets(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
} }
prevValue := rfa.prevValue prevValue := rfa.prevValue
if math.IsNaN(prevValue) { if math.IsNaN(prevValue) {
@ -646,7 +685,7 @@ func rollupResets(rfa *rollupFuncArg) float64 {
values = values[1:] values = values[1:]
} }
if len(values) == 0 { if len(values) == 0 {
return nan return 0
} }
n := 0 n := 0
for _, v := range values { for _, v := range values {
@ -681,7 +720,7 @@ func rollupLast(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan return rfa.prevValue
} }
return values[len(values)-1] return values[len(values)-1]
} }
@ -691,7 +730,10 @@ func rollupDistinct(rfa *rollupFuncArg) float64 {
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
if len(values) == 0 { if len(values) == 0 {
return nan if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
} }
m := make(map[float64]struct{}) m := make(map[float64]struct{})
for _, v := range values { for _, v := range values {
@ -708,7 +750,10 @@ func rollupIntegrate(rfa *rollupFuncArg) float64 {
values := rfa.values values := rfa.values
timestamps := rfa.timestamps timestamps := rfa.timestamps
if len(values) == 0 { if len(values) == 0 {
return nan if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
} }
prevValue := rfa.prevValue prevValue := rfa.prevValue
if math.IsNaN(prevValue) { if math.IsNaN(prevValue) {
@ -718,7 +763,7 @@ func rollupIntegrate(rfa *rollupFuncArg) float64 {
timestamps = timestamps[1:] timestamps = timestamps[1:]
} }
if len(values) == 0 { if len(values) == 0 {
return nan return 0
} }
var sum float64 var sum float64

View file

@ -190,7 +190,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
} }
f("default_rollup", 34) f("default_rollup", 34)
f("changes", 10) f("changes", 11)
f("delta", -89) f("delta", -89)
f("deriv", -266.85860231406065) f("deriv", -266.85860231406065)
f("deriv_fast", -712) f("deriv_fast", -712)
@ -209,6 +209,8 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
f("first_over_time", 123) f("first_over_time", 123)
f("last_over_time", 34) f("last_over_time", 34)
f("integrate", 61.0275) f("integrate", 61.0275)
f("distinct_over_time", 8)
f("ideriv", 0)
} }
func TestRollupNewRollupFuncError(t *testing.T) { func TestRollupNewRollupFuncError(t *testing.T) {
@ -268,14 +270,14 @@ func TestRollupNoWindowNoPoints(t *testing.T) {
rc := rollupConfig{ rc := rollupConfig{
Func: rollupDelta, Func: rollupDelta,
Start: 120, Start: 120,
End: 144, End: 148,
Step: 4, Step: 4,
Window: 0, Window: 0,
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{2, 2, 2, 0, 0, 0, nan} valuesExpected := []float64{2, 0, 0, 0, 0, 0, 0, nan}
timestampsExpected := []int64{120, 124, 128, 132, 136, 140, 144} timestampsExpected := []int64{120, 124, 128, 132, 136, 140, 144, 148}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })
} }
@ -305,7 +307,7 @@ func TestRollupWindowNoPoints(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{34, 34, nan, nan} valuesExpected := []float64{34, nan, nan, nan}
timestampsExpected := []int64{141, 151, 161, 171} timestampsExpected := []int64{141, 151, 161, 171}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })
@ -316,14 +318,14 @@ func TestRollupNoWindowPartialPoints(t *testing.T) {
rc := rollupConfig{ rc := rollupConfig{
Func: rollupFirst, Func: rollupFirst,
Start: 0, Start: 0,
End: 20, End: 25,
Step: 5, Step: 5,
Window: 0, Window: 0,
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 123, 123, 123, 123} valuesExpected := []float64{nan, 123, 123, 123, 34, 34}
timestampsExpected := []int64{0, 5, 10, 15, 20} timestampsExpected := []int64{0, 5, 10, 15, 20, 25}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })
t.Run("afterEnd", func(t *testing.T) { t.Run("afterEnd", func(t *testing.T) {
@ -395,7 +397,7 @@ func TestRollupWindowPartialPoints(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 54, 44, nan} valuesExpected := []float64{nan, 54, 44, 34}
timestampsExpected := []int64{0, 50, 100, 150} timestampsExpected := []int64{0, 50, 100, 150}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })
@ -496,7 +498,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 33, -87, 0} valuesExpected := []float64{0, 33, -87, 0}
timestampsExpected := []int64{10, 50, 90, 130} timestampsExpected := []int64{10, 50, 90, 130}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })
@ -510,10 +512,24 @@ func TestRollupFuncsNoWindow(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 3, 4, 3, 0} valuesExpected := []float64{nan, 4, 4, 3, 0}
timestampsExpected := []int64{0, 40, 80, 120, 160} timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })
t.Run("changes_small_window", func(t *testing.T) {
rc := rollupConfig{
Func: rollupChanges,
Start: 0,
End: 45,
Step: 9,
Window: 9,
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 1, 1, 1, 1, 0}
timestampsExpected := []int64{0, 9, 18, 27, 36, 45}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
t.Run("resets", func(t *testing.T) { t.Run("resets", func(t *testing.T) {
rc := rollupConfig{ rc := rollupConfig{
Func: rollupResets, Func: rollupResets,
@ -552,7 +568,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, -2879.310344827587, 558.0608793686592, 422.84569138276544, 0} valuesExpected := []float64{0, -2879.310344827587, 558.0608793686592, 422.84569138276544, 0}
timestampsExpected := []int64{0, 40, 80, 120, 160} timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })
@ -580,7 +596,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 39.81519810323691, 32.080952292598795, 5.2493385826745405, 0} valuesExpected := []float64{nan, 39.81519810323691, 32.080952292598795, 5.2493385826745405, 5.830951894845301}
timestampsExpected := []int64{0, 40, 80, 120, 160} timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })