From a1911e13304bf2463e9533ca1bf20748d7de08e5 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 30 Jul 2021 12:02:09 +0300
Subject: [PATCH] app/vmselect/netstorage: unpack time series data in mostly local big chunks

This should improve performance on multi-CPU systems for queries selecting time series with a big number of raw samples
---
 app/vmselect/netstorage/netstorage.go | 148 ++++++++++++++++----------
 docs/CHANGELOG.md                     |   1 +
 2 files changed, 91 insertions(+), 58 deletions(-)

diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 4227e897c..669ca18d6 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -122,7 +122,7 @@ var tswPool sync.Pool
 
 func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
 	if len(workChs) == 1 {
-		// Fast path for a single CPU core
+		// Fast path for a single worker
 		workChs[0] <- tsw
 		return
 	}
@@ -142,6 +142,29 @@ func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork)
 	}
 }
 
+func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
+	if atomic.LoadUint32(tsw.mustStop) != 0 {
+		return nil
+	}
+	rss := tsw.rss
+	if rss.deadline.Exceeded() {
+		atomic.StoreUint32(tsw.mustStop, 1)
+		return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
+	}
+	if err := tsw.pts.Unpack(r, rss.tbf, rss.tr, rss.fetchData); err != nil {
+		atomic.StoreUint32(tsw.mustStop, 1)
+		return fmt.Errorf("error during time series unpacking: %w", err)
+	}
+	if len(r.Timestamps) > 0 || !rss.fetchData {
+		if err := tsw.f(r, workerID); err != nil {
+			atomic.StoreUint32(tsw.mustStop, 1)
+			return err
+		}
+	}
+	tsw.rowsProcessed = len(r.Values)
+	return nil
+}
+
 func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
 	v := resultPool.Get()
 	if v == nil {
@@ -149,38 +172,15 @@ func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
 	}
 	r := v.(*result)
 	for tsw := range ch {
-		if atomic.LoadUint32(tsw.mustStop) != 0 {
-			tsw.doneCh <- nil
-			continue
-		}
-		rss := tsw.rss
-		if rss.deadline.Exceeded() {
-			atomic.StoreUint32(tsw.mustStop, 1)
-			tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
-			continue
-		}
-		if err := tsw.pts.Unpack(&r.rs, rss.tbf, rss.tr, rss.fetchData); err != nil {
-			atomic.StoreUint32(tsw.mustStop, 1)
-			tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
-			continue
-		}
-		if len(r.rs.Timestamps) > 0 || !rss.fetchData {
-			if err := tsw.f(&r.rs, workerID); err != nil {
-				atomic.StoreUint32(tsw.mustStop, 1)
-				tsw.doneCh <- err
-				continue
-			}
-		}
-		tsw.rowsProcessed = len(r.rs.Values)
-		tsw.doneCh <- nil
-		currentTime := fasttime.UnixTimestamp()
-		if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
-			// Reset r.rs in order to preseve memory usage after processing big time series with millions of rows.
-			r.rs = Result{}
-			r.lastResetTime = currentTime
-		}
+		err := tsw.do(&r.rs, workerID)
+		tsw.doneCh <- err
+	}
+	currentTime := fasttime.UnixTimestamp()
+	if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
+		// Reset r.rs in order to preserve memory usage after processing big time series with millions of rows.
+		r.rs = Result{}
+		r.lastResetTime = currentTime
 	}
-	r.rs.reset()
 	resultPool.Put(r)
 }
 
@@ -335,32 +335,22 @@ func putUnpackWork(upw *unpackWork) {
 
 var unpackWorkPool sync.Pool
 
-var unpackWorkChs []chan *unpackWork
-
-func init() {
-	unpackWorkChs = make([]chan *unpackWork, gomaxprocs)
-	for i := range unpackWorkChs {
-		unpackWorkChs[i] = make(chan *unpackWork, 128)
-		go unpackWorker(unpackWorkChs[i])
-	}
-}
-
-func scheduleUnpackWork(uw *unpackWork) {
-	if len(unpackWorkChs) == 1 {
-		// Fast path for a single CPU core
-		unpackWorkChs[0] <- uw
+func scheduleUnpackWork(workChs []chan *unpackWork, uw *unpackWork) {
+	if len(workChs) == 1 {
+		// Fast path for a single worker
+		workChs[0] <- uw
 		return
 	}
 	attempts := 0
 	for {
-		idx := fastrand.Uint32n(uint32(len(unpackWorkChs)))
+		idx := fastrand.Uint32n(uint32(len(workChs)))
 		select {
-		case unpackWorkChs[idx] <- uw:
+		case workChs[idx] <- uw:
 			return
 		default:
 			attempts++
-			if attempts >= len(unpackWorkChs) {
-				unpackWorkChs[idx] <- uw
+			if attempts >= len(workChs) {
+				workChs[idx] <- uw
 				return
 			}
 		}
@@ -368,16 +358,26 @@
 }
 
 func unpackWorker(ch <-chan *unpackWork) {
-	var tmpBlock storage.Block
-	for upw := range ch {
-		upw.unpack(&tmpBlock)
+	v := tmpBlockPool.Get()
+	if v == nil {
+		v = &storage.Block{}
 	}
+	tmpBlock := v.(*storage.Block)
+	for upw := range ch {
+		upw.unpack(tmpBlock)
+	}
+	tmpBlockPool.Put(v)
 }
 
+var tmpBlockPool sync.Pool
+
 // unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
 //
-// This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system.
-var unpackBatchSize = 32 * cgroup.AvailableCPUs()
+// It is better to load a single goroutine with up to a second of work on a system with many CPU cores
+// in order to reduce inter-CPU memory ping-pong.
+// A single goroutine can unpack up to 40 million rows per second, while a single block contains up to 8K rows.
+// So the batch size should be 40M / 8K = 5K.
+var unpackBatchSize = 5000
 
 // Unpack unpacks pts to dst.
 func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error {
@@ -390,14 +390,39 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 		return nil
 	}
 
-	// Feed workers with work
+	// Spin up local workers.
+	// Do not use the global workers pool, since it increases inter-CPU memory ping-pong,
+	// which reduces the scalability on systems with many CPU cores.
 	brsLen := len(pts.brs)
+	workers := brsLen / unpackBatchSize
+	if workers > gomaxprocs {
+		workers = gomaxprocs
+	}
+	if workers < 1 {
+		workers = 1
+	}
+	workChs := make([]chan *unpackWork, workers)
+	var workChsWG sync.WaitGroup
+	for i := 0; i < workers; i++ {
+		// Use an unbuffered channel on purpose, since there is a high chance
+		// that only a single unpackWork needs to be unpacked.
+		// The unbuffered channel should reduce inter-CPU ping-pong in this case,
+		// which should improve the performance in a system with many CPU cores.
+		workChs[i] = make(chan *unpackWork)
+		workChsWG.Add(1)
+		go func(workerID int) {
+			defer workChsWG.Done()
+			unpackWorker(workChs[workerID])
+		}(i)
+	}
+
+	// Feed workers with work
 	upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
 	upw := getUnpackWork()
 	upw.tbf = tbf
 	for _, br := range pts.brs {
 		if len(upw.ws) >= unpackBatchSize {
-			scheduleUnpackWork(upw)
+			scheduleUnpackWork(workChs, upw)
 			upws = append(upws, upw)
 			upw = getUnpackWork()
 			upw.tbf = tbf
@@ -407,7 +432,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 			tr: tr,
 		})
 	}
-	scheduleUnpackWork(upw)
+	scheduleUnpackWork(workChs, upw)
 	upws = append(upws, upw)
 	pts.brs = pts.brs[:0]
 
@@ -438,6 +463,13 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 		}
 		putUnpackWork(upw)
 	}
+
+	// Shut down local workers
+	for _, workCh := range workChs {
+		close(workCh)
+	}
+	workChsWG.Wait()
+
 	if firstErr != nil {
 		return firstErr
 	}
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index f1dec890e..65f56a783 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -8,6 +8,7 @@ sort: 15
 
 * FEATURE: add `-search.maxSamplesPerSeries` command-line flag for limiting the number of raw samples a single query can process per each time series. This option can protect from out of memory errors when a query processes tens of millions of raw samples per series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1067).
 * FEATURE: add `-search.maxSamplesPerQuery` command-line flag for limiting the number of raw samples a single query can process across all the time series. This option can protect from heavy queries, which select too big number of raw samples. Thanks to @jiangxinlingdu for [the initial pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1478).
+* FEATURE: improve performance for heavy queries on systems with a big number of CPU cores.
 * BUGFIX: vmselect: return dummy response at `/rules` page in the same way as for `/api/v1/rules` page. The `/rules` page is requested by Grafana 8. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1493) for details.
 * BUGFIX: vmbackup: automatically set default `us-east-1` S3 region if it is missing. This should simplify using S3-compatible services such as MinIO for backups. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1449).
 
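For reference, the per-call local-worker pattern this patch introduces in packedTimeseries.Unpack can be reduced to the following standalone Go sketch. Identifiers such as workItem, process and run are simplified stand-ins rather than code from this repository; the real implementation batches blocks via unpackBatchSize, reuses pooled objects and schedules batches onto a randomly chosen channel.

package main

import (
	"runtime"
	"sync"
)

// workItem stands in for unpackWork: a batch of blocks to be unpacked.
type workItem struct {
	rows []int64
}

// process stands in for upw.unpack(tmpBlock).
func process(w *workItem) {
	for i := range w.rows {
		w.rows[i] *= 2
	}
}

// worker drains a channel of work items, like unpackWorker in the patch.
func worker(ch <-chan *workItem) {
	for w := range ch {
		process(w)
	}
}

// run spins up workers local to this call, feeds them, then shuts them down.
func run(items []*workItem, batchSize int) {
	workers := len(items) / batchSize
	if workers > runtime.GOMAXPROCS(-1) {
		workers = runtime.GOMAXPROCS(-1)
	}
	if workers < 1 {
		workers = 1
	}
	workChs := make([]chan *workItem, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		workChs[i] = make(chan *workItem) // unbuffered, as in the patch
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			worker(workChs[workerID])
		}(i)
	}
	// Feed the workers; the patch picks a channel at random with a bounded
	// number of retries, a simple round-robin is used here instead.
	for i, w := range items {
		workChs[i%workers] <- w
	}
	// Shut down the local workers and wait for them to finish.
	for _, ch := range workChs {
		close(ch)
	}
	wg.Wait()
}

func main() {
	items := make([]*workItem, 12)
	for i := range items {
		items[i] = &workItem{rows: make([]int64, 8)}
	}
	run(items, 4)
}

The short-lived, per-call goroutines and unbuffered channels mirror the intent stated in the patch comments: keep unpacked data close to the CPU that will consume it instead of routing every small unit of work through a long-lived global worker pool.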