mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/logstorage: optimize performance for top
pipe when it is applied to a field with millions of unique values
- Use parallel merge of per-CPU shard results. This improves merge performance on multi-CPU systems.
- Use topN heap sort of per-shard results. This improves performance when results contain millions of entries.
(cherry picked from commit 192c07f76a
)
This commit is contained in:
parent
6f455c2dce
commit
1000ae437c
2 changed files with 193 additions and 34 deletions
|
@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
## tip
|
||||
|
||||
* FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
|
||||
* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on multi-CPU hosts when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster now when `user_id` field contains millions of unique values.
|
||||
|
||||
## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)
|
||||
|
||||
|
|
|
@ -1,13 +1,17 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
|
@ -243,43 +247,24 @@ func (ptp *pipeTopProcessor) flush() error {
|
|||
if n := ptp.stateSizeBudget.Load(); n <= 0 {
|
||||
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
// merge state across shards
|
||||
shards := ptp.shards
|
||||
m := shards[0].getM()
|
||||
shards = shards[1:]
|
||||
for i := range shards {
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
for k, pHitsSrc := range shards[i].getM() {
|
||||
pHits, ok := m[k]
|
||||
if !ok {
|
||||
m[k] = pHitsSrc
|
||||
} else {
|
||||
*pHits += *pHitsSrc
|
||||
}
|
||||
}
|
||||
limit := ptp.pt.limit
|
||||
if limit == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// select top entries with the biggest number of hits
|
||||
entries := make([]pipeTopEntry, 0, len(m))
|
||||
for k, pHits := range m {
|
||||
entries = append(entries, pipeTopEntry{
|
||||
k: k,
|
||||
hits: *pHits,
|
||||
})
|
||||
}
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
a, b := &entries[i], &entries[j]
|
||||
if a.hits == b.hits {
|
||||
return a.k < b.k
|
||||
// merge state across shards in parallel
|
||||
var entries []*pipeTopEntry
|
||||
if len(ptp.shards) == 1 {
|
||||
entries = getTopEntries(ptp.shards[0].getM(), limit, ptp.stopCh)
|
||||
} else {
|
||||
es, err := ptp.getTopEntriesParallel(limit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return a.hits > b.hits
|
||||
})
|
||||
if uint64(len(entries)) > ptp.pt.limit {
|
||||
entries = entries[:ptp.pt.limit]
|
||||
entries = es
|
||||
}
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// write result
|
||||
|
@ -373,11 +358,184 @@ func (ptp *pipeTopProcessor) flush() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntry, error) {
|
||||
shards := ptp.shards
|
||||
shardsLen := len(shards)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
perShardMaps := make([][]map[string]*uint64, shardsLen)
|
||||
for i := range shards {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
|
||||
shardMaps := make([]map[string]*uint64, shardsLen)
|
||||
for i := range shardMaps {
|
||||
shardMaps[i] = make(map[string]*uint64)
|
||||
}
|
||||
|
||||
n := int64(0)
|
||||
for k, pHitsSrc := range shards[idx].getM() {
|
||||
if needStop(ptp.stopCh) {
|
||||
return
|
||||
}
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
|
||||
m := shardMaps[h%uint64(len(shardMaps))]
|
||||
n += updatePipeTopMap(m, k, pHitsSrc)
|
||||
if n > stateSizeBudgetChunk {
|
||||
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
||||
return
|
||||
}
|
||||
n = 0
|
||||
}
|
||||
}
|
||||
ptp.stateSizeBudget.Add(-n)
|
||||
|
||||
perShardMaps[idx] = shardMaps
|
||||
shards[idx].m = nil
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil, nil
|
||||
}
|
||||
if n := ptp.stateSizeBudget.Load(); n < 0 {
|
||||
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
entriess := make([][]*pipeTopEntry, shardsLen)
|
||||
for i := range entriess {
|
||||
wg.Add(1)
|
||||
go func(idx int) {
|
||||
defer wg.Done()
|
||||
|
||||
m := perShardMaps[0][idx]
|
||||
n := int64(0)
|
||||
for _, shardMaps := range perShardMaps[1:] {
|
||||
for k, pHitsSrc := range shardMaps[idx] {
|
||||
if needStop(ptp.stopCh) {
|
||||
return
|
||||
}
|
||||
n += updatePipeTopMap(m, k, pHitsSrc)
|
||||
if n > stateSizeBudgetChunk {
|
||||
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
|
||||
return
|
||||
}
|
||||
n = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
ptp.stateSizeBudget.Add(-n)
|
||||
|
||||
entriess[idx] = getTopEntries(m, ptp.pt.limit, ptp.stopCh)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil, nil
|
||||
}
|
||||
if n := ptp.stateSizeBudget.Load(); n < 0 {
|
||||
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
// merge entriess
|
||||
entries := entriess[0]
|
||||
for _, es := range entriess[1:] {
|
||||
entries = append(entries, es...)
|
||||
}
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
return entries[j].less(entries[i])
|
||||
})
|
||||
if uint64(len(entries)) > limit {
|
||||
entries = entries[:limit]
|
||||
}
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) []*pipeTopEntry {
|
||||
if limit == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var eh topEntriesHeap
|
||||
for k, pHits := range m {
|
||||
if needStop(stopCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
e := pipeTopEntry{
|
||||
k: k,
|
||||
hits: *pHits,
|
||||
}
|
||||
if uint64(len(eh)) < limit {
|
||||
eCopy := e
|
||||
heap.Push(&eh, &eCopy)
|
||||
continue
|
||||
}
|
||||
if eh[0].less(&e) {
|
||||
eCopy := e
|
||||
eh[0] = &eCopy
|
||||
heap.Fix(&eh, 0)
|
||||
}
|
||||
}
|
||||
|
||||
result := ([]*pipeTopEntry)(eh)
|
||||
for len(eh) > 0 {
|
||||
x := heap.Pop(&eh)
|
||||
result[len(eh)] = x.(*pipeTopEntry)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
|
||||
pHitsDst, ok := m[k]
|
||||
if ok {
|
||||
*pHitsDst += *pHitsSrc
|
||||
return 0
|
||||
}
|
||||
|
||||
m[k] = pHitsSrc
|
||||
return int64(unsafe.Sizeof(k) + unsafe.Sizeof(pHitsSrc))
|
||||
}
|
||||
|
||||
type topEntriesHeap []*pipeTopEntry
|
||||
|
||||
func (h *topEntriesHeap) Less(i, j int) bool {
|
||||
a := *h
|
||||
return a[i].less(a[j])
|
||||
}
|
||||
func (h *topEntriesHeap) Swap(i, j int) {
|
||||
a := *h
|
||||
a[i], a[j] = a[j], a[i]
|
||||
}
|
||||
func (h *topEntriesHeap) Len() int {
|
||||
return len(*h)
|
||||
}
|
||||
func (h *topEntriesHeap) Push(v any) {
|
||||
x := v.(*pipeTopEntry)
|
||||
*h = append(*h, x)
|
||||
}
|
||||
func (h *topEntriesHeap) Pop() any {
|
||||
a := *h
|
||||
x := a[len(a)-1]
|
||||
a[len(a)-1] = nil
|
||||
*h = a[:len(a)-1]
|
||||
return x
|
||||
}
|
||||
|
||||
type pipeTopEntry struct {
|
||||
k string
|
||||
hits uint64
|
||||
}
|
||||
|
||||
func (e *pipeTopEntry) less(r *pipeTopEntry) bool {
|
||||
if e.hits == r.hits {
|
||||
return e.k > r.k
|
||||
}
|
||||
return e.hits < r.hits
|
||||
}
|
||||
|
||||
type pipeTopWriteContext struct {
|
||||
ptp *pipeTopProcessor
|
||||
rcs []resultColumn
|
||||
|
|
Loading…
Reference in a new issue