lib/writeconcurrencylimiter: improve the logic behind -maxConcurrentInserts limit

Previously the -maxConcurrentInserts was limiting the number of established client connections,
which write data to VictoriaMetrics. Some of these connections could be idle.
Such connections do not consume big amounts of CPU and RAM, so there is a little sense in limiting
the number of such connections. So now the -maxConcurrentInserts command-line option
limits the number of concurrently executed insert requests, not including idle connections.

It is recommended removing -maxConcurrentInserts command-line option, since the default value
for this option should work good for most cases.
This commit is contained in:
Aliaksandr Valialkin 2023-01-06 18:59:39 -08:00
parent f299d2ca1a
commit c63755c316
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
42 changed files with 231 additions and 255 deletions

View file

@ -10,7 +10,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -26,10 +25,8 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
})
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
})
}

View file

@ -10,7 +10,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -28,11 +27,9 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
ce := req.Header.Get("Content-Encoding")
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error {
return insertRows(at, series, extraLabels)
})
ce := req.Header.Get("Content-Encoding")
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error {
return insertRows(at, series, extraLabels)
})
}

View file

@ -7,7 +7,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -20,9 +19,7 @@ var (
//
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, insertRows)
})
return parser.ParseStream(r, insertRows)
}
func insertRows(rows []parser.Row) error {

View file

@ -16,7 +16,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -37,10 +36,8 @@ var (
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(nil, db, rows, nil)
})
return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(nil, db, rows, nil)
})
}
@ -52,15 +49,13 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, extraLabels)
})
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, extraLabels)
})
}

View file

@ -38,7 +38,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -104,7 +103,6 @@ func main() {
startTime := time.Now()
remotewrite.Init()
common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
return influx.InsertHandlerForReader(r, false)

View file

@ -12,7 +12,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -31,10 +30,8 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err
}
isGzip := req.Header.Get("Content-Encoding") == "gzip"
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error {
return insertRows(at, block, extraLabels)
})
return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error {
return insertRows(at, block, extraLabels)
})
}

View file

@ -7,7 +7,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -20,9 +19,7 @@ var (
//
// See http://opentsdb.net/docs/build/html/api_telnet/put.html
func InsertHandler(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, insertRows)
})
return parser.ParseStream(r, insertRows)
}
func insertRows(rows []parser.Row) error {

View file

@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -25,10 +24,8 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
})
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
})
}

View file

@ -11,7 +11,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -31,21 +30,17 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
}, nil)
})
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
}, nil)
}
// InsertHandlerForReader processes metrics from given reader with optional gzip format
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error {
return insertRows(nil, rows, nil)
}, nil)
})
return parser.ParseStream(r, 0, isGzipped, func(rows []parser.Row) error {
return insertRows(nil, rows, nil)
}, nil)
}
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {

View file

@ -13,7 +13,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -29,19 +28,15 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, extraLabels)
})
return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, extraLabels)
})
}
// InsertHandlerForReader processes metrics from given reader
func InsertHandlerForReader(at *auth.Token, r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, nil)
})
return parser.ParseStream(r, func(tss []prompb.TimeSeries) error {
return insertRows(at, tss, nil)
})
}

View file

@ -13,7 +13,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -31,20 +30,16 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
})
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels)
})
}
// InsertHandlerForReader processes metrics from given reader
func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, isGzipped, func(rows []parser.Row) error {
return insertRows(nil, rows, nil)
})
return parser.ParseStream(r, isGzipped, func(rows []parser.Row) error {
return insertRows(nil, rows, nil)
})
}

View file

@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -23,10 +22,8 @@ func InsertHandler(req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
}

View file

@ -1,7 +1,6 @@
package datadog
import (
"fmt"
"net/http"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
@ -9,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -26,15 +24,9 @@ func InsertHandlerForHTTP(req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
ce := req.Header.Get("Content-Encoding")
err := parser.ParseStream(req.Body, ce, func(series []parser.Series) error {
return insertRows(series, extraLabels)
})
if err != nil {
return fmt.Errorf("headers: %q; err: %w", req.Header, err)
}
return nil
ce := req.Header.Get("Content-Encoding")
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error {
return insertRows(series, extraLabels)
})
}

View file

