From 3f8de4f82fa79dfcadb69dc1d14daf63138efa7a Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Thu, 16 Sep 2021 13:33:53 +0300
Subject: [PATCH] app/vmselect/promql: add `mad(q)` and
 `outliers_mad(tolerance, q)` functions to MetricsQL

---
 app/vmselect/promql/aggr.go                   | 102 +++++++++++++++---
 app/vmselect/promql/exec_test.go              |  44 ++++++++
 docs/CHANGELOG.md                             |   2 +
 docs/MetricsQL.md                             |  10 +-
 go.mod                                        |   2 +-
 go.sum                                        |   4 +-
 .../VictoriaMetrics/metricsql/aggr.go         |   2 +
 .../VictoriaMetrics/metricsql/lexer.go        |   2 +-
 vendor/modules.txt                            |   2 +-
 9 files changed, 149 insertions(+), 21 deletions(-)

diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go
index 78f549fb78..744ffbf38e 100644
--- a/app/vmselect/promql/aggr.go
+++ b/app/vmselect/promql/aggr.go
@@ -45,6 +45,8 @@ var aggrFuncs = map[string]aggrFunc{
 	"bottomk_avg":    newAggrFuncRangeTopK(avgValue, true),
 	"bottomk_median": newAggrFuncRangeTopK(medianValue, true),
 	"any":            aggrFuncAny,
+	"mad":            newAggrFunc(aggrFuncMAD),
+	"outliers_mad":   aggrFuncOutliersMAD,
 	"outliersk":      aggrFuncOutliersK,
 	"mode":           newAggrFunc(aggrFuncMode),
 	"zscore":         aggrFuncZScore,
@@ -485,7 +487,6 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
 				ts.Values[i] = (v - avg) / stddev
 			}
 		}
-
 		// Remove MetricGroup from all the tss.
 		for _, ts := range tss {
 			ts.MetricName.ResetMetricGroup()
@@ -811,6 +812,52 @@ func medianValue(values []float64) float64 {
 	return value
 }
 
+func aggrFuncMAD(tss []*timeseries) []*timeseries {
+	// Calculate medians for each point across tss.
+	medians := getPerPointMedians(tss)
+	// Calculate MAD values multipled by tolerance for each point across tss.
+	// See https://en.wikipedia.org/wiki/Median_absolute_deviation
+	mads := getPerPointMADs(tss, medians)
+	tss[0].Values = append(tss[0].Values[:0], mads...)
+	return tss[:1]
+}
+
+func aggrFuncOutliersMAD(afa *aggrFuncArg) ([]*timeseries, error) {
+	args := afa.args
+	if err := expectTransformArgsNum(args, 2); err != nil {
+		return nil, err
+	}
+	tolerances, err := getScalar(args[0], 0)
+	if err != nil {
+		return nil, err
+	}
+	afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
+		// Calculate medians for each point across tss.
+		medians := getPerPointMedians(tss)
+		// Calculate MAD values multipled by tolerance for each point across tss.
+		// See https://en.wikipedia.org/wiki/Median_absolute_deviation
+		mads := getPerPointMADs(tss, medians)
+		for n := range mads {
+			mads[n] *= tolerances[n]
+		}
+		// Leave only time series with at least a single peak above the MAD multiplied by tolerance.
+		tssDst := tss[:0]
+		for _, ts := range tss {
+			values := ts.Values
+			for n, v := range values {
+				ad := math.Abs(v - medians[n])
+				mad := mads[n]
+				if ad > mad {
+					tssDst = append(tssDst, ts)
+					break
+				}
+			}
+		}
+		return tssDst
+	}
+	return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
+}
+
 func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
 	args := afa.args
 	if err := expectTransformArgsNum(args, 2); err != nil {
@@ -822,20 +869,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
 	}
 	afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
 		// Calculate medians for each point across tss.
-		medians := make([]float64, len(ks))
-		h := histogram.GetFast()
-		for n := range ks {
-			h.Reset()
-			for j := range tss {
-				v := tss[j].Values[n]
-				if !math.IsNaN(v) {
-					h.Update(v)
-				}
-			}
-			medians[n] = h.Quantile(0.5)
-		}
-		histogram.PutFast(h)
-
+		medians := getPerPointMedians(tss)
 		// Return topK time series with the highest variance from median.
 		f := func(values []float64) float64 {
 			sum2 := float64(0)
@@ -850,6 +884,44 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
 	return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
 }
 
