From 074abd5beeb7b17d38dfcc11130deefcb2b79657 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 4 Mar 2024 17:57:50 +0200 Subject: [PATCH] Revert "lib/streamaggr: do not flush dedup shards in parallel" This reverts commit eb40395a1c1d7a9aa89cdb55b86f3664d52bc968. Reason for revert: it has been appeared that the performance gain on multiple CPU cores wasn't visible because the benchmark was generating incorrect pushSample.key. See a207e0bf687d65f5198207477248d70c69284296 --- lib/streamaggr/dedup.go | 20 ++++++++++++++------ lib/streamaggr/streamaggr.go | 2 -- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index 3037257954..35e520ceda 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -113,14 +113,22 @@ func (ctx *dedupFlushCtx) reset() { } func (da *dedupAggr) flush(f func(samples []pushSample)) { - // Do not flush shards in parallel, since this significantly increases CPU usage - // on systems with many CPU cores, while doesn't improve flush latency too much. - ctx := getDedupFlushCtx() + var wg sync.WaitGroup for i := range da.shards { - ctx.reset() - da.shards[i].flush(ctx, f) + flushConcurrencyCh <- struct{}{} + wg.Add(1) + go func(shard *dedupAggrShard) { + defer func() { + <-flushConcurrencyCh + wg.Done() + }() + + ctx := getDedupFlushCtx() + shard.flush(ctx, f) + putDedupFlushCtx(ctx) + }(&da.shards[i]) } - putDedupFlushCtx(ctx) + wg.Wait() } type perShardSamples struct { diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 88120f871e..99fb9a14f5 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -670,9 +670,7 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) { startTime := time.Now() - flushConcurrencyCh <- struct{}{} a.da.flush(a.pushSamples) - <-flushConcurrencyCh d := time.Since(startTime) a.dedupFlushDuration.Update(d.Seconds())