@ -6,7 +6,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -19,9 +18,7 @@ var (
//
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, insertRows)
})
return parser.ParseStream(r, insertRows)
}
func insertRows(rows []parser.Row) error {

View file

@ -15,7 +15,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -35,10 +34,8 @@ var (
//
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(db, rows, nil)
})
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(db, rows, nil)
})
}
@ -50,15 +47,13 @@ func InsertHandlerForHTTP(req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(db, rows, extraLabels)
})
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
q := req.URL.Query()
precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(db, rows, extraLabels)
})
}

View file

@ -35,7 +35,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -71,7 +70,6 @@ func Init() {
storage.SetMaxLabelsPerTimeseries(*maxLabelsPerTimeseries)
storage.SetMaxLabelValueLen(*maxLabelValueLen)
common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init()
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler)
}

View file

@ -12,7 +12,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -28,10 +27,8 @@ func InsertHandler(req *http.Request) error {
return err
}
isGzip := req.Header.Get("Content-Encoding") == "gzip"
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error {
return insertRows(block, extraLabels)
})
return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error {
return insertRows(block, extraLabels)
})
}

View file

@ -6,7 +6,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -19,9 +18,7 @@ var (
//
// See http://opentsdb.net/docs/build/html/api_telnet/put.html
func InsertHandler(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, insertRows)
})
return parser.ParseStream(r, insertRows)
}
func insertRows(rows []parser.Row) error {

View file

@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -28,10 +27,8 @@ func InsertHandler(req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
default:
return fmt.Errorf("unexpected path requested on HTTP OpenTSDB server: %q", path)

View file

@ -8,7 +8,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -27,12 +26,10 @@ func InsertHandler(req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
}, nil)
})
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
}, nil)
}
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {

View file

@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -24,10 +23,8 @@ func InsertHandler(req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(tss, extraLabels)
})
return parser.ParseStream(req.Body, func(tss []prompb.TimeSeries) error {
return insertRows(tss, extraLabels)
})
}

View file

@ -12,7 +12,6 @@ import (
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -29,11 +28,9 @@ func InsertHandler(req *http.Request) error {
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
}

View file

@ -118,7 +118,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
timerpool.Put(t)
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("couldn't start executing the request in %.3fs, since -search.maxConcurrentRequests=%d concurrent requests "+
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
"are already executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"to increase -search.maxQueueDuration; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
d.Seconds(), *maxConcurrentRequests),

View file

@ -650,22 +650,6 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().TooSmallTimestampRows)
})
metrics.NewGauge(`vm_concurrent_addrows_limit_reached_total`, func() float64 {
return float64(m().AddRowsConcurrencyLimitReached)
})
metrics.NewGauge(`vm_concurrent_addrows_limit_timeout_total`, func() float64 {
return float64(m().AddRowsConcurrencyLimitTimeout)
})
metrics.NewGauge(`vm_concurrent_addrows_dropped_rows_total`, func() float64 {
return float64(m().AddRowsConcurrencyDroppedRows)
})
metrics.NewGauge(`vm_concurrent_addrows_capacity`, func() float64 {
return float64(m().AddRowsConcurrencyCapacity)
})
metrics.NewGauge(`vm_concurrent_addrows_current`, func() float64 {
return float64(m().AddRowsConcurrencyCurrent)
})
metrics.NewGauge(`vm_search_delays_total`, func() float64 {
return float64(m().SearchDelays)
})

View file

@ -5021,7 +5021,7 @@
},
"editorMode": "code",
"exemplar": true,
"expr": "max(\n max_over_time(vm_concurrent_addrows_current{job=~\"$job_storage\", \n instance=~\"$instance\"}[$__rate_interval])\n)",
"expr": "max(\n max_over_time(vm_concurrent_insert_current{job=~\"$job_storage\", \n instance=~\"$instance\"}[$__rate_interval])\n)",
"interval": "",
"legendFormat": "current",
"range": true,
@ -5034,7 +5034,7 @@
},
"editorMode": "code",
"exemplar": true,
"expr": "min(vm_concurrent_addrows_capacity{job=~\"$job_storage\", instance=~\"$instance\"})",
"expr": "min(vm_concurrent_insert_capacity{job=~\"$job_storage\", instance=~\"$instance\"})",
"hide": false,
"interval": "",
"legendFormat": "max",

View file

