app/vmstorage/transport: prevent from uncontrolled memory usage growth when vminsert sends big packets with too long labels

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490
This commit is contained in:
Aliaksandr Valialkin 2020-05-15 15:42:30 +03:00
parent d1c8b0d6e9
commit a853869e75

View file

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consts" "github.com/VictoriaMetrics/VictoriaMetrics/lib/consts"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake" "github.com/VictoriaMetrics/VictoriaMetrics/lib/handshake"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
@ -272,7 +273,15 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
sizeBuf := make([]byte, 8) sizeBuf := make([]byte, 8)
var buf []byte var buf []byte
var mrs []storage.MetricRow var mrs []storage.MetricRow
lastMRsResetTime := fasttime.UnixTimestamp()
for { for {
if fasttime.UnixTimestamp()-lastMRsResetTime > 10 {
// Periodically reset mrs in order to prevent from gradual memory usage growth
// when ceratin entries in mr contain too long labels.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490 for details.
mrs = nil
lastMRsResetTime = fasttime.UnixTimestamp()
}
if _, err := io.ReadFull(bc, sizeBuf); err != nil { if _, err := io.ReadFull(bc, sizeBuf); err != nil {
if err == io.EOF { if err == io.EOF {
// Remote end gracefully closed the connection. // Remote end gracefully closed the connection.
@ -317,6 +326,16 @@ func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {
if err != nil { if err != nil {
return fmt.Errorf("cannot unmarshal MetricRow: %s", err) return fmt.Errorf("cannot unmarshal MetricRow: %s", err)
} }
if len(mrs) >= 10000 {
// Store the collected mrs in order to reduce memory usage
// when too big number of mrs are sent in each packet.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/490
vminsertMetricsRead.Add(len(mrs))
if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil {
return fmt.Errorf("cannot store metrics: %s", err)
}
mrs = mrs[:0]
}
} }
vminsertMetricsRead.Add(len(mrs)) vminsertMetricsRead.Add(len(mrs))
if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil { if err := s.storage.AddRows(mrs, uint8(*precisionBits)); err != nil {