diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go
index 6349dd3f3..c74d74768 100644
--- a/app/victoria-metrics/main.go
+++ b/app/victoria-metrics/main.go
@@ -11,7 +11,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@@ -36,7 +35,6 @@ func main() {
 	envflag.Parse()
 	buildinfo.Init()
 	logger.Init()
-	cgroup.UpdateGOMAXPROCSToCPUQuota()
 
 	if promscrape.IsDryRun() {
 		*dryRun = true
diff --git a/app/vmagent/common/push_ctx.go b/app/vmagent/common/push_ctx.go
index 1c4e19d36..e25ddd03a 100644
--- a/app/vmagent/common/push_ctx.go
+++ b/app/vmagent/common/push_ctx.go
@@ -1,9 +1,9 @@
 package common
 
 import (
-	"runtime"
 	"sync"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 )
@@ -63,4 +63,4 @@ func PutPushCtx(ctx *PushCtx) {
 }
 
 var pushCtxPool sync.Pool
-var pushCtxPoolCh = make(chan *PushCtx, runtime.GOMAXPROCS(-1))
+var pushCtxPoolCh = make(chan *PushCtx, cgroup.AvailableCPUs())
diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go
index b9d1e7c3f..6235e61f9 100644
--- a/app/vmagent/influx/request_handler.go
+++ b/app/vmagent/influx/request_handler.go
@@ -4,12 +4,12 @@ import (
 	"flag"
 	"io"
 	"net/http"
-	"runtime"
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
@@ -165,4 +165,4 @@ func putPushCtx(ctx *pushCtx) {
 }
 
 var pushCtxPool sync.Pool
-var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
+var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
diff --git a/app/vmagent/main.go b/app/vmagent/main.go
index 973e468c4..4f2f269c7 100644
--- a/app/vmagent/main.go
+++ b/app/vmagent/main.go
@@ -21,7 +21,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@@ -68,7 +67,6 @@ func main() {
 	remotewrite.InitSecretFlags()
 	buildinfo.Init()
 	logger.Init()
-	cgroup.UpdateGOMAXPROCSToCPUQuota()
 
 	if promscrape.IsDryRun() {
 		if err := promscrape.CheckConfig(); err != nil {
diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index 93a146f09..80c742a1b 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -3,10 +3,10 @@ package remotewrite
 import (
 	"flag"
 	"fmt"
-	"runtime"
 	"sync"
 	"sync/atomic"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -43,7 +43,7 @@ var allRelabelConfigs atomic.Value // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // since it may lead to high memory usage due to big number of buffers. -var maxQueues = runtime.GOMAXPROCS(-1) * 4 +var maxQueues = cgroup.AvailableCPUs() * 4 // InitSecretFlags must be called after flag.Parse and before any logging. func InitSecretFlags() { diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 587c0d0dd..18097680a 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -16,7 +16,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remoteread" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -59,7 +58,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() if *dryRun { u, _ := url.Parse("https://victoriametrics.com/") diff --git a/app/vmauth/main.go b/app/vmauth/main.go index 30c33c61f..58c89238e 100644 --- a/app/vmauth/main.go +++ b/app/vmauth/main.go @@ -9,7 +9,6 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -28,7 +27,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() logger.Infof("starting vmauth at %q...", *httpListenAddr) startTime := time.Now() initAuthConfig() diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 2e008ad02..13f0f23f2 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -40,7 +39,6 @@ func main() { envflag.Parse() buildinfo.Init() logger.Init() - cgroup.UpdateGOMAXPROCSToCPUQuota() if len(*snapshotCreateURL) > 0 { logger.Infof("Snapshots enabled") diff --git a/app/vminsert/common/insert_ctx_pool.go b/app/vminsert/common/insert_ctx_pool.go index 3f6c057bd..4fb20fb9c 100644 --- a/app/vminsert/common/insert_ctx_pool.go +++ b/app/vminsert/common/insert_ctx_pool.go @@ -1,8 +1,9 @@ package common import ( - "runtime" "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" ) // GetInsertCtx returns InsertCtx from the pool. 
@@ -33,4 +34,4 @@ func PutInsertCtx(ctx *InsertCtx) {
 }
 
 var insertCtxPool sync.Pool
-var insertCtxPoolCh = make(chan *InsertCtx, runtime.GOMAXPROCS(-1))
+var insertCtxPoolCh = make(chan *InsertCtx, cgroup.AvailableCPUs())
diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go
index 93a826cd0..b841dd0d2 100644
--- a/app/vminsert/influx/request_handler.go
+++ b/app/vminsert/influx/request_handler.go
@@ -4,12 +4,12 @@ import (
 	"flag"
 	"io"
 	"net/http"
-	"runtime"
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
 	parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -175,4 +175,4 @@ func putPushCtx(ctx *pushCtx) {
 }
 
 var pushCtxPool sync.Pool
-var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
+var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go
index 9b0fc6477..cb36025dc 100644
--- a/app/vminsert/native/request_handler.go
+++ b/app/vminsert/native/request_handler.go
@@ -2,11 +2,11 @@ package native
 
 import (
 	"net/http"
-	"runtime"
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@@ -112,4 +112,4 @@ func putPushCtx(ctx *pushCtx) {
 }
 
 var pushCtxPool sync.Pool
-var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
+var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go
index 6fbc7ab83..64002b4d4 100644
--- a/app/vminsert/vmimport/request_handler.go
+++ b/app/vminsert/vmimport/request_handler.go
@@ -2,11 +2,11 @@ package vmimport
 
 import (
 	"net/http"
-	"runtime"
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@@ -117,4 +117,4 @@ func putPushCtx(ctx *pushCtx) {
 }
 
 var pushCtxPool sync.Pool
-var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
+var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go
index 895d37b97..c65f54bf5 100644
--- a/app/vmrestore/main.go
+++ b/app/vmrestore/main.go
@@ -9,7 +9,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -33,7 +32,6 @@ func main() {
 	envflag.Parse()
 	buildinfo.Init()
 	logger.Init()
-	cgroup.UpdateGOMAXPROCSToCPUQuota()
 
 	srcFS, err := newSrcFS()
 	if err != nil {
diff --git a/app/vmselect/main.go b/app/vmselect/main.go
index 3ec78f12e..5847113ca 100644
--- a/app/vmselect/main.go
+++ b/app/vmselect/main.go
@@ -5,7 +5,6 @@ import (
 	"flag"
 	"fmt"
 	"net/http"
-	"runtime"
 	"strings"
 	"time"
 
@@ -15,6 +14,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -32,7 +32,7 @@ var (
 )
 
 func getDefaultMaxConcurrentRequests() int {
-	n := runtime.GOMAXPROCS(-1)
+	n := cgroup.AvailableCPUs()
 	if n <= 4 {
 		n *= 2
 	}
diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 34a455bcc..8e90a8fbd 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -6,7 +6,6 @@ import (
 	"flag"
 	"fmt"
 	"regexp"
-	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -14,6 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -179,7 +179,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
 var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
 var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`)
 
-var gomaxprocs = runtime.GOMAXPROCS(-1)
+var gomaxprocs = cgroup.AvailableCPUs()
 
 type packedTimeseries struct {
 	metricName string
@@ -265,7 +265,7 @@ func unpackWorker() {
 // unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
 //
 // This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system.
-var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)
+var unpackBatchSize = 8 * cgroup.AvailableCPUs()
 
 // Unpack unpacks pts to dst.
 func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error {
@@ -715,7 +715,7 @@ func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func
 	sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
 
 	// Start workers that call f in parallel on available CPU cores.
-	gomaxprocs := runtime.GOMAXPROCS(-1)
+	gomaxprocs := cgroup.AvailableCPUs()
 	workCh := make(chan *exportWork, gomaxprocs*8)
 	var (
 		errGlobal error
diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go
index 8eba99869..e2c5cd704 100644
--- a/app/vmselect/prometheus/prometheus.go
+++ b/app/vmselect/prometheus/prometheus.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"math"
 	"net/http"
-	"runtime"
 	"sort"
 	"strconv"
 	"strings"
@@ -17,6 +16,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -147,7 +147,7 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
 	bw := bufferedwriter.Get(w)
 	defer bufferedwriter.Put(bw)
 
-	resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
+	resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
 	doneCh := make(chan error)
 	go func() {
 		err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
@@ -374,7 +374,7 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
 	bw := bufferedwriter.Get(w)
 	defer bufferedwriter.Put(bw)
 
-	resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
+	resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
 	doneCh := make(chan error)
 	if !reduceMemUsage {
 		rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)
diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index d0c6c99ef..bd94207a7 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -4,12 +4,12 @@ import (
 	"flag"
 	"fmt"
 	"math"
-	"runtime"
 	"sync"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@@ -555,7 +555,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr
 }
 
 func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
-	concurrency := runtime.GOMAXPROCS(-1)
+	concurrency := cgroup.AvailableCPUs()
 	if concurrency > len(tss) {
 		concurrency = len(tss)
 	}
@@ -678,7 +678,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
 	timeseriesLen := rssLen
 	if iafc != nil {
 		// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
-		timeseriesLen = runtime.GOMAXPROCS(-1)
+		timeseriesLen = cgroup.AvailableCPUs()
 		if iafc.ae.Modifier.Op != "" {
 			if iafc.ae.Limit > 0 {
 				// There is an explicit limit on the number of output time series.
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index d8a30e08b..133c5aac7 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -9,6 +9,7 @@
 * FEATURE: export `vm_promscrape_scrapers_started_total{type=""}` and `vm_promscrape_scrapers_stopped_total{type=""}` metrics for tracking churn rate for scrapers per each service discovery type.
 
+* BUGFIX: properly handle CPU limits set on the host system or host container. The bugfix may result in lower memory usage on systems with CPU limits. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/946
 * BUGFIX: prevent from duplicate `name` tag returned from `/tags/autoComplete/tags` handler. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
 * BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` comand-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944
diff --git a/lib/cgroup/cpu.go b/lib/cgroup/cpu.go
index fbfd73238..56690e86d 100644
--- a/lib/cgroup/cpu.go
+++ b/lib/cgroup/cpu.go
@@ -6,17 +6,21 @@ import (
 	"runtime"
 	"strconv"
 	"strings"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"sync"
 )
 
-// UpdateGOMAXPROCSToCPUQuota updates GOMAXPROCS to cgroup CPU quota if GOMAXPROCS isn't set in environment var.
-//
-// This function must be called after logger.Init().
-func UpdateGOMAXPROCSToCPUQuota() {
+// AvailableCPUs returns the number of available CPU cores for the app.
+func AvailableCPUs() int {
+	availableCPUsOnce.Do(updateGOMAXPROCSToCPUQuota)
+	return runtime.GOMAXPROCS(-1)
+}
+
+var availableCPUsOnce sync.Once
+
+// updateGOMAXPROCSToCPUQuota updates GOMAXPROCS to cgroup CPU quota if GOMAXPROCS isn't set in environment var.
+func updateGOMAXPROCSToCPUQuota() {
 	if v := os.Getenv("GOMAXPROCS"); v != "" {
 		// Do not override explicitly set GOMAXPROCS.
-		logger.Infof("using GOMAXPROCS=%q set via environment variable", v)
 		return
 	}
 	q := getCPUQuota()
@@ -28,13 +32,11 @@ func updateGOMAXPROCSToCPUQuota() {
 	numCPU := runtime.NumCPU()
 	if gomaxprocs > numCPU {
 		// There is no sense in setting more GOMAXPROCS than the number of available CPU cores.
- logger.Infof("cgroup CPU quota=%d exceeds NumCPU=%d; using GOMAXPROCS=NumCPU", gomaxprocs, numCPU) return } if gomaxprocs <= 0 { gomaxprocs = 1 } - logger.Infof("updating GOMAXPROCS to %d according to cgroup CPU quota", gomaxprocs) runtime.GOMAXPROCS(gomaxprocs) } diff --git a/lib/ingestserver/graphite/server.go b/lib/ingestserver/graphite/server.go index ae2d9085d..8bd685954 100644 --- a/lib/ingestserver/graphite/server.go +++ b/lib/ingestserver/graphite/server.go @@ -4,12 +4,12 @@ import ( "errors" "io" "net" - "runtime" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -113,7 +113,7 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { } func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i := 0; i < gomaxprocs; i++ { wg.Add(1) diff --git a/lib/ingestserver/influx/server.go b/lib/ingestserver/influx/server.go index 02bc5c60e..9ed428f27 100644 --- a/lib/ingestserver/influx/server.go +++ b/lib/ingestserver/influx/server.go @@ -4,12 +4,12 @@ import ( "errors" "io" "net" - "runtime" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/metrics" @@ -113,7 +113,7 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) { } func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i := 0; i < gomaxprocs; i++ { wg.Add(1) diff --git a/lib/ingestserver/opentsdb/server.go b/lib/ingestserver/opentsdb/server.go index c7aca100c..b25251410 100644 --- a/lib/ingestserver/opentsdb/server.go +++ b/lib/ingestserver/opentsdb/server.go @@ -5,12 +5,12 @@ import ( "io" "net" "net/http" - "runtime" "strings" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" @@ -133,7 +133,7 @@ func serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) { } func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) { - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() var wg sync.WaitGroup for i := 0; i < gomaxprocs; i++ { wg.Add(1) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 4284bc623..0ab790dcb 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "os" "path/filepath" - "runtime" "sort" "strings" "sync" @@ -14,6 +13,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -959,7 +959,7 @@ func (tb *Table) maxOutPartItemsSlow() uint64 { return freeSpace / 
uint64(mergeWorkersCount) / 4 } -var mergeWorkersCount = runtime.GOMAXPROCS(-1) +var mergeWorkersCount = cgroup.AvailableCPUs() func openParts(path string) ([]*partWrapper, error) { // The path can be missing after restoring from backup, so create it if needed. diff --git a/lib/persistentqueue/fastqueue_timing_test.go b/lib/persistentqueue/fastqueue_timing_test.go index 991846ed5..1ec46801b 100644 --- a/lib/persistentqueue/fastqueue_timing_test.go +++ b/lib/persistentqueue/fastqueue_timing_test.go @@ -2,8 +2,9 @@ package persistentqueue import ( "fmt" - "runtime" "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" ) func BenchmarkFastQueueThroughputSerial(b *testing.B) { @@ -36,7 +37,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) { b.SetBytes(int64(blockSize) * iterationsCount) path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize) mustDeleteDir(path) - fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2, 0) + fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0) defer func() { fq.MustClose() mustDeleteDir(path) diff --git a/lib/protoparser/common/unmarshal_work.go b/lib/protoparser/common/unmarshal_work.go index ac16e289d..049644f15 100644 --- a/lib/protoparser/common/unmarshal_work.go +++ b/lib/protoparser/common/unmarshal_work.go @@ -1,9 +1,9 @@ package common import ( - "runtime" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -25,7 +25,7 @@ func StartUnmarshalWorkers() { if unmarshalWorkCh != nil { logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been alread called without stopUnmarshalWorkers()") } - gomaxprocs := runtime.GOMAXPROCS(-1) + gomaxprocs := cgroup.AvailableCPUs() unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs) unmarshalWorkersWG.Add(gomaxprocs) for i := 0; i < gomaxprocs; i++ { diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 0b318775a..918f40baf 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -6,11 +6,11 @@ import ( "fmt" "io" "net/http" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -142,7 +142,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index 26b490162..93ac45394 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -5,11 +5,11 @@ import ( "flag" "fmt" "io" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" @@ -127,7 +127,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan 
*streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index fc6bc0215..3da3858b8 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -5,11 +5,11 @@ import ( "flag" "fmt" "io" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" @@ -155,7 +155,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index aaab8da92..14a476e22 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -5,11 +5,11 @@ import ( "flag" "fmt" "io" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" @@ -126,7 +126,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index ad2d87dd8..edad74dd2 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -6,11 +6,11 @@ import ( "fmt" "io" "net/http" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -149,7 +149,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) func getRows() *Rows { v := rowsPool.Get() diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index 2c6aa46c0..5e7563a29 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -4,11 +4,11 @@ import ( "bufio" "fmt" "io" - "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) @@ -128,7 +128,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git 
a/lib/protoparser/promremotewrite/streamparser.go b/lib/protoparser/promremotewrite/streamparser.go index ad447f979..6adb6ae77 100644 --- a/lib/protoparser/promremotewrite/streamparser.go +++ b/lib/protoparser/promremotewrite/streamparser.go @@ -5,10 +5,10 @@ import ( "fmt" "io" "net/http" - "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" @@ -123,7 +123,7 @@ func putPushCtx(ctx *pushCtx) { } var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1)) +var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) func getWriteRequest() *prompb.WriteRequest { v := writeRequestPool.Get() diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/streamparser.go index 52063ae2d..183b316e5 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/streamparser.go @@ -5,10 +5,10 @@ import ( "fmt" "io" "net/http" - "runtime" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" @@ -132,7 +132,7 @@ func putStreamContext(ctx *streamContext) { } var streamContextPool sync.Pool -var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1)) +var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { rows Rows diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 82ea56617..70b0d0b7b 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -7,7 +7,6 @@ import ( "math/bits" "os" "path/filepath" - "runtime" "sort" "strings" "sync" @@ -16,6 +15,7 @@ import ( "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -77,7 +77,7 @@ const finalPartsToMerge = 3 // The number of shards for rawRow entries per partition. // // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. -var rawRowsShardsPerPartition = (runtime.GOMAXPROCS(-1) + 7) / 8 +var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8 // getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet. func getMaxRawRowsPerPartition() int { @@ -882,8 +882,8 @@ func hasActiveMerges(pws []*partWrapper) bool { } var ( - bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 - smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2 + bigMergeWorkersCount = (cgroup.AvailableCPUs() + 1) / 2 + smallMergeWorkersCount = (cgroup.AvailableCPUs() + 1) / 2 ) // SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks. 
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 07d8686b0..3cef89e86 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -8,13 +8,13 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
-	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
 	"unsafe"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -873,7 +873,7 @@ var (
 	// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
 	// is CPU bound and sometimes disk IO bound, so there is no sense in running more
 	// than GOMAXPROCS*2 concurrent goroutines for TSID searches.
-	searchTSIDsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)
+	searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2)
 )
 
 // prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
@@ -1149,7 +1149,7 @@ var (
 	// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
 	// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
 	// goroutines on data ingestion path.
-	addRowsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1))
+	addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 
 	addRowsTimeout = 30 * time.Second
 )
diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go
index c45c9b334..927b54c40 100644
--- a/lib/storage/table_search_timing_test.go
+++ b/lib/storage/table_search_timing_test.go
@@ -4,11 +4,12 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
-	"runtime"
 	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 )
 
 func TestMain(m *testing.M) {
@@ -79,7 +80,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
 	timestamp := uint64(startTimestamp)
 
 	var wg sync.WaitGroup
-	for k := 0; k < runtime.GOMAXPROCS(-1); k++ {
+	for k := 0; k < cgroup.AvailableCPUs(); k++ {
 		wg.Add(1)
 		go func() {
 			rows := make([]rawRow, rowsPerInsert)
diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go
index ed1ca8bde..bd88dd247 100644
--- a/lib/storage/table_timing_test.go
+++ b/lib/storage/table_timing_test.go
@@ -4,9 +4,10 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
-	"runtime"
 	"testing"
 	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 )
 
 func BenchmarkTableAddRows(b *testing.B) {
@@ -57,7 +58,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 	close(workCh)
 
 	doneCh := make(chan struct{})
-	gomaxprocs := runtime.GOMAXPROCS(-1)
+	gomaxprocs := cgroup.AvailableCPUs()
 	for j := 0; j < gomaxprocs; j++ {
 		go func(goroutineID int) {
diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go
index 87b11ecd9..c21a331bd 100644
--- a/lib/workingsetcache/cache.go
+++ b/lib/workingsetcache/cache.go
@@ -1,11 +1,11 @@
 package workingsetcache
 
 import (
-	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/fastcache"
 )
 
@@ -198,7 +198,7 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) {
 // Save safes the cache to filePath.
 func (c *Cache) Save(filePath string) error {
 	curr := c.curr.Load().(*fastcache.Cache)
-	concurrency := runtime.GOMAXPROCS(-1)
+	concurrency := cgroup.AvailableCPUs()
 	return curr.SaveToFileConcurrent(filePath, concurrency)
 }
 
diff --git a/lib/writeconcurrencylimiter/concurrencylimiter.go b/lib/writeconcurrencylimiter/concurrencylimiter.go
index e1e9f4ab5..a91fa686d 100644
--- a/lib/writeconcurrencylimiter/concurrencylimiter.go
+++ b/lib/writeconcurrencylimiter/concurrencylimiter.go
@@ -4,16 +4,16 @@ import (
 	"flag"
 	"fmt"
 	"net/http"
-	"runtime"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 	"github.com/VictoriaMetrics/metrics"
 )
 
 var (
-	maxConcurrentInserts = flag.Int("maxConcurrentInserts", runtime.GOMAXPROCS(-1)*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+
+	maxConcurrentInserts = flag.Int("maxConcurrentInserts", cgroup.AvailableCPUs()*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+
 		"since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration")
 	maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts")
 )
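
Editor's note, not part of the patch: the sketch below is a minimal, hypothetical illustration of how callers are expected to size worker pools after this change, using the new cgroup.AvailableCPUs helper introduced in lib/cgroup/cpu.go instead of runtime.GOMAXPROCS(-1). The main function, channel, and loop are invented for illustration only; the cgroup import path matches this repository after the patch.

package main

import (
	"fmt"
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)

func main() {
	// AvailableCPUs lazily applies the cgroup CPU quota to GOMAXPROCS
	// (unless GOMAXPROCS is set via environment variable) and returns the
	// resulting value, so pools sized from it respect container CPU limits.
	workers := cgroup.AvailableCPUs()

	results := make(chan int, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			results <- id // placeholder for real per-worker work
		}(i)
	}
	wg.Wait()
	close(results)
	fmt.Println("workers used:", workers)
}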