diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index 91009692f7..abc937d5ce 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -56,7 +56,7 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f func (ctx *streamContext) Read() bool { readCalls.Inc() - if ctx.err != nil { + if ctx.err != nil || ctx.CallbackError() != nil { return false } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) @@ -77,7 +77,7 @@ type streamContext struct { err error wg sync.WaitGroup - callbackErrLock sync.Mutex + callbackErrLock sync.RWMutex callbackErr error } @@ -88,6 +88,13 @@ func (ctx *streamContext) Error() error { return ctx.err } +func (ctx *streamContext) CallbackError() error { + ctx.callbackErrLock.RLock() + callbackErr := ctx.callbackErr + ctx.callbackErrLock.RUnlock() + return callbackErr +} + func (ctx *streamContext) reset() { ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0]