This commit is contained in:
Aliaksandr Valialkin 2024-05-14 00:50:32 +02:00
parent dafd45d4c6
commit b03c672227
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 81 additions and 51 deletions

View file

@ -8,6 +8,12 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
// The number of blocks to search at once by a single worker
//
// This number must be increased on systems with many CPU cores in order to amortize
// the overhead for passing the blockSearchWork to worker goroutines.
const blockSearchWorksPerBatch = 64
type blockSearchWork struct { type blockSearchWork struct {
// p is the part where the block belongs to. // p is the part where the block belongs to.
p *part p *part
@ -19,12 +25,54 @@ type blockSearchWork struct {
bh blockHeader bh blockHeader
} }
func newBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) *blockSearchWork { func (bsw *blockSearchWork) reset() {
var bsw blockSearchWork bsw.p = nil
bsw.p = p bsw.so = nil
bsw.so = so bsw.bh.reset()
}
type blockSearchWorkBatch struct {
bsws []blockSearchWork
}
func (bswb *blockSearchWorkBatch) reset() {
bsws := bswb.bsws
for i := range bsws {
bsws[i].reset()
}
bswb.bsws = bsws[:0]
}
func getBlockSearchWorkBatch() *blockSearchWorkBatch {
v := blockSearchWorkBatchPool.Get()
if v == nil {
return &blockSearchWorkBatch{
bsws: make([]blockSearchWork, 0, blockSearchWorksPerBatch),
}
}
return v.(*blockSearchWorkBatch)
}
func putBlockSearchWorkBatch(bswb *blockSearchWorkBatch) {
bswb.reset()
blockSearchWorkBatchPool.Put(bswb)
}
var blockSearchWorkBatchPool sync.Pool
func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool {
bsws := bswb.bsws
bsws = append(bsws, blockSearchWork{
p: p,
so: so,
})
bsw := &bsws[len(bsws)-1]
bsw.bh.copyFrom(bh) bsw.bh.copyFrom(bh)
return &bsw
bswb.bsws = bsws
return len(bsws) < cap(bsws)
} }
func getBlockSearch() *blockSearch { func getBlockSearch() *blockSearch {

View file

@ -162,12 +162,6 @@ func (c *BlockColumn) reset() {
c.Values = nil c.Values = nil
} }
// The number of blocks to search at once by a single worker
//
// This number must be increased on systems with many CPU cores in order to amortize
// the overhead for passing the blockSearchWork to worker goroutines.
const blockSearchWorksPerBatch = 64
// searchResultFunc must process sr. // searchResultFunc must process sr.
// //
// The callback is called at the worker with the given workerID. // The callback is called at the worker with the given workerID.
@ -179,16 +173,19 @@ type searchResultFunc func(workerID uint, br *blockResult)
func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) { func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) {
// Spin up workers // Spin up workers
var wgWorkers sync.WaitGroup var wgWorkers sync.WaitGroup
workCh := make(chan []*blockSearchWork, workersCount) workCh := make(chan *blockSearchWorkBatch, workersCount)
wgWorkers.Add(workersCount) wgWorkers.Add(workersCount)
for i := 0; i < workersCount; i++ { for i := 0; i < workersCount; i++ {
go func(workerID uint) { go func(workerID uint) {
bs := getBlockSearch() bs := getBlockSearch()
for bsws := range workCh { for bswb := range workCh {
for _, bsw := range bsws { bsws := bswb.bsws
for i := range bsws {
bsw := &bsws[i]
select { select {
case <-stopCh: case <-stopCh:
// The search has been canceled. Just skip all the scheduled work in order to save CPU time. // The search has been canceled. Just skip all the scheduled work in order to save CPU time.
bsw.reset()
continue continue
default: default:
} }
@ -197,7 +194,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
if len(bs.br.timestamps) > 0 { if len(bs.br.timestamps) > 0 {
processBlockResult(workerID, &bs.br) processBlockResult(workerID, &bs.br)
} }
bsw.reset()
} }
bswb.bsws = bswb.bsws[:0]
putBlockSearchWorkBatch(bswb)
} }
putBlockSearch(bs) putBlockSearch(bs)
wgWorkers.Done() wgWorkers.Done()
@ -265,7 +265,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs
type partitionSearchFinalizer func() type partitionSearchFinalizer func()
func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
select { select {
case <-stopCh: case <-stopCh:
// Do not spend CPU time on search, since it is already stopped. // Do not spend CPU time on search, since it is already stopped.
@ -352,7 +352,7 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter)
return result return result
} }
func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { func (ddb *datadb) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
// Select parts with data for the given time range // Select parts with data for the given time range
ddb.partsLock.Lock() ddb.partsLock.Lock()
pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp) pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp)
@ -378,7 +378,7 @@ func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, s
} }
} }
func (p *part) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { func (p *part) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
bhss := getBlockHeaders() bhss := getBlockHeaders()
if len(so.tenantIDs) > 0 { if len(so.tenantIDs) > 0 {
p.searchByTenantIDs(so, bhss, workCh, stopCh) p.searchByTenantIDs(so, bhss, workCh, stopCh)
@ -415,27 +415,20 @@ func (bhss *blockHeaders) reset() {
bhss.bhs = bhs[:0] bhss.bhs = bhs[:0]
} }
func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
// it is assumed that tenantIDs are sorted // it is assumed that tenantIDs are sorted
tenantIDs := so.tenantIDs tenantIDs := so.tenantIDs
bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch) bswb := getBlockSearchWorkBatch()
scheduleBlockSearch := func(bh *blockHeader) bool { scheduleBlockSearch := func(bh *blockHeader) bool {
// Do not use pool for blockSearchWork, since it is returned back to the pool if bswb.appendBlockSearchWork(p, so, bh) {
// at another goroutine, which may run on another CPU core.
// This means that it will be put into another per-CPU pool, which may result
// in slowdown related to memory synchronization between CPU cores.
// This slowdown is increased on systems with bigger number of CPU cores.
bsw := newBlockSearchWork(p, so, bh)
bsws = append(bsws, bsw)
if len(bsws) < cap(bsws) {
return true return true
} }
select { select {
case <-stopCh: case <-stopCh:
return false return false
case workCh <- bsws: case workCh <- bswb:
bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch) bswb = getBlockSearchWorkBatch()
return true return true
} }
} }
@ -520,35 +513,26 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c
} }
// Flush the remaining work // Flush the remaining work
if len(bsws) > 0 { select {
select { case <-stopCh:
case <-stopCh: case workCh <- bswb:
case workCh <- bsws:
}
} }
} }
func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
// it is assumed that streamIDs are sorted // it is assumed that streamIDs are sorted
streamIDs := so.streamIDs streamIDs := so.streamIDs
bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch) bswb := getBlockSearchWorkBatch()
scheduleBlockSearch := func(bh *blockHeader) bool { scheduleBlockSearch := func(bh *blockHeader) bool {
// Do not use pool for blockSearchWork, since it is returned back to the pool if bswb.appendBlockSearchWork(p, so, bh) {
// at another goroutine, which may run on another CPU core.
// This means that it will be put into another per-CPU pool, which may result
// in slowdown related to memory synchronization between CPU cores.
// This slowdown is increased on systems with bigger number of CPU cores.
bsw := newBlockSearchWork(p, so, bh)
bsws = append(bsws, bsw)
if len(bsws) < cap(bsws) {
return true return true
} }
select { select {
case <-stopCh: case <-stopCh:
return false return false
case workCh <- bsws: case workCh <- bswb:
bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch) bswb = getBlockSearchWorkBatch()
return true return true
} }
} }
@ -634,11 +618,9 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c
} }
// Flush the remaining work // Flush the remaining work
if len(bsws) > 0 { select {
select { case <-stopCh:
case <-stopCh: case workCh <- bswb:
case workCh <- bsws:
}
} }
} }