lib/logstorage: optimize 'stats by(...)' calculations for by(...) fields with millions of unique values on multi-CPU systems

- Parallelize merging of per-CPU `stats by(...)` result shards.
- Parallelize writing `stats by(...)` results to the next pipe.
This commit is contained in:
Aliaksandr Valialkin 2024-10-17 22:47:52 +02:00
parent 192c07f76a
commit c4b2fdff70
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 200 additions and 59 deletions

View file

@ -16,7 +16,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add basic [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml) for VictoriaLogs process. See details at [monitoring docs](https://docs.victoriametrics.com/victorialogs/index.html#monitoring).
* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on multi-CPU hosts when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster now when `user_id` field contains millions of unique values.
* FEATURE: improve [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) performance on systems with many CPU cores when `by(...)` fields contain big number of unique values. For example, `_time:1d | stats by (user_id) count() x` should be executed much faster when `user_id` field contains millions of unique values.
* FEATURE: improve [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) performance on systems with many CPU cores when it is applied to [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with big number of unique values. For example, `_time:1d | top 5 (user_id)` should be executed much faster when `user_id` field contains millions of unique values.
## [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs)

View file

@ -3,9 +3,12 @@ package logstorage
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -425,40 +428,38 @@ func (psp *pipeStatsProcessor) flush() error {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
}
// Merge states across shards
shards := psp.shards
shardMain := &shards[0]
shardMain.init()
m := shardMain.m
shards = shards[1:]
for i := range shards {
shard := &shards[i]
for key, psg := range shard.m {
// shard.m may be quite big, so this loop can take a lot of time and CPU.
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
// Merge states across shards in parallel
ms, err := psp.mergeShardsParallel()
if err != nil {
return err
}
if needStop(psp.stopCh) {
return nil
}
spgBase := m[key]
if spgBase == nil {
m[key] = psg
} else {
for i, sfp := range spgBase.sfps {
sfp.mergeState(psg.sfps[i])
}
}
}
}
// Write per-group states to ppNext
byFields := psp.ps.byFields
if len(byFields) == 0 && len(m) == 0 {
if len(psp.ps.byFields) == 0 && len(ms) == 0 {
// Special case - zero matching rows.
_ = shardMain.getPipeStatsGroup(nil)
m = shardMain.m
psp.shards[0].init()
_ = psp.shards[0].getPipeStatsGroup(nil)
ms = append(ms, psp.shards[0].m)
}
// Write the calculated stats in parallel to the next pipe.
var wg sync.WaitGroup
for i, m := range ms {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
psp.writeShardData(workerID, m)
}(uint(i))
}
wg.Wait()
return nil
}
func (psp *pipeStatsProcessor) writeShardData(workerID uint, m map[string]*pipeStatsGroup) {
byFields := psp.ps.byFields
rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs))
for _, bf := range byFields {
rcs = appendResultColumnWithName(rcs, bf.name)
@ -475,7 +476,7 @@ func (psp *pipeStatsProcessor) flush() error {
// m may be quite big, so this loop can take a lot of time and CPU.
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
if needStop(psp.stopCh) {
return nil
return
}
// Unmarshal values for byFields from key.
@ -511,7 +512,7 @@ func (psp *pipeStatsProcessor) flush() error {
if valuesLen >= 1_000_000 {
br.setResultColumns(rcs, rowsCount)
rowsCount = 0
psp.ppNext.writeBlock(0, &br)
psp.ppNext.writeBlock(workerID, &br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
@ -521,9 +522,134 @@ func (psp *pipeStatsProcessor) flush() error {
}
br.setResultColumns(rcs, rowsCount)
psp.ppNext.writeBlock(0, &br)
psp.ppNext.writeBlock(workerID, &br)
}
return nil
func (psp *pipeStatsProcessor) mergeShardsParallel() ([]map[string]*pipeStatsGroup, error) {
shards := psp.shards
shardsLen := len(shards)
if shardsLen == 1 {
var ms []map[string]*pipeStatsGroup
shards[0].init()
if len(shards[0].m) > 0 {
ms = append(ms, shards[0].m)
}
return ms, nil
}
var wg sync.WaitGroup
perShardMaps := make([][]map[string]*pipeStatsGroup, shardsLen)
for i := range shards {
wg.Add(1)
go func(idx int) {
defer wg.Done()
shardMaps := make([]map[string]*pipeStatsGroup, shardsLen)
for i := range shardMaps {
shardMaps[i] = make(map[string]*pipeStatsGroup)
}
shards[idx].init()
n := int64(0)
nTotal := int64(0)
for k, psg := range shards[idx].m {
if needStop(psp.stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
m := shardMaps[h%uint64(len(shardMaps))]
n += updatePipeStatsMap(m, k, psg)
if n > stateSizeBudgetChunk {
if nRemaining := psp.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
nTotal += n
n = 0
}
}
nTotal += n
psp.stateSizeBudget.Add(-n)
perShardMaps[idx] = shardMaps
// Clean the original map and return its state size budget back.
shards[idx].m = nil
psp.stateSizeBudget.Add(nTotal)
}(i)
}
wg.Wait()
if needStop(psp.stopCh) {
return nil, nil
}
if n := psp.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
}
// Merge per-shard entries into perShardMaps[0]
for i := range perShardMaps {
wg.Add(1)
go func(idx int) {
defer wg.Done()
m := perShardMaps[0][idx]
for i := 1; i < len(perShardMaps); i++ {
n := int64(0)
nTotal := int64(0)
for k, psg := range perShardMaps[i][idx] {
if needStop(psp.stopCh) {
return
}
n += updatePipeStatsMap(m, k, psg)
if n > stateSizeBudgetChunk {
if nRemaining := psp.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
nTotal += n
n = 0
}
}
nTotal += n
psp.stateSizeBudget.Add(-n)
// Clean the original map and return its state size budget back.
perShardMaps[i][idx] = nil
psp.stateSizeBudget.Add(nTotal)
}
}(i)
}
wg.Wait()
if needStop(psp.stopCh) {
return nil, nil
}
if n := psp.stateSizeBudget.Load(); n < 0 {
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
}
// Filter out maps without entries
ms := perShardMaps[0]
result := ms[:0]
for _, m := range ms {
if len(m) > 0 {
result = append(result, m)
}
}
return result, nil
}
func updatePipeStatsMap(m map[string]*pipeStatsGroup, k string, psgSrc *pipeStatsGroup) int64 {
psgDst := m[k]
if psgDst != nil {
for i, sfp := range psgDst.sfps {
sfp.mergeState(psgSrc.sfps[i])
}
return 0
}
m[k] = psgSrc
return int64(unsafe.Sizeof(k) + unsafe.Sizeof(psgSrc))
}
func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {

View file

@ -201,8 +201,8 @@ func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
m := shard.getM()
pHits, ok := m[v]
if !ok {
pHits := m[v]
if pHits == nil {
vCopy := strings.Clone(v)
hits := uint64(0)
pHits = &hits
@ -247,22 +247,12 @@ func (ptp *pipeTopProcessor) flush() error {
if n := ptp.stateSizeBudget.Load(); n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
}
limit := ptp.pt.limit
if limit == 0 {
return nil
}
// merge state across shards in parallel
var entries []*pipeTopEntry
if len(ptp.shards) == 1 {
entries = getTopEntries(ptp.shards[0].getM(), limit, ptp.stopCh)
} else {
es, err := ptp.getTopEntriesParallel(limit)
entries, err := ptp.mergeShardsParallel()
if err != nil {
return err
}
entries = es
}
if needStop(ptp.stopCh) {
return nil
}
@ -358,9 +348,18 @@ func (ptp *pipeTopProcessor) flush() error {
return nil
}
func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntry, error) {
func (ptp *pipeTopProcessor) mergeShardsParallel() ([]*pipeTopEntry, error) {
limit := ptp.pt.limit
if limit == 0 {
return nil, nil
}
shards := ptp.shards
shardsLen := len(shards)
if shardsLen == 1 {
entries := getTopEntries(shards[0].getM(), limit, ptp.stopCh)
return entries, nil
}
var wg sync.WaitGroup
perShardMaps := make([][]map[string]*uint64, shardsLen)
@ -375,24 +374,30 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
}
n := int64(0)
for k, pHitsSrc := range shards[idx].getM() {
nTotal := int64(0)
for k, pHits := range shards[idx].getM() {
if needStop(ptp.stopCh) {
return
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
m := shardMaps[h%uint64(len(shardMaps))]
n += updatePipeTopMap(m, k, pHitsSrc)
n += updatePipeTopMap(m, k, pHits)
if n > stateSizeBudgetChunk {
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
nTotal += n
n = 0
}
}
nTotal += n
ptp.stateSizeBudget.Add(-n)
perShardMaps[idx] = shardMaps
// Clean the original map and return its state size budget back.
shards[idx].m = nil
ptp.stateSizeBudget.Add(nTotal)
}(i)
}
wg.Wait()
@ -403,6 +408,7 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
}
// Obtain topN entries per each shard
entriess := make([][]*pipeTopEntry, shardsLen)
for i := range entriess {
wg.Add(1)
@ -410,23 +416,31 @@ func (ptp *pipeTopProcessor) getTopEntriesParallel(limit uint64) ([]*pipeTopEntr
defer wg.Done()
m := perShardMaps[0][idx]
for i := 1; i < len(perShardMaps); i++ {
n := int64(0)
for _, shardMaps := range perShardMaps[1:] {
for k, pHitsSrc := range shardMaps[idx] {
nTotal := int64(0)
for k, pHits := range perShardMaps[i][idx] {
if needStop(ptp.stopCh) {
return
}
n += updatePipeTopMap(m, k, pHitsSrc)
n += updatePipeTopMap(m, k, pHits)
if n > stateSizeBudgetChunk {
if nRemaining := ptp.stateSizeBudget.Add(-n); nRemaining < 0 {
return
}
nTotal += n
n = 0
}
}
}
nTotal += n
ptp.stateSizeBudget.Add(-n)
// Clean the original map and return its state size budget back.
perShardMaps[i][idx] = nil
ptp.stateSizeBudget.Add(nTotal)
}
perShardMaps[0][idx] = nil
entriess[idx] = getTopEntries(m, ptp.pt.limit, ptp.stopCh)
}(i)
}
@ -489,8 +503,8 @@ func getTopEntries(m map[string]*uint64, limit uint64, stopCh <-chan struct{}) [
}
func updatePipeTopMap(m map[string]*uint64, k string, pHitsSrc *uint64) int64 {
pHitsDst, ok := m[k]
if ok {
pHitsDst := m[k]
if pHitsDst != nil {
*pHitsDst += *pHitsSrc
return 0
}