diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 3190ac63f..780fa683b 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -17,7 +17,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 * FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
 * FEATURE: improve [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) performance on systems with many CPU cores when `by(...)` fields contain big number of unique values. For example, `_time:1d | stats by (user_id) count() x` should be executed much faster when `user_id` field contains millions of unique values.
-* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on systems with many CPU cores when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster when `user_id` field contains millions of unique values.
+* FEATURE: improve performance for [`top`](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe), [`uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe) and [`field_values`](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe) pipes on systems with many CPU cores when they are applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with a big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster when `user_id` field contains millions of unique values.
 
 ## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)
 
diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go
index dd199d1cf..5228eb042 100644
--- a/lib/logstorage/pipe_uniq.go
+++ b/lib/logstorage/pipe_uniq.go
@@ -4,9 +4,12 @@ import (
 	"fmt"
 	"slices"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"unsafe"
 
+	"github.com/cespare/xxhash/v2"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -263,32 +266,64 @@ func (pup *pipeUniqProcessor) flush() error {
 		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20))
 	}
 
-	// merge state across shards
-	shards := pup.shards
-	m := shards[0].getM()
-	shards = shards[1:]
-	for i := range shards {
-		if needStop(pup.stopCh) {
-			return nil
-		}
-
-		for k, pHitsSrc := range shards[i].getM() {
-			pHits, ok := m[k]
-			if !ok {
-				m[k] = pHitsSrc
-			} else {
-				*pHits += *pHitsSrc
-			}
-		}
+	// merge state across shards in parallel
+	ms, err := pup.mergeShardsParallel()
+	if err != nil {
+		return err
+	}
+	if needStop(pup.stopCh) {
+		return nil
 	}
 
-	// There is little sense in returning partial hits when the limit on the number of unique entries is reached.
-	// It is better from UX experience is to return zero hits instead.
-	resetHits := pup.pu.limit > 0 && uint64(len(m)) > pup.pu.limit
+	resetHits := false
+	if limit := pup.pu.limit; limit > 0 {
+		// Trim the number of entries according to the given limit
+		entriesLen := 0
+		result := ms[:0]
+		for _, m := range ms {
+			entriesLen += len(m)
+			if uint64(entriesLen) <= limit {
+				result = append(result, m)
+				continue
+			}
 
-	// write result
+			// There is little sense in returning partial hits when the limit on the number of unique entries is reached,
+			// since an arbitrary number of unique entries and their hits could be skipped.
+			// It is better to return zero hits instead of misleading results.
+			resetHits = true
+			for k := range m {
+				delete(m, k)
+				entriesLen--
+				if uint64(entriesLen) <= limit {
+					break
+				}
+			}
+			if len(m) > 0 {
+				result = append(result, m)
+			}
+			break
+		}
+		ms = result
+	}
+
+	// Write the calculated stats in parallel to the next pipe.
+	var wg sync.WaitGroup
+	for i, m := range ms {
+		wg.Add(1)
+		go func(workerID uint) {
+			defer wg.Done()
+			pup.writeShardData(workerID, m, resetHits)
+		}(uint(i))
+	}
+	wg.Wait()
+
+	return nil
+}
+
+func (pup *pipeUniqProcessor) writeShardData(workerID uint, m map[string]*uint64, resetHits bool) {
 	wctx := &pipeUniqWriteContext{
-		pup: pup,
+		workerID: workerID,
+		pup:      pup,
 	}
 	byFields := pup.pu.byFields
 	var rowFields []Field
@@ -311,7 +346,7 @@ func (pup *pipeUniqProcessor) flush() error {
 	if len(byFields) == 0 {
 		for k, pHits := range m {
 			if needStop(pup.stopCh) {
-				return nil
+				return
 			}
 
 			rowFields = rowFields[:0]
@@ -341,7 +376,7 @@ func (pup *pipeUniqProcessor) flush() error {
 		fieldName := byFields[0]
 		for k, pHits := range m {
 			if needStop(pup.stopCh) {
-				return nil
+				return
 			}
 
 			rowFields = append(rowFields[:0], Field{
@@ -354,7 +389,7 @@ func (pup *pipeUniqProcessor) flush() error {
 	} else {
 		for k, pHits := range m {
 			if needStop(pup.stopCh) {
-				return nil
+				return
 			}
 
 			rowFields = rowFields[:0]
@@ -379,17 +414,135 @@ func (pup *pipeUniqProcessor) flush() error {
 	}
 
 	wctx.flush()
+}
 
-	return nil
+func (pup *pipeUniqProcessor) mergeShardsParallel() ([]map[string]*uint64, error) {
+	shards := pup.shards
+	shardsLen := len(shards)
+	if shardsLen == 1 {
+		m := shards[0].getM()
+		var ms []map[string]*uint64
+		if len(m) > 0 {
+			ms = append(ms, m)
+		}
+		return ms, nil
+	}
+
+	var wg sync.WaitGroup
+	perShardMaps := make([][]map[string]*uint64, shardsLen)
+	for i := range shards {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+
+			shardMaps := make([]map[string]*uint64, shardsLen)
+			for i := range shardMaps {
+				shardMaps[i] = make(map[string]*uint64)
+			}
+
+			n := int64(0)
+			nTotal := int64(0)
+			for k, pHits := range shards[idx].getM() {
+				if needStop(pup.stopCh) {
+					return
+				}
+				h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
+				m := shardMaps[h%uint64(len(shardMaps))]
+				n += updatePipeUniqMap(m, k, pHits)
+				if n > stateSizeBudgetChunk {
+					if nRemaining := pup.stateSizeBudget.Add(-n); nRemaining < 0 {
+						return
+					}
+					nTotal += n
+					n = 0
+				}
+			}
+			nTotal += n
+			pup.stateSizeBudget.Add(-n)
+
+			perShardMaps[idx] = shardMaps
+
+			// Clean the original map and return its state size budget back.
+			shards[idx].m = nil
+			pup.stateSizeBudget.Add(nTotal)
+		}(i)
+	}
+	wg.Wait()
+	if needStop(pup.stopCh) {
+		return nil, nil
+	}
+	if n := pup.stateSizeBudget.Load(); n < 0 {
+		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20))
+	}
+
+	// Merge per-shard entries into perShardMaps[0]
+	for i := range perShardMaps {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+
+			m := perShardMaps[0][idx]
+			for i := 1; i < len(perShardMaps); i++ {
+				n := int64(0)
+				nTotal := int64(0)
+				for k, psg := range perShardMaps[i][idx] {
+					if needStop(pup.stopCh) {
+						return
+					}
+					n += updatePipeUniqMap(m, k, psg)
+					if n > stateSizeBudgetChunk {
+						if nRemaining := pup.stateSizeBudget.Add(-n); nRemaining < 0 {
+							return
+						}
+						nTotal += n
+						n = 0
+					}
+				}
+				nTotal += n
+				pup.stateSizeBudget.Add(-n)
+
+				// Clean the original map and return its state size budget back.
+				perShardMaps[i][idx] = nil
+				pup.stateSizeBudget.Add(nTotal)
+			}
+		}(i)
+	}
+	wg.Wait()
+	if needStop(pup.stopCh) {
+		return nil, nil
+	}
+	if n := pup.stateSizeBudget.Load(); n < 0 {
+		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20))
+	}
+
+	// Filter out maps without entries
+	ms := perShardMaps[0]
+	result := ms[:0]
+	for _, m := range ms {
+		if len(m) > 0 {
+			result = append(result, m)
+		}
+	}
+
+	return result, nil
+}
+
+func updatePipeUniqMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
+	pHitsDst := m[k]
+	if pHitsDst != nil {
+		*pHitsDst += *pHitsSrc
+		return 0
+	}
+
+	m[k] = pHitsSrc
+	return int64(unsafe.Sizeof(k) + unsafe.Sizeof(pHitsSrc))
 }
 
 type pipeUniqWriteContext struct {
-	pup *pipeUniqProcessor
-	rcs []resultColumn
-	br  blockResult
-
-	// rowsWritten is the total number of rows passed to writeRow.
-	rowsWritten uint64
+	workerID uint
+	pup      *pipeUniqProcessor
+	rcs      []resultColumn
+	br       blockResult
 
 	// rowsCount is the number of rows in the current block
 	rowsCount int
@@ -399,11 +552,6 @@ type pipeUniqWriteContext struct {
 }
 
 func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) {
-	if limit := wctx.pup.pu.limit; limit > 0 && wctx.rowsWritten >= limit {
-		return
-	}
-	wctx.rowsWritten++
-
 	rcs := wctx.rcs
 
 	areEqualColumns := len(rcs) == len(rowFields)
@@ -447,7 +595,7 @@ func (wctx *pipeUniqWriteContext) flush() {
 	// Flush rcs to ppNext
 	br.setResultColumns(rcs, wctx.rowsCount)
 	wctx.rowsCount = 0
-	wctx.pup.ppNext.writeBlock(0, br)
+	wctx.pup.ppNext.writeBlock(wctx.workerID, br)
 	br.reset()
 	for i := range rcs {
 		rcs[i].resetValues()
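
Reviewer note: the core of this change is `mergeShardsParallel`, which replaces the old single-threaded merge with a two-phase, lock-free scheme: phase one re-partitions every shard's map into buckets by key hash, and phase two lets worker `i` merge bucket `i` from all shards, since those buckets hold disjoint key sets. The snippet below is a minimal, self-contained sketch of that pattern only; it uses the standard-library FNV hash instead of `xxhash`, drops the `stopCh` and state-size-budget handling, and the `mergeShards` helper name is illustrative rather than part of this patch.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// mergeShards merges per-shard hit counters into disjoint result maps,
// one per worker, without any locking on the merge path.
func mergeShards(shards []map[string]uint64) []map[string]uint64 {
	n := len(shards)

	// Phase 1: re-partition every shard map by key hash.
	// parts[shard][bucket] holds the keys of that shard whose hash lands in bucket.
	parts := make([][]map[string]uint64, n)
	var wg sync.WaitGroup
	for i, shard := range shards {
		wg.Add(1)
		go func(i int, shard map[string]uint64) {
			defer wg.Done()
			buckets := make([]map[string]uint64, n)
			for j := range buckets {
				buckets[j] = make(map[string]uint64)
			}
			for k, hits := range shard {
				h := fnv.New64a()
				h.Write([]byte(k))
				buckets[h.Sum64()%uint64(n)][k] += hits
			}
			parts[i] = buckets
		}(i, shard)
	}
	wg.Wait()

	// Phase 2: bucket j from every shard covers a disjoint key range,
	// so each worker merges its bucket independently of the others.
	result := make([]map[string]uint64, n)
	for j := 0; j < n; j++ {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			m := parts[0][j]
			for i := 1; i < n; i++ {
				for k, hits := range parts[i][j] {
					m[k] += hits
				}
			}
			result[j] = m
		}(j)
	}
	wg.Wait()
	return result
}

func main() {
	shards := []map[string]uint64{
		{"alice": 2, "bob": 1},
		{"alice": 3, "carol": 5},
	}
	for _, m := range mergeShards(shards) {
		for k, hits := range m {
			fmt.Printf("%s: %d\n", k, hits)
		}
	}
}
```

Partitioning by hash is what makes the second phase embarrassingly parallel: no two workers ever touch the same key, so no locks or atomics are needed while merging, and the resulting per-worker maps can be streamed to the next pipe concurrently, as `writeShardData` does in the patch.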