all: properly handle CPU limits set on the host system/container

This can reduce memory usage on systems with CPU limits enabled.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/946
Aliaksandr Valialkin 2020-12-08 20:49:32 +02:00
parent 364f30a6e7
commit 4146fc4668
39 changed files with 85 additions and 90 deletions
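
The memory savings come from how the sizes below are derived. Many package-level buffer pools, worker channels, and concurrency limiters were sized with runtime.GOMAXPROCS(-1), which is initialized from the host core count; since package-level variables are evaluated before main() gets a chance to call UpdateGOMAXPROCSToCPUQuota(), they ignored any cgroup CPU limit. The new lib/cgroup.AvailableCPUs() performs the quota adjustment lazily, so those sizes now reflect the container limit. The quota-reading code itself (getCPUQuota) is not part of this diff; a minimal illustrative sketch of the cgroup v1 approach might look like the following (file paths and rounding are assumptions, not the exact VictoriaMetrics implementation):

// Minimal illustrative sketch (not the actual lib/cgroup code): derive the
// CPU limit from the cgroup v1 CFS quota files. A quota of -1 means "no limit".
package main

import (
	"fmt"
	"os"
	"runtime"
	"strconv"
	"strings"
)

func readInt(path string) (int64, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return 0, err
	}
	return strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64)
}

// cpuLimit returns the number of CPU cores the cgroup allows,
// falling back to the host core count when no limit is set.
func cpuLimit() int {
	quota, err := readInt("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")
	if err != nil || quota <= 0 {
		return runtime.NumCPU()
	}
	period, err := readInt("/sys/fs/cgroup/cpu/cpu.cfs_period_us")
	if err != nil || period <= 0 {
		return runtime.NumCPU()
	}
	n := int(float64(quota)/float64(period) + 0.5) // round to the nearest core
	if n < 1 {
		n = 1
	}
	if n > runtime.NumCPU() {
		n = runtime.NumCPU()
	}
	return n
}

func main() {
	fmt.Printf("effective CPU limit: %d\n", cpuLimit())
}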


@ -11,7 +11,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -36,7 +35,6 @@ func main() {
envflag.Parse()
buildinfo.Init()
logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if promscrape.IsDryRun() {
*dryRun = true


@ -1,9 +1,9 @@
package common
import (
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
@ -63,4 +63,4 @@ func PutPushCtx(ctx *PushCtx) {
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *PushCtx, runtime.GOMAXPROCS(-1))
var pushCtxPoolCh = make(chan *PushCtx, cgroup.AvailableCPUs())
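
The pushCtxPool/pushCtxPoolCh pair above (the same pattern repeats in the files that follow) is a hybrid object pool: a small bounded channel keeps up to one context per available CPU permanently warm, while sync.Pool absorbs any overflow. Sizing the channel from cgroup.AvailableCPUs() instead of runtime.GOMAXPROCS(-1) is what trims the steady-state footprint under a CPU limit. A sketch of how such a pair is typically used (field names and helpers are illustrative, not the exact VictoriaMetrics code; here and in the later sketches the lib/cgroup import assumes the code is built inside the VictoriaMetrics module):

// Sketch of the hybrid pool pattern used by these files. The channel keeps
// up to one context per available CPU hot; everything else goes through
// sync.Pool and may be reclaimed by the GC.
package pushctx

import (
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)

type pushCtx struct {
	buf []byte
}

func (ctx *pushCtx) reset() { ctx.buf = ctx.buf[:0] }

var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())

func getPushCtx() *pushCtx {
	select {
	case ctx := <-pushCtxPoolCh:
		return ctx
	default:
		if v := pushCtxPool.Get(); v != nil {
			return v.(*pushCtx)
		}
		return &pushCtx{}
	}
}

func putPushCtx(ctx *pushCtx) {
	ctx.reset()
	select {
	case pushCtxPoolCh <- ctx:
	default:
		pushCtxPool.Put(ctx)
	}
}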


@ -4,12 +4,12 @@ import (
"flag"
"io"
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
@ -165,4 +165,4 @@ func putPushCtx(ctx *pushCtx) {
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())


@ -21,7 +21,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -68,7 +67,6 @@ func main() {
remotewrite.InitSecretFlags()
buildinfo.Init()
logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if promscrape.IsDryRun() {
if err := promscrape.CheckConfig(); err != nil {


@ -3,10 +3,10 @@ package remotewrite
import (
"flag"
"fmt"
"runtime"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -43,7 +43,7 @@ var allRelabelConfigs atomic.Value
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high a value,
// since it may lead to high memory usage due to big number of buffers.
var maxQueues = runtime.GOMAXPROCS(-1) * 4
var maxQueues = cgroup.AvailableCPUs() * 4
// InitSecretFlags must be called after flag.Parse and before any logging.
func InitSecretFlags() {
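
The clamp that applies maxQueues to the -remoteWrite.queues flag lives outside this hunk; a hypothetical version of that bound check could look like this (normalizeQueues is an invented name for illustration):

// Hypothetical clamp: keep the user-supplied -remoteWrite.queues value
// within [1, maxQueues], where maxQueues = cgroup.AvailableCPUs() * 4 as
// defined above.
func normalizeQueues(queues int) int {
	if queues > maxQueues {
		queues = maxQueues
	}
	if queues < 1 {
		queues = 1
	}
	return queues
}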


@ -16,7 +16,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remoteread"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -59,7 +58,6 @@ func main() {
envflag.Parse()
buildinfo.Init()
logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if *dryRun {
u, _ := url.Parse("https://victoriametrics.com/")


@ -9,7 +9,6 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -28,7 +27,6 @@ func main() {
envflag.Parse()
buildinfo.Init()
logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
logger.Infof("starting vmauth at %q...", *httpListenAddr)
startTime := time.Now()
initAuthConfig()


@ -12,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -40,7 +39,6 @@ func main() {
envflag.Parse()
buildinfo.Init()
logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
if len(*snapshotCreateURL) > 0 {
logger.Infof("Snapshots enabled")


@ -1,8 +1,9 @@
package common
import (
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)
// GetInsertCtx returns InsertCtx from the pool.
@ -33,4 +34,4 @@ func PutInsertCtx(ctx *InsertCtx) {
}
var insertCtxPool sync.Pool
var insertCtxPoolCh = make(chan *InsertCtx, runtime.GOMAXPROCS(-1))
var insertCtxPoolCh = make(chan *InsertCtx, cgroup.AvailableCPUs())


@ -4,12 +4,12 @@ import (
"flag"
"io"
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@ -175,4 +175,4 @@ func putPushCtx(ctx *pushCtx) {
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())


@ -2,11 +2,11 @@ package native
import (
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@ -112,4 +112,4 @@ func putPushCtx(ctx *pushCtx) {
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())


@ -2,11 +2,11 @@ package vmimport
import (
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@ -117,4 +117,4 @@ func putPushCtx(ctx *pushCtx) {
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())


@ -9,7 +9,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -33,7 +32,6 @@ func main() {
envflag.Parse()
buildinfo.Init()
logger.Init()
cgroup.UpdateGOMAXPROCSToCPUQuota()
srcFS, err := newSrcFS()
if err != nil {


@ -5,7 +5,6 @@ import (
"flag"
"fmt"
"net/http"
"runtime"
"strings"
"time"
@ -15,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -32,7 +32,7 @@ var (
)
func getDefaultMaxConcurrentRequests() int {
n := runtime.GOMAXPROCS(-1)
n := cgroup.AvailableCPUs()
if n <= 4 {
n *= 2
}


@ -6,7 +6,6 @@ import (
"flag"
"fmt"
"regexp"
"runtime"
"sort"
"sync"
"sync/atomic"
@ -14,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@ -179,7 +179,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
var perQuerySeriesProcessed = metrics.NewHistogram(`vm_per_query_series_processed_count`)
var gomaxprocs = runtime.GOMAXPROCS(-1)
var gomaxprocs = cgroup.AvailableCPUs()
type packedTimeseries struct {
metricName string
@ -265,7 +265,7 @@ func unpackWorker() {
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
//
// This batch is needed in order to reduce contention for unpackWorkCh on multi-CPU systems.
var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)
var unpackBatchSize = 8 * cgroup.AvailableCPUs()
// Unpack unpacks pts to dst.
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error {
@ -715,7 +715,7 @@ func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func
sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
// Start workers that call f in parallel on available CPU cores.
gomaxprocs := runtime.GOMAXPROCS(-1)
gomaxprocs := cgroup.AvailableCPUs()
workCh := make(chan *exportWork, gomaxprocs*8)
var (
errGlobal error
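
Several sizes in this file now derive from cgroup.AvailableCPUs(): the unpack worker count, the 8×CPU unpackBatchSize, and the gomaxprocs*8 export channel. Batching several blocks per channel operation is what keeps contention on the shared work channel low. A simplified, illustrative batch-then-send sketch (not the exact unpackWorker code):

// Illustrative batching sketch: group work items into slices before sending
// them to a shared channel, so the number of channel operations (and the
// contention on that channel) drops by roughly the batch size.
package main

import (
	"fmt"
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)

type workItem struct{ id int }

func main() {
	unpackBatchSize := 8 * cgroup.AvailableCPUs()
	workCh := make(chan []workItem, cgroup.AvailableCPUs())

	var wg sync.WaitGroup
	for i := 0; i < cgroup.AvailableCPUs(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range workCh {
				for _, w := range batch {
					_ = w // process the item here
				}
			}
		}()
	}

	// Producer: accumulate items into batches before sending.
	var batch []workItem
	for i := 0; i < 1000; i++ {
		batch = append(batch, workItem{id: i})
		if len(batch) >= unpackBatchSize {
			workCh <- batch
			batch = nil
		}
	}
	if len(batch) > 0 {
		workCh <- batch
	}
	close(workCh)
	wg.Wait()
	fmt.Println("done")
}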


@ -5,7 +5,6 @@ import (
"fmt"
"math"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
@ -17,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -147,7 +147,7 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
doneCh := make(chan error)
go func() {
err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
@ -374,7 +374,7 @@ func exportHandler(w http.ResponseWriter, matches []string, start, end int64, fo
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
doneCh := make(chan error)
if !reduceMemUsage {
rss, err := netstorage.ProcessSearchQuery(sq, true, deadline)


@ -4,12 +4,12 @@ import (
"flag"
"fmt"
"math"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
@ -555,7 +555,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr
}
func doParallel(tss []*timeseries, f func(ts *timeseries, values []float64, timestamps []int64) ([]float64, []int64)) {
concurrency := runtime.GOMAXPROCS(-1)
concurrency := cgroup.AvailableCPUs()
if concurrency > len(tss) {
concurrency = len(tss)
}
@ -678,7 +678,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
timeseriesLen := rssLen
if iafc != nil {
// Incremental aggregates require holding only GOMAXPROCS timeseries in memory.
timeseriesLen = runtime.GOMAXPROCS(-1)
timeseriesLen = cgroup.AvailableCPUs()
if iafc.ae.Modifier.Op != "" {
if iafc.ae.Limit > 0 {
// There is an explicit limit on the number of output time series.


@ -9,6 +9,7 @@
* FEATURE: export `vm_promscrape_scrapers_started_total{type="<sd_type>"}` and `vm_promscrape_scrapers_stopped_total{type="<sd_type>"}` metrics for tracking churn rate for scrapers
per each service discovery type.
* BUGFIX: properly handle CPU limits set on the host system or host container. The bugfix may result in lower memory usage on systems with CPU limits. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/946
* BUGFIX: prevent from duplicate `name` tag returned from `/tags/autoComplete/tags` handler. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/942
* BUGFIX: do not enable strict parsing for `-promscrape.config` if `-promscrape.config.dryRun` command-line flag is set. Strict parsing can be enabled with `-promscrape.config.strictParse` command-line flag. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/944


@ -6,17 +6,21 @@ import (
"runtime"
"strconv"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"sync"
)
// UpdateGOMAXPROCSToCPUQuota updates GOMAXPROCS to cgroup CPU quota if GOMAXPROCS isn't set in environment var.
//
// This function must be called after logger.Init().
func UpdateGOMAXPROCSToCPUQuota() {
// AvailableCPUs returns the number of available CPU cores for the app.
func AvailableCPUs() int {
availableCPUsOnce.Do(updateGOMAXPROCSToCPUQuota)
return runtime.GOMAXPROCS(-1)
}
var availableCPUsOnce sync.Once
// updateGOMAXPROCSToCPUQuota updates GOMAXPROCS to cgroup CPU quota if GOMAXPROCS isn't set in environment var.
func updateGOMAXPROCSToCPUQuota() {
if v := os.Getenv("GOMAXPROCS"); v != "" {
// Do not override explicitly set GOMAXPROCS.
logger.Infof("using GOMAXPROCS=%q set via environment variable", v)
return
}
q := getCPUQuota()
@ -28,13 +32,11 @@ func UpdateGOMAXPROCSToCPUQuota() {
numCPU := runtime.NumCPU()
if gomaxprocs > numCPU {
// There is no sense in setting more GOMAXPROCS than the number of available CPU cores.
logger.Infof("cgroup CPU quota=%d exceeds NumCPU=%d; using GOMAXPROCS=NumCPU", gomaxprocs, numCPU)
return
}
if gomaxprocs <= 0 {
gomaxprocs = 1
}
logger.Infof("updating GOMAXPROCS to %d according to cgroup CPU quota", gomaxprocs)
runtime.GOMAXPROCS(gomaxprocs)
}
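
The central refactoring: the explicit UpdateGOMAXPROCSToCPUQuota() call is removed from every main() above, and the quota adjustment now runs lazily via sync.Once inside AvailableCPUs(). That way, package-level variables that are initialized before main() still pick up the cgroup-aware CPU count, and the function no longer needs logger (presumably why the logger.Infof calls and import are dropped: it may now run before logger.Init()). A small usage sketch under these assumptions (names are illustrative):

// Usage after this change: package-level sizes pick up the cgroup-aware
// CPU count without main() having to call anything first.
package example

import (
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)

// The first AvailableCPUs() call runs updateGOMAXPROCSToCPUQuota() once,
// so GOMAXPROCS already reflects the cgroup quota here.
var workers = cgroup.AvailableCPUs()

var workCh = make(chan func(), workers)
var wg sync.WaitGroup

// StartWorkers spawns one worker goroutine per available CPU.
func StartWorkers() {
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for f := range workCh {
				f()
			}
		}()
	}
}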


@ -4,12 +4,12 @@ import (
"errors"
"io"
"net"
"runtime"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
@ -113,7 +113,7 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) {
}
func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) {
gomaxprocs := runtime.GOMAXPROCS(-1)
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)
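
serveUDP starts one reader goroutine per available CPU, all sharing the same listener, so a cgroup limit now means proportionally fewer goroutines and read buffers than one per host core. A simplified sketch of that fan-out (error handling and the metrics from the real serveUDP are omitted):

// Simplified sketch of a per-CPU UDP reader fan-out over one PacketConn.
package example

import (
	"net"
	"sync"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)

func serveUDPSketch(ln net.PacketConn, handle func(packet []byte)) {
	gomaxprocs := cgroup.AvailableCPUs()
	var wg sync.WaitGroup
	for i := 0; i < gomaxprocs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			buf := make([]byte, 64*1024) // enough for the maximum UDP payload
			for {
				n, _, err := ln.ReadFrom(buf)
				if err != nil {
					return // the listener has been closed
				}
				handle(buf[:n])
			}
		}()
	}
	wg.Wait()
}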


@ -4,12 +4,12 @@ import (
"errors"
"io"
"net"
"runtime"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
@ -113,7 +113,7 @@ func serveTCP(ln net.Listener, insertHandler func(r io.Reader) error) {
}
func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) {
gomaxprocs := runtime.GOMAXPROCS(-1)
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)


@ -5,12 +5,12 @@ import (
"io"
"net"
"net/http"
"runtime"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
@ -133,7 +133,7 @@ func serveTelnet(ln net.Listener, insertHandler func(r io.Reader) error) {
}
func serveUDP(ln net.PacketConn, insertHandler func(r io.Reader) error) {
gomaxprocs := runtime.GOMAXPROCS(-1)
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)


@ -6,7 +6,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
@ -14,6 +13,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -959,7 +959,7 @@ func (tb *Table) maxOutPartItemsSlow() uint64 {
return freeSpace / uint64(mergeWorkersCount) / 4
}
var mergeWorkersCount = runtime.GOMAXPROCS(-1)
var mergeWorkersCount = cgroup.AvailableCPUs()
func openParts(path string) ([]*partWrapper, error) {
// The path can be missing after restoring from backup, so create it if needed.


@ -2,8 +2,9 @@ package persistentqueue
import (
"fmt"
"runtime"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)
func BenchmarkFastQueueThroughputSerial(b *testing.B) {
@ -36,7 +37,7 @@ func BenchmarkFastQueueThroughputConcurrent(b *testing.B) {
b.SetBytes(int64(blockSize) * iterationsCount)
path := fmt.Sprintf("bench-fast-queue-throughput-concurrent-%d", blockSize)
mustDeleteDir(path)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*runtime.GOMAXPROCS(-1)*2, 0)
fq := MustOpenFastQueue(path, "foobar", iterationsCount*cgroup.AvailableCPUs()*2, 0)
defer func() {
fq.MustClose()
mustDeleteDir(path)


@ -1,9 +1,9 @@
package common
import (
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@ -25,7 +25,7 @@ func StartUnmarshalWorkers() {
if unmarshalWorkCh != nil {
logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been already called without stopUnmarshalWorkers()")
}
gomaxprocs := runtime.GOMAXPROCS(-1)
gomaxprocs := cgroup.AvailableCPUs()
unmarshalWorkCh = make(chan UnmarshalWork, 2*gomaxprocs)
unmarshalWorkersWG.Add(gomaxprocs)
for i := 0; i < gomaxprocs; i++ {


@ -6,11 +6,11 @@ import (
"fmt"
"io"
"net/http"
"runtime"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
)
@ -142,7 +142,7 @@ func putStreamContext(ctx *streamContext) {
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows


@ -5,11 +5,11 @@ import (
"flag"
"fmt"
"io"
"runtime"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
@ -127,7 +127,7 @@ func putStreamContext(ctx *streamContext) {
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows


@ -5,11 +5,11 @@ import (
"flag"
"fmt"
"io"
"runtime"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
@ -155,7 +155,7 @@ func putStreamContext(ctx *streamContext) {
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows


@ -5,11 +5,11 @@ import (
"flag"
"fmt"
"io"
"runtime"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
@ -126,7 +126,7 @@ func putStreamContext(ctx *streamContext) {
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows


@ -6,11 +6,11 @@ import (
"fmt"
"io"
"net/http"
"runtime"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@ -149,7 +149,7 @@ func putStreamContext(ctx *streamContext) {
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
func getRows() *Rows {
v := rowsPool.Get()


@ -4,11 +4,11 @@ import (
"bufio"
"fmt"
"io"
"runtime"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
)
@ -128,7 +128,7 @@ func putStreamContext(ctx *streamContext) {
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows


@ -5,10 +5,10 @@ import (
"fmt"
"io"
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
@ -123,7 +123,7 @@ func putPushCtx(ctx *pushCtx) {
}
var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getWriteRequest() *prompb.WriteRequest {
v := writeRequestPool.Get()


@ -5,10 +5,10 @@ import (
"fmt"
"io"
"net/http"
"runtime"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
@ -132,7 +132,7 @@ func putStreamContext(ctx *streamContext) {
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows


@ -7,7 +7,6 @@ import (
"math/bits"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
@ -16,6 +15,7 @@ import (
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -77,7 +77,7 @@ const finalPartsToMerge = 3
// The number of shards for rawRow entries per partition.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawRowsShardsPerPartition = (runtime.GOMAXPROCS(-1) + 7) / 8
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8
// getMaxRawRowsPerPartition returns the maximum number of raw rows that haven't been converted into parts yet.
func getMaxRawRowsPerPartition() int {
@ -882,8 +882,8 @@ func hasActiveMerges(pws []*partWrapper) bool {
}
var (
bigMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
bigMergeWorkersCount = (cgroup.AvailableCPUs() + 1) / 2
smallMergeWorkersCount = (cgroup.AvailableCPUs() + 1) / 2
)
// SetBigMergeWorkersCount sets the maximum number of concurrent mergers for big blocks.


@ -8,13 +8,13 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -873,7 +873,7 @@ var (
// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
// is CPU bound and sometimes disk IO bound, so there is no sense in running more
// than GOMAXPROCS*2 concurrent goroutines for TSID searches.
searchTSIDsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1)*2)
searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2)
)
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
@ -1149,7 +1149,7 @@ var (
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
// goroutines on data ingestion path.
addRowsConcurrencyCh = make(chan struct{}, runtime.GOMAXPROCS(-1))
addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
addRowsTimeout = 30 * time.Second
)
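
Both channels above act as semaphores: a goroutine must put an empty struct into the channel before proceeding and removes it when done, so the channel capacity is the effective concurrency limit. A sketch of the acquire-with-timeout pattern suggested by addRowsConcurrencyCh and addRowsTimeout (simplified, not the actual storage code):

// Illustrative semaphore-with-timeout pattern behind addRowsConcurrencyCh;
// the real code also updates metrics and prioritizes other work while waiting.
package example

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)

var (
	addRowsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
	addRowsTimeout       = 30 * time.Second
)

func addRowsWithLimit(addRows func() error) error {
	select {
	case addRowsConcurrencyCh <- struct{}{}:
		// Acquired a slot immediately.
	default:
		// All slots are busy; wait up to addRowsTimeout for one to free up.
		t := time.NewTimer(addRowsTimeout)
		select {
		case addRowsConcurrencyCh <- struct{}{}:
			t.Stop()
		case <-t.C:
			return fmt.Errorf("cannot add rows: %d concurrent inserts are already running for %s",
				cap(addRowsConcurrencyCh), addRowsTimeout)
		}
	}
	defer func() { <-addRowsConcurrencyCh }()
	return addRows()
}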


@ -4,11 +4,12 @@ import (
"fmt"
"math/rand"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)
func TestMain(m *testing.M) {
@ -79,7 +80,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
timestamp := uint64(startTimestamp)
var wg sync.WaitGroup
for k := 0; k < runtime.GOMAXPROCS(-1); k++ {
for k := 0; k < cgroup.AvailableCPUs(); k++ {
wg.Add(1)
go func() {
rows := make([]rawRow, rowsPerInsert)


@ -4,9 +4,10 @@ import (
"fmt"
"math/rand"
"os"
"runtime"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
)
func BenchmarkTableAddRows(b *testing.B) {
@ -57,7 +58,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
close(workCh)
doneCh := make(chan struct{})
gomaxprocs := runtime.GOMAXPROCS(-1)
gomaxprocs := cgroup.AvailableCPUs()
for j := 0; j < gomaxprocs; j++ {
go func(goroutineID int) {


@ -1,11 +1,11 @@
package workingsetcache
import (
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/fastcache"
)
@ -198,7 +198,7 @@ func (c *Cache) cacheSizeWatcher(maxBytes int) {
// Save saves the cache to filePath.
func (c *Cache) Save(filePath string) error {
curr := c.curr.Load().(*fastcache.Cache)
concurrency := runtime.GOMAXPROCS(-1)
concurrency := cgroup.AvailableCPUs()
return curr.SaveToFileConcurrent(filePath, concurrency)
}


@ -4,16 +4,16 @@ import (
"flag"
"fmt"
"net/http"
"runtime"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
)
var (
maxConcurrentInserts = flag.Int("maxConcurrentInserts", runtime.GOMAXPROCS(-1)*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+
maxConcurrentInserts = flag.Int("maxConcurrentInserts", cgroup.AvailableCPUs()*4, "The maximum number of concurrent inserts. Default value should work for most cases, "+
"since it minimizes the overhead for concurrent inserts. This option is tigthly coupled with -insert.maxQueueDuration")
maxQueueDuration = flag.Duration("insert.maxQueueDuration", time.Minute, "The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts")
)