diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index e190254987..f23ef36fe5 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -25,7 +25,7 @@ var rollupFuncs = map[string]newRollupFunc{ "increase": newRollupFuncOneArg(rollupIncrease), // + rollupFuncsRemoveCounterResets "irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets "predict_linear": newRollupPredictLinear, - "rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets + "rate": newRollupFuncOneArg(rollupDerivIncrease), // + rollupFuncsRemoveCounterResets "resets": newRollupFuncOneArg(rollupResets), "avg_over_time": newRollupFuncOneArg(rollupAvg), "min_over_time": newRollupFuncOneArg(rollupMin), @@ -116,7 +116,13 @@ type rollupFuncArg struct { currTimestamp int64 idx int step int64 + + // Real previous value even if it is located too far from the current window. + // It matches prevValue if prevValue is not nan. realPrevValue float64 + + // Global scrape interval across all the data points in [Start...End] time range. + scrapeInterval int64 } func (rfa *rollupFuncArg) reset() { @@ -128,6 +134,7 @@ func (rfa *rollupFuncArg) reset() { rfa.idx = 0 rfa.step = 0 rfa.realPrevValue = nan + rfa.scrapeInterval = 0 } // rollupFunc must return rollup value for the given rfa. @@ -192,7 +199,8 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i // Extend dstValues in order to remove mallocs below. dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps)) - maxPrevInterval := getMaxPrevInterval(timestamps) + scrapeInterval := getScrapeInterval(timestamps) + maxPrevInterval := getMaxPrevInterval(scrapeInterval) if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta { maxPrevInterval = rc.LookbackDelta } @@ -207,6 +215,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i rfa.idx = 0 rfa.step = rc.Step rfa.realPrevValue = nan + rfa.scrapeInterval = scrapeInterval i := 0 j := 0 @@ -296,7 +305,7 @@ func binarySearchInt64(a []int64, v int64) uint { return i } -func getMaxPrevInterval(timestamps []int64) int64 { +func getScrapeInterval(timestamps []int64) int64 { if len(timestamps) < 2 { return int64(maxSilenceInterval) } @@ -312,30 +321,34 @@ func getMaxPrevInterval(timestamps []int64) int64 { h.Update(float64(ts - tsPrev)) tsPrev = ts } - d := int64(h.Quantile(0.6)) + scrapeInterval := int64(h.Quantile(0.6)) histogram.PutFast(h) - if d <= 0 { + if scrapeInterval <= 0 { return int64(maxSilenceInterval) } - // Increase d more for smaller scrape intervals in order to hide possible gaps + return scrapeInterval +} + +func getMaxPrevInterval(scrapeInterval int64) int64 { + // Increase scrapeInterval more for smaller scrape intervals in order to hide possible gaps // when high jitter is present. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/139 . - if d <= 2*1000 { - return d + 4*d + if scrapeInterval <= 2*1000 { + return scrapeInterval + 4*scrapeInterval } - if d <= 4*1000 { - return d + 2*d + if scrapeInterval <= 4*1000 { + return scrapeInterval + 2*scrapeInterval } - if d <= 8*1000 { - return d + d + if scrapeInterval <= 8*1000 { + return scrapeInterval + scrapeInterval } - if d <= 16*1000 { - return d + d/2 + if scrapeInterval <= 16*1000 { + return scrapeInterval + scrapeInterval/2 } - if d <= 32*1000 { - return d + d/4 + if scrapeInterval <= 32*1000 { + return scrapeInterval + scrapeInterval/4 } - return d + d/8 + return scrapeInterval + scrapeInterval/8 } func removeCounterResets(values []float64) { @@ -766,6 +779,14 @@ func rollupDerivSlow(rfa *rollupFuncArg) float64 { } func rollupDerivFast(rfa *rollupFuncArg) float64 { + return rollupDerivFastInternal(rfa, false) +} + +func rollupDerivIncrease(rfa *rollupFuncArg) float64 { + return rollupDerivFastInternal(rfa, true) +} + +func rollupDerivFastInternal(rfa *rollupFuncArg, canUseRealPrevValue bool) float64 { // There is no need in handling NaNs here, since they must be cleaned up // before calling rollup funcs. values := rfa.values @@ -773,10 +794,18 @@ func rollupDerivFast(rfa *rollupFuncArg) float64 { prevValue := rfa.prevValue prevTimestamp := rfa.prevTimestamp if math.IsNaN(prevValue) { - if len(values) < 2 { - // It is impossible to calculate derivative on 0 or 1 values. + if len(values) == 0 { return nan } + if len(values) == 1 { + // Assume that the value changed from 0 to the current value during rfa.scrapeInterval. + delta := values[0] + if canUseRealPrevValue && !math.IsNaN(rfa.realPrevValue) { + // Fix against removeCounterResets. + delta -= rfa.realPrevValue + } + return float64(delta) / float64(rfa.scrapeInterval) + } prevValue = values[0] prevTimestamp = timestamps[0] values = values[1:] diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index cbb1bf1d9d..76ca7ec61d 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -772,6 +772,20 @@ func TestRollupFuncsNoWindow(t *testing.T) { timestampsExpected := []int64{0, 40, 80, 120, 160} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) + t.Run("deriv_fast", func(t *testing.T) { + rc := rollupConfig{ + Func: rollupDerivFast, + Start: 0, + End: 20, + Step: 4, + Window: 0, + } + rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) + values := rc.Do(nil, testValues, testTimestamps) + valuesExpected := []float64{nan, nan, 10.25, 0, -8900, 0} + timestampsExpected := []int64{0, 4, 8, 12, 16, 20} + testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) + }) t.Run("ideriv", func(t *testing.T) { rc := rollupConfig{ Func: rollupIderiv,