diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index d599e1a62..05e7aadb5 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -193,7 +193,7 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
 	// (e.g. the number of available CPU cores).
 	itemsPerWorker := 1
 	if len(rss.packedTimeseries) > gomaxprocs {
-		itemsPerWorker = 1 + len(rss.packedTimeseries)/gomaxprocs
+		itemsPerWorker = (len(rss.packedTimeseries) + gomaxprocs - 1) / gomaxprocs
 	}
 	var start int
 	var i uint
@@ -350,26 +350,34 @@ func scheduleUnpackWork(workChs []chan *unpackWork, uw *unpackWork) {
 }
 
 func unpackWorker(ch <-chan *unpackWork) {
-	v := tmpBlockPool.Get()
-	if v == nil {
-		v = &storage.Block{}
-	}
-	tmpBlock := v.(*storage.Block)
+	tmpBlock := getTmpStorageBlock()
 	for upw := range ch {
 		upw.unpack(tmpBlock)
 	}
-	tmpBlockPool.Put(v)
+	putTmpStorageBlock(tmpBlock)
 }
 
-var tmpBlockPool sync.Pool
+func getTmpStorageBlock() *storage.Block {
+	v := tmpStorageBlockPool.Get()
+	if v == nil {
+		v = &storage.Block{}
+	}
+	return v.(*storage.Block)
+}
+
+func putTmpStorageBlock(sb *storage.Block) {
+	tmpStorageBlockPool.Put(sb)
+}
+
+var tmpStorageBlockPool sync.Pool
 
 // unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
 //
-// It is better to load a single goroutine for up to one second on a system with many CPU cores
+// It is better to load a single goroutine for up to 100ms on a system with many CPU cores
 // in order to reduce inter-CPU memory ping-pong.
 // A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
-// So the batch size should be 40M / 8K = 5K.
-var unpackBatchSize = 5000
+// So the batch size should be 100ms * 40M / 8K = 500.
+var unpackBatchSize = 500
 
 // Unpack unpacks pts to dst.
 func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
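Note on the hunks above: the itemsPerWorker change swaps the old 1 + n/gomaxprocs formula for a standard round-up division. The two differ whenever n divides evenly: for 16 packed timeseries on GOMAXPROCS=4, the old formula yields 5 items per worker (uneven batches of 5, 5, 5 and 1), while (n + p - 1) / p yields 4 (even batches of 4, 4, 4, 4). A minimal standalone Go sketch of the two formulas (illustrative names only, not the VictoriaMetrics code):

package main

import "fmt"

// itemsPerWorkerOld mirrors the removed formula: 1 + items/workers.
func itemsPerWorkerOld(items, workers int) int {
	return 1 + items/workers
}

// itemsPerWorkerCeil mirrors the added round-up division: ceil(items/workers).
func itemsPerWorkerCeil(items, workers int) int {
	return (items + workers - 1) / workers
}

func main() {
	items, workers := 16, 4
	fmt.Println(itemsPerWorkerOld(items, workers))  // 5 -> batches of 5, 5, 5, 1
	fmt.Println(itemsPerWorkerCeil(items, workers)) // 4 -> batches of 4, 4, 4, 4
}

The same round-up division reappears below when unpackTo computes upwsLen. The new unpackBatchSize also follows directly from the comment's arithmetic: a 100ms goroutine budget at 40M rows/s is 4M rows, and 4M rows / 8K rows per block = 500 blocks per batch.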
@@ -377,12 +385,58 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 	if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
 		return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
 	}
+	sbh := getSortBlocksHeap()
+	var err error
+	sbh.sbs, err = pts.unpackTo(sbh.sbs[:0], tbf, tr)
+	if err != nil {
+		putSortBlocksHeap(sbh)
+		return err
+	}
+	dedupInterval := storage.GetDedupInterval()
+	mergeSortBlocks(dst, sbh, dedupInterval)
+	putSortBlocksHeap(sbh)
+	return nil
+}
 
-	// Spin up local workers.
+func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbf *tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) {
+	brsLen := len(pts.brs)
+	upwsLen := (brsLen + unpackBatchSize - 1) / unpackBatchSize
+	if upwsLen == 1 {
+		// Fast path for common case - unpack all the data in the current goroutine
+		upw := getUnpackWork()
+		upw.tbf = tbf
+		for _, br := range pts.brs {
+			upw.ws = append(upw.ws, unpackWorkItem{
+				br: br,
+				tr: tr,
+			})
+		}
+		pts.brs = pts.brs[:0]
+		tmpBlock := getTmpStorageBlock()
+		upw.unpack(tmpBlock)
+		putTmpStorageBlock(tmpBlock)
+
+		if err := <-upw.doneCh; err != nil {
+			return dst, err
+		}
+		samples := 0
+		for _, sb := range upw.sbs {
+			samples += len(sb.Timestamps)
+			if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
+				return dst, fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
+					"or reduce time range for the query", *maxSamplesPerSeries)
+			}
+			dst = append(dst, sb)
+		}
+
+		putUnpackWork(upw)
+		return dst, nil
+	}
+
+	// Slow path - spin up multiple local workers for parallel data unpacking.
 	// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
 	// which reduces the scalability on systems with many CPU cores.
-	brsLen := len(pts.brs)
-	workers := brsLen / unpackBatchSize
+	workers := upwsLen
 	if workers > gomaxprocs {
 		workers = gomaxprocs
 	}
@@ -392,11 +446,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 	workChs := make([]chan *unpackWork, workers)
 	var workChsWG sync.WaitGroup
 	for i := 0; i < workers; i++ {
-		// Use unbuffered channel on purpose, since there are high chances
-		// that only a single unpackWork is needed to unpack.
-		// The unbuffered channel should reduce inter-CPU ping-pong in this case,
-		// which should improve the performance in a system with many CPU cores.
-		workChs[i] = make(chan *unpackWork)
+		workChs[i] = make(chan *unpackWork, 1)
 		workChsWG.Add(1)
 		go func(workerID int) {
 			defer workChsWG.Done()
@@ -405,7 +455,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 	}
 
 	// Feed workers with work
-	upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
+	upws := make([]*unpackWork, 0, upwsLen)
 	upw := getUnpackWork()
 	upw.tbf = tbf
 	for _, br := range pts.brs {
@@ -424,14 +474,8 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 	upws = append(upws, upw)
 	pts.brs = pts.brs[:0]
 
-	// Wait until work is complete
+	// Collect the unpacked sortBlock items
 	samples := 0
-	sbh := getSortBlocksHeap()
-	sbs := sbh.sbs
-	if n := brsLen - cap(sbs); n > 0 {
-		sbs = append(sbs[:cap(sbs)], make([]*sortBlock, n)...)
-	}
-	sbs = sbs[:0]
 	var firstErr error
 	for _, upw := range upws {
 		if err := <-upw.doneCh; err != nil && firstErr == nil {
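unpackTo dispatches on upwsLen, computed with the same round-up division as above: a single batch is unpacked inline on the calling goroutine, with no channels or worker goroutines at all, while larger series fan batches out to per-call workers. The diff also drops the old rationale for unbuffered work channels and gives each worker a one-slot buffer, presumably so a producer can queue the next batch while the worker is still unpacking the previous one. The sketch below shows only the dispatch shape, under the simplifying assumption of one goroutine per batch instead of the diff's fixed worker set fed over channels; all names are illustrative, not the actual VictoriaMetrics code:

package main

import (
	"fmt"
	"sync"
)

const batchSize = 500 // stand-in for unpackBatchSize

// processBatch stands in for unpackWork.unpack: it handles one batch inline.
func processBatch(batch []int) []int {
	out := make([]int, 0, len(batch))
	for _, v := range batch {
		out = append(out, v*2)
	}
	return out
}

// process mirrors the fast-path/slow-path split in unpackTo.
func process(items []int) []int {
	nBatches := (len(items) + batchSize - 1) / batchSize
	if nBatches == 1 {
		// Fast path: the common case pays for no goroutines or channels.
		return processBatch(items)
	}
	// Slow path: fan batches out to goroutines; each writes its own slot,
	// so collecting results in order needs no locking.
	results := make([][]int, nBatches)
	var wg sync.WaitGroup
	for i := 0; i < nBatches; i++ {
		lo, hi := i*batchSize, (i+1)*batchSize
		if hi > len(items) {
			hi = len(items)
		}
		wg.Add(1)
		go func(i int, batch []int) {
			defer wg.Done()
			results[i] = processBatch(batch)
		}(i, items[lo:hi])
	}
	wg.Wait()
	var merged []int
	for _, r := range results {
		merged = append(merged, r...)
	}
	return merged
}

func main() {
	fmt.Println(len(process(make([]int, 1200)))) // 1200, via the slow path
}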
@@ -441,22 +485,20 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 		if firstErr == nil {
 			for _, sb := range upw.sbs {
 				samples += len(sb.Timestamps)
+				if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
+					firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
+						"or reduce time range for the query", *maxSamplesPerSeries)
+					break
+				}
+				dst = append(dst, sb)
 			}
-			if *maxSamplesPerSeries <= 0 || samples < *maxSamplesPerSeries {
-				sbs = append(sbs, upw.sbs...)
-			} else {
-				firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
-					"or reduce time range for the query", *maxSamplesPerSeries)
-			}
-		}
-		if firstErr != nil {
+		} else {
 			for _, sb := range upw.sbs {
 				putSortBlock(sb)
 			}
 		}
 		putUnpackWork(upw)
 	}
-	sbh.sbs = sbs
 
 	// Shut down local workers
 	for _, workCh := range workChs {
@@ -464,13 +506,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
 	}
 	workChsWG.Wait()
 
-	if firstErr != nil {
-		return firstErr
-	}
-	dedupInterval := storage.GetDedupInterval()
-	mergeSortBlocks(dst, sbh, dedupInterval)
-	putSortBlocksHeap(sbh)
-	return nil
+	return dst, firstErr
 }
 
 func getSortBlock() *sortBlock {
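A recurring pattern in this diff is hiding a sync.Pool behind typed get/put accessors: getTmpStorageBlock/putTmpStorageBlock here, alongside the existing getSortBlocksHeap/putSortBlocksHeap and getUnpackWork/putUnpackWork. Extracting the accessors is what lets the slow path (unpackWorker) and the new inline fast path in unpackTo share one pool without duplicating the Get/type-assert boilerplate. A minimal sketch of the shape with a stand-in Block type; the reset-on-put line is an assumption of this sketch, not something the diff shows:

package main

import (
	"fmt"
	"sync"
)

// Block stands in for storage.Block; the accessor shape below mirrors
// getTmpStorageBlock/putTmpStorageBlock from the diff.
type Block struct {
	buf []byte
}

var blockPool sync.Pool

func getBlock() *Block {
	v := blockPool.Get()
	if v == nil {
		return &Block{}
	}
	return v.(*Block)
}

func putBlock(b *Block) {
	b.buf = b.buf[:0] // reset before returning to the pool (sketch-only choice)
	blockPool.Put(b)
}

func main() {
	b := getBlock()
	b.buf = append(b.buf, "scratch data"...)
	putBlock(b)
	fmt.Println(len(getBlock().buf)) // 0: fresh or reset either way
}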