app/vmselect/promql: add limit_offset(limit, offset, q) function, which can be used for paging over big number of time series

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1778
This commit is contained in:
Aliaksandr Valialkin 2021-11-03 16:02:20 +02:00
parent 43a58bd618
commit 27044b84d2
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
10 changed files with 94 additions and 36 deletions

View file

@@ -32,6 +32,7 @@ var aggrFuncs = map[string]aggrFunc{
// PromQL extension funcs // PromQL extension funcs
"median": aggrFuncMedian, "median": aggrFuncMedian,
"limitk": aggrFuncLimitK, "limitk": aggrFuncLimitK,
"limit_offset": aggrFuncLimitOffset,
"distinct": newAggrFunc(aggrFuncDistinct), "distinct": newAggrFunc(aggrFuncDistinct),
"sum2": newAggrFunc(aggrFuncSum2), "sum2": newAggrFunc(aggrFuncSum2),
"geomean": newAggrFunc(aggrFuncGeomean), "geomean": newAggrFunc(aggrFuncGeomean),
@@ -999,20 +1000,45 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
if err := expectTransformArgsNum(args, 2); err != nil { if err := expectTransformArgsNum(args, 2); err != nil {
return nil, err return nil, err
} }
ks, err := getScalar(args[0], 0) limits, err := getScalar(args[0], 0)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain limit arg: %w", err)
}
limit := 0
if len(limits) > 0 {
limit = int(limits[0])
}
afe := newLimitOffsetAggrFunc(limit, 0)
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
func aggrFuncLimitOffset(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 3); err != nil {
return nil, err return nil, err
} }
maxK := 0 limit, err := getIntNumber(args[0], 0)
for _, kf := range ks { if err != nil {
k := int(kf) return nil, fmt.Errorf("cannot obtain limit arg: %w", err)
if k > maxK {
maxK = k
}
} }
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { offset, err := getIntNumber(args[1], 1)
// Sort series by metricName in order to get consistent set of output series if err != nil {
// across multiple calls to limitk() function. return nil, fmt.Errorf("cannot obtain offset arg: %w", err)
}
afe := newLimitOffsetAggrFunc(limit, offset)
return aggrFuncExt(afe, args[2], &afa.ae.Modifier, afa.ae.Limit, true)
}
func newLimitOffsetAggrFunc(limit, offset int) func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
if offset < 0 {
offset = 0
}
if limit < 0 {
limit = 0
}
return func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
// Sort series by metricName hash in order to get consistent set of output series
// across multiple calls to limitk() and limit_offset() functions.
// Sort series by hash in order to guarantee uniform selection across series. // Sort series by hash in order to guarantee uniform selection across series.
type hashSeries struct { type hashSeries struct {
h uint64 h uint64
@@ -1033,21 +1059,15 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
for i, hs := range hss { for i, hs := range hss {
tss[i] = hs.ts tss[i] = hs.ts
} }
if len(tss) > maxK { if offset > len(tss) {
tss = tss[:maxK] return nil
} }
for i, kf := range ks { tss = tss[offset:]
k := int(kf) if limit < len(tss) {
if k < 0 { tss = tss[:limit]
k = 0
}
for j := k; j < len(tss); j++ {
tss[j].Values[i] = nan
}
} }
return tss return tss
} }
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
} }
func getHash(d *xxhash.Digest, mn *storage.MetricName) uint64 { func getHash(d *xxhash.Digest, mn *storage.MetricName) uint64 {

View file

@@ -5139,6 +5139,21 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1} resultExpected := []netstorage.Result{r1}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`limit_offset()`, func(t *testing.T) {
t.Parallel()
q := `limit_offset(1, 0, (label_set(10, "foo", "bar"), label_set(time()/150, "xbaz", "sss")))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 10, 10, 10, 10, 10},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
resultExpected := []netstorage.Result{r1}
f(q, resultExpected)
})
t.Run(`limitk(10)`, func(t *testing.T) { t.Run(`limitk(10)`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sort(limitk(10, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))` q := `sort(limitk(10, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
@@ -7436,6 +7451,7 @@ func TestExecError(t *testing.T) {
f(`bitmap_or()`) f(`bitmap_or()`)
f(`bitmap_xor()`) f(`bitmap_xor()`)
f(`quantiles()`) f(`quantiles()`)
f(`limit_offset()`)
// Invalid argument type // Invalid argument type
f(`median_over_time({}, 2)`) f(`median_over_time({}, 2)`)
@@ -7448,6 +7464,8 @@ func TestExecError(t *testing.T) {
f(`topk(label_set(2, "xx", "foo") or 1, 12)`) f(`topk(label_set(2, "xx", "foo") or 1, 12)`)
f(`topk_avg(label_set(2, "xx", "foo") or 1, 12)`) f(`topk_avg(label_set(2, "xx", "foo") or 1, 12)`)
f(`limitk(label_set(2, "xx", "foo") or 1, 12)`) f(`limitk(label_set(2, "xx", "foo") or 1, 12)`)
f(`limit_offset((alias(1,"foo"),alias(2,"bar")), 2, 10)`)
f(`limit_offset(1, (alias(1,"foo"),alias(2,"bar")), 10)`)
f(`round(1, 1 or label_set(2, "xx", "foo"))`) f(`round(1, 1 or label_set(2, "xx", "foo"))`)
f(`histogram_quantile(1 or label_set(2, "xx", "foo"), 1)`) f(`histogram_quantile(1 or label_set(2, "xx", "foo"), 1)`)
f(`label_set(1, 2, 3)`) f(`label_set(1, 2, 3)`)

View file

@@ -1950,6 +1950,18 @@ func getScalar(arg interface{}, argNum int) ([]float64, error) {
return ts[0].Values, nil return ts[0].Values, nil
} }
func getIntNumber(arg interface{}, argNum int) (int, error) {
v, err := getScalar(arg, argNum)
if err != nil {
return 0, err
}
n := 0
if len(v) > 0 {
n = int(v[0])
}
return n, nil
}
func getString(tss []*timeseries, argNum int) (string, error) { func getString(tss []*timeseries, argNum int) (string, error) {
if len(tss) != 1 { if len(tss) != 1 {
return "", fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(tss)) return "", fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(tss))

View file

@@ -359,7 +359,10 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
limit := int(limits[0]) limit := 0
if len(limits) > 0 {
limit = int(limits[0])
}
if limit <= 0 { if limit <= 0 {
return nil, nil return nil, nil
} }
@@ -1185,10 +1188,10 @@ func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(phis) == 0 { phi := float64(0)
return nil, nil if len(phis) > 0 {
phi = phis[0]
} }
phi := phis[0]
rvs := args[1] rvs := args[1]
a := getFloat64s() a := getFloat64s()
values := a.A[:0] values := a.A[:0]
@@ -1745,14 +1748,10 @@ func transformLabelGraphiteGroup(tfa *transformFuncArg) ([]*timeseries, error) {
groupArgs := args[1:] groupArgs := args[1:]
groupIDs := make([]int, len(groupArgs)) groupIDs := make([]int, len(groupArgs))
for i, arg := range groupArgs { for i, arg := range groupArgs {
tmp, err := getScalar(arg, i+1) groupID, err := getIntNumber(arg, i+1)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot get group name from arg #%d: %w", i+1, err) return nil, fmt.Errorf("cannot get group name from arg #%d: %w", i+1, err)
} }
groupID := 0
if len(tmp) > 0 {
groupID = int(tmp[0])
}
groupIDs[i] = groupID groupIDs[i] = groupID
} }
for _, ts := range tss { for _, ts := range tss {
@@ -2012,7 +2011,9 @@ func newTransformRand(newRandFunc func(r *rand.Rand) func() float64) transformFu
if err != nil { if err != nil {
return nil, err return nil, err
} }
seed = int64(tmp[0]) if len(tmp) > 0 {
seed = int64(tmp[0])
}
} else { } else {
seed = time.Now().UnixNano() seed = time.Now().UnixNano()
} }

View file

@@ -11,6 +11,7 @@ sort: 15
* FEATURE: automatically detect timestamp precision (ns, us, ms or s) for the data ingested into VictoriaMetrics via [InfluxDB line protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf). * FEATURE: automatically detect timestamp precision (ns, us, ms or s) for the data ingested into VictoriaMetrics via [InfluxDB line protocol](https://docs.victoriametrics.com/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
* FEATURE: vmagent: add ability to protect `/config` page with auth key via `-configAuthKey` command-line flag. This page may contain sensitive information such as passwords, so it may be good to restrict access to this page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1764). * FEATURE: vmagent: add ability to protect `/config` page with auth key via `-configAuthKey` command-line flag. This page may contain sensitive information such as passwords, so it may be good to restrict access to this page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1764).
* FEATURE: add [label_graphite_group](https://docs.victoriametrics.com/MetricsQL.html#label_graphite_group) function for extracting the given groups from Graphite metric names. * FEATURE: add [label_graphite_group](https://docs.victoriametrics.com/MetricsQL.html#label_graphite_group) function for extracting the given groups from Graphite metric names.
* FEATURE: add [limit_offset](https://docs.victoriametrics.com/MetricsQL.html#limit_offset) function, which can be used for implementing simple paging over big number of time series. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1778).
* BUGFIX: vmagent: reduce the increased memory usage when scraping targets with big number of metrics which periodically change. The memory usage has been increased in v1.68.0 after vmagent started generating staleness markers in [stream parse mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1745). * BUGFIX: vmagent: reduce the increased memory usage when scraping targets with big number of metrics which periodically change. The memory usage has been increased in v1.68.0 after vmagent started generating staleness markers in [stream parse mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1745).
* BUGFIX: vmagent: properly display `proxy_url` config option at `http://vmagent:8429/config` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1755). * BUGFIX: vmagent: properly display `proxy_url` config option at `http://vmagent:8429/config` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1755).

View file

@@ -815,9 +815,14 @@ See also [implicit query conversions](#implicit-query-conversions).
`histogram(q)` calculates [VictoriaMetrics histogram](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) per each group of points with the same timestamp. Useful for visualizing big number of time series via a heatmap. See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details. `histogram(q)` calculates [VictoriaMetrics histogram](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) per each group of points with the same timestamp. Useful for visualizing big number of time series via a heatmap. See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details.
#### limit_offset
`limit_offset(limit, offset, q)` skips `offset` time series from series returned by `q` and then returns up to `limit` of the remaining time series. This allows implementing simple paging for `q` time series. See also [limitk](#limitk).
#### limitk #### limitk
`limitk(k, q) by (group_labels)` returns up to `k` time series per each `group_labels` out of time series returned by `q`. The returned set of time series remain the same across calls. `limitk(k, q) by (group_labels)` returns up to `k` time series per each `group_labels` out of time series returned by `q`. The returned set of time series remain the same across calls. See also [limit_offset](#limit_offset).
#### mad #### mad

2
go.mod
View file

@@ -8,7 +8,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.1.0 github.com/VictoriaMetrics/fasthttp v1.1.0
github.com/VictoriaMetrics/metrics v1.18.1 github.com/VictoriaMetrics/metrics v1.18.1
github.com/VictoriaMetrics/metricsql v0.28.0 github.com/VictoriaMetrics/metricsql v0.29.0
github.com/VividCortex/ewma v1.2.0 // indirect github.com/VividCortex/ewma v1.2.0 // indirect
github.com/aws/aws-sdk-go v1.41.14 github.com/aws/aws-sdk-go v1.41.14
github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect

4
go.sum
View file

@@ -108,8 +108,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0= github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0=
github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
github.com/VictoriaMetrics/metricsql v0.28.0 h1:yYtKa95ux5xhH+ziL/QZSc6TWb/5s7yEosmwPBxSvF8= github.com/VictoriaMetrics/metricsql v0.29.0 h1:bQRxsHT6rWZgyMZqocV0lAM5GjpuPvAAW9eFXYR3iBY=
github.com/VictoriaMetrics/metricsql v0.28.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= github.com/VictoriaMetrics/metricsql v0.29.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=

View file

@@ -22,6 +22,7 @@ var aggrFuncs = map[string]bool{
// MetricsQL extension funcs // MetricsQL extension funcs
"median": true, "median": true,
"limitk": true, "limitk": true,
"limit_offset": true,
"distinct": true, "distinct": true,
"sum2": true, "sum2": true,
"geomean": true, "geomean": true,

2
vendor/modules.txt vendored
View file

@@ -21,7 +21,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.18.1 # github.com/VictoriaMetrics/metrics v1.18.1
## explicit ## explicit
github.com/VictoriaMetrics/metrics github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.28.0 # github.com/VictoriaMetrics/metricsql v0.29.0
## explicit ## explicit
github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop github.com/VictoriaMetrics/metricsql/binaryop