From e60dfc96ffee2c74f7b8433c4613161198499e74 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 16 Sep 2021 13:33:53 +0300 Subject: [PATCH] app/vmselect/promql: add `mad(q)` and `outliers_mad(tolerance, q)` functions to MetricsQL --- app/vmselect/promql/aggr.go | 102 +++++++++++++++--- app/vmselect/promql/exec_test.go | 44 ++++++++ docs/CHANGELOG.md | 2 + docs/MetricsQL.md | 10 +- go.mod | 2 +- go.sum | 4 +- .../VictoriaMetrics/metricsql/aggr.go | 2 + .../VictoriaMetrics/metricsql/lexer.go | 2 +- vendor/modules.txt | 2 +- 9 files changed, 149 insertions(+), 21 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index 78f549fb7..744ffbf38 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -45,6 +45,8 @@ var aggrFuncs = map[string]aggrFunc{ "bottomk_avg": newAggrFuncRangeTopK(avgValue, true), "bottomk_median": newAggrFuncRangeTopK(medianValue, true), "any": aggrFuncAny, + "mad": newAggrFunc(aggrFuncMAD), + "outliers_mad": aggrFuncOutliersMAD, "outliersk": aggrFuncOutliersK, "mode": newAggrFunc(aggrFuncMode), "zscore": aggrFuncZScore, @@ -485,7 +487,6 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) { ts.Values[i] = (v - avg) / stddev } } - // Remove MetricGroup from all the tss. for _, ts := range tss { ts.MetricName.ResetMetricGroup() @@ -811,6 +812,52 @@ func medianValue(values []float64) float64 { return value } +func aggrFuncMAD(tss []*timeseries) []*timeseries { + // Calculate medians for each point across tss. + medians := getPerPointMedians(tss) + // Calculate MAD values multipled by tolerance for each point across tss. + // See https://en.wikipedia.org/wiki/Median_absolute_deviation + mads := getPerPointMADs(tss, medians) + tss[0].Values = append(tss[0].Values[:0], mads...) + return tss[:1] +} + +func aggrFuncOutliersMAD(afa *aggrFuncArg) ([]*timeseries, error) { + args := afa.args + if err := expectTransformArgsNum(args, 2); err != nil { + return nil, err + } + tolerances, err := getScalar(args[0], 0) + if err != nil { + return nil, err + } + afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { + // Calculate medians for each point across tss. + medians := getPerPointMedians(tss) + // Calculate MAD values multipled by tolerance for each point across tss. + // See https://en.wikipedia.org/wiki/Median_absolute_deviation + mads := getPerPointMADs(tss, medians) + for n := range mads { + mads[n] *= tolerances[n] + } + // Leave only time series with at least a single peak above the MAD multiplied by tolerance. + tssDst := tss[:0] + for _, ts := range tss { + values := ts.Values + for n, v := range values { + ad := math.Abs(v - medians[n]) + mad := mads[n] + if ad > mad { + tssDst = append(tssDst, ts) + break + } + } + } + return tssDst + } + return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) +} + func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) { args := afa.args if err := expectTransformArgsNum(args, 2); err != nil { @@ -822,20 +869,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) { } afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { // Calculate medians for each point across tss. - medians := make([]float64, len(ks)) - h := histogram.GetFast() - for n := range ks { - h.Reset() - for j := range tss { - v := tss[j].Values[n] - if !math.IsNaN(v) { - h.Update(v) - } - } - medians[n] = h.Quantile(0.5) - } - histogram.PutFast(h) - + medians := getPerPointMedians(tss) // Return topK time series with the highest variance from median. f := func(values []float64) float64 { sum2 := float64(0) @@ -850,6 +884,44 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) { return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) } +func getPerPointMedians(tss []*timeseries) []float64 { + if len(tss) == 0 { + logger.Panicf("BUG: expecting non-empty tss") + } + medians := make([]float64, len(tss[0].Values)) + h := histogram.GetFast() + for n := range medians { + h.Reset() + for j := range tss { + v := tss[j].Values[n] + if !math.IsNaN(v) { + h.Update(v) + } + } + medians[n] = h.Quantile(0.5) + } + histogram.PutFast(h) + return medians +} + +func getPerPointMADs(tss []*timeseries, medians []float64) []float64 { + mads := make([]float64, len(medians)) + h := histogram.GetFast() + for n, median := range medians { + h.Reset() + for j := range tss { + v := tss[j].Values[n] + if !math.IsNaN(v) { + ad := math.Abs(v - median) + h.Update(ad) + } + } + mads[n] = h.Quantile(0.5) + } + histogram.PutFast(h) + return mads +} + func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { args := afa.args if err := expectTransformArgsNum(args, 2); err != nil { diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 25f2613d3..8c9f22cdd 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -5524,6 +5524,47 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{} f(q, resultExpected) }) + t.Run(`mad()`, func(t *testing.T) { + t.Parallel() + q := `mad( + alias(time(), "metric1"), + alias(time()*1.5, "metric2"), + label_set(time()*0.9, "baz", "sss"), + )` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{100, 120, 140, 160, 180, 200}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`outliers_mad(1)`, func(t *testing.T) { + t.Parallel() + q := `outliers_mad(1, ( + alias(time(), "metric1"), + alias(time()*1.5, "metric2"), + label_set(time()*0.9, "baz", "sss"), + ))` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1500, 1800, 2100, 2400, 2700, 3000}, + Timestamps: timestampsExpected, + } + r.MetricName.MetricGroup = []byte("metric2") + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`outliers_mad(5)`, func(t *testing.T) { + t.Parallel() + q := `outliers_mad(5, ( + alias(time(), "metric1"), + alias(time()*1.5, "metric2"), + label_set(time()*0.9, "baz", "sss"), + ))` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) t.Run(`outliersk(0)`, func(t *testing.T) { t.Parallel() q := `outliersk(0, ( @@ -7000,6 +7041,9 @@ func TestExecError(t *testing.T) { f(`hoeffding_bound_upper()`) f(`hoeffding_bound_upper(1)`) f(`hoeffding_bound_upper(0.99, foo, 1)`) + f(`mad()`) + f(`outliers_mad()`) + f(`outliers_mad(1)`) f(`outliersk()`) f(`outliersk(1)`) f(`mode_over_time()`) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 1a9c10d56..fcfd9b298 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,6 +24,8 @@ sort: 15 * FEATURE: vmui: use Prometheus-compatible query args, so `vmui` could be accessed from graph editor in Grafana. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1619). Thanks to @Loori-R. * FEATURE: vmselect: automatically add missing port to `-storageNode` hostnames. For example, `-storageNode=vmstorage1,vmstorage2` is automatically translated to `-storageNode=vmstorage1:8401,vmstorage2:8401`. This simplifies [manual setup of VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#cluster-setup). * FEATURE: vminsert: automatically add missing port to `-storageNode` hostnames. For example, `-storageNode=vmstorage1,vmstorage2` is automatically translated to `-storageNode=vmstorage1:8400,vmstorage2:8400`. This simplifies [manual setup of VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#cluster-setup). +* FEATURE: add [mad(q)](https://docs.victoriametrics.com/MetricsQL.html#mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) for groups of points with identical timestamps across multiple time series. +* FEATURE: add [outliers_mad(tolerance, q)](https://docs.victoriametrics.com/MetricsQL.html#outliers_mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It returns time series with peaks outside the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) multiplied by `tolerance`. * BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601). * BUGFIX: vmselect: reset connection timeouts after each request to `vmstorage`. This should prevent from `cannot read data in 0.000 seconds: unexpected EOF` warning in logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1562). Thanks to @mxlxm . diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index 711ac0ff2..97a5a7871 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -748,6 +748,10 @@ See also [implicit query conversions](#implicit-query-conversions). `limitk(k, q) by (group_labels)` returns up to `k` time series per each `group_labels` out of time series returned by `q`. The returned set of time series can change with each call. +#### mad + +`mad(q) by (group_labels)` returns the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. See also [outliers_mad](#outliers_mad) and [stddev](#stddev). + #### max `max(q) by (group_labels)` returns the maximum value per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL. @@ -764,9 +768,13 @@ See also [implicit query conversions](#implicit-query-conversions). `mode(q) by (group_labels)` returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. +#### outliers_mad + +`outliers_mad(tolerance, q)` returns time series from `q` with at least a single point outside [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (aka MAD) multiplied by `tolerance`. E.g. it returns time series with at least a single point below `median(q) - mad(q)` or a single point above `median(q) + mad(q)`. See also [outliersk](#outliersk) and [mad](#mad). + #### outliersk -`outliersk(k, q)` returns up to `k` time series with the biggest standard deviation (aka outliers) out of time series returned by `q`. +`outliersk(k, q)` returns up to `k` time series with the biggest standard deviation (aka outliers) out of time series returned by `q`. See also [outliers_mad](#outliers_mad). #### quantile diff --git a/go.mod b/go.mod index 1bc2db3d4..9511e16ee 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.1.0 github.com/VictoriaMetrics/metrics v1.18.0 - github.com/VictoriaMetrics/metricsql v0.21.0 + github.com/VictoriaMetrics/metricsql v0.22.0 github.com/VividCortex/ewma v1.2.0 // indirect github.com/aws/aws-sdk-go v1.40.43 github.com/cespare/xxhash/v2 v2.1.2 diff --git a/go.sum b/go.sum index a9199a6b7..a4c0550ba 100644 --- a/go.sum +++ b/go.sum @@ -109,8 +109,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= github.com/VictoriaMetrics/metrics v1.18.0 h1:vov5NxDHRSXFbdiH4dYLYEjKLoAXXSQ7hcnG8TSD9JQ= github.com/VictoriaMetrics/metrics v1.18.0/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= -github.com/VictoriaMetrics/metricsql v0.21.0 h1:wA/IVfRFQaThy4bM1kAmPiCR0BkWv4tEXD9lBF+GPdU= -github.com/VictoriaMetrics/metricsql v0.21.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= +github.com/VictoriaMetrics/metricsql v0.22.0 h1:QWXvNfaFGeEQvh7Cjjpfp/7Un1N+W+ODQOiM+numyN0= +github.com/VictoriaMetrics/metricsql v0.22.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go index 27821de5f..7d6d7dceb 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go @@ -35,6 +35,8 @@ var aggrFuncs = map[string]bool{ "bottomk_avg": true, "bottomk_median": true, "any": true, + "mad": true, + "outliers_mad": true, "outliersk": true, "mode": true, "zscore": true, diff --git a/vendor/github.com/VictoriaMetrics/metricsql/lexer.go b/vendor/github.com/VictoriaMetrics/metricsql/lexer.go index 9bf873fc4..c5f0bfabd 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/lexer.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/lexer.go @@ -134,7 +134,7 @@ func scanString(s string) (string, error) { for { n := strings.IndexByte(s[i:], quote) if n < 0 { - return "", fmt.Errorf("cannot find closing quote %ch for the string %q", quote, s) + return "", fmt.Errorf("cannot find closing quote %c for the string %q", quote, s) } i += n bs := 0 diff --git a/vendor/modules.txt b/vendor/modules.txt index 1726d8a2b..5e93f56ae 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -22,7 +22,7 @@ github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.18.0 ## explicit github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.21.0 +# github.com/VictoriaMetrics/metricsql v0.22.0 ## explicit github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop