From 994fa2f3bf292965fefe2fceb747f4cec99f49f1 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 24 Jul 2020 16:10:47 +0300 Subject: [PATCH] app/vmselect/promql: add `buckets_limit(k, buckets)` function, which limits the number of buckets per time series to `k` This function works with both Prometheus-style and VictoriaMetrics-style buckets. The function removes buckets with the lowest values in order to reserve the highest precision. The function is useful for building heatmaps in Grafana from too big number of buckets. --- app/vmselect/promql/exec_test.go | 98 +++++++++++++++++++ app/vmselect/promql/transform.go | 81 +++++++++++++++ docs/MetricsQL.md | 2 + go.mod | 2 +- go.sum | 4 +- .../VictoriaMetrics/metricsql/transform.go | 1 + vendor/modules.txt | 2 +- 7 files changed, 186 insertions(+), 4 deletions(-) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 4c2cd4138..ac4d2a1b5 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3002,6 +3002,101 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{} f(q, resultExpected) }) + t.Run(`buckets_limit(zero)`, func(t *testing.T) { + t.Parallel() + q := `buckets_limit(0, ( + alias(label_set(100, "le", "inf", "x", "y"), "metric"), + alias(label_set(50, "le", "120", "x", "y"), "metric"), + ))` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) + t.Run(`buckets_limit(unused)`, func(t *testing.T) { + t.Parallel() + q := `sort(buckets_limit(5, ( + alias(label_set(100, "le", "inf", "x", "y"), "metric"), + alias(label_set(50, "le", "120", "x", "y"), "metric"), + )))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{50, 50, 50, 50, 50, 50}, + Timestamps: timestampsExpected, + } + r1.MetricName.MetricGroup = []byte("metric") + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("120"), + }, + { + Key: []byte("x"), + Value: []byte("y"), + }, + } + 
r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{100, 100, 100, 100, 100, 100}, + Timestamps: timestampsExpected, + } + r2.MetricName.MetricGroup = []byte("metric") + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("inf"), + }, + { + Key: []byte("x"), + Value: []byte("y"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) + t.Run(`buckets_limit(used)`, func(t *testing.T) { + t.Parallel() + q := `sort(buckets_limit(2, ( + alias(label_set(100, "le", "inf", "x", "y"), "metric"), + alias(label_set(52, "le", "200", "x", "y"), "metric"), + alias(label_set(50, "le", "120", "x", "y"), "metric"), + alias(label_set(20, "le", "70", "x", "y"), "metric"), + alias(label_set(10, "le", "30", "x", "y"), "metric"), + alias(label_set(9, "le", "10", "x", "y"), "metric"), + )))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{50, 50, 50, 50, 50, 50}, + Timestamps: timestampsExpected, + } + r1.MetricName.MetricGroup = []byte("metric") + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("120"), + }, + { + Key: []byte("x"), + Value: []byte("y"), + }, + } + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{100, 100, 100, 100, 100, 100}, + Timestamps: timestampsExpected, + } + r2.MetricName.MetricGroup = []byte("metric") + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("le"), + Value: []byte("inf"), + }, + { + Key: []byte("x"), + Value: []byte("y"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) t.Run(`prometheus_buckets(missing-vmrange)`, func(t *testing.T) { t.Parallel() q := `sort(prometheus_buckets(( @@ -5762,6 +5857,9 @@ func TestExecError(t *testing.T) { f(`mode_over_time()`) f(`rate_over_sum()`) f(`mode()`) + f(`prometheus_buckets()`) + f(`buckets_limit()`) + f(`buckets_limit(1)`) // Invalid argument type f(`median_over_time({}, 2)`) diff --git 
a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index 32f3fe4f3..4e0616460 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -98,6 +98,7 @@ var transformFuncs = map[string]transformFunc{ "asin": newTransformFuncOneArg(transformAsin), "acos": newTransformFuncOneArg(transformAcos), "prometheus_buckets": transformPrometheusBuckets, + "buckets_limit": transformBucketsLimit, "histogram_share": transformHistogramShare, "sort_by_label": newTransformFuncSortByLabel(false), "sort_by_label_desc": newTransformFuncSortByLabel(true), @@ -282,6 +283,86 @@ func transformFloor(v float64) float64 { return math.Floor(v) } +func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) { + args := tfa.args + if err := expectTransformArgsNum(args, 2); err != nil { + return nil, err + } + limits, err := getScalar(args[0], 1) + if err != nil { + return nil, err + } + limit := int(limits[0]) + if limit <= 0 { + return nil, nil + } + tss := vmrangeBucketsToLE(args[1]) + if len(tss) == 0 { + return nil, nil + } + + // Group timeseries by all MetricGroup+tags excluding `le` tag. + type x struct { + le float64 + delta float64 + ts *timeseries + } + m := make(map[string][]x) + var b []byte + var mn storage.MetricName + for _, ts := range tss { + leStr := ts.MetricName.GetTagValue("le") + if len(leStr) == 0 { + // Skip time series without `le` tag. + continue + } + le, err := strconv.ParseFloat(string(leStr), 64) + if err != nil { + // Skip time series with invalid `le` tag. + continue + } + mn.CopyFrom(&ts.MetricName) + mn.RemoveTag("le") + b = marshalMetricNameSorted(b[:0], &mn) + m[string(b)] = append(m[string(b)], x{ + le: le, + ts: ts, + }) + } + + // Remove buckets with the smallest counters. + rvs := make([]*timeseries, 0, len(tss)) + for _, leGroup := range m { + if len(leGroup) <= limit { + // The number of buckets in leGroup doesn't exceed the limit. 
+ for _, xx := range leGroup { + rvs = append(rvs, xx.ts) + } + continue + } + // The number of buckets in leGroup exceeds the limit. Remove buckets with the smallest sums. + sort.Slice(leGroup, func(i, j int) bool { + return leGroup[i].le < leGroup[j].le + }) + for n := range tss[0].Values { + prevValue := float64(0) + for i := range leGroup { + xx := &leGroup[i] + value := xx.ts.Values[n] + xx.delta += value - prevValue + prevValue = value + } + } + sort.Slice(leGroup, func(i, j int) bool { + return leGroup[i].delta < leGroup[j].delta + }) + for _, xx := range leGroup[len(leGroup)-limit:] { + rvs = append(rvs, xx.ts) + } + } + return rvs, nil +} + func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index 3342cc6b4..7874135d3 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -90,6 +90,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `rand()`, `rand_normal()` and `rand_exponential()` functions - for generating pseudo-random series with even, normal and exponential distribution. - `increases_over_time(m[d])` and `decreases_over_time(m[d])` - returns the number of `m` increases or decreases over the given duration `d`. - `prometheus_buckets(q)` - converts [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) buckets to Prometheus buckets with `le` labels. +- `buckets_limit(k, q)` - limits the number of buckets (Prometheus-style or [VictoriaMetrics-style](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram)) + per each metric returned by `q` to `k`. It also converts VictoriaMetrics-style buckets to Prometheus-style buckets, i.e. the end result is buckets with `le` labels. - `histogram(q)` - calculates aggregate histogram over `q` time series for each point on the graph. 
See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details. - `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`. For example, the following query calculates median temperature by country over the last 24 hours: diff --git a/go.mod b/go.mod index 20afb43f7..c523f489b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.0.1 github.com/VictoriaMetrics/metrics v1.12.0 - github.com/VictoriaMetrics/metricsql v0.2.8 + github.com/VictoriaMetrics/metricsql v0.2.9 github.com/aws/aws-sdk-go v1.33.9 github.com/cespare/xxhash/v2 v2.1.1 github.com/golang/snappy v0.0.1 diff --git a/go.sum b/go.sum index f18b0b025..9019d63df 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ= github.com/VictoriaMetrics/metrics v1.12.0 h1:BudxtRYSA6j8H9mzjhXNEIsCPIEUPCb76QwFEptQzvQ= github.com/VictoriaMetrics/metrics v1.12.0/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= -github.com/VictoriaMetrics/metricsql v0.2.8 h1:RET+5ZKSHFpcm7RNEEHFMiSNYtd6GlGKyNn/ZO53zhA= -github.com/VictoriaMetrics/metricsql v0.2.8/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= +github.com/VictoriaMetrics/metricsql v0.2.9 h1:RHLEmt4VNZ2RAqZjmXyRtKpCrtSuYUS1+TyOqfXbHWs= +github.com/VictoriaMetrics/metricsql v0.2.9/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/aws/aws-sdk-go 
v1.33.9 h1:nkC8YxL1nxwshIoO3UM2486Ph+zs7IZWjhRHjmXeCPw= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/transform.go b/vendor/github.com/VictoriaMetrics/metricsql/transform.go index d19222637..cd83a903b 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/transform.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/transform.go @@ -77,6 +77,7 @@ var transformFuncs = map[string]bool{ "asin": true, "acos": true, "prometheus_buckets": true, + "buckets_limit": true, "histogram_share": true, "sort_by_label": true, "sort_by_label_desc": true, diff --git a/vendor/modules.txt b/vendor/modules.txt index 1b89543e8..46c888e71 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.12.0 github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.2.8 +# github.com/VictoriaMetrics/metricsql v0.2.9 github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop # github.com/aws/aws-sdk-go v1.33.9