diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
index 35e520ced..303725795 100644
--- a/lib/streamaggr/dedup.go
+++ b/lib/streamaggr/dedup.go
@@ -113,22 +113,14 @@ func (ctx *dedupFlushCtx) reset() {
 }
 
 func (da *dedupAggr) flush(f func(samples []pushSample)) {
-	var wg sync.WaitGroup
+	// Do not flush shards in parallel, since this significantly increases CPU usage
+	// on systems with many CPU cores, while it doesn't improve flush latency too much.
+	ctx := getDedupFlushCtx()
 	for i := range da.shards {
-		flushConcurrencyCh <- struct{}{}
-		wg.Add(1)
-		go func(shard *dedupAggrShard) {
-			defer func() {
-				<-flushConcurrencyCh
-				wg.Done()
-			}()
-
-			ctx := getDedupFlushCtx()
-			shard.flush(ctx, f)
-			putDedupFlushCtx(ctx)
-		}(&da.shards[i])
+		ctx.reset()
+		da.shards[i].flush(ctx, f)
 	}
-	wg.Wait()
+	putDedupFlushCtx(ctx)
 }
 
 type perShardSamples struct {
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 8d98a7071..2d8888dc5 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -670,7 +670,9 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
 
 	startTime := time.Now()
 
+	flushConcurrencyCh <- struct{}{}
 	a.da.flush(a.pushSamples)
+	<-flushConcurrencyCh
 
 	d := time.Since(startTime)
 	a.dedupFlushDuration.Update(d.Seconds())
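
The patch keeps the "bounded concurrency" idea but moves it up one level: instead of spawning a goroutine per shard and throttling them on flushConcurrencyCh, each aggregator now takes a single slot on that channel for its whole dedup flush and iterates its shards sequentially with one pooled flush context. Below is a minimal, self-contained sketch of the channel-as-semaphore pattern visible in the diff. Only the name flushConcurrencyCh comes from the patch; the channel capacity, the helper names, and the per-shard work are assumptions made for illustration, not the repository's implementation.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// flushConcurrencyCh bounds how many flushes may run at once.
// The capacity here (number of usable CPUs) is an assumption for this sketch.
var flushConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1))

// flushOneAggregator is a hypothetical stand-in for aggregator.dedupFlush:
// it acquires a slot before flushing and releases it afterwards, so at most
// cap(flushConcurrencyCh) aggregators flush concurrently, while each
// aggregator walks its own shards sequentially.
func flushOneAggregator(id int) {
	flushConcurrencyCh <- struct{}{}        // acquire a slot
	defer func() { <-flushConcurrencyCh }() // release the slot

	// Sequential per-shard work, mirroring dedupAggr.flush after the patch.
	for shard := 0; shard < 4; shard++ {
		time.Sleep(time.Millisecond) // placeholder for shard.flush(ctx, f)
	}
	fmt.Printf("aggregator %d flushed\n", id)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			flushOneAggregator(id)
		}(i)
	}
	wg.Wait()
}

The design trade-off shown by the patch: per-shard goroutines buy little latency because the shards of one aggregator contend on the same semaphore anyway, while they cost goroutine scheduling and extra flush contexts; acquiring one slot per aggregator keeps total flush concurrency bounded at far lower CPU overhead.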