From 149c0c4a6d55e1d589f01a94b20d295ccd125250 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 13 Nov 2020 13:03:54 +0200 Subject: [PATCH] lib/protoparser: propagate callback error to the caller of ParseStream for all supported data ingestion protocols The caller of ParseStream can then generate HTTP 503 responses for non-nil errors occurring in callbacks when processing incoming requests. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896 --- lib/protoparser/csvimport/streamparser.go | 33 +++-- lib/protoparser/graphite/parser_test.go | 3 +- lib/protoparser/graphite/streamparser.go | 33 +++-- lib/protoparser/influx/streamparser.go | 33 +++-- lib/protoparser/native/streamparser.go | 38 ++++-- lib/protoparser/opentsdb/streamparser.go | 33 +++-- lib/protoparser/opentsdbhttp/streamparser.go | 125 ++++++++----------- lib/protoparser/prometheus/streamparser.go | 33 ++--- lib/protoparser/vmimport/streamparser.go | 33 +++-- 9 files changed, 213 insertions(+), 151 deletions(-) diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 027ecc458..0b318775a 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -11,7 +11,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -24,7 +23,6 @@ var ( // ParseStream parses csv from req and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from req. -// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. 
func ParseStream(req *http.Request, callback func(rows []Row) error) error { @@ -47,12 +45,26 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = callback + uw.callback = func(rows []Row) { + if err := callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() + } uw.cds = cds uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf + ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) } - return ctx.Error() + ctx.wg.Wait() + if err := ctx.Error(); err != nil { + return err + } + return ctx.callbackErr } func (ctx *streamContext) Read() bool { @@ -82,6 +94,10 @@ type streamContext struct { reqBuf []byte tailBuf []byte err error + + wg sync.WaitGroup + callbackErrLock sync.Mutex + callbackErr error } func (ctx *streamContext) Error() error { @@ -96,6 +112,7 @@ func (ctx *streamContext) reset() { ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil + ctx.callbackErr = nil } func getStreamContext(r io.Reader) *streamContext { @@ -129,7 +146,7 @@ var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) type unmarshalWork struct { rows Rows - callback func(rows []Row) error + callback func(rows []Row) cds []ColumnDescriptor reqBuf []byte } @@ -164,11 +181,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - if err := uw.callback(rows); err != nil { - logger.Errorf("error when processing imported data: %s", err) - putUnmarshalWork(uw) - return - } + uw.callback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/graphite/parser_test.go b/lib/protoparser/graphite/parser_test.go index 6217adfc3..c2dc4373e 100644 --- a/lib/protoparser/graphite/parser_test.go +++ b/lib/protoparser/graphite/parser_test.go @@ -200,7 +200,7 @@ func Test_streamContext_Read(t *testing.T) { } uw := getUnmarshalWork() callbackCalls := 0 - uw.callback = func(rows []Row) error { + uw.callback = func(rows []Row) { callbackCalls++ if len(rows) != len(rowsExpected.Rows) { t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) @@ -208,7 +208,6 @@ func Test_streamContext_Read(t *testing.T) { if !reflect.DeepEqual(rows, rowsExpected.Rows) { t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) } - return nil } uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...) uw.Unmarshal() diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 7fa734f9f..26b490162 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -11,7 +11,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -24,7 +23,6 @@ var ( // ParseStream parses Graphite lines from r and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. -// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. 
func ParseStream(r io.Reader, callback func(rows []Row) error) error { @@ -33,11 +31,25 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { for ctx.Read() { uw := getUnmarshalWork() - uw.callback = callback + uw.callback = func(rows []Row) { + if err := callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() + } uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf + ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) } - return ctx.Error() + ctx.wg.Wait() + if err := ctx.Error(); err != nil { + return err + } + return ctx.callbackErr } func (ctx *streamContext) Read() bool { @@ -61,6 +73,10 @@ type streamContext struct { reqBuf []byte tailBuf []byte err error + + wg sync.WaitGroup + callbackErrLock sync.Mutex + callbackErr error } func (ctx *streamContext) Error() error { @@ -75,6 +91,7 @@ func (ctx *streamContext) reset() { ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil + ctx.callbackErr = nil } var ( @@ -114,7 +131,7 @@ var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) type unmarshalWork struct { rows Rows - callback func(rows []Row) error + callback func(rows []Row) reqBuf []byte } @@ -152,11 +169,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - if err := uw.callback(rows); err != nil { - logger.Errorf("error when processing imported data: %s", err) - putUnmarshalWork(uw) - return - } + uw.callback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index d302e3ef0..fc6bc0215 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -11,7 +11,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -25,7 +24,6 @@ var ( // ParseStream parses r with the given args and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. -// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. 
func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { @@ -59,13 +57,27 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = callback + uw.callback = func(db string, rows []Row) { + if err := callback(db, rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() + } uw.db = db uw.tsMultiplier = tsMultiplier uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf + ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) } - return ctx.Error() + ctx.wg.Wait() + if err := ctx.Error(); err != nil { + return err + } + return ctx.callbackErr } func (ctx *streamContext) Read() bool { @@ -95,6 +107,10 @@ type streamContext struct { reqBuf []byte tailBuf []byte err error + + wg sync.WaitGroup + callbackErrLock sync.Mutex + callbackErr error } func (ctx *streamContext) Error() error { @@ -109,6 +125,7 @@ func (ctx *streamContext) reset() { ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil + ctx.callbackErr = nil } func getStreamContext(r io.Reader) *streamContext { @@ -142,7 +159,7 @@ var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) type unmarshalWork struct { rows Rows - callback func(db string, rows []Row) error + callback func(db string, rows []Row) db string tsMultiplier int64 reqBuf []byte @@ -195,11 +212,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - if err := uw.callback(uw.db, rows); err != nil { - logger.Errorf("error when processing imported data: %s", err) - putUnmarshalWork(uw) - return - } + uw.callback(uw.db, rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go index 0e87d8787..fa4c5e3ce 100644 --- a/lib/protoparser/native/streamparser.go +++ b/lib/protoparser/native/streamparser.go @@ -18,10 +18,8 @@ import ( // ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks. // // The callback can be called concurrently multiple times for streamed data from req. -// The callback can be called after ParseStream returns. // // callback shouldn't hold block after returning. -// callback can be called in parallel from multiple concurrent goroutines. func ParseStream(req *http.Request, callback func(block *Block) error) error { r := req.Body if req.Header.Get("Content-Encoding") == "gzip" { @@ -47,30 +45,49 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error { // Read native blocks and feed workers with work. 
sizeBuf := make([]byte, 4) + var wg sync.WaitGroup + var ( + callbackErrLock sync.Mutex + callbackErr error + ) for { uw := getUnmarshalWork() uw.tr = tr - uw.callback = callback + uw.callback = func(block *Block) { + if err := callback(block); err != nil { + processErrors.Inc() + callbackErrLock.Lock() + if callbackErr == nil { + callbackErr = fmt.Errorf("error when processing native block: %w", err) + } + callbackErrLock.Unlock() + } + wg.Done() + } // Read uw.metricNameBuf if _, err := io.ReadFull(br, sizeBuf); err != nil { if err == io.EOF { // End of stream putUnmarshalWork(uw) - return nil + wg.Wait() + return callbackErr } readErrors.Inc() + wg.Wait() return fmt.Errorf("cannot read metricName size: %w", err) } readCalls.Inc() bufSize := encoding.UnmarshalUint32(sizeBuf) if bufSize > 1024*1024 { parseErrors.Inc() + wg.Wait() return fmt.Errorf("too big metricName size; got %d; shouldn't exceed %d", bufSize, 1024*1024) } uw.metricNameBuf = bytesutil.Resize(uw.metricNameBuf, int(bufSize)) if _, err := io.ReadFull(br, uw.metricNameBuf); err != nil { readErrors.Inc() + wg.Wait() return fmt.Errorf("cannot read metricName with size %d bytes: %w", bufSize, err) } readCalls.Inc() @@ -78,22 +95,26 @@ func ParseStream(req *http.Request, callback func(block *Block) error) error { // Read uw.blockBuf if _, err := io.ReadFull(br, sizeBuf); err != nil { readErrors.Inc() + wg.Wait() return fmt.Errorf("cannot read native block size: %w", err) } readCalls.Inc() bufSize = encoding.UnmarshalUint32(sizeBuf) if bufSize > 1024*1024 { parseErrors.Inc() + wg.Wait() return fmt.Errorf("too big native block size; got %d; shouldn't exceed %d", bufSize, 1024*1024) } uw.blockBuf = bytesutil.Resize(uw.blockBuf, int(bufSize)) if _, err := io.ReadFull(br, uw.blockBuf); err != nil { readErrors.Inc() + wg.Wait() return fmt.Errorf("cannot read native block with size %d bytes: %w", bufSize, err) } readCalls.Inc() blocksRead.Inc() + wg.Add(1) common.ScheduleUnmarshalWork(uw) } } @@ -123,7 +144,7 @@ var ( type unmarshalWork struct { tr storage.TimeRange - callback func(block *Block) error + callback func(block *Block) metricNameBuf []byte blockBuf []byte block Block @@ -144,12 +165,7 @@ func (uw *unmarshalWork) Unmarshal() { putUnmarshalWork(uw) return } - if err := uw.callback(&uw.block); err != nil { - processErrors.Inc() - logger.Errorf("error when processing native block: %s", err) - putUnmarshalWork(uw) - return - } + uw.callback(&uw.block) putUnmarshalWork(uw) } diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index 752f1cd5e..aaab8da92 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -11,7 +11,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -24,7 +23,6 @@ var ( // ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. -// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. 
func ParseStream(r io.Reader, callback func(rows []Row) error) error { @@ -32,11 +30,25 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = callback + uw.callback = func(rows []Row) { + if err := callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() + } uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf + ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) } - return ctx.Error() + ctx.wg.Wait() + if err := ctx.Error(); err != nil { + return err + } + return ctx.callbackErr } func (ctx *streamContext) Read() bool { @@ -60,6 +72,10 @@ type streamContext struct { reqBuf []byte tailBuf []byte err error + + wg sync.WaitGroup + callbackErrLock sync.Mutex + callbackErr error } func (ctx *streamContext) Error() error { @@ -74,6 +90,7 @@ func (ctx *streamContext) reset() { ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil + ctx.callbackErr = nil } var ( @@ -113,7 +130,7 @@ var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) type unmarshalWork struct { rows Rows - callback func(rows []Row) error + callback func(rows []Row) reqBuf []byte } @@ -151,11 +168,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - if err := uw.callback(rows); err != nil { - logger.Errorf("error when processing imported data: %s", err) - putUnmarshalWork(uw) - return - } + uw.callback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index 5a1f9012e..3eedef67d 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -13,7 +13,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -27,7 +26,6 @@ var ( // ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from req. -// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(req *http.Request, callback func(rows []Row) error) error { @@ -58,10 +56,49 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", maxInsertRequestSize.N) } - uw := getUnmarshalWork() - uw.callback = callback - uw.reqBuf, ctx.reqBuf.B = ctx.reqBuf.B, uw.reqBuf - common.ScheduleUnmarshalWork(uw) + // Process the request synchronously, since there is no sense in processing a single request asynchronously. + // Sync code is easier to read and understand. 
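+ // At this point ctx.Read() has already buffered the whole request body in ctx.reqBuf, so it can be parsed in place here.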
+ p := getJSONParser() + defer putJSONParser(p) + v, err := p.ParseBytes(ctx.reqBuf.B) + if err != nil { + return fmt.Errorf("cannot parse HTTP OpenTSDB json: %w", err) + } + rs := getRows() + defer putRows(rs) + rs.Unmarshal(v) + rows := rs.Rows + rowsRead.Add(len(rows)) + + // Fill in missing timestamps + currentTimestamp := int64(fasttime.UnixTimestamp()) + for i := range rows { + r := &rows[i] + if r.Timestamp == 0 { + r.Timestamp = currentTimestamp + } + } + + // Convert timestamps in seconds to milliseconds if needed. + // See http://opentsdb.net/docs/javadoc/net/opentsdb/core/Const.html#SECOND_MASK + for i := range rows { + r := &rows[i] + if r.Timestamp&secondMask == 0 { + r.Timestamp *= 1e3 + } + } + + // Trim timestamps if required. + if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 { + for i := range rows { + row := &rows[i] + row.Timestamp -= row.Timestamp % tsTrim + } + } + + if err := callback(rows); err != nil { + return fmt.Errorf("error when processing imported data: %w", err) + } return nil } @@ -113,77 +150,17 @@ func putStreamContext(ctx *streamContext) { var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) -type unmarshalWork struct { - rows Rows - callback func(rows []Row) error - reqBuf []byte -} - -func (uw *unmarshalWork) reset() { - uw.rows.Reset() - uw.callback = nil - uw.reqBuf = uw.reqBuf[:0] -} - -// Unmarshal implements common.UnmarshalWork -func (uw *unmarshalWork) Unmarshal() { - p := getJSONParser() - defer putJSONParser(p) - v, err := p.ParseBytes(uw.reqBuf) - if err != nil { - unmarshalErrors.Inc() - logger.Errorf("cannot parse HTTP OpenTSDB json: %s", err) - return - } - uw.rows.Unmarshal(v) - rows := uw.rows.Rows - rowsRead.Add(len(rows)) - - // Fill in missing timestamps - currentTimestamp := int64(fasttime.UnixTimestamp()) - for i := range rows { - r := &rows[i] - if r.Timestamp == 0 { - r.Timestamp = currentTimestamp - } - } - - // Convert timestamps in seconds to milliseconds if needed. - // See http://opentsdb.net/docs/javadoc/net/opentsdb/core/Const.html#SECOND_MASK - for i := range rows { - r := &rows[i] - if r.Timestamp&secondMask == 0 { - r.Timestamp *= 1e3 - } - } - - // Trim timestamps if required. 
- if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 { - for i := range rows { - row := &rows[i] - row.Timestamp -= row.Timestamp % tsTrim - } - } - - if err := uw.callback(rows); err != nil { - logger.Errorf("error when processing imported data: %s", err) - putUnmarshalWork(uw) - return - } - putUnmarshalWork(uw) -} - -func getUnmarshalWork() *unmarshalWork { - v := unmarshalWorkPool.Get() +func getRows() *Rows { + v := rowsPool.Get() if v == nil { - return &unmarshalWork{} + return &Rows{} } - return v.(*unmarshalWork) + return v.(*Rows) } -func putUnmarshalWork(uw *unmarshalWork) { - uw.reset() - unmarshalWorkPool.Put(uw) +func putRows(rs *Rows) { + rs.Reset() + rowsPool.Put(rs) } -var unmarshalWorkPool sync.Pool +var rowsPool sync.Pool diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index 8c915c9fd..2c6aa46c0 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -17,7 +16,6 @@ import ( // ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. -// It is guaranteed that the callback isn't called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error { @@ -33,18 +31,26 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = func(rows []Row) error { - err := callback(rows) + uw.callback = func(rows []Row) { + if err := callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } ctx.wg.Done() - return err } uw.defaultTimestamp = defaultTimestamp uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) } - ctx.wg.Wait() // wait for all the outstanding callback calls before returning - return ctx.Error() + ctx.wg.Wait() + if err := ctx.Error(); err != nil { + return err + } + return ctx.callbackErr } func (ctx *streamContext) Read() bool { @@ -69,7 +75,9 @@ type streamContext struct { tailBuf []byte err error - wg sync.WaitGroup + wg sync.WaitGroup + callbackErrLock sync.Mutex + callbackErr error } func (ctx *streamContext) Error() error { @@ -84,6 +92,7 @@ func (ctx *streamContext) reset() { ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil + ctx.callbackErr = nil } var ( @@ -123,7 +132,7 @@ var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) type unmarshalWork struct { rows Rows - callback func(rows []Row) error + callback func(rows []Row) defaultTimestamp int64 reqBuf []byte } @@ -153,11 +162,7 @@ func (uw *unmarshalWork) Unmarshal() { } } - if err := uw.callback(rows); err != nil { - logger.Errorf("error when processing imported data: %s", err) - putUnmarshalWork(uw) - return - } + uw.callback(rows) putUnmarshalWork(uw) } diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 
66ad6c322..52063ae2d 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -10,7 +10,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -21,7 +20,6 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maxi // ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from req. -// The callback can be called after ParseStream returns. // // callback shouldn't hold rows after returning. func ParseStream(req *http.Request, callback func(rows []Row) error) error { @@ -38,11 +36,25 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { defer putStreamContext(ctx) for ctx.Read() { uw := getUnmarshalWork() - uw.callback = callback + uw.callback = func(rows []Row) { + if err := callback(rows); err != nil { + ctx.callbackErrLock.Lock() + if ctx.callbackErr == nil { + ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) + } + ctx.callbackErrLock.Unlock() + } + ctx.wg.Done() + } uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf + ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) } - return ctx.Error() + ctx.wg.Wait() + if err := ctx.Error(); err != nil { + return err + } + return ctx.callbackErr } func (ctx *streamContext) Read() bool { @@ -72,6 +84,10 @@ type streamContext struct { reqBuf []byte tailBuf []byte err error + + wg sync.WaitGroup + callbackErrLock sync.Mutex + callbackErr error } func (ctx *streamContext) Error() error { @@ -86,6 +102,7 @@ func (ctx *streamContext) reset() { ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil + ctx.callbackErr = nil } func getStreamContext(r io.Reader) *streamContext { @@ -119,7 +136,7 @@ var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) type unmarshalWork struct { rows Rows - callback func(rows []Row) error + callback func(rows []Row) reqBuf []byte } @@ -137,11 +154,7 @@ func (uw *unmarshalWork) Unmarshal() { row := &rows[i] rowsRead.Add(len(row.Timestamps)) } - if err := uw.callback(rows); err != nil { - logger.Errorf("error when processing imported data: %s", err) - putUnmarshalWork(uw) - return - } + uw.callback(rows) putUnmarshalWork(uw) }
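For illustration, a minimal sketch of the caller side of the new contract: ParseStream now returns the first error produced by the callback instead of only logging it, so an HTTP handler can turn it into a 503 response. The insertHandler and insertRows names below are hypothetical and are not part of this patch:

package main

import (
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
)

// insertHandler is a hypothetical handler showing how a non-nil error
// returned by ParseStream (including errors raised inside the callback)
// can be mapped to an HTTP 503 response.
func insertHandler(w http.ResponseWriter, req *http.Request) {
	err := vmimport.ParseStream(req, func(rows []vmimport.Row) error {
		// insertRows stands in for the real storage write; any error it
		// returns is now propagated back through ParseStream.
		return insertRows(rows)
	})
	if err != nil {
		http.Error(w, err.Error(), http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// insertRows is a hypothetical stub; a real implementation would write the
// rows to storage and return any error encountered.
func insertRows(rows []vmimport.Row) error {
	_ = rows
	return nil
}

func main() {
	http.HandleFunc("/api/v1/import", insertHandler)
	_ = http.ListenAndServe(":8428", nil)
}

Returning 503 rather than silently logging the failure lets clients treat the failed ingestion as retryable, which is the behavior requested in issue 896.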