diff --git a/lib/protoparser/clusternative/streamparser.go b/lib/protoparser/clusternative/streamparser.go index 51ffda38b..40a55006e 100644 --- a/lib/protoparser/clusternative/streamparser.go +++ b/lib/protoparser/clusternative/streamparser.go @@ -30,10 +30,7 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric callbackErr error ) for { - // Do not use unmarshalWork pool, since every unmarshalWork structure usually occupies - // big amounts of memory (more than consts.MaxInsertPacketSizeForVMStorage bytes). - // The pool would result in increased memory usage. - uw := &unmarshalWork{} + uw := getUnmarshalWork() uw.callback = func(rows []storage.MetricRow) { if err := callback(rows); err != nil { processErrors.Inc() @@ -138,6 +135,14 @@ type unmarshalWork struct { mrs []storage.MetricRow } +func (uw *unmarshalWork) reset() { + uw.wg = nil + uw.callback = nil + // Zero reqBuf, since it may occupy big amounts of memory (consts.MaxInsertPacketSizeForVMStorage). + uw.reqBuf = nil + uw.mrs = uw.mrs[:0] +} + // Unmarshal implements common.UnmarshalWork func (uw *unmarshalWork) Unmarshal() { reqBuf := uw.reqBuf @@ -157,6 +162,22 @@ func (uw *unmarshalWork) Unmarshal() { } wg := uw.wg wg.Done() + putUnmarshalWork(uw) } const maxRowsPerCallback = 10000 + +func getUnmarshalWork() *unmarshalWork { + v := unmarshalWorkPool.Get() + if v == nil { + return &unmarshalWork{} + } + return v.(*unmarshalWork) +} + +func putUnmarshalWork(uw *unmarshalWork) { + uw.reset() + unmarshalWorkPool.Put(uw) +} + +var unmarshalWorkPool sync.Pool