lib/logstorage: make sure that WaitGroup.Add isnt called after stopCh is closed and WaitGroup.Wait is called

This protects from rare panic, which may occur during graceful shutdown of VictoriaLogs
This commit is contained in:
Aliaksandr Valialkin 2024-01-26 21:16:58 +01:00
parent 9e70d9ab47
commit 230ef43a32
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -65,9 +65,15 @@ type datadb struct {
partsLock sync.Mutex
// wg is used for determining when background workers stop
//
// wg.Add() must be called under partsLock after checking whether stopCh isn't closed.
// This should prevent from calling wg.Add() after stopCh is closed and wg.Wait() is called.
wg sync.WaitGroup
// stopCh is used for notifying background workers to stop
//
// It must be closed under partsLock in order to prevent from calling wg.Add()
// after stopCh is closed.
stopCh chan struct{}
// oldInmemoryPartsFlushersCount is the number of currently running flushers for old in-memory parts
@ -187,6 +193,9 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
//
// This function must be called under partsLock.
func (ddb *datadb) startOldInmemoryPartsFlusherLocked() {
if needStop(ddb.stopCh) {
return
}
maxWorkers := getMergeWorkersCount()
if ddb.oldInmemoryPartsFlushersCount >= maxWorkers {
return
@ -254,6 +263,9 @@ func (ddb *datadb) flushOldInmemoryParts() {
//
// This function must be called under locked partsLock.
func (ddb *datadb) startMergeWorkerLocked() {
if needStop(ddb.stopCh) {
return
}
maxWorkers := getMergeWorkersCount()
if ddb.mergeWorkersCount >= maxWorkers {
return
@ -885,8 +897,14 @@ func needStop(stopCh <-chan struct{}) bool {
// mustCloseDatadb can be called only when nobody accesses ddb.
func mustCloseDatadb(ddb *datadb) {
// Stop background workers
// Notify background workers to stop.
// Make it under ddb.partsLock in order to prevent from calling ddb.wg.Add()
// after ddb.stopCh is closed and ddb.wg.Wait() is called.
ddb.partsLock.Lock()
close(ddb.stopCh)
ddb.partsLock.Unlock()
// Wait for background workers to stop.
ddb.wg.Wait()
// flush in-memory data to disk