app/vmselect/promql: add mad(q) and outliers_mad(tolerance, q) functions to MetricsQL

This commit is contained in:
Aliaksandr Valialkin 2021-09-16 13:33:53 +03:00
parent eca75cc650
commit e60dfc96ff
9 changed files with 149 additions and 21 deletions

View file

@ -45,6 +45,8 @@ var aggrFuncs = map[string]aggrFunc{
"bottomk_avg": newAggrFuncRangeTopK(avgValue, true),
"bottomk_median": newAggrFuncRangeTopK(medianValue, true),
"any": aggrFuncAny,
"mad": newAggrFunc(aggrFuncMAD),
"outliers_mad": aggrFuncOutliersMAD,
"outliersk": aggrFuncOutliersK,
"mode": newAggrFunc(aggrFuncMode),
"zscore": aggrFuncZScore,
@ -485,7 +487,6 @@ func aggrFuncZScore(afa *aggrFuncArg) ([]*timeseries, error) {
ts.Values[i] = (v - avg) / stddev
}
}
// Remove MetricGroup from all the tss.
for _, ts := range tss {
ts.MetricName.ResetMetricGroup()
@ -811,6 +812,52 @@ func medianValue(values []float64) float64 {
return value
}
func aggrFuncMAD(tss []*timeseries) []*timeseries {
// Calculate medians for each point across tss.
medians := getPerPointMedians(tss)
// Calculate MAD values multipled by tolerance for each point across tss.
// See https://en.wikipedia.org/wiki/Median_absolute_deviation
mads := getPerPointMADs(tss, medians)
tss[0].Values = append(tss[0].Values[:0], mads...)
return tss[:1]
}
func aggrFuncOutliersMAD(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 2); err != nil {
return nil, err
}
tolerances, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
// Calculate medians for each point across tss.
medians := getPerPointMedians(tss)
// Calculate MAD values multipled by tolerance for each point across tss.
// See https://en.wikipedia.org/wiki/Median_absolute_deviation
mads := getPerPointMADs(tss, medians)
for n := range mads {
mads[n] *= tolerances[n]
}
// Leave only time series with at least a single peak above the MAD multiplied by tolerance.
tssDst := tss[:0]
for _, ts := range tss {
values := ts.Values
for n, v := range values {
ad := math.Abs(v - medians[n])
mad := mads[n]
if ad > mad {
tssDst = append(tssDst, ts)
break
}
}
}
return tssDst
}
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 2); err != nil {
@ -822,20 +869,7 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
}
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
// Calculate medians for each point across tss.
medians := make([]float64, len(ks))
h := histogram.GetFast()
for n := range ks {
h.Reset()
for j := range tss {
v := tss[j].Values[n]
if !math.IsNaN(v) {
h.Update(v)
}
}
medians[n] = h.Quantile(0.5)
}
histogram.PutFast(h)
medians := getPerPointMedians(tss)
// Return topK time series with the highest variance from median.
f := func(values []float64) float64 {
sum2 := float64(0)
@ -850,6 +884,44 @@ func aggrFuncOutliersK(afa *aggrFuncArg) ([]*timeseries, error) {
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
}
func getPerPointMedians(tss []*timeseries) []float64 {
if len(tss) == 0 {
logger.Panicf("BUG: expecting non-empty tss")
}
medians := make([]float64, len(tss[0].Values))
h := histogram.GetFast()
for n := range medians {
h.Reset()
for j := range tss {
v := tss[j].Values[n]
if !math.IsNaN(v) {
h.Update(v)
}
}
medians[n] = h.Quantile(0.5)
}
histogram.PutFast(h)
return medians
}
func getPerPointMADs(tss []*timeseries, medians []float64) []float64 {
mads := make([]float64, len(medians))
h := histogram.GetFast()
for n, median := range medians {
h.Reset()
for j := range tss {
v := tss[j].Values[n]
if !math.IsNaN(v) {
ad := math.Abs(v - median)
h.Update(ad)
}
}
mads[n] = h.Quantile(0.5)
}
histogram.PutFast(h)
return mads
}
func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 2); err != nil {

View file

@ -5524,6 +5524,47 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`mad()`, func(t *testing.T) {
t.Parallel()
q := `mad(
alias(time(), "metric1"),
alias(time()*1.5, "metric2"),
label_set(time()*0.9, "baz", "sss"),
)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{100, 120, 140, 160, 180, 200},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`outliers_mad(1)`, func(t *testing.T) {
t.Parallel()
q := `outliers_mad(1, (
alias(time(), "metric1"),
alias(time()*1.5, "metric2"),
label_set(time()*0.9, "baz", "sss"),
))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1500, 1800, 2100, 2400, 2700, 3000},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("metric2")
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`outliers_mad(5)`, func(t *testing.T) {
t.Parallel()
q := `outliers_mad(5, (
alias(time(), "metric1"),
alias(time()*1.5, "metric2"),
label_set(time()*0.9, "baz", "sss"),
))`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`outliersk(0)`, func(t *testing.T) {
t.Parallel()
q := `outliersk(0, (
@ -7000,6 +7041,9 @@ func TestExecError(t *testing.T) {
f(`hoeffding_bound_upper()`)
f(`hoeffding_bound_upper(1)`)
f(`hoeffding_bound_upper(0.99, foo, 1)`)
f(`mad()`)
f(`outliers_mad()`)
f(`outliers_mad(1)`)
f(`outliersk()`)
f(`outliersk(1)`)
f(`mode_over_time()`)

View file

@ -24,6 +24,8 @@ sort: 15
* FEATURE: vmui: use Prometheus-compatible query args, so `vmui` could be accessed from graph editor in Grafana. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1619). Thanks to @Loori-R.
* FEATURE: vmselect: automatically add missing port to `-storageNode` hostnames. For example, `-storageNode=vmstorage1,vmstorage2` is automatically translated to `-storageNode=vmstorage1:8401,vmstorage2:8401`. This simplifies [manual setup of VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#cluster-setup).
* FEATURE: vminsert: automatically add missing port to `-storageNode` hostnames. For example, `-storageNode=vmstorage1,vmstorage2` is automatically translated to `-storageNode=vmstorage1:8400,vmstorage2:8400`. This simplifies [manual setup of VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#cluster-setup).
* FEATURE: add [mad(q)](https://docs.victoriametrics.com/MetricsQL.html#mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) for groups of points with identical timestamps across multiple time series.
* FEATURE: add [outliers_mad(tolerance, q)](https://docs.victoriametrics.com/MetricsQL.html#outliers_mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It returns time series with peaks outside the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) multiplied by `tolerance`.
* BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601).
* BUGFIX: vmselect: reset connection timeouts after each request to `vmstorage`. This should prevent from `cannot read data in 0.000 seconds: unexpected EOF` warning in logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1562). Thanks to @mxlxm .

View file

@ -748,6 +748,10 @@ See also [implicit query conversions](#implicit-query-conversions).
`limitk(k, q) by (group_labels)` returns up to `k` time series per each `group_labels` out of time series returned by `q`. The returned set of time series can change with each call.
#### mad
`mad(q) by (group_labels)` returns the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. See also [outliers_mad](#outliers_mad) and [stddev](#stddev).
#### max
`max(q) by (group_labels)` returns the maximum value per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL.
@ -764,9 +768,13 @@ See also [implicit query conversions](#implicit-query-conversions).
`mode(q) by (group_labels)` returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp.
#### outliers_mad
`outliers_mad(tolerance, q)` returns time series from `q` with at least a single point outside [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (aka MAD) multiplied by `tolerance`. E.g. it returns time series with at least a single point below `median(q) - mad(q)` or a single point above `median(q) + mad(q)`. See also [outliersk](#outliersk) and [mad](#mad).
#### outliersk
`outliersk(k, q)` returns up to `k` time series with the biggest standard deviation (aka outliers) out of time series returned by `q`.
`outliersk(k, q)` returns up to `k` time series with the biggest standard deviation (aka outliers) out of time series returned by `q`. See also [outliers_mad](#outliers_mad).
#### quantile

2
go.mod
View file

@ -9,7 +9,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.1.0
github.com/VictoriaMetrics/metrics v1.18.0
github.com/VictoriaMetrics/metricsql v0.21.0
github.com/VictoriaMetrics/metricsql v0.22.0
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/aws/aws-sdk-go v1.40.43
github.com/cespare/xxhash/v2 v2.1.2

4
go.sum
View file

@ -109,8 +109,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metrics v1.18.0 h1:vov5NxDHRSXFbdiH4dYLYEjKLoAXXSQ7hcnG8TSD9JQ=
github.com/VictoriaMetrics/metrics v1.18.0/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
github.com/VictoriaMetrics/metricsql v0.21.0 h1:wA/IVfRFQaThy4bM1kAmPiCR0BkWv4tEXD9lBF+GPdU=
github.com/VictoriaMetrics/metricsql v0.21.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VictoriaMetrics/metricsql v0.22.0 h1:QWXvNfaFGeEQvh7Cjjpfp/7Un1N+W+ODQOiM+numyN0=
github.com/VictoriaMetrics/metricsql v0.22.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=

View file

@ -35,6 +35,8 @@ var aggrFuncs = map[string]bool{
"bottomk_avg": true,
"bottomk_median": true,
"any": true,
"mad": true,
"outliers_mad": true,
"outliersk": true,
"mode": true,
"zscore": true,

View file

@ -134,7 +134,7 @@ func scanString(s string) (string, error) {
for {
n := strings.IndexByte(s[i:], quote)
if n < 0 {
return "", fmt.Errorf("cannot find closing quote %ch for the string %q", quote, s)
return "", fmt.Errorf("cannot find closing quote %c for the string %q", quote, s)
}
i += n
bs := 0

2
vendor/modules.txt vendored
View file

@ -22,7 +22,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.18.0
## explicit
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.21.0
# github.com/VictoriaMetrics/metricsql v0.22.0
## explicit
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop