mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: optimize buckets_limit(k, buckets)
for big number of buckets
This commit is contained in:
parent
cc735da814
commit
978c1e930e
1 changed files with 34 additions and 22 deletions
|
@ -303,9 +303,9 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
|
|
||||||
// Group timeseries by all MetricGroup+tags excluding `le` tag.
|
// Group timeseries by all MetricGroup+tags excluding `le` tag.
|
||||||
type x struct {
|
type x struct {
|
||||||
le float64
|
le float64
|
||||||
delta float64
|
hits float64
|
||||||
ts *timeseries
|
ts *timeseries
|
||||||
}
|
}
|
||||||
m := make(map[string][]x)
|
m := make(map[string][]x)
|
||||||
var b []byte
|
var b []byte
|
||||||
|
@ -333,28 +333,40 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
// Remove buckets with the smallest counters.
|
// Remove buckets with the smallest counters.
|
||||||
rvs := make([]*timeseries, 0, len(tss))
|
rvs := make([]*timeseries, 0, len(tss))
|
||||||
for _, leGroup := range m {
|
for _, leGroup := range m {
|
||||||
for len(leGroup) > limit {
|
if len(leGroup) <= limit {
|
||||||
// Remove a single bucket with the smallest sum.
|
// Fast path - the number of buckets doesn't exceed the given limit.
|
||||||
// TODO: optimize this dumb implementation a bit, since it may be slow on big number of buckets.
|
// Keep all the buckets as is.
|
||||||
sort.Slice(leGroup, func(i, j int) bool {
|
for _, xx := range leGroup {
|
||||||
return leGroup[i].le < leGroup[j].le
|
rvs = append(rvs, xx.ts)
|
||||||
})
|
|
||||||
for i := range leGroup {
|
|
||||||
leGroup[i].delta = 0
|
|
||||||
}
|
}
|
||||||
for n := range limits {
|
continue
|
||||||
prevValue := float64(0)
|
}
|
||||||
for i := range leGroup {
|
// Slow path - remove buckets with the smallest number of hits until their count reaches the limit.
|
||||||
xx := &leGroup[i]
|
|
||||||
value := xx.ts.Values[n]
|
// Calculate per-bucket hits.
|
||||||
xx.delta += value - prevValue
|
sort.Slice(leGroup, func(i, j int) bool {
|
||||||
prevValue = value
|
return leGroup[i].le < leGroup[j].le
|
||||||
|
})
|
||||||
|
for n := range limits {
|
||||||
|
prevValue := float64(0)
|
||||||
|
for i := range leGroup {
|
||||||
|
xx := &leGroup[i]
|
||||||
|
value := xx.ts.Values[n]
|
||||||
|
xx.hits += value - prevValue
|
||||||
|
prevValue = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for len(leGroup) > limit {
|
||||||
|
xxMinIdx := 0
|
||||||
|
for i, xx := range leGroup {
|
||||||
|
if xx.hits < leGroup[xxMinIdx].hits {
|
||||||
|
xxMinIdx = i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sort.Slice(leGroup, func(i, j int) bool {
|
if xxMinIdx+1 < len(leGroup) {
|
||||||
return leGroup[i].delta < leGroup[j].delta
|
leGroup[xxMinIdx+1].hits += leGroup[xxMinIdx].hits
|
||||||
})
|
}
|
||||||
leGroup = leGroup[1:]
|
leGroup = append(leGroup[:xxMinIdx], leGroup[xxMinIdx+1:]...)
|
||||||
}
|
}
|
||||||
for _, xx := range leGroup {
|
for _, xx := range leGroup {
|
||||||
rvs = append(rvs, xx.ts)
|
rvs = append(rvs, xx.ts)
|
||||||
|
|
Loading…
Reference in a new issue