lib/streamaggr: limit the number of concurrent flushes of aggregate metrics in order to limit memory usage

This commit is contained in:
Aliaksandr Valialkin 2023-01-06 22:39:13 -08:00
parent 2ca48444e2
commit 3461ae8f13
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -10,6 +10,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -345,10 +346,18 @@ func (a *aggregator) runFlusher(interval time.Duration) {
return
case <-t.C:
}
// Globally limit the concurrency for metrics' flush
// in order to limit memory usage when big number of aggregators
// are flushed at the same time.
flushConcurrencyCh <- struct{}{}
a.flush()
<-flushConcurrencyCh
}
}
var flushConcurrencyCh = make(chan struct{}, 2*cgroup.AvailableCPUs())
func (a *aggregator) flush() {
ctx := &flushCtx{
suffix: a.suffix,
@ -373,7 +382,7 @@ func (a *aggregator) flush() {
tss = dst
}
// Push the output metrics
// Push the output metrics.
a.pushFunc(tss)
}
}