mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect/promql: optimize buckets_limit(k, buckets)
for big number of buckets
This commit is contained in:
parent
a090627059
commit
97b6f5d223
1 changed files with 34 additions and 22 deletions
|
@ -303,9 +303,9 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||
|
||||
// Group timeseries by all MetricGroup+tags excluding `le` tag.
|
||||
type x struct {
|
||||
le float64
|
||||
delta float64
|
||||
ts *timeseries
|
||||
le float64
|
||||
hits float64
|
||||
ts *timeseries
|
||||
}
|
||||
m := make(map[string][]x)
|
||||
var b []byte
|
||||
|
@ -333,28 +333,40 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||
// Remove buckets with the smallest counters.
|
||||
rvs := make([]*timeseries, 0, len(tss))
|
||||
for _, leGroup := range m {
|
||||
for len(leGroup) > limit {
|
||||
// Remove a single bucket with the smallest sum.
|
||||
// TODO: optimize this dumb implementation a bit, since it may be slow on big number of buckets.
|
||||
sort.Slice(leGroup, func(i, j int) bool {
|
||||
return leGroup[i].le < leGroup[j].le
|
||||
})
|
||||
for i := range leGroup {
|
||||
leGroup[i].delta = 0
|
||||
if len(leGroup) <= limit {
|
||||
// Fast path - the number of buckets doesn't exceed the given limit.
|
||||
// Keep all the buckets as is.
|
||||
for _, xx := range leGroup {
|
||||
rvs = append(rvs, xx.ts)
|
||||
}
|
||||
for n := range limits {
|
||||
prevValue := float64(0)
|
||||
for i := range leGroup {
|
||||
xx := &leGroup[i]
|
||||
value := xx.ts.Values[n]
|
||||
xx.delta += value - prevValue
|
||||
prevValue = value
|
||||
continue
|
||||
}
|
||||
// Slow path - remove buckets with the smallest number of hits until their count reaches the limit.
|
||||
|
||||
// Calculate per-bucket hits.
|
||||
sort.Slice(leGroup, func(i, j int) bool {
|
||||
return leGroup[i].le < leGroup[j].le
|
||||
})
|
||||
for n := range limits {
|
||||
prevValue := float64(0)
|
||||
for i := range leGroup {
|
||||
xx := &leGroup[i]
|
||||
value := xx.ts.Values[n]
|
||||
xx.hits += value - prevValue
|
||||
prevValue = value
|
||||
}
|
||||
}
|
||||
for len(leGroup) > limit {
|
||||
xxMinIdx := 0
|
||||
for i, xx := range leGroup {
|
||||
if xx.hits < leGroup[xxMinIdx].hits {
|
||||
xxMinIdx = i
|
||||
}
|
||||
}
|
||||
sort.Slice(leGroup, func(i, j int) bool {
|
||||
return leGroup[i].delta < leGroup[j].delta
|
||||
})
|
||||
leGroup = leGroup[1:]
|
||||
if xxMinIdx+1 < len(leGroup) {
|
||||
leGroup[xxMinIdx+1].hits += leGroup[xxMinIdx].hits
|
||||
}
|
||||
leGroup = append(leGroup[:xxMinIdx], leGroup[xxMinIdx+1:]...)
|
||||
}
|
||||
for _, xx := range leGroup {
|
||||
rvs = append(rvs, xx.ts)
|
||||
|
|
Loading…
Reference in a new issue