mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vminsert: moved -maxInsertRequestSize
command-line flag out of lib/prompb
in order to prevent its inclusion in vmselect
and vmstorage
apps
This commit is contained in:
parent
adbbc4fa1a
commit
371e86194d
2 changed files with 39 additions and 46 deletions
|
@ -1,17 +1,23 @@
|
|||
package prometheus
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
var maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
|
||||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="prometheus"}`)
|
||||
rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="prometheus"}`)
|
||||
|
@ -69,7 +75,7 @@ func (ctx *pushCtx) Read(r *http.Request) error {
|
|||
prometheusReadCalls.Inc()
|
||||
|
||||
var err error
|
||||
ctx.reqBuf, err = prompb.ReadSnappy(ctx.reqBuf[:0], r.Body)
|
||||
ctx.reqBuf, err = readSnappy(ctx.reqBuf[:0], r.Body)
|
||||
if err != nil {
|
||||
prometheusReadErrors.Inc()
|
||||
return fmt.Errorf("cannot read prompb.WriteRequest: %s", err)
|
||||
|
@ -110,3 +116,35 @@ func putPushCtx(ctx *pushCtx) {
|
|||
|
||||
var pushCtxPool sync.Pool
|
||||
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
|
||||
|
||||
func readSnappy(dst []byte, r io.Reader) ([]byte, error) {
|
||||
lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1)
|
||||
bb := bodyBufferPool.Get()
|
||||
reqLen, err := bb.ReadFrom(lr)
|
||||
if err != nil {
|
||||
bodyBufferPool.Put(bb)
|
||||
return dst, fmt.Errorf("cannot read compressed request: %s", err)
|
||||
}
|
||||
if reqLen > int64(*maxInsertRequestSize) {
|
||||
return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
|
||||
}
|
||||
|
||||
buf := dst[len(dst):cap(dst)]
|
||||
buf, err = snappy.Decode(buf, bb.B)
|
||||
bodyBufferPool.Put(bb)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot decompress request with length %d: %s", reqLen, err)
|
||||
return dst, err
|
||||
}
|
||||
if len(buf) > *maxInsertRequestSize {
|
||||
return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
|
||||
}
|
||||
if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] {
|
||||
dst = dst[:len(dst)+len(buf)]
|
||||
} else {
|
||||
dst = append(dst, buf...)
|
||||
}
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
var bodyBufferPool bytesutil.ByteBufferPool
|
||||
|
|
|
@ -1,50 +1,5 @@
|
|||
package prompb
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
var maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
|
||||
|
||||
// ReadSnappy reads r, unpacks it using snappy, appends it to dst
|
||||
// and returns the result.
|
||||
func ReadSnappy(dst []byte, r io.Reader) ([]byte, error) {
|
||||
lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1)
|
||||
bb := bodyBufferPool.Get()
|
||||
reqLen, err := bb.ReadFrom(lr)
|
||||
if err != nil {
|
||||
bodyBufferPool.Put(bb)
|
||||
return dst, fmt.Errorf("cannot read compressed request: %s", err)
|
||||
}
|
||||
if reqLen > int64(*maxInsertRequestSize) {
|
||||
return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
|
||||
}
|
||||
|
||||
buf := dst[len(dst):cap(dst)]
|
||||
buf, err = snappy.Decode(buf, bb.B)
|
||||
bodyBufferPool.Put(bb)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot decompress request with length %d: %s", reqLen, err)
|
||||
return dst, err
|
||||
}
|
||||
if len(buf) > *maxInsertRequestSize {
|
||||
return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
|
||||
}
|
||||
if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] {
|
||||
dst = dst[:len(dst)+len(buf)]
|
||||
} else {
|
||||
dst = append(dst, buf...)
|
||||
}
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
var bodyBufferPool bytesutil.ByteBufferPool
|
||||
|
||||
// Reset resets wr.
|
||||
func (wr *WriteRequest) Reset() {
|
||||
for i := range wr.Timeseries {
|
||||
|
|
Loading…
Reference in a new issue