From fa3bcf220fa287c54a3d8f2e348bd768f0fc5ebb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 22 Dec 2020 19:48:27 +0200 Subject: [PATCH] lib/storage: remove stale parts as soon as they go outside the configured retention Previously such parts could remain undeleted for long durations until they are merged with other parts. This should help for `-retentionPeriod` values smaller than one month. --- docs/CHANGELOG.md | 2 ++ lib/storage/partition.go | 67 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f3d339a83..bb4677cc3 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,8 @@ # tip +* FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month. + # [v1.50.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.50.2) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 4369ec116..c049c0812 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -167,6 +167,7 @@ type partition struct { bigPartsMergerWG sync.WaitGroup rawRowsFlusherWG sync.WaitGroup inmemoryPartsFlusherWG sync.WaitGroup + stalePartsRemoverWG sync.WaitGroup } // partWrapper is a wrapper for the part. @@ -278,6 +279,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func pt.startMergeWorkers() pt.startRawRowsFlusher() pt.startInmemoryPartsFlusher() + pt.startStalePartsRemover() return pt, nil } @@ -641,8 +643,13 @@ func (pt *partition) PutParts(pws []*partWrapper) { func (pt *partition) MustClose() { close(pt.stopCh) - logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath) + logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath) startTime := time.Now() + pt.stalePartsRemoverWG.Wait() + logger.Infof("stale parts remover stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) + + logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath) + startTime = time.Now() pt.inmemoryPartsFlusherWG.Wait() logger.Infof("inmemory parts flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath) @@ -1289,6 +1296,64 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig return dst, removedParts } +func (pt *partition) startStalePartsRemover() { + pt.stalePartsRemoverWG.Add(1) + go func() { + pt.stalePartsRemover() + pt.stalePartsRemoverWG.Done() + }() +} + +func (pt *partition) stalePartsRemover() { + ticker := time.NewTicker(7 * time.Minute) + defer ticker.Stop() + for { + select { + case <-pt.stopCh: + return + case <-ticker.C: + pt.removeStaleParts() + } + } +} + +func (pt *partition) removeStaleParts() { + m := make(map[*partWrapper]bool) + startTime := time.Now() + retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs + + pt.partsLock.Lock() + for _, pw := range pt.bigParts { + if pw.p.ph.MaxTimestamp < retentionDeadline { + atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount) + m[pw] = true + } + } + for _, pw := range pt.smallParts { + if pw.p.ph.MaxTimestamp < retentionDeadline { + atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount) + m[pw] = true + } + } + removedSmallParts := 0 + removedBigParts := 0 + if len(m) > 0 { + pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m, false) + pt.bigParts, removedBigParts = removeParts(pt.bigParts, m, true) + } + pt.partsLock.Unlock() + + if removedSmallParts+removedBigParts != len(m) { + logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedSmallParts+removedBigParts, len(m)) + } + + // Remove partition references from removed parts, so they are eventually deleted when nobody reads from them. + for pw := range m { + logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000) + pw.decRef() + } +} + // getPartsToMerge returns optimal parts to merge from pws. // // The returned parts will contain less than maxRows rows.