mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: add zscore-related functions: zscore_over_time(m[d])
and zscore(q) by (...)
This commit is contained in:
parent
509d12643b
commit
e6eee2bebf
10 changed files with 162 additions and 8 deletions
|
@ -47,6 +47,7 @@ var aggrFuncs = map[string]aggrFunc{
|
|||
"any": aggrFuncAny,
|
||||
"outliersk": aggrFuncOutliersK,
|
||||
"mode": newAggrFunc(aggrFuncMode),
|
||||
"zscore": aggrFuncZScore,
|
||||
}
|
||||
|
||||
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
|
||||
|
@ -355,9 +356,7 @@ func aggrFuncStdvar(tss []*timeseries) []*timeseries {
|
|||
dst := tss[0]
|
||||
for i := range dst.Values {
|
||||
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
|
||||
var avg float64
|
||||
var count float64
|
||||
var q float64
|
||||
var avg, count, q float64
|
||||
for _, ts := range tss {
|
||||
v := ts.Values[i]
|
||||
if math.IsNaN(v) {
|
||||
|
@ -434,6 +433,52 @@ func aggrFuncMode(tss []*timeseries) []*timeseries {
|
|||
return tss[:1]
|
||||
}
|
||||
|
||||
func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
|
||||
args := afa.args
|
||||
if err := expectTransformArgsNum(args, 1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
afe := func(tss []*timeseries) []*timeseries {
|
||||
for i := range tss[0].Values {
|
||||
// Calculate avg and stddev for tss points at position i.
|
||||
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
|
||||
var avg, count, q float64
|
||||
for _, ts := range tss {
|
||||
v := ts.Values[i]
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
count++
|
||||
avgNew := avg + (v-avg)/count
|
||||
q += (v - avg) * (v - avgNew)
|
||||
avg = avgNew
|
||||
}
|
||||
if count == 0 {
|
||||
// Cannot calculate z-score for NaN points.
|
||||
continue
|
||||
}
|
||||
|
||||
// Calculate z-score for tss points at position i.
|
||||
// See https://en.wikipedia.org/wiki/Standard_score
|
||||
stddev := math.Sqrt(q / count)
|
||||
for _, ts := range tss {
|
||||
v := ts.Values[i]
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
ts.Values[i] = (v - avg) / stddev
|
||||
}
|
||||
}
|
||||
|
||||
// Remove MetricGroup from all the tss.
|
||||
for _, ts := range tss {
|
||||
ts.MetricName.ResetMetricGroup()
|
||||
}
|
||||
return tss
|
||||
}
|
||||
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, afa.ae.Limit, true)
|
||||
}
|
||||
|
||||
// modeNoNaNs returns mode for a.
|
||||
//
|
||||
// It is expected that a doesn't contain NaNs.
|
||||
|
|
|
@ -3356,6 +3356,53 @@ func TestExecSuccess(t *testing.T) {
|
|||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`zscore()`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort_by_label(round(zscore((
|
||||
label_set(time()/100+10, "k", "v1"),
|
||||
label_set(time()/200+5, "k", "v2"),
|
||||
label_set(time()/110-10, "k", "v3"),
|
||||
label_set(time()/90-5, "k", "v4"),
|
||||
)), 0.001), "k")`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1.482, 1.511, 1.535, 1.552, 1.564, 1.57},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("k"),
|
||||
Value: []byte("v1"),
|
||||
}}
|
||||
r2 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{0.159, 0.058, -0.042, -0.141, -0.237, -0.329},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r2.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("k"),
|
||||
Value: []byte("v2"),
|
||||
}}
|
||||
r3 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{-1.285, -1.275, -1.261, -1.242, -1.219, -1.193},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r3.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("k"),
|
||||
Value: []byte("v3"),
|
||||
}}
|
||||
r4 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{-0.356, -0.294, -0.232, -0.17, -0.108, -0.048},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r4.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("k"),
|
||||
Value: []byte("v4"),
|
||||
}}
|
||||
resultExpected := []netstorage.Result{r1, r2, r3, r4}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`avg(scalar) without (xx, yy)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `avg without (xx, yy) (123)`
|
||||
|
@ -4644,6 +4691,28 @@ func TestExecSuccess(t *testing.T) {
|
|||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`zscore_over_time(rand)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `round(zscore_over_time(rand(0)[100s:10s]), 0.01)`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{-1.17, -0.08, 0.98, 0.67, 1.61, 1.55},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`zscore_over_time(const)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `zscore_over_time(1[100s:10s])`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{0, 0, 0, 0, 0, 0},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`integrate(1)`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `integrate(1)`
|
||||
|
@ -5857,7 +5926,9 @@ func TestExecError(t *testing.T) {
|
|||
f(`outliersk(1)`)
|
||||
f(`mode_over_time()`)
|
||||
f(`rate_over_sum()`)
|
||||
f(`zscore_over_time()`)
|
||||
f(`mode()`)
|
||||
f(`zscore()`)
|
||||
f(`prometheus_buckets()`)
|
||||
f(`buckets_limit()`)
|
||||
f(`buckets_limit(1)`)
|
||||
|
|
|
@ -74,6 +74,7 @@ var rollupFuncs = map[string]newRollupFunc{
|
|||
"hoeffding_bound_lower": newRollupHoeffdingBoundLower,
|
||||
"ascent_over_time": newRollupFuncOneArg(rollupAscentOverTime),
|
||||
"descent_over_time": newRollupFuncOneArg(rollupDescentOverTime),
|
||||
"zscore_over_time": newRollupFuncOneArg(rollupZScoreOverTime),
|
||||
|
||||
// `timestamp` function must return timestamp for the last datapoint on the current window
|
||||
// in order to properly handle offset and timestamps unaligned to the current step.
|
||||
|
@ -125,6 +126,7 @@ var rollupAggrFuncs = map[string]rollupFunc{
|
|||
"tmax_over_time": rollupTmax,
|
||||
"ascent_over_time": rollupAscentOverTime,
|
||||
"descent_over_time": rollupDescentOverTime,
|
||||
"zscore_over_time": rollupZScoreOverTime,
|
||||
"timestamp": rollupTimestamp,
|
||||
"mode_over_time": rollupModeOverTime,
|
||||
"rate_over_sum": rollupRateOverSum,
|
||||
|
@ -153,6 +155,7 @@ var rollupFuncsCannotAdjustWindow = map[string]bool{
|
|||
"integrate": true,
|
||||
"ascent_over_time": true,
|
||||
"descent_over_time": true,
|
||||
"zscore_over_time": true,
|
||||
}
|
||||
|
||||
var rollupFuncsRemoveCounterResets = map[string]bool{
|
||||
|
@ -1606,6 +1609,20 @@ func rollupDescentOverTime(rfa *rollupFuncArg) float64 {
|
|||
return s
|
||||
}
|
||||
|
||||
func rollupZScoreOverTime(rfa *rollupFuncArg) float64 {
|
||||
// See https://about.gitlab.com/blog/2019/07/23/anomaly-detection-using-prometheus/#using-z-score-for-anomaly-detection
|
||||
scrapeInterval := rollupScrapeInterval(rfa)
|
||||
lag := rollupLag(rfa)
|
||||
if math.IsNaN(scrapeInterval) || math.IsNaN(lag) || lag > scrapeInterval {
|
||||
return nan
|
||||
}
|
||||
d := rollupLast(rfa) - rollupAvg(rfa)
|
||||
if d == 0 {
|
||||
return 0
|
||||
}
|
||||
return d / rollupStddev(rfa)
|
||||
}
|
||||
|
||||
func rollupFirst(rfa *rollupFuncArg) float64 {
|
||||
// There is no need in handling NaNs here, since they must be cleaned up
|
||||
// before calling rollup funcs.
|
||||
|
|
|
@ -392,6 +392,7 @@ func TestRollupNewRollupFuncSuccess(t *testing.T) {
|
|||
f("increases_over_time", 5)
|
||||
f("ascent_over_time", 142)
|
||||
f("descent_over_time", 231)
|
||||
f("zscore_over_time", -0.4254336383156416)
|
||||
f("timestamp", 0.13)
|
||||
f("mode_over_time", 34)
|
||||
f("rate_over_sum", 4520)
|
||||
|
@ -983,6 +984,20 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
|||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
t.Run("zscore_over_time", func(t *testing.T) {
|
||||
rc := rollupConfig{
|
||||
Func: rollupZScoreOverTime,
|
||||
Start: 0,
|
||||
End: 160,
|
||||
Step: 40,
|
||||
Window: 80,
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 0.9397878236968458, 1.1969836716333457, 2.3112921116373175, nan}
|
||||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRollupBigNumberOfValues(t *testing.T) {
|
||||
|
|
|
@ -120,10 +120,14 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
|
|||
for the given `phi` in the range `[0..1]`.
|
||||
- `last_over_time(m[d])` - returns the last value for `m` on the time range `d`.
|
||||
- `first_over_time(m[d])` - returns the first value for `m` on the time range `d`.
|
||||
- `outliersk(N, m)` - returns up to `N` outlier time series for `m`. Outlier time series have the highest deviation from the `median(m)`.
|
||||
- `outliersk(N, q) by (group)` - returns up to `N` outlier time series for `q` in every `group`. Outlier time series have the highest deviation from the `median(m)`.
|
||||
This aggregate function is useful to detect anomalies across groups of similar time series.
|
||||
- `ascent_over_time(m[d])` - returns the sum of positive deltas between adjancent data points in `m` over `d`. Useful for tracking height gains in GPS track.
|
||||
- `descent_over_time(m[d])` - returns the absolute sum of negative deltas between adjancent data points in `m` over `d`. Useful for tracking height loss in GPS track.
|
||||
- `mode_over_time(m[d])` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for `m` values over `d`. It is expected that `m` values are discrete.
|
||||
- `mode(q) by (x)` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for each point in `q` grouped by `x`. It is expected that `q` points are discrete.
|
||||
- `rate_over_sum(m[d])` - returns rate over the sum of `m` values over `d` duration.
|
||||
- `zscore_over_time(m[d])` - returns [z-score](https://en.wikipedia.org/wiki/Standard_score) for `m` values over `d` duration. Useful for detecting
|
||||
anomalies in time series comparing to historical samples.
|
||||
- `zscore(q) by (group)` - returns independent [z-score](https://en.wikipedia.org/wiki/Standard_score) values for every point in every `group` of `q`.
|
||||
Useful for detecting anomalies in the group of related time series.
|
||||
|
|
2
go.mod
2
go.mod
|
@ -9,7 +9,7 @@ require (
|
|||
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
|
||||
github.com/VictoriaMetrics/fasthttp v1.0.1
|
||||
github.com/VictoriaMetrics/metrics v1.12.2
|
||||
github.com/VictoriaMetrics/metricsql v0.2.9
|
||||
github.com/VictoriaMetrics/metricsql v0.2.10
|
||||
github.com/aws/aws-sdk-go v1.33.14
|
||||
github.com/cespare/xxhash/v2 v2.1.1
|
||||
github.com/golang/snappy v0.0.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t
|
|||
github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
|
||||
github.com/VictoriaMetrics/metrics v1.12.2 h1:SG8iAmqavDNuh7GIdHPoGHUhDL23KeKfvSZSozucNeA=
|
||||
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
|
||||
github.com/VictoriaMetrics/metricsql v0.2.9 h1:RHLEmt4VNZ2RAqZjmXyRtKpCrtSuYUS1+TyOqfXbHWs=
|
||||
github.com/VictoriaMetrics/metricsql v0.2.9/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
|
||||
github.com/VictoriaMetrics/metricsql v0.2.10 h1:1z0cfVwjh9n6J0rM8znNQkTy0rIpB+VO2hqnggtYRoc=
|
||||
github.com/VictoriaMetrics/metricsql v0.2.10/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
|
||||
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
|
||||
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
|
||||
github.com/aws/aws-sdk-go v1.33.14 h1:ucjyVEvtIdtn4acf+RKsgk6ybAYeMLXpGZeqoVvi7Kk=
|
||||
|
|
1
vendor/github.com/VictoriaMetrics/metricsql/aggr.go
generated
vendored
1
vendor/github.com/VictoriaMetrics/metricsql/aggr.go
generated
vendored
|
@ -37,6 +37,7 @@ var aggrFuncs = map[string]bool{
|
|||
"any": true,
|
||||
"outliersk": true,
|
||||
"mode": true,
|
||||
"zscore": true,
|
||||
}
|
||||
|
||||
func isAggrFunc(s string) bool {
|
||||
|
|
1
vendor/github.com/VictoriaMetrics/metricsql/rollup.go
generated
vendored
1
vendor/github.com/VictoriaMetrics/metricsql/rollup.go
generated
vendored
|
@ -59,6 +59,7 @@ var rollupFuncs = map[string]bool{
|
|||
"hoeffding_bound_lower": true,
|
||||
"ascent_over_time": true,
|
||||
"descent_over_time": true,
|
||||
"zscore_over_time": true,
|
||||
|
||||
// `timestamp` func has been moved here because it must work properly with offsets and samples unaligned to the current step.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details.
|
||||
|
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
|
@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
|
|||
github.com/VictoriaMetrics/fasthttp/stackless
|
||||
# github.com/VictoriaMetrics/metrics v1.12.2
|
||||
github.com/VictoriaMetrics/metrics
|
||||
# github.com/VictoriaMetrics/metricsql v0.2.9
|
||||
# github.com/VictoriaMetrics/metricsql v0.2.10
|
||||
github.com/VictoriaMetrics/metricsql
|
||||
github.com/VictoriaMetrics/metricsql/binaryop
|
||||
# github.com/aws/aws-sdk-go v1.33.14
|
||||
|
|
Loading…
Reference in a new issue