From bdb881c43bf4890b375d72d6b2584e2785dab9d2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 3 Aug 2020 21:51:15 +0300 Subject: [PATCH] app/vmselect/promql: add zscore-related functions: `zscore_over_time(m[d])` and `zscore(q) by (...)` --- app/vmselect/promql/aggr.go | 51 ++++++++++++- app/vmselect/promql/exec_test.go | 71 +++++++++++++++++++ app/vmselect/promql/rollup.go | 17 +++++ app/vmselect/promql/rollup_test.go | 15 ++++ docs/MetricsQL.md | 6 +- go.mod | 2 +- go.sum | 4 +- .../VictoriaMetrics/metricsql/aggr.go | 1 + .../VictoriaMetrics/metricsql/rollup.go | 1 + vendor/modules.txt | 2 +- 10 files changed, 162 insertions(+), 8 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index d92064cd3..3a78c6531 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -47,6 +47,7 @@ var aggrFuncs = map[string]aggrFunc{ "any": aggrFuncAny, "outliersk": aggrFuncOutliersK, "mode": newAggrFunc(aggrFuncMode), + "zscore": aggrFuncZScore, } type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error) @@ -355,9 +356,7 @@ func aggrFuncStdvar(tss []*timeseries) []*timeseries { dst := tss[0] for i := range dst.Values { // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation - var avg float64 - var count float64 - var q float64 + var avg, count, q float64 for _, ts := range tss { v := ts.Values[i] if math.IsNaN(v) { @@ -434,6 +433,52 @@ func aggrFuncMode(tss []*timeseries) []*timeseries { return tss[:1] } +func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) { + args := afa.args + if err := expectTransformArgsNum(args, 1); err != nil { + return nil, err + } + afe := func(tss []*timeseries) []*timeseries { + for i := range tss[0].Values { + // Calculate avg and stddev for tss points at position i. + // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation + var avg, count, q float64 + for _, ts := range tss { + v := ts.Values[i] + if math.IsNaN(v) { + continue + } + count++ + avgNew := avg + (v-avg)/count + q += (v - avg) * (v - avgNew) + avg = avgNew + } + if count == 0 { + // Cannot calculate z-score for NaN points. + continue + } + + // Calculate z-score for tss points at position i. + // See https://en.wikipedia.org/wiki/Standard_score + stddev := math.Sqrt(q / count) + for _, ts := range tss { + v := ts.Values[i] + if math.IsNaN(v) { + continue + } + ts.Values[i] = (v - avg) / stddev + } + } + + // Remove MetricGroup from all the tss. + for _, ts := range tss { + ts.MetricName.ResetMetricGroup() + } + return tss + } + return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, true) +} + // modeNoNaNs returns mode for a. // // It is expected that a doesn't contain NaNs. diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index a97fdcfc4..949e28ba0 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -3366,6 +3366,53 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`zscore()`, func(t *testing.T) { + t.Parallel() + q := `sort_by_label(round(zscore(( + label_set(time()/100+10, "k", "v1"), + label_set(time()/200+5, "k", "v2"), + label_set(time()/110-10, "k", "v3"), + label_set(time()/90-5, "k", "v4"), + )), 0.001), "k")` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1.482, 1.511, 1.535, 1.552, 1.564, 1.57}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("k"), + Value: []byte("v1"), + }} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.159, 0.058, -0.042, -0.141, -0.237, -0.329}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{{ + Key: []byte("k"), + Value: []byte("v2"), + }} + r3 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{-1.285, -1.275, -1.261, -1.242, -1.219, -1.193}, + Timestamps: timestampsExpected, + } + r3.MetricName.Tags = []storage.Tag{{ + Key: []byte("k"), + Value: []byte("v3"), + }} + r4 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{-0.356, -0.294, -0.232, -0.17, -0.108, -0.048}, + Timestamps: timestampsExpected, + } + r4.MetricName.Tags = []storage.Tag{{ + Key: []byte("k"), + Value: []byte("v4"), + }} + resultExpected := []netstorage.Result{r1, r2, r3, r4} + f(q, resultExpected) + }) t.Run(`avg(scalar) without (xx, yy)`, func(t *testing.T) { t.Parallel() q := `avg without (xx, yy) (123)` @@ -4654,6 +4701,28 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`zscore_over_time(rand)`, func(t *testing.T) { + t.Parallel() + q := `round(zscore_over_time(rand(0)[100s:10s]), 0.01)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{-1.17, -0.08, 0.98, 0.67, 1.61, 1.55}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`zscore_over_time(const)`, func(t *testing.T) { + t.Parallel() + q := `zscore_over_time(1[100s:10s])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0, 0, 0, 0, 0, 0}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`integrate(1)`, func(t *testing.T) { t.Parallel() q := `integrate(1)` @@ -5871,7 +5940,9 @@ func TestExecError(t *testing.T) { f(`outliersk(1)`) f(`mode_over_time()`) f(`rate_over_sum()`) + f(`zscore_over_time()`) f(`mode()`) + f(`zscore()`) f(`prometheus_buckets()`) f(`buckets_limit()`) f(`buckets_limit(1)`) diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 4b2251cdd..fc61c9f7c 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -74,6 +74,7 @@ var rollupFuncs = map[string]newRollupFunc{ "hoeffding_bound_lower": newRollupHoeffdingBoundLower, "ascent_over_time": newRollupFuncOneArg(rollupAscentOverTime), "descent_over_time": newRollupFuncOneArg(rollupDescentOverTime), + "zscore_over_time": newRollupFuncOneArg(rollupZScoreOverTime), // `timestamp` function must return timestamp for the last datapoint on the current window // in order to properly handle offset and timestamps unaligned to the current step. @@ -125,6 +126,7 @@ var rollupAggrFuncs = map[string]rollupFunc{ "tmax_over_time": rollupTmax, "ascent_over_time": rollupAscentOverTime, "descent_over_time": rollupDescentOverTime, + "zscore_over_time": rollupZScoreOverTime, "timestamp": rollupTimestamp, "mode_over_time": rollupModeOverTime, "rate_over_sum": rollupRateOverSum, @@ -153,6 +155,7 @@ var rollupFuncsCannotAdjustWindow = map[string]bool{ "integrate": true, "ascent_over_time": true, "descent_over_time": true, + "zscore_over_time": true, } var rollupFuncsRemoveCounterResets = map[string]bool{ @@ -1606,6 +1609,20 @@ func rollupDescentOverTime(rfa *rollupFuncArg) float64 { return s } +func rollupZScoreOverTime(rfa *rollupFuncArg) float64 { + // See https://about.gitlab.com/blog/2019/07/23/anomaly-detection-using-prometheus/#using-z-score-for-anomaly-detection + scrapeInterval := rollupScrapeInterval(rfa) + lag := rollupLag(rfa) + if math.IsNaN(scrapeInterval) || math.IsNaN(lag) || lag > scrapeInterval { + return nan + } + d := rollupLast(rfa) - rollupAvg(rfa) + if d == 0 { + return 0 + } + return d / rollupStddev(rfa) +} + func rollupFirst(rfa *rollupFuncArg) float64 { // There is no need in handling NaNs here, since they must be cleaned up // before calling rollup funcs. diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index 3f361eff5..074305aca 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -392,6 +392,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) { f("increases_over_time", 5) f("ascent_over_time", 142) f("descent_over_time", 231) + f("zscore_over_time", -0.4254336383156416) f("timestamp", 0.13) f("mode_over_time", 34) f("rate_over_sum", 4520) @@ -983,6 +984,20 @@ func TestRollupFuncsNoWindow(t *testing.T) { timestampsExpected := []int64{0, 40, 80, 120, 160} testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) }) + t.Run("zscore_over_time", func(t *testing.T) { + rc := rollupConfig{ + Func: rollupZScoreOverTime, + Start: 0, + End: 160, + Step: 40, + Window: 80, + } + rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) + values := rc.Do(nil, testValues, testTimestamps) + valuesExpected := []float64{nan, 0.9397878236968458, 1.1969836716333457, 2.3112921116373175, nan} + timestampsExpected := []int64{0, 40, 80, 120, 160} + testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) + }) } func TestRollupBigNumberOfValues(t *testing.T) { diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index 04e86e9cd..b839b0d5d 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -120,10 +120,14 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g for the given `phi` in the range `[0..1]`. - `last_over_time(m[d])` - returns the last value for `m` on the time range `d`. - `first_over_time(m[d])` - returns the first value for `m` on the time range `d`. -- `outliersk(N, m)` - returns up to `N` outlier time series for `m`. Outlier time series have the highest deviation from the `median(m)`. +- `outliersk(N, q) by (group)` - returns up to `N` outlier time series for `q` in every `group`. Outlier time series have the highest deviation from the `median(m)`. This aggregate function is useful to detect anomalies across groups of similar time series. - `ascent_over_time(m[d])` - returns the sum of positive deltas between adjancent data points in `m` over `d`. Useful for tracking height gains in GPS track. - `descent_over_time(m[d])` - returns the absolute sum of negative deltas between adjancent data points in `m` over `d`. Useful for tracking height loss in GPS track. - `mode_over_time(m[d])` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for `m` values over `d`. It is expected that `m` values are discrete. - `mode(q) by (x)` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for each point in `q` grouped by `x`. It is expected that `q` points are discrete. - `rate_over_sum(m[d])` - returns rate over the sum of `m` values over `d` duration. +- `zscore_over_time(m[d])` - returns [z-score](https://en.wikipedia.org/wiki/Standard_score) for `m` values over `d` duration. Useful for detecting + anomalies in time series comparing to historical samples. +- `zscore(q) by (group)` - returns independent [z-score](https://en.wikipedia.org/wiki/Standard_score) values for every point in every `group` of `q`. + Useful for detecting anomalies in the group of related time series. diff --git a/go.mod b/go.mod index 6feb82fab..d2033f4eb 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.0.1 github.com/VictoriaMetrics/metrics v1.12.2 - github.com/VictoriaMetrics/metricsql v0.2.9 + github.com/VictoriaMetrics/metricsql v0.2.10 github.com/aws/aws-sdk-go v1.33.14 github.com/cespare/xxhash/v2 v2.1.1 github.com/golang/snappy v0.0.1 diff --git a/go.sum b/go.sum index 2b6182316..aa31971bf 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ= github.com/VictoriaMetrics/metrics v1.12.2 h1:SG8iAmqavDNuh7GIdHPoGHUhDL23KeKfvSZSozucNeA= github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= -github.com/VictoriaMetrics/metricsql v0.2.9 h1:RHLEmt4VNZ2RAqZjmXyRtKpCrtSuYUS1+TyOqfXbHWs= -github.com/VictoriaMetrics/metricsql v0.2.9/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= +github.com/VictoriaMetrics/metricsql v0.2.10 h1:1z0cfVwjh9n6J0rM8znNQkTy0rIpB+VO2hqnggtYRoc= +github.com/VictoriaMetrics/metricsql v0.2.10/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/aws/aws-sdk-go v1.33.14 h1:ucjyVEvtIdtn4acf+RKsgk6ybAYeMLXpGZeqoVvi7Kk= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go index 80ae9f8f7..197227106 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/aggr.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/aggr.go @@ -37,6 +37,7 @@ var aggrFuncs = map[string]bool{ "any": true, "outliersk": true, "mode": true, + "zscore": true, } func isAggrFunc(s string) bool { diff --git a/vendor/github.com/VictoriaMetrics/metricsql/rollup.go b/vendor/github.com/VictoriaMetrics/metricsql/rollup.go index 313292529..7d9a75cbd 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/rollup.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/rollup.go @@ -59,6 +59,7 @@ var rollupFuncs = map[string]bool{ "hoeffding_bound_lower": true, "ascent_over_time": true, "descent_over_time": true, + "zscore_over_time": true, // `timestamp` func has been moved here because it must work properly with offsets and samples unaligned to the current step. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details. diff --git a/vendor/modules.txt b/vendor/modules.txt index 5daf43999..5222f095c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.12.2 github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.2.9 +# github.com/VictoriaMetrics/metricsql v0.2.10 github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop # github.com/aws/aws-sdk-go v1.33.14