app/vmselect/netstorage: reduce tail latency during query processing

Previously the selected time series were split evenly among available CPU cores
for further processing - e.g. unpacking the data and applying the given rollup
function to the unpacked data.
Some time series could be processed slower than others.
This could result in uneven work distribution among available CPU cores,
e.g. some CPU cores could complete their work sooner than others.
This could slow down query execution.

The new algorithm allows a CPU core to steal pending time series from other CPU cores
when all of its local work is done. This should reduce the maximum time
needed for query execution (aka tail latency).

The new algorithm should also scale better on systems with many CPU cores,
since every CPU processes locally assigned time series without inter-CPU communications.

The inter-CPU communications are used only when all the local work is finished
and the pending work from other CPUs needs to be stolen.
This commit is contained in:
Aliaksandr Valialkin 2023-01-10 13:06:02 -08:00
parent b2ccdaaa2f
commit 53d871d0b1
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -5,7 +5,7 @@ import (
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"math/rand" "runtime"
"sort" "sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -16,12 +16,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql" "github.com/VictoriaMetrics/metricsql"
"github.com/valyala/fastrand"
) )
var ( var (
@ -133,16 +131,66 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
return nil return nil
} }
func timeseriesWorker(tsws []*timeseriesWork, workerID uint) { func timeseriesWorker(qt *querytracer.Tracer, workChs []chan *timeseriesWork, workerID uint) {
tmpResult := getTmpResult()
// Perform own work at first.
rowsProcessed := 0
seriesProcessed := 0
ch := workChs[workerID]
for tsw := range ch {
tsw.err = tsw.do(&tmpResult.rs, workerID)
rowsProcessed += tsw.rowsProcessed
seriesProcessed++
}
qt.Printf("own work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
// Then help others with the remaining work.
rowsProcessed = 0
seriesProcessed = 0
idx := int(workerID)
for {
tsw, idxNext := stealTimeseriesWork(workChs, idx)
if tsw == nil {
// There is no more work
break
}
tsw.err = tsw.do(&tmpResult.rs, workerID)
rowsProcessed += tsw.rowsProcessed
seriesProcessed++
idx = idxNext
}
qt.Printf("others work processed: series=%d, samples=%d", seriesProcessed, rowsProcessed)
putTmpResult(tmpResult)
}
// stealTimeseriesWork looks for a pending work item in workChs, visiting every
// channel at most once in round-robin order starting from startIdx.
//
// It returns the first item received together with the index of the channel
// it was taken from. If no channel yields an item, it returns nil and startIdx.
func stealTimeseriesWork(workChs []chan *timeseriesWork, startIdx int) (*timeseriesWork, int) {
	total := len(workChs)
	for offset := 0; offset < total; offset++ {
		// Yield the processor before every probe, so other goroutines
		// get a chance to make progress on their own work.
		runtime.Gosched()

		pos := (startIdx + offset) % total
		// The caller is expected to close every channel in workChs before
		// stealing starts, so this receive should not block: it returns either
		// a buffered item or received == false for a drained channel.
		if work, received := <-workChs[pos]; received {
			return work, pos
		}
	}
	return nil, startIdx
}
func getTmpResult() *result {
v := resultPool.Get() v := resultPool.Get()
if v == nil { if v == nil {
v = &result{} v = &result{}
} }
r := v.(*result) return v.(*result)
for _, tsw := range tsws { }
err := tsw.do(&r.rs, workerID)
tsw.err = err func putTmpResult(r *result) {
}
currentTime := fasttime.UnixTimestamp() currentTime := fasttime.UnixTimestamp()
if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 { if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
// Reset r.rs in order to preseve memory usage after processing big time series with millions of rows. // Reset r.rs in order to preseve memory usage after processing big time series with millions of rows.
@ -170,87 +218,113 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
qt = qt.NewChild("parallel process of fetched data") qt = qt.NewChild("parallel process of fetched data")
defer rss.mustClose() defer rss.mustClose()
// Prepare work for workers. rowsProcessedTotal, err := rss.runParallel(qt, f)
tsws := make([]*timeseriesWork, len(rss.packedTimeseries)) seriesProcessedTotal := len(rss.packedTimeseries)
rss.packedTimeseries = rss.packedTimeseries[:0]
rowsReadPerQuery.Update(float64(rowsProcessedTotal))
seriesReadPerQuery.Update(float64(seriesProcessedTotal))
qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
return err
}
func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) (int, error) {
tswsLen := len(rss.packedTimeseries)
if tswsLen == 0 {
// Nothing to process
return 0, nil
}
var mustStop uint32 var mustStop uint32
for i := range rss.packedTimeseries { initTimeseriesWork := func(tsw *timeseriesWork, pts *packedTimeseries) {
tsw := getTimeseriesWork()
tsw.rss = rss tsw.rss = rss
tsw.pts = &rss.packedTimeseries[i] tsw.pts = pts
tsw.f = f tsw.f = f
tsw.mustStop = &mustStop tsw.mustStop = &mustStop
}
if gomaxprocs == 1 || tswsLen == 1 {
// It is faster to process time series in the current goroutine.
tsw := getTimeseriesWork()
tmpResult := getTmpResult()
rowsProcessedTotal := 0
var err error
for i := range rss.packedTimeseries {
initTimeseriesWork(tsw, &rss.packedTimeseries[i])
err = tsw.do(&tmpResult.rs, 0)
rowsReadPerSeries.Update(float64(tsw.rowsProcessed))
rowsProcessedTotal += tsw.rowsProcessed
if err != nil {
break
}
tsw.reset()
}
putTmpResult(tmpResult)
putTimeseriesWork(tsw)
return rowsProcessedTotal, err
}
// Slow path - spin up multiple local workers for parallel data processing.
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
// which reduces the scalability on systems with many CPU cores.
// Prepare the work for workers.
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
for i := range rss.packedTimeseries {
tsw := getTimeseriesWork()
initTimeseriesWork(tsw, &rss.packedTimeseries[i])
tsws[i] = tsw tsws[i] = tsw
} }
// Shuffle tsws for providing the equal amount of work among workers.
r := getRand()
r.Shuffle(len(tsws), func(i, j int) {
tsws[i], tsws[j] = tsws[j], tsws[i]
})
putRand(r)
// Spin up up to gomaxprocs local workers and split work equally among them. // Prepare worker channels.
// This guarantees linear scalability with the increase of gomaxprocs workers := len(tsws)
// (e.g. the number of available CPU cores). if workers > gomaxprocs {
itemsPerWorker := 1 workers = gomaxprocs
if len(rss.packedTimeseries) > gomaxprocs {
itemsPerWorker = (len(rss.packedTimeseries) + gomaxprocs - 1) / gomaxprocs
} }
var start int itemsPerWorker := (len(tsws) + workers - 1) / workers
var i uint workChs := make([]chan *timeseriesWork, workers)
for i := range workChs {
workChs[i] = make(chan *timeseriesWork, itemsPerWorker)
}
// Spread work among workers.
for i, tsw := range tsws {
idx := i % len(workChs)
workChs[idx] <- tsw
}
// Mark worker channels as closed.
for _, workCh := range workChs {
close(workCh)
}
// Start workers and wait until they finish the work.
var wg sync.WaitGroup var wg sync.WaitGroup
for start < len(tsws) { for i := range workChs {
end := start + itemsPerWorker
if end > len(tsws) {
end = len(tsws)
}
chunk := tsws[start:end]
wg.Add(1) wg.Add(1)
go func(tswsChunk []*timeseriesWork, workerID uint) { qtChild := qt.NewChild("worker #%d", i)
defer wg.Done() go func(workerID uint) {
timeseriesWorker(tswsChunk, workerID) timeseriesWorker(qtChild, workChs, workerID)
}(chunk, i) qtChild.Done()
start = end wg.Done()
i++ }(uint(i))
} }
// Wait until work is complete.
wg.Wait() wg.Wait()
// Collect results. // Collect results.
var firstErr error var firstErr error
rowsProcessedTotal := 0 rowsProcessedTotal := 0
for _, tsw := range tsws { for _, tsw := range tsws {
if err := tsw.err; err != nil && firstErr == nil { if tsw.err != nil && firstErr == nil {
// Return just the first error, since other errors are likely duplicate the first error. // Return just the first error, since other errors are likely duplicate the first error.
firstErr = err firstErr = tsw.err
} }
rowsReadPerSeries.Update(float64(tsw.rowsProcessed)) rowsReadPerSeries.Update(float64(tsw.rowsProcessed))
rowsProcessedTotal += tsw.rowsProcessed rowsProcessedTotal += tsw.rowsProcessed
putTimeseriesWork(tsw) putTimeseriesWork(tsw)
} }
return rowsProcessedTotal, firstErr
seriesProcessedTotal := len(rss.packedTimeseries)
rss.packedTimeseries = rss.packedTimeseries[:0]
rowsReadPerQuery.Update(float64(rowsProcessedTotal))
seriesReadPerQuery.Update(float64(seriesProcessedTotal))
qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
return firstErr
}
var randPool sync.Pool
func getRand() *rand.Rand {
v := randPool.Get()
if v == nil {
v = rand.New(rand.NewSource(int64(fasttime.UnixTimestamp())))
}
return v.(*rand.Rand)
}
func putRand(r *rand.Rand) {
randPool.Put(r)
} }
var ( var (
@ -266,48 +340,30 @@ type packedTimeseries struct {
brs []blockRef brs []blockRef
} }
type unpackWorkItem struct {
br blockRef
tr storage.TimeRange
}
type unpackWork struct { type unpackWork struct {
tbf *tmpBlocksFile tbf *tmpBlocksFile
ws []unpackWorkItem br blockRef
sbs []*sortBlock tr storage.TimeRange
doneCh chan error sb *sortBlock
err error
} }
func (upw *unpackWork) reset() { func (upw *unpackWork) reset() {
upw.tbf = nil upw.tbf = nil
ws := upw.ws upw.br = blockRef{}
for i := range ws { upw.tr = storage.TimeRange{}
w := &ws[i] upw.sb = nil
w.br = blockRef{} upw.err = nil
w.tr = storage.TimeRange{}
}
upw.ws = upw.ws[:0]
sbs := upw.sbs
for i := range sbs {
sbs[i] = nil
}
upw.sbs = upw.sbs[:0]
if n := len(upw.doneCh); n > 0 {
logger.Panicf("BUG: upw.doneCh must be empty; it contains %d items now", n)
}
} }
func (upw *unpackWork) unpack(tmpBlock *storage.Block) { func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
for _, w := range upw.ws { sb := getSortBlock()
sb := getSortBlock() if err := sb.unpackFrom(tmpBlock, upw.tbf, upw.br, upw.tr); err != nil {
if err := sb.unpackFrom(tmpBlock, upw.tbf, w.br, w.tr); err != nil { putSortBlock(sb)
putSortBlock(sb) upw.err = fmt.Errorf("cannot unpack block: %w", err)
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err) return
return
}
upw.sbs = append(upw.sbs, sb)
} }
upw.doneCh <- nil upw.sb = sb
} }
func getUnpackWork() *unpackWork { func getUnpackWork() *unpackWork {
@ -315,9 +371,7 @@ func getUnpackWork() *unpackWork {
if v != nil { if v != nil {
return v.(*unpackWork) return v.(*unpackWork)
} }
return &unpackWork{ return &unpackWork{}
doneCh: make(chan error, 1),
}
} }
func putUnpackWork(upw *unpackWork) { func putUnpackWork(upw *unpackWork) {
@ -327,36 +381,47 @@ func putUnpackWork(upw *unpackWork) {
var unpackWorkPool sync.Pool var unpackWorkPool sync.Pool
func scheduleUnpackWork(workChs []chan *unpackWork, uw *unpackWork) { func unpackWorker(workChs []chan *unpackWork, workerID uint) {
if len(workChs) == 1 {
// Fast path for a single worker
workChs[0] <- uw
return
}
attempts := 0
for {
idx := fastrand.Uint32n(uint32(len(workChs)))
select {
case workChs[idx] <- uw:
return
default:
attempts++
if attempts >= len(workChs) {
workChs[idx] <- uw
return
}
}
}
}
func unpackWorker(ch <-chan *unpackWork) {
tmpBlock := getTmpStorageBlock() tmpBlock := getTmpStorageBlock()
// Deal with own work at first.
ch := workChs[workerID]
for upw := range ch { for upw := range ch {
upw.unpack(tmpBlock) upw.unpack(tmpBlock)
} }
// Then help others with their work.
idx := int(workerID)
for {
upw, idxNext := stealUnpackWork(workChs, idx)
if upw == nil {
// There is no more work
break
}
upw.unpack(tmpBlock)
idx = idxNext
}
putTmpStorageBlock(tmpBlock) putTmpStorageBlock(tmpBlock)
} }
// stealUnpackWork probes the channels in workChs one by one, wrapping around
// from startIdx, and takes the first pending unpack work item it finds.
//
// The returned int is the index of the channel the item came from.
// When none of the channels has anything left, nil and startIdx are returned.
func stealUnpackWork(workChs []chan *unpackWork, startIdx int) (*unpackWork, int) {
	n := len(workChs)
	end := startIdx + n
	for cur := startIdx; cur < end; cur++ {
		// Let other goroutines run before every probe.
		runtime.Gosched()

		slot := cur % n
		// All the channels are expected to be closed at this point,
		// so the receive below should complete without blocking.
		item, received := <-workChs[slot]
		if !received {
			// The channel is closed and drained - move on to the next one.
			continue
		}
		return item, slot
	}
	return nil, startIdx
}
func getTmpStorageBlock() *storage.Block { func getTmpStorageBlock() *storage.Block {
v := tmpStorageBlockPool.Get() v := tmpStorageBlockPool.Get()
if v == nil { if v == nil {
@ -371,14 +436,6 @@ func putTmpStorageBlock(sb *storage.Block) {
var tmpStorageBlockPool sync.Pool var tmpStorageBlockPool sync.Pool
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
//
// It is better to load a single goroutine for up to 100ms on a system with many CPU cores
// in order to reduce inter-CPU memory ping-pong.
// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
// So the batch size should be 100ms * 40M / 8K = 500.
var unpackBatchSize = 500
// Unpack unpacks pts to dst. // Unpack unpacks pts to dst.
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error { func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange) error {
dst.reset() dst.reset()
@ -388,6 +445,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
sbh := getSortBlocksHeap() sbh := getSortBlocksHeap()
var err error var err error
sbh.sbs, err = pts.unpackTo(sbh.sbs[:0], tbf, tr) sbh.sbs, err = pts.unpackTo(sbh.sbs[:0], tbf, tr)
pts.brs = pts.brs[:0]
if err != nil { if err != nil {
putSortBlocksHeap(sbh) putSortBlocksHeap(sbh)
return err return err
@ -399,112 +457,114 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
} }
func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbf *tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) { func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbf *tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) {
brsLen := len(pts.brs) upwsLen := len(pts.brs)
upwsLen := (brsLen + unpackBatchSize - 1) / unpackBatchSize if upwsLen == 0 {
if upwsLen == 1 { // Nothing to do
// Fast path for common case - unpack all the data in the current goroutine return nil, nil
upw := getUnpackWork() }
initUnpackWork := func(upw *unpackWork, br blockRef) {
upw.tbf = tbf upw.tbf = tbf
for _, br := range pts.brs { upw.br = br
upw.ws = append(upw.ws, unpackWorkItem{ upw.tr = tr
br: br, }
tr: tr, if gomaxprocs == 1 || upwsLen <= 100 {
}) // It is faster to unpack all the data in the current goroutine.
} upw := getUnpackWork()
pts.brs = pts.brs[:0]
tmpBlock := getTmpStorageBlock()
upw.unpack(tmpBlock)
putTmpStorageBlock(tmpBlock)
if err := <-upw.doneCh; err != nil {
return dst, err
}
samples := 0 samples := 0
for _, sb := range upw.sbs { tmpBlock := getTmpStorageBlock()
samples += len(sb.Timestamps) var err error
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries { for _, br := range pts.brs {
return dst, fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+ initUnpackWork(upw, br)
"or reduce time range for the query", *maxSamplesPerSeries) upw.unpack(tmpBlock)
if upw.err != nil {
return dst, upw.err
} }
dst = append(dst, sb) samples += len(upw.sb.Timestamps)
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
putSortBlock(upw.sb)
err = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
"or reduce time range for the query", *maxSamplesPerSeries)
break
}
dst = append(dst, upw.sb)
upw.reset()
} }
putTmpStorageBlock(tmpBlock)
putUnpackWork(upw) putUnpackWork(upw)
return dst, nil
return dst, err
} }
// Slow path - spin up multiple local workers for parallel data unpacking. // Slow path - spin up multiple local workers for parallel data unpacking.
// Do not use global workers pool, since it increases inter-CPU memory ping-poing, // Do not use global workers pool, since it increases inter-CPU memory ping-poing,
// which reduces the scalability on systems with many CPU cores. // which reduces the scalability on systems with many CPU cores.
workers := upwsLen
// Prepare the work for workers.
upws := make([]*unpackWork, upwsLen)
for i, br := range pts.brs {
upw := getUnpackWork()
initUnpackWork(upw, br)
upws[i] = upw
}
// Prepare worker channels.
workers := len(upws)
if workers > gomaxprocs { if workers > gomaxprocs {
workers = gomaxprocs workers = gomaxprocs
} }
if workers < 1 { if workers < 1 {
workers = 1 workers = 1
} }
itemsPerWorker := (len(upws) + workers - 1) / workers
workChs := make([]chan *unpackWork, workers) workChs := make([]chan *unpackWork, workers)
var workChsWG sync.WaitGroup for i := range workChs {
for i := 0; i < workers; i++ { workChs[i] = make(chan *unpackWork, itemsPerWorker)
workChs[i] = make(chan *unpackWork, 1)
workChsWG.Add(1)
go func(workerID int) {
defer workChsWG.Done()
unpackWorker(workChs[workerID])
}(i)
} }
// Feed workers with work // Spread work among worker channels.
upws := make([]*unpackWork, 0, upwsLen) for i, upw := range upws {
upw := getUnpackWork() idx := i % len(workChs)
upw.tbf = tbf workChs[idx] <- upw
for _, br := range pts.brs {
if len(upw.ws) >= unpackBatchSize {
scheduleUnpackWork(workChs, upw)
upws = append(upws, upw)
upw = getUnpackWork()
upw.tbf = tbf
}
upw.ws = append(upw.ws, unpackWorkItem{
br: br,
tr: tr,
})
} }
scheduleUnpackWork(workChs, upw) // Mark worker channels as closed.
upws = append(upws, upw)
pts.brs = pts.brs[:0]
// Collect the unpacked sortBlock items
samples := 0
var firstErr error
for _, upw := range upws {
if err := <-upw.doneCh; err != nil && firstErr == nil {
// Return the first error only, since other errors are likely the same.
firstErr = err
}
if firstErr == nil {
for _, sb := range upw.sbs {
samples += len(sb.Timestamps)
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
"or reduce time range for the query", *maxSamplesPerSeries)
break
}
dst = append(dst, sb)
}
} else {
for _, sb := range upw.sbs {
putSortBlock(sb)
}
}
putUnpackWork(upw)
}
// Shut down local workers
for _, workCh := range workChs { for _, workCh := range workChs {
close(workCh) close(workCh)
} }
workChsWG.Wait()
// Start workers and wait until they finish the work.
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID uint) {
unpackWorker(workChs, workerID)
wg.Done()
}(uint(i))
}
wg.Wait()
// Collect results.
samples := 0
var firstErr error
for _, upw := range upws {
if upw.err != nil && firstErr == nil {
// Return the first error only, since other errors are likely the same.
firstErr = upw.err
}
if firstErr == nil {
sb := upw.sb
samples += len(sb.Timestamps)
if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries {
putSortBlock(sb)
firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+
"or reduce time range for the query", *maxSamplesPerSeries)
} else {
dst = append(dst, sb)
}
} else {
putSortBlock(upw.sb)
}
putUnpackWork(upw)
}
return dst, firstErr return dst, firstErr
} }