app/vmselect/promql: fix corner case for increase over time series with gaps

In this case `increase` could return invalid high value for the first point after the gap.
This commit is contained in:
Aliaksandr Valialkin 2019-11-30 01:24:49 +02:00
parent 4c63caa37c
commit 1e2019b1b6

View file

@ -22,7 +22,7 @@ var rollupFuncs = map[string]newRollupFunc{
"deriv_fast": newRollupFuncOneArg(rollupDerivFast), "deriv_fast": newRollupFuncOneArg(rollupDerivFast),
"holt_winters": newRollupHoltWinters, "holt_winters": newRollupHoltWinters,
"idelta": newRollupFuncOneArg(rollupIdelta), "idelta": newRollupFuncOneArg(rollupIdelta),
"increase": newRollupFuncOneArg(rollupDelta), // + rollupFuncsRemoveCounterResets "increase": newRollupFuncOneArg(rollupIncrease), // + rollupFuncsRemoveCounterResets
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets "irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
"predict_linear": newRollupPredictLinear, "predict_linear": newRollupPredictLinear,
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets "rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
@ -116,6 +116,7 @@ type rollupFuncArg struct {
currTimestamp int64 currTimestamp int64
idx int idx int
step int64 step int64
realPrevValue float64
} }
func (rfa *rollupFuncArg) reset() { func (rfa *rollupFuncArg) reset() {
@ -126,6 +127,7 @@ func (rfa *rollupFuncArg) reset() {
rfa.currTimestamp = 0 rfa.currTimestamp = 0
rfa.idx = 0 rfa.idx = 0
rfa.step = 0 rfa.step = 0
rfa.realPrevValue = nan
} }
// rollupFunc must return rollup value for the given rfa. // rollupFunc must return rollup value for the given rfa.
@ -204,6 +206,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
rfa := getRollupFuncArg() rfa := getRollupFuncArg()
rfa.idx = 0 rfa.idx = 0
rfa.step = rc.Step rfa.step = rc.Step
rfa.realPrevValue = nan
i := 0 i := 0
j := 0 j := 0
@ -229,6 +232,9 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
rfa.values = values[i:j] rfa.values = values[i:j]
rfa.timestamps = timestamps[i:j] rfa.timestamps = timestamps[i:j]
rfa.currTimestamp = tEnd rfa.currTimestamp = tEnd
if i > 0 {
rfa.realPrevValue = values[i-1]
}
value := rc.Func(rfa) value := rc.Func(rfa)
rfa.idx++ rfa.idx++
dstValues = append(dstValues, value) dstValues = append(dstValues, value)
@ -681,6 +687,14 @@ func rollupStdvar(rfa *rollupFuncArg) float64 {
} }
func rollupDelta(rfa *rollupFuncArg) float64 { func rollupDelta(rfa *rollupFuncArg) float64 {
return rollupDeltaInternal(rfa, false)
}
func rollupIncrease(rfa *rollupFuncArg) float64 {
return rollupDeltaInternal(rfa, true)
}
func rollupDeltaInternal(rfa *rollupFuncArg, canUseRealPrevValue bool) float64 {
// There is no need in handling NaNs here, since they must be cleaned up // There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs. // before calling rollup funcs.
values := rfa.values values := rfa.values
@ -690,6 +704,10 @@ func rollupDelta(rfa *rollupFuncArg) float64 {
return nan return nan
} }
if len(values) == 1 { if len(values) == 1 {
if canUseRealPrevValue && !math.IsNaN(rfa.realPrevValue) {
// Fix against removeCounterResets.
return values[0] - rfa.realPrevValue
}
// Assume that the previous non-existing value was 0. // Assume that the previous non-existing value was 0.
return values[0] return values[0]
} }