@ -4609,7 +4609,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "max_over_time(vm_concurrent_addrows_capacity{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])",
"expr": "max_over_time(vm_concurrent_insert_capacity{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
@ -4622,7 +4622,7 @@
"type": "prometheus",
"uid": "$ds"
},
"expr": "sum(vm_concurrent_addrows_current{job=~\"$job\", instance=~\"$instance\"})",
"expr": "sum(vm_concurrent_insert_current{job=~\"$job\", instance=~\"$instance\"})",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "current",

View file

@ -81,7 +81,7 @@ groups:
Possible reasons for errors are misconfiguration, overload, network blips or unreachable components."
- alert: ConcurrentFlushesHitTheLimit
expr: avg_over_time(vm_concurrent_addrows_current[1m]) >= vm_concurrent_addrows_capacity
expr: avg_over_time(vm_concurrent_insert_current[1m]) >= vm_concurrent_insert_capacity
for: 15m
labels:
severity: warning

View file

@ -61,7 +61,7 @@ groups:
Please verify if clients are sending correct requests."
- alert: ConcurrentFlushesHitTheLimit
expr: avg_over_time(vm_concurrent_addrows_current[1m]) >= vm_concurrent_addrows_capacity
expr: avg_over_time(vm_concurrent_insert_current[1m]) >= vm_concurrent_insert_capacity
for: 15m
labels:
severity: warning

View file

