lib/logstorage: improve performance of top and field_values pipes on systems with many CPU cores

- Parallelize merging of per-CPU results.
- Parallelize writing the results to the next pipe.
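
For illustration, here is a minimal, self-contained sketch of the two-phase pattern this commit applies: per-CPU result maps are re-partitioned by key hash so that every partition can be merged, and later written to the next pipe, by its own goroutine without locks. This is not the VictoriaLogs code; the names (`mergeParallel`, `perCPU`) and the use of `hash/fnv` are illustrative only — the real implementation lives in `mergeShardsParallel` below, uses `xxhash` and tracks a state-size budget.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// mergeParallel merges per-CPU hit maps into one merged map per partition.
// Phase 1 re-partitions every per-CPU map by key hash; phase 2 merges each
// partition in its own goroutine, so no locks are needed.
func mergeParallel(perCPU []map[string]uint64) []map[string]uint64 {
	n := len(perCPU)
	partitioned := make([][]map[string]uint64, n)

	var wg sync.WaitGroup
	for i, src := range perCPU {
		wg.Add(1)
		go func(i int, src map[string]uint64) {
			defer wg.Done()
			parts := make([]map[string]uint64, n)
			for j := range parts {
				parts[j] = make(map[string]uint64)
			}
			for k, hits := range src {
				h := fnv.New64a()
				h.Write([]byte(k))
				parts[h.Sum64()%uint64(n)][k] += hits
			}
			partitioned[i] = parts // each goroutine writes only its own slot
		}(i, src)
	}
	wg.Wait()

	// Phase 2: goroutine j owns partition j across all workers.
	merged := make([]map[string]uint64, n)
	for j := 0; j < n; j++ {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			dst := partitioned[0][j]
			for i := 1; i < n; i++ {
				for k, hits := range partitioned[i][j] {
					dst[k] += hits
				}
			}
			merged[j] = dst
		}(j)
	}
	wg.Wait()
	return merged
}

func main() {
	perCPU := []map[string]uint64{
		{"alice": 2, "bob": 1},
		{"alice": 3, "carol": 5},
	}
	for j, m := range mergeParallel(perCPU) {
		fmt.Println("partition", j, m)
	}
}
```

In the actual change below, each merged partition is then handed to `writeShardData` in its own goroutine, so the next pipe receives blocks tagged with distinct worker IDs.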
Aliaksandr Valialkin 2024-10-17 23:44:38 +02:00
parent c4b2fdff70
commit 78c6fb0883
2 changed files with 187 additions and 39 deletions


@@ -17,7 +17,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
* FEATURE: improve [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) performance on systems with many CPU cores when `by(...)` fields contain big number of unique values. For example, `_time:1d | stats by (user_id) count() x` should be executed much faster when `user_id` field contains millions of unique values.
* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on systems with many CPU cores when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster when `user_id` field contains millions of unique values.
* FEATURE: improve performance of the [`top`](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe), [`uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe) and [`field_values`](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe) pipes on systems with many CPU cores when they are applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with a big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster when the `user_id` field contains millions of unique values.
## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)


@@ -4,9 +4,12 @@ import (
"fmt"
"slices"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -263,31 +266,63 @@ func (pup *pipeUniqProcessor) flush() error {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20))
}
// merge state across shards
shards := pup.shards
m := shards[0].getM()
shards = shards[1:]
for i := range shards {
// merge state across shards in parallel
ms, err := pup.mergeShardsParallel()
if err != nil {
return err
}
if needStop(pup.stopCh) {
return nil
}
for k, pHitsSrc := range shards[i].getM() {
pHits, ok := m[k]
if !ok {
m[k] = pHitsSrc
} else {
*pHits += *pHitsSrc
}
}
resetHits := false
if limit := pup.pu.limit; limit > 0 {
// Trim the number of entries according to the given limit
entriesLen := 0
result := ms[:0]
for _, m := range ms {
entriesLen += len(m)
if uint64(entriesLen) <= limit {
result = append(result, m)
continue
}
// There is little sense in returning partial hits when the limit on the number of unique entries is reached.
// It is better, from a UX standpoint, to return zero hits instead.
resetHits := pup.pu.limit > 0 && uint64(len(m)) > pup.pu.limit
// There is little sense in returning partial hits when the limit on the number of unique entries is reached,
// since an arbitrary number of unique entries and their hits could be skipped.
// It is better to return zero hits instead of misleading hit counts.
resetHits = true
for k := range m {
delete(m, k)
entriesLen--
if uint64(entriesLen) <= limit {
break
}
}
if len(m) > 0 {
result = append(result, m)
}
break
}
ms = result
}
// write result
// Write the calculated stats in parallel to the next pipe.
var wg sync.WaitGroup
for i, m := range ms {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
pup.writeShardData(workerID, m, resetHits)
}(uint(i))
}
wg.Wait()
return nil
}
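// writeShardData writes all the entries from m to the next pipe using the given workerID.
// When resetHits is true, zero hits are reported instead of partial, misleading values (see the limit handling in flush).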
func (pup *pipeUniqProcessor) writeShardData(workerID uint, m map[string]*uint64, resetHits bool) {
wctx := &pipeUniqWriteContext{
workerID: workerID,
pup: pup,
}
byFields := pup.pu.byFields
@@ -311,7 +346,7 @@ func (pup *pipeUniqProcessor) flush() error {
if len(byFields) == 0 {
for k, pHits := range m {
if needStop(pup.stopCh) {
return nil
return
}
rowFields = rowFields[:0]
@@ -341,7 +376,7 @@ func (pup *pipeUniqProcessor) flush() error {
fieldName := byFields[0]
for k, pHits := range m {
if needStop(pup.stopCh) {
return nil
return
}
rowFields = append(rowFields[:0], Field{
@@ -354,7 +389,7 @@ func (pup *pipeUniqProcessor) flush() error {
} else {
for k, pHits := range m {
if needStop(pup.stopCh) {
return nil
return
}
rowFields = rowFields[:0]
@@ -379,18 +414,136 @@ func (pup *pipeUniqProcessor) flush() error {
}
wctx.flush()
}
return nil
func (pup *pipeUniqProcessor) mergeShardsParallel() ([]map[string]*uint64, error) {
shards := pup.shards
shardsLen := len(shards)
if shardsLen == 1 {
m := shards[0].getM()
var ms []map[string]*uint64
if len(m) > 0 {
ms = append(ms, m)
}
return ms, nil
}
var wg sync.WaitGroup
perShardMaps := make([][]map[string]*uint64, shardsLen)
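// Phase 1: each worker re-partitions its shard's entries by key hash into shardsLen buckets,
// so that the buckets can later be merged independently without locks.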
for i := range shards {
wg.Add(1)
go func(idx int) {
defer wg.Done()
shardMaps := make([]map[string]*uint64, shardsLen)
for i := range shardMaps {
shardMaps[i] = make(map[string]*uint64)
}
n := int64(0)
nTotal := int64(0)
for k, pHits := range shards[idx].getM() {
if needStop(pup.stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
m := shardMaps[h%uint64(len(shardMaps))]
n += updatePipeUniqMap(m, k, pHits)
if n > stateSizeBudgetChunk {
if nRemaining := pup.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
nTotal += n
n = 0
}
}
nTotal += n
pup.stateSizeBudget.Add(-n)
perShardMaps[idx] = shardMaps
// Release the original map and give its state size budget back.
shards[idx].m = nil
pup.stateSizeBudget.Add(nTotal)
}(i)
}
wg.Wait()
if needStop(pup.stopCh) {
return nil, nil
}
if n := pup.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20))
}
// Merge per-shard entries into perShardMaps[0]
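// Phase 2: goroutine idx merges bucket idx from all the shards; every goroutine owns a distinct bucket, so no synchronization is needed.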
for i := range perShardMaps {
wg.Add(1)
go func(idx int) {
defer wg.Done()
m := perShardMaps[0][idx]
for i := 1; i < len(perShardMaps); i++ {
n := int64(0)
nTotal := int64(0)
for k, psg := range perShardMaps[i][idx] {
if needStop(pup.stopCh) {
return
}
n += updatePipeUniqMap(m, k, psg)
if n > stateSizeBudgetChunk {
if nRemaining := pup.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
nTotal += n
n = 0
}
}
nTotal += n
pup.stateSizeBudget.Add(-n)
// Release the original map and give its state size budget back.
perShardMaps[i][idx] = nil
pup.stateSizeBudget.Add(nTotal)
}
}(i)
}
wg.Wait()
if needStop(pup.stopCh) {
return nil, nil
}
if n := pup.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20))
}
// Filter out maps without entries
ms := perShardMaps[0]
result := ms[:0]
for _, m := range ms {
if len(m) > 0 {
result = append(result, m)
}
}
return result, nil
}
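// updatePipeUniqMap adds pHitsSrc into m under the key k.
// It returns the approximate number of additional state bytes occupied by the new entry, or 0 if k is already present in m.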
func updatePipeUniqMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
pHitsDst := m[k]
if pHitsDst != nil {
*pHitsDst += *pHitsSrc
return 0
}
m[k] = pHitsSrc
return int64(unsafe.Sizeof(k) + unsafe.Sizeof(pHitsSrc))
}
type pipeUniqWriteContext struct {
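// workerID is propagated to ppNext.writeBlock, so blocks flushed concurrently by different goroutines are attributed to distinct workers.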
workerID uint
pup *pipeUniqProcessor
rcs []resultColumn
br blockResult
// rowsWritten is the total number of rows passed to writeRow.
rowsWritten uint64
// rowsCount is the number of rows in the current block
rowsCount int
@@ -399,11 +552,6 @@ type pipeUniqWriteContext struct {
}
func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) {
if limit := wctx.pup.pu.limit; limit > 0 && wctx.rowsWritten >= limit {
return
}
wctx.rowsWritten++
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(rowFields)
@@ -447,7 +595,7 @@ func (wctx *pipeUniqWriteContext) flush() {
// Flush rcs to ppNext
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.pup.ppNext.writeBlock(0, br)
wctx.pup.ppNext.writeBlock(wctx.workerID, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()