diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 625e21bad..cf73137b5 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -584,13 +584,14 @@ func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64 } newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig { return &rollupConfig{ - TagValue: tagValue, - Func: rf, - Start: start, - End: end, - Step: step, - Window: window, - Timestamps: sharedTimestamps, + TagValue: tagValue, + Func: rf, + Start: start, + End: end, + Step: step, + Window: window, + MayAdjustWindow: rollupFuncsMayAdjustWindow[name], + Timestamps: sharedTimestamps, } } appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig { diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 5b3b64fb8..667c7c724 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -50,6 +50,13 @@ var rollupFuncs = map[string]newRollupFunc{ "rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets } +var rollupFuncsMayAdjustWindow = map[string]bool{ + "deriv": true, + "deriv_fast": true, + "irate": true, + "rate": true, +} + var rollupFuncsRemoveCounterResets = map[string]bool{ "increase": true, "irate": true, @@ -121,6 +128,13 @@ type rollupConfig struct { Step int64 Window int64 + // Whether window may be adjusted to 2 x interval between data points. + // This is needed for functions which have dt in the denominator + // such as rate, deriv, etc. + // Without the adjustement their value would jump in unexpected directions + // when using window smaller than 2 x scrape_interval. + MayAdjustWindow bool + Timestamps []int64 } @@ -163,7 +177,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i if window <= 0 { window = rc.Step } - if window < maxPrevInterval { + if rc.MayAdjustWindow && window < maxPrevInterval { window = maxPrevInterval } rfa := getRollupFuncArg() @@ -300,7 +314,7 @@ func newRollupHoltWinters(args []interface{}) (rollupFunc, error) { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + return rfa.prevValue } sf := sfs[rfa.idx] if sf <= 0 || sf >= 1 { @@ -358,21 +372,21 @@ func linearRegression(rfa *rollupFuncArg) (float64, float64) { values := rfa.values timestamps := rfa.timestamps if len(values) == 0 { - return nan, nan + return rfa.prevValue, 0 } // See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example tFirst := rfa.prevTimestamp vSum := rfa.prevValue + tSum := float64(0) + tvSum := float64(0) + ttSum := float64(0) n := 1.0 if math.IsNaN(rfa.prevValue) { tFirst = timestamps[0] vSum = 0 n = 0 } - tSum := float64(0) - tvSum := float64(0) - ttSum := float64(0) for i, v := range values { dt := float64(timestamps[i]-tFirst) * 1e-3 vSum += v @@ -402,7 +416,11 @@ func newRollupQuantile(args []interface{}) (rollupFunc, error) { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + return rfa.prevValue + } + if len(values) == 1 { + // Fast path - only a single value. + return values[0] } hf := histogram.GetFast() for _, v := range values { @@ -424,7 +442,7 @@ func rollupAvg(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + return rfa.prevValue } var sum float64 for _, v := range values { @@ -438,7 +456,7 @@ func rollupMin(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + return rfa.prevValue } minValue := values[0] for _, v := range values { @@ -454,7 +472,7 @@ func rollupMax(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + return rfa.prevValue } maxValue := values[0] for _, v := range values { @@ -470,7 +488,7 @@ func rollupSum(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + return rfa.prevValue } var sum float64 for _, v := range values { @@ -484,7 +502,10 @@ func rollupCount(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + if math.IsNaN(rfa.prevValue) { + return nan + } + return 0 } return float64(len(values)) } @@ -501,7 +522,14 @@ func rollupStdvar(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + if math.IsNaN(rfa.prevValue) { + return nan + } + return 0 + } + if len(values) == 1 { + // Fast path. + return values[0] } var avg float64 var count float64 @@ -528,7 +556,7 @@ func rollupDelta(rfa *rollupFuncArg) float64 { values = values[1:] } if len(values) == 0 { - return nan + return 0 } return values[len(values)-1] - prevValue } @@ -538,14 +566,17 @@ func rollupIdelta(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + if math.IsNaN(rfa.prevValue) { + return nan + } + return 0 } lastValue := values[len(values)-1] values = values[:len(values)-1] if len(values) == 0 { prevValue := rfa.prevValue if math.IsNaN(prevValue) { - return nan + return 0 } return lastValue - prevValue } @@ -576,7 +607,7 @@ func rollupDerivFast(rfa *rollupFuncArg) float64 { timestamps = timestamps[1:] } if len(values) == 0 { - return nan + return 0 } vEnd := values[len(values)-1] tEnd := timestamps[len(timestamps)-1] @@ -591,7 +622,10 @@ func rollupIderiv(rfa *rollupFuncArg) float64 { values := rfa.values timestamps := rfa.timestamps if len(values) == 0 { - return nan + if math.IsNaN(rfa.prevValue) { + return nan + } + return 0 } vEnd := values[len(values)-1] tEnd := timestamps[len(timestamps)-1] @@ -601,7 +635,7 @@ func rollupIderiv(rfa *rollupFuncArg) float64 { prevTimestamp := rfa.prevTimestamp if len(values) == 0 { if math.IsNaN(prevValue) { - return nan + return 0 } } else { prevValue = values[len(values)-1] @@ -616,13 +650,15 @@ func rollupChanges(rfa *rollupFuncArg) float64 { // There is no need in handling NaNs here, since they must be cleaned up // before calling rollup funcs. values := rfa.values - if len(values) == 0 { - return nan - } - n := 0 prevValue := rfa.prevValue + n := 0 if math.IsNaN(prevValue) { + if len(values) == 0 { + return nan + } prevValue = values[0] + values = values[1:] + n++ } for _, v := range values { if v != prevValue { @@ -638,7 +674,10 @@ func rollupResets(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + if math.IsNaN(rfa.prevValue) { + return nan + } + return 0 } prevValue := rfa.prevValue if math.IsNaN(prevValue) { @@ -646,7 +685,7 @@ func rollupResets(rfa *rollupFuncArg) float64 { values = values[1:] } if len(values) == 0 { - return nan + return 0 } n := 0 for _, v := range values { @@ -681,7 +720,7 @@ func rollupLast(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + return rfa.prevValue } return values[len(values)-1] } @@ -691,7 +730,10 @@ func rollupDistinct(rfa *rollupFuncArg) float64 { // before calling rollup funcs. values := rfa.values if len(values) == 0 { - return nan + if math.IsNaN(rfa.prevValue) { + return nan + } + return 0 } m := make(map[float64]struct{}) for _, v := range values { @@ -708,7 +750,10 @@ func rollupIntegrate(rfa *rollupFuncArg) float64 { values := rfa.values timestamps := rfa.timestamps if len(values) == 0 { - return nan + if math.IsNaN(rfa.prevValue) { + return nan + } + return 0 } prevValue := rfa.prevValue if math.IsNaN(prevValue) { @@ -718,7 +763,7 @@ func rollupIntegrate(rfa *rollupFuncArg) float64 { timestamps = timestamps[1:] } if len(values) == 0 { - return nan + return 0 } var sum float64 diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index 519283227..202aa1879 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -190,7 +190,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) { } f("default_rollup", 34) - f("changes", 10) + f("changes", 11) f("delta", -89) f("deriv", -266.85860231406065) f("deriv_fast", -712) @@ -209,6 +209,8 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) { f("first_over_time", 123) f("last_over_time", 34) f("integrate", 61.0275) + f("distinct_over_time", 8) + f("ideriv", 0) } func TestRollupNewRollupFuncError(t *testing.T) { @@ -268,14 +270,14 @@ func TestRollupNoWindowNoPoints(t *testing.T) { rc := rollupConfig{ Func: rollupDelta, Start: 120, - End: 144, + End: 148, Step: 4, Window: 0, } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{2, 2, 2, 0, 0, 0, nan} - timestampsExpected := []int64{120, 124, 128, 132, 136, 140, 144} + valuesExpected := []float64{2, 0, 0, 0, 0, 0, 0, nan} + timestampsExpected := []int64{120, 124, 128, 132, 136, 140, 144, 148} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) } @@ -305,7 +307,7 @@ func TestRollupWindowNoPoints(t *testing.T) { } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{34, 34, nan, nan} + valuesExpected := []float64{34, nan, nan, nan} timestampsExpected := []int64{141, 151, 161, 171} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) @@ -316,14 +318,14 @@ func TestRollupNoWindowPartialPoints(t *testing.T) { rc := rollupConfig{ Func: rollupFirst, Start: 0, - End: 20, + End: 25, Step: 5, Window: 0, } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{nan, 123, 123, 123, 123} - timestampsExpected := []int64{0, 5, 10, 15, 20} + valuesExpected := []float64{nan, 123, 123, 123, 34, 34} + timestampsExpected := []int64{0, 5, 10, 15, 20, 25} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) t.Run("afterEnd", func(t *testing.T) { @@ -395,7 +397,7 @@ func TestRollupWindowPartialPoints(t *testing.T) { } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{nan, 54, 44, nan} + valuesExpected := []float64{nan, 54, 44, 34} timestampsExpected := []int64{0, 50, 100, 150} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) @@ -496,7 +498,7 @@ func TestRollupFuncsNoWindow(t *testing.T) { } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{nan, 33, -87, 0} + valuesExpected := []float64{0, 33, -87, 0} timestampsExpected := []int64{10, 50, 90, 130} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) @@ -510,10 +512,24 @@ func TestRollupFuncsNoWindow(t *testing.T) { } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{nan, 3, 4, 3, 0} + valuesExpected := []float64{nan, 4, 4, 3, 0} timestampsExpected := []int64{0, 40, 80, 120, 160} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) + t.Run("changes_small_window", func(t *testing.T) { + rc := rollupConfig{ + Func: rollupChanges, + Start: 0, + End: 45, + Step: 9, + Window: 9, + } + rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) + values := rc.Do(nil, testValues, testTimestamps) + valuesExpected := []float64{nan, 1, 1, 1, 1, 0} + timestampsExpected := []int64{0, 9, 18, 27, 36, 45} + testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) + }) t.Run("resets", func(t *testing.T) { rc := rollupConfig{ Func: rollupResets, @@ -552,7 +568,7 @@ func TestRollupFuncsNoWindow(t *testing.T) { } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{nan, -2879.310344827587, 558.0608793686592, 422.84569138276544, 0} + valuesExpected := []float64{0, -2879.310344827587, 558.0608793686592, 422.84569138276544, 0} timestampsExpected := []int64{0, 40, 80, 120, 160} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) @@ -580,7 +596,7 @@ func TestRollupFuncsNoWindow(t *testing.T) { } rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) values := rc.Do(nil, testValues, testTimestamps) - valuesExpected := []float64{nan, 39.81519810323691, 32.080952292598795, 5.2493385826745405, 0} + valuesExpected := []float64{nan, 39.81519810323691, 32.080952292598795, 5.2493385826745405, 5.830951894845301} timestampsExpected := []int64{0, 40, 80, 120, 160} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) })