diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 69059e6f5..9bf0ad9e3 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -122,7 +122,7 @@ type searchResultFunc func(workerID uint, br *blockResult) // search searches for the matching rows according to so. // -// It calls f for each found matching block. +// It calls processBlockResult for each found matching block. func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) { // Spin up workers var wg sync.WaitGroup @@ -179,7 +179,7 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch close(workCh) wg.Wait() - // Decrement references to parts + // Decrement references to parts, which were incremented inside partition.search(). for _, pw := range pws { pw.decRef() } @@ -277,6 +277,9 @@ func (ddb *datadb) search(pwsDst []*partWrapper, so *searchOptions, workCh chan< pwsDst = appendPartsInTimeRange(pwsDst, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp) pwsDst = appendPartsInTimeRange(pwsDst, ddb.fileParts, so.minTimestamp, so.maxTimestamp) pws := pwsDst[pwsDstLen:] + + // Increase references to the searched parts, so they aren't deleted during search. + // References to the searched parts must be decremented by the caller. for _, pw := range pws { pw.incRef() } @@ -584,7 +587,7 @@ func getCommonTimeFilter(f filter) (*timeFilter, filter) { for _, filter := range t.filters { tf, ok := filter.(*timeFilter) if ok { - // The tf must remain in af in order to properly filter out rows outside the selected time range + // The tf must remain in t.filters order to properly filter out rows outside the selected time range return tf, f } }