lib/streamaggr: limit the number of concurrent flushes of aggregate metrics in order to limit memory usage

This commit is contained in:
Aliaksandr Valialkin 2023-01-06 22:39:13 -08:00
parent c63755c316
commit 5c4bd4f7c1
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -345,10 +346,18 @@ func (a *aggregator) runFlusher(interval time.Duration) {
return return
case <-t.C: case <-t.C:
} }
// Globally limit the concurrency for metrics' flush
// in order to limit memory usage when big number of aggregators
// are flushed at the same time.
flushConcurrencyCh <- struct{}{}
a.flush() a.flush()
<-flushConcurrencyCh
} }
} }
var flushConcurrencyCh = make(chan struct{}, 2*cgroup.AvailableCPUs())
func (a *aggregator) flush() { func (a *aggregator) flush() {
ctx := &flushCtx{ ctx := &flushCtx{
suffix: a.suffix, suffix: a.suffix,
@ -373,7 +382,7 @@ func (a *aggregator) flush() {
tss = dst tss = dst
} }
// Push the output metrics // Push the output metrics.
a.pushFunc(tss) a.pushFunc(tss)
} }
} }