mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/protoparser/native: fixes parseStream dead-lock (#2423)
previously, if native block cannot be unmarshaled, wg.Done wasn't called by unmarshal work. It leads to connection blocking and possible dead-lock at client side
This commit is contained in:
parent
d7bf0a7348
commit
fe01f4803d
1 changed files with 21 additions and 16 deletions
|
@ -8,7 +8,6 @@ import (
|
|||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -48,19 +47,27 @@ func ParseStream(r io.Reader, isGzip bool, callback func(block *Block) error) er
|
|||
callbackErrLock sync.Mutex
|
||||
callbackErr error
|
||||
)
|
||||
addFirstErr := func(err error) {
|
||||
processErrors.Inc()
|
||||
callbackErrLock.Lock()
|
||||
if callbackErr == nil {
|
||||
callbackErr = fmt.Errorf("error when processing native block: %w", err)
|
||||
}
|
||||
callbackErrLock.Unlock()
|
||||
}
|
||||
for {
|
||||
uw := getUnmarshalWork()
|
||||
uw.tr = tr
|
||||
uw.callback = func(block *Block) {
|
||||
if err := callback(block); err != nil {
|
||||
processErrors.Inc()
|
||||
callbackErrLock.Lock()
|
||||
if callbackErr == nil {
|
||||
callbackErr = fmt.Errorf("error when processing native block: %w", err)
|
||||
}
|
||||
callbackErrLock.Unlock()
|
||||
uw.callback = func(block *Block, parseErr error) {
|
||||
defer wg.Done()
|
||||
// fast path
|
||||
if parseErr != nil {
|
||||
addFirstErr(parseErr)
|
||||
return
|
||||
}
|
||||
if err := callback(block); err != nil {
|
||||
addFirstErr(err)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
// Read uw.metricNameBuf
|
||||
|
@ -142,7 +149,7 @@ var (
|
|||
|
||||
type unmarshalWork struct {
|
||||
tr storage.TimeRange
|
||||
callback func(block *Block)
|
||||
callback func(block *Block, parseErr error)
|
||||
metricNameBuf []byte
|
||||
blockBuf []byte
|
||||
block Block
|
||||
|
@ -157,13 +164,11 @@ func (uw *unmarshalWork) reset() {
|
|||
|
||||
// Unmarshal implements common.UnmarshalWork
|
||||
func (uw *unmarshalWork) Unmarshal() {
|
||||
if err := uw.unmarshal(); err != nil {
|
||||
err := uw.unmarshal()
|
||||
if err != nil {
|
||||
parseErrors.Inc()
|
||||
logger.Errorf("error when unmarshaling native block: %s", err)
|
||||
putUnmarshalWork(uw)
|
||||
return
|
||||
}
|
||||
uw.callback(&uw.block)
|
||||
uw.callback(&uw.block, err)
|
||||
putUnmarshalWork(uw)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue