app/vmselect/promql: add support for @ modifier

Add support for `@` modifier in MetricsQL according to https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier

Extend the support with the following features:
* Allow using `@` modifier everywhere in the query. For example, `sum(foo) @ end()`
* Allow using arbitrary expression as `@` modifier. For example, `foo @ (end() - 1h)`
  returns `foo` value at `end - 1 hour` timestamp on the selected time range `[start ... end]`

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1348
This commit is contained in:
Aliaksandr Valialkin 2022-01-13 22:12:02 +02:00
parent 7d635a2311
commit 0580a58feb
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
12 changed files with 184 additions and 42 deletions

View file

@ -408,7 +408,7 @@ func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.F
if nrf == nil {
return nil, nil
}
rollupArgIdx := getRollupArgIdx(fe)
rollupArgIdx := metricsql.GetRollupArgIdx(fe)
if rollupArgIdx >= len(fe.Args) {
// Incorrect number of args for rollup func.
return nil, nil
@ -448,7 +448,7 @@ func evalExprs(ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
func evalRollupFuncArgs(ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
var re *metricsql.RollupExpr
rollupArgIdx := getRollupArgIdx(fe)
rollupArgIdx := metricsql.GetRollupArgIdx(fe)
if len(fe.Args) <= rollupArgIdx {
return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
}
@ -497,6 +497,39 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
}
func evalRollupFunc(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
if re.At == nil {
return evalRollupFuncWithoutAt(ec, funcName, rf, expr, re, iafc)
}
tssAt, err := evalExpr(ec, re.At)
if err != nil {
return nil, fmt.Errorf("cannot evaluate `@` modifier: %w", err)
}
if len(tssAt) != 1 {
return nil, fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt))
}
atTimestamp := int64(tssAt[0].Values[0] * 1000)
ecNew := newEvalConfig(ec)
ecNew.Start = atTimestamp
ecNew.End = atTimestamp
tss, err := evalRollupFuncWithoutAt(ecNew, funcName, rf, expr, re, iafc)
if err != nil {
return nil, err
}
// expand tssAtTimestamp to the original time range.
timestamps := ec.getSharedTimestamps()
for _, ts := range tss {
v := ts.Values[0]
values := make([]float64, len(timestamps))
for i := range timestamps {
values[i] = v
}
ts.Timestamps = timestamps
ts.Values = values
}
return tss, nil
}
func evalRollupFuncWithoutAt(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
funcName = strings.ToLower(funcName)
ecNew := ec
var offset int64

View file

@ -984,6 +984,50 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time() @ 1h", func(t *testing.T) {
t.Parallel()
q := `time() @ 1h`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3600, 3600, 3600, 3600, 3600, 3600},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time() @ start()", func(t *testing.T) {
t.Parallel()
q := `time() @ start()`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1000, 1000, 1000, 1000, 1000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time() @ end()", func(t *testing.T) {
t.Parallel()
q := `time() @ end()`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{2000, 2000, 2000, 2000, 2000, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("time() @ (end()-10m)", func(t *testing.T) {
t.Parallel()
q := `time() @ (end()-10m)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1400, 1400, 1400, 1400, 1400, 1400},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run("rand()", func(t *testing.T) {
t.Parallel()
q := `round(rand()/2)`

View file

@ -244,22 +244,6 @@ func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
return aggrFuncNames, nil
}
func getRollupArgIdx(fe *metricsql.FuncExpr) int {
funcName := strings.ToLower(fe.Name)
if rollupFuncs[funcName] == nil {
logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", fe.Name)
}
switch funcName {
case "quantile_over_time", "aggr_over_time",
"hoeffding_bound_lower", "hoeffding_bound_upper":
return 1
case "quantiles_over_time":
return len(fe.Args) - 1
default:
return 0
}
}
func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) (
func(values []float64, timestamps []int64), []*rollupConfig, error) {
preFunc := func(values []float64, timestamps []int64) {}

View file

@ -6,6 +6,9 @@ sort: 15
## tip
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add support for `@` modifier, which is enabled by default in Prometheus starting from [Prometheus v2.33.0](https://github.com/prometheus/prometheus/pull/10121). See [these docs](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1348). VictoriaMetrics extends `@` modifier with the following additional features:
* It can contain arbitrary expression. For example, `foo @ (end() - 1h)` would return `foo` value at `end - 1 hour` timestamp on the selected time range `[start ... end]`. Another example: `foo @ now() - 10m` would return `foo` value 10 minutes ago from the current time.
* It can be put everywhere in the query. For example, `sum(foo) @ start()` would calculate `sum(foo)` at `start` timestamp on the selected time range `[start ... end]`.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): log error message when remote storage returns 400 or 409 http errors. This should simplify detection and debugging of this case. See [this issue](vmagent_remotewrite_packets_dropped_total).
* FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): store `restore-in-progress` file in `-dst` directory while `vmrestore` is running. This file is automatically deleted when `vmrestore` is successfully finished. This helps detecting incompletely restored data on VictoriaMetrics start. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1958).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): print the last sample timestamp when the data migration is interrupted either by user or by error. This helps continuing the data migration from the interruption moment. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1236).

View file

@ -33,9 +33,10 @@ This functionality can be evaluated at [an editable Grafana dashboard](https://p
- Graphite-compatible filters can be passed via `{__graphite__="foo.*.bar"}` syntax. See [these docs](https://docs.victoriametrics.com/#selecting-graphite-metrics). VictoriaMetrics also can be used as Graphite datasource in Grafana. See [these docs](https://docs.victoriametrics.com/#graphite-api-usage) for details. See also [label_graphite_group](#label_graphite_group) function, which can be used for extracting the given groups from Graphite metric name.
- Lookbehind window in square brackets may be omitted. VictoriaMetrics automatically selects the lookbehind window depending on the current step used for building the graph (e.g. `step` query arg passed to [/api/v1/query_range](https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries)). For instance, the following query is valid in VictoriaMetrics: `rate(node_network_receive_bytes_total)`. It is equivalent to `rate(node_network_receive_bytes_total[$__interval])` when used in Grafana.
- [Aggregate functions](#aggregate-functions) accept arbitrary number of args. For example, `avg(q1, q2, q3)` would return the average values for every point across time series returned by `q1`, `q2` and `q3`.
- [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier) can be put anywhere in the query. For example, `sum(foo) @ end()` calculates `sum(foo)` at the `end` timestamp of the selected time range `[start ... end]`.
- Arbitrary subexpression can be used as [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier). For example, `foo @ (end() - 1h)` calculates `foo` at the `end - 1 hour` timestamp on the selected time range `[start ... end]`.
- [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier), lookbehind window in square brackets and `step` value for [subquery](#subqueries) may refer to the current step aka `$__interval` value from Grafana with `[Ni]` syntax. For instance, `rate(metric[10i] offset 5i)` would return per-second rate over a range covering 10 previous steps with the offset of 5 steps.
- [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be put anywere in the query. For instance, `sum(foo) offset 24h`.
- [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be negative. For example, `q offset -1h`.
- Lookbehind window in square brackets and [offset](https://prometheus.io/docs/prometheus/latest/querying/basics/#offset-modifier) may be fractional. For instance, `rate(node_network_receive_bytes_total[1.5m] offset 0.5d)`.
- The duration suffix is optional. The duration is in seconds if the suffix is missing. For example, `rate(m[300] offset 1800)` is equivalent to `rate(m[5m]) offset 30m`.
- The duration can be placed anywhere in the query. For example, `sum_over_time(m[1h]) / 1h` is equivalent to `sum_over_time(m[1h]) / 3600`.

2
go.mod
View file

@ -10,7 +10,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.1.0
github.com/VictoriaMetrics/metrics v1.18.1
github.com/VictoriaMetrics/metricsql v0.34.0
github.com/VictoriaMetrics/metricsql v0.35.0
github.com/aws/aws-sdk-go v1.42.31
github.com/cespare/xxhash/v2 v2.1.2
github.com/cheggaaa/pb/v3 v3.0.8

4
go.sum
View file

@ -115,8 +115,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0=
github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
github.com/VictoriaMetrics/metricsql v0.34.0 h1:zF9yzRyNCAxzgEBBnE4y/p0QYNpSQp2jGEBCVE2fUD0=
github.com/VictoriaMetrics/metricsql v0.34.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VictoriaMetrics/metricsql v0.35.0 h1:jK/+UhPb5o2OESZcqifp9hl0rURPgCwY6coy8OlbIss=
github.com/VictoriaMetrics/metricsql v0.35.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=

View file

@ -83,7 +83,7 @@ again:
}
lex.sTail = s[n+1:]
goto again
case '{', '}', '[', ']', '(', ')', ',':
case '{', '}', '[', ']', '(', ')', ',', '@':
token = s[:1]
goto tokenFoundLabel
}

View file

@ -70,6 +70,11 @@ func optimizeBinaryOpArgs(be *BinaryOpExpr) *BinaryOpExpr {
}
func getMetricExprForOptimization(e Expr) *MetricExpr {
re, ok := e.(*RollupExpr)
if ok {
// Try optimizing the inner expression in RollupExpr.
return getMetricExprForOptimization(re.Expr)
}
me, ok := e.(*MetricExpr)
if ok {
// Ordinary metric expression, i.e. `foo{bar="baz"}`
@ -95,18 +100,13 @@ func getMetricExprForOptimization(e Expr) *MetricExpr {
return nil
}
if IsRollupFunc(fe.Name) {
for _, arg := range fe.Args {
re, ok := arg.(*RollupExpr)
if !ok {
continue
}
if me, ok := re.Expr.(*MetricExpr); ok {
// rollup_func(foo{bar="baz"}[d])
return me
}
}
argIdx := GetRollupArgIdx(fe)
if argIdx >= len(fe.Args) {
return nil
}
arg := fe.Args[argIdx]
return getMetricExprForOptimization(arg)
}
if IsTransformFunc(fe.Name) {
switch strings.ToLower(fe.Name) {
case "absent", "histogram_quantile", "label_join", "label_replace", "scalar", "vector",

View file

@ -104,6 +104,9 @@ func mustParseWithArgExpr(s string) *withArgExpr {
func removeParensExpr(e Expr) Expr {
if re, ok := e.(*RollupExpr); ok {
re.Expr = removeParensExpr(re.Expr)
if re.At != nil {
re.At = removeParensExpr(re.At)
}
return re
}
if be, ok := e.(*BinaryOpExpr); ok {
@ -415,13 +418,17 @@ func (p *parser) parseSingleExpr() (Expr, error) {
if err != nil {
return nil, err
}
if p.lex.Token != "[" && !isOffset(p.lex.Token) {
if !isRollupStartToken(p.lex.Token) {
// There is no rollup expression.
return e, nil
}
return p.parseRollupExpr(e)
}
func isRollupStartToken(token string) bool {
return token == "[" || token == "@" || isOffset(token)
}
func (p *parser) parseSingleExprWithoutRollupSuffix() (Expr, error) {
if isPositiveDuration(p.lex.Token) {
return p.parsePositiveDuration()
@ -1268,6 +1275,20 @@ func (p *parser) parseWindowAndStep() (*DurationExpr, *DurationExpr, bool, error
return window, step, inheritStep, nil
}
func (p *parser) parseAtExpr() (Expr, error) {
if p.lex.Token != "@" {
return nil, fmt.Errorf(`unexpected token %q; want "@"`, p.lex.Token)
}
if err := p.lex.Next(); err != nil {
return nil, err
}
e, err := p.parseSingleExprWithoutRollupSuffix()
if err != nil {
return nil, fmt.Errorf("cannot parse `@` expresion: %w", err)
}
return e, nil
}
func (p *parser) parseOffset() (*DurationExpr, error) {
if !isOffset(p.lex.Token) {
return nil, fmt.Errorf(`offset: unexpected token %q; want "offset"`, p.lex.Token)
@ -1374,11 +1395,11 @@ func (p *parser) parseIdentExpr() (Expr, error) {
return p.parseAggrFuncExpr()
}
return p.parseFuncExpr()
case "{", "[", ")", ",":
case "{", "[", ")", ",", "@":
p.lex.Prev()
return p.parseMetricExpr()
default:
return nil, fmt.Errorf(`identExpr: unexpected token %q; want "(", "{", "[", ")", ","`, p.lex.Token)
return nil, fmt.Errorf(`identExpr: unexpected token %q; want "(", "{", "[", ")", "," or "@"`, p.lex.Token)
}
}
@ -1417,15 +1438,34 @@ func (p *parser) parseRollupExpr(arg Expr) (Expr, error) {
re.Window = window
re.Step = step
re.InheritStep = inheritStep
if !isOffset(p.lex.Token) {
if !isOffset(p.lex.Token) && p.lex.Token != "@" {
return &re, nil
}
}
if p.lex.Token == "@" {
at, err := p.parseAtExpr()
if err != nil {
return nil, err
}
re.At = at
}
if isOffset(p.lex.Token) {
offset, err := p.parseOffset()
if err != nil {
return nil, err
}
re.Offset = offset
}
if p.lex.Token == "@" {
if re.At != nil {
return nil, fmt.Errorf("duplicate `@` token")
}
at, err := p.parseAtExpr()
if err != nil {
return nil, err
}
re.At = at
}
return &re, nil
}
@ -1677,6 +1717,12 @@ type RollupExpr struct {
// If set to true, then `foo[1h:]` would print the same
// instead of `foo[1h]`.
InheritStep bool
// At contains an optional expression after `@` modifier.
//
// For example, `foo @ end()` or `bar[5m] @ 12345`
// See https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier
At Expr
}
// ForSubquery returns true if re represents subquery.
@ -1720,6 +1766,17 @@ func (re *RollupExpr) AppendString(dst []byte) []byte {
dst = append(dst, " offset "...)
dst = re.Offset.AppendString(dst)
}
if re.At != nil {
dst = append(dst, " @ "...)
_, needAtParens := re.At.(*BinaryOpExpr)
if needAtParens {
dst = append(dst, '(')
}
dst = re.At.AppendString(dst)
if needAtParens {
dst = append(dst, ')')
}
}
return dst
}

View file

@ -84,3 +84,23 @@ func IsRollupFunc(funcName string) bool {
s := strings.ToLower(funcName)
return rollupFuncs[s]
}
// GetRollupArgIdx returns the argument index for the given fe, which accepts the rollup argument.
//
// -1 is returned if fe isn't a rollup function.
func GetRollupArgIdx(fe *FuncExpr) int {
funcName := fe.Name
funcName = strings.ToLower(funcName)
if !rollupFuncs[funcName] {
return -1
}
switch funcName {
case "quantile_over_time", "aggr_over_time",
"hoeffding_bound_lower", "hoeffding_bound_upper":
return 1
case "quantiles_over_time":
return len(fe.Args) - 1
default:
return 0
}
}

2
vendor/modules.txt vendored
View file

@ -26,7 +26,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.18.1
## explicit; go 1.12
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.34.0
# github.com/VictoriaMetrics/metricsql v0.35.0
## explicit; go 1.13
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop