app/vmselect/promql: add zscore-related functions: zscore_over_time(m[d]) and zscore(q) by (...)

This commit is contained in:
Aliaksandr Valialkin 2020-08-03 21:51:15 +03:00
parent 94471a1273
commit bdb881c43b
10 changed files with 162 additions and 8 deletions

View file

@ -47,6 +47,7 @@ var aggrFuncs = map[string]aggrFunc{
"any": aggrFuncAny,
"outliersk": aggrFuncOutliersK,
"mode": newAggrFunc(aggrFuncMode),
"zscore": aggrFuncZScore,
}
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
@ -355,9 +356,7 @@ func aggrFuncStdvar(tss []*timeseries) []*timeseries {
dst := tss[0]
for i := range dst.Values {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
var avg float64
var count float64
var q float64
var avg, count, q float64
for _, ts := range tss {
v := ts.Values[i]
if math.IsNaN(v) {
@ -434,6 +433,52 @@ func aggrFuncMode(tss []*timeseries) []*timeseries {
return tss[:1]
}
func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 1); err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
for i := range tss[0].Values {
// Calculate avg and stddev for tss points at position i.
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
var avg, count, q float64
for _, ts := range tss {
v := ts.Values[i]
if math.IsNaN(v) {
continue
}
count++
avgNew := avg + (v-avg)/count
q += (v - avg) * (v - avgNew)
avg = avgNew
}
if count == 0 {
// Cannot calculate z-score for NaN points.
continue
}
// Calculate z-score for tss points at position i.
// See https://en.wikipedia.org/wiki/Standard_score
stddev := math.Sqrt(q / count)
for _, ts := range tss {
v := ts.Values[i]
if math.IsNaN(v) {
continue
}
ts.Values[i] = (v - avg) / stddev
}
}
// Remove MetricGroup from all the tss.
for _, ts := range tss {
ts.MetricName.ResetMetricGroup()
}
return tss
}
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, true)
}
// modeNoNaNs returns mode for a.
//
// It is expected that a doesn't contain NaNs.

View file

