lib/storage: reuse sync.WaitGroup objects

This reduces GC load by up to 10% according to memory profiling
Aliaksandr Valialkin 2022-04-06 13:34:00 +03:00
parent f526c7814e
commit 123a88bb65
3 changed files with 26 additions and 6 deletions
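
For context, a minimal runnable sketch of the pattern this commit applies: instead of allocating a fresh sync.WaitGroup on every call, hot code paths borrow one from a sync.Pool and return it once Wait() has completed. The getWaitGroup/putWaitGroup helpers mirror the ones added to lib/storage/partition.go below; the worker loop in main is illustrative only and stands in for the real call sites.

package main

import (
	"fmt"
	"sync"
)

// wgPool caches *sync.WaitGroup values between uses so that frequently
// executed code paths do not produce a new heap allocation per call.
var wgPool sync.Pool

func getWaitGroup() *sync.WaitGroup {
	v := wgPool.Get()
	if v == nil {
		return &sync.WaitGroup{}
	}
	return v.(*sync.WaitGroup)
}

func putWaitGroup(wg *sync.WaitGroup) {
	// The caller must have called wg.Wait() already, so the counter is
	// zero and the WaitGroup is safe to hand to the next borrower.
	wgPool.Put(wg)
}

func main() {
	wg := getWaitGroup()
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			fmt.Println("worker", n)
		}(i)
	}
	wg.Wait()
	putWaitGroup(wg)
}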

lib/storage/index_db.go

@@ -746,7 +746,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time
 		return is.searchTagKeys(tks, maxTagKeys)
 	}
 	var mu sync.Mutex
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	for date := minDate; date <= maxDate; date++ {
 		wg.Add(1)
@@ -774,6 +774,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time
 		}(date)
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
 
@@ -948,7 +949,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe
 		return is.searchTagValues(tvs, tagKey, maxTagValues)
 	}
 	var mu sync.Mutex
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	for date := minDate; date <= maxDate; date++ {
 		wg.Add(1)
@@ -976,6 +977,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe
 		}(date)
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
 
@@ -1164,7 +1166,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
 		return is.searchTagValueSuffixesAll(tvss, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
 	}
 	// Query over multiple days in parallel.
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	var mu sync.Mutex // protects tvss + errGlobal from concurrent access below.
 	for minDate <= maxDate {
@@ -1194,6 +1196,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
 		minDate++
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
 
@@ -2471,7 +2474,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 	}
 
 	// Slower path - search for metricIDs for each day in parallel.
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
 	for minDate <= maxDate {
@@ -2498,6 +2501,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 		minDate++
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	if errGlobal != nil {
 		return errGlobal
 	}

lib/storage/partition.go

@@ -481,7 +481,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
 
 func (pt *partition) flushRowsToParts(rows []rawRow) {
 	maxRows := getMaxRawRowsPerShard()
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	for len(rows) > 0 {
 		n := maxRows
 		if n > len(rows) {
@@ -495,8 +495,23 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 		rows = rows[n:]
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 }
+
+func getWaitGroup() *sync.WaitGroup {
+	v := wgPool.Get()
+	if v == nil {
+		return &sync.WaitGroup{}
+	}
+	return v.(*sync.WaitGroup)
+}
+
+func putWaitGroup(wg *sync.WaitGroup) {
+	wgPool.Put(wg)
+}
+
+var wgPool sync.Pool
 
 func (pt *partition) addRowsPart(rows []rawRow) {
 	if len(rows) == 0 {
 		return

lib/storage/storage.go

@@ -177,10 +177,11 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details.
 	if fs.IsPathExist(s.cachePath + "/reset_cache_on_startup") {
 		logger.Infof("removing cache directory at %q, since it contains `reset_cache_on_startup` file...", s.cachePath)
-		var wg sync.WaitGroup
+		wg := getWaitGroup()
 		wg.Add(1)
 		fs.MustRemoveAllWithDoneCallback(s.cachePath, wg.Done)
 		wg.Wait()
+		putWaitGroup(wg)
 		logger.Infof("cache directory at %q has been successfully removed", s.cachePath)
 	}
 
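A note on the design choice (implied by the call sites rather than stated in the commit): a WaitGroup is only returned to the pool after wg.Wait() has returned, so its counter is zero and the next getWaitGroup() caller can reuse it safely. Returning a WaitGroup while goroutines can still call Done() on it would race, and may trigger the runtime's "WaitGroup is reused before previous Wait has returned" panic. sync.Pool may also drop cached WaitGroups during GC, which is harmless here because getWaitGroup() simply allocates a fresh one on demand.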