mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/streamaggr: do not flush dedup shards in parallel
This significantly increases CPU usage on systems with many CPU cores, while doesn't reduce flush latency too much
This commit is contained in:
parent
d7252fce79
commit
482560a1f3
2 changed files with 8 additions and 14 deletions
|
@ -113,22 +113,14 @@ func (ctx *dedupFlushCtx) reset() {
|
|||
}
|
||||
|
||||
func (da *dedupAggr) flush(f func(samples []pushSample)) {
|
||||
var wg sync.WaitGroup
|
||||
for i := range da.shards {
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
wg.Add(1)
|
||||
go func(shard *dedupAggrShard) {
|
||||
defer func() {
|
||||
<-flushConcurrencyCh
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
// Do not flush shards in parallel, since this significantly increases CPU usage
|
||||
// on systems with many CPU cores, while doesn't improve flush latency too much.
|
||||
ctx := getDedupFlushCtx()
|
||||
shard.flush(ctx, f)
|
||||
putDedupFlushCtx(ctx)
|
||||
}(&da.shards[i])
|
||||
for i := range da.shards {
|
||||
ctx.reset()
|
||||
da.shards[i].flush(ctx, f)
|
||||
}
|
||||
wg.Wait()
|
||||
putDedupFlushCtx(ctx)
|
||||
}
|
||||
|
||||
type perShardSamples struct {
|
||||
|
|
|
@ -670,7 +670,9 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
|
|||
|
||||
startTime := time.Now()
|
||||
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
a.da.flush(a.pushSamples)
|
||||
<-flushConcurrencyCh
|
||||
|
||||
d := time.Since(startTime)
|
||||
a.dedupFlushDuration.Update(d.Seconds())
|
||||
|
|
Loading…
Reference in a new issue