@ -3366,6 +3366,53 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`zscore()`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label(round(zscore((
label_set(time()/100+10, "k", "v1"),
label_set(time()/200+5, "k", "v2"),
label_set(time()/110-10, "k", "v3"),
label_set(time()/90-5, "k", "v4"),
)), 0.001), "k")`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1.482, 1.511, 1.535, 1.552, 1.564, 1.57},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("k"),
Value: []byte("v1"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.159, 0.058, -0.042, -0.141, -0.237, -0.329},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("k"),
Value: []byte("v2"),
}}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{-1.285, -1.275, -1.261, -1.242, -1.219, -1.193},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("k"),
Value: []byte("v3"),
}}
r4 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{-0.356, -0.294, -0.232, -0.17, -0.108, -0.048},
Timestamps: timestampsExpected,
}
r4.MetricName.Tags = []storage.Tag{{
Key: []byte("k"),
Value: []byte("v4"),
}}
resultExpected := []netstorage.Result{r1, r2, r3, r4}
f(q, resultExpected)
})
t.Run(`avg(scalar) without (xx, yy)`, func(t *testing.T) {
t.Parallel()
q := `avg without (xx, yy) (123)`
@ -4654,6 +4701,28 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`zscore_over_time(rand)`, func(t *testing.T) {
t.Parallel()
q := `round(zscore_over_time(rand(0)[100s:10s]), 0.01)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{-1.17, -0.08, 0.98, 0.67, 1.61, 1.55},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`zscore_over_time(const)`, func(t *testing.T) {
t.Parallel()
q := `zscore_over_time(1[100s:10s])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0, 0, 0, 0, 0, 0},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`integrate(1)`, func(t *testing.T) {
t.Parallel()
q := `integrate(1)`
@ -5871,7 +5940,9 @@ func TestExecError(t *testing.T) {
f(`outliersk(1)`)
f(`mode_over_time()`)
f(`rate_over_sum()`)
f(`zscore_over_time()`)
f(`mode()`)
f(`zscore()`)
f(`prometheus_buckets()`)
f(`buckets_limit()`)
f(`buckets_limit(1)`)

View file

@ -74,6 +74,7 @@ var rollupFuncs = map[string]newRollupFunc{
"hoeffding_bound_lower": newRollupHoeffdingBoundLower,
"ascent_over_time": newRollupFuncOneArg(rollupAscentOverTime),
"descent_over_time": newRollupFuncOneArg(rollupDescentOverTime),
"zscore_over_time": newRollupFuncOneArg(rollupZScoreOverTime),
// `timestamp` function must return timestamp for the last datapoint on the current window
// in order to properly handle offset and timestamps unaligned to the current step.
@ -125,6 +126,7 @@ var rollupAggrFuncs = map[string]rollupFunc{
"tmax_over_time": rollupTmax,
"ascent_over_time": rollupAscentOverTime,
"descent_over_time": rollupDescentOverTime,
"zscore_over_time": rollupZScoreOverTime,
"timestamp": rollupTimestamp,
"mode_over_time": rollupModeOverTime,
"rate_over_sum": rollupRateOverSum,
@ -153,6 +155,7 @@ var rollupFuncsCannotAdjustWindow = map[string]bool{
"integrate": true,
"ascent_over_time": true,
"descent_over_time": true,
"zscore_over_time": true,
}
var rollupFuncsRemoveCounterResets = map[string]bool{
@ -1606,6 +1609,20 @@ func rollupDescentOverTime(rfa *rollupFuncArg) float64 {
return s
}
func rollupZScoreOverTime(rfa *rollupFuncArg) float64 {
// See https://about.gitlab.com/blog/2019/07/23/anomaly-detection-using-prometheus/#using-z-score-for-anomaly-detection
scrapeInterval := rollupScrapeInterval(rfa)
lag := rollupLag(rfa)
if math.IsNaN(scrapeInterval) || math.IsNaN(lag) || lag > scrapeInterval {
return nan
}
d := rollupLast(rfa) - rollupAvg(rfa)
if d == 0 {
return 0
}
return d / rollupStddev(rfa)
}
func rollupFirst(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.

View file

@ -392,6 +392,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
f("increases_over_time", 5)
f("ascent_over_time", 142)
f("descent_over_time", 231)
f("zscore_over_time", -0.4254336383156416)
f("timestamp", 0.13)
f("mode_over_time", 34)
f("rate_over_sum", 4520)
@ -983,6 +984,20 @@ func TestRollupFuncsNoWindow(t *testing.T) {
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
t.Run("zscore_over_time", func(t *testing.T) {
rc := rollupConfig{
Func: rollupZScoreOverTime,
Start: 0,
End: 160,
Step: 40,
Window: 80,
}
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
values := rc.Do(nil, testValues, testTimestamps)
valuesExpected := []float64{nan, 0.9397878236968458, 1.1969836716333457, 2.3112921116373175, nan}
timestampsExpected := []int64{0, 40, 80, 120, 160}
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
})
}
func TestRollupBigNumberOfValues(t *testing.T) {

View file

@ -120,10 +120,14 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
for the given `phi` in the range `[0..1]`.
- `last_over_time(m[d])` - returns the last value for `m` on the time range `d`.
- `first_over_time(m[d])` - returns the first value for `m` on the time range `d`.
- `outliersk(N, m)` - returns up to `N` outlier time series for `m`. Outlier time series have the highest deviation from the `median(m)`.
- `outliersk(N, q) by (group)` - returns up to `N` outlier time series for `q` in every `group`. Outlier time series have the highest deviation from the `median(m)`.
This aggregate function is useful to detect anomalies across groups of similar time series.
- `ascent_over_time(m[d])` - returns the sum of positive deltas between adjancent data points in `m` over `d`. Useful for tracking height gains in GPS track.
- `descent_over_time(m[d])` - returns the absolute sum of negative deltas between adjancent data points in `m` over `d`. Useful for tracking height loss in GPS track.
- `mode_over_time(m[d])` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for `m` values over `d`. It is expected that `m` values are discrete.
- `mode(q) by (x)` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for each point in `q` grouped by `x`. It is expected that `q` points are discrete.
- `rate_over_sum(m[d])` - returns rate over the sum of `m` values over `d` duration.
- `zscore_over_time(m[d])` - returns [z-score](https://en.wikipedia.org/wiki/Standard_score) for `m` values over `d` duration. Useful for detecting
anomalies in time series comparing to historical samples.
- `zscore(q) by (group)` - returns independent [z-score](https://en.wikipedia.org/wiki/Standard_score) values for every point in every `group` of `q`.
Useful for detecting anomalies in the group of related time series.

2
go.mod
View file

@ -9,7 +9,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.0.1
github.com/VictoriaMetrics/metrics v1.12.2
github.com/VictoriaMetrics/metricsql v0.2.9
github.com/VictoriaMetrics/metricsql v0.2.10
github.com/aws/aws-sdk-go v1.33.14
github.com/cespare/xxhash/v2 v2.1.1
github.com/golang/snappy v0.0.1

4
go.sum
View file

@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t
github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
github.com/VictoriaMetrics/metrics v1.12.2 h1:SG8iAmqavDNuh7GIdHPoGHUhDL23KeKfvSZSozucNeA=
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metricsql v0.2.9 h1:RHLEmt4VNZ2RAqZjmXyRtKpCrtSuYUS1+TyOqfXbHWs=
github.com/VictoriaMetrics/metricsql v0.2.9/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
github.com/VictoriaMetrics/metricsql v0.2.10 h1:1z0cfVwjh9n6J0rM8znNQkTy0rIpB+VO2hqnggtYRoc=
github.com/VictoriaMetrics/metricsql v0.2.10/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/aws/aws-sdk-go v1.33.14 h1:ucjyVEvtIdtn4acf+RKsgk6ybAYeMLXpGZeqoVvi7Kk=

View file

@ -37,6 +37,7 @@ var aggrFuncs = map[string]bool{
"any": true,
"outliersk": true,
"mode": true,
"zscore": true,
}
func isAggrFunc(s string) bool {

View file

@ -59,6 +59,7 @@ var rollupFuncs = map[string]bool{
"hoeffding_bound_lower": true,
"ascent_over_time": true,
"descent_over_time": true,
"zscore_over_time": true,
// `timestamp` func has been moved here because it must work properly with offsets and samples unaligned to the current step.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details.

2
vendor/modules.txt vendored
View file

@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.12.2
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.2.9
# github.com/VictoriaMetrics/metricsql v0.2.10
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop
# github.com/aws/aws-sdk-go v1.33.14