mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/protoparser/clusternative: allocate unmarshalWork after reading the data from input connection
This shortens the time when unmarshalWork is in use. This also reduces the number of unmarshalWork objects in the pool, and its memory usage.
This commit is contained in:
parent
481ca746ba
commit
e4e2d1fcde
1 changed files with 11 additions and 11 deletions
|
@ -30,7 +30,18 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric
|
|||
callbackErr error
|
||||
)
|
||||
for {
|
||||
reqBuf, err := readBlock(nil, bc, isReadOnly)
|
||||
if err != nil {
|
||||
wg.Wait()
|
||||
if err == io.EOF {
|
||||
// Remote end gracefully closed the connection.
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
blocksRead.Inc()
|
||||
uw := getUnmarshalWork()
|
||||
uw.reqBuf = reqBuf
|
||||
uw.callback = func(rows []storage.MetricRow) {
|
||||
if err := callback(rows); err != nil {
|
||||
processErrors.Inc()
|
||||
|
@ -42,17 +53,6 @@ func ParseStream(bc *handshake.BufferedConn, callback func(rows []storage.Metric
|
|||
}
|
||||
}
|
||||
uw.wg = &wg
|
||||
var err error
|
||||
uw.reqBuf, err = readBlock(uw.reqBuf[:0], bc, isReadOnly)
|
||||
if err != nil {
|
||||
wg.Wait()
|
||||
if err == io.EOF {
|
||||
// Remote end gracefully closed the connection.
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
blocksRead.Inc()
|
||||
wg.Add(1)
|
||||
common.ScheduleUnmarshalWork(uw)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue