diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 4c2cd4138..ac4d2a1b5 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3002,6 +3002,101 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{} f(q, resultExpected) }) + t.Run(`buckets_limit(zero)`, func(t *testing.T) { + t.Parallel() + q := `buckets_limit(0, ( + alias(label_set(100, "le", "inf", "x", "y"), "metric"), + alias(label_set(50, "le", "120", "x", "y"), "metric"), + ))` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) + t.Run(`buckets_limit(unused)`, func(t *testing.T) { + t.Parallel() + q := `sort(buckets_limit(5, ( + alias(label_set(100, "le", "inf", "x", "y"), "metric"), + alias(label_set(50, "le", "120", "x", "y"), "metric"), + )))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{50, 50, 50, 50, 50, 50}, + Timestamps: timestampsExpected, + } + r1.MetricName.MetricGroup = []byte("metric") + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("120"), + }, + { + Key: []byte("x"), + Value: []byte("y"), + }, + } + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{100, 100, 100, 100, 100, 100}, + Timestamps: timestampsExpected, + } + r2.MetricName.MetricGroup = []byte("metric") + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("inf"), + }, + { + Key: []byte("x"), + Value: []byte("y"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) + t.Run(`buckets_limit(used)`, func(t *testing.T) { + t.Parallel() + q := `sort(buckets_limit(2, ( + alias(label_set(100, "le", "inf", "x", "y"), "metric"), + alias(label_set(52, "le", "200", "x", "y"), "metric"), + alias(label_set(50, "le", "120", "x", "y"), "metric"), + alias(label_set(20, "le", "70", "x", "y"), "metric"), + alias(label_set(10, "le", "30", "x", "y"), "metric"), + alias(label_set(9, "le", "10", "x", "y"), "metric"), + )))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{50, 50, 50, 50, 50, 50}, + Timestamps: timestampsExpected, + } + r1.MetricName.MetricGroup = []byte("metric") + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("120"), + }, + { + Key: []byte("x"), + Value: []byte("y"), + }, + } + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{100, 100, 100, 100, 100, 100}, + Timestamps: timestampsExpected, + } + r2.MetricName.MetricGroup = []byte("metric") + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("inf"), + }, + { + Key: []byte("x"), + Value: []byte("y"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) t.Run(`prometheus_buckets(missing-vmrange)`, func(t *testing.T) { t.Parallel() q := `sort(prometheus_buckets(( @@ -5762,6 +5857,9 @@ func TestExecError(t *testing.T) { f(`mode_over_time()`) f(`rate_over_sum()`) f(`mode()`) + f(`prometheus_buckets()`) + f(`buckets_limit()`) + f(`buckets_limit(1)`) // Invalid argument type f(`median_over_time({}, 2)`) diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index 32f3fe4f3..4e0616460 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -98,6 +98,7 @@ var transformFuncs = map[string]transformFunc{ "asin": newTransformFuncOneArg(transformAsin), "acos": newTransformFuncOneArg(transformAcos), "prometheus_buckets": transformPrometheusBuckets, + "buckets_limit": transformBucketsLimit, "histogram_share": transformHistogramShare, "sort_by_label": newTransformFuncSortByLabel(false), "sort_by_label_desc": newTransformFuncSortByLabel(true), @@ -282,6 +283,86 @@ func transformFloor(v float64) float64 { return math.Floor(v) } +func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) { + args := tfa.args + if err := expectTransformArgsNum(args, 2); err != nil { + return nil, err + } + limits, err := getScalar(args[0], 1) + if err != nil { + return nil, err + } + limit := int(limits[0]) + if limit <= 0 { + return nil, nil + } + tss := vmrangeBucketsToLE(args[1]) + if len(tss) == 0 { + return nil, nil + } + + // Group timeseries by all MetricGroup+tags excluding `le` tag. + type x struct { + le float64 + delta float64 + ts *timeseries + } + m := make(map[string][]x) + var b []byte + var mn storage.MetricName + for _, ts := range tss { + leStr := ts.MetricName.GetTagValue("le") + if len(leStr) == 0 { + // Skip time series without `le` tag. + continue + } + le, err := strconv.ParseFloat(string(leStr), 64) + if err != nil { + // Skip time series with invalid `le` tag. + continue + } + mn.CopyFrom(&ts.MetricName) + mn.RemoveTag("le") + b = marshalMetricNameSorted(b[:0], &mn) + m[string(b)] = append(m[string(b)], x{ + le: le, + ts: ts, + }) + } + + // Remove buckets with the smallest counters. + rvs := make([]*timeseries, 0, len(tss)) + for _, leGroup := range m { + if len(leGroup) <= limit { + // The number of buckets in leGroup doesn't exceed the limit. + for _, xx := range leGroup { + rvs = append(rvs, xx.ts) + } + continue + } + // The number of buckets in leGroup exceeds the limit. Remove buckets with the smallest sums. + sort.Slice(leGroup, func(i, j int) bool { + return leGroup[i].le < leGroup[j].le + }) + for n := range tss[0].Values { + prevValue := float64(0) + for i := range leGroup { + xx := &leGroup[i] + value := xx.ts.Values[n] + xx.delta += value - prevValue + prevValue = value + } + } + sort.Slice(leGroup, func(i, j int) bool { + return leGroup[i].delta < leGroup[j].delta + }) + for _, xx := range leGroup[len(leGroup)-limit:] { + rvs = append(rvs, xx.ts) + } + } + return rvs, nil +} + func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index 3342cc6b4..7874135d3 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -90,6 +90,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `rand()`, `rand_normal()` and `rand_exponential()` functions - for generating pseudo-random series with even, normal and exponential distribution. - `increases_over_time(m[d])` and `decreases_over_time(m[d])` - returns the number of `m` increases or decreases over the given duration `d`. - `prometheus_buckets(q)` - converts [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) buckets to Prometheus buckets with `le` labels. +- `buckets_limit(k, q)` - limits the number of buckets (Prometheus-style or [VictoriaMetrics-style](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram)) + per each metric returned by by `q` to `k`. It also converts VictoriaMetrics-style buckets to Prometheus-style buckets, i.e. the end result are buckets with with `le` labels. - `histogram(q)` - calculates aggregate histogram over `q` time series for each point on the graph. See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details. - `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`. For example, the following query calculates median temperature by country over the last 24 hours: diff --git a/go.mod b/go.mod index 20afb43f7..c523f489b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.0.1 github.com/VictoriaMetrics/metrics v1.12.0 - github.com/VictoriaMetrics/metricsql v0.2.8 + github.com/VictoriaMetrics/metricsql v0.2.9 github.com/aws/aws-sdk-go v1.33.9 github.com/cespare/xxhash/v2 v2.1.1 github.com/golang/snappy v0.0.1 diff --git a/go.sum b/go.sum index f18b0b025..9019d63df 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ= github.com/VictoriaMetrics/metrics v1.12.0 h1:BudxtRYSA6j8H9mzjhXNEIsCPIEUPCb76QwFEptQzvQ= github.com/VictoriaMetrics/metrics v1.12.0/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= -github.com/VictoriaMetrics/metricsql v0.2.8 h1:RET+5ZKSHFpcm7RNEEHFMiSNYtd6GlGKyNn/ZO53zhA= -github.com/VictoriaMetrics/metricsql v0.2.8/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= +github.com/VictoriaMetrics/metricsql v0.2.9 h1:RHLEmt4VNZ2RAqZjmXyRtKpCrtSuYUS1+TyOqfXbHWs= +github.com/VictoriaMetrics/metricsql v0.2.9/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/aws/aws-sdk-go v1.33.9 h1:nkC8YxL1nxwshIoO3UM2486Ph+zs7IZWjhRHjmXeCPw= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/transform.go b/vendor/github.com/VictoriaMetrics/metricsql/transform.go index d19222637..cd83a903b 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/transform.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/transform.go @@ -77,6 +77,7 @@ var transformFuncs = map[string]bool{ "asin": true, "acos": true, "prometheus_buckets": true, + "buckets_limit": true, "histogram_share": true, "sort_by_label": true, "sort_by_label_desc": true, diff --git a/vendor/modules.txt b/vendor/modules.txt index 1b89543e8..46c888e71 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.12.0 github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.2.8 +# github.com/VictoriaMetrics/metricsql v0.2.9 github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop # github.com/aws/aws-sdk-go v1.33.9