From 92630c1ab4c6a968e90fa5b9e47a5bbf087d82bc Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 25 Jul 2022 09:12:42 +0300
Subject: [PATCH] app/vmselect/netstorage: improve the speed of queries over a big number of time series on multi-CPU systems

Reduce inter-CPU communications when processing a query over a big number of
time series. This should improve performance for queries over a big number of
time series on systems with many CPU cores.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896

Based on https://github.com/VictoriaMetrics/VictoriaMetrics/commit/b596ac3745314fcc170a14e3ded062971cf7ced2

Thanks to @zqyzyq for the idea.
---
 app/vmselect/netstorage/netstorage.go | 112 +++++++++++---------
 docs/CHANGELOG.md                     |   1 +
 2 files changed, 47 insertions(+), 66 deletions(-)

diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 1107d129b..c78f4a688 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"math/rand"
 	"regexp"
 	"sort"
 	"sync"
@@ -80,7 +81,7 @@ type timeseriesWork struct {
 	rss    *Results
 	pts    *packedTimeseries
 	f      func(rs *Result, workerID uint) error
-	doneCh chan error
+	err    error
 
 	rowsProcessed int
 }
@@ -90,18 +91,14 @@ func (tsw *timeseriesWork) reset() {
 	tsw.rss = nil
 	tsw.pts = nil
 	tsw.f = nil
-	if n := len(tsw.doneCh); n > 0 {
-		logger.Panicf("BUG: tsw.doneCh must be empty during reset; it contains %d items instead", n)
-	}
+	tsw.err = nil
 	tsw.rowsProcessed = 0
 }
 
 func getTimeseriesWork() *timeseriesWork {
 	v := tswPool.Get()
 	if v == nil {
-		v = &timeseriesWork{
-			doneCh: make(chan error, 1),
-		}
+		v = &timeseriesWork{}
 	}
 	return v.(*timeseriesWork)
 }
@@ -113,28 +110,6 @@ func putTimeseriesWork(tsw *timeseriesWork) {
 
 var tswPool sync.Pool
 
-func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
-	if len(workChs) == 1 {
-		// Fast path for a single worker
-		workChs[0] <- tsw
-		return
-	}
-	attempts := 0
-	for {
-		idx := fastrand.Uint32n(uint32(len(workChs)))
-		select {
-		case workChs[idx] <- tsw:
-			return
-		default:
-			attempts++
-			if attempts >= len(workChs) {
-				workChs[idx] <- tsw
-				return
-			}
-		}
-	}
-}
-
 func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
 	if atomic.LoadUint32(tsw.mustStop) != 0 {
 		return nil
@@ -158,15 +133,15 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
 	return nil
 }
 
-func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
+func timeseriesWorker(tsws []*timeseriesWork, workerID uint) {
 	v := resultPool.Get()
 	if v == nil {
 		v = &result{}
 	}
 	r := v.(*result)
-	for tsw := range ch {
+	for _, tsw := range tsws {
 		err := tsw.do(&r.rs, workerID)
-		tsw.doneCh <- err
+		tsw.err = err
 	}
 	currentTime := fasttime.UnixTimestamp()
 	if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
@@ -195,31 +170,7 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 	qt = qt.NewChild("parallel process of fetched data")
 	defer rss.mustClose()
 
-	// Spin up local workers.
-	//
-	// Do not use a global workChs with a global pool of workers, since it may lead to a deadlock in the following case:
-	// - RunParallel is called with f, which blocks without forward progress.
-	// - All the workers in the global pool became blocked in f.
-	// - workChs is filled up, so it cannot accept new work items from other RunParallel calls.
-	workers := len(rss.packedTimeseries)
-	if workers > gomaxprocs {
-		workers = gomaxprocs
-	}
-	if workers < 1 {
-		workers = 1
-	}
-	workChs := make([]chan *timeseriesWork, workers)
-	var workChsWG sync.WaitGroup
-	for i := 0; i < workers; i++ {
-		workChs[i] = make(chan *timeseriesWork, 16)
-		workChsWG.Add(1)
-		go func(workerID int) {
-			defer workChsWG.Done()
-			timeseriesWorker(workChs[workerID], uint(workerID))
-		}(i)
-	}
-
-	// Feed workers with work.
+	// Prepare work for workers.
 	tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
 	var mustStop uint32
 	for i := range rss.packedTimeseries {
@@ -228,17 +179,49 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 		tsw.pts = &rss.packedTimeseries[i]
 		tsw.f = f
 		tsw.mustStop = &mustStop
-		scheduleTimeseriesWork(workChs, tsw)
 		tsws[i] = tsw
 	}
-	seriesProcessedTotal := len(rss.packedTimeseries)
-	rss.packedTimeseries = rss.packedTimeseries[:0]
+	// Shuffle tsws to provide an equal amount of work among workers.
+	r := rand.New(rand.NewSource(int64(fasttime.UnixTimestamp())))
+	r.Shuffle(len(tsws), func(i, j int) {
+		tsws[i], tsws[j] = tsws[j], tsws[i]
+	})
+
+	// Spin up at most gomaxprocs local workers and split the work equally among them.
+	// This guarantees linear scalability with the increase of gomaxprocs
+	// (e.g. the number of available CPU cores).
+	workers := len(rss.packedTimeseries)
+	itemsPerWorker := 1
+	if workers > gomaxprocs {
+		itemsPerWorker = 1 + workers/gomaxprocs
+		workers = gomaxprocs
+	}
+	var start int
+	var i uint
+	var wg sync.WaitGroup
+	for start < len(tsws) {
+		end := start + itemsPerWorker
+		if end > len(tsws) {
+			end = len(tsws)
+		}
+		chunk := tsws[start:end]
+		wg.Add(1)
+		go func(tswsChunk []*timeseriesWork, workerID uint) {
+			defer wg.Done()
+			timeseriesWorker(tswsChunk, workerID)
+		}(chunk, i)
+		start = end
+		i++
+	}
 
 	// Wait until work is complete.
+	wg.Wait()
+
+	// Collect results.
 	var firstErr error
 	rowsProcessedTotal := 0
 	for _, tsw := range tsws {
-		if err := <-tsw.doneCh; err != nil && firstErr == nil {
+		if err := tsw.err; err != nil && firstErr == nil {
 			// Return just the first error, since other errors are likely duplicate the first error.
 			firstErr = err
 		}
@@ -247,14 +230,11 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 		putTimeseriesWork(tsw)
 	}
 
+	seriesProcessedTotal := len(rss.packedTimeseries)
+	rss.packedTimeseries = rss.packedTimeseries[:0]
 	rowsReadPerQuery.Update(float64(rowsProcessedTotal))
 	seriesReadPerQuery.Update(float64(seriesProcessedTotal))
 
-	// Shut down local workers
-	for _, workCh := range workChs {
-		close(workCh)
-	}
-	workChsWG.Wait()
 	qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
 	return firstErr
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index d0a17d642..f74fca348 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): execute left and right sides of certain operations in parallel. For example, `q1 or q2`, `aggr_func(q1) q2`, `q1 aggr_func(q1)`. This may improve query performance if VictoriaMetrics has enough free resources for parallel processing of both sides of the operation. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886).
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmagent.html): allow duplicate username records with different passwords at configuration file. It should allow password rotation without username change.
 * FEATURE: add ability to push internal metrics (e.g. metrics exposed at `/metrics` page) to the configured remote storage from all the VictoriaMetrics components. See [these docs](https://docs.victoriametrics.com/#push-metrics).
+* FEATURE: improve performance for heavy queries over a big number of time series on systems with a big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896). Thanks to @zqyzyq for [the idea](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/b596ac3745314fcc170a14e3ded062971cf7ced2).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): set `up` metric to `0` for partial scrapes in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously the `up` metric was set to `1` when at least a single metric has been scraped before the error. This aligns the behaviour of `vmagent` with Prometheus.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart all the scrape jobs during [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update) after `global` section is changed inside `-promscrape.config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2884).
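
For illustration, the scheduling scheme this patch switches to can be reduced to a standalone sketch: shuffle the work items once, cut them into contiguous chunks of roughly equal size so that no more than GOMAXPROCS chunks are created, give each worker goroutine its own chunk, and store per-item errors in the items themselves instead of sending them over channels. The `workItem`, `process` and `runParallel` names below are illustrative only and are not part of the VictoriaMetrics code.

```go
package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
)

// workItem is an illustrative stand-in for timeseriesWork: the error is
// stored in the item itself, so workers need no per-item channel sends.
type workItem struct {
	id  int
	err error
}

// process stands in for the per-series callback passed to RunParallel.
func process(it *workItem, workerID int) error {
	fmt.Printf("worker %d processed item %d\n", workerID, it.id)
	return nil
}

// runParallel shuffles the items, splits them into contiguous chunks and
// starts at most runtime.GOMAXPROCS(0) goroutines, one chunk per goroutine.
func runParallel(items []*workItem) error {
	// Shuffle so that expensive items do not cluster in a single chunk.
	rand.Shuffle(len(items), func(i, j int) {
		items[i], items[j] = items[j], items[i]
	})

	itemsPerWorker := 1
	if gomaxprocs := runtime.GOMAXPROCS(0); len(items) > gomaxprocs {
		itemsPerWorker = 1 + len(items)/gomaxprocs
	}

	var wg sync.WaitGroup
	workerID := 0
	for start := 0; start < len(items); start += itemsPerWorker {
		end := start + itemsPerWorker
		if end > len(items) {
			end = len(items)
		}
		chunk := items[start:end]
		wg.Add(1)
		go func(chunk []*workItem, workerID int) {
			defer wg.Done()
			for _, it := range chunk {
				it.err = process(it, workerID)
			}
		}(chunk, workerID)
		workerID++
	}
	wg.Wait()

	// Report only the first error, mirroring how RunParallel collects results.
	for _, it := range items {
		if it.err != nil {
			return it.err
		}
	}
	return nil
}

func main() {
	items := make([]*workItem, 10)
	for i := range items {
		items[i] = &workItem{id: i}
	}
	if err := runParallel(items); err != nil {
		fmt.Println("error:", err)
	}
}
```

The shuffle matters because packed time series can differ a lot in size; without it, a single chunk could receive most of the heavy series and dominate the query latency, which is why the patch shuffles before splitting the work.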