mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-21 14:44:00 +00:00)
lib/storage/table: wait for merges to be completed when closing a table (#5965)
* lib/storage/table: properly wait for force merges to be completed during shutdown

  Properly keep track of running background merges and wait for their completion when closing the table. Previously, force merge was not in sync with the overall storage shutdown, which could lead to holding a ptw (partition wrapper) reference.

  Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* docs: add changelog entry

  Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
parent 9c92cc2759
commit 7c1ee69205
3 changed files with 9 additions and 4 deletions
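The fix follows a standard Go shutdown pattern: each force merge registers itself in a sync.WaitGroup before doing work, and table close first signals a stop channel and then waits for the group to drain before releasing shared state. Below is a minimal, self-contained sketch of that pattern under assumed names; it is not the VictoriaMetrics code itself, whose real types and methods appear in the diff that follows.

package main

import (
	"fmt"
	"sync"
	"time"
)

// table mirrors the shape of the fix: a stop channel plus a WaitGroup
// that tracks in-flight force merges. Names are illustrative only.
type table struct {
	stop          chan struct{}
	forceMergesWG sync.WaitGroup
}

// forceMerge registers itself before doing work so that close() can wait
// for it, and polls the stop channel so shutdown can interrupt it.
func (t *table) forceMerge() {
	t.forceMergesWG.Add(1)
	defer t.forceMergesWG.Done()
	for i := 0; i < 100; i++ {
		select {
		case <-t.stop:
			fmt.Println("force merge interrupted by shutdown")
			return
		case <-time.After(5 * time.Millisecond): // stands in for merging one part
		}
	}
	fmt.Println("force merge completed")
}

// close signals shutdown first, then blocks until every registered merge
// has returned; only after that is it safe to release shared resources.
func (t *table) close() {
	close(t.stop)
	t.forceMergesWG.Wait()
	fmt.Println("table closed; no merge is still running")
}

func main() {
	t := &table{stop: make(chan struct{})}
	go t.forceMerge()
	time.Sleep(20 * time.Millisecond) // let the merge register and get going
	t.close()
}

Ordering matters here: the stop channel is closed before Wait, so a merge that is mid-loop observes the signal and returns promptly instead of blocking shutdown indefinitely.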
docs/CHANGELOG.md
@@ -77,6 +77,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly account for `-rule.resendDelay` for alerting rules that are constantly switching state from inactive to firing. Before, notifications for such rules could have been skipped if state change happened more often than `-rule.resendDelay`. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6028) for details.
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): respect `-remoteWrite.maxBatchSize` at shutdown period. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6025). Thanks to @jiekun for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6039).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fixed response body and headers for AWS Firehose HTTP Destination.
+* BUGFIX: properly wait for force merge to be completed during the shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5944) for the details.
 
 ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)
 
lib/storage/partition.go
@@ -1182,7 +1182,7 @@ func (pt *partition) mergePartsToFiles(pws []*partWrapper, stopCh <-chan struct{
 }
 
 // ForceMergeAllParts runs merge for all the parts in pt.
-func (pt *partition) ForceMergeAllParts() error {
+func (pt *partition) ForceMergeAllParts(stop chan struct{}) error {
 	pws := pt.getAllPartsForMerge()
 	if len(pws) == 0 {
 		// Nothing to merge.
@@ -1202,7 +1202,7 @@ func (pt *partition) ForceMergeAllParts() error {
 	// If len(pws) == 1, then the merge must run anyway.
 	// This allows applying the configured retention, removing the deleted series
 	// and performing de-duplication if needed.
-	if err := pt.mergePartsToFiles(pws, pt.stopCh, bigPartsConcurrencyCh); err != nil {
+	if err := pt.mergePartsToFiles(pws, stop, bigPartsConcurrencyCh); err != nil {
 		return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
 	}
 
@@ -1316,7 +1316,7 @@ func (pt *partition) runFinalDedup() error {
 	t := time.Now()
 	logger.Infof("starting final dedup for partition %s using requiredDedupInterval=%d ms, since the partition has smaller actualDedupInterval=%d ms",
 		pt.bigPartsPath, requiredDedupInterval, actualDedupInterval)
-	if err := pt.ForceMergeAllParts(); err != nil {
+	if err := pt.ForceMergeAllParts(pt.stopCh); err != nil {
 		return fmt.Errorf("cannot perform final dedup for partition %s: %w", pt.bigPartsPath, err)
 	}
 	logger.Infof("final dedup for partition %s has been finished in %.3f seconds", pt.bigPartsPath, time.Since(t).Seconds())
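The partition.go changes above are plumbing: ForceMergeAllParts stops hardcoding pt.stopCh and instead forwards whatever stop channel the caller supplies, so background callers such as runFinalDedup keep passing pt.stopCh, while the table-level force merge (next file) passes tb.stop. Here is a hedged sketch of how a merge loop can honor such a channel; the real mergePartsToFiles is more involved, and mergeParts and errMergeInterrupted below are assumed names, not VictoriaMetrics identifiers.

package main

import (
	"errors"
	"fmt"
)

// errMergeInterrupted is an assumed sentinel error, not a VictoriaMetrics export.
var errMergeInterrupted = errors.New("merge interrupted by stop signal")

// mergeParts sketches a cancellable merge loop: before processing each
// part it checks the stop channel and bails out early if it is closed.
func mergeParts(parts []string, stopCh <-chan struct{}) error {
	for _, p := range parts {
		select {
		case <-stopCh:
			return fmt.Errorf("%w while processing part %q", errMergeInterrupted, p)
		default:
		}
		fmt.Printf("merging part %q\n", p) // stands in for the real merge work
	}
	return nil
}

func main() {
	stopCh := make(chan struct{})
	// With an open channel the merge runs to completion.
	_ = mergeParts([]string{"a", "b"}, stopCh)

	// A closed channel interrupts the next iteration.
	close(stopCh)
	if err := mergeParts([]string{"c", "d"}, stopCh); err != nil {
		fmt.Println(err)
	}
}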
lib/storage/table.go
@@ -29,6 +29,7 @@ type table struct {
 
 	retentionWatcherWG  sync.WaitGroup
 	finalDedupWatcherWG sync.WaitGroup
+	forceMergesWG       sync.WaitGroup
 }
 
 // partitionWrapper provides refcounting mechanism for the partition.
@@ -168,6 +169,7 @@ func (tb *table) MustClose() {
 	close(tb.stop)
 	tb.retentionWatcherWG.Wait()
 	tb.finalDedupWatcherWG.Wait()
+	tb.forceMergesWG.Wait()
 
 	tb.ptwsLock.Lock()
 	ptws := tb.ptws
@@ -242,13 +244,15 @@ func (tb *table) UpdateMetrics(m *TableMetrics) {
 func (tb *table) ForceMergePartitions(partitionNamePrefix string) error {
 	ptws := tb.GetPartitions(nil)
 	defer tb.PutPartitions(ptws)
+	tb.forceMergesWG.Add(1)
+	defer tb.forceMergesWG.Done()
 	for _, ptw := range ptws {
 		if !strings.HasPrefix(ptw.pt.name, partitionNamePrefix) {
 			continue
 		}
 		logger.Infof("starting forced merge for partition %q", ptw.pt.name)
 		startTime := time.Now()
-		if err := ptw.pt.ForceMergeAllParts(); err != nil {
+		if err := ptw.pt.ForceMergeAllParts(tb.stop); err != nil {
			return fmt.Errorf("cannot complete forced merge for partition %q: %w", ptw.pt.name, err)
 		}
 		logger.Infof("forced merge for partition %q has been finished in %.3f seconds", ptw.pt.name, time.Since(startTime).Seconds())