From c4b2fdff700471f48f542fb1b874a6e12ca71131 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 17 Oct 2024 22:47:52 +0200
Subject: [PATCH] lib/logstorage: optimize 'stats by(...)' calculations for
 by(...) fields with millions of unique values on multi-CPU systems

- Parallelize merging of per-CPU `stats by(...)` result shards.
- Parallelize writing `stats by(...)` results to the next pipe.
---
 docs/VictoriaLogs/CHANGELOG.md |   3 +-
 lib/logstorage/pipe_stats.go   | 192 +++++++++++++++++++++++++++------
 lib/logstorage/pipe_top.go     |  64 ++++++-----
 3 files changed, 200 insertions(+), 59 deletions(-)

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 8655f9f84..3190ac63f 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -16,7 +16,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 ## tip
 
 * FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
-* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on multi-CPU hosts when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster now when `user_id` field contains millions of unique values.
+* FEATURE: improve [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) performance on systems with many CPU cores when `by(...)` fields contain a big number of unique values. For example, `_time:1d | stats by (user_id) count() x` should be executed much faster when the `user_id` field contains millions of unique values.
+* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on systems with many CPU cores when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with a big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster when the `user_id` field contains millions of unique values.
 
 ## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)
 
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index 0f44f2403..7cf723a1b 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -3,9 +3,12 @@ package logstorage
 import (
 	"fmt"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"unsafe"
 
+	"github.com/cespare/xxhash/v2"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -425,40 +428,38 @@ func (psp *pipeStatsProcessor) flush() error {
 		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
 	}
 
-	// Merge states across shards
-	shards := psp.shards
-	shardMain := &shards[0]
-	shardMain.init()
-	m := shardMain.m
-	shards = shards[1:]
-	for i := range shards {
-		shard := &shards[i]
-		for key, psg := range shard.m {
-			// shard.m may be quite big, so this loop can take a lot of time and CPU.
-			// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
-			if needStop(psp.stopCh) {
-				return nil
-			}
-
-			spgBase := m[key]
-			if spgBase == nil {
-				m[key] = psg
-			} else {
-				for i, sfp := range spgBase.sfps {
-					sfp.mergeState(psg.sfps[i])
-				}
-			}
-		}
+	// Merge states across shards in parallel
+	ms, err := psp.mergeShardsParallel()
+	if err != nil {
+		return err
+	}
+	if needStop(psp.stopCh) {
+		return nil
 	}
 
-	// Write per-group states to ppNext
-	byFields := psp.ps.byFields
-	if len(byFields) == 0 && len(m) == 0 {
+	if len(psp.ps.byFields) == 0 && len(ms) == 0 {
 		// Special case - zero matching rows.
-		_ = shardMain.getPipeStatsGroup(nil)
-		m = shardMain.m
+		psp.shards[0].init()
+		_ = psp.shards[0].getPipeStatsGroup(nil)
+		ms = append(ms, psp.shards[0].m)
 	}
 
+	// Write the calculated stats in parallel to the next pipe.
+	var wg sync.WaitGroup
+	for i, m := range ms {
+		wg.Add(1)
+		go func(workerID uint) {
+			defer wg.Done()
+			psp.writeShardData(workerID, m)
+		}(uint(i))
+	}
+	wg.Wait()
+
+	return nil
+}
+
+func (psp *pipeStatsProcessor) writeShardData(workerID uint, m map[string]*pipeStatsGroup) {
+	byFields := psp.ps.byFields
 	rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs))
 	for _, bf := range byFields {
 		rcs = appendResultColumnWithName(rcs, bf.name)
 	}
@@ -475,7 +476,7 @@ func (psp *pipeStatsProcessor) flush() error {
 		// m may be quite big, so this loop can take a lot of time and CPU.
 		// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
 		if needStop(psp.stopCh) {
-			return nil
+			return
 		}
 
 		// Unmarshal values for byFields from key.
@@ -511,7 +512,7 @@ func (psp *pipeStatsProcessor) flush() error {
 		if valuesLen >= 1_000_000 {
 			br.setResultColumns(rcs, rowsCount)
 			rowsCount = 0
-			psp.ppNext.writeBlock(0, &br)
+			psp.ppNext.writeBlock(workerID, &br)
 			br.reset()
 			for i := range rcs {
 				rcs[i].resetValues()
@@ -521,9 +522,134 @@ func (psp *pipeStatsProcessor) flush() error {
 	}
 
 	br.setResultColumns(rcs, rowsCount)
-	psp.ppNext.writeBlock(0, &br)
+	psp.ppNext.writeBlock(workerID, &br)
+}
 
-	return nil
+func (psp *pipeStatsProcessor) mergeShardsParallel() ([]map[string]*pipeStatsGroup, error) {
+	shards := psp.shards
+	shardsLen := len(shards)
+
+	if shardsLen == 1 {
+		var ms []map[string]*pipeStatsGroup
+		shards[0].init()
+		if len(shards[0].m) > 0 {
+			ms = append(ms, shards[0].m)
+		}
+		return ms, nil
+	}
+
+	var wg sync.WaitGroup
+	perShardMaps := make([][]map[string]*pipeStatsGroup, shardsLen)
+	for i := range shards {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+
+			shardMaps := make([]map[string]*pipeStatsGroup, shardsLen)
+			for i := range shardMaps {
+				shardMaps[i] = make(map[string]*pipeStatsGroup)
+			}
+
+			shards[idx].init()
+
+			n := int64(0)
+			nTotal := int64(0)
+			for k, psg := range shards[idx].m {
+				if needStop(psp.stopCh) {
+					return
+				}
+				h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
+				m := shardMaps[h%uint64(len(shardMaps))]
+				n += updatePipeStatsMap(m, k, psg)
+				if n > stateSizeBudgetChunk {
+					if nRemaining := psp.stateSizeBudget.Add(-n); nRemaining < 0 {
+						return
+					}
+					nTotal += n
+					n = 0
+				}
+			}
+			nTotal += n
+			psp.stateSizeBudget.Add(-n)
+
+			perShardMaps[idx] = shardMaps
+
+			// Clean the original map and return its state size budget back.
+			shards[idx].m = nil
+			psp.stateSizeBudget.Add(nTotal)
+		}(i)
+	}
+	wg.Wait()
+	if needStop(psp.stopCh) {
+		return nil, nil
+	}
+	if n := psp.stateSizeBudget.Load(); n < 0 {
+		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
+	}
+
+	// Merge per-shard entries into perShardMaps[0]
+	for i := range perShardMaps {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+
+			m := perShardMaps[0][idx]
+			for i := 1; i < len(perShardMaps); i++ {
+				n := int64(0)
+				nTotal := int64(0)
+				for k, psg := range perShardMaps[i][idx] {
+					if needStop(psp.stopCh) {
+						return
+					}
+					n += updatePipeStatsMap(m, k, psg)
+					if n > stateSizeBudgetChunk {
+						if nRemaining := psp.stateSizeBudget.Add(-n); nRemaining < 0 {
+							return
+						}
+						nTotal += n
+						n = 0
+					}
+				}
+				nTotal += n
+				psp.stateSizeBudget.Add(-n)
+
+				// Clean the original map and return its state size budget back.
+				perShardMaps[i][idx] = nil
+				psp.stateSizeBudget.Add(nTotal)
+			}
+		}(i)
+	}
+	wg.Wait()
+	if needStop(psp.stopCh) {
+		return nil, nil
+	}
+	if n := psp.stateSizeBudget.Load(); n < 0 {
+		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
+	}
+
+	// Filter out maps without entries
+	ms := perShardMaps[0]
+	result := ms[:0]
+	for _, m := range ms {
+		if len(m) > 0 {
+			result = append(result, m)
+		}
+	}
+
+	return result, nil
+}
+
+func updatePipeStatsMap(m map[string]*pipeStatsGroup, k string, psgSrc *pipeStatsGroup) int64 {
+	psgDst := m[k]
+	if psgDst != nil {
+		for i, sfp := range psgDst.sfps {
+			sfp.mergeState(psgSrc.sfps[i])
+		}
+		return 0
+	}
+
+	m[k] = psgSrc
+	return int64(unsafe.Sizeof(k) + unsafe.Sizeof(psgSrc))
 }
 
 func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go
index 77886be77..df1f59a3c 100644
--- a/lib/logstorage/pipe_top.go
+++ b/lib/logstorage/pipe_top.go
@@ -201,8 +201,8 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
 
 func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
 	m := shard.getM()
-	pHits, ok := m[v]
-	if !ok {
+	pHits := m[v]
+	if pHits == nil {
 		vCopy := strings.Clone(v)
 		hits := uint64(0)
 		pHits = &hits
@@ -247,21 +247,11 @@ func (ptp *pipeTopProcessor) flush() error {
 	if n := ptp.stateSizeBudget.Load(); n <= 0 {
 		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
 	}
-	limit := ptp.pt.limit
-	if limit == 0 {
-		return nil
-	}
 
 	// merge state across shards in parallel
-	var entries []*pipeTopEntry
-	if len(ptp.shards) == 1 {
-		entries = getTopEntries(ptp.shards[0].getM(), limit, ptp.stopCh)
-	} else {
-		es, err := ptp.getTopEntriesParallel(limit)
-		if err != nil {
-			return err
-		}
-		entries = es
+	entries, err := ptp.mergeShardsParallel()
+	if err != nil {
+		return err
 	}
 	if needStop(ptp.stopCh) {
 		return nil
 	}
@@ -358,9 +348,18 @@ func (ptp *pipeTopProcessor) flush() error {
 	return nil
 }
 
-func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntry, error) {
+func (ptp *pipeTopProcessor) mergeShardsParallel() ([]*pipeTopEntry, error) {
+	limit := ptp.pt.limit
+	if limit == 0 {
+		return nil, nil
+	}
+
 	shards := ptp.shards
 	shardsLen := len(shards)
+	if shardsLen == 1 {
+		entries := getTopEntries(shards[0].getM(), limit, ptp.stopCh)
+		return entries, nil
+	}
 
 	var wg sync.WaitGroup
 	perShardMaps := make([][]map[string]*uint64, shardsLen)
@@ -375,24 +374,30 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
 			}
 
 			n := int64(0)
-			for k, pHitsSrc := range shards[idx].getM() {
+			nTotal := int64(0)
+			for k, pHits := range shards[idx].getM() {
 				if needStop(ptp.stopCh) {
 					return
 				}
 				h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
 				m := shardMaps[h%uint64(len(shardMaps))]
-				n += updatePipeTopMap(m, k, pHitsSrc)
+				n += updatePipeTopMap(m, k, pHits)
 				if n > stateSizeBudgetChunk {
 					if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
 						return
 					}
+					nTotal += n
 					n = 0
 				}
 			}
+			nTotal += n
 			ptp.stateSizeBudget.Add(-n)
 
 			perShardMaps[idx] = shardMaps
+
+			// Clean the original map and return its state size budget back.
 			shards[idx].m = nil
+			ptp.stateSizeBudget.Add(nTotal)
 		}(i)
 	}
 	wg.Wait()
@@ -403,6 +408,7 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
 		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
 	}
 
+	// Obtain topN entries per each shard
 	entriess := make([][]*pipeTopEntry, shardsLen)
 	for i := range entriess {
 		wg.Add(1)
@@ -410,22 +416,30 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
 			defer wg.Done()
 
 			m := perShardMaps[0][idx]
-			n := int64(0)
-			for _, shardMaps := range perShardMaps[1:] {
-				for k, pHitsSrc := range shardMaps[idx] {
+			for i := 1; i < len(perShardMaps); i++ {
+				n := int64(0)
+				nTotal := int64(0)
+				for k, pHits := range perShardMaps[i][idx] {
 					if needStop(ptp.stopCh) {
 						return
 					}
-					n += updatePipeTopMap(m, k, pHitsSrc)
+					n += updatePipeTopMap(m, k, pHits)
 					if n > stateSizeBudgetChunk {
 						if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
 							return
 						}
+						nTotal += n
 						n = 0
 					}
 				}
+				nTotal += n
+				ptp.stateSizeBudget.Add(-n)
+
+				// Clean the original map and return its state size budget back.
+				perShardMaps[i][idx] = nil
+				ptp.stateSizeBudget.Add(nTotal)
 			}
-			ptp.stateSizeBudget.Add(-n)
+			perShardMaps[0][idx] = nil
 
 			entriess[idx] = getTopEntries(m, ptp.pt.limit, ptp.stopCh)
 		}(i)
@@ -489,8 +503,8 @@ func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) [
 }
 
 func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
-	pHitsDst, ok := m[k]
-	if ok {
+	pHitsDst := m[k]
+	if pHitsDst != nil {
 		*pHitsDst += *pHitsSrc
 		return 0
 	}
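
For readers unfamiliar with the two-phase merge used in this patch, here is a minimal, self-contained Go sketch of the same idea. It is illustrative only: it uses hash/fnv instead of xxhash, plain uint64 counters instead of pipeStatsGroup/pipeTopEntry state, omits the state-size budget accounting, and reuses the name mergeShardsParallel purely for clarity. Each goroutine first re-buckets its own shard map into shardsLen hash partitions; then partition j of every shard is merged by goroutine j, so no two goroutines ever touch the same key and no locks are needed, and each resulting partition can be written to the next stage concurrently.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// mergeShardsParallel merges per-worker maps without locks by hash-partitioning keys:
// phase 1 re-buckets every shard's entries into shardsLen partitions,
// phase 2 merges partition j of all shards inside goroutine j.
func mergeShardsParallel(shards []map[string]uint64) []map[string]uint64 {
	shardsLen := len(shards)
	if shardsLen == 1 {
		return shards
	}

	// Phase 1: split each shard into hash partitions in parallel.
	perShard := make([][]map[string]uint64, shardsLen)
	var wg sync.WaitGroup
	for i := range shards {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			parts := make([]map[string]uint64, shardsLen)
			for j := range parts {
				parts[j] = make(map[string]uint64)
			}
			for k, v := range shards[idx] {
				h := fnv.New64a()
				h.Write([]byte(k))
				parts[h.Sum64()%uint64(shardsLen)][k] += v
			}
			perShard[idx] = parts
		}(i)
	}
	wg.Wait()

	// Phase 2: goroutine idx merges partition idx of every shard;
	// identical keys always land in the same partition, so no locks are needed.
	for i := 0; i < shardsLen; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			dst := perShard[0][idx]
			for j := 1; j < shardsLen; j++ {
				for k, v := range perShard[j][idx] {
					dst[k] += v
				}
			}
		}(i)
	}
	wg.Wait()

	// Collect non-empty partitions; each one can be written downstream concurrently.
	var result []map[string]uint64
	for _, m := range perShard[0] {
		if len(m) > 0 {
			result = append(result, m)
		}
	}
	return result
}

func main() {
	shards := []map[string]uint64{
		{"a": 1, "b": 2},
		{"b": 3, "c": 4},
		{"a": 5, "c": 6},
	}
	for _, m := range mergeShardsParallel(shards) {
		fmt.Println(m)
	}
}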