diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 32bd5e1b1..7e8cefb58 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -6,6 +6,7 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"math/rand"
 	"net"
 	"net/http"
 	"regexp"
@@ -84,7 +85,7 @@ type timeseriesWork struct {
 	rss      *Results
 	pts      *packedTimeseries
 	f        func(rs *Result, workerID uint) error
-	doneCh   chan error
+	err      error
 
 	rowsProcessed int
 }
@@ -94,18 +95,14 @@ func (tsw *timeseriesWork) reset() {
 	tsw.rss = nil
 	tsw.pts = nil
 	tsw.f = nil
-	if n := len(tsw.doneCh); n > 0 {
-		logger.Panicf("BUG: tsw.doneCh must be empty during reset; it contains %d items instead", n)
-	}
+	tsw.err = nil
 	tsw.rowsProcessed = 0
 }
 
 func getTimeseriesWork() *timeseriesWork {
 	v := tswPool.Get()
 	if v == nil {
-		v = &timeseriesWork{
-			doneCh: make(chan error, 1),
-		}
+		v = &timeseriesWork{}
 	}
 	return v.(*timeseriesWork)
 }
@@ -117,28 +114,6 @@ func putTimeseriesWork(tsw *timeseriesWork) {
 
 var tswPool sync.Pool
 
-func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
-	if len(workChs) == 1 {
-		// Fast path for a single worker
-		workChs[0] <- tsw
-		return
-	}
-	attempts := 0
-	for {
-		idx := fastrand.Uint32n(uint32(len(workChs)))
-		select {
-		case workChs[idx] <- tsw:
-			return
-		default:
-			attempts++
-			if attempts >= len(workChs) {
-				workChs[idx] <- tsw
-				return
-			}
-		}
-	}
-}
-
 func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
 	if atomic.LoadUint32(tsw.mustStop) != 0 {
 		return nil
@@ -162,15 +137,15 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
 	return nil
 }
 
-func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
+func timeseriesWorker(tsws []*timeseriesWork, workerID uint) {
 	v := resultPool.Get()
 	if v == nil {
 		v = &result{}
 	}
 	r := v.(*result)
-	for tsw := range ch {
+	for _, tsw := range tsws {
 		err := tsw.do(&r.rs, workerID)
-		tsw.doneCh <- err
+		tsw.err = err
 	}
 	currentTime := fasttime.UnixTimestamp()
 	if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
@@ -202,31 +177,7 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 		rss.tbf = nil
 	}()
 
-	// Spin up local workers.
-	//
-	// Do not use a global workChs with a global pool of workers, since it may lead to a deadlock in the following case:
-	// - RunParallel is called with f, which blocks without forward progress.
-	// - All the workers in the global pool became blocked in f.
-	// - workChs is filled up, so it cannot accept new work items from other RunParallel calls.
-	workers := len(rss.packedTimeseries)
-	if workers > gomaxprocs {
-		workers = gomaxprocs
-	}
-	if workers < 1 {
-		workers = 1
-	}
-	workChs := make([]chan *timeseriesWork, workers)
-	var workChsWG sync.WaitGroup
-	for i := 0; i < workers; i++ {
-		workChs[i] = make(chan *timeseriesWork, 16)
-		workChsWG.Add(1)
-		go func(workerID int) {
-			defer workChsWG.Done()
-			timeseriesWorker(workChs[workerID], uint(workerID))
-		}(i)
-	}
-
-	// Feed workers with work.
+	// Prepare the work for workers.
 	tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
 	var mustStop uint32
 	for i := range rss.packedTimeseries {
@@ -235,17 +186,49 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 		tsw.pts = &rss.packedTimeseries[i]
 		tsw.f = f
 		tsw.mustStop = &mustStop
-		scheduleTimeseriesWork(workChs, tsw)
 		tsws[i] = tsw
 	}
-	seriesProcessedTotal := len(rss.packedTimeseries)
-	rss.packedTimeseries = rss.packedTimeseries[:0]
+	// Shuffle tsws in order to spread the work evenly among the workers.
+	r := rand.New(rand.NewSource(int64(fasttime.UnixTimestamp())))
+	r.Shuffle(len(tsws), func(i, j int) {
+		tsws[i], tsws[j] = tsws[j], tsws[i]
+	})
+
+	// Spin up no more than gomaxprocs local workers and split the work
+	// evenly among them. This guarantees linear scalability as gomaxprocs
+	// (i.e. the number of available CPU cores) increases.
+	workers := len(rss.packedTimeseries)
+	itemsPerWorker := 1
+	if workers > gomaxprocs {
+		itemsPerWorker = 1 + workers/gomaxprocs
+		workers = gomaxprocs
+	}
+	var start int
+	var i uint
+	var wg sync.WaitGroup
+	for start < len(tsws) {
+		end := start + itemsPerWorker
+		if end > len(tsws) {
+			end = len(tsws)
+		}
+		chunk := tsws[start:end]
+		wg.Add(1)
+		go func(tswsChunk []*timeseriesWork, workerID uint) {
+			defer wg.Done()
+			timeseriesWorker(tswsChunk, workerID)
+		}(chunk, i)
+		start = end
+		i++
+	}
 
 	// Wait until work is complete.
+	wg.Wait()
+
+	// Collect results.
 	var firstErr error
 	rowsProcessedTotal := 0
 	for _, tsw := range tsws {
-		if err := <-tsw.doneCh; err != nil && firstErr == nil {
+		if err := tsw.err; err != nil && firstErr == nil {
 			// Return just the first error, since other errors likely duplicate the first error.
 			firstErr = err
 		}
@@ -254,14 +237,11 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 		putTimeseriesWork(tsw)
 	}
 
+	seriesProcessedTotal := len(rss.packedTimeseries)
+	rss.packedTimeseries = rss.packedTimeseries[:0]
 	rowsReadPerQuery.Update(float64(rowsProcessedTotal))
 	seriesReadPerQuery.Update(float64(seriesProcessedTotal))
 
-	// Shut down local workers
-	for _, workCh := range workChs {
-		close(workCh)
-	}
-	workChsWG.Wait()
-
 	qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
 	return firstErr
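Note on the new scheduling scheme: the patch replaces channel-based work distribution with shuffle-and-split chunking. Below is a minimal, self-contained sketch of the same idea, assuming nothing from the VictoriaMetrics codebase; the items slice and the squaring loop are illustrative stand-ins for packedTimeseries and the per-series callback, while the chunking arithmetic (itemsPerWorker = 1 + n/gomaxprocs with the worker count capped at gomaxprocs) mirrors the patch.

// shuffle_split_sketch.go: a standalone sketch of the shuffle-and-split
// scheduling introduced above. Only the chunking arithmetic mirrors the
// patch; the item type and the per-item work are illustrative stand-ins.
package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
	"time"
)

func main() {
	// Pretend each int is a packed time series to be unpacked and processed.
	items := make([]int, 37)
	for i := range items {
		items[i] = i
	}

	// Shuffle so that expensive items are unlikely to cluster in one chunk.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	r.Shuffle(len(items), func(i, j int) {
		items[i], items[j] = items[j], items[i]
	})

	// Split the items into contiguous chunks of roughly equal size,
	// one goroutine per chunk. Each goroutine owns its chunk, so no
	// channels are needed; wg.Wait() is the only synchronization.
	workers := len(items)
	itemsPerWorker := 1
	if gomaxprocs := runtime.GOMAXPROCS(0); workers > gomaxprocs {
		itemsPerWorker = 1 + workers/gomaxprocs
		workers = gomaxprocs
	}
	var wg sync.WaitGroup
	for start := 0; start < len(items); start += itemsPerWorker {
		end := start + itemsPerWorker
		if end > len(items) {
			end = len(items)
		}
		chunk := items[start:end]
		wg.Add(1)
		go func(chunk []int) {
			defer wg.Done()
			for _, it := range chunk {
				_ = it * it // stand-in for the real per-series work
			}
		}(chunk)
	}
	wg.Wait()
	fmt.Printf("processed %d items with at most %d workers\n", len(items), workers)
}

With this chunk size the number of spawned goroutines never exceeds gomaxprocs, since gomaxprocs*(1 + n/gomaxprocs) >= n for any n, and the up-front shuffle guards against pathological orderings where all the heavy series land in the same chunk.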
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index d0a17d642..f74fca348 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): execute left and right sides of certain operations in parallel. For example, `q1 or q2`, `aggr_func(q1) q2`, `q1 aggr_func(q1)`. This may improve query performance if VictoriaMetrics has enough free resources for parallel processing of both sides of the operation. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886).
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): allow duplicate username records with different passwords in the configuration file. This should allow password rotation without a username change.
 * FEATURE: add ability to push internal metrics (e.g. metrics exposed at `/metrics` page) to the configured remote storage from all the VictoriaMetrics components. See [these docs](https://docs.victoriametrics.com/#push-metrics).
+* FEATURE: improve performance for heavy queries over a big number of time series on systems with a big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896). Thanks to @zqyzyq for [the idea](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/b596ac3745314fcc170a14e3ded062971cf7ced2).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): set `up` metric to `0` for partial scrapes in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously the `up` metric was set to `1` when at least a single metric has been scraped before the error. This aligns the behaviour of `vmagent` with Prometheus.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart all the scrape jobs during [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update) after the `global` section is changed inside `-promscrape.config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2884).
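A closing note on why dropping doneCh in favor of a plain err field is race-free: each timeseriesWork item is processed by exactly one worker goroutine, and wg.Wait() establishes a happens-before edge between every worker's writes and the collection loop that runs afterwards. Here is a minimal sketch of that pattern, with a hypothetical task type standing in for timeseriesWork:

// waitgroup_err_sketch.go: per-item error slots collected after a
// WaitGroup barrier, mirroring the doneCh -> err change above.
// The task type and the failure condition are illustrative.
package main

import (
	"errors"
	"fmt"
	"sync"
)

// task carries its own error slot, written by exactly one worker goroutine
// and read only after wg.Wait(), which provides the happens-before edge.
type task struct {
	id  int
	err error
}

func main() {
	tasks := make([]*task, 8)
	for i := range tasks {
		tasks[i] = &task{id: i}
	}

	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(t *task) {
			defer wg.Done()
			if t.id == 5 { // stand-in for real work that may fail
				t.err = errors.New("task 5 failed")
			}
		}(t)
	}
	wg.Wait() // all t.err writes are visible after this point

	// Keep just the first error, mirroring RunParallel's collection loop.
	var firstErr error
	for _, t := range tasks {
		if t.err != nil && firstErr == nil {
			firstErr = t.err
		}
	}
	fmt.Println("first error:", firstErr)
}

Reading t.err before wg.Wait() returns would be a data race; the pattern is sound only because collection happens strictly after the WaitGroup barrier, which is exactly how RunParallel is structured after this patch.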