diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 9bf0ad9e3..a0be8ff0f 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -125,9 +125,9 @@ type searchResultFunc func(workerID uint, br *blockResult) // It calls processBlockResult for each found matching block. func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) { // Spin up workers - var wg sync.WaitGroup + var wgWorkers sync.WaitGroup workCh := make(chan []*blockSearchWork, workersCount) - wg.Add(workersCount) + wgWorkers.Add(workersCount) for i := 0; i < workersCount; i++ { go func(workerID uint) { bs := getBlockSearch() @@ -140,7 +140,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch } } putBlockSearch(bs) - wg.Done() + wgWorkers.Done() }(uint(i)) } @@ -169,19 +169,27 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch var sf *StreamFilter sf, f = getCommonStreamFilter(f) - // Apply search to matching partitions - var pws []*partWrapper - for _, ptw := range ptws { - pws = ptw.pt.search(pws, tf, sf, f, so, workCh, stopCh) + // Schedule concurrent search across matching partitions. + psfs := make([]partitionSearchFinalizer, len(ptws)) + var wgSearchers sync.WaitGroup + for i, ptw := range ptws { + partitionSearchConcurrencyLimitCh <- struct{}{} + wgSearchers.Add(1) + go func(idx int, pt *partition) { + psfs[idx] = pt.search(tf, sf, f, so, workCh, stopCh) + wgSearchers.Done() + <-partitionSearchConcurrencyLimitCh + }(i, ptw.pt) } + wgSearchers.Wait() // Wait until workers finish their work close(workCh) - wg.Wait() + wgWorkers.Wait() - // Decrement references to parts, which were incremented inside partition.search(). 
- for _, pw := range pws { - pw.decRef() + // Finalize partition search + for _, psf := range psfs { + psf() } // Decrement references to partitions @@ -190,9 +198,21 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch } } -func (pt *partition) search(pwsDst []*partWrapper, tf *timeFilter, sf *StreamFilter, f filter, so *genericSearchOptions, - workCh chan<- []*blockSearchWork, stopCh <-chan struct{}, -) []*partWrapper { +// partitionSearchConcurrencyLimitCh limits the number of partitions that are searched concurrently. +// +// This is needed for limiting memory usage under high load. +var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs()) + +type partitionSearchFinalizer func() + +func (pt *partition) search(tf *timeFilter, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { + select { + case <-stopCh: + // Do not spend CPU time on search, since it is already stopped. 
+ return func() {} + default: + } + tenantIDs := so.tenantIDs var streamIDs []streamID if sf != nil { @@ -210,7 +230,7 @@ func (pt *partition) search(pwsDst []*partWrapper, tf *timeFilter, sf *StreamFil filter: f, resultColumnNames: so.resultColumnNames, } - return pt.ddb.search(pwsDst, soInternal, workCh, stopCh) + return pt.ddb.search(soInternal, workCh, stopCh) } func hasStreamFilters(f filter) bool { @@ -270,16 +290,14 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter) return result } -func (ddb *datadb) search(pwsDst []*partWrapper, so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) []*partWrapper { +func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { // Select parts with data for the given time range ddb.partsLock.Lock() - pwsDstLen := len(pwsDst) - pwsDst = appendPartsInTimeRange(pwsDst, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp) - pwsDst = appendPartsInTimeRange(pwsDst, ddb.fileParts, so.minTimestamp, so.maxTimestamp) - pws := pwsDst[pwsDstLen:] + pws := appendPartsInTimeRange(nil, ddb.fileParts, so.minTimestamp, so.maxTimestamp) + pws = appendPartsInTimeRange(pws, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp) // Increase references to the searched parts, so they aren't deleted during search. - // References to the searched parts must be decremented by the caller. + // References to the searched parts must be decremented by calling the returned partitionSearchFinalizer. 
for _, pw := range pws { pw.incRef() } @@ -290,7 +308,11 @@ func (ddb *datadb) search(pwsDst []*partWrapper, so *searchOptions, workCh chan< pw.p.search(so, workCh, stopCh) } - return pwsDst + return func() { + for _, pw := range pws { + pw.decRef() + } + } } func (p *part) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { @@ -436,7 +458,10 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c // Flush the remaining work if len(bsws) > 0 { - workCh <- bsws + select { + case <-stopCh: + case workCh <- bsws: + } } } @@ -547,7 +572,10 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c // Flush the remaining work if len(bsws) > 0 { - workCh <- bsws + select { + case <-stopCh: + case workCh <- bsws: + } } }