From 192c07f76a1d180784db36764f66d9f03c2d5875 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 17 Oct 2024 21:19:16 +0200
Subject: [PATCH] lib/logstorage: optimize performance for `top` pipe when it is applied to a field with millions of unique values

- Use parallel merge of per-CPU shard results. This improves merge performance on multi-CPU systems.
- Use topN heap sort of per-shard results. This improves performance when results contain millions of entries.
---
 docs/VictoriaLogs/CHANGELOG.md |   1 +
 lib/logstorage/pipe_top.go     | 226 ++++++++++++++++++++++++++++-----
 2 files changed, 193 insertions(+), 34 deletions(-)

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index a16a6056b..8655f9f84 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 ## tip
 
 * FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
+* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on multi-CPU hosts when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with a big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster now when the `user_id` field contains millions of unique values.
 
 ## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)
 
diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go
index 1b8677934..77886be77 100644
--- a/lib/logstorage/pipe_top.go
+++ b/lib/logstorage/pipe_top.go
@@ -1,13 +1,17 @@
 package logstorage
 
 import (
+	"container/heap"
 	"fmt"
 	"slices"
 	"sort"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"unsafe"
 
+	"github.com/cespare/xxhash/v2"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -243,43 +247,24 @@ func (ptp *pipeTopProcessor) flush() error {
 	if n := ptp.stateSizeBudget.Load(); n <= 0 {
 		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
 	}
-
-	// merge state across shards
-	shards := ptp.shards
-	m := shards[0].getM()
-	shards = shards[1:]
-	for i := range shards {
-		if needStop(ptp.stopCh) {
-			return nil
-		}
-
-		for k, pHitsSrc := range shards[i].getM() {
-			pHits, ok := m[k]
-			if !ok {
-				m[k] = pHitsSrc
-			} else {
-				*pHits += *pHitsSrc
-			}
-		}
+	limit := ptp.pt.limit
+	if limit == 0 {
+		return nil
 	}
 
-	// select top entries with the biggest number of hits
-	entries := make([]pipeTopEntry, 0, len(m))
-	for k, pHits := range m {
-		entries = append(entries, pipeTopEntry{
-			k:    k,
-			hits: *pHits,
-		})
-	}
-	sort.Slice(entries, func(i, j int) bool {
-		a, b := &entries[i], &entries[j]
-		if a.hits == b.hits {
-			return a.k < b.k
+	// merge state across shards in parallel
+	var entries []*pipeTopEntry
+	if len(ptp.shards) == 1 {
+		entries = getTopEntries(ptp.shards[0].getM(), limit, ptp.stopCh)
+	} else {
+		es, err := ptp.getTopEntriesParallel(limit)
+		if err != nil {
+			return err
 		}
-		return a.hits > b.hits
-	})
-	if uint64(len(entries)) > ptp.pt.limit {
-		entries = entries[:ptp.pt.limit]
+		entries = es
+	}
+	if needStop(ptp.stopCh) {
+		return nil
+	}
 
 	// write result
@@ -373,11 +358,184 @@ func (ptp *pipeTopProcessor) flush() error {
 	return nil
 }
 
+func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntry, error) {
+	shards := ptp.shards
+	shardsLen := len(shards)
+
+	var wg sync.WaitGroup
+	perShardMaps := make([][]map[string]*uint64, shardsLen)
+	for i := range shards {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+
+			shardMaps := make([]map[string]*uint64, shardsLen)
+			for i := range shardMaps {
+				shardMaps[i] = make(map[string]*uint64)
+			}
+
+			n := int64(0)
+			for k, pHitsSrc := range shards[idx].getM() {
+				if needStop(ptp.stopCh) {
+					return
+				}
+				h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
+				m := shardMaps[h%uint64(len(shardMaps))]
+				n += updatePipeTopMap(m, k, pHitsSrc)
+				if n > stateSizeBudgetChunk {
+					if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
+						return
+					}
+					n = 0
+				}
+			}
+			ptp.stateSizeBudget.Add(-n)
+
+			perShardMaps[idx] = shardMaps
+			shards[idx].m = nil
+		}(i)
+	}
+	wg.Wait()
+	if needStop(ptp.stopCh) {
+		return nil, nil
+	}
+	if n := ptp.stateSizeBudget.Load(); n < 0 {
+		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
+	}
+
+	entriess := make([][]*pipeTopEntry, shardsLen)
+	for i := range entriess {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+
+			m := perShardMaps[0][idx]
+			n := int64(0)
+			for _, shardMaps := range perShardMaps[1:] {
+				for k, pHitsSrc := range shardMaps[idx] {
+					if needStop(ptp.stopCh) {
+						return
+					}
+					n += updatePipeTopMap(m, k, pHitsSrc)
+					if n > stateSizeBudgetChunk {
+						if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
+							return
+						}
+						n = 0
+					}
+				}
+			}
+			ptp.stateSizeBudget.Add(-n)
+
+			entriess[idx] = getTopEntries(m, ptp.pt.limit, ptp.stopCh)
+		}(i)
+	}
+	wg.Wait()
+	if needStop(ptp.stopCh) {
+		return nil, nil
+	}
+	if n := ptp.stateSizeBudget.Load(); n < 0 {
+		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
+	}
+
+	// merge entriess
+	entries := entriess[0]
+	for _, es := range entriess[1:] {
+		entries = append(entries, es...)
+	}
+	sort.Slice(entries, func(i, j int) bool {
+		return entries[j].less(entries[i])
+	})
+	if uint64(len(entries)) > limit {
+		entries = entries[:limit]
+	}
+	return entries, nil
+}
+
+func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) []*pipeTopEntry {
+	if limit == 0 {
+		return nil
+	}
+
+	var eh topEntriesHeap
+	for k, pHits := range m {
+		if needStop(stopCh) {
+			return nil
+		}
+
+		e := pipeTopEntry{
+			k:    k,
+			hits: *pHits,
+		}
+		if uint64(len(eh)) < limit {
+			eCopy := e
+			heap.Push(&eh, &eCopy)
+			continue
+		}
+		if eh[0].less(&e) {
+			eCopy := e
+			eh[0] = &eCopy
+			heap.Fix(&eh, 0)
+		}
+	}
+
+	result := ([]*pipeTopEntry)(eh)
+	for len(eh) > 0 {
+		x := heap.Pop(&eh)
+		result[len(eh)] = x.(*pipeTopEntry)
+	}
+
+	return result
+}
+
+func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
+	pHitsDst, ok := m[k]
+	if ok {
+		*pHitsDst += *pHitsSrc
+		return 0
+	}
+
+	m[k] = pHitsSrc
+	return int64(unsafe.Sizeof(k) + unsafe.Sizeof(pHitsSrc))
+}
+
+type topEntriesHeap []*pipeTopEntry
+
+func (h *topEntriesHeap) Less(i, j int) bool {
+	a := *h
+	return a[i].less(a[j])
+}
+func (h *topEntriesHeap) Swap(i, j int) {
+	a := *h
+	a[i], a[j] = a[j], a[i]
+}
+func (h *topEntriesHeap) Len() int {
+	return len(*h)
+}
+func (h *topEntriesHeap) Push(v any) {
+	x := v.(*pipeTopEntry)
+	*h = append(*h, x)
+}
+func (h *topEntriesHeap) Pop() any {
+	a := *h
+	x := a[len(a)-1]
+	a[len(a)-1] = nil
+	*h = a[:len(a)-1]
+	return x
+}
+
 type pipeTopEntry struct {
 	k    string
 	hits uint64
 }
 
+func (e *pipeTopEntry) less(r *pipeTopEntry) bool {
+	if e.hits == r.hits {
+		return e.k > r.k
+	}
+	return e.hits < r.hits
+}
+
 type pipeTopWriteContext struct {
 	ptp *pipeTopProcessor
 	rcs []resultColumn
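
Note on the parallel merge introduced in getTopEntriesParallel: each per-CPU shard map is re-bucketed by key hash into partitions, so a key always lands in the same partition and each partition can then be merged by its own goroutine without locking. The standalone sketch below shows that partition-then-merge pattern under simplifying assumptions: it uses hash/fnv instead of xxhash, plain uint64 counters instead of *uint64 pointers, omits the stop channel and the memory budget, and flattens the partitions into one map at the end, whereas the patch instead reduces each partition to its own top-N list and only concatenates those small lists. All names here (mergeParallel, hashPartition, workerMaps) are illustrative and do not exist in the VictoriaLogs codebase.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// hashPartition picks a stable partition for a key. The patch uses xxhash;
// fnv is used here only to keep the sketch dependency-free.
func hashPartition(k string, partitions int) int {
	h := fnv.New64a()
	h.Write([]byte(k))
	return int(h.Sum64() % uint64(partitions))
}

// mergeParallel merges per-worker hit counters without a global lock:
// pass 1 re-buckets every worker map into p hash partitions in parallel,
// pass 2 merges partition i across all workers in its own goroutine.
// Partitions are disjoint, so no two goroutines ever write the same key.
func mergeParallel(workerMaps []map[string]uint64) map[string]uint64 {
	if len(workerMaps) == 0 {
		return map[string]uint64{}
	}
	p := len(workerMaps) // one partition per worker, as in the patch
	var wg sync.WaitGroup

	// pass 1: split each worker's map into p partitions by key hash
	parts := make([][]map[string]uint64, len(workerMaps))
	for i := range workerMaps {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			buckets := make([]map[string]uint64, p)
			for j := range buckets {
				buckets[j] = make(map[string]uint64)
			}
			for k, hits := range workerMaps[idx] {
				buckets[hashPartition(k, p)][k] += hits
			}
			parts[idx] = buckets
		}(i)
	}
	wg.Wait()

	// pass 2: merge partition idx from every worker
	merged := make([]map[string]uint64, p)
	for i := 0; i < p; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			m := parts[0][idx]
			for _, buckets := range parts[1:] {
				for k, hits := range buckets[idx] {
					m[k] += hits
				}
			}
			merged[idx] = m
		}(i)
	}
	wg.Wait()

	// flatten the disjoint partitions into one result; the patch skips this
	// step and instead takes the top-N of every partition separately
	out := make(map[string]uint64)
	for _, m := range merged {
		for k, hits := range m {
			out[k] = hits
		}
	}
	return out
}

func main() {
	a := map[string]uint64{"x": 1, "y": 2}
	b := map[string]uint64{"y": 3, "z": 4}
	fmt.Println(mergeParallel([]map[string]uint64{a, b})) // map[x:1 y:5 z:4]
}

Using one partition per worker keeps both passes busy on all CPUs while the per-partition maps stay disjoint, which is why no mutex is needed.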
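Note on the topN selection in getTopEntries: it is the classic bounded min-heap pattern, keeping at most limit entries in a min-heap and replacing the root whenever a new entry beats it, so selecting the top n of m unique values costs O(m*log(n)) instead of the O(m*log(m)) full sort that was removed. A minimal sketch of that pattern follows; the names entry, minHeap and topN are illustrative and are not part of the VictoriaLogs code.

package main

import (
	"container/heap"
	"fmt"
)

// entry mirrors pipeTopEntry: a field value plus its hit count.
type entry struct {
	key  string
	hits uint64
}

// less matches the ordering in the patch: fewer hits first, ties broken by key.
func less(a, b entry) bool {
	if a.hits == b.hits {
		return a.key > b.key
	}
	return a.hits < b.hits
}

// minHeap keeps the smallest retained entry at index 0, so it can be evicted
// cheaply when a better entry shows up.
type minHeap []entry

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h minHeap) Less(i, j int) bool { return less(h[i], h[j]) }
func (h *minHeap) Push(v any)        { *h = append(*h, v.(entry)) }
func (h *minHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// topN scans the map once and keeps at most n entries in the heap.
func topN(m map[string]uint64, n int) []entry {
	if n <= 0 {
		return nil
	}
	var h minHeap
	for k, hits := range m {
		e := entry{key: k, hits: hits}
		if h.Len() < n {
			heap.Push(&h, e)
			continue
		}
		if less(h[0], e) { // the new entry beats the current minimum
			h[0] = e
			heap.Fix(&h, 0)
		}
	}
	// drain the heap smallest-first, filling the result back to front,
	// which yields the entries in descending order of hits
	out := make([]entry, h.Len())
	for i := h.Len() - 1; i >= 0; i-- {
		out[i] = heap.Pop(&h).(entry)
	}
	return out
}

func main() {
	m := map[string]uint64{"a": 3, "b": 10, "c": 7, "d": 1}
	fmt.Println(topN(m, 2)) // [{b 10} {c 7}]
}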