all: mention command-line flags used for limiting the incoming request size in error messages
This should improve the usability of error logs.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/287
parent 2839f4688a
commit 0d7505b00e

7 changed files with 38 additions and 33 deletions
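The same change repeats across all seven files: the size limit moves from a function parameter into a package-level flag, the request body is read through io.LimitReader bounded by that flag, and the "too big" error spells out the flag name instead of a bare byte count. Below is a minimal, self-contained sketch of that pattern; the handler and the -example.maxRequestSize flag are illustrative stand-ins, not the exact names touched by this commit (those are -maxInsertRequestSize, -opentsdbhttp.maxInsertRequestSize and -search.maxQueryLen).

package main

import (
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
)

// Illustrative flag; the real flags added by this commit live in the prompb
// and opentsdbhttp packages.
var maxRequestSize = flag.Int("example.maxRequestSize", 32*1024*1024, "The maximum size of a single request in bytes")

func readBody(r *http.Request) ([]byte, error) {
	// Read at most one byte more than the limit so an oversized body is detectable.
	lr := io.LimitReader(r.Body, int64(*maxRequestSize)+1)
	body, err := io.ReadAll(lr)
	if err != nil {
		return nil, fmt.Errorf("cannot read request: %s", err)
	}
	if len(body) > *maxRequestSize {
		// Name the flag in the error so the log line tells the operator what to raise.
		return nil, fmt.Errorf("too big request; mustn't exceed `-example.maxRequestSize=%d` bytes", *maxRequestSize)
	}
	return body, nil
}

func main() {
	flag.Parse()
	http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) {
		if _, err := readBody(r); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}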
@@ -24,7 +24,6 @@ var (
 		"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
 		"Usually :4242 must be set. Doesn't work if empty")
 	opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
-	maxInsertRequestSize   = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size of a single insert request in bytes")
 	maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superflouos labels are dropped")
 )
 
@@ -43,10 +42,10 @@ func Init() {
 		graphiteServer = graphite.MustStart(*graphiteListenAddr)
 	}
 	if len(*opentsdbListenAddr) > 0 {
-		opentsdbServer = opentsdb.MustStart(*opentsdbListenAddr, int64(*maxInsertRequestSize))
+		opentsdbServer = opentsdb.MustStart(*opentsdbListenAddr)
 	}
 	if len(*opentsdbHTTPListenAddr) > 0 {
-		opentsdbhttpServer = opentsdbhttp.MustStart(*opentsdbHTTPListenAddr, int64(*maxInsertRequestSize))
+		opentsdbhttpServer = opentsdbhttp.MustStart(*opentsdbHTTPListenAddr)
 	}
 }
 
@@ -69,7 +68,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 	switch path {
 	case "/api/v1/write":
 		prometheusWriteRequests.Inc()
-		if err := prometheus.InsertHandler(r, int64(*maxInsertRequestSize)); err != nil {
+		if err := prometheus.InsertHandler(r); err != nil {
 			prometheusWriteErrors.Inc()
 			httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
 			return true

@@ -36,7 +36,7 @@ type Server struct {
 // MustStart starts OpenTSDB collector on the given addr.
 //
 // MustStop must be called on the returned server when it is no longer needed.
-func MustStart(addr string, maxRequestSize int64) *Server {
+func MustStart(addr string) *Server {
 	logger.Infof("starting TCP OpenTSDB collector at %q", addr)
 	lnTCP, err := netutil.NewTCPListener("opentsdb", addr)
 	if err != nil {
@@ -45,7 +45,7 @@ func MustStart(addr string, maxRequestSize int64) *Server {
 	ls := newListenerSwitch(lnTCP)
 	lnHTTP := ls.newHTTPListener()
 	lnTelnet := ls.newTelnetListener()
-	httpServer := opentsdbhttp.MustServe(lnHTTP, maxRequestSize)
+	httpServer := opentsdbhttp.MustServe(lnHTTP)
 
 	logger.Infof("starting UDP OpenTSDB collector at %q", addr)
 	lnUDP, err := net.ListenPacket("udp4", addr)

@@ -1,6 +1,7 @@
 package opentsdbhttp
 
 import (
+	"flag"
 	"fmt"
 	"io"
 	"net/http"
@@ -15,6 +16,8 @@ import (
 	"github.com/valyala/fastjson"
 )
 
+var maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request")
+
 var (
 	rowsInserted  = metrics.NewCounter(`vm_rows_inserted_total{type="opentsdb-http"}`)
 	rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="opentsdb-http"}`)
@@ -26,13 +29,13 @@ var (
 
 // insertHandler processes HTTP OpenTSDB put requests.
 // See http://opentsdb.net/docs/build/html/api_http/put.html
-func insertHandler(req *http.Request, maxSize int64) error {
+func insertHandler(req *http.Request) error {
 	return concurrencylimiter.Do(func() error {
-		return insertHandlerInternal(req, maxSize)
+		return insertHandlerInternal(req)
 	})
 }
 
-func insertHandlerInternal(req *http.Request, maxSize int64) error {
+func insertHandlerInternal(req *http.Request) error {
 	readCalls.Inc()
 
 	r := req.Body
@@ -50,15 +53,15 @@ func insertHandlerInternal(req *http.Request, maxSize int64) error {
 	defer putPushCtx(ctx)
 
 	// Read the request in ctx.reqBuf
-	lr := io.LimitReader(r, maxSize+1)
+	lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1)
 	reqLen, err := ctx.reqBuf.ReadFrom(lr)
 	if err != nil {
 		readErrors.Inc()
 		return fmt.Errorf("cannot read HTTP OpenTSDB request: %s", err)
 	}
-	if reqLen > maxSize {
+	if reqLen > int64(*maxInsertRequestSize) {
 		readErrors.Inc()
-		return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed %d bytes", maxSize)
+		return fmt.Errorf("too big HTTP OpenTSDB request; mustn't exceed `-opentsdbhttp.maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
 	}
 
 	// Unmarshal the request to ctx.Rows

@@ -28,20 +28,20 @@ type Server struct {
 // MustStart starts HTTP OpenTSDB server on the given addr.
 //
 // MustStop must be called on the returned server when it is no longer needed.
-func MustStart(addr string, maxRequestSize int64) *Server {
+func MustStart(addr string) *Server {
 	logger.Infof("starting HTTP OpenTSDB server at %q", addr)
 	lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr)
 	if err != nil {
 		logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err)
 	}
-	return MustServe(lnTCP, maxRequestSize)
+	return MustServe(lnTCP)
 }
 
-// MustServe serves OpenTSDB HTTP put requests from ln with up to maxRequestSize size.
+// MustServe serves OpenTSDB HTTP put requests from ln.
 //
 // MustStop must be called on the returned server when it is no longer needed.
-func MustServe(ln net.Listener, maxRequestSize int64) *Server {
-	h := newRequestHandler(maxRequestSize)
+func MustServe(ln net.Listener) *Server {
+	h := newRequestHandler()
 	hs := &http.Server{
 		Handler:     h,
 		ReadTimeout: 30 * time.Second,
@@ -82,12 +82,12 @@ func (s *Server) MustStop() {
 	logger.Infof("OpenTSDB HTTP server at %q has been stopped", s.ln.Addr())
 }
 
-func newRequestHandler(maxRequestSize int64) http.Handler {
+func newRequestHandler() http.Handler {
 	rh := func(w http.ResponseWriter, r *http.Request) {
 		switch r.URL.Path {
 		case "/api/put":
 			writeRequests.Inc()
-			if err := insertHandler(r, maxRequestSize); err != nil {
+			if err := insertHandler(r); err != nil {
 				writeErrors.Inc()
 				httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err)
 				return

@@ -18,16 +18,16 @@ var (
 )
 
 // InsertHandler processes remote write for prometheus.
-func InsertHandler(r *http.Request, maxSize int64) error {
+func InsertHandler(r *http.Request) error {
 	return concurrencylimiter.Do(func() error {
-		return insertHandlerInternal(r, maxSize)
+		return insertHandlerInternal(r)
 	})
 }
 
-func insertHandlerInternal(r *http.Request, maxSize int64) error {
+func insertHandlerInternal(r *http.Request) error {
 	ctx := getPushCtx()
 	defer putPushCtx(ctx)
-	if err := ctx.Read(r, maxSize); err != nil {
+	if err := ctx.Read(r); err != nil {
 		return err
 	}
 	timeseries := ctx.req.Timeseries
@@ -65,11 +65,11 @@ func (ctx *pushCtx) reset() {
 	ctx.reqBuf = ctx.reqBuf[:0]
 }
 
-func (ctx *pushCtx) Read(r *http.Request, maxSize int64) error {
+func (ctx *pushCtx) Read(r *http.Request) error {
 	prometheusReadCalls.Inc()
 
 	var err error
-	ctx.reqBuf, err = prompb.ReadSnappy(ctx.reqBuf[:0], r.Body, maxSize)
+	ctx.reqBuf, err = prompb.ReadSnappy(ctx.reqBuf[:0], r.Body)
 	if err != nil {
 		prometheusReadErrors.Inc()
 		return fmt.Errorf("cannot read prompb.WriteRequest: %s", err)

@@ -560,7 +560,7 @@ func QueryHandler(w http.ResponseWriter, r *http.Request) error {
 	}
 
 	if len(query) > *maxQueryLen {
-		return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
+		return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen)
 	}
 	if !getBool(r, "nocache") && ct-start < queryOffset {
 		// Adjust start time only if `nocache` arg isn't set.
@@ -685,7 +685,7 @@ func queryRangeHandler(w http.ResponseWriter, query string, start, end, step int
 
 	// Validate input args.
 	if len(query) > *maxQueryLen {
-		return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
+		return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen)
 	}
 	if start > end {
 		end = start + defaultStep

@@ -1,6 +1,7 @@
 package prompb
 
 import (
+	"flag"
 	"fmt"
 	"io"
 
@@ -8,18 +9,20 @@ import (
 	"github.com/golang/snappy"
 )
 
+var maxInsertRequestSize = flag.Int("maxInsertRequestSize", 32*1024*1024, "The maximum size in bytes of a single Prometheus remote_write API request")
+
 // ReadSnappy reads r, unpacks it using snappy, appends it to dst
 // and returns the result.
-func ReadSnappy(dst []byte, r io.Reader, maxSize int64) ([]byte, error) {
-	lr := io.LimitReader(r, maxSize+1)
+func ReadSnappy(dst []byte, r io.Reader) ([]byte, error) {
+	lr := io.LimitReader(r, int64(*maxInsertRequestSize)+1)
 	bb := bodyBufferPool.Get()
 	reqLen, err := bb.ReadFrom(lr)
 	if err != nil {
 		bodyBufferPool.Put(bb)
 		return dst, fmt.Errorf("cannot read compressed request: %s", err)
 	}
-	if reqLen > maxSize {
-		return dst, fmt.Errorf("too big packed request; mustn't exceed %d bytes", maxSize)
+	if reqLen > int64(*maxInsertRequestSize) {
+		return dst, fmt.Errorf("too big packed request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
 	}
 
 	buf := dst[len(dst):cap(dst)]
@@ -29,8 +32,8 @@ func ReadSnappy(dst []byte, r io.Reader, maxSize int64) ([]byte, error) {
 		err = fmt.Errorf("cannot decompress request with length %d: %s", reqLen, err)
 		return dst, err
 	}
-	if int64(len(buf)) > maxSize {
-		return dst, fmt.Errorf("too big unpacked request; musn't exceed %d bytes", maxSize)
+	if len(buf) > *maxInsertRequestSize {
+		return dst, fmt.Errorf("too big unpacked request; mustn't exceed `-maxInsertRequestSize=%d` bytes", *maxInsertRequestSize)
 	}
 	if len(buf) > 0 && len(dst) < cap(dst) && &buf[0] == &dst[len(dst):cap(dst)][0] {
 		dst = dst[:len(dst)+len(buf)]
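With the default 32 MiB limit (32*1024*1024 = 33554432 bytes), a rejected Prometheus remote_write request now shows up in the error log roughly as

	error in "/api/v1/write": cannot read prompb.WriteRequest: too big packed request; mustn't exceed `-maxInsertRequestSize=33554432` bytes

This line is composed from the format strings in the hunks above (the exact wrapping added by httpserver.Errorf may differ), so the flag to raise is visible in the error itself rather than only a byte count.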