From fc81ea38d4b59e56c50a12ec9280d0b820040f31 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 19 May 2020 13:52:29 +0300 Subject: [PATCH] app/vmselect/promql: add `outliersk(N, m)` aggregate function for anomaly detection across groups of similar time series --- app/vmselect/promql/aggr.go | 90 +++++++++++++++---- app/vmselect/promql/exec_test.go | 54 ++++++++++- docs/MetricsQL.md | 2 + go.mod | 2 +- go.sum | 4 +- .../VictoriaMetrics/metricsql/aggr.go | 1 + vendor/modules.txt | 2 +- 7 files changed, 131 insertions(+), 24 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index 8cf478dc4..2e5e4ebac 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -44,6 +44,7 @@ var aggrFuncs = map[string]aggrFunc{ "bottomk_avg": newAggrFuncRangeTopK(avgValue, true), "bottomk_median": newAggrFuncRangeTopK(medianValue, true), "any": newAggrFunc(aggrFuncAny), + "outliersk": aggrFuncOutliersK, } type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error) @@ -588,16 +589,73 @@ func avgValue(values []float64) float64 { func medianValue(values []float64) float64 { h := histogram.GetFast() for _, v := range values { - if math.IsNaN(v) { - continue + if !math.IsNaN(v) { + h.Update(v) } - h.Update(v) } value := h.Quantile(0.5) histogram.PutFast(h) return value } +func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) { + args := afa.args + if err := expectTransformArgsNum(args, 2); err != nil { + return nil, err + } + ks, err := getScalar(args[0], 0) + if err != nil { + return nil, err + } + afe := func(tss []*timeseries) []*timeseries { + // Calculate medians for each point across tss. + medians := make([]float64, len(ks)) + h := histogram.GetFast() + for n := range ks { + h.Reset() + for j := range tss { + v := tss[j].Values[n] + if !math.IsNaN(v) { + h.Update(v) + } + } + medians[n] = h.Quantile(0.5) + } + histogram.PutFast(h) + + // Calculate variation-like value for each tss. + type variation struct { + sum2 float64 + ts *timeseries + } + variations := make([]variation, len(tss)) + for i, ts := range tss { + sum2 := float64(0) + for n, v := range ts.Values { + d := v - medians[n] + sum2 += d * d + } + variations[i] = variation{ + sum2: sum2, + ts: ts, + } + } + + // Sort variations by sum2. + sort.Slice(variations, func(i, j int) bool { + a, b := variations[i], variations[j] + return lessWithNaNs(a.sum2, b.sum2) + }) + + // Return only up to k time series with the highest variation. + for i, k := range ks { + fillNaNsAtIdx(i, k, tss) + } + return removeNaNs(tss) + } + return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) +} + func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { args := afa.args if err := expectTransformArgsNum(args, 2); err != nil { @@ -658,24 +716,18 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) { func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries { return func(tss []*timeseries) []*timeseries { dst := tss[0] + h := histogram.GetFast() + defer histogram.PutFast(h) for n := range dst.Values { - sort.Slice(tss, func(i, j int) bool { - a := tss[i].Values[n] - b := tss[j].Values[n] - return lessWithNaNs(a, b) - }) + h.Reset() + for j := range tss { + v := tss[j].Values[n] + if !math.IsNaN(v) { + h.Update(v) + } + } phi := phis[n] - if math.IsNaN(phi) { - phi = 1 - } - if phi < 0 { - phi = 0 - } - if phi > 1 { - phi = 1 - } - idx := int(math.Round(float64(len(tss)-1) * phi)) - dst.Values[n] = tss[idx].Values[n] + dst.Values[n] = h.Quantile(phi) } tss[0] = dst return tss[:1] diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 7e3eac101..b9c38c3d9 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -4206,14 +4206,63 @@ func TestExecSuccess(t *testing.T) { t.Run(`quantile(NaN)`, func(t *testing.T) { t.Parallel() q := `quantile(NaN, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) + t.Run(`outliersk(0)`, func(t *testing.T) { + t.Parallel() + q := `outliersk(0, ( + label_set(1300, "foo", "bar"), + label_set(time(), "baz", "sss"), + ))` + resultExpected := []netstorage.Result{} + f(q, resultExpected) + }) + t.Run(`outliersk(1)`, func(t *testing.T) { + t.Parallel() + q := `outliersk(1, ( + label_set(1300, "foo", "bar"), + label_set(time(), "baz", "sss"), + ))` r := netstorage.Result{ MetricName: metricNameExpected, - Values: []float64{10, 10, 10, 10.666666666666666, 12, 13.333333333333334}, + Values: []float64{1000, 1200, 1400, 1600, 1800, 2000}, Timestamps: timestampsExpected, } + r.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`outliersk(3)`, func(t *testing.T) { + t.Parallel() + q := `sort_desc(outliersk(3, ( + label_set(1300, "foo", "bar"), + label_set(time(), "baz", "sss"), + )))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1000, 1200, 1400, 1600, 1800, 2000}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("baz"), + Value: []byte("sss"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1300, 1300, 1300, 1300, 1300, 1300}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) t.Run(`range_quantile(0.5)`, func(t *testing.T) { t.Parallel() q := `range_quantile(0.5, time())` @@ -5545,6 +5594,8 @@ func TestExecError(t *testing.T) { f(`hoeffding_bound_upper()`) f(`hoeffding_bound_upper(1)`) f(`hoeffding_bound_upper(0.99, foo, 1)`) + f(`outliersk()`) + f(`outliersk(1)`) // Invalid argument type f(`median_over_time({}, 2)`) @@ -5584,6 +5635,7 @@ func TestExecError(t *testing.T) { f(`alias(1, 2)`) f(`aggr_over_time(1, 2)`) f(`aggr_over_time(("foo", "bar"), 3)`) + f(`outliersk((label_set(1, "foo", "bar"), label_set(2, "x", "y")), 123)`) // Duplicate timeseries f(`(label_set(1, "foo", "bar") or label_set(2, "foo", "baz")) diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index dea5008a8..10bf5202d 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -116,3 +116,5 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g for the given `phi` in the range `[0..1]`. - `last_over_time(m[d])` - returns the last value for `m` on the time range `d`. - `first_over_time(m[d])` - returns the first value for `m` on the time range `d`. +- `outliersk(N, m)` - returns up to `N` outlier time series for `m`. Outlier time series have the highest deviation from the `median(m)`. + This aggregate function is useful to detect anomalies across groups of similar time series. diff --git a/go.mod b/go.mod index 8c51f3541..6eec4679e 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.0.1 github.com/VictoriaMetrics/metrics v1.11.2 - github.com/VictoriaMetrics/metricsql v0.2.1 + github.com/VictoriaMetrics/metricsql v0.2.2 github.com/aws/aws-sdk-go v1.30.28 github.com/cespare/xxhash/v2 v2.1.1 github.com/golang/protobuf v1.4.2 // indirect diff --git a/go.sum b/go.sum index 2f57dfd37..c28cd7108 100644 --- a/go.sum +++ b/go.sum @@ -45,8 +45,8 @@ github.com/VictoriaMetrics/fasthttp v1.0.1 h1:I7YdbswTIW63WxoFoUOSNxeOEGB46rdKUL github.com/VictoriaMetrics/fasthttp v1.0.1/go.mod h1:BqgsieH90PR7x97c89j+eqZDloKkDhAEQTwhLw6jw/4= github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/tGoxihHvx8= github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ= -github.com/VictoriaMetrics/metricsql v0.2.1 h1:OI/W2QCFiQiFULVN3ZiC/iCqZFt25rXp/O7P2NiAwYU= -github.com/VictoriaMetrics/metricsql v0.2.1/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= +github.com/VictoriaMetrics/metricsql v0.2.2 h1:3PhBV4g2z7lm8adPShC4vr1PfSkRcLoSq5XOEpSgJPg= +github.com/VictoriaMetrics/metricsql v0.2.2/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/aws/aws-sdk-go v1.30.28 h1:SaPM7dlmp7h3Lj1nJ4jdzOkTdom08+g20k7AU5heZYg= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go index 5cd88de20..a6dddbabc 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go @@ -34,6 +34,7 @@ var aggrFuncs = map[string]bool{ "bottomk_avg": true, "bottomk_median": true, "any": true, + "outliersk": true, } func isAggrFunc(s string) bool { diff --git a/vendor/modules.txt b/vendor/modules.txt index 75384a77b..8bb9fbfc1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -18,7 +18,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.11.2 github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.2.1 +# github.com/VictoriaMetrics/metricsql v0.2.2 github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop # github.com/aws/aws-sdk-go v1.30.28