lib/protoparser/clusternative: reuse unmarshalWork in order to reduce memory allocations

This commit is contained in:
Aliaksandr Valialkin 2022-10-18 00:06:56 +03:00
parent 6f69a88a5a
commit 481ca746ba
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -30,10 +30,7 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric
callbackErr error
)
for {
// Do not use unmarshalWork pool, since every unmarshalWork structure usually occupies
// big amounts of memory (more than consts.MaxInsertPacketSizeForVMStorage bytes).
// The pool would result in increased memory usage.
uw := &unmarshalWork{}
uw := getUnmarshalWork()
uw.callback = func(rows []storage.MetricRow) {
if err := callback(rows); err != nil {
processErrors.Inc()
@ -138,6 +135,14 @@ type unmarshalWork struct {
mrs []storage.MetricRow
}
func (uw *unmarshalWork) reset() {
uw.wg = nil
uw.callback = nil
// Zero reqBuf, since it may occupy big amounts of memory (consts.MaxInsertPacketSizeForVMStorage).
uw.reqBuf = nil
uw.mrs = uw.mrs[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
reqBuf := uw.reqBuf
@ -157,6 +162,22 @@ func (uw *unmarshalWork) Unmarshal() {
}
wg := uw.wg
wg.Done()
putUnmarshalWork(uw)
}
const maxRowsPerCallback = 10000
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool