mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
app/vmselect/promql: add {topk|bottomk}_{min|max|avg|median}
aggregate functions for returning the exact k time series on the given time range
The full list of functions added:
- `topk_min(k, q)` - returns top K time series with the max minimums on the given time range
- `topk_max(k, q)` - returns top K time series with the max maximums on the given time range
- `topk_avg(k, q)` - returns top K time series with the max averages on the given time range
- `topk_median(k, q)` - returns top K time series with the max medians on the given time range
- `bottomk_min(k, q)` - returns bottom K time series with the min minimums on the given time range
- `bottomk_max(k, q)` - returns bottom K time series with the min maximums on the given time range
- `bottomk_avg(k, q)` - returns bottom K time series with the min averages on the given time range
- `bottomk_median(k, q)` - returns bottom K time series with the min medians on the given time range
This commit is contained in:
parent
639967db59
commit
d39bba3547
2 changed files with 267 additions and 28 deletions
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
"github.com/valyala/histogram"
|
||||||
)
|
)
|
||||||
|
|
||||||
var aggrFuncs = map[string]aggrFunc{
|
var aggrFuncs = map[string]aggrFunc{
|
||||||
|
@ -27,12 +28,20 @@ var aggrFuncs = map[string]aggrFunc{
|
||||||
"quantile": aggrFuncQuantile,
|
"quantile": aggrFuncQuantile,
|
||||||
|
|
||||||
// Extended PromQL funcs
|
// Extended PromQL funcs
|
||||||
"median": aggrFuncMedian,
|
"median": aggrFuncMedian,
|
||||||
"limitk": aggrFuncLimitK,
|
"limitk": aggrFuncLimitK,
|
||||||
"distinct": newAggrFunc(aggrFuncDistinct),
|
"distinct": newAggrFunc(aggrFuncDistinct),
|
||||||
"sum2": newAggrFunc(aggrFuncSum2),
|
"sum2": newAggrFunc(aggrFuncSum2),
|
||||||
"geomean": newAggrFunc(aggrFuncGeomean),
|
"geomean": newAggrFunc(aggrFuncGeomean),
|
||||||
"histogram": newAggrFunc(aggrFuncHistogram),
|
"histogram": newAggrFunc(aggrFuncHistogram),
|
||||||
|
"topk_min": newAggrFuncRangeTopK(minValue, false),
|
||||||
|
"topk_max": newAggrFuncRangeTopK(maxValue, false),
|
||||||
|
"topk_avg": newAggrFuncRangeTopK(avgValue, false),
|
||||||
|
"topk_median": newAggrFuncRangeTopK(medianValue, false),
|
||||||
|
"bottomk_min": newAggrFuncRangeTopK(minValue, true),
|
||||||
|
"bottomk_max": newAggrFuncRangeTopK(maxValue, true),
|
||||||
|
"bottomk_avg": newAggrFuncRangeTopK(avgValue, true),
|
||||||
|
"bottomk_median": newAggrFuncRangeTopK(medianValue, true),
|
||||||
}
|
}
|
||||||
|
|
||||||
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
|
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
|
||||||
|
@ -459,37 +468,138 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
afe := func(tss []*timeseries) []*timeseries {
|
afe := func(tss []*timeseries) []*timeseries {
|
||||||
rvs := tss
|
for n := range tss[0].Values {
|
||||||
for n := range rvs[0].Values {
|
sort.Slice(tss, func(i, j int) bool {
|
||||||
sort.Slice(rvs, func(i, j int) bool {
|
a := tss[i].Values[n]
|
||||||
a := rvs[i].Values[n]
|
b := tss[j].Values[n]
|
||||||
b := rvs[j].Values[n]
|
|
||||||
cmp := lessWithNaNs(a, b)
|
|
||||||
if isReverse {
|
if isReverse {
|
||||||
cmp = !cmp
|
a, b = b, a
|
||||||
}
|
}
|
||||||
return cmp
|
return lessWithNaNs(a, b)
|
||||||
})
|
})
|
||||||
if math.IsNaN(ks[n]) {
|
fillNaNsAtIdx(n, ks[n], tss)
|
||||||
ks[n] = 0
|
|
||||||
}
|
|
||||||
k := int(ks[n])
|
|
||||||
if k < 0 {
|
|
||||||
k = 0
|
|
||||||
}
|
|
||||||
if k > len(rvs) {
|
|
||||||
k = len(rvs)
|
|
||||||
}
|
|
||||||
for _, ts := range rvs[:len(rvs)-k] {
|
|
||||||
ts.Values[n] = nan
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return removeNaNs(rvs)
|
return removeNaNs(tss)
|
||||||
}
|
}
|
||||||
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true)
|
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tsWithValue pairs a time series with a single float64 computed from it,
// so that a slice of series can be sorted by that number.
type tsWithValue struct {
	// ts is the series the value was computed for.
	ts *timeseries
	// value is the sort key for ts.
	value float64
}
|
||||||
|
|
||||||
|
func newAggrFuncRangeTopK(f func(values []float64) float64, isReverse bool) aggrFunc {
|
||||||
|
return func(afa *aggrFuncArg) ([]*timeseries, error) {
|
||||||
|
args := afa.args
|
||||||
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ks, err := getScalar(args[0], 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
afe := func(tss []*timeseries) []*timeseries {
|
||||||
|
maxs := make([]tsWithValue, len(tss))
|
||||||
|
for i, ts := range tss {
|
||||||
|
value := f(ts.Values)
|
||||||
|
maxs[i] = tsWithValue{
|
||||||
|
ts: ts,
|
||||||
|
value: value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Slice(maxs, func(i, j int) bool {
|
||||||
|
a := maxs[i].value
|
||||||
|
b := maxs[j].value
|
||||||
|
if isReverse {
|
||||||
|
a, b = b, a
|
||||||
|
}
|
||||||
|
return lessWithNaNs(a, b)
|
||||||
|
})
|
||||||
|
for i := range maxs {
|
||||||
|
tss[i] = maxs[i].ts
|
||||||
|
}
|
||||||
|
for i, k := range ks {
|
||||||
|
fillNaNsAtIdx(i, k, tss)
|
||||||
|
}
|
||||||
|
return removeNaNs(tss)
|
||||||
|
}
|
||||||
|
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func fillNaNsAtIdx(idx int, k float64, tss []*timeseries) {
|
||||||
|
if math.IsNaN(k) {
|
||||||
|
k = 0
|
||||||
|
}
|
||||||
|
kn := int(k)
|
||||||
|
if kn < 0 {
|
||||||
|
kn = 0
|
||||||
|
}
|
||||||
|
if kn > len(tss) {
|
||||||
|
kn = len(tss)
|
||||||
|
}
|
||||||
|
for _, ts := range tss[:len(tss)-kn] {
|
||||||
|
ts.Values[idx] = nan
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// minValue returns the smallest non-NaN value in values.
//
// NaN entries are skipped regardless of their position.
// NaN is returned if values is empty or contains only NaNs.
func minValue(values []float64) float64 {
	lowest := math.NaN()
	for _, v := range values {
		if math.IsNaN(v) {
			continue
		}
		// The first non-NaN value seeds the result. Without the IsNaN check
		// a leading NaN would never be replaced, since `v < NaN` is always false.
		if math.IsNaN(lowest) || v < lowest {
			lowest = v
		}
	}
	return lowest
}
|
||||||
|
|
||||||
|
// maxValue returns the biggest non-NaN value in values.
//
// NaN entries are skipped regardless of their position.
// NaN is returned if values is empty or contains only NaNs.
func maxValue(values []float64) float64 {
	highest := math.NaN()
	for _, v := range values {
		if math.IsNaN(v) {
			continue
		}
		// The first non-NaN value seeds the result. Without the IsNaN check
		// a leading NaN would never be replaced, since `v > NaN` is always false.
		if math.IsNaN(highest) || v > highest {
			highest = v
		}
	}
	return highest
}
|
||||||
|
|
||||||
|
func avgValue(values []float64) float64 {
|
||||||
|
sum := float64(0)
|
||||||
|
count := 0
|
||||||
|
for _, v := range values {
|
||||||
|
if math.IsNaN(v) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
sum += v
|
||||||
|
}
|
||||||
|
if count == 0 {
|
||||||
|
return nan
|
||||||
|
}
|
||||||
|
return sum / float64(count)
|
||||||
|
}
|
||||||
|
|
||||||
|
func medianValue(values []float64) float64 {
|
||||||
|
h := histogram.GetFast()
|
||||||
|
for _, v := range values {
|
||||||
|
if math.IsNaN(v) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
h.Update(v)
|
||||||
|
}
|
||||||
|
value := h.Quantile(0.5)
|
||||||
|
histogram.PutFast(h)
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
|
func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
|
||||||
args := afa.args
|
args := afa.args
|
||||||
if err := expectTransformArgsNum(args, 2); err != nil {
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
||||||
|
|
|
@ -3114,6 +3114,126 @@ func TestExecSuccess(t *testing.T) {
|
||||||
resultExpected := []netstorage.Result{r1, r2}
|
resultExpected := []netstorage.Result{r1, r2}
|
||||||
f(q, resultExpected)
|
f(q, resultExpected)
|
||||||
})
|
})
|
||||||
|
t.Run(`topk_min(1)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(topk_min(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{10, 10, 10, nan, nan, nan},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`bottomk_min(1)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(bottomk_min(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("baz"),
|
||||||
|
Value: []byte("sss"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`topk_max(1)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(topk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("baz"),
|
||||||
|
Value: []byte("sss"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`bottomk_max(1)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(bottomk_max(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{10, 10, 10, nan, nan, nan},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`topk_avg(1)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(topk_avg(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("baz"),
|
||||||
|
Value: []byte("sss"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`bottomk_avg(1)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(bottomk_avg(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{10, 10, 10, nan, nan, nan},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`topk_median(1)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(topk_median(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{nan, nan, nan, 10.666666666666666, 12, 13.333333333333334},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("baz"),
|
||||||
|
Value: []byte("sss"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`bottomk_median(1)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(bottomk_median(1, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{10, 10, 10, nan, nan, nan},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
t.Run(`topk(1, nan_timeseries)`, func(t *testing.T) {
|
t.Run(`topk(1, nan_timeseries)`, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := `topk(1, label_set(NaN, "foo", "bar") or label_set(time()/150, "baz", "sss")) default 0`
|
q := `topk(1, label_set(NaN, "foo", "bar") or label_set(time()/150, "baz", "sss")) default 0`
|
||||||
|
@ -4522,8 +4642,16 @@ func TestExecError(t *testing.T) {
|
||||||
f(`count_values()`)
|
f(`count_values()`)
|
||||||
f(`quantile()`)
|
f(`quantile()`)
|
||||||
f(`topk()`)
|
f(`topk()`)
|
||||||
|
f(`topk_min()`)
|
||||||
|
f(`topk_max()`)
|
||||||
|
f(`topk_avg()`)
|
||||||
|
f(`topk_median()`)
|
||||||
f(`limitk()`)
|
f(`limitk()`)
|
||||||
f(`bottomk()`)
|
f(`bottomk()`)
|
||||||
|
f(`bottomk_min()`)
|
||||||
|
f(`bottomk_max()`)
|
||||||
|
f(`bottomk_avg()`)
|
||||||
|
f(`bottomk_median()`)
|
||||||
f(`time(123)`)
|
f(`time(123)`)
|
||||||
f(`start(1)`)
|
f(`start(1)`)
|
||||||
f(`end(1)`)
|
f(`end(1)`)
|
||||||
|
@ -4566,6 +4694,7 @@ func TestExecError(t *testing.T) {
|
||||||
f(`clamp_max(1, 1 or label_set(2, "xx", "foo"))`)
|
f(`clamp_max(1, 1 or label_set(2, "xx", "foo"))`)
|
||||||
f(`clamp_min(1, 1 or label_set(2, "xx", "foo"))`)
|
f(`clamp_min(1, 1 or label_set(2, "xx", "foo"))`)
|
||||||
f(`topk(label_set(2, "xx", "foo") or 1, 12)`)
|
f(`topk(label_set(2, "xx", "foo") or 1, 12)`)
|
||||||
|
f(`topk_avg(label_set(2, "xx", "foo") or 1, 12)`)
|
||||||
f(`limitk(label_set(2, "xx", "foo") or 1, 12)`)
|
f(`limitk(label_set(2, "xx", "foo") or 1, 12)`)
|
||||||
f(`round(1, 1 or label_set(2, "xx", "foo"))`)
|
f(`round(1, 1 or label_set(2, "xx", "foo"))`)
|
||||||
f(`histogram_quantile(1 or label_set(2, "xx", "foo"), 1)`)
|
f(`histogram_quantile(1 or label_set(2, "xx", "foo"), 1)`)
|
||||||
|
|
Loading…
Reference in a new issue