lib/logstorage: search in partitions in parallel

commit 543a6b9cee
parent e7685164f5
Author: Aliaksandr Valialkin
Date:   2024-04-24 23:04:29 +02:00

@@ -125,9 +125,9 @@ type searchResultFunc func(workerID uint, br *blockResult)
 // It calls processBlockResult for each found matching block.
 func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) {
 	// Spin up workers
-	var wg sync.WaitGroup
+	var wgWorkers sync.WaitGroup
 	workCh := make(chan []*blockSearchWork, workersCount)
-	wg.Add(workersCount)
+	wgWorkers.Add(workersCount)
 	for i := 0; i < workersCount; i++ {
 		go func(workerID uint) {
 			bs := getBlockSearch()
@@ -140,7 +140,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 				}
 			}
 			putBlockSearch(bs)
-			wg.Done()
+			wgWorkers.Done()
 		}(uint(i))
 	}
 
@@ -169,19 +169,27 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 	var sf *StreamFilter
 	sf, f = getCommonStreamFilter(f)
 
-	// Apply search to matching partitions
-	var pws []*partWrapper
-	for _, ptw := range ptws {
-		pws = ptw.pt.search(pws, tf, sf, f, so, workCh, stopCh)
+	// Schedule concurrent search across matching partitions.
+	psfs := make([]partitionSearchFinalizer, len(ptws))
+	var wgSearchers sync.WaitGroup
+	for i, ptw := range ptws {
+		partitionSearchConcurrencyLimitCh <- struct{}{}
+		wgSearchers.Add(1)
+		go func(idx int, pt *partition) {
+			psfs[idx] = pt.search(tf, sf, f, so, workCh, stopCh)
+			wgSearchers.Done()
+			<-partitionSearchConcurrencyLimitCh
+		}(i, ptw.pt)
 	}
+	wgSearchers.Wait()
 
 	// Wait until workers finish their work
 	close(workCh)
-	wg.Wait()
+	wgWorkers.Wait()
 
-	// Decrement references to parts, which were incremented inside partition.search().
-	for _, pw := range pws {
-		pw.decRef()
+	// Finalize partition search
+	for _, psf := range psfs {
+		psf()
 	}
 
 	// Decrement references to partitions
@@ -190,9 +198,21 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 	}
 }
 
-func (pt *partition) search(pwsDst []*partWrapper, tf *timeFilter, sf *StreamFilter, f filter, so *genericSearchOptions,
-	workCh chan<- []*blockSearchWork, stopCh <-chan struct{},
-) []*partWrapper {
+// partitionSearchConcurrencyLimitCh limits the number of concurrent searches in partition.
+//
+// This is needed for limiting memory usage under high load.
+var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
+
+type partitionSearchFinalizer func()
+
+func (pt *partition) search(tf *timeFilter, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer {
+	select {
+	case <-stopCh:
+		// Do not spend CPU time on search, since it is already stopped.
+		return func() {}
+	default:
+	}
+
 	tenantIDs := so.tenantIDs
 	var streamIDs []streamID
 	if sf != nil {
@@ -210,7 +230,7 @@ func (pt *partition) search(pwsDst []*partWrapper, tf *timeFilter, sf *StreamFil
 		filter:            f,
 		resultColumnNames: so.resultColumnNames,
 	}
-	return pt.ddb.search(pwsDst, soInternal, workCh, stopCh)
+	return pt.ddb.search(soInternal, workCh, stopCh)
 }
 
 func hasStreamFilters(f filter) bool {
@@ -270,16 +290,14 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter)
 	return result
 }
 
-func (ddb *datadb) search(pwsDst []*partWrapper, so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) []*partWrapper {
+func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer {
 	// Select parts with data for the given time range
 	ddb.partsLock.Lock()
-	pwsDstLen := len(pwsDst)
-	pwsDst = appendPartsInTimeRange(pwsDst, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp)
-	pwsDst = appendPartsInTimeRange(pwsDst, ddb.fileParts, so.minTimestamp, so.maxTimestamp)
-	pws := pwsDst[pwsDstLen:]
+	pws := appendPartsInTimeRange(nil, ddb.fileParts, so.minTimestamp, so.maxTimestamp)
+	pws = appendPartsInTimeRange(pws, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp)
 
 	// Increase references to the searched parts, so they aren't deleted during search.
-	// References to the searched parts must be decremented by the caller.
+	// References to the searched parts must be decremented by calling the returned partitionSearchFinalizer.
 	for _, pw := range pws {
 		pw.incRef()
 	}
@@ -290,7 +308,11 @@ func (ddb *datadb) search(pwsDst []*partWrapper, so *searchOptions, workCh chan<
 		pw.p.search(so, workCh, stopCh)
 	}
 
-	return pwsDst
+	return func() {
+		for _, pw := range pws {
+			pw.decRef()
+		}
+	}
 }
 
 func (p *part) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) {
@@ -436,7 +458,10 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c
 
 	// Flush the remaining work
 	if len(bsws) > 0 {
-		workCh <- bsws
+		select {
+		case <-stopCh:
+		case workCh <- bsws:
+		}
 	}
 }
 
@@ -547,7 +572,10 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c
 
 	// Flush the remaining work
 	if len(bsws) > 0 {
-		workCh <- bsws
+		select {
+		case <-stopCh:
+		case workCh <- bsws:
+		}
 	}
 }
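
For readers skimming the diff, here is a minimal, self-contained sketch of the scheduling pattern this commit introduces: a buffered channel used as a semaphore to cap concurrent partition searches (the role played by partitionSearchConcurrencyLimitCh), a WaitGroup for the searcher goroutines, finalizer closures that release per-partition resources only after the block-search workers drain workCh, and a cancellation-aware send on workCh. All identifiers below (searchPartition, finalizer, concurrencyLimitCh, the fake work items) are illustrative stand-ins, not the actual VictoriaLogs types.

// A minimal sketch of the bounded-concurrency + finalizer pattern, using only
// the standard library. It is an assumption-labeled illustration, not the
// VictoriaLogs implementation.
package main

import (
	"fmt"
	"runtime"
	"sync"
)

type finalizer func()

// concurrencyLimitCh stands in for partitionSearchConcurrencyLimitCh:
// at most runtime.NumCPU() partition searches run at the same time.
var concurrencyLimitCh = make(chan struct{}, runtime.NumCPU())

// searchPartition schedules fake per-block work on workCh and returns a
// finalizer that stands in for releasing the part references it acquired.
func searchPartition(id int, workCh chan<- string, stopCh <-chan struct{}) finalizer {
	for block := 0; block < 3; block++ {
		bsw := fmt.Sprintf("partition %d, block %d", id, block)
		select {
		case <-stopCh: // search canceled: stop scheduling work
			return func() {}
		case workCh <- bsw:
		}
	}
	return func() { fmt.Printf("released refs for partition %d\n", id) }
}

func main() {
	stopCh := make(chan struct{})
	workCh := make(chan string, 4)

	// Workers consume the scheduled work, like the block-search workers above.
	var wgWorkers sync.WaitGroup
	wgWorkers.Add(2)
	for w := 0; w < 2; w++ {
		go func(workerID int) {
			defer wgWorkers.Done()
			for bsw := range workCh {
				fmt.Printf("worker %d: %s\n", workerID, bsw)
			}
		}(w)
	}

	// Schedule partition searches concurrently, bounded by the semaphore.
	partitions := []int{0, 1, 2, 3, 4}
	fs := make([]finalizer, len(partitions))
	var wgSearchers sync.WaitGroup
	for i, p := range partitions {
		concurrencyLimitCh <- struct{}{} // acquire a slot
		wgSearchers.Add(1)
		go func(idx, id int) {
			defer func() {
				wgSearchers.Done()
				<-concurrencyLimitCh // release the slot
			}()
			fs[idx] = searchPartition(id, workCh, stopCh)
		}(i, p)
	}
	wgSearchers.Wait()

	close(workCh) // no more work will be scheduled
	wgWorkers.Wait()

	// Run finalizers only after the workers have finished with all scheduled work.
	for _, f := range fs {
		f()
	}
}

The key ordering is the same as in the commit: finalizers (part reference releases) run only after close(workCh) and wgWorkers.Wait(), so no part can be dropped while a worker may still be reading its blocks.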