app/vmselect/promql: add outliersk(N, m) aggregate function for anomaly detection across groups of similar time series

Aliaksandr Valialkin 2020-05-19 13:52:29 +03:00
parent d0f08b4a58
commit a441cdd1d9
7 changed files with 131 additions and 24 deletions
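In short, the new `outliersk(N, m)` aggregate computes the median of the grouped series at every timestamp, scores each series by its sum of squared deviations from that median, and keeps the `N` series with the highest score. The standalone sketch below illustrates that ranking on plain value slices. It is illustrative only, not code from this commit: it computes exact medians by sorting instead of using the `histogram.Fast` estimate, it skips NaN points when scoring, and the names `topKOutliers` and `medianAt` are invented for the example.

package main

import (
	"fmt"
	"math"
	"sort"
)

// medianAt returns the median of point n across all series, ignoring NaNs.
// All series are assumed to have the same number of points.
func medianAt(series [][]float64, n int) float64 {
	var vals []float64
	for _, s := range series {
		if v := s[n]; !math.IsNaN(v) {
			vals = append(vals, v)
		}
	}
	if len(vals) == 0 {
		return math.NaN()
	}
	sort.Float64s(vals)
	mid := len(vals) / 2
	if len(vals)%2 == 1 {
		return vals[mid]
	}
	return (vals[mid-1] + vals[mid]) / 2
}

// topKOutliers keeps up to k series with the highest sum of squared
// deviations from the per-point median across all series.
func topKOutliers(k int, series [][]float64) [][]float64 {
	if k <= 0 || len(series) == 0 {
		return nil
	}
	numPoints := len(series[0])
	medians := make([]float64, numPoints)
	for n := 0; n < numPoints; n++ {
		medians[n] = medianAt(series, n)
	}
	type scored struct {
		sum2 float64
		s    []float64
	}
	scores := make([]scored, 0, len(series))
	for _, s := range series {
		sum2 := 0.0
		for n, v := range s {
			if d := v - medians[n]; !math.IsNaN(d) {
				sum2 += d * d
			}
		}
		scores = append(scores, scored{sum2: sum2, s: s})
	}
	// Series with the highest deviation from the median come first.
	sort.Slice(scores, func(i, j int) bool {
		return scores[i].sum2 > scores[j].sum2
	})
	if k > len(scores) {
		k = len(scores)
	}
	result := make([][]float64, 0, k)
	for _, sc := range scores[:k] {
		result = append(result, sc.s)
	}
	return result
}

func main() {
	series := [][]float64{
		{1300, 1300, 1300, 1300}, // flat
		{1000, 1200, 1400, 1600}, // trending: the expected outlier
		{1290, 1310, 1305, 1295}, // close to flat
	}
	for _, s := range topKOutliers(1, series) {
		fmt.Println(s) // prints [1000 1200 1400 1600]
	}
}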


@@ -44,6 +44,7 @@ var aggrFuncs = map[string]aggrFunc{
"bottomk_avg": newAggrFuncRangeTopK(avgValue, true),
"bottomk_median": newAggrFuncRangeTopK(medianValue, true),
"any": newAggrFunc(aggrFuncAny),
"outliersk": aggrFuncOutliersK,
}
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
@@ -588,16 +589,73 @@ func avgValue(values []float64) float64 {
func medianValue(values []float64) float64 {
h := histogram.GetFast()
for _, v := range values {
- if math.IsNaN(v) {
- continue
- }
- h.Update(v)
+ if !math.IsNaN(v) {
+ h.Update(v)
+ }
}
value := h.Quantile(0.5)
histogram.PutFast(h)
return value
}
func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 2); err != nil {
return nil, err
}
ks, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
// Calculate medians for each point across tss.
medians := make([]float64, len(ks))
h := histogram.GetFast()
for n := range ks {
h.Reset()
for j := range tss {
v := tss[j].Values[n]
if !math.IsNaN(v) {
h.Update(v)
}
}
medians[n] = h.Quantile(0.5)
}
histogram.PutFast(h)
// Calculate variation-like value for each tss.
type variation struct {
sum2 float64
ts *timeseries
}
variations := make([]variation, len(tss))
for i, ts := range tss {
sum2 := float64(0)
for n, v := range ts.Values {
d := v - medians[n]
sum2 += d * d
}
variations[i] = variation{
sum2: sum2,
ts: ts,
}
}
// Sort variations by sum2.
sort.Slice(variations, func(i, j int) bool {
a, b := variations[i], variations[j]
return lessWithNaNs(a.sum2, b.sum2)
})
// Reorder tss so that series with the smallest variation come first.
for i := range variations {
tss[i] = variations[i].ts
}
// Return only up to k time series with the highest variation.
for i, k := range ks {
fillNaNsAtIdx(i, k, tss)
}
return removeNaNs(tss)
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 2); err != nil {
@@ -658,24 +716,18 @@ func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries {
return func(tss []*timeseries) []*timeseries {
dst := tss[0]
+ h := histogram.GetFast()
+ defer histogram.PutFast(h)
for n := range dst.Values {
- sort.Slice(tss, func(i, j int) bool {
- a := tss[i].Values[n]
- b := tss[j].Values[n]
- return lessWithNaNs(a, b)
- })
+ h.Reset()
+ for j := range tss {
+ v := tss[j].Values[n]
+ if !math.IsNaN(v) {
+ h.Update(v)
+ }
+ }
phi := phis[n]
- if math.IsNaN(phi) {
- phi = 1
- }
- if phi < 0 {
- phi = 0
- }
- if phi > 1 {
- phi = 1
- }
- idx := int(math.Round(float64(len(tss)-1) * phi))
- dst.Values[n] = tss[idx].Values[n]
+ dst.Values[n] = h.Quantile(phi)
}
tss[0] = dst
return tss[:1]


@@ -4196,14 +4196,63 @@ func TestExecSuccess(t *testing.T) {
t.Run(`quantile(NaN)`, func(t *testing.T) {
t.Parallel()
q := `quantile(NaN, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`outliersk(0)`, func(t *testing.T) {
t.Parallel()
q := `outliersk(0, (
label_set(1300, "foo", "bar"),
label_set(time(), "baz", "sss"),
))`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`outliersk(1)`, func(t *testing.T) {
t.Parallel()
q := `outliersk(1, (
label_set(1300, "foo", "bar"),
label_set(time(), "baz", "sss"),
))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("baz"),
Value: []byte("sss"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`outliersk(3)`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(outliersk(3, (
label_set(1300, "foo", "bar"),
label_set(time(), "baz", "sss"),
)))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("baz"),
Value: []byte("sss"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1300, 1300, 1300, 1300, 1300, 1300},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`range_quantile(0.5)`, func(t *testing.T) {
t.Parallel()
q := `range_quantile(0.5, time())`
@@ -5531,6 +5580,8 @@ func TestExecError(t *testing.T) {
f(`hoeffding_bound_upper()`)
f(`hoeffding_bound_upper(1)`)
f(`hoeffding_bound_upper(0.99, foo, 1)`)
f(`outliersk()`)
f(`outliersk(1)`)
// Invalid argument type
f(`median_over_time({}, 2)`)
@@ -5570,6 +5621,7 @@ func TestExecError(t *testing.T) {
f(`alias(1, 2)`)
f(`aggr_over_time(1, 2)`)
f(`aggr_over_time(("foo", "bar"), 3)`)
f(`outliersk((label_set(1, "foo", "bar"), label_set(2, "x", "y")), 123)`)
// Duplicate timeseries
f(`(label_set(1, "foo", "bar") or label_set(2, "foo", "baz"))


@@ -116,3 +116,5 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
for the given `phi` in the range `[0..1]`.
- `last_over_time(m[d])` - returns the last value for `m` on the time range `d`.
- `first_over_time(m[d])` - returns the first value for `m` on the time range `d`.
- `outliersk(N, m)` - returns up to `N` outlier time series for `m`. Outlier time series have the highest deviation from the `median(m)`.
This aggregate function is useful for detecting anomalies across groups of similar time series.
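For example, `outliersk(3, process_resident_memory_bytes)` returns at most 3 series with the highest deviation from the median across all `process_resident_memory_bytes` series; `process_resident_memory_bytes` is just an illustrative metric name here.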

go.mod

@@ -8,7 +8,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.0.1
github.com/VictoriaMetrics/metrics v1.11.2
- github.com/VictoriaMetrics/metricsql v0.2.1
+ github.com/VictoriaMetrics/metricsql v0.2.2
github.com/aws/aws-sdk-go v1.30.28
github.com/cespare/xxhash/v2 v2.1.1
github.com/golang/protobuf v1.4.2 // indirect

go.sum

@@ -45,8 +45,8 @@ github.com/VictoriaMetrics/fasthttp v1.0.1 h1:I7YdbswTIW63WxoFoUOSNxeOEGB46rdKUL
github.com/VictoriaMetrics/fasthttp v1.0.1/go.mod h1:BqgsieH90PR7x97c89j+eqZDloKkDhAEQTwhLw6jw/4=
github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/tGoxihHvx8=
github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
- github.com/VictoriaMetrics/metricsql v0.2.1 h1:OI/W2QCFiQiFULVN3ZiC/iCqZFt25rXp/O7P2NiAwYU=
- github.com/VictoriaMetrics/metricsql v0.2.1/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
+ github.com/VictoriaMetrics/metricsql v0.2.2 h1:3PhBV4g2z7lm8adPShC4vr1PfSkRcLoSq5XOEpSgJPg=
+ github.com/VictoriaMetrics/metricsql v0.2.2/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/aws/aws-sdk-go v1.30.28 h1:SaPM7dlmp7h3Lj1nJ4jdzOkTdom08+g20k7AU5heZYg=


@@ -34,6 +34,7 @@ var aggrFuncs = map[string]bool{
"bottomk_avg": true,
"bottomk_median": true,
"any": true,
"outliersk": true,
}
func isAggrFunc(s string) bool {

vendor/modules.txt

@@ -18,7 +18,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.11.2
github.com/VictoriaMetrics/metrics
- # github.com/VictoriaMetrics/metricsql v0.2.1
+ # github.com/VictoriaMetrics/metricsql v0.2.2
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop
# github.com/aws/aws-sdk-go v1.30.28