From ed994873fd88733082b6a3bbc4db200156a9a097 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 16 Oct 2021 21:13:58 +0300 Subject: [PATCH] app/vmselect/promql: randomize the static selection of time series returned from `limitk()` Sort series by a hash calculated from the series labels. This should guarantee "random" selection of the returned time series. Previously the selection could be biased, since time series were sorted alphabetically by label names and label values. --- app/vmselect/promql/aggr.go | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index ba908e6b89..ba71ea0d66 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metricsql" + xxhash "github.com/cespare/xxhash/v2" ) var aggrFuncs = map[string]aggrFunc{ @@ -1012,9 +1013,26 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries { // Sort series by metricName in order to get consistent set of output series // across multiple calls to limitk() function. - sort.Slice(tss, func(i, j int) bool { - return metricNameLess(&tss[i].MetricName, &tss[j].MetricName) + // Sort series by hash in order to guarantee uniform selection across series. + type hashSeries struct { + h uint64 + ts *timeseries + } + hss := make([]hashSeries, len(tss)) + d := xxhash.New() + for i, ts := range tss { + h := getHash(d, &ts.MetricName) + hss[i] = hashSeries{ + h: h, + ts: ts, + } + } + sort.Slice(hss, func(i, j int) bool { + return hss[i].h < hss[j].h }) + for i, hs := range hss { + tss[i] = hs.ts + } if len(tss) > maxK { tss = tss[:maxK] } @@ -1032,6 +1050,17 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) { return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true) } +func getHash(d *xxhash.Digest, mn *storage.MetricName) uint64 { + d.Reset() + _, _ = d.Write(mn.MetricGroup) + for _, tag := range mn.Tags { + _, _ = d.Write(tag.Key) + _, _ = d.Write(tag.Value) + } + return d.Sum64() + +} + func aggrFuncQuantiles(afa *aggrFuncArg) ([]*timeseries, error) { args := afa.args if len(args) < 3 {