+func getPerPointMedians(tss []*timeseries) []float64 {
+	if len(tss) == 0 {
+		logger.Panicf("BUG: expecting non-empty tss")
+	}
+	medians := make([]float64, len(tss[0].Values))
+	h := histogram.GetFast()
+	for n := range medians {
+		h.Reset()
+		for j := range tss {
+			v := tss[j].Values[n]
+			if !math.IsNaN(v) {
+				h.Update(v)
+			}
+		}
+		medians[n] = h.Quantile(0.5)
+	}
+	histogram.PutFast(h)
+	return medians
+}
+
+func getPerPointMADs(tss []*timeseries, medians []float64) []float64 {
+	mads := make([]float64, len(medians))
+	h := histogram.GetFast()
+	for n, median := range medians {
+		h.Reset()
+		for j := range tss {
+			v := tss[j].Values[n]
+			if !math.IsNaN(v) {
+				ad := math.Abs(v - median)
+				h.Update(ad)
+			}
+		}
+		mads[n] = h.Quantile(0.5)
+	}
+	histogram.PutFast(h)
+	return mads
+}
+
 func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
 	args := afa.args
 	if err := expectTransformArgsNum(args, 2); err != nil {
diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go
index cd1f0c4b1c..959cdfa119 100644
--- a/app/vmselect/promql/exec_test.go
+++ b/app/vmselect/promql/exec_test.go
@@ -5534,6 +5534,47 @@ func TestExecSuccess(t *testing.T) {
 		resultExpected := []netstorage.Result{}
 		f(q, resultExpected)
 	})
