Revert "lib/streamaggr: do not flush dedup shards in parallel"

This reverts commit eb40395a1c.

Reason for revert: it has been appeared that the performance gain on multiple CPU cores
wasn't visible because the benchmark was generating incorrect pushSample.key.

See a207e0bf687d65f5198207477248d70c69284296
This commit is contained in:
Aliaksandr Valialkin 2024-03-04 17:57:50 +02:00
parent e70177c5fb
commit 074abd5bee
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 14 additions and 8 deletions

View file

@ -113,14 +113,22 @@ func (ctx *dedupFlushCtx) reset() {
} }
func (da *dedupAggr) flush(f func(samples []pushSample)) { func (da *dedupAggr) flush(f func(samples []pushSample)) {
// Do not flush shards in parallel, since this significantly increases CPU usage var wg sync.WaitGroup
// on systems with many CPU cores, while doesn't improve flush latency too much.
ctx := getDedupFlushCtx()
for i := range da.shards { for i := range da.shards {
ctx.reset() flushConcurrencyCh <- struct{}{}
da.shards[i].flush(ctx, f) wg.Add(1)
go func(shard *dedupAggrShard) {
defer func() {
<-flushConcurrencyCh
wg.Done()
}()
ctx := getDedupFlushCtx()
shard.flush(ctx, f)
putDedupFlushCtx(ctx)
}(&da.shards[i])
} }
putDedupFlushCtx(ctx) wg.Wait()
} }
type perShardSamples struct { type perShardSamples struct {

View file

@ -670,9 +670,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
startTime := time.Now() startTime := time.Now()
flushConcurrencyCh <- struct{}{}
a.da.flush(a.pushSamples) a.da.flush(a.pushSamples)
<-flushConcurrencyCh
d := time.Since(startTime) d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds()) a.dedupFlushDuration.Update(d.Seconds())