From e4e2d1fcde79450221c38ce1a5cea9e2eca45e78 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 18 Oct 2022 00:24:02 +0300 Subject: [PATCH] lib/protoparser/clusternative: allocate unmarshalWork after reading the data from input connection This shortens the time when unmarshalWork is in use. This also reduces the number of unmarshalWork objects in the pool, and its memory usage. --- lib/protoparser/clusternative/streamparser.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/protoparser/clusternative/streamparser.go b/lib/protoparser/clusternative/streamparser.go index 40a55006e..b0fcd2712 100644 --- a/lib/protoparser/clusternative/streamparser.go +++ b/lib/protoparser/clusternative/streamparser.go @@ -30,7 +30,18 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric callbackErr error ) for { + reqBuf, err := readBlock(nil, bc, isReadOnly) + if err != nil { + wg.Wait() + if err == io.EOF { + // Remote end gracefully closed the connection. + return nil + } + return err + } + blocksRead.Inc() uw := getUnmarshalWork() + uw.reqBuf = reqBuf uw.callback = func(rows []storage.MetricRow) { if err := callback(rows); err != nil { processErrors.Inc() @@ -42,17 +53,6 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric } } uw.wg = &wg - var err error - uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc, isReadOnly) - if err != nil { - wg.Wait() - if err == io.EOF { - // Remote end gracefully closed the connection. - return nil - } - return err - } - blocksRead.Inc() wg.Add(1) common.ScheduleUnmarshalWork(uw) }