+	t.Run(`mad()`, func(t *testing.T) {
+		t.Parallel()
+		q := `mad(
+			alias(time(), "metric1"),
+			alias(time()*1.5, "metric2"),
+			label_set(time()*0.9, "baz", "sss"),
+		)`
+		r := netstorage.Result{
+			MetricName: metricNameExpected,
+			Values:     []float64{100, 120, 140, 160, 180, 200},
+			Timestamps: timestampsExpected,
+		}
+		resultExpected := []netstorage.Result{r}
+		f(q, resultExpected)
+	})
+	t.Run(`outliers_mad(1)`, func(t *testing.T) {
+		t.Parallel()
+		q := `outliers_mad(1, (
+			alias(time(), "metric1"),
+			alias(time()*1.5, "metric2"),
+			label_set(time()*0.9, "baz", "sss"),
+		))`
+		r := netstorage.Result{
+			MetricName: metricNameExpected,
+			Values:     []float64{1500, 1800, 2100, 2400, 2700, 3000},
+			Timestamps: timestampsExpected,
+		}
+		r.MetricName.MetricGroup = []byte("metric2")
+		resultExpected := []netstorage.Result{r}
+		f(q, resultExpected)
+	})
+	t.Run(`outliers_mad(5)`, func(t *testing.T) {
+		t.Parallel()
+		q := `outliers_mad(5, (
+			alias(time(), "metric1"),
+			alias(time()*1.5, "metric2"),
+			label_set(time()*0.9, "baz", "sss"),
+		))`
+		resultExpected := []netstorage.Result{}
+		f(q, resultExpected)
+	})
 	t.Run(`outliersk(0)`, func(t *testing.T) {
 		t.Parallel()
 		q := `outliersk(0, (
@@ -7014,6 +7055,9 @@ func TestExecError(t *testing.T) {
 	f(`hoeffding_bound_upper()`)
 	f(`hoeffding_bound_upper(1)`)
 	f(`hoeffding_bound_upper(0.99, foo, 1)`)
+	f(`mad()`)
+	f(`outliers_mad()`)
+	f(`outliers_mad(1)`)
 	f(`outliersk()`)
 	f(`outliersk(1)`)
 	f(`mode_over_time()`)
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 1a9c10d56a..fcfd9b2989 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -24,6 +24,8 @@ sort: 15
 * FEATURE: vmui: use Prometheus-compatible query args, so `vmui` could be accessed from graph editor in Grafana. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1619). Thanks to @Loori-R.
 * FEATURE: vmselect: automatically add missing port to `-storageNode` hostnames. For example, `-storageNode=vmstorage1,vmstorage2` is automatically translated to `-storageNode=vmstorage1:8401,vmstorage2:8401`. This simplifies [manual setup of VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#cluster-setup).
 * FEATURE: vminsert: automatically add missing port to `-storageNode` hostnames. For example, `-storageNode=vmstorage1,vmstorage2` is automatically translated to `-storageNode=vmstorage1:8400,vmstorage2:8400`. This simplifies [manual setup of VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#cluster-setup).
+* FEATURE: add [mad(q)](https://docs.victoriametrics.com/MetricsQL.html#mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) for groups of points with identical timestamps across multiple time series.
+* FEATURE: add [outliers_mad(tolerance, q)](https://docs.victoriametrics.com/MetricsQL.html#outliers_mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It returns time series with peaks outside the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) multiplied by `tolerance`.
 
 * BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601).
 * BUGFIX: vmselect: reset connection timeouts after each request to `vmstorage`. This should prevent from `cannot read data in 0.000 seconds: unexpected EOF` warning in logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1562). Thanks to @mxlxm .
diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md
index 711ac0ff27..97a5a7871c 100644
--- a/docs/MetricsQL.md
+++ b/docs/MetricsQL.md
@@ -748,6 +748,10 @@ See also [implicit query conversions](#implicit-query-conversions).
 
 `limitk(k, q) by (group_labels)` returns up to `k` time series per each `group_labels` out of time series returned by `q`. The returned set of time series can change with each call.
 
+#### mad
+
+`mad(q) by (group_labels)` returns the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. See also [outliers_mad](#outliers_mad) and [stddev](#stddev).
+
 #### max
 
 `max(q) by (group_labels)` returns the maximum value per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL.
@@ -764,9 +768,13 @@ See also [implicit query conversions](#implicit-query-conversions).
 
 `mode(q) by (group_labels)` returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp.
 
+#### outliers_mad
+
+`outliers_mad(tolerance, q)` returns time series from `q` with at least a single point outside [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (aka MAD) multiplied by `tolerance`. E.g. it returns time series with at least a single point below `median(q) - mad(q)` or a single point above `median(q) + mad(q)`. See also [outliersk](#outliersk) and [mad](#mad).
+
 #### outliersk
 
-`outliersk(k, q)` returns up to `k` time series with the biggest standard deviation (aka outliers) out of time series returned by `q`.
+`outliersk(k, q)` returns up to `k` time series with the biggest standard deviation (aka outliers) out of time series returned by `q`. See also [outliers_mad](#outliers_mad).
 
 #### quantile
 
diff --git a/go.mod b/go.mod
index fe6af65481..22f95daacc 100644
--- a/go.mod
+++ b/go.mod
@@ -9,7 +9,7 @@ require (
 	// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
 	github.com/VictoriaMetrics/fasthttp v1.1.0
 	github.com/VictoriaMetrics/metrics v1.18.0
-	github.com/VictoriaMetrics/metricsql v0.21.0
+	github.com/VictoriaMetrics/metricsql v0.22.0
 	github.com/VividCortex/ewma v1.2.0 // indirect
 	github.com/aws/aws-sdk-go v1.40.43
 	github.com/cespare/xxhash/v2 v2.1.2
diff --git a/go.sum b/go.sum
index 5684d4a8e7..d8ee67b497 100644
--- a/go.sum
+++ b/go.sum
@@ -109,8 +109,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR
 github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
 github.com/VictoriaMetrics/metrics v1.18.0 h1:vov5NxDHRSXFbdiH4dYLYEjKLoAXXSQ7hcnG8TSD9JQ=
 github.com/VictoriaMetrics/metrics v1.18.0/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
-github.com/VictoriaMetrics/metricsql v0.21.0 h1:wA/IVfRFQaThy4bM1kAmPiCR0BkWv4tEXD9lBF+GPdU=
-github.com/VictoriaMetrics/metricsql v0.21.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
+github.com/VictoriaMetrics/metricsql v0.22.0 h1:QWXvNfaFGeEQvh7Cjjpfp/7Un1N+W+ODQOiM+numyN0=
+github.com/VictoriaMetrics/metricsql v0.22.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
 github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
 github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
 github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
diff --git a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go
index 27821de5f6..7d6d7dceb9 100644
--- a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go
+++ b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go
@@ -35,6 +35,8 @@ var aggrFuncs = map[string]bool{
 	"bottomk_avg":    true,
 	"bottomk_median": true,
 	"any":            true,
+	"mad":            true,
+	"outliers_mad":   true,
 	"outliersk":      true,
 	"mode":           true,
 	"zscore":         true,
diff --git a/vendor/github.com/VictoriaMetrics/metricsql/lexer.go b/vendor/github.com/VictoriaMetrics/metricsql/lexer.go
index 9bf873fc41..c5f0bfabd6 100644
--- a/vendor/github.com/VictoriaMetrics/metricsql/lexer.go
+++ b/vendor/github.com/VictoriaMetrics/metricsql/lexer.go
@@ -134,7 +134,7 @@ func scanString(s string) (string, error) {
 	for {
 		n := strings.IndexByte(s[i:], quote)
 		if n < 0 {
-			return "", fmt.Errorf("cannot find closing quote %ch for the string %q", quote, s)
+			return "", fmt.Errorf("cannot find closing quote %c for the string %q", quote, s)
 		}
 		i += n
 		bs := 0
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 50559ed3b0..67efd870b9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -22,7 +22,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
 # github.com/VictoriaMetrics/metrics v1.18.0
 ## explicit
 github.com/VictoriaMetrics/metrics
-# github.com/VictoriaMetrics/metricsql v0.21.0
+# github.com/VictoriaMetrics/metricsql v0.22.0
 ## explicit
 github.com/VictoriaMetrics/metricsql
 github.com/VictoriaMetrics/metricsql/binaryop