From 5c4bd4f7c102c82c041d16e20b5921a153ffef9b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 6 Jan 2023 22:39:13 -0800 Subject: [PATCH] lib/streamaggr: limit the number of concurrent flushes of aggregate metrics in order to limit memory usage --- lib/streamaggr/streamaggr.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index d963b86ffd..c8de18c304 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -345,10 +346,18 @@ func (a *aggregator) runFlusher(interval time.Duration) { return case <-t.C: } + + // Globally limit the concurrency for metrics' flush + // in order to limit memory usage when big number of aggregators + // are flushed at the same time. + flushConcurrencyCh <- struct{}{} a.flush() + <-flushConcurrencyCh } } +var flushConcurrencyCh = make(chan struct{}, 2*cgroup.AvailableCPUs()) + func (a *aggregator) flush() { ctx := &flushCtx{ suffix: a.suffix, @@ -373,7 +382,7 @@ func (a *aggregator) flush() { tss = dst } - // Push the output metrics + // Push the output metrics. a.pushFunc(tss) } }