diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index d963b86ffd..c8de18c304 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -345,10 +346,18 @@ func (a *aggregator) runFlusher(interval time.Duration) { return case <-t.C: } + + // Globally limit the concurrency for metrics' flush + // in order to limit memory usage when big number of aggregators + // are flushed at the same time. + flushConcurrencyCh <- struct{}{} a.flush() + <-flushConcurrencyCh } } +var flushConcurrencyCh = make(chan struct{}, 2*cgroup.AvailableCPUs()) + func (a *aggregator) flush() { ctx := &flushCtx{ suffix: a.suffix, @@ -373,7 +382,7 @@ func (a *aggregator) flush() { tss = dst } - // Push the output metrics + // Push the output metrics. a.pushFunc(tss) } }