lib/storage: reuse sync.WaitGroup objects

This reduces GC load by up to 10% according to memory profiling.
Aliaksandr Valialkin 2022-04-06 13:34:00 +03:00
parent 077193d87c
commit 50cf74ce4b
3 changed files with 26 additions and 6 deletions
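
The change replaces per-call `var wg sync.WaitGroup` declarations with pooled objects. For context, this is the standard sync.Pool object-reuse idiom; below is a minimal self-contained sketch of the pattern. The helpers mirror the getWaitGroup/putWaitGroup functions this commit adds in lib/storage/partition.go, while the main function is an illustrative caller, not code from this commit:

package main

import "sync"

// wgPool caches sync.WaitGroup objects between uses, so hot code paths
// do not allocate a fresh WaitGroup (and generate GC work) on every call.
var wgPool sync.Pool

func getWaitGroup() *sync.WaitGroup {
	v := wgPool.Get()
	if v == nil {
		return &sync.WaitGroup{}
	}
	return v.(*sync.WaitGroup)
}

func putWaitGroup(wg *sync.WaitGroup) {
	// Callers must ensure wg.Wait() has returned before putting the
	// object back, so a pooled WaitGroup always has a zero counter.
	wgPool.Put(wg)
}

func main() {
	wg := getWaitGroup()
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// ... do per-goroutine work ...
		}()
	}
	wg.Wait()
	putWaitGroup(wg)
}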

lib/storage/index_db.go

@@ -724,7 +724,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time
 		return is.searchTagKeys(tks, maxTagKeys)
 	}
 	var mu sync.Mutex
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	for date := minDate; date <= maxDate; date++ {
 		wg.Add(1)
@@ -752,6 +752,7 @@ func (is *indexSearch) searchTagKeysOnTimeRange(tks map[string]struct{}, tr Time
 		}(date)
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
@@ -926,7 +927,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe
 		return is.searchTagValues(tvs, tagKey, maxTagValues)
 	}
 	var mu sync.Mutex
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	for date := minDate; date <= maxDate; date++ {
 		wg.Add(1)
@@ -954,6 +955,7 @@ func (is *indexSearch) searchTagValuesOnTimeRange(tvs map[string]struct{}, tagKe
 		}(date)
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
@@ -1141,7 +1143,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
 		return is.searchTagValueSuffixesAll(tvss, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
 	}
 	// Query over multiple days in parallel.
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	var mu sync.Mutex // protects tvss + errGlobal from concurrent access below.
 	for minDate <= maxDate {
@@ -1171,6 +1173,7 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
 		minDate++
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	return errGlobal
 }
@@ -2446,7 +2449,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 	}
 	// Slower path - search for metricIDs for each day in parallel.
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	var errGlobal error
 	var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
 	for minDate <= maxDate {
@@ -2473,6 +2476,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 		minDate++
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 	if errGlobal != nil {
 		return errGlobal
 	}

lib/storage/partition.go

@@ -481,7 +481,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
 func (pt *partition) flushRowsToParts(rows []rawRow) {
 	maxRows := getMaxRawRowsPerShard()
-	var wg sync.WaitGroup
+	wg := getWaitGroup()
 	for len(rows) > 0 {
 		n := maxRows
 		if n > len(rows) {
@@ -495,8 +495,23 @@ func (pt *partition) flushRowsToParts(rows []rawRow) {
 		rows = rows[n:]
 	}
 	wg.Wait()
+	putWaitGroup(wg)
 }
+
+func getWaitGroup() *sync.WaitGroup {
+	v := wgPool.Get()
+	if v == nil {
+		return &sync.WaitGroup{}
+	}
+	return v.(*sync.WaitGroup)
+}
+
+func putWaitGroup(wg *sync.WaitGroup) {
+	wgPool.Put(wg)
+}
+
+var wgPool sync.Pool
 
 func (pt *partition) addRowsPart(rows []rawRow) {
 	if len(rows) == 0 {
 		return

lib/storage/storage.go

@@ -166,10 +166,11 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1447 for details.
 	if fs.IsPathExist(s.cachePath + "/reset_cache_on_startup") {
 		logger.Infof("removing cache directory at %q, since it contains `reset_cache_on_startup` file...", s.cachePath)
-		var wg sync.WaitGroup
+		wg := getWaitGroup()
 		wg.Add(1)
 		fs.MustRemoveAllWithDoneCallback(s.cachePath, wg.Done)
 		wg.Wait()
+		putWaitGroup(wg)
 		logger.Infof("cache directory at %q has been successfully removed", s.cachePath)
 	}