From 0d7505b00e93ad748024270f727fa135494e9379 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 16 Jan 2020 13:03:28 +0200 Subject: [PATCH] all: mention command-line flags used for limiting the incoming request size in error messages This should improve error logs usability. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/287 --- app/vminsert/main.go | 7 +++---- app/vminsert/opentsdb/server.go | 4 ++-- app/vminsert/opentsdbhttp/request_handler.go | 15 +++++++++------ app/vminsert/opentsdbhttp/server.go | 14 +++++++------- app/vminsert/prometheus/request_handler.go | 12 ++++++------ app/vmselect/prometheus/prometheus.go | 4 ++-- lib/prompb/util.go | 15 +++++++++------ 7 files changed, 38 insertions(+), 33 deletions(-) diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 2882fe3442..9e66f5db36 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -24,7 +24,6 @@ var ( "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ "Usually :4242 must be set. Doesn't work if empty") opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty") - maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size of a single insert request in bytes") maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superflouos labels are dropped") ) @@ -43,10 +42,10 @@ func Init() { graphiteServer = graphite.MustStart(*graphiteListenAddr) } if len(*opentsdbListenAddr) > 0 { - opentsdbServer = opentsdb.MustStart(*opentsdbListenAddr, int64(*maxInsertRequestSize)) + opentsdbServer = opentsdb.MustStart(*opentsdbListenAddr) } if len(*opentsdbHTTPListenAddr) > 0 { - opentsdbhttpServer = opentsdbhttp.MustStart(*opentsdbHTTPListenAddr, int64(*maxInsertRequestSize)) + opentsdbhttpServer = opentsdbhttp.MustStart(*opentsdbHTTPListenAddr) } } @@ -69,7 +68,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { switch path { case "/api/v1/write": prometheusWriteRequests.Inc() - if err := prometheus.InsertHandler(r, int64(*maxInsertRequestSize)); err != nil { + if err := prometheus.InsertHandler(r); err != nil { prometheusWriteErrors.Inc() httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) return true diff --git a/app/vminsert/opentsdb/server.go b/app/vminsert/opentsdb/server.go index ce6d14367b..5f62355c13 100644 --- a/app/vminsert/opentsdb/server.go +++ b/app/vminsert/opentsdb/server.go @@ -36,7 +36,7 @@ type Server struct { // MustStart starts OpenTSDB collector on the given addr. // // MustStop must be called on the returned server when it is no longer needed. -func MustStart(addr string, maxRequestSize int64) *Server { +func MustStart(addr string) *Server { logger.Infof("starting TCP OpenTSDB collector at %q", addr) lnTCP, err := netutil.NewTCPListener("opentsdb", addr) if err != nil { @@ -45,7 +45,7 @@ func MustStart(addr string, maxRequestSize int64) *Server { ls := newListenerSwitch(lnTCP) lnHTTP := ls.newHTTPListener() lnTelnet := ls.newTelnetListener() - httpServer := opentsdbhttp.MustServe(lnHTTP, maxRequestSize) + httpServer := opentsdbhttp.MustServe(lnHTTP) logger.Infof("starting UDP OpenTSDB collector at %q", addr) lnUDP, err := net.ListenPacket("udp4", addr) diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 8cdcf58afe..7f39102746 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -1,6 +1,7 @@ package opentsdbhttp import ( + "flag" "fmt" "io" "net/http" @@ -15,6 +16,8 @@ import ( "github.com/valyala/fastjson" ) +var maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request") + var ( rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="opentsdb-http"}`) rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="opentsdb-http"}`) @@ -26,13 +29,13 @@ var ( // insertHandler processes HTTP OpenTSDB put requests. // See http://opentsdb.net/docs/build/html/api_http/put.html -func insertHandler(req *http.Request, maxSize int64) error { +func insertHandler(req *http.Request) error { return concurrencylimiter.Do(func() error { - return insertHandlerInternal(req, maxSize) + return insertHandlerInternal(req) }) } -func insertHandlerInternal(req *http.Request, maxSize int64) error { +func insertHandlerInternal(req *http.Request) error { readCalls.Inc() r := req.Body @@ -50,15 +53,15 @@ func insertHandlerInternal(req *http.Request, maxSize int64) error { defer putPushCtx(ctx) // Read the request in ctx.reqBuf - lr := io.LimitReader(r, maxSize+1) + lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1) reqLen, err := ctx.reqBuf.ReadFrom(lr) if err != nil { readErrors.Inc() return fmt.Errorf("cannot read HTTP OpenTSDB request: %s", err) } - if reqLen > maxSize { + if reqLen > int64(*maxInsertRequestSize) { readErrors.Inc() - return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed %d bytes", maxSize) + return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) } // Unmarshal the request to ctx.Rows diff --git a/app/vminsert/opentsdbhttp/server.go b/app/vminsert/opentsdbhttp/server.go index cb882838a9..0aaaeb32c7 100644 --- a/app/vminsert/opentsdbhttp/server.go +++ b/app/vminsert/opentsdbhttp/server.go @@ -28,20 +28,20 @@ type Server struct { // MustStart starts HTTP OpenTSDB server on the given addr. // // MustStop must be called on the returned server when it is no longer needed. -func MustStart(addr string, maxRequestSize int64) *Server { +func MustStart(addr string) *Server { logger.Infof("starting HTTP OpenTSDB server at %q", addr) lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr) if err != nil { logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err) } - return MustServe(lnTCP, maxRequestSize) + return MustServe(lnTCP) } -// MustServe serves OpenTSDB HTTP put requests from ln with up to maxRequestSize size. +// MustServe serves OpenTSDB HTTP put requests from ln. // // MustStop must be called on the returned server when it is no longer needed. -func MustServe(ln net.Listener, maxRequestSize int64) *Server { - h := newRequestHandler(maxRequestSize) +func MustServe(ln net.Listener) *Server { + h := newRequestHandler() hs := &http.Server{ Handler: h, ReadTimeout: 30 * time.Second, @@ -82,12 +82,12 @@ func (s *Server) MustStop() { logger.Infof("OpenTSDB HTTP server at %q has been stopped", s.ln.Addr()) } -func newRequestHandler(maxRequestSize int64) http.Handler { +func newRequestHandler() http.Handler { rh := func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/api/put": writeRequests.Inc() - if err := insertHandler(r, maxRequestSize); err != nil { + if err := insertHandler(r); err != nil { writeErrors.Inc() httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) return diff --git a/app/vminsert/prometheus/request_handler.go b/app/vminsert/prometheus/request_handler.go index 46a03ddfb6..6fea27a82c 100644 --- a/app/vminsert/prometheus/request_handler.go +++ b/app/vminsert/prometheus/request_handler.go @@ -18,16 +18,16 @@ var ( ) // InsertHandler processes remote write for prometheus. -func InsertHandler(r *http.Request, maxSize int64) error { +func InsertHandler(r *http.Request) error { return concurrencylimiter.Do(func() error { - return insertHandlerInternal(r, maxSize) + return insertHandlerInternal(r) }) } -func insertHandlerInternal(r *http.Request, maxSize int64) error { +func insertHandlerInternal(r *http.Request) error { ctx := getPushCtx() defer putPushCtx(ctx) - if err := ctx.Read(r, maxSize); err != nil { + if err := ctx.Read(r); err != nil { return err } timeseries := ctx.req.Timeseries @@ -65,11 +65,11 @@ func (ctx *pushCtx) reset() { ctx.reqBuf = ctx.reqBuf[:0] } -func (ctx *pushCtx) Read(r *http.Request, maxSize int64) error { +func (ctx *pushCtx) Read(r *http.Request) error { prometheusReadCalls.Inc() var err error - ctx.reqBuf, err = prompb.ReadSnappy(ctx.reqBuf[:0], r.Body, maxSize) + ctx.reqBuf, err = prompb.ReadSnappy(ctx.reqBuf[:0], r.Body) if err != nil { prometheusReadErrors.Inc() return fmt.Errorf("cannot read prompb.WriteRequest: %s", err) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 4260359ad7..6297b1e0e1 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -560,7 +560,7 @@ func QueryHandler(w http.ResponseWriter, r *http.Request) error { } if len(query) > *maxQueryLen { - return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen) + return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen) } if !getBool(r, "nocache") && ct-start < queryOffset { // Adjust start time only if `nocache` arg isn't set. @@ -685,7 +685,7 @@ func queryRangeHandler(w http.ResponseWriter, query string, start, end, step int // Validate input args. if len(query) > *maxQueryLen { - return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen) + return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen) } if start > end { end = start + defaultStep diff --git a/lib/prompb/util.go b/lib/prompb/util.go index 4768bacf08..492e055a46 100644 --- a/lib/prompb/util.go +++ b/lib/prompb/util.go @@ -1,6 +1,7 @@ package prompb import ( + "flag" "fmt" "io" @@ -8,18 +9,20 @@ import ( "github.com/golang/snappy" ) +var maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request") + // ReadSnappy reads r, unpacks it using snappy, appends it to dst // and returns the result. -func ReadSnappy(dst []byte, r io.Reader, maxSize int64) ([]byte, error) { - lr := io.LimitReader(r, maxSize+1) +func ReadSnappy(dst []byte, r io.Reader) ([]byte, error) { + lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1) bb := bodyBufferPool.Get() reqLen, err := bb.ReadFrom(lr) if err != nil { bodyBufferPool.Put(bb) return dst, fmt.Errorf("cannot read compressed request: %s", err) } - if reqLen > maxSize { - return dst, fmt.Errorf("too big packed request; mustn't exceed %d bytes", maxSize) + if reqLen > int64(*maxInsertRequestSize) { + return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) } buf := dst[len(dst):cap(dst)] @@ -29,8 +32,8 @@ func ReadSnappy(dst []byte, r io.Reader, maxSize int64) ([]byte, error) { err = fmt.Errorf("cannot decompress request with length %d: %s", reqLen, err) return dst, err } - if int64(len(buf)) > maxSize { - return dst, fmt.Errorf("too big unpacked request; musn't exceed %d bytes", maxSize) + if len(buf) > *maxInsertRequestSize { + return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize) } if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] { dst = dst[:len(dst)+len(buf)]