mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: randomize the static selection of time series returned from limitk()
Sort series by a hash calculated from the series labels. This should guarantee "random" selection of the returned time series. Previously the selection could be biased, since time series were sorted alphabetically by label names and label values.
This commit is contained in:
parent
c37f285466
commit
da97e58979
1 changed files with 31 additions and 2 deletions
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
xxhash "github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
||||
var aggrFuncs = map[string]aggrFunc{
|
||||
|
@ -1012,9 +1013,26 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
|
|||
afe := func(tss []*timeseries, modifier *metricsql.ModifierExpr) []*timeseries {
|
||||
// Sort series by metricName in order to get consistent set of output series
|
||||
// across multiple calls to limitk() function.
|
||||
sort.Slice(tss, func(i, j int) bool {
|
||||
return metricNameLess(&tss[i].MetricName, &tss[j].MetricName)
|
||||
// Sort series by hash in order to guarantee uniform selection across series.
|
||||
type hashSeries struct {
|
||||
h uint64
|
||||
ts *timeseries
|
||||
}
|
||||
hss := make([]hashSeries, len(tss))
|
||||
d := xxhash.New()
|
||||
for i, ts := range tss {
|
||||
h := getHash(d, &ts.MetricName)
|
||||
hss[i] = hashSeries{
|
||||
h: h,
|
||||
ts: ts,
|
||||
}
|
||||
}
|
||||
sort.Slice(hss, func(i, j int) bool {
|
||||
return hss[i].h < hss[j].h
|
||||
})
|
||||
for i, hs := range hss {
|
||||
tss[i] = hs.ts
|
||||
}
|
||||
if len(tss) > maxK {
|
||||
tss = tss[:maxK]
|
||||
}
|
||||
|
@ -1032,6 +1050,17 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
|
|||
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
|
||||
}
|
||||
|
||||
func getHash(d *xxhash.Digest, mn *storage.MetricName) uint64 {
|
||||
d.Reset()
|
||||
_, _ = d.Write(mn.MetricGroup)
|
||||
for _, tag := range mn.Tags {
|
||||
_, _ = d.Write(tag.Key)
|
||||
_, _ = d.Write(tag.Value)
|
||||
}
|
||||
return d.Sum64()
|
||||
|
||||
}
|
||||
|
||||
func aggrFuncQuantiles(afa *aggrFuncArg) ([]*timeseries, error) {
|
||||
args := afa.args
|
||||
if len(args) < 3 {
|
||||
|
|
Loading…
Reference in a new issue