@ -15,6 +15,10 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
**Update note 1:** This release changes the logic behind `-maxConcurrentInserts` command-line flag. Previously this flag was limiting the number of concurrent connections established from clients, which send data to VictoriaMetrics. Some of these connections could be temporarily idle. Such connections do not take significant CPU and memory resources, so there is no need in limiting their count. The new logic takes into account only those connections, which **actively** ingest new data to VictoriaMetrics and to [vmagent](https://docs.victoriametrics.com/vmagent.html). This means that the default `-maxConcurrentInserts` value should handle cases, which could require increasing the value in the previous releases. So it is recommended trying to remove the explicitly set `-maxConcurrentInserts` command-line flag after upgrading to this release and verifying whether this reduces CPU and memory usage.
**Update note 2:** The `vm_concurrent_addrows_current` and `vm_concurrent_addrows_capacity` metrics [exported](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) by `vmstorage` are replaced with `vm_concurrent_insert_current` and `vm_concurrent_insert_capacity` metrics in order to be consistent with the corresponding metrics exported by `vminsert`. Please update queries in dahsboards and alerting rules with new metric names if old metric names are used there.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Prometheus-compatible target discovery for [HashiCorp Nomad](https://www.nomadproject.io/) services via [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367). Thanks to @mr-karan for [the implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3549).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): automatically pre-fetch `metric_relabel_configs` and the target labels when clicking on the `debug metrics relabeling` link at the `http://vmagent:8429/targets` page at the particular target. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabel-debug).
@ -24,6 +28,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling): add support for `keepequal` and `dropequal` relabeling actions, which are supported by Prometheus starting from [v2.41.0](https://github.com/prometheus/prometheus/releases/tag/v2.41.0). These relabeling actions are almost identical to `keep_if_equal` and `drop_if_equal` relabeling actions supported by VictoriaMetrics since `v1.38.0` - see [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) - so it is recommended sticking to `keep_if_equal` and `drop_if_equal` actions instead of switching to `keepequal` and `dropequal`.
* FEATURE: [csvimport](https://docs.victoriametrics.com/#how-to-import-csv-data): support empty values for imported metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3540).
* FEATURE: [vmalert](httpпоs://docs.victoriametrics.com/vmalert.html): allow configuring the default number of stored rule's update states in memory via global `-rule.updateEntriesLimit` command-line flag or per-rule via rule's `update_entries_limit` configuration param. See [these docs](https://docs.victoriametrics.com/vmalert.html#rules) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3556).
* FEATURE: improve the logic benhind `-maxConcurrentInserts` command-line flag. Previously this flag was limiting the number of concurrent connections from clients, which write data to VictoriaMetrics or [vmagent](https://docs.victoriametrics.com/vmagent.html). Some of these connections could be idle for some time. These connections do not need significant amounts of CPU and memory, so there is no sense in limiting their count. The updated logic behind `-maxConcurrentInserts` limits the number of **active** insert requests, not counting idle connections.
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): add `-maxConcurrentInserts` and `-insert.maxQueueDuration` command-line flags to `vmstorage`, so they could be tuned if needed in the same way as at `vminsert` nodes.
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): limit the number of concurrently executed requests at `vmstorage` proportionally to the number of available CPU cores, since every request can saturate a single CPU core at `vmstorage`. Previously a single `vmstorage` could accept and start processing arbitrary number of concurrent requests received from big number of `vmselect` nodes. This could result in increased RAM, CPU and disk IO usage or event to out of memory crash at `vmstorage` side under high load. The limit can be fine-tuned if needed via `-search.maxConcurrentRequests` command-line flag at `vmstorage` according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#resource-usage-limits). `vmstorage` now [exposes](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) the following additional metrics at `http://vmstorage:8482/metrics` page:
- `vm_vmselect_concurrent_requests_capacity` - the maximum number of requests allowed to execute concurrently
- `vm_vmselect_concurrent_requests_current` - the current number of concurrently executed requests

View file

@ -860,7 +860,7 @@ Below is the output for `/path/to/vminsert -help`:
-influxTrimTimestamp duration
Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
-insert.maxQueueDuration duration
The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are already executed (default 1m0s)
-loggerDisableTimestamps
Whether to disable writing timestamps in logs
-loggerErrorsPerSecondLimit int
@ -878,7 +878,7 @@ Below is the output for `/path/to/vminsert -help`:
-loggerWarnsPerSecondLimit int
Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
-maxConcurrentInserts int
The maximum number of concurrent inserts. Default value should work for most cases, since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration (default 16)
The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the overhead. See also -insert.maxQueueDuration (default 8)
-maxInsertRequestSize size
The maximum size in bytes of a single Prometheus remote_write API request
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 33554432)
@ -1240,6 +1240,8 @@ Below is the output for `/path/to/vmstorage -help`:
Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password
-httpListenAddr string
Address to listen for http connections (default ":8482")
-insert.maxQueueDuration duration
The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are already executed (default 1m0s)
-logNewSeries
Whether to log new series. This option is for debug purposes only. It can lead to performance issues when big number of new series are ingested into VictoriaMetrics
-loggerDisableTimestamps
@ -1258,6 +1260,8 @@ Below is the output for `/path/to/vmstorage -help`:
Timezone to use for timestamps in logs. Timezone must be a valid IANA Time Zone. For example: America/New_York, Europe/Berlin, Etc/GMT+3 or Local (default "UTC")
-loggerWarnsPerSecondLimit int
Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
-maxConcurrentInserts int
The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the overhead. See also -insert.maxQueueDuration (default 8)
-memory.allowedBytes size
Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)

View file

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -26,13 +27,16 @@ var (
//
// callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error {
wcr := writeconcurrencylimiter.GetReader(req.Body)
defer writeconcurrencylimiter.PutReader(wcr)
r := io.Reader(wcr)
q := req.URL.Query()
format := q.Get("format")
cds, err := ParseColumnDescriptors(format)
if err != nil {
return fmt.Errorf("cannot parse the provided csv format: %w", err)
}
r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(r)
if err != nil {
@ -51,6 +55,7 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View file

@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -33,6 +34,10 @@ var (
//
// callback shouldn't hold series after returning.
func ParseStream(r io.Reader, contentEncoding string, callback func(series []Series) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
switch contentEncoding {
case "gzip":
zr, err := common.GetGzipReader(r)

View file

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -26,6 +27,10 @@ var (
//
// callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, callback func(rows []Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
ctx := getStreamContext(r)
defer putStreamContext(ctx)
@ -36,6 +41,7 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View file

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -27,6 +28,10 @@ var (
//
// callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
@ -63,6 +68,7 @@ func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback fun
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View file

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -19,6 +20,10 @@ import (
//
// callback shouldn't hold block after returning.
func ParseStream(r io.Reader, isGzip bool, callback func(block *Block) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
if isGzip {
zr, err := common.GetGzipReader(r)
if err != nil {
@ -101,6 +106,7 @@ func ParseStream(r io.Reader, isGzip bool, callback func(block *Block) error) er
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
}

View file

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -26,6 +27,10 @@ var (
//
// callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, callback func(rows []Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
ctx := getStreamContext(r)
defer putStreamContext(ctx)
for ctx.Read() {
@ -35,6 +40,7 @@ func ParseStream(r io.Reader, callback func(rows []Row) error) error {
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -29,8 +30,11 @@ var (
//
// callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error {
wcr := writeconcurrencylimiter.GetReader(req.Body)
defer writeconcurrencylimiter.PutReader(wcr)
r := io.Reader(req.Body)
readCalls.Inc()
r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
zr, err := common.GetGzipReader(r)
if err != nil {

View file

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -19,6 +20,10 @@ import (
//
// callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error, errLogger func(string)) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
@ -38,6 +43,7 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View file

@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
"github.com/golang/snappy"
)
@ -21,6 +22,10 @@ var maxInsertRequestSize = flagutil.NewBytes("maxInsertRequestSize", 32*1024*102
//
// callback shouldn't hold tss after returning.
func ParseStream(r io.Reader, callback func(tss []prompb.TimeSeries) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
ctx := getPushCtx(r)
defer putPushCtx(ctx)
if err := ctx.Read(); err != nil {

View file

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -22,6 +23,10 @@ var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maxi
//
// callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
@ -39,6 +44,7 @@ func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) e
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {

View file

@ -18,7 +18,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -28,7 +27,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
@ -46,10 +44,6 @@ type Storage struct {
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
addRowsConcurrencyLimitReached uint64
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
slowRowInserts uint64
slowPerDayIndexInserts uint64
slowMetricNameLoads uint64
@ -451,12 +445,6 @@ type Metrics struct {
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
AddRowsConcurrencyLimitReached uint64
AddRowsConcurrencyLimitTimeout uint64
AddRowsConcurrencyDroppedRows uint64
AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64
SearchDelays uint64
SlowRowInserts uint64
@ -528,12 +516,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
m.AddRowsConcurrencyLimitReached += atomic.LoadUint64(&s.addRowsConcurrencyLimitReached)
m.AddRowsConcurrencyLimitTimeout += atomic.LoadUint64(&s.addRowsConcurrencyLimitTimeout)
m.AddRowsConcurrencyDroppedRows += atomic.LoadUint64(&s.addRowsConcurrencyDroppedRows)
m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
@ -1521,38 +1503,14 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error {
var rowsAddedTotal uint64
// AddRows adds the given mrs to s.
//
// The caller should limit the number of concurrent AddRows calls to the number
// of available CPU cores in order to limit memory usage.
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
if len(mrs) == 0 {
return nil
}
// Limit the number of concurrent goroutines that may add rows to the storage.
// This should prevent from out of memory errors and CPU thrashing when too many
// goroutines call AddRows.
select {
case addRowsConcurrencyCh <- struct{}{}:
default:
// Sleep for a while until giving up
atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)
t := timerpool.Get(addRowsTimeout)
// Prioritize data ingestion over concurrent searches.
storagepacelimiter.Search.Inc()
select {
case addRowsConcurrencyCh <- struct{}{}:
timerpool.Put(t)
storagepacelimiter.Search.Dec()
case <-t.C:
timerpool.Put(t)
storagepacelimiter.Search.Dec()
atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1)
atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs)))
return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load",
len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))
}
}
// Add rows to the storage in blocks with limited size in order to reduce memory usage.
var firstErr error
ic := getMetricRowsInsertCtx()
@ -1575,8 +1533,6 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
}
putMetricRowsInsertCtx(ic)
<-addRowsConcurrencyCh
return firstErr
}
@ -1608,14 +1564,6 @@ var metricRowsInsertCtxPool sync.Pool
const maxMetricRowsPerBlock = 8000
var (
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
// goroutines on data ingestion path.
addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
addRowsTimeout = 30 * time.Second
)
// RegisterMetricNames registers all the metric names from mns in the indexdb, so they can be queried later.
//
// The the MetricRow.Timestamp is used for registering the metric name starting from the given timestamp.

View file

@ -3,7 +3,9 @@ package writeconcurrencylimiter
import (
"flag"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@ -13,62 +15,118 @@ import (
)
var (
maxConcurrentInserts = flag.Int("maxConcurrentInserts", cgroup.AvailableCPUs()*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+
"since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration")
maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts")
maxConcurrentInserts = flag.Int("maxConcurrentInserts", 2*cgroup.AvailableCPUs(), "The maximum number of concurrent insert requests. "+
"Default value should work for most cases, since it minimizes the overhead. See also -insert.maxQueueDuration")
maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration to wait in the queue when -maxConcurrentInserts "+
"concurrent insert requests are already executed")
)
// ch is the channel for limiting concurrent calls to Do.
var ch chan struct{}
// Init initializes concurrencylimiter.
// Reader is a reader, which increases the concurrency after the first Read() call
//
// Init must be called after flag.Parse call.
func Init() {
ch = make(chan struct{}, *maxConcurrentInserts)
// The concurrency can be reduced by calling DecConcurrency().
// Then the concurrency is increased after the next Read() call.
type Reader struct {
r io.Reader
increasedConcurrency bool
}
// Do calls f with the limited concurrency.
func Do(f func() error) error {
// Limit the number of conurrent f calls in order to prevent from excess
// memory usage and CPU thrashing.
// GetReader returns the Reader for r.
//
// The PutReader() must be called when the returned Reader is no longer needed.
func GetReader(r io.Reader) *Reader {
v := readerPool.Get()
if v == nil {
return &Reader{
r: r,
}
}
rr := v.(*Reader)
rr.r = r
return rr
}
// PutReader returns the r to the pool.
//
// It decreases the concurrency if r has increased concurrency.
func PutReader(r *Reader) {
r.DecConcurrency()
r.r = nil
readerPool.Put(r)
}
var readerPool sync.Pool
// Read implements io.Reader.
//
// It increases concurrency after the first call or after the next call after DecConcurrency() call.
func (r *Reader) Read(p []byte) (int, error) {
n, err := r.r.Read(p)
if !r.increasedConcurrency {
if err := incConcurrency(); err != nil {
return 0, err
}
r.increasedConcurrency = true
}
return n, err
}
// DecConcurrency decreases the concurrency, so it could be increased again after the next Read() call.
func (r *Reader) DecConcurrency() {
if r.increasedConcurrency {
decConcurrency()
r.increasedConcurrency = false
}
}
func initConcurrencyLimitCh() {
concurrencyLimitCh = make(chan struct{}, *maxConcurrentInserts)
}
var (
concurrencyLimitCh chan struct{}
concurrencyLimitChOnce sync.Once
)
func incConcurrency() error {
concurrencyLimitChOnce.Do(initConcurrencyLimitCh)
select {
case ch <- struct{}{}:
err := f()
<-ch
return err
case concurrencyLimitCh <- struct{}{}:
return nil
default:
}
// All the workers are busy.
// Sleep for up to *maxQueueDuration.
concurrencyLimitReached.Inc()
t := timerpool.Get(*maxQueueDuration)
select {
case ch <- struct{}{}:
case concurrencyLimitCh <- struct{}{}:
timerpool.Put(t)
err := f()
<-ch
return err
return nil
case <-t.C:
timerpool.Put(t)
concurrencyLimitTimeout.Inc()
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot handle more than %d concurrent inserts during %s; possible solutions: "+
"increase `-insert.maxQueueDuration`, increase `-maxConcurrentInserts`, increase server capacity", *maxConcurrentInserts, *maxQueueDuration),
Err: fmt.Errorf("cannot process insert request for %.3f seconds because %d concurrent insert requests are already executed. "+
"Possible solutions: to reduce workload; to increase compute resources at the server; "+
"to increase -insert.maxQueueDuration; to increase -maxConcurrentInserts",
maxQueueDuration.Seconds(), *maxConcurrentInserts),
StatusCode: http.StatusServiceUnavailable,
}
}
}
func decConcurrency() {
<-concurrencyLimitCh
}
var (
concurrencyLimitReached = metrics.NewCounter(`vm_concurrent_insert_limit_reached_total`)
concurrencyLimitTimeout = metrics.NewCounter(`vm_concurrent_insert_limit_timeout_total`)
_ = metrics.NewGauge(`vm_concurrent_insert_capacity`, func() float64 {
return float64(cap(ch))
return float64(cap(concurrencyLimitCh))
})
_ = metrics.NewGauge(`vm_concurrent_insert_current`, func() float64 {
return float64(len(ch))
return float64(len(concurrencyLimitCh))
})
)