diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 05e7aadb5..b3a7a5aad 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -5,7 +5,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
-	"math/rand"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -16,12 +16,10 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/metrics"
 	"github.com/VictoriaMetrics/metricsql"
-	"github.com/valyala/fastrand"
 )

 var (
@@ -133,16 +131,66 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
 	return nil
 }

-func timeseriesWorker(tsws []*timeseriesWork, workerID uint) {
+func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, workerID uint) {
+	tmpResult := getTmpResult()
+
+	// Perform own work first.
+	rowsProcessed := 0
+	seriesProcessed := 0
+	ch := workChs[workerID]
+	for tsw := range ch {
+		tsw.err = tsw.do(&tmpResult.rs, workerID)
+		rowsProcessed += tsw.rowsProcessed
+		seriesProcessed++
+	}
+	qt.Printf("own work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
+
+	// Then help others with the remaining work.
+	rowsProcessed = 0
+	seriesProcessed = 0
+	idx := int(workerID)
+	for {
+		tsw, idxNext := stealTimeseriesWork(workChs, idx)
+		if tsw == nil {
+			// There is no more work
+			break
+		}
+		tsw.err = tsw.do(&tmpResult.rs, workerID)
+		rowsProcessed += tsw.rowsProcessed
+		seriesProcessed++
+		idx = idxNext
+	}
+	qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
+
+	putTmpResult(tmpResult)
+}
+
+func stealTimeseriesWork(workChs []chan *timeseriesWork, startIdx int) (*timeseriesWork, int) {
+	for i := startIdx; i < startIdx+len(workChs); i++ {
+		// Give other goroutines a chance to perform their work
+		runtime.Gosched()
+
+		idx := i % len(workChs)
+		ch := workChs[idx]
+		// It is expected that every channel in the workChs is already closed,
+		// so the next line should return immediately.
+		tsw, ok := <-ch
+		if ok {
+			return tsw, idx
+		}
+	}
+	return nil, startIdx
+}
+
+func getTmpResult() *result {
 	v := resultPool.Get()
 	if v == nil {
 		v = &result{}
 	}
-	r := v.(*result)
-	for _, tsw := range tsws {
-		err := tsw.do(&r.rs, workerID)
-		tsw.err = err
-	}
+	return v.(*result)
+}
+
+func putTmpResult(r *result) {
 	currentTime := fasttime.UnixTimestamp()
 	if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
 		// Reset r.rs in order to preserve memory usage after processing big time series with millions of rows.
@@ -170,87 +218,113 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 	qt = qt.NewChild("parallel process of fetched data")
 	defer rss.mustClose()

-	// Prepare work for workers.
-	tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
+	rowsProcessedTotal, err := rss.runParallel(qt, f)
+	seriesProcessedTotal := len(rss.packedTimeseries)
+	rss.packedTimeseries = rss.packedTimeseries[:0]
+
+	rowsReadPerQuery.Update(float64(rowsProcessedTotal))
+	seriesReadPerQuery.Update(float64(seriesProcessedTotal))
+
+	qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
+
+	return err
+}
+
+func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) (int, error) {
+	tswsLen := len(rss.packedTimeseries)
+	if tswsLen == 0 {
+		// Nothing to process
+		return 0, nil
+	}
+
 	var mustStop uint32
-	for i := range rss.packedTimeseries {
-		tsw := getTimeseriesWork()
+	initTimeseriesWork := func(tsw *timeseriesWork, pts *packedTimeseries) {
 		tsw.rss = rss
-		tsw.pts = &rss.packedTimeseries[i]
+		tsw.pts = pts
 		tsw.f = f
 		tsw.mustStop = &mustStop
+	}
+	if gomaxprocs == 1 || tswsLen == 1 {
+		// It is faster to process time series in the current goroutine.
+		tsw := getTimeseriesWork()
+		tmpResult := getTmpResult()
+		rowsProcessedTotal := 0
+		var err error
+		for i := range rss.packedTimeseries {
+			initTimeseriesWork(tsw, &rss.packedTimeseries[i])
+			err = tsw.do(&tmpResult.rs, 0)
+			rowsReadPerSeries.Update(float64(tsw.rowsProcessed))
+			rowsProcessedTotal += tsw.rowsProcessed
+			if err != nil {
+				break
+			}
+			tsw.reset()
+		}
+		putTmpResult(tmpResult)
+		putTimeseriesWork(tsw)
+
+		return rowsProcessedTotal, err
+	}
+
+	// Slow path - spin up multiple local workers for parallel data processing.
+	// Do not use global workers pool, since it increases inter-CPU memory ping-pong,
+	// which reduces the scalability on systems with many CPU cores.
+
+	// Prepare the work for workers.
+	tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
 	for i := range rss.packedTimeseries {
+		tsw := getTimeseriesWork()
+		initTimeseriesWork(tsw, &rss.packedTimeseries[i])
 		tsws[i] = tsw
 	}
-	// Shuffle tsws for providing the equal amount of work among workers.
-	r := getRand()
-	r.Shuffle(len(tsws), func(i, j int) {
-		tsws[i], tsws[j] = tsws[j], tsws[i]
-	})
-	putRand(r)
-	// Spin up up to gomaxprocs local workers and split work equally among them.
-	// This guarantees linear scalability with the increase of gomaxprocs
-	// (e.g. the number of available CPU cores).
-	itemsPerWorker := 1
-	if len(rss.packedTimeseries) > gomaxprocs {
-		itemsPerWorker = (len(rss.packedTimeseries) + gomaxprocs - 1) / gomaxprocs
+	// Prepare worker channels.
+	workers := len(tsws)
+	if workers > gomaxprocs {
+		workers = gomaxprocs
 	}
-	var start int
-	var i uint
+	itemsPerWorker := (len(tsws) + workers - 1) / workers
+	workChs := make([]chan *timeseriesWork, workers)
+	for i := range workChs {
+		workChs[i] = make(chan *timeseriesWork, itemsPerWorker)
+	}
+
+	// Spread work among workers.
+	for i, tsw := range tsws {
+		idx := i % len(workChs)
+		workChs[idx] <- tsw
+	}
+	// Mark worker channels as closed.
+	for _, workCh := range workChs {
+		close(workCh)
+	}
+
+	// Start workers and wait until they finish the work.
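+	// Every worker is handed the whole workChs slice, so it can steal work
+	// from the other workers' channels after draining its own channel.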
 	var wg sync.WaitGroup
-	for start < len(tsws) {
-		end := start + itemsPerWorker
-		if end > len(tsws) {
-			end = len(tsws)
-		}
-		chunk := tsws[start:end]
+	for i := range workChs {
 		wg.Add(1)
-		go func(tswsChunk []*timeseriesWork, workerID uint) {
-			defer wg.Done()
-			timeseriesWorker(tswsChunk, workerID)
-		}(chunk, i)
-		start = end
-		i++
+		qtChild := qt.NewChild("worker #%d", i)
+		go func(workerID uint) {
+			timeseriesWorker(qtChild, workChs, workerID)
+			qtChild.Done()
+			wg.Done()
+		}(uint(i))
 	}
-
-	// Wait until work is complete.
 	wg.Wait()

 	// Collect results.
 	var firstErr error
 	rowsProcessedTotal := 0
 	for _, tsw := range tsws {
-		if err := tsw.err; err != nil && firstErr == nil {
+		if tsw.err != nil && firstErr == nil {
 			// Return just the first error, since other errors likely duplicate the first error.
-			firstErr = err
+			firstErr = tsw.err
 		}
 		rowsReadPerSeries.Update(float64(tsw.rowsProcessed))
 		rowsProcessedTotal += tsw.rowsProcessed
 		putTimeseriesWork(tsw)
 	}
-
-	seriesProcessedTotal := len(rss.packedTimeseries)
-	rss.packedTimeseries = rss.packedTimeseries[:0]
-	rowsReadPerQuery.Update(float64(rowsProcessedTotal))
-	seriesReadPerQuery.Update(float64(seriesProcessedTotal))
-
-	qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
-
-	return firstErr
-}
-
-var randPool sync.Pool
-
-func getRand() *rand.Rand {
-	v := randPool.Get()
-	if v == nil {
-		v = rand.New(rand.NewSource(int64(fasttime.UnixTimestamp())))
-	}
-	return v.(*rand.Rand)
-}
-
-func putRand(r *rand.Rand) {
-	randPool.Put(r)
+	return rowsProcessedTotal, firstErr
 }

 var (
@@ -266,48 +340,30 @@ type packedTimeseries struct {
 	brs []blockRef
 }

-type unpackWorkItem struct {
-	br blockRef
-	tr storage.TimeRange
-}
-
 type unpackWork struct {
-	tbf    *tmpBlocksFile
-	ws     []unpackWorkItem
-	sbs    []*sortBlock
-	doneCh chan error
+	tbf *tmpBlocksFile
+	br  blockRef
+	tr  storage.TimeRange
+	sb  *sortBlock
+	err error
 }

 func (upw *unpackWork) reset() {
 	upw.tbf = nil
-	ws := upw.ws
-	for i := range ws {
-		w := &ws[i]
-		w.br = blockRef{}
-		w.tr = storage.TimeRange{}
-	}
-	upw.ws = upw.ws[:0]
-	sbs := upw.sbs
-	for i := range sbs {
-		sbs[i] = nil
-	}
-	upw.sbs = upw.sbs[:0]
-	if n := len(upw.doneCh); n > 0 {
-		logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n)
-	}
+	upw.br = blockRef{}
+	upw.tr = storage.TimeRange{}
+	upw.sb = nil
+	upw.err = nil
 }

 func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
-	for _, w := range upw.ws {
-		sb := getSortBlock()
-		if err := sb.unpackFrom(tmpBlock, upw.tbf, w.br, w.tr); err != nil {
-			putSortBlock(sb)
-			upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
-			return
-		}
-		upw.sbs = append(upw.sbs, sb)
+	sb := getSortBlock()
+	if err := sb.unpackFrom(tmpBlock, upw.tbf, upw.br, upw.tr); err != nil {
+		putSortBlock(sb)
+		upw.err = fmt.Errorf("cannot unpack block: %w", err)
+		return
 	}
-	upw.doneCh <- nil
+	upw.sb = sb
 }

 func getUnpackWork() *unpackWork {
@@ -315,9 +371,7 @@ func getUnpackWork() *unpackWork {
 	v := unpackWorkPool.Get()
 	if v != nil {
 		return v.(*unpackWork)
 	}
-	return &unpackWork{
-		doneCh: make(chan error, 1),
-	}
+	return &unpackWork{}
 }
@@ -327,36 +381,47 @@ func putUnpackWork(upw *unpackWork) {

 var unpackWorkPool sync.Pool

-func scheduleUnpackWork(workChs []chan *unpackWork, uw *unpackWork) {
-	if len(workChs) == 1 {
-		// Fast path for a single worker
-		workChs[0] <- uw
-		return
-	}
-	attempts := 0
-	for {
-		idx := fastrand.Uint32n(uint32(len(workChs)))
-		select {
-		case workChs[idx] <- uw:
-			return
-		default:
-			attempts++
-			if attempts >= len(workChs) {
-				workChs[idx] <- uw
-				return
-			}
-		}
-	}
-}
-
-func unpackWorker(ch <-chan *unpackWork) {
+func unpackWorker(workChs []chan *unpackWork, workerID uint) {
 	tmpBlock := getTmpStorageBlock()
+
+	// Deal with own work first.
+	ch := workChs[workerID]
 	for upw := range ch {
 		upw.unpack(tmpBlock)
 	}
+
+	// Then help others with their work.
+	idx := int(workerID)
+	for {
+		upw, idxNext := stealUnpackWork(workChs, idx)
+		if upw == nil {
+			// There is no more work
+			break
+		}
+		upw.unpack(tmpBlock)
+		idx = idxNext
+	}
+
 	putTmpStorageBlock(tmpBlock)
 }

+func stealUnpackWork(workChs []chan *unpackWork, startIdx int) (*unpackWork, int) {
+	for i := startIdx; i < startIdx+len(workChs); i++ {
+		// Give other goroutines a chance to perform their work
+		runtime.Gosched()
+
+		idx := i % len(workChs)
+		ch := workChs[idx]
+		// It is expected that every channel in the workChs is already closed,
+		// so the next line should return immediately.
+		upw, ok := <-ch
+		if ok {
+			return upw, idx
+		}
+	}
+	return nil, startIdx
+}
+
 func getTmpStorageBlock() *storage.Block {
 	v := tmpStorageBlockPool.Get()
 	if v == nil {
@@ -371,14 +436,6 @@ func putTmpStorageBlock(sb *storage.Block) {

 var tmpStorageBlockPool sync.Pool

-// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
-//
-// It is better to load a single goroutine for up to 100ms on a system with many CPU cores
-// in order to reduce inter-CPU memory ping-pong.
-// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
-// So the batch size should be 100ms * 40M / 8K = 500.
-var unpackBatchSize = 500
-
 // Unpack unpacks pts to dst.
 func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
 	dst.reset()
@@ -388,6 +445,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 	sbh := getSortBlocksHeap()
 	var err error
 	sbh.sbs, err = pts.unpackTo(sbh.sbs[:0], tbf, tr)
+	pts.brs = pts.brs[:0]
 	if err != nil {
 		putSortBlocksHeap(sbh)
 		return err
@@ -399,112 +457,114 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 }

 func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbf *tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) {
-	brsLen := len(pts.brs)
-	upwsLen := (brsLen + unpackBatchSize - 1) / unpackBatchSize
-	if upwsLen == 1 {
-		// Fast path for common case - unpack all the data in the current goroutine
-		upw := getUnpackWork()
+	upwsLen := len(pts.brs)
+	if upwsLen == 0 {
+		// Nothing to do
+		return nil, nil
+	}
+	initUnpackWork := func(upw *unpackWork, br blockRef) {
 		upw.tbf = tbf
-		for _, br := range pts.brs {
-			upw.ws = append(upw.ws, unpackWorkItem{
-				br: br,
-				tr: tr,
-			})
-		}
-		pts.brs = pts.brs[:0]
-		tmpBlock := getTmpStorageBlock()
-		upw.unpack(tmpBlock)
-		putTmpStorageBlock(tmpBlock)
-
-		if err := <-upw.doneCh; err != nil {
-			return dst, err
-		}
+		upw.br = br
+		upw.tr = tr
+	}
+	if gomaxprocs == 1 || upwsLen <= 100 {
+		// It is faster to unpack all the data in the current goroutine.
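+		// This avoids the overhead of spawning worker goroutines and passing
+		// the work to them via channels when the number of blocks is small.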
+		upw := getUnpackWork()
 		samples := 0
-		for _, sb := range upw.sbs {
-			samples += len(sb.Timestamps)
-			if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
-				return dst, fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
-					"or reduce time range for the query", *maxSamplesPerSeries)
+		tmpBlock := getTmpStorageBlock()
+		var err error
+		for _, br := range pts.brs {
+			initUnpackWork(upw, br)
+			upw.unpack(tmpBlock)
+			if upw.err != nil {
+				return dst, upw.err
 			}
+			samples += len(upw.sb.Timestamps)
+			if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
+				putSortBlock(upw.sb)
+				err = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
+					"or reduce time range for the query", *maxSamplesPerSeries)
+				break
+			}
+			dst = append(dst, upw.sb)
+			upw.reset()
-			dst = append(dst, sb)
 		}
-
+		putTmpStorageBlock(tmpBlock)
 		putUnpackWork(upw)
-		return dst, nil
+
+		return dst, err
 	}

 	// Slow path - spin up multiple local workers for parallel data unpacking.
 	// Do not use global workers pool, since it increases inter-CPU memory ping-pong,
 	// which reduces the scalability on systems with many CPU cores.
-	workers := upwsLen
+
+	// Prepare the work for workers.
+	upws := make([]*unpackWork, upwsLen)
+	for i, br := range pts.brs {
+		upw := getUnpackWork()
+		initUnpackWork(upw, br)
+		upws[i] = upw
+	}
+
+	// Prepare worker channels.
+	workers := len(upws)
 	if workers > gomaxprocs {
 		workers = gomaxprocs
 	}
 	if workers < 1 {
 		workers = 1
 	}
+	itemsPerWorker := (len(upws) + workers - 1) / workers
 	workChs := make([]chan *unpackWork, workers)
-	var workChsWG sync.WaitGroup
-	for i := 0; i < workers; i++ {
-		workChs[i] = make(chan *unpackWork, 1)
-		workChsWG.Add(1)
-		go func(workerID int) {
-			defer workChsWG.Done()
-			unpackWorker(workChs[workerID])
-		}(i)
+	for i := range workChs {
+		workChs[i] = make(chan *unpackWork, itemsPerWorker)
 	}

-	// Feed workers with work
-	upws := make([]*unpackWork, 0, upwsLen)
-	upw := getUnpackWork()
-	upw.tbf = tbf
-	for _, br := range pts.brs {
-		if len(upw.ws) >= unpackBatchSize {
-			scheduleUnpackWork(workChs, upw)
-			upws = append(upws, upw)
-			upw = getUnpackWork()
-			upw.tbf = tbf
-		}
-		upw.ws = append(upw.ws, unpackWorkItem{
-			br: br,
-			tr: tr,
-		})
+	// Spread work among worker channels.
+	for i, upw := range upws {
+		idx := i % len(workChs)
+		workChs[idx] <- upw
 	}
-	scheduleUnpackWork(workChs, upw)
-	upws = append(upws, upw)
-	pts.brs = pts.brs[:0]
-
-	// Collect the unpacked sortBlock items
-	samples := 0
-	var firstErr error
-	for _, upw := range upws {
-		if err := <-upw.doneCh; err != nil && firstErr == nil {
-			// Return the first error only, since other errors are likely the same.
-			firstErr = err
-		}
-		if firstErr == nil {
-			for _, sb := range upw.sbs {
-				samples += len(sb.Timestamps)
-				if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
-					firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
-						"or reduce time range for the query", *maxSamplesPerSeries)
-					break
-				}
-				dst = append(dst, sb)
-			}
-		} else {
-			for _, sb := range upw.sbs {
-				putSortBlock(sb)
-			}
-		}
-		putUnpackWork(upw)
-	}
-
-	// Shut down local workers
+	// Mark worker channels as closed.
 	for _, workCh := range workChs {
 		close(workCh)
 	}
-	workChsWG.Wait()
+
+	// Start workers and wait until they finish the work.
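+	// As in timeseriesWorker above, every worker drains its own channel first
+	// and then steals the remaining work from the other workers' channels.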
+	var wg sync.WaitGroup
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		go func(workerID uint) {
+			unpackWorker(workChs, workerID)
+			wg.Done()
+		}(uint(i))
+	}
+	wg.Wait()
+
+	// Collect results.
+	samples := 0
+	var firstErr error
+	for _, upw := range upws {
+		if upw.err != nil && firstErr == nil {
+			// Return the first error only, since other errors are likely the same.
+			firstErr = upw.err
+		}
+		if firstErr == nil {
+			sb := upw.sb
+			samples += len(sb.Timestamps)
+			if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
+				putSortBlock(sb)
+				firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
+					"or reduce time range for the query", *maxSamplesPerSeries)
+			} else {
+				dst = append(dst, sb)
+			}
+		} else {
+			putSortBlock(upw.sb)
+		}
+		putUnpackWork(upw)
+	}
 	return dst, firstErr
 }
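
Below is a minimal, self-contained sketch of the channel-per-worker plus work-stealing scheme this patch applies in both timeseriesWorker and unpackWorker. It is not code from the patch: the task type, the process function and the fixed task count are illustrative placeholders. The key invariant is that every channel is filled and closed before the workers start, so both the owned-channel loop and the stealing loop terminate without ever blocking.

// Sketch only: illustrates the fill-then-close work-stealing pattern.
package main

import (
	"fmt"
	"runtime"
	"sync"
)

type task struct{ n int }

func process(t *task, workerID uint) {
	fmt.Printf("worker %d processed task %d\n", workerID, t.n)
}

// worker drains its own channel first, then steals from the other
// (already closed) channels, mirroring timeseriesWorker/unpackWorker.
func worker(workChs []chan *task, workerID uint) {
	for t := range workChs[workerID] {
		process(t, workerID)
	}
	for i := int(workerID); i < int(workerID)+len(workChs); i++ {
		// Give other goroutines a chance to perform their work.
		runtime.Gosched()
		for t := range workChs[i%len(workChs)] {
			process(t, workerID)
		}
	}
}

func main() {
	tasks := make([]*task, 10)
	for i := range tasks {
		tasks[i] = &task{n: i}
	}
	workers := runtime.GOMAXPROCS(0)
	if workers > len(tasks) {
		workers = len(tasks)
	}
	// Buffer each channel for its share of the work, so the
	// fill-then-close step below never blocks on send.
	itemsPerWorker := (len(tasks) + workers - 1) / workers
	workChs := make([]chan *task, workers)
	for i := range workChs {
		workChs[i] = make(chan *task, itemsPerWorker)
	}
	// Spread the work round-robin and close the channels before starting
	// the workers; receives on the closed channels never block.
	for i, t := range tasks {
		workChs[i%len(workChs)] <- t
	}
	for _, ch := range workChs {
		close(ch)
	}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id uint) {
			defer wg.Done()
			worker(workChs, id)
		}(uint(i))
	}
	wg.Wait()
}

The patch steals one item at a time and remembers the last non-empty channel; the sketch simply drains each channel in turn, which is equivalent once all channels are closed.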