From 371e86194dcb0ba59625de4982b95c55437d416f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 28 Jan 2020 22:53:50 +0200 Subject: [PATCH] app/vminsert: moved `-maxInsertRequestSize` command-line flag out of `lib/prompb` in order to prevent its inclusion in `vmselect` and `vmstorage` apps --- app/vminsert/prometheus/request_handler.go | 40 ++++++++++++++++++- lib/prompb/util.go | 45 ---------------------- 2 files changed, 39 insertions(+), 46 deletions(-) diff --git a/app/vminsert/prometheus/request_handler.go b/app/vminsert/prometheus/request_handler.go index 6fea27a82c..d98ffe7372 100644 --- a/app/vminsert/prometheus/request_handler.go +++ b/app/vminsert/prometheus/request_handler.go @@ -1,17 +1,23 @@ package prometheus import ( + "flag" "fmt" + "io" "net/http" "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/metrics" + "github.com/golang/snappy" ) +var maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request") + var ( rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="prometheus"}`) rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="prometheus"}`) @@ -69,7 +75,7 @@ func (ctx *pushCtx) Read(r *http.Request) error { prometheusReadCalls.Inc() var err error - ctx.reqBuf, err = prompb.ReadSnappy(ctx.reqBuf[:0], r.Body) + ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], r.Body) if err != nil { prometheusReadErrors.Inc() return fmt.Errorf("cannot read prompb.WriteRequest: %s", err) @@ -110,3 +116,35 @@ func putPushCtx(ctx *pushCtx) { var pushCtxPool sync.Pool var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) + +func readSnappy(dst []byte, r io.Reader) ([]byte, error) { + lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1) + bb := bodyBufferPool.Get() + reqLen, err := bb.ReadFrom(lr) + if err != nil { + bodyBufferPool.Put(bb) + return dst, fmt.Errorf("cannot read compressed request: %s", err) + } + if reqLen > int64(*maxInsertRequestSize) { + return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) + } + + buf := dst[len(dst):cap(dst)] + buf, err = snappy.Decode(buf, bb.B) + bodyBufferPool.Put(bb) + if err != nil { + err = fmt.Errorf("cannot decompress request with length %d: %s", reqLen, err) + return dst, err + } + if len(buf) > *maxInsertRequestSize { + return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) + } + if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] { + dst = dst[:len(dst)+len(buf)] + } else { + dst = append(dst, buf...) + } + return dst, nil +} + +var bodyBufferPool bytesutil.ByteBufferPool diff --git a/lib/prompb/util.go b/lib/prompb/util.go index 492e055a46..ffda7168e3 100644 --- a/lib/prompb/util.go +++ b/lib/prompb/util.go @@ -1,50 +1,5 @@ package prompb -import ( - "flag" - "fmt" - "io" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/golang/snappy" -) - -var maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request") - -// ReadSnappy reads r, unpacks it using snappy, appends it to dst -// and returns the result. -func ReadSnappy(dst []byte, r io.Reader) ([]byte, error) { - lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1) - bb := bodyBufferPool.Get() - reqLen, err := bb.ReadFrom(lr) - if err != nil { - bodyBufferPool.Put(bb) - return dst, fmt.Errorf("cannot read compressed request: %s", err) - } - if reqLen > int64(*maxInsertRequestSize) { - return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) - } - - buf := dst[len(dst):cap(dst)] - buf, err = snappy.Decode(buf, bb.B) - bodyBufferPool.Put(bb) - if err != nil { - err = fmt.Errorf("cannot decompress request with length %d: %s", reqLen, err) - return dst, err - } - if len(buf) > *maxInsertRequestSize { - return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) - } - if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] { - dst = dst[:len(dst)+len(buf)] - } else { - dst = append(dst, buf...) - } - return dst, nil -} - -var bodyBufferPool bytesutil.ByteBufferPool - // Reset resets wr. func (wr *WriteRequest) Reset() { for i := range wr.Timeseries {