diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index 502e51941..6845f9c33 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -8,6 +8,12 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) +// The number of blocks to search at once by a single worker +// +// This number must be increased on systems with many CPU cores in order to amortize +// the overhead for passing the blockSearchWork to worker goroutines. +const blockSearchWorksPerBatch = 64 + type blockSearchWork struct { // p is the part where the block belongs to. p *part @@ -19,12 +25,54 @@ type blockSearchWork struct { bh blockHeader } -func newBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) *blockSearchWork { - var bsw blockSearchWork - bsw.p = p - bsw.so = so +func (bsw *blockSearchWork) reset() { + bsw.p = nil + bsw.so = nil + bsw.bh.reset() +} + +type blockSearchWorkBatch struct { + bsws []blockSearchWork +} + +func (bswb *blockSearchWorkBatch) reset() { + bsws := bswb.bsws + for i := range bsws { + bsws[i].reset() + } + bswb.bsws = bsws[:0] +} + +func getBlockSearchWorkBatch() *blockSearchWorkBatch { + v := blockSearchWorkBatchPool.Get() + if v == nil { + return &blockSearchWorkBatch{ + bsws: make([]blockSearchWork, 0, blockSearchWorksPerBatch), + } + } + return v.(*blockSearchWorkBatch) +} + +func putBlockSearchWorkBatch(bswb *blockSearchWorkBatch) { + bswb.reset() + blockSearchWorkBatchPool.Put(bswb) +} + +var blockSearchWorkBatchPool sync.Pool + +func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool { + bsws := bswb.bsws + + bsws = append(bsws, blockSearchWork{ + p: p, + so: so, + }) + bsw := &bsws[len(bsws)-1] bsw.bh.copyFrom(bh) - return &bsw + + bswb.bsws = bsws + + return len(bsws) < cap(bsws) } func getBlockSearch() *blockSearch { diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index c8fe9df1a..2b1ea3ba4 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -162,12 +162,6 @@ func (c *BlockColumn) reset() { c.Values = nil } -// The number of blocks to search at once by a single worker -// -// This number must be increased on systems with many CPU cores in order to amortize -// the overhead for passing the blockSearchWork to worker goroutines. -const blockSearchWorksPerBatch = 64 - // searchResultFunc must process sr. // // The callback is called at the worker with the given workerID. @@ -179,16 +173,19 @@ type searchResultFunc func(workerID uint, br *blockResult) func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) { // Spin up workers var wgWorkers sync.WaitGroup - workCh := make(chan []*blockSearchWork, workersCount) + workCh := make(chan *blockSearchWorkBatch, workersCount) wgWorkers.Add(workersCount) for i := 0; i < workersCount; i++ { go func(workerID uint) { bs := getBlockSearch() - for bsws := range workCh { - for _, bsw := range bsws { + for bswb := range workCh { + bsws := bswb.bsws + for i := range bsws { + bsw := &bsws[i] select { case <-stopCh: // The search has been canceled. Just skip all the scheduled work in order to save CPU time. + bsw.reset() continue default: } @@ -197,7 +194,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch if len(bs.br.timestamps) > 0 { processBlockResult(workerID, &bs.br) } + bsw.reset() } + bswb.bsws = bswb.bsws[:0] + putBlockSearchWorkBatch(bswb) } putBlockSearch(bs) wgWorkers.Done() @@ -265,7 +265,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs type partitionSearchFinalizer func() -func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { +func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { select { case <-stopCh: // Do not spend CPU time on search, since it is already stopped. @@ -352,7 +352,7 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter) return result } -func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { +func (ddb *datadb) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { // Select parts with data for the given time range ddb.partsLock.Lock() pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp) @@ -378,7 +378,7 @@ func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, s } } -func (p *part) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { bhss := getBlockHeaders() if len(so.tenantIDs) > 0 { p.searchByTenantIDs(so, bhss, workCh, stopCh) @@ -415,27 +415,20 @@ func (bhss *blockHeaders) reset() { bhss.bhs = bhs[:0] } -func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { // it is assumed that tenantIDs are sorted tenantIDs := so.tenantIDs - bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + bswb := getBlockSearchWorkBatch() scheduleBlockSearch := func(bh *blockHeader) bool { - // Do not use pool for blockSearchWork, since it is returned back to the pool - // at another goroutine, which may run on another CPU core. - // This means that it will be put into another per-CPU pool, which may result - // in slowdown related to memory synchronization between CPU cores. - // This slowdown is increased on systems with bigger number of CPU cores. - bsw := newBlockSearchWork(p, so, bh) - bsws = append(bsws, bsw) - if len(bsws) < cap(bsws) { + if bswb.appendBlockSearchWork(p, so, bh) { return true } select { case <-stopCh: return false - case workCh <- bsws: - bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + case workCh <- bswb: + bswb = getBlockSearchWorkBatch() return true } } @@ -520,35 +513,26 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c } // Flush the remaining work - if len(bsws) > 0 { - select { - case <-stopCh: - case workCh <- bsws: - } + select { + case <-stopCh: + case workCh <- bswb: } } -func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { // it is assumed that streamIDs are sorted streamIDs := so.streamIDs - bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + bswb := getBlockSearchWorkBatch() scheduleBlockSearch := func(bh *blockHeader) bool { - // Do not use pool for blockSearchWork, since it is returned back to the pool - // at another goroutine, which may run on another CPU core. - // This means that it will be put into another per-CPU pool, which may result - // in slowdown related to memory synchronization between CPU cores. - // This slowdown is increased on systems with bigger number of CPU cores. - bsw := newBlockSearchWork(p, so, bh) - bsws = append(bsws, bsw) - if len(bsws) < cap(bsws) { + if bswb.appendBlockSearchWork(p, so, bh) { return true } select { case <-stopCh: return false - case workCh <- bsws: - bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + case workCh <- bswb: + bswb = getBlockSearchWorkBatch() return true } } @@ -634,11 +618,9 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c } // Flush the remaining work - if len(bsws) > 0 { - select { - case <-stopCh: - case workCh <- bsws: - } + select { + case <-stopCh: + case workCh <- bswb: } }