From ebaf68bcb0fe79bb38a1af55c0c594d4b592bf4f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 14 Jun 2021 15:18:46 +0300 Subject: [PATCH] lib/protoparser: stop reading the input stream as soon as the callback provided by the caller returns error This is a follow-up for af90c3c43b69a7cda3a9cfa6697cc90afbf9936d --- lib/protoparser/csvimport/streamparser.go | 9 ++++++++- lib/protoparser/graphite/streamparser.go | 9 ++++++++- lib/protoparser/influx/streamparser.go | 9 ++++++++- lib/protoparser/opentsdb/streamparser.go | 9 ++++++++- lib/protoparser/prometheus/streamparser.go | 14 +++++++------- lib/protoparser/vmimport/streamparser.go | 9 ++++++++- 6 files changed, 47 insertions(+), 12 deletions(-) diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 918f40baf..7f5d5ed19 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -69,7 +69,7 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { func (ctx *streamContext) Read() bool { readCalls.Inc() - if ctx.err != nil { + if ctx.err != nil || ctx.hasCallbackError() { return false } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) @@ -107,6 +107,13 @@ func (ctx *streamContext) Error() error { return ctx.err } +func (ctx *streamContext) hasCallbackError() bool { + ctx.callbackErrLock.Lock() + ok := ctx.callbackErr != nil + ctx.callbackErrLock.Unlock() + return ok +} + func (ctx *streamContext) reset() { ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 93ac45394..010771c4d 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -54,7 +54,7 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { func (ctx *streamContext) Read() bool { readCalls.Inc() - if ctx.err != nil { + if ctx.err != nil || ctx.hasCallbackError() { return false } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) @@ -86,6 +86,13 @@ func (ctx *streamContext) Error() error { return ctx.err } +func (ctx *streamContext) hasCallbackError() bool { + ctx.callbackErrLock.Lock() + ok := ctx.callbackErr != nil + ctx.callbackErrLock.Unlock() + return ok +} + func (ctx *streamContext) reset() { ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index 3da3858b8..c8479cbbc 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -82,7 +82,7 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun func (ctx *streamContext) Read() bool { readCalls.Inc() - if ctx.err != nil { + if ctx.err != nil || ctx.hasCallbackError() { return false } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.N) @@ -120,6 +120,13 @@ func (ctx *streamContext) Error() error { return ctx.err } +func (ctx *streamContext) hasCallbackError() bool { + ctx.callbackErrLock.Lock() + ok := ctx.callbackErr != nil + ctx.callbackErrLock.Unlock() + return ok +} + func (ctx *streamContext) reset() { ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index 14a476e22..c66ad5fca 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -53,7 +53,7 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { func (ctx *streamContext) Read() bool { readCalls.Inc() - if ctx.err != nil { + if ctx.err != nil || ctx.hasCallbackError() { return false } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) @@ -85,6 +85,13 @@ func (ctx *streamContext) Error() error { return ctx.err } +func (ctx *streamContext) hasCallbackError() bool { + ctx.callbackErrLock.Lock() + ok := ctx.callbackErr != nil + ctx.callbackErrLock.Unlock() + return ok +} + func (ctx *streamContext) reset() { ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0] diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index abc937d5c..ef7781950 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -56,7 +56,7 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f func (ctx *streamContext) Read() bool { readCalls.Inc() - if ctx.err != nil || ctx.CallbackError() != nil { + if ctx.err != nil || ctx.hasCallbackError() { return false } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) @@ -77,7 +77,7 @@ type streamContext struct { err error wg sync.WaitGroup - callbackErrLock sync.RWMutex + callbackErrLock sync.Mutex callbackErr error } @@ -88,11 +88,11 @@ func (ctx *streamContext) Error() error { return ctx.err } -func (ctx *streamContext) CallbackError() error { - ctx.callbackErrLock.RLock() - callbackErr := ctx.callbackErr - ctx.callbackErrLock.RUnlock() - return callbackErr +func (ctx *streamContext) hasCallbackError() bool { + ctx.callbackErrLock.Lock() + ok := ctx.callbackErr != nil + ctx.callbackErrLock.Unlock() + return ok } func (ctx *streamContext) reset() { diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 7646d6cee..f1ad59709 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -59,7 +59,7 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { func (ctx *streamContext) Read() bool { readCalls.Inc() - if ctx.err != nil { + if ctx.err != nil || ctx.hasCallbackError() { return false } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineLen.N) @@ -97,6 +97,13 @@ func (ctx *streamContext) Error() error { return ctx.err } +func (ctx *streamContext) hasCallbackError() bool { + ctx.callbackErrLock.Lock() + ok := ctx.callbackErr != nil + ctx.callbackErrLock.Unlock() + return ok +} + func (ctx *streamContext) reset() { ctx.br.Reset(nil) ctx.reqBuf = ctx.reqBuf[:0]