diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 8e757db4b9..6f6c22bdbd 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/consts" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" @@ -94,6 +95,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) { ticker := time.NewTicker(time.Second) defer ticker.Stop() var br bufRows + brLastResetTime := fasttime.UnixTimestamp() var waitCh <-chan struct{} mustStop := false for !mustStop { @@ -118,6 +120,12 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) { sn.br, br = br, sn.br sn.brLock.Unlock() } + currentTime := fasttime.UnixTimestamp() + if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 { + // Free up capacity space occupied by br.buf in order to reduce memory usage after spikes. + br.buf = append(br.buf[:0:0], br.buf...) + brLastResetTime = currentTime + } if len(br.buf) == 0 { // Nothing to send. continue @@ -277,6 +285,7 @@ func rerouteWorker(stopCh <-chan struct{}) { ticker := time.NewTicker(time.Second) defer ticker.Stop() var br bufRows + brLastResetTime := fasttime.UnixTimestamp() var waitCh <-chan struct{} mustStop := false for !mustStop { @@ -302,6 +311,12 @@ func rerouteWorker(stopCh <-chan struct{}) { reroutedBRLock.Unlock() } reroutedBRCond.Broadcast() + currentTime := fasttime.UnixTimestamp() + if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 { + // Free up capacity space occupied by br.buf in order to reduce memory usage after spikes. + br.buf = append(br.buf[:0:0], br.buf...) + brLastResetTime = currentTime + } if len(br.buf) == 0 { // Nothing to re-route. continue