From af90c3c43b69a7cda3a9cfa6697cc90afbf9936d Mon Sep 17 00:00:00 2001 From: faceair Date: Mon, 14 Jun 2021 20:10:58 +0800 Subject: [PATCH] lib/protoparser: stop read when callback error (#1380) --- lib/protoparser/prometheus/streamparser.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index 91009692f..abc937d5c 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -56,7 +56,7 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f func (ctx *streamContext) Read() bool { readCalls.Inc() - if ctx.err != nil { + if ctx.err != nil || ctx.CallbackError() != nil { return false } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) @@ -77,7 +77,7 @@ type streamContext struct { err error wg sync.WaitGroup - callbackErrLock sync.Mutex + callbackErrLock sync.RWMutex callbackErr error } @@ -88,6 +88,13 @@ func (ctx *streamContext) Error() error { return ctx.err } +func (ctx *streamContext) CallbackError() error { + ctx.callbackErrLock.RLock() + callbackErr := ctx.callbackErr + ctx.callbackErrLock.RUnlock() + return callbackErr +} + func (ctx *streamContext) reset() { ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0]