From 27044b84d27d7814afd286fd1eae412155a707f8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 3 Nov 2021 16:02:20 +0200 Subject: [PATCH] app/vmselect/promql: add `limit_offset(limit, offset, q)` function, which can be used for paging over big number of time series Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1778 --- app/vmselect/promql/aggr.go | 62 ++++++++++++------- app/vmselect/promql/exec_test.go | 18 ++++++ app/vmselect/promql/rollup.go | 12 ++++ app/vmselect/promql/transform.go | 21 ++++--- docs/CHANGELOG.md | 1 + docs/MetricsQL.md | 7 ++- go.mod | 2 +- go.sum | 4 +- .../VictoriaMetrics/metricsql/aggr.go | 1 + vendor/modules.txt | 2 +- 10 files changed, 94 insertions(+), 36 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index ba71ea0d6..ab495daee 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -32,6 +32,7 @@ var aggrFuncs = map[string]aggrFunc{ // PromQL extension funcs "median": aggrFuncMedian, "limitk": aggrFuncLimitK, + "limit_offset": aggrFuncLimitOffset, "distinct": newAggrFunc(aggrFuncDistinct), "sum2": newAggrFunc(aggrFuncSum2), "geomean": newAggrFunc(aggrFuncGeomean), @@ -999,20 +1000,45 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { if err := expectTransformArgsNum(args, 2); err != nil { return nil, err } - ks, err := getScalar(args[0], 0) + limits, err := getScalar(args[0], 0) if err != nil { + return nil, fmt.Errorf("cannot obtain limit arg: %w", err) + } + limit := 0 + if len(limits) > 0 { + limit = int(limits[0]) + } + afe := newLimitOffsetAggrFunc(limit, 0) + return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) +} + +func aggrFuncLimitOffset(afa *aggrFuncArg) ([]*timeseries, error) { + args := afa.args + if err := expectTransformArgsNum(args, 3); err != nil { return nil, err } - maxK := 0 - for _, kf := range ks { - k := int(kf) - if k > maxK { - maxK = k - } + limit, err := getIntNumber(args[0], 0) + if 
err != nil { + return nil, fmt.Errorf("cannot obtain limit arg: %w", err) } - afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { - // Sort series by metricName in order to get consistent set of output series - // across multiple calls to limitk() function. + offset, err := getIntNumber(args[1], 1) + if err != nil { + return nil, fmt.Errorf("cannot obtain offset arg: %w", err) + } + afe := newLimitOffsetAggrFunc(limit, offset) + return aggrFuncExt(afe, args[2], &afa.ae.Modifier, afa.ae.Limit, true) +} + +func newLimitOffsetAggrFunc(limit, offset int) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { + if offset < 0 { + offset = 0 + } + if limit < 0 { + limit = 0 + } + return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { + // Sort series by metricName hash in order to get consistent set of output series + // across multiple calls to limitk() and limit_offset() functions. // Sort series by hash in order to guarantee uniform selection across series. 
type hashSeries struct { h uint64 @@ -1033,21 +1059,15 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { for i, hs := range hss { tss[i] = hs.ts } - if len(tss) > maxK { - tss = tss[:maxK] + if offset > len(tss) { + return nil } - for i, kf := range ks { - k := int(kf) - if k < 0 { - k = 0 - } - for j := k; j < len(tss); j++ { - tss[j].Values[i] = nan - } + tss = tss[offset:] + if limit < len(tss) { + tss = tss[:limit] } return tss } - return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) } func getHash(d *xxhash.Digest, mn *storage.MetricName) uint64 { diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 6cab70ed3..007a2bcf1 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -5139,6 +5139,21 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r1} f(q, resultExpected) }) + t.Run(`limit_offset()`, func(t *testing.T) { + t.Parallel() + q := `limit_offset(1, 0, (label_set(10, "foo", "bar"), label_set(time()/150, "xbaz", "sss")))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{10, 10, 10, 10, 10, 10}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) t.Run(`limitk(10)`, func(t *testing.T) { t.Parallel() q := `sort(limitk(10, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` @@ -7436,6 +7451,7 @@ func TestExecError(t *testing.T) { f(`bitmap_or()`) f(`bitmap_xor()`) f(`quantiles()`) + f(`limit_offset()`) // Invalid argument type f(`median_over_time({}, 2)`) @@ -7448,6 +7464,8 @@ func TestExecError(t *testing.T) { f(`topk(label_set(2, "xx", "foo") or 1, 12)`) f(`topk_avg(label_set(2, "xx", "foo") or 1, 12)`) f(`limitk(label_set(2, "xx", "foo") or 1, 12)`) + f(`limit_offset((alias(1,"foo"),alias(2,"bar")), 2, 10)`) + f(`limit_offset(1, 
(alias(1,"foo"),alias(2,"bar")), 10)`) f(`round(1, 1 or label_set(2, "xx", "foo"))`) f(`histogram_quantile(1 or label_set(2, "xx", "foo"), 1)`) f(`label_set(1, 2, 3)`) diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index f540ba9bb..b80c953a2 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -1950,6 +1950,18 @@ func getScalar(arg interface{}, argNum int) ([]float64, error) { return ts[0].Values, nil } +func getIntNumber(arg interface{}, argNum int) (int, error) { + v, err := getScalar(arg, argNum) + if err != nil { + return 0, err + } + n := 0 + if len(v) > 0 { + n = int(v[0]) + } + return n, nil +} + func getString(tss []*timeseries, argNum int) (string, error) { if len(tss) != 1 { return "", fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(tss)) diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index 3cdf8c35e..89cae9eed 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -359,7 +359,10 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) { if err != nil { return nil, err } - limit := int(limits[0]) + limit := 0 + if len(limits) > 0 { + limit = int(limits[0]) + } if limit <= 0 { return nil, nil } @@ -1185,10 +1188,10 @@ func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) { if err != nil { return nil, err } - if len(phis) == 0 { - return nil, nil + phi := float64(0) + if len(phis) > 0 { + phi = phis[0] } - phi := phis[0] rvs := args[1] a := getFloat64s() values := a.A[:0] @@ -1745,14 +1748,10 @@ func transformLabelGraphiteGroup(tfa *transformFuncArg) ([]*timeseries, error) { groupArgs := args[1:] groupIDs := make([]int, len(groupArgs)) for i, arg := range groupArgs { - tmp, err := getScalar(arg, i+1) + groupID, err := getIntNumber(arg, i+1) if err != nil { return nil, fmt.Errorf("cannot get group name from arg #%d: %w", i+1, err) } - groupID := 0 - if 
len(tmp) > 0 { - groupID = int(tmp[0]) - } groupIDs[i] = groupID } for _, ts := range tss { @@ -2012,7 +2011,9 @@ func newTransformRand(newRandFunc func(r *rand.Rand) func() float64) transformFu if err != nil { return nil, err } - seed = int64(tmp[0]) + if len(tmp) > 0 { + seed = int64(tmp[0]) + } } else { seed = time.Now().UnixNano() } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 92ea45e73..44fe2878c 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -11,6 +11,7 @@ sort: 15 * FEATURE: automatically detect timestamp precision (ns, us, ms or s) for the data ingested into VictoriaMetrics via [InfluxDB line protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf). * FEATURE: vmagent: add ability to protect `/config` page with auth key via `-configAuthKey` command-line flag. This page may contain sensitive information such as passwords, so it may be good to restrict access to this page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1764). * FEATURE: add [label_graphite_group](https://docs.victoriametrics.com/MetricsQL.html#label_graphite_group) function for extracting the given groups from Graphite metric names. +* FEATURE: add [limit_offset](https://docs.victoriametrics.com/MetricsQL.html#limit_offset) function, which can be used for implementing simple paging over big number of time series. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1778). * BUGFIX: vmagent: reduce the increased memory usage when scraping targets with big number of metrics which periodically change. The memory usage has been increased in v1.68.0 after vmagent started generating staleness markers in [stream parse mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1745). 
* BUGFIX: vmagent: properly display `proxy_url` config option at `http://vmagent:8429/config` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1755). diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index 690561dbd..cfc6c9712 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -815,9 +815,14 @@ See also [implicit query conversions](#implicit-query-conversions). `histogram(q)` calculates [VictoriaMetrics histogram](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) per each group of points with the same timestamp. Useful for visualizing big number of time series via a heatmap. See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details. +#### limit_offset + +`limit_offset(limit, offset, q)` skips `offset` time series from series returned by `q` and then returns up to `limit` of the remaining time series. This allows implementing simple paging for `q` time series. See also [limitk](#limitk). + + #### limitk -`limitk(k, q) by (group_labels)` returns up to `k` time series per each `group_labels` out of time series returned by `q`. The returned set of time series remain the same across calls. +`limitk(k, q) by (group_labels)` returns up to `k` time series per each `group_labels` out of time series returned by `q`. The returned set of time series remains the same across calls. See also [limit_offset](#limit_offset). 
#### mad diff --git a/go.mod b/go.mod index 02fe0d142..2cc6950aa 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.1.0 github.com/VictoriaMetrics/metrics v1.18.1 - github.com/VictoriaMetrics/metricsql v0.28.0 + github.com/VictoriaMetrics/metricsql v0.29.0 github.com/VividCortex/ewma v1.2.0 // indirect github.com/aws/aws-sdk-go v1.41.14 github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect diff --git a/go.sum b/go.sum index 4a9e9de53..f44398c6e 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0= github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= -github.com/VictoriaMetrics/metricsql v0.28.0 h1:yYtKa95ux5xhH+ziL/QZSc6TWb/5s7yEosmwPBxSvF8= -github.com/VictoriaMetrics/metricsql v0.28.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= +github.com/VictoriaMetrics/metricsql v0.29.0 h1:bQRxsHT6rWZgyMZqocV0lAM5GjpuPvAAW9eFXYR3iBY= +github.com/VictoriaMetrics/metricsql v0.29.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go index 68e9c47d3..ab4691e8d 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go @@ -22,6 +22,7 @@ var aggrFuncs = map[string]bool{ 
// MetricsQL extension funcs "median": true, "limitk": true, + "limit_offset": true, "distinct": true, "sum2": true, "geomean": true, diff --git a/vendor/modules.txt b/vendor/modules.txt index 728fa7ac8..be29337df 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -21,7 +21,7 @@ github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.18.1 ## explicit github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.28.0 +# github.com/VictoriaMetrics/metricsql v0.29.0 ## explicit github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop