From 482560a1f366594d24fbe1d041ba3a99274501b4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 4 Mar 2024 16:59:19 +0200 Subject: [PATCH] lib/streamaggr: do not flush dedup shards in parallel This significantly increases CPU usage on systems with many CPU cores, while doesn't reduce flush latency too much --- lib/streamaggr/dedup.go | 20 ++++++-------------- lib/streamaggr/streamaggr.go | 2 ++ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go index 35e520ced..303725795 100644 --- a/lib/streamaggr/dedup.go +++ b/lib/streamaggr/dedup.go @@ -113,22 +113,14 @@ func (ctx *dedupFlushCtx) reset() { } func (da *dedupAggr) flush(f func(samples []pushSample)) { - var wg sync.WaitGroup + // Do not flush shards in parallel, since this significantly increases CPU usage + // on systems with many CPU cores, while doesn't improve flush latency too much. + ctx := getDedupFlushCtx() for i := range da.shards { - flushConcurrencyCh <- struct{}{} - wg.Add(1) - go func(shard *dedupAggrShard) { - defer func() { - <-flushConcurrencyCh - wg.Done() - }() - - ctx := getDedupFlushCtx() - shard.flush(ctx, f) - putDedupFlushCtx(ctx) - }(&da.shards[i]) + ctx.reset() + da.shards[i].flush(ctx, f) } - wg.Wait() + putDedupFlushCtx(ctx) } type perShardSamples struct { diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 8d98a7071..2d8888dc5 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -670,7 +670,9 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) { startTime := time.Now() + flushConcurrencyCh <- struct{}{} a.da.flush(a.pushSamples) + <-flushConcurrencyCh d := time.Since(startTime) a.dedupFlushDuration.Update(d.Seconds())