From 858746fa6c619a1901f781cf03f65f77b968180d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 3 Jul 2019 12:27:07 +0300 Subject: [PATCH] app/vmselect/promql: gracefully handle duplicate timestamps in `irate` and `rollup_rate` funcs Previously such timestamps result in `+Inf` results. Now the previous timestamp is used for the calculations. --- app/vmselect/promql/rollup.go | 39 +++++++++----- app/vmselect/promql/rollup_test.go | 81 +++++++++++++++++++++++++++--- 2 files changed, 102 insertions(+), 18 deletions(-) diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index f7bda8e73..bd264a0a8 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -264,12 +264,14 @@ func deltaValues(values []float64) { if len(values) == 0 { return } + prevDelta := float64(0) prevValue := values[0] for i, v := range values[1:] { - values[i] = v - prevValue + prevDelta = v - prevValue + values[i] = prevDelta prevValue = v } - values[len(values)-1] = nan + values[len(values)-1] = prevDelta } func derivValues(values []float64, timestamps []int64) { @@ -278,16 +280,23 @@ func derivValues(values []float64, timestamps []int64) { if len(values) == 0 { return } + prevDeriv := float64(0) prevValue := values[0] prevTs := timestamps[0] for i, v := range values[1:] { ts := timestamps[i+1] + if ts == prevTs { + // Use the previous value for duplicate timestamps. + values[i] = prevDeriv + continue + } dt := float64(ts-prevTs) * 1e-3 - values[i] = (v - prevValue) / dt + prevDeriv = (v - prevValue) / dt + values[i] = prevDeriv prevValue = v prevTs = ts } - values[len(values)-1] = nan + values[len(values)-1] = prevDeriv } type newRollupFunc func(args []interface{}) (rollupFunc, error) @@ -665,18 +674,24 @@ func rollupIderiv(rfa *rollupFuncArg) float64 { tEnd := timestamps[len(timestamps)-1] values = values[:len(values)-1] timestamps = timestamps[:len(timestamps)-1] - prevValue := rfa.prevValue - prevTimestamp := rfa.prevTimestamp - if len(values) == 0 { - if math.IsNaN(prevValue) { + // Skip data points with duplicate timestamps. + for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= tEnd { + timestamps = timestamps[:len(timestamps)-1] + } + var tStart int64 + var vStart float64 + if len(timestamps) == 0 { + if math.IsNaN(rfa.prevValue) { return 0 } + tStart = rfa.prevTimestamp + vStart = rfa.prevValue } else { - prevValue = values[len(values)-1] - prevTimestamp = timestamps[len(timestamps)-1] + tStart = timestamps[len(timestamps)-1] + vStart = values[len(timestamps)-1] } - dv := vEnd - prevValue - dt := tEnd - prevTimestamp + dv := vEnd - vStart + dt := tEnd - tStart return dv / (float64(dt) / 1000) } diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index 8997e361c..e5b621fb9 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -10,6 +10,68 @@ var ( testTimestamps = []int64{5, 15, 24, 36, 49, 60, 78, 80, 97, 115, 120, 130} ) +func TestRollupIderivDuplicateTimestamps(t *testing.T) { + rfa := &rollupFuncArg{ + values: []float64{1, 2, 3, 4, 5}, + timestamps: []int64{100, 100, 200, 300, 300}, + } + n := rollupIderiv(rfa) + if n != 20 { + t.Fatalf("unexpected value; got %v; want %v", n, 20) + } + + rfa = &rollupFuncArg{ + values: []float64{1, 2, 3, 4, 5}, + timestamps: []int64{100, 100, 300, 300, 300}, + } + n = rollupIderiv(rfa) + if n != 15 { + t.Fatalf("unexpected value; got %v; want %v", n, 15) + } + + rfa = &rollupFuncArg{ + prevValue: nan, + values: []float64{}, + timestamps: []int64{}, + } + n = rollupIderiv(rfa) + if !math.IsNaN(n) { + t.Fatalf("unexpected value; got %v; want %v", n, nan) + } + + rfa = &rollupFuncArg{ + prevValue: nan, + values: []float64{15}, + timestamps: []int64{100}, + } + n = rollupIderiv(rfa) + if n != 0 { + t.Fatalf("unexpected value; got %v; want %v", n, 0) + } + + rfa = &rollupFuncArg{ + prevTimestamp: 100, + prevValue: 10, + values: []float64{15}, + timestamps: []int64{100}, + } + n = rollupIderiv(rfa) + if n != inf { + t.Fatalf("unexpected value; got %v; want %v", n, inf) + } + + rfa = &rollupFuncArg{ + prevTimestamp: 100, + prevValue: 10, + values: []float64{15, 20}, + timestamps: []int64{100, 100}, + } + n = rollupIderiv(rfa) + if n != inf { + t.Fatalf("unexpected value; got %v; want %v", n, inf) + } +} + func TestRemoveCounterResets(t *testing.T) { removeCounterResets(nil) @@ -38,19 +100,19 @@ func TestDeltaValues(t *testing.T) { values := []float64{123} deltaValues(values) - valuesExpected := []float64{nan} + valuesExpected := []float64{0} testRowsEqual(t, values, testTimestamps[:1], valuesExpected, testTimestamps[:1]) values = append([]float64{}, testValues...) deltaValues(values) - valuesExpected = []float64{-89, 10, -23, 33, -20, 65, -87, 32, -12, 2, 0, nan} + valuesExpected = []float64{-89, 10, -23, 33, -20, 65, -87, 32, -12, 2, 0, 0} testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps) // remove counter resets values = append([]float64{}, testValues...) removeCounterResets(values) deltaValues(values) - valuesExpected = []float64{34, 10, 21, 33, 34, 65, 12, 32, 32, 2, 0, nan} + valuesExpected = []float64{34, 10, 21, 33, 34, 65, 12, 32, 32, 2, 0, 0} testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps) } @@ -59,13 +121,13 @@ func TestDerivValues(t *testing.T) { values := []float64{123} derivValues(values, testTimestamps[:1]) - valuesExpected := []float64{nan} + valuesExpected := []float64{0} testRowsEqual(t, values, testTimestamps[:1], valuesExpected, testTimestamps[:1]) values = append([]float64{}, testValues...) derivValues(values, testTimestamps) valuesExpected = []float64{-8900, 1111.111111111111, -1916.6666666666665, 2538.461538461538, -1818.1818181818182, 3611.111111111111, - -43500, 1882.3529411764705, -666.6666666666666, 400, 0, nan} + -43500, 1882.3529411764705, -666.6666666666666, 400, 0, 0} testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps) // remove counter resets @@ -73,8 +135,15 @@ func TestDerivValues(t *testing.T) { removeCounterResets(values) derivValues(values, testTimestamps) valuesExpected = []float64{3400, 1111.111111111111, 1750, 2538.461538461538, 3090.909090909091, 3611.111111111111, - 6000, 1882.3529411764705, 1777.7777777777776, 400, 0, nan} + 6000, 1882.3529411764705, 1777.7777777777776, 400, 0, 0} testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps) + + // duplicate timestamps + values = []float64{1, 2, 3, 4, 5, 6, 7} + timestamps := []int64{100, 100, 200, 200, 300, 400, 400} + derivValues(values, timestamps) + valuesExpected = []float64{0, 20, 20, 20, 10, 10, 10} + testRowsEqual(t, values, timestamps, valuesExpected, timestamps) } func testRollupFunc(t *testing.T, funcName string, args []interface{}, meExpected *metricExpr, vExpected float64) {