app/vmselect/promql: add increases_over_time and decreases_over_time functions

`increases_over_time(q[d])` returns the number of `q` increases during the given duration `d`.
`decreases_over_time(q[d])` returns the number of `q` decreases during the given duration `d`.
This commit is contained in:
Aliaksandr Valialkin 2019-09-25 20:36:51 +03:00
parent 2444433d83
commit 26dc21cf64
3 changed files with 72 additions and 15 deletions

View file

@ -2671,6 +2671,28 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`increases_over_time`, func(t *testing.T) {
t.Parallel()
q := `increases_over_time(rand(0)[200s:10s])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{11, 9, 9, 12, 9, 8},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`decreases_over_time`, func(t *testing.T) {
t.Parallel()
q := `decreases_over_time(rand(0)[200s:10s])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{9, 11, 11, 8, 11, 12},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`limitk(-1)`, func(t *testing.T) { t.Run(`limitk(-1)`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `limitk(-1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))` q := `limitk(-1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))`

View file

@ -38,21 +38,23 @@ var rollupFuncs = map[string]newRollupFunc{
"stdvar_over_time": newRollupFuncOneArg(rollupStdvar), "stdvar_over_time": newRollupFuncOneArg(rollupStdvar),
// Additional rollup funcs. // Additional rollup funcs.
"sum2_over_time": newRollupFuncOneArg(rollupSum2), "sum2_over_time": newRollupFuncOneArg(rollupSum2),
"geomean_over_time": newRollupFuncOneArg(rollupGeomean), "geomean_over_time": newRollupFuncOneArg(rollupGeomean),
"first_over_time": newRollupFuncOneArg(rollupFirst), "first_over_time": newRollupFuncOneArg(rollupFirst),
"last_over_time": newRollupFuncOneArg(rollupLast), "last_over_time": newRollupFuncOneArg(rollupLast),
"distinct_over_time": newRollupFuncOneArg(rollupDistinct), "distinct_over_time": newRollupFuncOneArg(rollupDistinct),
"integrate": newRollupFuncOneArg(rollupIntegrate), "increases_over_time": newRollupFuncOneArg(rollupIncreases),
"ideriv": newRollupFuncOneArg(rollupIderiv), "decreases_over_time": newRollupFuncOneArg(rollupDecreases),
"lifetime": newRollupFuncOneArg(rollupLifetime), "integrate": newRollupFuncOneArg(rollupIntegrate),
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval), "ideriv": newRollupFuncOneArg(rollupIderiv),
"rollup": newRollupFuncOneArg(rollupFake), "lifetime": newRollupFuncOneArg(rollupLifetime),
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets "scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
"rollup_deriv": newRollupFuncOneArg(rollupFake), "rollup": newRollupFuncOneArg(rollupFake),
"rollup_delta": newRollupFuncOneArg(rollupFake), "rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets "rollup_deriv": newRollupFuncOneArg(rollupFake),
"rollup_candlestick": newRollupFuncOneArg(rollupFake), "rollup_delta": newRollupFuncOneArg(rollupFake),
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_candlestick": newRollupFuncOneArg(rollupFake),
} }
var rollupFuncsMayAdjustWindow = map[string]bool{ var rollupFuncsMayAdjustWindow = map[string]bool{
@ -820,6 +822,37 @@ func rollupChanges(rfa *rollupFuncArg) float64 {
return float64(n) return float64(n)
} }
func rollupIncreases(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
prevValue = values[0]
values = values[1:]
}
if len(values) == 0 {
return 0
}
n := 0
for _, v := range values {
if v > prevValue {
n++
}
prevValue = v
}
return float64(n)
}
// `decreases_over_time` logic is the same as `resets` logic.
var rollupDecreases = rollupResets
func rollupResets(rfa *rollupFuncArg) float64 { func rollupResets(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up // There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs. // before calling rollup funcs.

View file

@ -294,6 +294,8 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
f("integrate", 61.0275) f("integrate", 61.0275)
f("distinct_over_time", 8) f("distinct_over_time", 8)
f("ideriv", 0) f("ideriv", 0)
f("decreases_over_time", 5)
f("increases_over_time", 5)
} }
func TestRollupNewRollupFuncError(t *testing.T) { func TestRollupNewRollupFuncError(t *testing.T) {