diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go
index 8d42ffa3d..f9a56be0f 100644
--- a/lib/protoparser/promremotewrite/streamparser.go
+++ b/lib/protoparser/promremotewrite/streamparser.go
@@ -10,9 +10,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
 	"github.com/VictoriaMetrics/metrics"
 	"github.com/golang/snappy"
 )
@@ -21,9 +19,6 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
 
 // ParseStream parses Prometheus remote_write message req and calls callback for the parsed timeseries.
 //
-// The callback can be called concurrently multiple times for streamed data from req.
-// The callback can be called after ParseStream returns.
-//
 // callback shouldn't hold tss after returning.
 func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error) error {
 	ctx := getPushCtx(req.Body)
@@ -31,34 +26,50 @@ func ParseStream(req *http.Request, callback func(tss []prompb.TimeSeries) error
 	if err := ctx.Read(); err != nil {
 		return err
 	}
-	uw := getUnmarshalWork()
-	ctx.wg.Add(1)
-	uw.callback = func(tss []prompb.TimeSeries) error {
-		// Propagate the error to the caller of ParseStream, so it could properly return HTTP 503 status code on error.
-		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
-		ctx.err = callback(tss)
-		ctx.wg.Done()
-		// Do not return the error from callback in order to prevent from double logging.
-		return nil
+
+	// Process the request synchronously in order to propagate errors to the caller of ParseStream,
+	// so it can properly return HTTP 503 status code in the response.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/896
+	bb := bodyBufferPool.Get()
+	defer bodyBufferPool.Put(bb)
+	var err error
+	bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B)
+	if err != nil {
+		return fmt.Errorf("cannot decompress request with length %d: %w", len(ctx.reqBuf.B), err)
 	}
-	uw.reqBuf, ctx.reqBuf.B = ctx.reqBuf.B, uw.reqBuf
-	common.ScheduleUnmarshalWork(uw)
-	ctx.wg.Wait()
-	return ctx.err
+	if len(bb.B) > maxInsertRequestSize.N {
+		return fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(bb.B))
+	}
+	wr := getWriteRequest()
+	defer putWriteRequest(wr)
+	if err := wr.Unmarshal(bb.B); err != nil {
+		unmarshalErrors.Inc()
+		return fmt.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %w", len(bb.B), err)
+	}
+
+	rows := 0
+	tss := wr.Timeseries
+	for i := range tss {
+		rows += len(tss[i].Samples)
+	}
+	rowsRead.Add(rows)
+
+	if err := callback(tss); err != nil {
+		return fmt.Errorf("error when processing imported data: %w", err)
+	}
+	return nil
 }
 
+var bodyBufferPool bytesutil.ByteBufferPool
+
 type pushCtx struct {
 	br     *bufio.Reader
 	reqBuf bytesutil.ByteBuffer
-
-	wg  sync.WaitGroup
-	err error
 }
 
 func (ctx *pushCtx) reset() {
 	ctx.br.Reset(nil)
 	ctx.reqBuf.Reset()
-	ctx.err = nil
 }
 
 func (ctx *pushCtx) Read() error {
@@ -112,66 +123,17 @@ func putPushCtx(ctx *pushCtx) {
 var pushCtxPool sync.Pool
 var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
 
-type unmarshalWork struct {
-	wr       prompb.WriteRequest
-	callback func(tss []prompb.TimeSeries) error
-	reqBuf   []byte
-}
-
-func (uw *unmarshalWork) reset() {
-	uw.wr.Reset()
-	uw.callback = nil
-	uw.reqBuf = uw.reqBuf[:0]
-}
-
-// Unmarshal implements common.UnmarshalWork
-func (uw *unmarshalWork) Unmarshal() {
-	bb := bodyBufferPool.Get()
-	defer bodyBufferPool.Put(bb)
-	var err error
-	bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], uw.reqBuf)
-	if err != nil {
-		logger.Errorf("cannot decompress request with length %d: %s", len(uw.reqBuf), err)
-		return
-	}
-	if len(bb.B) > maxInsertRequestSize.N {
-		logger.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes; got %d bytes", maxInsertRequestSize.N, len(bb.B))
-		return
-	}
-	if err := uw.wr.Unmarshal(bb.B); err != nil {
-		unmarshalErrors.Inc()
-		logger.Errorf("cannot unmarshal prompb.WriteRequest with size %d bytes: %s", len(bb.B), err)
-		return
-	}
-
-	rows := 0
-	tss := uw.wr.Timeseries
-	for i := range tss {
-		rows += len(tss[i].Samples)
-	}
-	rowsRead.Add(rows)
-
-	if err := uw.callback(tss); err != nil {
-		logger.Errorf("error when processing imported data: %s", err)
-		putUnmarshalWork(uw)
-		return
-	}
-	putUnmarshalWork(uw)
-}
-
-var bodyBufferPool bytesutil.ByteBufferPool
-
-func getUnmarshalWork() *unmarshalWork {
-	v := unmarshalWorkPool.Get()
+func getWriteRequest() *prompb.WriteRequest {
+	v := writeRequestPool.Get()
 	if v == nil {
-		return &unmarshalWork{}
+		return &prompb.WriteRequest{}
 	}
-	return v.(*unmarshalWork)
+	return v.(*prompb.WriteRequest)
 }
 
-func putUnmarshalWork(uw *unmarshalWork) {
-	uw.reset()
-	unmarshalWorkPool.Put(uw)
+func putWriteRequest(wr *prompb.WriteRequest) {
+	wr.Reset()
+	writeRequestPool.Put(wr)
 }
 
-var unmarshalWorkPool sync.Pool
+var writeRequestPool sync.Pool
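
For reviewers, a minimal usage sketch (not part of the patch) of how a caller of the now-synchronous ParseStream can map the returned error to an HTTP 503 response, which is the point of the change (see issue #896 referenced above). The handler name, route, port, and insertRows are hypothetical; only the ParseStream signature and the rule that the callback must not retain tss come from the diff.

package main

import (
	"net/http"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
)

// insertHandler is a hypothetical HTTP handler for a remote_write endpoint.
func insertHandler(w http.ResponseWriter, r *http.Request) {
	err := promremotewrite.ParseStream(r, func(tss []prompb.TimeSeries) error {
		// tss is backed by a pooled prompb.WriteRequest, so it must not be
		// retained after this callback returns; copy anything that has to
		// outlive the call.
		return insertRows(tss)
	})
	if err != nil {
		// With the patch applied, a callback error reaches this point instead
		// of being logged inside a background unmarshal worker, so the
		// remote_write client receives a 503 and can retry the request.
		http.Error(w, err.Error(), http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

// insertRows stands in for the real storage insertion logic.
func insertRows(tss []prompb.TimeSeries) error {
	_ = tss
	return nil
}

func main() {
	// Route and port are arbitrary choices for this sketch.
	http.HandleFunc("/api/v1/write", insertHandler)
	_ = http.ListenAndServe(":8480", nil)
}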
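
One detail worth calling out: snappy.Decode takes a destination slice and returns it (possibly reallocated), which is why the patch passes bb.B[:cap(bb.B)] — it lets the pooled buffer's capacity be reused across requests. A standalone sketch of that idiom, independent of the VictoriaMetrics pools:

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	payload := snappy.Encode(nil, []byte("hello, remote_write"))

	// Reusable destination buffer; in the patch this role is played by a
	// bytesutil.ByteBuffer obtained from bodyBufferPool.
	var buf []byte

	// Decode writes into buf's capacity when it is large enough, otherwise
	// allocates a bigger slice; keeping the returned slice preserves the
	// grown capacity for the next call.
	decoded, err := snappy.Decode(buf[:cap(buf)], payload)
	if err != nil {
		panic(err)
	}
	buf = decoded
	fmt.Println(string(buf))
}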