app/vmselect/promql: use scrapeInterval instead of window in the denominator when calculating rate for the first point of a time series

This should provide a better estimate of `rate` at the beginning of a time series.
This commit is contained in:
Aliaksandr Valialkin 2020-01-03 19:00:20 +02:00
parent 588531dd76
commit 3d0c7b095a
2 changed files with 47 additions and 35 deletions

View file

@ -23,9 +23,9 @@ var rollupFuncs = map[string]newRollupFunc{
"holt_winters": newRollupHoltWinters, "holt_winters": newRollupHoltWinters,
"idelta": newRollupFuncOneArg(rollupIdelta), "idelta": newRollupFuncOneArg(rollupIdelta),
"increase": newRollupFuncOneArg(rollupIncrease), // + rollupFuncsRemoveCounterResets "increase": newRollupFuncOneArg(rollupIncrease), // + rollupFuncsRemoveCounterResets
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets "irate": newRollupFuncOneArg(rollupIrate), // + rollupFuncsRemoveCounterResets
"predict_linear": newRollupPredictLinear, "predict_linear": newRollupPredictLinear,
"rate": newRollupFuncOneArg(rollupDerivIncrease), // + rollupFuncsRemoveCounterResets "rate": newRollupFuncOneArg(rollupRate), // + rollupFuncsRemoveCounterResets
"resets": newRollupFuncOneArg(rollupResets), "resets": newRollupFuncOneArg(rollupResets),
"avg_over_time": newRollupFuncOneArg(rollupAvg), "avg_over_time": newRollupFuncOneArg(rollupAvg),
"min_over_time": newRollupFuncOneArg(rollupMin), "min_over_time": newRollupFuncOneArg(rollupMin),
@ -111,10 +111,10 @@ type rollupFuncArg struct {
values []float64 values []float64
timestamps []int64 timestamps []int64
currTimestamp int64 currTimestamp int64
idx int idx int
step int64 step int64
window int64 scrapeInterval int64
// Real previous value even if it is located too far from the current window. // Real previous value even if it is located too far from the current window.
// It matches prevValue if prevValue is not nan. // It matches prevValue if prevValue is not nan.
@ -129,7 +129,7 @@ func (rfa *rollupFuncArg) reset() {
rfa.currTimestamp = 0 rfa.currTimestamp = 0
rfa.idx = 0 rfa.idx = 0
rfa.step = 0 rfa.step = 0
rfa.window = 0 rfa.scrapeInterval = 0
rfa.realPrevValue = nan rfa.realPrevValue = nan
} }
@ -210,7 +210,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
rfa := getRollupFuncArg() rfa := getRollupFuncArg()
rfa.idx = 0 rfa.idx = 0
rfa.step = rc.Step rfa.step = rc.Step
rfa.window = window rfa.scrapeInterval = scrapeInterval
rfa.realPrevValue = nan rfa.realPrevValue = nan
i := 0 i := 0
@ -776,12 +776,7 @@ func rollupDeltaInternal(rfa *rollupFuncArg, canUseRealPrevValue bool) float64 {
return nan return nan
} }
// Assume that the previous non-existing value was 0. // Assume that the previous non-existing value was 0.
if canUseRealPrevValue && !math.IsNaN(rfa.realPrevValue) { prevValue = getPrevValue(rfa, canUseRealPrevValue)
// Fix against removeCounterResets.
prevValue = rfa.realPrevValue
} else {
prevValue = 0
}
} }
if len(values) == 0 { if len(values) == 0 {
// Assume that the value didn't change on the given interval. // Assume that the value didn't change on the given interval.
@ -825,7 +820,7 @@ func rollupDerivFast(rfa *rollupFuncArg) float64 {
return rollupDerivFastInternal(rfa, false) return rollupDerivFastInternal(rfa, false)
} }
func rollupDerivIncrease(rfa *rollupFuncArg) float64 { func rollupRate(rfa *rollupFuncArg) float64 {
return rollupDerivFastInternal(rfa, true) return rollupDerivFastInternal(rfa, true)
} }
@ -840,16 +835,10 @@ func rollupDerivFastInternal(rfa *rollupFuncArg, canUseRealPrevValue bool) float
if len(values) == 0 { if len(values) == 0 {
return nan return nan
} }
// Assume that the value changed from 0 to the current value during rfa.window // Assume that the value changed from 0 to the current value during rfa.scrapeInterval
if canUseRealPrevValue && !math.IsNaN(rfa.realPrevValue) { prevValue = getPrevValue(rfa, canUseRealPrevValue)
// Fix against removeCounterResets. prevTimestamp = timestamps[0] - rfa.scrapeInterval
prevValue = rfa.realPrevValue } else if len(values) == 0 {
} else {
prevValue = 0
}
prevTimestamp = timestamps[0] - rfa.window
}
if len(values) == 0 {
// Assume that the value didn't change on the given interval. // Assume that the value didn't change on the given interval.
return 0 return 0
} }
@ -861,16 +850,30 @@ func rollupDerivFastInternal(rfa *rollupFuncArg, canUseRealPrevValue bool) float
} }
func rollupIderiv(rfa *rollupFuncArg) float64 { func rollupIderiv(rfa *rollupFuncArg) float64 {
return rollupIderivInternal(rfa, false)
}
// rollupIrate delegates to rollupIderivInternal with canUseRealPrevValue
// enabled and returns its result unchanged.
func rollupIrate(rfa *rollupFuncArg) float64 {
	const canUseRealPrevValue = true
	return rollupIderivInternal(rfa, canUseRealPrevValue)
}
func rollupIderivInternal(rfa *rollupFuncArg, canUseRealPrevValue bool) float64 {
// There is no need in handling NaNs here, since they must be cleaned up // There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
timestamps := rfa.timestamps timestamps := rfa.timestamps
if len(values) < 2 { if len(values) < 2 {
if len(values) == 0 || math.IsNaN(rfa.prevValue) { if len(values) == 0 {
// It is impossible to calculate derivative on 0 or 1 values.
return nan return nan
} }
return (values[0] - rfa.prevValue) / (float64(timestamps[0]-rfa.prevTimestamp) * 1e-3) prevValue := rfa.prevValue
prevTimestamp := rfa.prevTimestamp
if math.IsNaN(prevValue) {
// Assume that the value changed from 0 to the current value during rfa.scrapeInterval.
prevValue = getPrevValue(rfa, canUseRealPrevValue)
prevTimestamp = timestamps[0] - rfa.scrapeInterval
}
return (values[0] - prevValue) / (float64(timestamps[0]-prevTimestamp) * 1e-3)
} }
vEnd := values[len(values)-1] vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1] tEnd := timestamps[len(timestamps)-1]
@ -897,6 +900,14 @@ func rollupIderiv(rfa *rollupFuncArg) float64 {
return dv / (float64(dt) * 1e-3) return dv / (float64(dt) * 1e-3)
} }
// getPrevValue picks the value to treat as the previous sample.
//
// It returns rfa.realPrevValue when canUseRealPrevValue is true and that
// value is not NaN. In every other case it returns 0.
func getPrevValue(rfa *rollupFuncArg, canUseRealPrevValue bool) float64 {
	realPrev := rfa.realPrevValue
	if canUseRealPrevValue && !math.IsNaN(realPrev) {
		return realPrev
	}
	return 0
}
func rollupLifetime(rfa *rollupFuncArg) float64 { func rollupLifetime(rfa *rollupFuncArg) float64 {
// Calculate the duration between the first and the last data points. // Calculate the duration between the first and the last data points.
timestamps := rfa.timestamps timestamps := rfa.timestamps

View file

@ -42,13 +42,14 @@ func TestRollupIderivDuplicateTimestamps(t *testing.T) {
} }
rfa = &rollupFuncArg{ rfa = &rollupFuncArg{
prevValue: nan, prevValue: nan,
values: []float64{15}, values: []float64{15},
timestamps: []int64{100}, timestamps: []int64{100},
scrapeInterval: 200,
} }
n = rollupIderiv(rfa) n = rollupIderiv(rfa)
if !math.IsNaN(n) { if n != 75 {
t.Fatalf("unexpected value; got %v; want %v", n, nan) t.Fatalf("unexpected value; got %v; want %v", n, 75)
} }
rfa = &rollupFuncArg{ rfa = &rollupFuncArg{
@ -59,7 +60,7 @@ func TestRollupIderivDuplicateTimestamps(t *testing.T) {
} }
n = rollupIderiv(rfa) n = rollupIderiv(rfa)
if n != 500 { if n != 500 {
t.Fatalf("unexpected value; got %v; want %v", n, 0.5) t.Fatalf("unexpected value; got %v; want %v", n, 500)
} }
rfa = &rollupFuncArg{ rfa = &rollupFuncArg{
@ -830,7 +831,7 @@ func TestRollupFuncsNoWindow(t *testing.T) {
} }
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps) values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, nan, 30750, 0, -8900, 0} valuesExpected := []float64{nan, nan, 10250, 0, -8900, 0}
timestampsExpected := []int64{0, 4, 8, 12, 16, 20} timestampsExpected := []int64{0, 4, 8, 12, 16, 20}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
}) })