app/vmselect/promql: add mode() aggregate function

This commit is contained in:
Aliaksandr Valialkin 2020-07-20 15:18:20 +03:00
parent b35cb293f5
commit ecb1b2564a
9 changed files with 110 additions and 27 deletions

View file

@ -46,6 +46,7 @@ var aggrFuncs = map[string]aggrFunc{
"bottomk_median": newAggrFuncRangeTopK(medianValue, true),
"any": aggrFuncAny,
"outliersk": aggrFuncOutliersK,
"mode": newAggrFunc(aggrFuncMode),
}
// aggrFunc is the signature of the aggregate function implementations
// stored in the aggrFuncs map. It receives its arguments via afa and
// returns the resulting time series or an error.
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
@ -422,6 +423,52 @@ func aggrFuncDistinct(tss []*timeseries) []*timeseries {
return tss[:1]
}
// aggrFuncMode calculates the per-point mode across all the series in tss.
//
// For every point index the non-NaN values of all the series are collected
// and passed to modeNoNaNs with NaN as the seed value. The result is written
// in place into the first series, which is returned as the only element of
// the resulting slice.
//
// tss must contain at least one series, and every series must have
// at least as many values as tss[0].
func aggrFuncMode(tss []*timeseries) []*timeseries {
	dst := tss[0]
	// A single scratch buffer is reused for every point. It is reset with
	// a plain assignment instead of a shadowing `:=` declaration, so buffer
	// reuse doesn't silently depend on the pre-sized capacity.
	buf := make([]float64, 0, len(tss))
	for i := range dst.Values {
		buf = buf[:0]
		for _, ts := range tss {
			if v := ts.Values[i]; !math.IsNaN(v) {
				buf = append(buf, v)
			}
		}
		// dst.Values[i] has been already read above, so it is safe to overwrite it.
		dst.Values[i] = modeNoNaNs(nan, buf)
	}
	return tss[:1]
}
// modeNoNaNs returns the mode (the most frequently occurring value) of a.
//
// prevValue seeds the scan: it is returned as is when a is empty; otherwise
// it is treated as one extra sample placed in front of the sorted a.
// A NaN seed is replaced by the first value found in a.
//
// a is sorted in place and must not contain NaNs. A later candidate replaces
// the current one only if it occurs strictly more often, so ties are resolved
// in favor of the earlier (smaller) value.
//
// See https://en.wikipedia.org/wiki/Mode_(statistics)
func modeNoNaNs(prevValue float64, a []float64) float64 {
	if len(a) == 0 {
		return prevValue
	}
	sort.Float64s(a)

	best := prevValue    // the most frequent value seen so far
	bestLen := 0         // length of the run that produced best
	cur := prevValue     // value of the run being scanned
	runStart := -1       // index where the current run starts; -1 accounts for the seed
	for idx := 0; idx < len(a); idx++ {
		next := a[idx]
		if next == cur {
			// Still inside the current run.
			continue
		}
		// The run of cur has ended at idx; promote it if it is longer
		// than the best run or if there is no real candidate yet.
		runLen := idx - runStart
		if math.IsNaN(best) || runLen > bestLen {
			bestLen = runLen
			best = cur
		}
		runStart = idx
		cur = next
	}

	// The trailing run isn't closed inside the loop, so check it here.
	tailLen := len(a) - runStart
	if math.IsNaN(best) || tailLen > bestLen {
		best = cur
	}
	return best
}
func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 2); err != nil {

View file

@ -0,0 +1,36 @@
package promql
import (
"math"
"testing"
)
// TestModeNoNaNs verifies modeNoNaNs for both NaN and non-NaN seed values.
func TestModeNoNaNs(t *testing.T) {
	f := func(prevValue float64, a []float64, expectedResult float64) {
		t.Helper()
		got := modeNoNaNs(prevValue, a)
		// NaN never equals itself, so a NaN expectation is matched explicitly.
		// A NaN on only one side falls through to the comparison below and fails there.
		if math.IsNaN(got) && math.IsNaN(expectedResult) {
			return
		}
		if got != expectedResult {
			t.Fatalf("unexpected result; got %v; want %v", got, expectedResult)
		}
	}

	// NaN seed
	f(nan, nil, nan)
	f(nan, []float64{123}, 123)
	f(nan, []float64{1, 2, 3}, 1)
	f(nan, []float64{1, 2, 2}, 2)
	f(nan, []float64{1, 1, 2}, 1)
	f(nan, []float64{1, 1, 1}, 1)
	f(nan, []float64{1, 2, 2, 3}, 2)
	f(nan, []float64{1, 1, 2, 2, 3, 3, 3}, 3)

	// Non-NaN seed
	f(1, []float64{2, 3, 4, 5}, 1)
	f(1, []float64{2, 2}, 2)
	f(1, []float64{2, 3, 3}, 3)
	f(1, []float64{2, 4, 3, 4, 3, 4}, 4)
	f(1, []float64{2, 3, 3, 4, 4}, 3)
	f(1, []float64{4, 3, 2, 3, 4}, 3)
}

View file

@ -3242,6 +3242,24 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`mode()`, func(t *testing.T) {
t.Parallel()
q := `mode((
alias(3, "m1"),
alias(2, "m2"),
alias(3, "m3"),
alias(4, "m4"),
alias(3, "m5"),
alias(2, "m6"),
))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3, 3, 3, 3, 3, 3},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`avg(scalar) without (xx, yy)`, func(t *testing.T) {
t.Parallel()
q := `avg without (xx, yy) (123)`
@ -4513,7 +4531,7 @@ func TestExecSuccess(t *testing.T) {
q := `mode_over_time(round(time()/500)[100s:1s])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{2, 2, 3, 3, 3, 4},
Values: []float64{2, 2, 3, 3, 4, 4},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
@ -5731,6 +5749,7 @@ func TestExecError(t *testing.T) {
f(`outliersk()`)
f(`outliersk(1)`)
f(`mode_over_time()`)
f(`mode()`)
// Invalid argument type
f(`median_over_time({}, 2)`)

View file

@ -4,7 +4,6 @@ import (
"flag"
"fmt"
"math"
"sort"
"strings"
"sync"
@ -1543,27 +1542,7 @@ func rollupTimestamp(rfa *rollupFuncArg) float64 {
// rollupModeOverTime returns the mode of rfa.values.
//
// The calculation is delegated to modeNoNaNs, with rfa.prevValue passed as
// the seed value. Note that modeNoNaNs sorts rfa.values in place.
func rollupModeOverTime(rfa *rollupFuncArg) float64 {
	// There is no need to handle NaNs here, since they must be cleaned up
	// before calling rollup funcs.
	return modeNoNaNs(rfa.prevValue, rfa.values)
}
func rollupAscentOverTime(rfa *rollupFuncArg) float64 {

View file

@ -124,3 +124,4 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `ascent_over_time(m[d])` - returns the sum of positive deltas between adjacent data points in `m` over `d`. Useful for tracking height gains in a GPS track.
- `descent_over_time(m[d])` - returns the absolute sum of negative deltas between adjacent data points in `m` over `d`. Useful for tracking height loss in a GPS track.
- `mode_over_time(m[d])` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for `m` values over `d`. It is expected that `m` values are discrete.
- `mode(q) by (x)` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for each point in `q` grouped by `x`.

2
go.mod
View file

@ -9,7 +9,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.0.1
github.com/VictoriaMetrics/metrics v1.11.3
github.com/VictoriaMetrics/metricsql v0.2.6
github.com/VictoriaMetrics/metricsql v0.2.7
github.com/aws/aws-sdk-go v1.33.5
github.com/cespare/xxhash/v2 v2.1.1
github.com/golang/snappy v0.0.1

4
go.sum
View file

@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t
github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
github.com/VictoriaMetrics/metrics v1.11.3 h1:eSfXc0CrquKa1VTNUvhP+dhNjLUZHQGTFfp19mYCQWE=
github.com/VictoriaMetrics/metrics v1.11.3/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
github.com/VictoriaMetrics/metricsql v0.2.6 h1:hJxeGeyP++fWuW41URWcl2PTNTMTTeqm7UcT1BEZOOg=
github.com/VictoriaMetrics/metricsql v0.2.6/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
github.com/VictoriaMetrics/metricsql v0.2.7 h1:4FXyJJjXTbAPVAakEEwaFSD0YOEPKEjdZKNQrWN76Ts=
github.com/VictoriaMetrics/metricsql v0.2.7/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/aws/aws-sdk-go v1.33.5 h1:p2fr1ryvNTU6avUWLI+/H7FGv0TBIjzVM5WDgXBBv4U=

View file

@ -36,6 +36,7 @@ var aggrFuncs = map[string]bool{
"bottomk_median": true,
"any": true,
"outliersk": true,
"mode": true,
}
func isAggrFunc(s string) bool {

2
vendor/modules.txt vendored
View file

@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.11.3
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.2.6
# github.com/VictoriaMetrics/metricsql v0.2.7
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop
# github.com/aws/aws-sdk-go v1.33.5