From da97e58979a0a6c41bae47a2a8c74783af659a04 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 16 Oct 2021 21:13:58 +0300 Subject: [PATCH] app/vmselect/promql: randomize the static selection of time series returned from `limitk()` Sort series by a hash calculated from the series labels. This should guarantee "random" selection of the returned time series. Previously the selection could be biased, since time series were sorted alphabetically by label names and label values. --- app/vmselect/promql/aggr.go | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index ba908e6b8..ba71ea0d6 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metricsql" + xxhash "github.com/cespare/xxhash/v2" ) var aggrFuncs = map[string]aggrFunc{ @@ -1012,9 +1013,26 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { // Sort series by metricName in order to get consistent set of output series // across multiple calls to limitk() function. - sort.Slice(tss, func(i, j int) bool { - return metricNameLess(&tss[i].MetricName, &tss[j].MetricName) + // Sort series by hash in order to guarantee uniform selection across series. + type hashSeries struct { + h uint64 + ts *timeseries + } + hss := make([]hashSeries, len(tss)) + d := xxhash.New() + for i, ts := range tss { + h := getHash(d, &ts.MetricName) + hss[i] = hashSeries{ + h: h, + ts: ts, + } + } + sort.Slice(hss, func(i, j int) bool { + return hss[i].h < hss[j].h }) + for i, hs := range hss { + tss[i] = hs.ts + } if len(tss) > maxK { tss = tss[:maxK] } @@ -1032,6 +1050,17 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) } +func getHash(d *xxhash.Digest, mn *storage.MetricName) uint64 { + d.Reset() + _, _ = d.Write(mn.MetricGroup) + for _, tag := range mn.Tags { + _, _ = d.Write(tag.Key) + _, _ = d.Write(tag.Value) + } + return d.Sum64() + +} + func aggrFuncQuantiles(afa *aggrFuncArg) ([]*timeseries, error) { args := afa.args if len(args) < 3 {