From c883c15878e01a3981d60a11c3b67019ca4b2c6c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 13 Jan 2022 22:12:02 +0200 Subject: [PATCH] app/vmselect/promql: add support for `@` modifier Add support for `@` modifier in MetricsQL according to https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier Extend the support with the following features: * Allow using `@` modifier everywhere in the query. For example, `sum(foo) @ end()` * Allow using arbitrary expression as `@` modifier. For example, `foo @ (end() - 1h)` returns `foo` value at `end - 1 hour` timestamp on the selected time range `[start ... end]` Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1348 --- app/vmselect/promql/eval.go | 37 +++++++++- app/vmselect/promql/exec_test.go | 44 +++++++++++ app/vmselect/promql/rollup.go | 16 ---- docs/CHANGELOG.md | 3 + docs/MetricsQL.md | 3 +- go.mod | 2 +- go.sum | 4 +- .../VictoriaMetrics/metricsql/lexer.go | 2 +- .../VictoriaMetrics/metricsql/optimizer.go | 20 ++--- .../VictoriaMetrics/metricsql/parser.go | 73 +++++++++++++++++-- .../VictoriaMetrics/metricsql/rollup.go | 20 +++++ vendor/modules.txt | 2 +- 12 files changed, 184 insertions(+), 42 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 33971bdc7..008822137 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -391,7 +391,7 @@ func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.F if nrf == nil { return nil, nil } - rollupArgIdx := getRollupArgIdx(fe) + rollupArgIdx := metricsql.GetRollupArgIdx(fe) if rollupArgIdx >= len(fe.Args) { // Incorrect number of args for rollup func. return nil, nil @@ -431,7 +431,7 @@ func evalExprs(ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) { func evalRollupFuncArgs(ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) { var re *metricsql.RollupExpr - rollupArgIdx := getRollupArgIdx(fe) + rollupArgIdx := metricsql.GetRollupArgIdx(fe) if len(fe.Args) <= rollupArgIdx { return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil)) } @@ -480,6 +480,39 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr { } func evalRollupFunc(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { + if re.At == nil { + return evalRollupFuncWithoutAt(ec, funcName, rf, expr, re, iafc) + } + tssAt, err := evalExpr(ec, re.At) + if err != nil { + return nil, fmt.Errorf("cannot evaluate `@` modifier: %w", err) + } + if len(tssAt) != 1 { + return nil, fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)) + } + atTimestamp := int64(tssAt[0].Values[0] * 1000) + ecNew := newEvalConfig(ec) + ecNew.Start = atTimestamp + ecNew.End = atTimestamp + tss, err := evalRollupFuncWithoutAt(ecNew, funcName, rf, expr, re, iafc) + if err != nil { + return nil, err + } + // expand tssAtTimestamp to the original time range. + timestamps := ec.getSharedTimestamps() + for _, ts := range tss { + v := ts.Values[0] + values := make([]float64, len(timestamps)) + for i := range timestamps { + values[i] = v + } + ts.Timestamps = timestamps + ts.Values = values + } + return tss, nil +} + +func evalRollupFuncWithoutAt(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { funcName = strings.ToLower(funcName) ecNew := ec var offset int64 diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 69b6f1eb4..2db29bc05 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -974,6 +974,50 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run("time() @ 1h", func(t *testing.T) { + t.Parallel() + q := `time() @ 1h` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{3600, 3600, 3600, 3600, 3600, 3600}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run("time() @ start()", func(t *testing.T) { + t.Parallel() + q := `time() @ start()` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1000, 1000, 1000, 1000, 1000, 1000}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run("time() @ end()", func(t *testing.T) { + t.Parallel() + q := `time() @ end()` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{2000, 2000, 2000, 2000, 2000, 2000}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run("time() @ (end()-10m)", func(t *testing.T) { + t.Parallel() + q := `time() @ (end()-10m)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1400, 1400, 1400, 1400, 1400, 1400}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run("rand()", func(t *testing.T) { t.Parallel() q := `round(rand()/2)` diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 0a7a15099..477d091ab 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -244,22 +244,6 @@ func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) { return aggrFuncNames, nil } -func getRollupArgIdx(fe *metricsql.FuncExpr) int { - funcName := strings.ToLower(fe.Name) - if rollupFuncs[funcName] == nil { - logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", fe.Name) - } - switch funcName { - case "quantile_over_time", "aggr_over_time", - "hoeffding_bound_lower", "hoeffding_bound_upper": - return 1 - case "quantiles_over_time": - return len(fe.Args) - 1 - default: - return 0 - } -} - func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) ( func(values []float64, timestamps []int64), []*rollupConfig, error) { preFunc := func(values []float64, timestamps []int64) {} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 170bc22b2..ce6f11081 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,9 @@ sort: 15 ## tip +* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add support for `@` modifier, which is enabled by default in Prometheus starting from [Prometheus v2.33.0](https://github.com/prometheus/prometheus/pull/10121). See [these docs](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1348). VictoriaMetrics extends `@` modifier with the following additional features: + * It can contain arbitrary expression. For example, `foo @ (end() - 1h)` would return `foo` value at `end - 1 hour` timestamp on the selected time range `[start ... end]`. Another example: `foo @ now() - 10m` would return `foo` value 10 minutes ago from the current time. + * It can be put everywhere in the query. For example, `sum(foo) @ start()` would calculate `sum(foo)` at `start` timestamp on the selected time range `[start ... end]`. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): log error message when remote storage returns 400 or 409 http errors. This should simplify detection and debugging of this case. See [this issue](vmagent_remotewrite_packets_dropped_total). * FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): store `restore-in-progress` file in `-dst` directory while `vmrestore` is running. This file is automatically deleted when `vmrestore` is successfully finished. This helps detecting incompletely restored data on VictoriaMetrics start. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1958). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): print the last sample timestamp when the data migration is interrupted either by user or by error. This helps continuing the data migration from the interruption moment. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1236). diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index ea4bf732f..903fd962c 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -33,9 +33,10 @@ This functionality can be evaluated at [an editable Grafana dashboard](https://p - Graphite-compatible filters can be passed via `{__graphite__="foo.*.bar"}` syntax. See [these docs](https://docs.victoriametrics.com/#selecting-graphite-metrics). VictoriaMetrics also can be used as Graphite datasource in Grafana. See [these docs](https://docs.victoriametrics.com/#graphite-api-usage) for details. See also [label_graphite_group](#label_graphite_group) function, which can be used for extracting the given groups from Graphite metric name. - Lookbehind window in square brackets may be omitted. VictoriaMetrics automatically selects the lookbehind window depending on the current step used for building the graph (e.g. `step` query arg passed to [/api/v1/query_range](https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries)). For instance, the following query is valid in VictoriaMetrics: `rate(node_network_receive_bytes_total)`. It is equivalent to `rate(node_network_receive_bytes_total[$__interval])` when used in Grafana. - [Aggregate functions](#aggregate-functions) accept arbitrary number of args. For example, `avg(q1, q2, q3)` would return the average values for every point across time series returned by `q1`, `q2` and `q3`. +- [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier) can be put anywhere in the query. For example, `sum(foo) @ end()` calculates `sum(foo)` at the `end` timestamp of the selected time range `[start ... end]`. +- Arbitrary subexpression can be used as [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier). For example, `foo @ (end() - 1h)` calculates `foo` at the `end - 1 hour` timestamp on the selected time range `[start ... end]`. - [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier), lookbehind window in square brackets and `step` value for [subquery](#subqueries) may refer to the current step aka `$__interval` value from Grafana with `[Ni]` syntax. For instance, `rate(metric[10i] offset 5i)` would return per-second rate over a range covering 10 previous steps with the offset of 5 steps. - [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be put anywere in the query. For instance, `sum(foo) offset 24h`. -- [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be negative. For example, `q offset -1h`. - Lookbehind window in square brackets and [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be fractional. For instance, `rate(node_network_receive_bytes_total[1.5m] offset 0.5d)`. - The duration suffix is optional. The duration is in seconds if the suffix is missing. For example, `rate(m[300] offset 1800)` is equivalent to `rate(m[5m]) offset 30m`. - The duration can be placed anywhere in the query. For example, `sum_over_time(m[1h]) / 1h` is equivalent to `sum_over_time(m[1h]) / 3600`. diff --git a/go.mod b/go.mod index 4d2efc872..69c3a88a4 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.1.0 github.com/VictoriaMetrics/metrics v1.18.1 - github.com/VictoriaMetrics/metricsql v0.34.0 + github.com/VictoriaMetrics/metricsql v0.35.0 github.com/aws/aws-sdk-go v1.42.31 github.com/cespare/xxhash/v2 v2.1.2 github.com/cheggaaa/pb/v3 v3.0.8 diff --git a/go.sum b/go.sum index 31cb2246c..6d5ebbf8c 100644 --- a/go.sum +++ b/go.sum @@ -115,8 +115,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0= github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= -github.com/VictoriaMetrics/metricsql v0.34.0 h1:zF9yzRyNCAxzgEBBnE4y/p0QYNpSQp2jGEBCVE2fUD0= -github.com/VictoriaMetrics/metricsql v0.34.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= +github.com/VictoriaMetrics/metricsql v0.35.0 h1:jK/+UhPb5o2OESZcqifp9hl0rURPgCwY6coy8OlbIss= +github.com/VictoriaMetrics/metricsql v0.35.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/lexer.go b/vendor/github.com/VictoriaMetrics/metricsql/lexer.go index c5f0bfabd..947da2c53 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/lexer.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/lexer.go @@ -83,7 +83,7 @@ again: } lex.sTail = s[n+1:] goto again - case '{', '}', '[', ']', '(', ')', ',': + case '{', '}', '[', ']', '(', ')', ',', '@': token = s[:1] goto tokenFoundLabel } diff --git a/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go b/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go index 24674fbc0..a644d63f2 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go @@ -70,6 +70,11 @@ func optimizeBinaryOpArgs(be *BinaryOpExpr) *BinaryOpExpr { } func getMetricExprForOptimization(e Expr) *MetricExpr { + re, ok := e.(*RollupExpr) + if ok { + // Try optimizing the inner expression in RollupExpr. + return getMetricExprForOptimization(re.Expr) + } me, ok := e.(*MetricExpr) if ok { // Ordinary metric expression, i.e. `foo{bar="baz"}` @@ -95,17 +100,12 @@ func getMetricExprForOptimization(e Expr) *MetricExpr { return nil } if IsRollupFunc(fe.Name) { - for _, arg := range fe.Args { - re, ok := arg.(*RollupExpr) - if !ok { - continue - } - if me, ok := re.Expr.(*MetricExpr); ok { - // rollup_func(foo{bar="baz"}[d]) - return me - } + argIdx := GetRollupArgIdx(fe) + if argIdx >= len(fe.Args) { + return nil } - return nil + arg := fe.Args[argIdx] + return getMetricExprForOptimization(arg) } if IsTransformFunc(fe.Name) { switch strings.ToLower(fe.Name) { diff --git a/vendor/github.com/VictoriaMetrics/metricsql/parser.go b/vendor/github.com/VictoriaMetrics/metricsql/parser.go index a5f81ada7..e7fce640c 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/parser.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/parser.go @@ -104,6 +104,9 @@ func mustParseWithArgExpr(s string) *withArgExpr { func removeParensExpr(e Expr) Expr { if re, ok := e.(*RollupExpr); ok { re.Expr = removeParensExpr(re.Expr) + if re.At != nil { + re.At = removeParensExpr(re.At) + } return re } if be, ok := e.(*BinaryOpExpr); ok { @@ -415,13 +418,17 @@ func (p *parser) parseSingleExpr() (Expr, error) { if err != nil { return nil, err } - if p.lex.Token != "[" && !isOffset(p.lex.Token) { + if !isRollupStartToken(p.lex.Token) { // There is no rollup expression. return e, nil } return p.parseRollupExpr(e) } +func isRollupStartToken(token string) bool { + return token == "[" || token == "@" || isOffset(token) +} + func (p *parser) parseSingleExprWithoutRollupSuffix() (Expr, error) { if isPositiveDuration(p.lex.Token) { return p.parsePositiveDuration() @@ -1268,6 +1275,20 @@ func (p *parser) parseWindowAndStep() (*DurationExpr, *DurationExpr, bool, error return window, step, inheritStep, nil } +func (p *parser) parseAtExpr() (Expr, error) { + if p.lex.Token != "@" { + return nil, fmt.Errorf(`unexpected token %q; want "@"`, p.lex.Token) + } + if err := p.lex.Next(); err != nil { + return nil, err + } + e, err := p.parseSingleExprWithoutRollupSuffix() + if err != nil { + return nil, fmt.Errorf("cannot parse `@` expresion: %w", err) + } + return e, nil +} + func (p *parser) parseOffset() (*DurationExpr, error) { if !isOffset(p.lex.Token) { return nil, fmt.Errorf(`offset: unexpected token %q; want "offset"`, p.lex.Token) @@ -1374,11 +1395,11 @@ func (p *parser) parseIdentExpr() (Expr, error) { return p.parseAggrFuncExpr() } return p.parseFuncExpr() - case "{", "[", ")", ",": + case "{", "[", ")", ",", "@": p.lex.Prev() return p.parseMetricExpr() default: - return nil, fmt.Errorf(`identExpr: unexpected token %q; want "(", "{", "[", ")", ","`, p.lex.Token) + return nil, fmt.Errorf(`identExpr: unexpected token %q; want "(", "{", "[", ")", "," or "@"`, p.lex.Token) } } @@ -1417,15 +1438,34 @@ func (p *parser) parseRollupExpr(arg Expr) (Expr, error) { re.Window = window re.Step = step re.InheritStep = inheritStep - if !isOffset(p.lex.Token) { + if !isOffset(p.lex.Token) && p.lex.Token != "@" { return &re, nil } } - offset, err := p.parseOffset() - if err != nil { - return nil, err + if p.lex.Token == "@" { + at, err := p.parseAtExpr() + if err != nil { + return nil, err + } + re.At = at + } + if isOffset(p.lex.Token) { + offset, err := p.parseOffset() + if err != nil { + return nil, err + } + re.Offset = offset + } + if p.lex.Token == "@" { + if re.At != nil { + return nil, fmt.Errorf("duplicate `@` token") + } + at, err := p.parseAtExpr() + if err != nil { + return nil, err + } + re.At = at } - re.Offset = offset return &re, nil } @@ -1677,6 +1717,12 @@ type RollupExpr struct { // If set to true, then `foo[1h:]` would print the same // instead of `foo[1h]`. InheritStep bool + + // At contains an optional expression after `@` modifier. + // + // For example, `foo @ end()` or `bar[5m] @ 12345` + // See https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier + At Expr } // ForSubquery returns true if re represents subquery. @@ -1720,6 +1766,17 @@ func (re *RollupExpr) AppendString(dst []byte) []byte { dst = append(dst, " offset "...) dst = re.Offset.AppendString(dst) } + if re.At != nil { + dst = append(dst, " @ "...) + _, needAtParens := re.At.(*BinaryOpExpr) + if needAtParens { + dst = append(dst, '(') + } + dst = re.At.AppendString(dst) + if needAtParens { + dst = append(dst, ')') + } + } return dst } diff --git a/vendor/github.com/VictoriaMetrics/metricsql/rollup.go b/vendor/github.com/VictoriaMetrics/metricsql/rollup.go index 3ecc2b73d..9782d4c80 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/rollup.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/rollup.go @@ -84,3 +84,23 @@ func IsRollupFunc(funcName string) bool { s := strings.ToLower(funcName) return rollupFuncs[s] } + +// GetRollupArgIdx returns the argument index for the given fe, which accepts the rollup argument. +// +// -1 is returned if fe isn't a rollup function. +func GetRollupArgIdx(fe *FuncExpr) int { + funcName := fe.Name + funcName = strings.ToLower(funcName) + if !rollupFuncs[funcName] { + return -1 + } + switch funcName { + case "quantile_over_time", "aggr_over_time", + "hoeffding_bound_lower", "hoeffding_bound_upper": + return 1 + case "quantiles_over_time": + return len(fe.Args) - 1 + default: + return 0 + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index ed6e86095..527dae4fd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -26,7 +26,7 @@ github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.18.1 ## explicit; go 1.12 github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.34.0 +# github.com/VictoriaMetrics/metricsql v0.35.0 ## explicit; go 1.13 github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop