From b1ded7cf9a28b16f5293fd4818018d7392749381 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 4 Jan 2020 00:46:39 +0200 Subject: [PATCH] app/vmselect/promql: add `absent_over_time(m[d])` func similar to the function in Prometheus 2.16 See https://github.com/prometheus/prometheus/issues/2882 --- app/vmselect/promql/eval.go | 13 +++++++- app/vmselect/promql/exec_test.go | 53 +++++++++++++++++++++++++++++--- app/vmselect/promql/rollup.go | 8 +++++ app/vmselect/promql/transform.go | 43 ++++++++++++++------------ lib/metricsql/rollup.go | 1 + 5 files changed, 93 insertions(+), 25 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index c47b53a67..8e0883dec 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -481,6 +481,13 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re * if err != nil { return nil, err } + if len(tssSQ) == 0 { + if name == "absent_over_time" { + tss := evalNumber(ec, 1) + return tss, nil + } + return nil, nil + } sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step) preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) @@ -606,10 +613,14 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, rssLen := rss.Len() if rssLen == 0 { rss.Cancel() + var tss []*timeseries + if name == "absent_over_time" { + tss = getAbsentTimeseries(ec, me) + } // Add missing points until ec.End. // Do not cache the result, since missing points // may be backfilled in the future. - tss := mergeTimeseries(tssCached, nil, start, ec) + tss = mergeTimeseries(tssCached, tss, start, ec) return tss, nil } sharedTimestamps := getTimestamps(start, ec.End, ec.Step) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index a27024121..dd1fabf86 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -532,6 +532,12 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{} f(q, resultExpected) }) + t.Run("absent_over_time(time())", func(t *testing.T) { + t.Parallel() + q := `absent_over_time(time())` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) t.Run("absent(123)", func(t *testing.T) { t.Parallel() q := `absent(123)` @@ -555,6 +561,17 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run("absent_over_time(nan[200s:10s])", func(t *testing.T) { + t.Parallel() + q := `absent_over_time(nan[200s:10s])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`absent(scalar(multi-timeseries))`, func(t *testing.T) { t.Parallel() q := ` @@ -571,6 +588,34 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`absent_over_time(scalar(multi-timeseries))`, func(t *testing.T) { + t.Parallel() + q := ` + absent_over_time(label_set(scalar(1 or label_set(2, "xx", "foo")), "yy", "foo"))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, 1, 1, 1}, + Timestamps: timestampsExpected, + } + r.MetricName.Tags = []storage.Tag{{ + Key: []byte("yy"), + Value: []byte("foo"), + }} + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`absent(time() > 1500)`, func(t *testing.T) { + t.Parallel() + q := ` + absent(time() > 1500)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1, 1, 1, nan, nan, nan}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run("clamp_max(time(), 1400)", func(t *testing.T) { t.Parallel() q := `clamp_max(time(), 1400)` @@ -3177,11 +3222,11 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) - t.Run(`topk_min(histogram_over_time)`, func(t *testing.T) { + t.Run(`topk_max(histogram_over_time)`, func(t *testing.T) { t.Parallel() - q := `topk_min(1, histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))` + q := `topk_max(1, histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))` r := netstorage.Result{ - Values: []float64{14, 15, 12, 13, 15, 11}, + Values: []float64{13, 11, 16, 19, 13, 16}, Timestamps: timestampsExpected, } r.MetricName.Tags = []storage.Tag{ @@ -3191,7 +3236,7 @@ func TestExecSuccess(t *testing.T) { }, { Key: []byte("vmrange"), - Value: []byte("2.0e0...2.5e0"), + Value: []byte("1.5e0...2.0e0"), }, } resultExpected := []netstorage.Result{r} diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index b74b150e8..cd970b2f0 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -37,6 +37,7 @@ var rollupFuncs = map[string]newRollupFunc{ "quantile_over_time": newRollupQuantile, "stddev_over_time": newRollupFuncOneArg(rollupStddev), "stdvar_over_time": newRollupFuncOneArg(rollupStdvar), + "absent_over_time": newRollupFuncOneArg(rollupAbsent), // Additional rollup funcs. "sum2_over_time": newRollupFuncOneArg(rollupSum2), @@ -793,6 +794,13 @@ func rollupGeomean(rfa *rollupFuncArg) float64 { return math.Pow(p, 1/float64(len(values))) } +func rollupAbsent(rfa *rollupFuncArg) float64 { + if len(rfa.values) == 0 { + return 1 + } + return nan +} + func rollupCount(rfa *rollupFuncArg) float64 { // There is no need in handling NaNs here, since they must be cleaned up // before calling rollup funcs. diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index f503638e2..9e84556f3 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -146,29 +146,10 @@ func transformAbsent(tfa *transformFuncArg) ([]*timeseries, error) { return nil, err } arg := args[0] - if len(arg) == 0 { - // Copy tags from arg - rvs := evalNumber(tfa.ec, 1) - rv := rvs[0] - me, ok := tfa.fe.Args[0].(*metricsql.MetricExpr) - if !ok { - return rvs, nil - } - tfs := toTagFilters(me.LabelFilters) - for i := range tfs { - tf := &tfs[i] - if len(tf.Key) == 0 { - continue - } - if tf.IsRegexp || tf.IsNegative { - continue - } - rv.MetricName.AddTagBytes(tf.Key, tf.Value) - } + rvs := getAbsentTimeseries(tfa.ec, tfa.fe.Args[0]) return rvs, nil } - for _, ts := range arg { ts.MetricName.ResetMetricGroup() for i, v := range ts.Values { @@ -183,6 +164,28 @@ func transformAbsent(tfa *transformFuncArg) ([]*timeseries, error) { return arg, nil } +func getAbsentTimeseries(ec *EvalConfig, arg metricsql.Expr) []*timeseries { + // Copy tags from arg + rvs := evalNumber(ec, 1) + rv := rvs[0] + me, ok := arg.(*metricsql.MetricExpr) + if !ok { + return rvs + } + tfs := toTagFilters(me.LabelFilters) + for i := range tfs { + tf := &tfs[i] + if len(tf.Key) == 0 { + continue + } + if tf.IsRegexp || tf.IsNegative { + continue + } + rv.MetricName.AddTagBytes(tf.Key, tf.Value) + } + return rvs +} + func transformCeil(v float64) float64 { return math.Ceil(v) } diff --git a/lib/metricsql/rollup.go b/lib/metricsql/rollup.go index 606ac5163..aa3c63c1e 100644 --- a/lib/metricsql/rollup.go +++ b/lib/metricsql/rollup.go @@ -26,6 +26,7 @@ var rollupFuncs = map[string]bool{ "quantile_over_time": true, "stddev_over_time": true, "stdvar_over_time": true, + "absent_over_time": true, // Additional rollup funcs. "default_rollup": true,