diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e22b05b96..90d8a49d2 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -67,6 +67,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [vmselect](https://docs.victoriametrics.com/): make vmselect resilient to absence of cache folder. If cache folder was mistakenly deleted by user or OS, vmselect will try re-creating it first. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5985). * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): limit duration of requests to /api/v1/labels, /api/v1/label/.../values or /api/v1/series with `-search.maxLabelsAPIDuration` duration. Before, `-search.maxExportDuration` value was used by mistake. Thanks to @kbweave for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5992). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fixed response body and headers for AWS Firehose HTTP Destination. +* BUGFIX: properly wait for in-progress force merges to complete during shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5944) for details. ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index e1f0c9c34..d098081e4 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1182,7 +1182,7 @@ func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{ } // ForceMergeAllParts runs merge for all the parts in pt. -func (pt *partition) ForceMergeAllParts() error { +func (pt *partition) ForceMergeAllParts(stop chan struct{}) error { pws := pt.getAllPartsForMerge() if len(pws) == 0 { // Nothing to merge. 
@@ -1202,7 +1202,7 @@ func (pt *partition) ForceMergeAllParts() error { // If len(pws) == 1, then the merge must run anyway. // This allows applying the configured retention, removing the deleted series // and performing de-duplication if needed. - if err := pt.mergePartsToFiles(pws, pt.stopCh, bigPartsConcurrencyCh); err != nil { + if err := pt.mergePartsToFiles(pws, stop, bigPartsConcurrencyCh); err != nil { return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err) } @@ -1316,7 +1316,7 @@ func (pt *partition) runFinalDedup() error { t := time.Now() logger.Infof("starting final dedup for partition %s using requiredDedupInterval=%d ms, since the partition has smaller actualDedupInterval=%d ms", pt.bigPartsPath, requiredDedupInterval, actualDedupInterval) - if err := pt.ForceMergeAllParts(); err != nil { + if err := pt.ForceMergeAllParts(pt.stopCh); err != nil { return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.bigPartsPath, err) } logger.Infof("final dedup for partition %s has been finished in %.3f seconds", pt.bigPartsPath, time.Since(t).Seconds()) diff --git a/lib/storage/table.go b/lib/storage/table.go index 2fa5979b2..7f1234df4 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -29,6 +29,7 @@ type table struct { retentionWatcherWG sync.WaitGroup finalDedupWatcherWG sync.WaitGroup + forceMergesWG sync.WaitGroup } // partitionWrapper provides refcounting mechanism for the partition. 
@@ -168,6 +169,7 @@ func (tb *table) MustClose() { close(tb.stop) tb.retentionWatcherWG.Wait() tb.finalDedupWatcherWG.Wait() + tb.forceMergesWG.Wait() tb.ptwsLock.Lock() ptws := tb.ptws @@ -242,13 +244,15 @@ func (tb *table) UpdateMetrics(m *TableMetrics) { func (tb *table) ForceMergePartitions(partitionNamePrefix string) error { ptws := tb.GetPartitions(nil) defer tb.PutPartitions(ptws) + tb.forceMergesWG.Add(1) + defer tb.forceMergesWG.Done() for _, ptw := range ptws { if !strings.HasPrefix(ptw.pt.name, partitionNamePrefix) { continue } logger.Infof("starting forced merge for partition %q", ptw.pt.name) startTime := time.Now() - if err := ptw.pt.ForceMergeAllParts(); err != nil { + if err := ptw.pt.ForceMergeAllParts(tb.stop); err != nil { return fmt.Errorf("cannot complete forced merge for partition %q: %w", ptw.pt.name, err) } logger.Infof("forced merge for partition %q has been finished in %.3f seconds", ptw.pt.name, time.Since(startTime).Seconds())