From 53d871d0b173242c8f4ce13defd81abb861c358a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 10 Jan 2023 13:06:02 -0800 Subject: [PATCH] app/vmselect/netstorage: reduce tail latency during query processing Previously the selected time series were split evenly among available CPU cores for further processing - e.g unpacking the data and applying the given rollup function to the unpacked data. Some time series could be processed slower than others. This could result in uneven work distribution among available CPU cores, e.g. some CPU cores could complete their work sooner than others. This could slow down query execution. The new algorithm allows stealing time series to process from other CPU cores when all the local work is done. This should reduce the maximum time needed for query execution (aka tail latency). The new algorithm should also scale better on systems with many CPU cores, since every CPU processes locally assigned time series without inter-CPU communications. The inter-CPU communications are used only when all the local work is finished and the pending work from other CPUs needs to be stealed. --- app/vmselect/netstorage/netstorage.go | 494 +++++++++++++++----------- 1 file changed, 277 insertions(+), 217 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 05e7aadb5..b3a7a5aad 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -5,7 +5,7 @@ import ( "errors" "flag" "fmt" - "math/rand" + "runtime" "sort" "sync" "sync/atomic" @@ -16,12 +16,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metricsql" - "github.com/valyala/fastrand" ) var ( @@ -133,16 +131,66 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error { return nil } -func timeseriesWorker(tsws []*timeseriesWork, workerID uint) { +func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, workerID uint) { + tmpResult := getTmpResult() + + // Perform own work at first. + rowsProcessed := 0 + seriesProcessed := 0 + ch := workChs[workerID] + for tsw := range ch { + tsw.err = tsw.do(&tmpResult.rs, workerID) + rowsProcessed += tsw.rowsProcessed + seriesProcessed++ + } + qt.Printf("own work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed) + + // Then help others with the remaining work. + rowsProcessed = 0 + seriesProcessed = 0 + idx := int(workerID) + for { + tsw, idxNext := stealTimeseriesWork(workChs, idx) + if tsw == nil { + // There is no more work + break + } + tsw.err = tsw.do(&tmpResult.rs, workerID) + rowsProcessed += tsw.rowsProcessed + seriesProcessed++ + idx = idxNext + } + qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed) + + putTmpResult(tmpResult) +} + +func stealTimeseriesWork(workChs []chan *timeseriesWork, startIdx int) (*timeseriesWork, int) { + for i := startIdx; i < startIdx+len(workChs); i++ { + // Give a chance other goroutines to perform their work + runtime.Gosched() + + idx := i % len(workChs) + ch := workChs[idx] + // It is expected that every channel in the workChs is already closed, + // so the next line should return immediately. + tsw, ok := <-ch + if ok { + return tsw, idx + } + } + return nil, startIdx +} + +func getTmpResult() *result { v := resultPool.Get() if v == nil { v = &result{} } - r := v.(*result) - for _, tsw := range tsws { - err := tsw.do(&r.rs, workerID) - tsw.err = err - } + return v.(*result) +} + +func putTmpResult(r *result) { currentTime := fasttime.UnixTimestamp() if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 { // Reset r.rs in order to preseve memory usage after processing big time series with millions of rows. @@ -170,87 +218,113 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke qt = qt.NewChild("parallel process of fetched data") defer rss.mustClose() - // Prepare work for workers. - tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) + rowsProcessedTotal, err := rss.runParallel(qt, f) + seriesProcessedTotal := len(rss.packedTimeseries) + rss.packedTimeseries = rss.packedTimeseries[:0] + + rowsReadPerQuery.Update(float64(rowsProcessedTotal)) + seriesReadPerQuery.Update(float64(seriesProcessedTotal)) + + qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal) + + return err +} + +func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) (int, error) { + tswsLen := len(rss.packedTimeseries) + if tswsLen == 0 { + // Nothing to process + return 0, nil + } + var mustStop uint32 - for i := range rss.packedTimeseries { - tsw := getTimeseriesWork() + initTimeseriesWork := func(tsw *timeseriesWork, pts *packedTimeseries) { tsw.rss = rss - tsw.pts = &rss.packedTimeseries[i] + tsw.pts = pts tsw.f = f tsw.mustStop = &mustStop + } + if gomaxprocs == 1 || tswsLen == 1 { + // It is faster to process time series in the current goroutine. + tsw := getTimeseriesWork() + tmpResult := getTmpResult() + rowsProcessedTotal := 0 + var err error + for i := range rss.packedTimeseries { + initTimeseriesWork(tsw, &rss.packedTimeseries[i]) + err = tsw.do(&tmpResult.rs, 0) + rowsReadPerSeries.Update(float64(tsw.rowsProcessed)) + rowsProcessedTotal += tsw.rowsProcessed + if err != nil { + break + } + tsw.reset() + } + putTmpResult(tmpResult) + putTimeseriesWork(tsw) + + return rowsProcessedTotal, err + } + + // Slow path - spin up multiple local workers for parallel data processing. + // Do not use global workers pool, since it increases inter-CPU memory ping-poing, + // which reduces the scalability on systems with many CPU cores. + + // Prepare the work for workers. + tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) + for i := range rss.packedTimeseries { + tsw := getTimeseriesWork() + initTimeseriesWork(tsw, &rss.packedTimeseries[i]) tsws[i] = tsw } - // Shuffle tsws for providing the equal amount of work among workers. - r := getRand() - r.Shuffle(len(tsws), func(i, j int) { - tsws[i], tsws[j] = tsws[j], tsws[i] - }) - putRand(r) - // Spin up up to gomaxprocs local workers and split work equally among them. - // This guarantees linear scalability with the increase of gomaxprocs - // (e.g. the number of available CPU cores). - itemsPerWorker := 1 - if len(rss.packedTimeseries) > gomaxprocs { - itemsPerWorker = (len(rss.packedTimeseries) + gomaxprocs - 1) / gomaxprocs + // Prepare worker channels. + workers := len(tsws) + if workers > gomaxprocs { + workers = gomaxprocs } - var start int - var i uint + itemsPerWorker := (len(tsws) + workers - 1) / workers + workChs := make([]chan *timeseriesWork, workers) + for i := range workChs { + workChs[i] = make(chan *timeseriesWork, itemsPerWorker) + } + + // Spread work among workers. + for i, tsw := range tsws { + idx := i % len(workChs) + workChs[idx] <- tsw + } + // Mark worker channels as closed. + for _, workCh := range workChs { + close(workCh) + } + + // Start workers and wait until they finish the work. var wg sync.WaitGroup - for start < len(tsws) { - end := start + itemsPerWorker - if end > len(tsws) { - end = len(tsws) - } - chunk := tsws[start:end] + for i := range workChs { wg.Add(1) - go func(tswsChunk []*timeseriesWork, workerID uint) { - defer wg.Done() - timeseriesWorker(tswsChunk, workerID) - }(chunk, i) - start = end - i++ + qtChild := qt.NewChild("worker #%d", i) + go func(workerID uint) { + timeseriesWorker(qtChild, workChs, workerID) + qtChild.Done() + wg.Done() + }(uint(i)) } - - // Wait until work is complete. wg.Wait() // Collect results. var firstErr error rowsProcessedTotal := 0 for _, tsw := range tsws { - if err := tsw.err; err != nil && firstErr == nil { + if tsw.err != nil && firstErr == nil { // Return just the first error, since other errors are likely duplicate the first error. - firstErr = err + firstErr = tsw.err } rowsReadPerSeries.Update(float64(tsw.rowsProcessed)) rowsProcessedTotal += tsw.rowsProcessed putTimeseriesWork(tsw) } - - seriesProcessedTotal := len(rss.packedTimeseries) - rss.packedTimeseries = rss.packedTimeseries[:0] - rowsReadPerQuery.Update(float64(rowsProcessedTotal)) - seriesReadPerQuery.Update(float64(seriesProcessedTotal)) - - qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal) - - return firstErr -} - -var randPool sync.Pool - -func getRand() *rand.Rand { - v := randPool.Get() - if v == nil { - v = rand.New(rand.NewSource(int64(fasttime.UnixTimestamp()))) - } - return v.(*rand.Rand) -} - -func putRand(r *rand.Rand) { - randPool.Put(r) + return rowsProcessedTotal, firstErr } var ( @@ -266,48 +340,30 @@ type packedTimeseries struct { brs []blockRef } -type unpackWorkItem struct { - br blockRef - tr storage.TimeRange -} - type unpackWork struct { - tbf *tmpBlocksFile - ws []unpackWorkItem - sbs []*sortBlock - doneCh chan error + tbf *tmpBlocksFile + br blockRef + tr storage.TimeRange + sb *sortBlock + err error } func (upw *unpackWork) reset() { upw.tbf = nil - ws := upw.ws - for i := range ws { - w := &ws[i] - w.br = blockRef{} - w.tr = storage.TimeRange{} - } - upw.ws = upw.ws[:0] - sbs := upw.sbs - for i := range sbs { - sbs[i] = nil - } - upw.sbs = upw.sbs[:0] - if n := len(upw.doneCh); n > 0 { - logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n) - } + upw.br = blockRef{} + upw.tr = storage.TimeRange{} + upw.sb = nil + upw.err = nil } func (upw *unpackWork) unpack(tmpBlock *storage.Block) { - for _, w := range upw.ws { - sb := getSortBlock() - if err := sb.unpackFrom(tmpBlock, upw.tbf, w.br, w.tr); err != nil { - putSortBlock(sb) - upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err) - return - } - upw.sbs = append(upw.sbs, sb) + sb := getSortBlock() + if err := sb.unpackFrom(tmpBlock, upw.tbf, upw.br, upw.tr); err != nil { + putSortBlock(sb) + upw.err = fmt.Errorf("cannot unpack block: %w", err) + return } - upw.doneCh <- nil + upw.sb = sb } func getUnpackWork() *unpackWork { @@ -315,9 +371,7 @@ func getUnpackWork() *unpackWork { if v != nil { return v.(*unpackWork) } - return &unpackWork{ - doneCh: make(chan error, 1), - } + return &unpackWork{} } func putUnpackWork(upw *unpackWork) { @@ -327,36 +381,47 @@ func putUnpackWork(upw *unpackWork) { var unpackWorkPool sync.Pool -func scheduleUnpackWork(workChs []chan *unpackWork, uw *unpackWork) { - if len(workChs) == 1 { - // Fast path for a single worker - workChs[0] <- uw - return - } - attempts := 0 - for { - idx := fastrand.Uint32n(uint32(len(workChs))) - select { - case workChs[idx] <- uw: - return - default: - attempts++ - if attempts >= len(workChs) { - workChs[idx] <- uw - return - } - } - } -} - -func unpackWorker(ch <-chan *unpackWork) { +func unpackWorker(workChs []chan *unpackWork, workerID uint) { tmpBlock := getTmpStorageBlock() + + // Deal with own work at first. + ch := workChs[workerID] for upw := range ch { upw.unpack(tmpBlock) } + + // Then help others with their work. + idx := int(workerID) + for { + upw, idxNext := stealUnpackWork(workChs, idx) + if upw == nil { + // There is no more work + break + } + upw.unpack(tmpBlock) + idx = idxNext + } + putTmpStorageBlock(tmpBlock) } +func stealUnpackWork(workChs []chan *unpackWork, startIdx int) (*unpackWork, int) { + for i := startIdx; i < startIdx+len(workChs); i++ { + // Give a chance other goroutines to perform their work + runtime.Gosched() + + idx := i % len(workChs) + ch := workChs[idx] + // It is expected that every channel in the workChs is already closed, + // so the next line should return immediately. + upw, ok := <-ch + if ok { + return upw, idx + } + } + return nil, startIdx +} + func getTmpStorageBlock() *storage.Block { v := tmpStorageBlockPool.Get() if v == nil { @@ -371,14 +436,6 @@ func putTmpStorageBlock(sb *storage.Block) { var tmpStorageBlockPool sync.Pool -// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine. -// -// It is better to load a single goroutine for up to 100ms on a system with many CPU cores -// in order to reduce inter-CPU memory ping-pong. -// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows. -// So the batch size should be 100ms * 40M / 8K = 500. -var unpackBatchSize = 500 - // Unpack unpacks pts to dst. func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error { dst.reset() @@ -388,6 +445,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. sbh := getSortBlocksHeap() var err error sbh.sbs, err = pts.unpackTo(sbh.sbs[:0], tbf, tr) + pts.brs = pts.brs[:0] if err != nil { putSortBlocksHeap(sbh) return err @@ -399,112 +457,114 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. } func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbf *tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) { - brsLen := len(pts.brs) - upwsLen := (brsLen + unpackBatchSize - 1) / unpackBatchSize - if upwsLen == 1 { - // Fast path for common case - unpack all the data in the current goroutine - upw := getUnpackWork() + upwsLen := len(pts.brs) + if upwsLen == 0 { + // Nothing to do + return nil, nil + } + initUnpackWork := func(upw *unpackWork, br blockRef) { upw.tbf = tbf - for _, br := range pts.brs { - upw.ws = append(upw.ws, unpackWorkItem{ - br: br, - tr: tr, - }) - } - pts.brs = pts.brs[:0] - tmpBlock := getTmpStorageBlock() - upw.unpack(tmpBlock) - putTmpStorageBlock(tmpBlock) - - if err := <-upw.doneCh; err != nil { - return dst, err - } + upw.br = br + upw.tr = tr + } + if gomaxprocs == 1 || upwsLen <= 100 { + // It is faster to unpack all the data in the current goroutine. + upw := getUnpackWork() samples := 0 - for _, sb := range upw.sbs { - samples += len(sb.Timestamps) - if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries { - return dst, fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+ - "or reduce time range for the query", *maxSamplesPerSeries) + tmpBlock := getTmpStorageBlock() + var err error + for _, br := range pts.brs { + initUnpackWork(upw, br) + upw.unpack(tmpBlock) + if upw.err != nil { + return dst, upw.err } - dst = append(dst, sb) + samples += len(upw.sb.Timestamps) + if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries { + putSortBlock(upw.sb) + err = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+ + "or reduce time range for the query", *maxSamplesPerSeries) + break + } + dst = append(dst, upw.sb) + upw.reset() } - + putTmpStorageBlock(tmpBlock) putUnpackWork(upw) - return dst, nil + + return dst, err } // Slow path - spin up multiple local workers for parallel data unpacking. // Do not use global workers pool, since it increases inter-CPU memory ping-poing, // which reduces the scalability on systems with many CPU cores. - workers := upwsLen + + // Prepare the work for workers. + upws := make([]*unpackWork, upwsLen) + for i, br := range pts.brs { + upw := getUnpackWork() + initUnpackWork(upw, br) + upws[i] = upw + } + + // Prepare worker channels. + workers := len(upws) if workers > gomaxprocs { workers = gomaxprocs } if workers < 1 { workers = 1 } + itemsPerWorker := (len(upws) + workers - 1) / workers workChs := make([]chan *unpackWork, workers) - var workChsWG sync.WaitGroup - for i := 0; i < workers; i++ { - workChs[i] = make(chan *unpackWork, 1) - workChsWG.Add(1) - go func(workerID int) { - defer workChsWG.Done() - unpackWorker(workChs[workerID]) - }(i) + for i := range workChs { + workChs[i] = make(chan *unpackWork, itemsPerWorker) } - // Feed workers with work - upws := make([]*unpackWork, 0, upwsLen) - upw := getUnpackWork() - upw.tbf = tbf - for _, br := range pts.brs { - if len(upw.ws) >= unpackBatchSize { - scheduleUnpackWork(workChs, upw) - upws = append(upws, upw) - upw = getUnpackWork() - upw.tbf = tbf - } - upw.ws = append(upw.ws, unpackWorkItem{ - br: br, - tr: tr, - }) + // Spread work among worker channels. + for i, upw := range upws { + idx := i % len(workChs) + workChs[idx] <- upw } - scheduleUnpackWork(workChs, upw) - upws = append(upws, upw) - pts.brs = pts.brs[:0] - - // Collect the unpacked sortBlock items - samples := 0 - var firstErr error - for _, upw := range upws { - if err := <-upw.doneCh; err != nil && firstErr == nil { - // Return the first error only, since other errors are likely the same. - firstErr = err - } - if firstErr == nil { - for _, sb := range upw.sbs { - samples += len(sb.Timestamps) - if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries { - firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+ - "or reduce time range for the query", *maxSamplesPerSeries) - break - } - dst = append(dst, sb) - } - } else { - for _, sb := range upw.sbs { - putSortBlock(sb) - } - } - putUnpackWork(upw) - } - - // Shut down local workers + // Mark worker channels as closed. for _, workCh := range workChs { close(workCh) } - workChsWG.Wait() + + // Start workers and wait until they finish the work. + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func(workerID uint) { + unpackWorker(workChs, workerID) + wg.Done() + }(uint(i)) + } + wg.Wait() + + // Collect results. + samples := 0 + var firstErr error + for _, upw := range upws { + if upw.err != nil && firstErr == nil { + // Return the first error only, since other errors are likely the same. + firstErr = upw.err + } + if firstErr == nil { + sb := upw.sb + samples += len(sb.Timestamps) + if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries { + putSortBlock(sb) + firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+ + "or reduce time range for the query", *maxSamplesPerSeries) + } else { + dst = append(dst, sb) + } + } else { + putSortBlock(upw.sb) + } + putUnpackWork(upw) + } return dst, firstErr }