lib/protoparser: stop read when callback error (#1380)

This commit is contained in:
faceair 2021-06-14 20:10:58 +08:00 committed by GitHub
parent 36d55bff66
commit af90c3c43b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -56,7 +56,7 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
func (ctx *streamContext) Read() bool { func (ctx *streamContext) Read() bool {
readCalls.Inc() readCalls.Inc()
if ctx.err != nil { if ctx.err != nil || ctx.CallbackError() != nil {
return false return false
} }
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
@ -77,7 +77,7 @@ type streamContext struct {
err error err error
wg sync.WaitGroup wg sync.WaitGroup
callbackErrLock sync.Mutex callbackErrLock sync.RWMutex
callbackErr error callbackErr error
} }
@ -88,6 +88,13 @@ func (ctx *streamContext) Error() error {
return ctx.err return ctx.err
} }
func (ctx *streamContext) CallbackError() error {
ctx.callbackErrLock.RLock()
callbackErr := ctx.callbackErr
ctx.callbackErrLock.RUnlock()
return callbackErr
}
func (ctx *streamContext) reset() { func (ctx *streamContext) reset() {
ctx.br.Reset(nil) ctx.br.Reset(nil)
ctx.reqBuf = ctx.reqBuf[:0] ctx.reqBuf = ctx.reqBuf[:0]