mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect/promql: gracefully handle duplicate timestamps in irate
and rollup_rate
funcs
Previously such timestamps result in `+Inf` results. Now the previous timestamp is used for the calculations.
This commit is contained in:
parent
81da1c7b47
commit
858746fa6c
2 changed files with 102 additions and 18 deletions
|
@ -264,12 +264,14 @@ func deltaValues(values []float64) {
|
||||||
if len(values) == 0 {
|
if len(values) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
prevDelta := float64(0)
|
||||||
prevValue := values[0]
|
prevValue := values[0]
|
||||||
for i, v := range values[1:] {
|
for i, v := range values[1:] {
|
||||||
values[i] = v - prevValue
|
prevDelta = v - prevValue
|
||||||
|
values[i] = prevDelta
|
||||||
prevValue = v
|
prevValue = v
|
||||||
}
|
}
|
||||||
values[len(values)-1] = nan
|
values[len(values)-1] = prevDelta
|
||||||
}
|
}
|
||||||
|
|
||||||
func derivValues(values []float64, timestamps []int64) {
|
func derivValues(values []float64, timestamps []int64) {
|
||||||
|
@ -278,16 +280,23 @@ func derivValues(values []float64, timestamps []int64) {
|
||||||
if len(values) == 0 {
|
if len(values) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
prevDeriv := float64(0)
|
||||||
prevValue := values[0]
|
prevValue := values[0]
|
||||||
prevTs := timestamps[0]
|
prevTs := timestamps[0]
|
||||||
for i, v := range values[1:] {
|
for i, v := range values[1:] {
|
||||||
ts := timestamps[i+1]
|
ts := timestamps[i+1]
|
||||||
|
if ts == prevTs {
|
||||||
|
// Use the previous value for duplicate timestamps.
|
||||||
|
values[i] = prevDeriv
|
||||||
|
continue
|
||||||
|
}
|
||||||
dt := float64(ts-prevTs) * 1e-3
|
dt := float64(ts-prevTs) * 1e-3
|
||||||
values[i] = (v - prevValue) / dt
|
prevDeriv = (v - prevValue) / dt
|
||||||
|
values[i] = prevDeriv
|
||||||
prevValue = v
|
prevValue = v
|
||||||
prevTs = ts
|
prevTs = ts
|
||||||
}
|
}
|
||||||
values[len(values)-1] = nan
|
values[len(values)-1] = prevDeriv
|
||||||
}
|
}
|
||||||
|
|
||||||
type newRollupFunc func(args []interface{}) (rollupFunc, error)
|
type newRollupFunc func(args []interface{}) (rollupFunc, error)
|
||||||
|
@ -665,18 +674,24 @@ func rollupIderiv(rfa *rollupFuncArg) float64 {
|
||||||
tEnd := timestamps[len(timestamps)-1]
|
tEnd := timestamps[len(timestamps)-1]
|
||||||
values = values[:len(values)-1]
|
values = values[:len(values)-1]
|
||||||
timestamps = timestamps[:len(timestamps)-1]
|
timestamps = timestamps[:len(timestamps)-1]
|
||||||
prevValue := rfa.prevValue
|
// Skip data points with duplicate timestamps.
|
||||||
prevTimestamp := rfa.prevTimestamp
|
for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= tEnd {
|
||||||
if len(values) == 0 {
|
timestamps = timestamps[:len(timestamps)-1]
|
||||||
if math.IsNaN(prevValue) {
|
}
|
||||||
|
var tStart int64
|
||||||
|
var vStart float64
|
||||||
|
if len(timestamps) == 0 {
|
||||||
|
if math.IsNaN(rfa.prevValue) {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
tStart = rfa.prevTimestamp
|
||||||
|
vStart = rfa.prevValue
|
||||||
} else {
|
} else {
|
||||||
prevValue = values[len(values)-1]
|
tStart = timestamps[len(timestamps)-1]
|
||||||
prevTimestamp = timestamps[len(timestamps)-1]
|
vStart = values[len(timestamps)-1]
|
||||||
}
|
}
|
||||||
dv := vEnd - prevValue
|
dv := vEnd - vStart
|
||||||
dt := tEnd - prevTimestamp
|
dt := tEnd - tStart
|
||||||
return dv / (float64(dt) / 1000)
|
return dv / (float64(dt) / 1000)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,68 @@ var (
|
||||||
testTimestamps = []int64{5, 15, 24, 36, 49, 60, 78, 80, 97, 115, 120, 130}
|
testTimestamps = []int64{5, 15, 24, 36, 49, 60, 78, 80, 97, 115, 120, 130}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestRollupIderivDuplicateTimestamps(t *testing.T) {
|
||||||
|
rfa := &rollupFuncArg{
|
||||||
|
values: []float64{1, 2, 3, 4, 5},
|
||||||
|
timestamps: []int64{100, 100, 200, 300, 300},
|
||||||
|
}
|
||||||
|
n := rollupIderiv(rfa)
|
||||||
|
if n != 20 {
|
||||||
|
t.Fatalf("unexpected value; got %v; want %v", n, 20)
|
||||||
|
}
|
||||||
|
|
||||||
|
rfa = &rollupFuncArg{
|
||||||
|
values: []float64{1, 2, 3, 4, 5},
|
||||||
|
timestamps: []int64{100, 100, 300, 300, 300},
|
||||||
|
}
|
||||||
|
n = rollupIderiv(rfa)
|
||||||
|
if n != 15 {
|
||||||
|
t.Fatalf("unexpected value; got %v; want %v", n, 15)
|
||||||
|
}
|
||||||
|
|
||||||
|
rfa = &rollupFuncArg{
|
||||||
|
prevValue: nan,
|
||||||
|
values: []float64{},
|
||||||
|
timestamps: []int64{},
|
||||||
|
}
|
||||||
|
n = rollupIderiv(rfa)
|
||||||
|
if !math.IsNaN(n) {
|
||||||
|
t.Fatalf("unexpected value; got %v; want %v", n, nan)
|
||||||
|
}
|
||||||
|
|
||||||
|
rfa = &rollupFuncArg{
|
||||||
|
prevValue: nan,
|
||||||
|
values: []float64{15},
|
||||||
|
timestamps: []int64{100},
|
||||||
|
}
|
||||||
|
n = rollupIderiv(rfa)
|
||||||
|
if n != 0 {
|
||||||
|
t.Fatalf("unexpected value; got %v; want %v", n, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
rfa = &rollupFuncArg{
|
||||||
|
prevTimestamp: 100,
|
||||||
|
prevValue: 10,
|
||||||
|
values: []float64{15},
|
||||||
|
timestamps: []int64{100},
|
||||||
|
}
|
||||||
|
n = rollupIderiv(rfa)
|
||||||
|
if n != inf {
|
||||||
|
t.Fatalf("unexpected value; got %v; want %v", n, inf)
|
||||||
|
}
|
||||||
|
|
||||||
|
rfa = &rollupFuncArg{
|
||||||
|
prevTimestamp: 100,
|
||||||
|
prevValue: 10,
|
||||||
|
values: []float64{15, 20},
|
||||||
|
timestamps: []int64{100, 100},
|
||||||
|
}
|
||||||
|
n = rollupIderiv(rfa)
|
||||||
|
if n != inf {
|
||||||
|
t.Fatalf("unexpected value; got %v; want %v", n, inf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRemoveCounterResets(t *testing.T) {
|
func TestRemoveCounterResets(t *testing.T) {
|
||||||
removeCounterResets(nil)
|
removeCounterResets(nil)
|
||||||
|
|
||||||
|
@ -38,19 +100,19 @@ func TestDeltaValues(t *testing.T) {
|
||||||
|
|
||||||
values := []float64{123}
|
values := []float64{123}
|
||||||
deltaValues(values)
|
deltaValues(values)
|
||||||
valuesExpected := []float64{nan}
|
valuesExpected := []float64{0}
|
||||||
testRowsEqual(t, values, testTimestamps[:1], valuesExpected, testTimestamps[:1])
|
testRowsEqual(t, values, testTimestamps[:1], valuesExpected, testTimestamps[:1])
|
||||||
|
|
||||||
values = append([]float64{}, testValues...)
|
values = append([]float64{}, testValues...)
|
||||||
deltaValues(values)
|
deltaValues(values)
|
||||||
valuesExpected = []float64{-89, 10, -23, 33, -20, 65, -87, 32, -12, 2, 0, nan}
|
valuesExpected = []float64{-89, 10, -23, 33, -20, 65, -87, 32, -12, 2, 0, 0}
|
||||||
testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps)
|
testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps)
|
||||||
|
|
||||||
// remove counter resets
|
// remove counter resets
|
||||||
values = append([]float64{}, testValues...)
|
values = append([]float64{}, testValues...)
|
||||||
removeCounterResets(values)
|
removeCounterResets(values)
|
||||||
deltaValues(values)
|
deltaValues(values)
|
||||||
valuesExpected = []float64{34, 10, 21, 33, 34, 65, 12, 32, 32, 2, 0, nan}
|
valuesExpected = []float64{34, 10, 21, 33, 34, 65, 12, 32, 32, 2, 0, 0}
|
||||||
testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps)
|
testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,13 +121,13 @@ func TestDerivValues(t *testing.T) {
|
||||||
|
|
||||||
values := []float64{123}
|
values := []float64{123}
|
||||||
derivValues(values, testTimestamps[:1])
|
derivValues(values, testTimestamps[:1])
|
||||||
valuesExpected := []float64{nan}
|
valuesExpected := []float64{0}
|
||||||
testRowsEqual(t, values, testTimestamps[:1], valuesExpected, testTimestamps[:1])
|
testRowsEqual(t, values, testTimestamps[:1], valuesExpected, testTimestamps[:1])
|
||||||
|
|
||||||
values = append([]float64{}, testValues...)
|
values = append([]float64{}, testValues...)
|
||||||
derivValues(values, testTimestamps)
|
derivValues(values, testTimestamps)
|
||||||
valuesExpected = []float64{-8900, 1111.111111111111, -1916.6666666666665, 2538.461538461538, -1818.1818181818182, 3611.111111111111,
|
valuesExpected = []float64{-8900, 1111.111111111111, -1916.6666666666665, 2538.461538461538, -1818.1818181818182, 3611.111111111111,
|
||||||
-43500, 1882.3529411764705, -666.6666666666666, 400, 0, nan}
|
-43500, 1882.3529411764705, -666.6666666666666, 400, 0, 0}
|
||||||
testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps)
|
testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps)
|
||||||
|
|
||||||
// remove counter resets
|
// remove counter resets
|
||||||
|
@ -73,8 +135,15 @@ func TestDerivValues(t *testing.T) {
|
||||||
removeCounterResets(values)
|
removeCounterResets(values)
|
||||||
derivValues(values, testTimestamps)
|
derivValues(values, testTimestamps)
|
||||||
valuesExpected = []float64{3400, 1111.111111111111, 1750, 2538.461538461538, 3090.909090909091, 3611.111111111111,
|
valuesExpected = []float64{3400, 1111.111111111111, 1750, 2538.461538461538, 3090.909090909091, 3611.111111111111,
|
||||||
6000, 1882.3529411764705, 1777.7777777777776, 400, 0, nan}
|
6000, 1882.3529411764705, 1777.7777777777776, 400, 0, 0}
|
||||||
testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps)
|
testRowsEqual(t, values, testTimestamps, valuesExpected, testTimestamps)
|
||||||
|
|
||||||
|
// duplicate timestamps
|
||||||
|
values = []float64{1, 2, 3, 4, 5, 6, 7}
|
||||||
|
timestamps := []int64{100, 100, 200, 200, 300, 400, 400}
|
||||||
|
derivValues(values, timestamps)
|
||||||
|
valuesExpected = []float64{0, 20, 20, 20, 10, 10, 10}
|
||||||
|
testRowsEqual(t, values, timestamps, valuesExpected, timestamps)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testRollupFunc(t *testing.T, funcName string, args []interface{}, meExpected *metricExpr, vExpected float64) {
|
func testRollupFunc(t *testing.T, funcName string, args []interface{}, meExpected *metricExpr, vExpected float64) {
|
||||||
|
|
Loading…
Reference in a new issue