app/vminsert/netstorage: free up unused memory in buffer after memory usage spikes

This commit is contained in:
Aliaksandr Valialkin 2020-06-01 14:33:29 +03:00
parent 44c51c627f
commit 43b14b9569

View file

@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
@ -94,6 +95,7 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var br bufRows
brLastResetTime := fasttime.UnixTimestamp()
var waitCh <-chan struct{}
mustStop := false
for !mustStop {
@ -118,6 +120,12 @@ func (sn *storageNode) run(stopCh <-chan struct{}, snIdx int) {
sn.br, br = br, sn.br
sn.brLock.Unlock()
}
currentTime := fasttime.UnixTimestamp()
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br.buf = append(br.buf[:0:0], br.buf...)
brLastResetTime = currentTime
}
if len(br.buf) == 0 {
// Nothing to send.
continue
@ -277,6 +285,7 @@ func rerouteWorker(stopCh <-chan struct{}) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var br bufRows
brLastResetTime := fasttime.UnixTimestamp()
var waitCh <-chan struct{}
mustStop := false
for !mustStop {
@ -302,6 +311,12 @@ func rerouteWorker(stopCh <-chan struct{}) {
reroutedBRLock.Unlock()
}
reroutedBRCond.Broadcast()
currentTime := fasttime.UnixTimestamp()
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br.buf = append(br.buf[:0:0], br.buf...)
brLastResetTime = currentTime
}
if len(br.buf) == 0 {
// Nothing to re-route.
continue