From 4d76977745a3134cdb79a5849c57b3966ca6dc47 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 23 Nov 2019 11:45:09 +0200 Subject: [PATCH] app/vmselect/promql: transparently apply `prometheus_buckets` in `histogram_quantile` --- app/vmselect/promql/exec_test.go | 77 ++++++++++---- .../promql/rollup_result_cache_test.go | 2 +- app/vmselect/promql/transform.go | 100 +++++++++++++----- 3 files changed, 128 insertions(+), 51 deletions(-) diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 4dc590400..2c7c2e405 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -2359,7 +2359,7 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) - t.Run(`histogram_quantile(nan-bucket-count)`, func(t *testing.T) { + t.Run(`histogram_quantile(nan-bucket-count-some)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0.6, label_set(90, "foo", "bar", "le", "10") @@ -2368,7 +2368,7 @@ func TestExecSuccess(t *testing.T) { )` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{30, 30, 30, 30, 30, 30}, + Values: []float64{10, 10, 10, 10, 10, 10}, Timestamps: timestampsExpected, } r.MetricName.Tags = []storage.Tag{{ @@ -2378,7 +2378,7 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) - t.Run(`histogram_quantile(nan-bucket-count)`, func(t *testing.T) { + t.Run(`histogram_quantile(normal-bucket-count)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0.2, label_set(0, "foo", "bar", "le", "10") @@ -2407,7 +2407,7 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{} f(q, resultExpected) }) - t.Run(`histogram_quantile(nan-bucket-count)`, func(t *testing.T) { + t.Run(`histogram_quantile(nan-bucket-count-all)`, func(t *testing.T) { t.Parallel() q := `histogram_quantile(0.6, label_set(nan, "foo", "bar", "le", "10") @@ -2420,17 +2420,16 @@ func TestExecSuccess(t *testing.T) { t.Run(`prometheus_buckets(missing-vmrange)`, func(t *testing.T) { t.Parallel() q := `sort(prometheus_buckets(( - alias(label_set(90, "foo", "bar", "le", "0"), "xxx"), alias(label_set(time()/20, "foo", "bar", "le", "0.2"), "xxx"), alias(label_set(time()/100, "foo", "bar", "vmrange", "foobar"), "xxx"), alias(label_set(time()/100, "foo", "bar", "vmrange", "30...foobar"), "xxx"), alias(label_set(time()/100, "foo", "bar", "vmrange", "30...40"), "xxx"), + alias(label_set(time()/80, "foo", "bar", "vmrange", "0...900", "le", "54"), "yyy"), alias(label_set(time()/40, "foo", "bar", "vmrange", "900...1000", "le", "2343"), "yyy"), - alias(label_set(time()/10, "foo", "bar", "vmrange", "1000...Inf", "le", "2343"), "yyy"), )))` r1 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{10, 12, 14, 16, 18, 20}, + Values: []float64{0, 0, 0, 0, 0, 0}, Timestamps: timestampsExpected, } r1.MetricName.MetricGroup = []byte("xxx") @@ -2441,15 +2440,15 @@ func TestExecSuccess(t *testing.T) { }, { Key: []byte("le"), - Value: []byte("40"), + Value: []byte("30"), }, } r2 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{25, 30, 35, 40, 45, 50}, + Values: []float64{10, 12, 14, 16, 18, 20}, Timestamps: timestampsExpected, } - r2.MetricName.MetricGroup = []byte("yyy") + r2.MetricName.MetricGroup = []byte("xxx") r2.MetricName.Tags = []storage.Tag{ { Key: []byte("foo"), @@ -2457,12 +2456,12 @@ func TestExecSuccess(t *testing.T) { }, { Key: []byte("le"), - Value: []byte("1000"), + Value: []byte("40"), }, } r3 := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{125, 150, 175, 200, 225, 250}, + Values: []float64{12.5, 15, 17.5, 20, 22.5, 25}, Timestamps: timestampsExpected, } r3.MetricName.MetricGroup = []byte("yyy") @@ -2473,19 +2472,51 @@ func TestExecSuccess(t *testing.T) { }, { Key: []byte("le"), - Value: []byte("Inf"), + Value: []byte("900"), }, } - resultExpected := []netstorage.Result{r1, r2, r3} + r4 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{37.5, 45, 52.5, 60, 67.5, 75}, + Timestamps: timestampsExpected, + } + r4.MetricName.MetricGroup = []byte("yyy") + r4.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("le"), + Value: []byte("1000"), + }, + } + r5 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{50, 60, 70, 80, 90, 100}, + Timestamps: timestampsExpected, + } + r5.MetricName.MetricGroup = []byte("xxx") + r5.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("le"), + Value: []byte("0.2"), + }, + } + resultExpected := []netstorage.Result{r1, r2, r3, r4, r5} f(q, resultExpected) }) t.Run(`prometheus_buckets(valid)`, func(t *testing.T) { t.Parallel() q := `sort(prometheus_buckets(( alias(label_set(90, "foo", "bar", "vmrange", "0...0"), "xxx"), - alias(label_set(time()/20, "foo", "bar", "vmrange", "0.1...0.2"), "xxx"), - alias(label_set(time()/100, "foo", "bar", "vmrange", "30...40"), "xxx"), - alias(label_set(time()/10, "foo", "bar", "vmrange", "1000...Inf"), "xxx"), + alias(label_set(time()/20, "foo", "bar", "vmrange", "0...0.2"), "xxx"), + alias(label_set(time()/100, "foo", "bar", "vmrange", "0.2...40"), "xxx"), + alias(label_set(time()/10, "foo", "bar", "vmrange", "40...Inf"), "xxx"), )))` r1 := netstorage.Result{ MetricName: metricNameExpected, @@ -4424,12 +4455,12 @@ func testResultsEqual(t *testing.T, result, resultExpected []netstorage.Result) for i := range result { r := &result[i] rExpected := &resultExpected[i] - testMetricNamesEqual(t, &r.MetricName, &rExpected.MetricName) + testMetricNamesEqual(t, &r.MetricName, &rExpected.MetricName, i) testRowsEqual(t, r.Values, r.Timestamps, rExpected.Values, rExpected.Timestamps) } } -func testMetricNamesEqual(t *testing.T, mn, mnExpected *storage.MetricName) { +func testMetricNamesEqual(t *testing.T, mn, mnExpected *storage.MetricName, pos int) { t.Helper() if mn.AccountID != mnExpected.AccountID { t.Fatalf(`unexpected accountID; got %d; want %d`, mn.AccountID, mnExpected.AccountID) @@ -4438,19 +4469,19 @@ func testMetricNamesEqual(t *testing.T, mn, mnExpected *storage.MetricName) { t.Fatalf(`unexpected projectID; got %d; want %d`, mn.ProjectID, mnExpected.ProjectID) } if string(mn.MetricGroup) != string(mnExpected.MetricGroup) { - t.Fatalf(`unexpected MetricGroup; got %q; want %q`, mn.MetricGroup, mnExpected.MetricGroup) + t.Fatalf(`unexpected MetricGroup at #%d; got %q; want %q`, pos, mn.MetricGroup, mnExpected.MetricGroup) } if len(mn.Tags) != len(mnExpected.Tags) { - t.Fatalf(`unexpected tags count; got %d; want %d`, len(mn.Tags), len(mnExpected.Tags)) + t.Fatalf(`unexpected tags count at #%d; got %d; want %d`, pos, len(mn.Tags), len(mnExpected.Tags)) } for i := range mn.Tags { tag := &mn.Tags[i] tagExpected := &mnExpected.Tags[i] if string(tag.Key) != string(tagExpected.Key) { - t.Fatalf(`unexpected tag key; got %q; want %q`, tag.Key, tagExpected.Key) + t.Fatalf(`unexpected tag key at #%d,%d; got %q; want %q`, pos, i, tag.Key, tagExpected.Key) } if string(tag.Value) != string(tagExpected.Value) { - t.Fatalf(`unexpected tag value; got %q; want %q`, tag.Value, tagExpected.Value) + t.Fatalf(`unexpected tag value at #%d,%d; got %q; want %q`, pos, i, tag.Value, tagExpected.Value) } } } diff --git a/app/vmselect/promql/rollup_result_cache_test.go b/app/vmselect/promql/rollup_result_cache_test.go index 998d66de9..26e7b2e3d 100644 --- a/app/vmselect/promql/rollup_result_cache_test.go +++ b/app/vmselect/promql/rollup_result_cache_test.go @@ -394,7 +394,7 @@ func testTimeseriesEqual(t *testing.T, tss, tssExpected []*timeseries) { } for i, ts := range tss { tsExpected := tssExpected[i] - testMetricNamesEqual(t, &ts.MetricName, &tsExpected.MetricName) + testMetricNamesEqual(t, &ts.MetricName, &tsExpected.MetricName, i) testRowsEqual(t, ts.Values, ts.Timestamps, tsExpected.Values, tsExpected.Timestamps) } } diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index 33c950b01..445462846 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -278,42 +278,85 @@ func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) { if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } + rvs := vmrangeBucketsToLE(args[0]) + return rvs, nil +} + +func vmrangeBucketsToLE(tss []*timeseries) []*timeseries { + rvs := make([]*timeseries, 0, len(tss)) // Group timeseries by MetricGroup+tags excluding `vmrange` tag. type x struct { - leStr string - le float64 - ts *timeseries + startStr string + endStr string + start float64 + end float64 + ts *timeseries } m := make(map[string][]x) bb := bbPool.Get() defer bbPool.Put(bb) - for _, ts := range args[0] { + for _, ts := range tss { vmrange := ts.MetricName.GetTagValue("vmrange") if len(vmrange) == 0 { + if le := ts.MetricName.GetTagValue("le"); len(le) > 0 { + // Keep Prometheus-compatible buckets. + rvs = append(rvs, ts) + } continue } n := strings.Index(bytesutil.ToUnsafeString(vmrange), "...") if n < 0 { continue } - leStr := string(vmrange[n+len("..."):]) - le, err := strconv.ParseFloat(leStr, 64) + startStr := string(vmrange[:n]) + start, err := strconv.ParseFloat(startStr, 64) if err != nil { continue } + endStr := string(vmrange[n+len("..."):]) + end, err := strconv.ParseFloat(endStr, 64) + if err != nil { + continue + } + ts.MetricName.RemoveTag("le") ts.MetricName.RemoveTag("vmrange") bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) m[string(bb.B)] = append(m[string(bb.B)], x{ - leStr: leStr, - le: le, - ts: ts, + startStr: startStr, + endStr: endStr, + start: start, + end: end, + ts: ts, }) } - rvs := make([]*timeseries, 0, len(args[0])) + // Convert `vmrange` label in each group of time series to `le` label. for _, xss := range m { - sort.Slice(xss, func(i, j int) bool { return xss[i].le < xss[j].le }) + sort.Slice(xss, func(i, j int) bool { return xss[i].end < xss[j].end }) + xssNew := make([]x, 0, len(xss)) + endStrPrev := "0" + for _, xs := range xss { + ts := xs.ts + if xs.startStr != endStrPrev { + var tsDummy timeseries + tsDummy.CopyFromShallowTimestamps(ts) + values := tsDummy.Values + for i := range values { + values[i] = 0 + } + tsDummy.MetricName.AddTag("le", xs.startStr) + xssNew = append(xssNew, x{ + endStr: xs.startStr, + end: xs.start, + ts: &tsDummy, + }) + } + ts.MetricName.AddTag("le", xs.endStr) + xssNew = append(xssNew, xs) + endStrPrev = xs.endStr + } + xss = xssNew for i := range xss[0].ts.Values { count := float64(0) for _, xs := range xss { @@ -322,16 +365,14 @@ func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) { if !math.IsNaN(v) { count += v } - ts.MetricName.RemoveTag("le") - ts.MetricName.AddTag("le", xs.leStr) ts.Values[i] = count } } - for i := range xss { - rvs = append(rvs, xss[i].ts) + for _, xs := range xss { + rvs = append(rvs, xs.ts) } } - return rvs, nil + return rvs } func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { @@ -344,6 +385,9 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { return nil, err } + // Convert buckets with `vmrange` labels to buckets with `le` labels. + tss := vmrangeBucketsToLE(args[1]) + // Group metrics by all tags excluding "le" type x struct { le float64 @@ -351,7 +395,7 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { } m := make(map[string][]x) bb := bbPool.Get() - for _, ts := range args[1] { + for _, ts := range tss { tagValue := ts.MetricName.GetTagValue("le") if len(tagValue) == 0 { continue @@ -375,18 +419,16 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { lastNonInf := func(i int, xss []x) float64 { for len(xss) > 0 { xsLast := xss[len(xss)-1] - if xsLast.ts.Values[i] == 0 { + v := xsLast.ts.Values[i] + if v == 0 { return nan } - if !math.IsInf(xsLast.le, 0) { - break + if !math.IsNaN(v) && !math.IsInf(xsLast.le, 0) { + return xsLast.le } xss = xss[:len(xss)-1] } - if len(xss) == 0 { - return nan - } - return xss[len(xss)-1].le + return nan } quantile := func(i int, phis []float64, xss []x) float64 { phi := phis[i] @@ -399,9 +441,9 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { vPrev := float64(0) for _, xs := range xss { v := xs.ts.Values[i] - if math.IsNaN(v) || v < vPrev { + if v < vPrev { xs.ts.Values[i] = vPrev - } else { + } else if !math.IsNaN(v) { vPrev = v } } @@ -423,6 +465,11 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { lePrev := float64(0) for _, xs := range xss { v := xs.ts.Values[i] + if math.IsNaN(v) { + // Skip NaNs - they may appear if the selected time range + // contains multiple different bucket sets. + continue + } le := xs.le if v < vReq { vPrev = v @@ -450,7 +497,6 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { } rvs = append(rvs, dst) } - return rvs, nil }