diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go index 72dadc888..e17207c0d 100644 --- a/app/vmagent/csvimport/request_handler.go +++ b/app/vmagent/csvimport/request_handler.go @@ -10,7 +10,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -26,10 +25,8 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(at, rows, extraLabels) - }) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(at, rows, extraLabels) }) } diff --git a/app/vmagent/datadog/request_handler.go b/app/vmagent/datadog/request_handler.go index 2b7aa5a27..adac86be3 100644 --- a/app/vmagent/datadog/request_handler.go +++ b/app/vmagent/datadog/request_handler.go @@ -10,7 +10,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -28,11 +27,9 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - ce := req.Header.Get("Content-Encoding") - return parser.ParseStream(req.Body, ce, func(series []parser.Series) error { - return insertRows(at, series, extraLabels) - }) + ce := req.Header.Get("Content-Encoding") + return parser.ParseStream(req.Body, ce, func(series []parser.Series) error { + return insertRows(at, series, extraLabels) }) } diff --git a/app/vmagent/graphite/request_handler.go b/app/vmagent/graphite/request_handler.go index c3ef22d8d..d9ff2a1b2 100644 --- a/app/vmagent/graphite/request_handler.go +++ b/app/vmagent/graphite/request_handler.go @@ -7,7 +7,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -20,9 +19,7 @@ var ( // // See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol func InsertHandler(r io.Reader) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, insertRows) - }) + return parser.ParseStream(r, insertRows) } func insertRows(rows []parser.Row) error { diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 19177203d..d7211e385 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -16,7 +16,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -37,10 +36,8 @@ var ( // // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(r io.Reader, isGzipped bool) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error { - return insertRows(nil, db, rows, nil) - }) + return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error { + return insertRows(nil, db, rows, nil) }) } @@ -52,15 +49,13 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - q := req.URL.Query() - precision := q.Get("precision") - // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint - db := q.Get("db") - return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { - return insertRows(at, db, rows, extraLabels) - }) + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + q := req.URL.Query() + precision := q.Get("precision") + // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint + db := q.Get("db") + return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { + return insertRows(at, db, rows, extraLabels) }) } diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 412580f8b..ce2ab12d1 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -38,7 +38,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -104,7 +103,6 @@ func main() { startTime := time.Now() remotewrite.Init() common.StartUnmarshalWorkers() - writeconcurrencylimiter.Init() if len(*influxListenAddr) > 0 { influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error { return influx.InsertHandlerForReader(r, false) diff --git a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go index f0869e042..898394ae8 100644 --- a/app/vmagent/native/request_handler.go +++ b/app/vmagent/native/request_handler.go @@ -12,7 +12,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -31,10 +30,8 @@ func InsertHandler(at *auth.Token, req *http.Request) error { return err } isGzip := req.Header.Get("Content-Encoding") == "gzip" - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error { - return insertRows(at, block, extraLabels) - }) + return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error { + return insertRows(at, block, extraLabels) }) } diff --git a/app/vmagent/opentsdb/request_handler.go b/app/vmagent/opentsdb/request_handler.go index 2721912d6..e1e42ba4b 100644 --- a/app/vmagent/opentsdb/request_handler.go +++ b/app/vmagent/opentsdb/request_handler.go @@ -7,7 +7,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -20,9 +19,7 @@ var ( // // See http://opentsdb.net/docs/build/html/api_telnet/put.html func InsertHandler(r io.Reader) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, insertRows) - }) + return parser.ParseStream(r, insertRows) } func insertRows(rows []parser.Row) error { diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index 98b02e5e3..968ea936b 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -25,10 +24,8 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(at, rows, extraLabels) - }) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(at, rows, extraLabels) }) } diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index e5b86a799..571860e25 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -11,7 +11,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -31,21 +30,17 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { - return insertRows(at, rows, extraLabels) - }, nil) - }) + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { + return insertRows(at, rows, extraLabels) + }, nil) } // InsertHandlerForReader processes metrics from given reader with optional gzip format func InsertHandlerForReader(r io.Reader, isGzipped bool) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error { - return insertRows(nil, rows, nil) - }, nil) - }) + return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error { + return insertRows(nil, rows, nil) + }, nil) } func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index f2ed1b6f6..4fd0ce0e6 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -13,7 +13,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -29,19 +28,15 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error { - return insertRows(at, tss, extraLabels) - }) + return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error { + return insertRows(at, tss, extraLabels) }) } // InsertHandlerForReader processes metrics from given reader func InsertHandlerForReader(at *auth.Token, r io.Reader) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, func(tss []prompb.TimeSeries) error { - return insertRows(at, tss, nil) - }) + return parser.ParseStream(r, func(tss []prompb.TimeSeries) error { + return insertRows(at, tss, nil) }) } diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index 0afe7389b..781f75753 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -13,7 +13,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -31,20 +30,16 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { - return insertRows(at, rows, extraLabels) - }) + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return insertRows(at, rows, extraLabels) }) } // InsertHandlerForReader processes metrics from given reader func InsertHandlerForReader(r io.Reader, isGzipped bool) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, isGzipped, func(rows []parser.Row) error { - return insertRows(nil, rows, nil) - }) + return parser.ParseStream(r, isGzipped, func(rows []parser.Row) error { + return insertRows(nil, rows, nil) }) } diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index 6997dd4cb..971f3abbd 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -8,7 +8,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -23,10 +22,8 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(rows, extraLabels) - }) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) }) } diff --git a/app/vminsert/datadog/request_handler.go b/app/vminsert/datadog/request_handler.go index 6dbfbbbec..48dd6f83f 100644 --- a/app/vminsert/datadog/request_handler.go +++ b/app/vminsert/datadog/request_handler.go @@ -1,7 +1,6 @@ package datadog import ( - "fmt" "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" @@ -9,7 +8,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -26,15 +24,9 @@ func InsertHandlerForHTTP(req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - ce := req.Header.Get("Content-Encoding") - err := parser.ParseStream(req.Body, ce, func(series []parser.Series) error { - return insertRows(series, extraLabels) - }) - if err != nil { - return fmt.Errorf("headers: %q; err: %w", req.Header, err) - } - return nil + ce := req.Header.Get("Content-Encoding") + return parser.ParseStream(req.Body, ce, func(series []parser.Series) error { + return insertRows(series, extraLabels) }) } diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index 5aa24f929..5d2d0a504 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -6,7 +6,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -19,9 +18,7 @@ var ( // // See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol func InsertHandler(r io.Reader) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, insertRows) - }) + return parser.ParseStream(r, insertRows) } func insertRows(rows []parser.Row) error { diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 40ede91c8..daa7334d7 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -15,7 +15,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -35,10 +34,8 @@ var ( // // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ func InsertHandlerForReader(r io.Reader) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { - return insertRows(db, rows, nil) - }) + return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { + return insertRows(db, rows, nil) }) } @@ -50,15 +47,13 @@ func InsertHandlerForHTTP(req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - q := req.URL.Query() - precision := q.Get("precision") - // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint - db := q.Get("db") - return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { - return insertRows(db, rows, extraLabels) - }) + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + q := req.URL.Query() + precision := q.Get("precision") + // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint + db := q.Get("db") + return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { + return insertRows(db, rows, extraLabels) }) } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 8ad6b2023..118726236 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -35,7 +35,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -71,7 +70,6 @@ func Init() { storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries) storage.SetMaxLabelValueLen(*maxLabelValueLen) common.StartUnmarshalWorkers() - writeconcurrencylimiter.Init() if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) } diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go index f821b0889..666848e4a 100644 --- a/app/vminsert/native/request_handler.go +++ b/app/vminsert/native/request_handler.go @@ -12,7 +12,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -28,10 +27,8 @@ func InsertHandler(req *http.Request) error { return err } isGzip := req.Header.Get("Content-Encoding") == "gzip" - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error { - return insertRows(block, extraLabels) - }) + return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error { + return insertRows(block, extraLabels) }) } diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 49a6157a1..44bf3eb98 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -6,7 +6,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -19,9 +18,7 @@ var ( // // See http://opentsdb.net/docs/build/html/api_telnet/put.html func InsertHandler(r io.Reader) error { - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(r, insertRows) - }) + return parser.ParseStream(r, insertRows) } func insertRows(rows []parser.Row) error { diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index b5927ecc7..3dafe29f9 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -28,10 +27,8 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(rows, extraLabels) - }) + return parser.ParseStream(req, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) }) default: return fmt.Errorf("unexpected path requested on HTTP OpenTSDB server: %q", path) diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index aa65b7da5..0afccf45b 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -8,7 +8,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -27,12 +26,10 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { - return insertRows(rows, extraLabels) - }, nil) - }) + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) + }, nil) } func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index eef326361..8d94e3235 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -9,7 +9,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -24,10 +23,8 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error { - return insertRows(tss, extraLabels) - }) + return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error { + return insertRows(tss, extraLabels) }) } diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index f0a531a54..2a3a45039 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -12,7 +12,6 @@ import ( parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -29,11 +28,9 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } - return writeconcurrencylimiter.Do(func() error { - isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { - return insertRows(rows, extraLabels) - }) + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return insertRows(rows, extraLabels) }) } diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 79ef3df7c..402106a02 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -118,7 +118,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { timerpool.Put(t) concurrencyLimitTimeout.Inc() err := &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf("couldn't start executing the request in %.3fs, since -search.maxConcurrentRequests=%d concurrent requests "+ + Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+ "are already executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+ "to increase -search.maxQueueDuration; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests", d.Seconds(), *maxConcurrentRequests), diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index f9eaf7423..4d3cf3793 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -650,22 +650,6 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(m().TooSmallTimestampRows) }) - metrics.NewGauge(`vm_concurrent_addrows_limit_reached_total`, func() float64 { - return float64(m().AddRowsConcurrencyLimitReached) - }) - metrics.NewGauge(`vm_concurrent_addrows_limit_timeout_total`, func() float64 { - return float64(m().AddRowsConcurrencyLimitTimeout) - }) - metrics.NewGauge(`vm_concurrent_addrows_dropped_rows_total`, func() float64 { - return float64(m().AddRowsConcurrencyDroppedRows) - }) - metrics.NewGauge(`vm_concurrent_addrows_capacity`, func() float64 { - return float64(m().AddRowsConcurrencyCapacity) - }) - metrics.NewGauge(`vm_concurrent_addrows_current`, func() float64 { - return float64(m().AddRowsConcurrencyCurrent) - }) - metrics.NewGauge(`vm_search_delays_total`, func() float64 { return float64(m().SearchDelays) }) diff --git a/dashboards/victoriametrics-cluster.json b/dashboards/victoriametrics-cluster.json index 51fd6350a..e43f206f2 100644 --- a/dashboards/victoriametrics-cluster.json +++ b/dashboards/victoriametrics-cluster.json @@ -5021,7 +5021,7 @@ }, "editorMode": "code", "exemplar": true, - "expr": "max(\n max_over_time(vm_concurrent_addrows_current{job=~\"$job_storage\", \n instance=~\"$instance\"}[$__rate_interval])\n)", + "expr": "max(\n max_over_time(vm_concurrent_insert_current{job=~\"$job_storage\", \n instance=~\"$instance\"}[$__rate_interval])\n)", "interval": "", "legendFormat": "current", "range": true, @@ -5034,7 +5034,7 @@ }, "editorMode": "code", "exemplar": true, - "expr": "min(vm_concurrent_addrows_capacity{job=~\"$job_storage\", instance=~\"$instance\"})", + "expr": "min(vm_concurrent_insert_capacity{job=~\"$job_storage\", instance=~\"$instance\"})", "hide": false, "interval": "", "legendFormat": "max", diff --git a/dashboards/victoriametrics.json b/dashboards/victoriametrics.json index 51cc849c5..5afd6087e 100644 --- a/dashboards/victoriametrics.json +++ b/dashboards/victoriametrics.json @@ -4609,7 +4609,7 @@ "uid": "$ds" }, "editorMode": "code", - "expr": "max_over_time(vm_concurrent_addrows_capacity{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])", + "expr": "max_over_time(vm_concurrent_insert_capacity{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])", "format": "time_series", "interval": "", "intervalFactor": 1, @@ -4622,7 +4622,7 @@ "type": "prometheus", "uid": "$ds" }, - "expr": "sum(vm_concurrent_addrows_current{job=~\"$job\", instance=~\"$instance\"})", + "expr": "sum(vm_concurrent_insert_current{job=~\"$job\", instance=~\"$instance\"})", "format": "time_series", "intervalFactor": 1, "legendFormat": "current", @@ -5319,4 +5319,4 @@ "uid": "wNf0q_kZk", "version": 1, "weekStart": "" -} \ No newline at end of file +} diff --git a/deployment/docker/alerts-cluster.yml b/deployment/docker/alerts-cluster.yml index da36260de..f1161a48b 100644 --- a/deployment/docker/alerts-cluster.yml +++ b/deployment/docker/alerts-cluster.yml @@ -81,7 +81,7 @@ groups: Possible reasons for errors are misconfiguration, overload, network blips or unreachable components." - alert: ConcurrentFlushesHitTheLimit - expr: avg_over_time(vm_concurrent_addrows_current[1m]) >= vm_concurrent_addrows_capacity + expr: avg_over_time(vm_concurrent_insert_current[1m]) >= vm_concurrent_insert_capacity for: 15m labels: severity: warning diff --git a/deployment/docker/alerts.yml b/deployment/docker/alerts.yml index 9a773d95d..954f037b8 100644 --- a/deployment/docker/alerts.yml +++ b/deployment/docker/alerts.yml @@ -61,7 +61,7 @@ groups: Please verify if clients are sending correct requests." - alert: ConcurrentFlushesHitTheLimit - expr: avg_over_time(vm_concurrent_addrows_current[1m]) >= vm_concurrent_addrows_capacity + expr: avg_over_time(vm_concurrent_insert_current[1m]) >= vm_concurrent_insert_capacity for: 15m labels: severity: warning diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index af7d6765a..3b3fb45ff 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,10 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +**Update note 1:** This release changes the logic behind `-maxConcurrentInserts` command-line flag. Previously this flag was limiting the number of concurrent connections established from clients, which send data to VictoriaMetrics. Some of these connections could be temporarily idle. Such connections do not take significant CPU and memory resources, so there is no need in limiting their count. The new logic takes into account only those connections, which **actively** ingest new data to VictoriaMetrics and to [vmagent](https://docs.victoriametrics.com/vmagent.html). This means that the default `-maxConcurrentInserts` value should handle cases, which could require increasing the value in the previous releases. So it is recommended trying to remove the explicitly set `-maxConcurrentInserts` command-line flag after upgrading to this release and verifying whether this reduces CPU and memory usage. + +**Update note 2:** The `vm_concurrent_addrows_current` and `vm_concurrent_addrows_capacity` metrics [exported](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) by `vmstorage` are replaced with `vm_concurrent_insert_current` and `vm_concurrent_insert_capacity` metrics in order to be consistent with the corresponding metrics exported by `vminsert`. Please update queries in dahsboards and alerting rules with new metric names if old metric names are used there. + * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Prometheus-compatible target discovery for [HashiCorp Nomad](https://www.nomadproject.io/) services via [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367). Thanks to @mr-karan for [the implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3549). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): automatically pre-fetch `metric_relabel_configs` and the target labels when clicking on the `debug metrics relabeling` link at the `http://vmagent:8429/targets` page at the particular target. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabel-debug). @@ -24,6 +28,8 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling): add support for `keepequal` and `dropequal` relabeling actions, which are supported by Prometheus starting from [v2.41.0](https://github.com/prometheus/prometheus/releases/tag/v2.41.0). These relabeling actions are almost identical to `keep_if_equal` and `drop_if_equal` relabeling actions supported by VictoriaMetrics since `v1.38.0` - see [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) - so it is recommended sticking to `keep_if_equal` and `drop_if_equal` actions instead of switching to `keepequal` and `dropequal`. * FEATURE: [csvimport](https://docs.victoriametrics.com/#how-to-import-csv-data): support empty values for imported metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3540). * FEATURE: [vmalert](httpпоs://docs.victoriametrics.com/vmalert.html): allow configuring the default number of stored rule's update states in memory via global `-rule.updateEntriesLimit` command-line flag or per-rule via rule's `update_entries_limit` configuration param. See [these docs](https://docs.victoriametrics.com/vmalert.html#rules) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3556). +* FEATURE: improve the logic benhind `-maxConcurrentInserts` command-line flag. Previously this flag was limiting the number of concurrent connections from clients, which write data to VictoriaMetrics or [vmagent](https://docs.victoriametrics.com/vmagent.html). Some of these connections could be idle for some time. These connections do not need significant amounts of CPU and memory, so there is no sense in limiting their count. The updated logic behind `-maxConcurrentInserts` limits the number of **active** insert requests, not counting idle connections. +* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `-maxConcurrentInserts` and `-insert.maxQueueDuration` command-line flags to `vmstorage`, so they could be tuned if needed in the same way as at `vminsert` nodes. * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): limit the number of concurrently executed requests at `vmstorage` proportionally to the number of available CPU cores, since every request can saturate a single CPU core at `vmstorage`. Previously a single `vmstorage` could accept and start processing arbitrary number of concurrent requests received from big number of `vmselect` nodes. This could result in increased RAM, CPU and disk IO usage or event to out of memory crash at `vmstorage` side under high load. The limit can be fine-tuned if needed via `-search.maxConcurrentRequests` command-line flag at `vmstorage` according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#resource-usage-limits). `vmstorage` now [exposes](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) the following additional metrics at `http://vmstorage:8482/metrics` page: - `vm_vmselect_concurrent_requests_capacity` - the maximum number of requests allowed to execute concurrently - `vm_vmselect_concurrent_requests_current` - the current number of concurrently executed requests diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 09bde595b..c5ac6814c 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -860,7 +860,7 @@ Below is the output for `/path/to/vminsert -help`: -influxTrimTimestamp duration Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms) -insert.maxQueueDuration duration - The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s) + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are already executed (default 1m0s) -loggerDisableTimestamps Whether to disable writing timestamps in logs -loggerErrorsPerSecondLimit int @@ -878,7 +878,7 @@ Below is the output for `/path/to/vminsert -help`: -loggerWarnsPerSecondLimit int Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit -maxConcurrentInserts int - The maximum number of concurrent inserts. Default value should work for most cases, since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration (default 16) + The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the overhead. See also -insert.maxQueueDuration (default 8) -maxInsertRequestSize size The maximum size in bytes of a single Prometheus remote_write API request Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 33554432) @@ -1240,6 +1240,8 @@ Below is the output for `/path/to/vmstorage -help`: Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password -httpListenAddr string Address to listen for http connections (default ":8482") + -insert.maxQueueDuration duration + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are already executed (default 1m0s) -logNewSeries Whether to log new series. This option is for debug purposes only. It can lead to performance issues when big number of new series are ingested into VictoriaMetrics -loggerDisableTimestamps @@ -1258,6 +1260,8 @@ Below is the output for `/path/to/vmstorage -help`: Timezone to use for timestamps in logs. Timezone must be a valid IANA Time Zone. For example: America/New_York, Europe/Berlin, Etc/GMT+3 or Local (default "UTC") -loggerWarnsPerSecondLimit int Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit + -maxConcurrentInserts int + The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the overhead. See also -insert.maxQueueDuration (default 8) -memory.allowedBytes size Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0) diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 45a86a35d..50db7840a 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -26,13 +27,16 @@ var ( // // callback shouldn't hold rows after returning. func ParseStream(req *http.Request, callback func(rows []Row) error) error { + wcr := writeconcurrencylimiter.GetReader(req.Body) + defer writeconcurrencylimiter.PutReader(wcr) + r := io.Reader(wcr) + q := req.URL.Query() format := q.Get("format") cds, err := ParseColumnDescriptors(format) if err != nil { return fmt.Errorf("cannot parse the provided csv format: %w", err) } - r := req.Body if req.Header.Get("Content-Encoding") == "gzip" { zr, err := common.GetGzipReader(r) if err != nil { @@ -51,6 +55,7 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) + wcr.DecConcurrency() } ctx.wg.Wait() if err := ctx.Error(); err != nil { diff --git a/lib/protoparser/datadog/streamparser.go b/lib/protoparser/datadog/streamparser.go index ef1877071..bdc1ad4d6 100644 --- a/lib/protoparser/datadog/streamparser.go +++ b/lib/protoparser/datadog/streamparser.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -33,6 +34,10 @@ var ( // // callback shouldn't hold series after returning. func ParseStream(r io.Reader, contentEncoding string, callback func(series []Series) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + switch contentEncoding { case "gzip": zr, err := common.GetGzipReader(r) diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index a4207b5df..1fb94fffd 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -26,6 +27,10 @@ var ( // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + ctx := getStreamContext(r) defer putStreamContext(ctx) @@ -36,6 +41,7 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) + wcr.DecConcurrency() } ctx.wg.Wait() if err := ctx.Error(); err != nil { diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index 2f21309a5..1d1a1057f 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -27,6 +28,10 @@ var ( // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + if isGzipped { zr, err := common.GetGzipReader(r) if err != nil { @@ -63,6 +68,7 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) + wcr.DecConcurrency() } ctx.wg.Wait() if err := ctx.Error(); err != nil { diff --git a/lib/protoparser/native/streamparser.go b/lib/protoparser/native/streamparser.go index afb7d6924..e771ad554 100644 --- a/lib/protoparser/native/streamparser.go +++ b/lib/protoparser/native/streamparser.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -19,6 +20,10 @@ import ( // // callback shouldn't hold block after returning. func ParseStream(r io.Reader, isGzip bool, callback func(block *Block) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + if isGzip { zr, err := common.GetGzipReader(r) if err != nil { @@ -101,6 +106,7 @@ func ParseStream(r io.Reader, isGzip bool, callback func(block *Block) error) er ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) + wcr.DecConcurrency() } } diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index 94e7fc5b8..b71babdea 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -26,6 +27,10 @@ var ( // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + ctx := getStreamContext(r) defer putStreamContext(ctx) for ctx.Read() { @@ -35,6 +40,7 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error { uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) + wcr.DecConcurrency() } ctx.wg.Wait() if err := ctx.Error(); err != nil { diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index edad74dd2..cfa0512a6 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -29,8 +30,11 @@ var ( // // callback shouldn't hold rows after returning. func ParseStream(req *http.Request, callback func(rows []Row) error) error { + wcr := writeconcurrencylimiter.GetReader(req.Body) + defer writeconcurrencylimiter.PutReader(wcr) + r := io.Reader(req.Body) + readCalls.Inc() - r := req.Body if req.Header.Get("Content-Encoding") == "gzip" { zr, err := common.GetGzipReader(r) if err != nil { diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index af093f82f..b7e8a2bff 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -19,6 +20,10 @@ import ( // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error, errLogger func(string)) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + if isGzipped { zr, err := common.GetGzipReader(r) if err != nil { @@ -38,6 +43,7 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) + wcr.DecConcurrency() } ctx.wg.Wait() if err := ctx.Error(); err != nil { diff --git a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index 12c118c0a..44a507260 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" "github.com/golang/snappy" ) @@ -21,6 +22,10 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102 // // callback shouldn't hold tss after returning. func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + ctx := getPushCtx(r) defer putPushCtx(ctx) if err := ctx.Read(); err != nil { diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index bc5a1570d..3d4ba19d6 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -22,6 +23,10 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maxi // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error { + wcr := writeconcurrencylimiter.GetReader(r) + defer writeconcurrencylimiter.PutReader(wcr) + r = wcr + if isGzipped { zr, err := common.GetGzipReader(r) if err != nil { @@ -39,6 +44,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf ctx.wg.Add(1) common.ScheduleUnmarshalWork(uw) + wcr.DecConcurrency() } ctx.wg.Wait() if err := ctx.Error(); err != nil { diff --git a/lib/storage/storage.go b/lib/storage/storage.go index a3e8f19aa..f6d633467 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -18,7 +18,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -28,7 +27,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" @@ -46,10 +44,6 @@ type Storage struct { tooSmallTimestampRows uint64 tooBigTimestampRows uint64 - addRowsConcurrencyLimitReached uint64 - addRowsConcurrencyLimitTimeout uint64 - addRowsConcurrencyDroppedRows uint64 - slowRowInserts uint64 slowPerDayIndexInserts uint64 slowMetricNameLoads uint64 @@ -451,12 +445,6 @@ type Metrics struct { TooSmallTimestampRows uint64 TooBigTimestampRows uint64 - AddRowsConcurrencyLimitReached uint64 - AddRowsConcurrencyLimitTimeout uint64 - AddRowsConcurrencyDroppedRows uint64 - AddRowsConcurrencyCapacity uint64 - AddRowsConcurrencyCurrent uint64 - SearchDelays uint64 SlowRowInserts uint64 @@ -528,12 +516,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows) m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows) - m.AddRowsConcurrencyLimitReached += atomic.LoadUint64(&s.addRowsConcurrencyLimitReached) - m.AddRowsConcurrencyLimitTimeout += atomic.LoadUint64(&s.addRowsConcurrencyLimitTimeout) - m.AddRowsConcurrencyDroppedRows += atomic.LoadUint64(&s.addRowsConcurrencyDroppedRows) - m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh)) - m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh)) - m.SearchDelays = storagepacelimiter.Search.DelaysTotal() m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) @@ -1521,38 +1503,14 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error { var rowsAddedTotal uint64 // AddRows adds the given mrs to s. +// +// The caller should limit the number of concurrent AddRows calls to the number +// of available CPU cores in order to limit memory usage. func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { if len(mrs) == 0 { return nil } - // Limit the number of concurrent goroutines that may add rows to the storage. - // This should prevent from out of memory errors and CPU thrashing when too many - // goroutines call AddRows. - select { - case addRowsConcurrencyCh <- struct{}{}: - default: - // Sleep for a while until giving up - atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1) - t := timerpool.Get(addRowsTimeout) - - // Prioritize data ingestion over concurrent searches. - storagepacelimiter.Search.Inc() - - select { - case addRowsConcurrencyCh <- struct{}{}: - timerpool.Put(t) - storagepacelimiter.Search.Dec() - case <-t.C: - timerpool.Put(t) - storagepacelimiter.Search.Dec() - atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) - atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs))) - return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load", - len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh)) - } - } - // Add rows to the storage in blocks with limited size in order to reduce memory usage. var firstErr error ic := getMetricRowsInsertCtx() @@ -1575,8 +1533,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { } putMetricRowsInsertCtx(ic) - <-addRowsConcurrencyCh - return firstErr } @@ -1608,14 +1564,6 @@ var metricRowsInsertCtxPool sync.Pool const maxMetricRowsPerBlock = 8000 -var ( - // Limit the concurrency for data ingestion to GOMAXPROCS, since this operation - // is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent - // goroutines on data ingestion path. - addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) - addRowsTimeout = 30 * time.Second -) - // RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later. // // The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp. diff --git a/lib/writeconcurrencylimiter/concurrencylimiter.go b/lib/writeconcurrencylimiter/concurrencylimiter.go index 0237f5ba1..a54672d9b 100644 --- a/lib/writeconcurrencylimiter/concurrencylimiter.go +++ b/lib/writeconcurrencylimiter/concurrencylimiter.go @@ -3,7 +3,9 @@ package writeconcurrencylimiter import ( "flag" "fmt" + "io" "net/http" + "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" @@ -13,62 +15,118 @@ import ( ) var ( - maxConcurrentInserts = flag.Int("maxConcurrentInserts", cgroup.AvailableCPUs()*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+ - "since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration") - maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts") + maxConcurrentInserts = flag.Int("maxConcurrentInserts", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent insert requests. "+ + "Default value should work for most cases, since it minimizes the overhead. See also -insert.maxQueueDuration") + maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration to wait in the queue when -maxConcurrentInserts "+ + "concurrent insert requests are already executed") ) -// ch is the channel for limiting concurrent calls to Do. -var ch chan struct{} - -// Init initializes concurrencylimiter. +// Reader is a reader, which increases the concurrency after the first Read() call // -// Init must be called after flag.Parse call. -func Init() { - ch = make(chan struct{}, *maxConcurrentInserts) +// The concurrency can be reduced by calling DecConcurrency(). +// Then the concurrency is increased after the next Read() call. +type Reader struct { + r io.Reader + increasedConcurrency bool } -// Do calls f with the limited concurrency. -func Do(f func() error) error { - // Limit the number of conurrent f calls in order to prevent from excess - // memory usage and CPU thrashing. +// GetReader returns the Reader for r. +// +// The PutReader() must be called when the returned Reader is no longer needed. +func GetReader(r io.Reader) *Reader { + v := readerPool.Get() + if v == nil { + return &Reader{ + r: r, + } + } + rr := v.(*Reader) + rr.r = r + return rr +} + +// PutReader returns the r to the pool. +// +// It decreases the concurrency if r has increased concurrency. +func PutReader(r *Reader) { + r.DecConcurrency() + r.r = nil + readerPool.Put(r) +} + +var readerPool sync.Pool + +// Read implements io.Reader. +// +// It increases concurrency after the first call or after the next call after DecConcurrency() call. +func (r *Reader) Read(p []byte) (int, error) { + n, err := r.r.Read(p) + if !r.increasedConcurrency { + if err := incConcurrency(); err != nil { + return 0, err + } + r.increasedConcurrency = true + } + return n, err +} + +// DecConcurrency decreases the concurrency, so it could be increased again after the next Read() call. +func (r *Reader) DecConcurrency() { + if r.increasedConcurrency { + decConcurrency() + r.increasedConcurrency = false + } +} + +func initConcurrencyLimitCh() { + concurrencyLimitCh = make(chan struct{}, *maxConcurrentInserts) +} + +var ( + concurrencyLimitCh chan struct{} + concurrencyLimitChOnce sync.Once +) + +func incConcurrency() error { + concurrencyLimitChOnce.Do(initConcurrencyLimitCh) + select { - case ch <- struct{}{}: - err := f() - <-ch - return err + case concurrencyLimitCh <- struct{}{}: + return nil default: } - // All the workers are busy. - // Sleep for up to *maxQueueDuration. concurrencyLimitReached.Inc() t := timerpool.Get(*maxQueueDuration) select { - case ch <- struct{}{}: + case concurrencyLimitCh <- struct{}{}: timerpool.Put(t) - err := f() - <-ch - return err + return nil case <-t.C: timerpool.Put(t) concurrencyLimitTimeout.Inc() return &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf("cannot handle more than %d concurrent inserts during %s; possible solutions: "+ - "increase `-insert.maxQueueDuration`, increase `-maxConcurrentInserts`, increase server capacity", *maxConcurrentInserts, *maxQueueDuration), + Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are already executed. "+ + "Possible solutions: to reduce workload; to increase compute resources at the server; "+ + "to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts", + maxQueueDuration.Seconds(), *maxConcurrentInserts), StatusCode: http.StatusServiceUnavailable, } } } +func decConcurrency() { + <-concurrencyLimitCh +} + var ( concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`) concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`) _ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 { - return float64(cap(ch)) + return float64(cap(concurrencyLimitCh)) }) _ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 { - return float64(len(ch)) + return float64(len(concurrencyLimitCh)) }) )