From 51f5ac1929224b648c0fa3000128759a8bd7360b Mon Sep 17 00:00:00 2001
From: Zakhar Bessarab
Date: Tue, 26 Mar 2024 16:49:09 +0400
Subject: [PATCH] lib/storage/table: wait for merges to be completed when
 closing a table (#5965)

* lib/storage/table: properly wait for force merges to be completed during shutdown

Properly keep track of running background merges and wait for their
completion when closing the table.

Previously, force merge was not synchronized with the overall storage
shutdown, which could lead to holding a partition wrapper (ptw) reference
after the table had been closed.

Signed-off-by: Zakhar Bessarab

* docs: add changelog entry

Signed-off-by: Zakhar Bessarab

---------

Signed-off-by: Zakhar Bessarab
---
 docs/CHANGELOG.md        | 1 +
 lib/storage/partition.go | 6 +++---
 lib/storage/table.go     | 6 +++++-
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index e22b05b96..90d8a49d2 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -67,6 +67,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmselect](https://docs.victoriametrics.com/): make vmselect resilient to absence of cache folder. If cache folder was mistakenly deleted by user or OS, vmselect will try re-creating it first. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5985).
 * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): limit duration of requests to /api/v1/labels, /api/v1/label/.../values or /api/v1/series with `-search.maxLabelsAPIDuration` duration. Before, `-search.maxExportDuration` value was used by mistake. Thanks to @kbweave for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5992).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fixed response body and headers for AWS Firehose HTTP Destination.
+* BUGFIX: properly wait for force merge to be completed during shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5944) for details.
 
 ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)
 
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index e1f0c9c34..d098081e4 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -1182,7 +1182,7 @@ func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{
 }
 
 // ForceMergeAllParts runs merge for all the parts in pt.
-func (pt *partition) ForceMergeAllParts() error {
+func (pt *partition) ForceMergeAllParts(stop chan struct{}) error {
 	pws := pt.getAllPartsForMerge()
 	if len(pws) == 0 {
 		// Nothing to merge.
@@ -1202,7 +1202,7 @@ func (pt *partition) ForceMergeAllParts() error {
 	// If len(pws) == 1, then the merge must run anyway.
 	// This allows applying the configured retention, removing the deleted series
 	// and performing de-duplication if needed.
-	if err := pt.mergePartsToFiles(pws, pt.stopCh, bigPartsConcurrencyCh); err != nil {
+	if err := pt.mergePartsToFiles(pws, stop, bigPartsConcurrencyCh); err != nil {
 		return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
 	}
 
@@ -1316,7 +1316,7 @@ func (pt *partition) runFinalDedup() error {
 	t := time.Now()
 	logger.Infof("starting final dedup for partition %s using requiredDedupInterval=%d ms, since the partition has smaller actualDedupInterval=%d ms",
 		pt.bigPartsPath, requiredDedupInterval, actualDedupInterval)
-	if err := pt.ForceMergeAllParts(); err != nil {
+	if err := pt.ForceMergeAllParts(pt.stopCh); err != nil {
 		return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.bigPartsPath, err)
 	}
 	logger.Infof("final dedup for partition %s has been finished in %.3f seconds", pt.bigPartsPath, time.Since(t).Seconds())
diff --git a/lib/storage/table.go b/lib/storage/table.go
index 2fa5979b2..7f1234df4 100644
--- a/lib/storage/table.go
+++ b/lib/storage/table.go
@@ -29,6 +29,7 @@ type table struct {
 
 	retentionWatcherWG  sync.WaitGroup
 	finalDedupWatcherWG sync.WaitGroup
+	forceMergesWG       sync.WaitGroup
 }
 
 // partitionWrapper provides refcounting mechanism for the partition.
@@ -168,6 +169,7 @@ func (tb *table) MustClose() {
 	close(tb.stop)
 	tb.retentionWatcherWG.Wait()
 	tb.finalDedupWatcherWG.Wait()
+	tb.forceMergesWG.Wait()
 
 	tb.ptwsLock.Lock()
 	ptws := tb.ptws
@@ -242,13 +244,15 @@ func (tb *table) UpdateMetrics(m *TableMetrics) {
 func (tb *table) ForceMergePartitions(partitionNamePrefix string) error {
 	ptws := tb.GetPartitions(nil)
 	defer tb.PutPartitions(ptws)
+	tb.forceMergesWG.Add(1)
+	defer tb.forceMergesWG.Done()
 	for _, ptw := range ptws {
 		if !strings.HasPrefix(ptw.pt.name, partitionNamePrefix) {
 			continue
 		}
 		logger.Infof("starting forced merge for partition %q", ptw.pt.name)
 		startTime := time.Now()
-		if err := ptw.pt.ForceMergeAllParts(); err != nil {
+		if err := ptw.pt.ForceMergeAllParts(tb.stop); err != nil {
 			return fmt.Errorf("cannot complete forced merge for partition %q: %w", ptw.pt.name, err)
 		}
 		logger.Infof("forced merge for partition %q has been finished in %.3f seconds", ptw.pt.name, time.Since(startTime).Seconds())
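
For reference, the shutdown-coordination pattern this patch applies can be
shown in isolation. The sketch below is not VictoriaMetrics code: the names
(table, forceMerge, mustClose) only loosely mirror the patched identifiers,
and the merge work is simulated with a timer. It demonstrates the same idea
the patch implements: every force merge registers in a WaitGroup and watches
a stop channel, so shutdown can first interrupt in-flight merges and then
block until all of them have returned.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Minimal sketch, not the real storage.table type: only the two
// shutdown primitives used by the patch are modeled here.
type table struct {
	stop          chan struct{}  // closed exactly once, on shutdown
	forceMergesWG sync.WaitGroup // counts in-flight force merges
}

func newTable() *table {
	return &table{stop: make(chan struct{})}
}

// forceMerge registers itself so mustClose can wait for it, and aborts
// promptly once the stop channel is closed.
func (tb *table) forceMerge() error {
	tb.forceMergesWG.Add(1)
	defer tb.forceMergesWG.Done()
	for i := 0; i < 100; i++ {
		select {
		case <-tb.stop:
			return fmt.Errorf("force merge interrupted by shutdown at step %d", i)
		case <-time.After(10 * time.Millisecond):
			// One simulated merge step.
		}
	}
	return nil
}

// mustClose signals shutdown first, then blocks until every registered
// force merge has observed the signal and returned.
func (tb *table) mustClose() {
	close(tb.stop)
	tb.forceMergesWG.Wait()
}

func main() {
	tb := newTable()
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := tb.forceMerge(); err != nil {
			fmt.Println(err)
		}
	}()
	time.Sleep(35 * time.Millisecond) // let the merge get going
	tb.mustClose()                    // returns only after the merge has observed stop
	<-done
	fmt.Println("table closed; no force merges still running")
}

As in the patch, this only drains merges that registered before the stop
channel was closed; callers are expected not to start new force merges once
shutdown has begun.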