app/vmselect/promql: add absent_over_time(m[d]) func similar to the function in Prometheus 2.16

See https://github.com/prometheus/prometheus/issues/2882
This commit is contained in:
Aliaksandr Valialkin 2020-01-04 00:46:39 +02:00
parent a8360d04c0
commit b1ded7cf9a
5 changed files with 93 additions and 25 deletions

View file

@ -481,6 +481,13 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(tssSQ) == 0 {
if name == "absent_over_time" {
tss := evalNumber(ec, 1)
return tss, nil
}
return nil, nil
}
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step) sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step)
preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
@ -606,10 +613,14 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
rssLen := rss.Len() rssLen := rss.Len()
if rssLen == 0 { if rssLen == 0 {
rss.Cancel() rss.Cancel()
var tss []*timeseries
if name == "absent_over_time" {
tss = getAbsentTimeseries(ec, me)
}
// Add missing points until ec.End. // Add missing points until ec.End.
// Do not cache the result, since missing points // Do not cache the result, since missing points
// may be backfilled in the future. // may be backfilled in the future.
tss := mergeTimeseries(tssCached, nil, start, ec) tss = mergeTimeseries(tssCached, tss, start, ec)
return tss, nil return tss, nil
} }
sharedTimestamps := getTimestamps(start, ec.End, ec.Step) sharedTimestamps := getTimestamps(start, ec.End, ec.Step)

View file

@ -532,6 +532,12 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{} resultExpected := []netstorage.Result{}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run("absent_over_time(time())", func(t *testing.T) {
t.Parallel()
q := `absent_over_time(time())`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run("absent(123)", func(t *testing.T) { t.Run("absent(123)", func(t *testing.T) {
t.Parallel() t.Parallel()
q := `absent(123)` q := `absent(123)`
@ -555,6 +561,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run("absent_over_time(nan[200s:10s])", func(t *testing.T) {
t.Parallel()
q := `absent_over_time(nan[200s:10s])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`absent(scalar(multi-timeseries))`, func(t *testing.T) { t.Run(`absent(scalar(multi-timeseries))`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := ` q := `
@ -571,6 +588,34 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`absent_over_time(scalar(multi-timeseries))`, func(t *testing.T) {
t.Parallel()
q := `
absent_over_time(label_set(scalar(1 or label_set(2, "xx", "foo")), "yy", "foo"))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("yy"),
Value: []byte("foo"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`absent(time() > 1500)`, func(t *testing.T) {
t.Parallel()
q := `
absent(time() > 1500)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, nan, nan, nan},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("clamp_max(time(), 1400)", func(t *testing.T) { t.Run("clamp_max(time(), 1400)", func(t *testing.T) {
t.Parallel() t.Parallel()
q := `clamp_max(time(), 1400)` q := `clamp_max(time(), 1400)`
@ -3177,11 +3222,11 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`topk_min(histogram_over_time)`, func(t *testing.T) { t.Run(`topk_max(histogram_over_time)`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `topk_min(1, histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))` q := `topk_max(1, histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]))`
r := netstorage.Result{ r := netstorage.Result{
Values: []float64{14, 15, 12, 13, 15, 11}, Values: []float64{13, 11, 16, 19, 13, 16},
Timestamps: timestampsExpected, Timestamps: timestampsExpected,
} }
r.MetricName.Tags = []storage.Tag{ r.MetricName.Tags = []storage.Tag{
@ -3191,7 +3236,7 @@ func TestExecSuccess(t *testing.T) {
}, },
{ {
Key: []byte("vmrange"), Key: []byte("vmrange"),
Value: []byte("2.0e0...2.5e0"), Value: []byte("1.5e0...2.0e0"),
}, },
} }
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}

View file

@ -37,6 +37,7 @@ var rollupFuncs = map[string]newRollupFunc{
"quantile_over_time": newRollupQuantile, "quantile_over_time": newRollupQuantile,
"stddev_over_time": newRollupFuncOneArg(rollupStddev), "stddev_over_time": newRollupFuncOneArg(rollupStddev),
"stdvar_over_time": newRollupFuncOneArg(rollupStdvar), "stdvar_over_time": newRollupFuncOneArg(rollupStdvar),
"absent_over_time": newRollupFuncOneArg(rollupAbsent),
// Additional rollup funcs. // Additional rollup funcs.
"sum2_over_time": newRollupFuncOneArg(rollupSum2), "sum2_over_time": newRollupFuncOneArg(rollupSum2),
@ -793,6 +794,13 @@ func rollupGeomean(rfa *rollupFuncArg) float64 {
return math.Pow(p, 1/float64(len(values))) return math.Pow(p, 1/float64(len(values)))
} }
func rollupAbsent(rfa *rollupFuncArg) float64 {
if len(rfa.values) == 0 {
return 1
}
return nan
}
func rollupCount(rfa *rollupFuncArg) float64 { func rollupCount(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up // There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs. // before calling rollup funcs.

View file

@ -146,29 +146,10 @@ func transformAbsent(tfa *transformFuncArg) ([]*timeseries, error) {
return nil, err return nil, err
} }
arg := args[0] arg := args[0]
if len(arg) == 0 { if len(arg) == 0 {
// Copy tags from arg rvs := getAbsentTimeseries(tfa.ec, tfa.fe.Args[0])
rvs := evalNumber(tfa.ec, 1)
rv := rvs[0]
me, ok := tfa.fe.Args[0].(*metricsql.MetricExpr)
if !ok {
return rvs, nil
}
tfs := toTagFilters(me.LabelFilters)
for i := range tfs {
tf := &tfs[i]
if len(tf.Key) == 0 {
continue
}
if tf.IsRegexp || tf.IsNegative {
continue
}
rv.MetricName.AddTagBytes(tf.Key, tf.Value)
}
return rvs, nil return rvs, nil
} }
for _, ts := range arg { for _, ts := range arg {
ts.MetricName.ResetMetricGroup() ts.MetricName.ResetMetricGroup()
for i, v := range ts.Values { for i, v := range ts.Values {
@ -183,6 +164,28 @@ func transformAbsent(tfa *transformFuncArg) ([]*timeseries, error) {
return arg, nil return arg, nil
} }
func getAbsentTimeseries(ec *EvalConfig, arg metricsql.Expr) []*timeseries {
// Copy tags from arg
rvs := evalNumber(ec, 1)
rv := rvs[0]
me, ok := arg.(*metricsql.MetricExpr)
if !ok {
return rvs
}
tfs := toTagFilters(me.LabelFilters)
for i := range tfs {
tf := &tfs[i]
if len(tf.Key) == 0 {
continue
}
if tf.IsRegexp || tf.IsNegative {
continue
}
rv.MetricName.AddTagBytes(tf.Key, tf.Value)
}
return rvs
}
func transformCeil(v float64) float64 { func transformCeil(v float64) float64 {
return math.Ceil(v) return math.Ceil(v)
} }

View file

@ -26,6 +26,7 @@ var rollupFuncs = map[string]bool{
"quantile_over_time": true, "quantile_over_time": true,
"stddev_over_time": true, "stddev_over_time": true,
"stdvar_over_time": true, "stdvar_over_time": true,
"absent_over_time": true,
// Additional rollup funcs. // Additional rollup funcs.
"default_rollup": true, "default_rollup": true,