diff --git a/lib/protoparser/clusternative/streamparser.go b/lib/protoparser/clusternative/streamparser.go index 40a55006ef..b0fcd2712b 100644 --- a/lib/protoparser/clusternative/streamparser.go +++ b/lib/protoparser/clusternative/streamparser.go @@ -30,7 +30,18 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric callbackErr error ) for { + reqBuf, err := readBlock(nil, bc, isReadOnly) + if err != nil { + wg.Wait() + if err == io.EOF { + // Remote end gracefully closed the connection. + return nil + } + return err + } + blocksRead.Inc() uw := getUnmarshalWork() + uw.reqBuf = reqBuf uw.callback = func(rows []storage.MetricRow) { if err := callback(rows); err != nil { processErrors.Inc() @@ -42,17 +53,6 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric } } uw.wg = &wg - var err error - uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc, isReadOnly) - if err != nil { - wg.Wait() - if err == io.EOF { - // Remote end gracefully closed the connection. - return nil - } - return err - } - blocksRead.Inc() wg.Add(1) common.ScheduleUnmarshalWork(uw) }