From c3a72b6cdb42e39c3374067de9df947203b4fad4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 2 Apr 2024 21:24:57 +0300 Subject: [PATCH] lib/storage: consistently use stopCh instead of stop --- lib/storage/partition.go | 8 ++++---- lib/storage/storage.go | 14 +++++++------- lib/storage/table.go | 24 +++++++++++++----------- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 5ab20be64..704fa8e93 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1201,7 +1201,7 @@ func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{ } // ForceMergeAllParts runs merge for all the parts in pt. -func (pt *partition) ForceMergeAllParts(stop chan struct{}) error { +func (pt *partition) ForceMergeAllParts(stopCh <-chan struct{}) error { pws := pt.getAllPartsForMerge() if len(pws) == 0 { // Nothing to merge. @@ -1221,7 +1221,7 @@ func (pt *partition) ForceMergeAllParts(stop chan struct{}) error { // If len(pws) == 1, then the merge must run anyway. // This allows applying the configured retention, removing the deleted series // and performing de-duplication if needed. - if err := pt.mergePartsToFiles(pws, stop, bigPartsConcurrencyCh); err != nil { + if err := pt.mergePartsToFiles(pws, stopCh, bigPartsConcurrencyCh); err != nil { return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err) } @@ -1330,10 +1330,10 @@ func (pt *partition) releasePartsToMerge(pws []*partWrapper) { pt.partsLock.Unlock() } -func (pt *partition) runFinalDedup() error { +func (pt *partition) runFinalDedup(stopCh <-chan struct{}) error { t := time.Now() logger.Infof("start removing duplicate samples from partition (%s, %s)", pt.bigPartsPath, pt.smallPartsPath) - if err := pt.ForceMergeAllParts(pt.stopCh); err != nil { + if err := pt.ForceMergeAllParts(stopCh); err != nil { return fmt.Errorf("cannot remove duplicate samples from partition (%s, %s): %w", pt.bigPartsPath, pt.smallPartsPath, err) } logger.Infof("duplicate samples have been removed from partition (%s, %s) in %.3f seconds", pt.bigPartsPath, pt.smallPartsPath, time.Since(t).Seconds()) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index be09f9db5..dd1a0a598 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -124,7 +124,7 @@ type Storage struct { // prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series. prefetchedMetricIDsDeadline atomic.Uint64 - stop chan struct{} + stopCh chan struct{} currHourMetricIDsUpdaterWG sync.WaitGroup nextDayMetricIDsUpdaterWG sync.WaitGroup @@ -173,7 +173,7 @@ func MustOpenStorage(path string, retention time.Duration, maxHourlySeries, maxD path: path, cachePath: filepath.Join(path, cacheDirname), retentionMsecs: retention.Milliseconds(), - stop: make(chan struct{}), + stopCh: make(chan struct{}), } fs.MustMkdirIfNotExist(path) @@ -693,7 +693,7 @@ func (s *Storage) startFreeDiskSpaceWatcher() { defer ticker.Stop() for { select { - case <-s.stop: + case <-s.stopCh: return case <-ticker.C: f() @@ -724,7 +724,7 @@ func (s *Storage) retentionWatcher() { for { d := s.nextRetentionSeconds() select { - case <-s.stop: + case <-s.stopCh: return case currentTime := <-time.After(time.Second * time.Duration(d)): s.mustRotateIndexDB(currentTime) @@ -754,7 +754,7 @@ func (s *Storage) currHourMetricIDsUpdater() { defer ticker.Stop() for { select { - case <-s.stop: + case <-s.stopCh: hour := fasttime.UnixHour() s.updateCurrHourMetricIDs(hour) return @@ -771,7 +771,7 @@ func (s *Storage) nextDayMetricIDsUpdater() { defer ticker.Stop() for { select { - case <-s.stop: + case <-s.stopCh: date := fasttime.UnixDate() s.updateNextDayMetricIDs(date) return @@ -856,7 +856,7 @@ func (s *Storage) resetAndSaveTSIDCache() { // // It is expected that the s is no longer used during the close. func (s *Storage) MustClose() { - close(s.stop) + close(s.stopCh) s.freeDiskSpaceWatcherWG.Wait() s.retentionWatcherWG.Wait() diff --git a/lib/storage/table.go b/lib/storage/table.go index 0d64fad8d..bbb48405d 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -25,11 +25,11 @@ type table struct { ptws []*partitionWrapper ptwsLock sync.Mutex - stop chan struct{} + stopCh chan struct{} retentionWatcherWG sync.WaitGroup finalDedupWatcherWG sync.WaitGroup - forceMergesWG sync.WaitGroup + forceMergeWG sync.WaitGroup } // partitionWrapper provides refcounting mechanism for the partition. @@ -108,7 +108,7 @@ func mustOpenTable(path string, s *Storage) *table { bigPartitionsPath: bigPartitionsPath, s: s, - stop: make(chan struct{}), + stopCh: make(chan struct{}), } for _, pt := range pts { tb.addPartitionNolock(pt) @@ -166,10 +166,10 @@ func (tb *table) addPartitionNolock(pt *partition) { // MustClose closes the table. // It is expected that all the pending searches on the table are finished before calling MustClose. func (tb *table) MustClose() { - close(tb.stop) + close(tb.stopCh) tb.retentionWatcherWG.Wait() tb.finalDedupWatcherWG.Wait() - tb.forceMergesWG.Wait() + tb.forceMergeWG.Wait() tb.ptwsLock.Lock() ptws := tb.ptws @@ -244,15 +244,17 @@ func (tb *table) UpdateMetrics(m *TableMetrics) { func (tb *table) ForceMergePartitions(partitionNamePrefix string) error { ptws := tb.GetPartitions(nil) defer tb.PutPartitions(ptws) - tb.forceMergesWG.Add(1) - defer tb.forceMergesWG.Done() + + tb.forceMergeWG.Add(1) + defer tb.forceMergeWG.Done() + for _, ptw := range ptws { if !strings.HasPrefix(ptw.pt.name, partitionNamePrefix) { continue } logger.Infof("starting forced merge for partition %q", ptw.pt.name) startTime := time.Now() - if err := ptw.pt.ForceMergeAllParts(tb.stop); err != nil { + if err := ptw.pt.ForceMergeAllParts(tb.stopCh); err != nil { return fmt.Errorf("cannot complete forced merge for partition %q: %w", ptw.pt.name, err) } logger.Infof("forced merge for partition %q has been finished in %.3f seconds", ptw.pt.name, time.Since(startTime).Seconds()) @@ -390,7 +392,7 @@ func (tb *table) retentionWatcher() { defer ticker.Stop() for { select { - case <-tb.stop: + case <-tb.stopCh: return case <-ticker.C: } @@ -457,7 +459,7 @@ func (tb *table) finalDedupWatcher() { ptwsToDedup = append(ptwsToDedup, ptw) } for _, ptw := range ptwsToDedup { - if err := ptw.pt.runFinalDedup(); err != nil { + if err := ptw.pt.runFinalDedup(tb.stopCh); err != nil { logger.Errorf("cannot run final dedup for partition %s: %s", ptw.pt.name, err) } ptw.pt.isDedupScheduled.Store(false) @@ -468,7 +470,7 @@ func (tb *table) finalDedupWatcher() { defer t.Stop() for { select { - case <-tb.stop: + case <-tb.stopCh: return case <-t.C: f()