lib/storage: remove stale parts as soon as they go outside the configured retention

Previously such parts could remain undeleted for long durations until they are merged with other parts.
This should help for `-retentionPeriod` values smaller than one month.
This commit is contained in:
Aliaksandr Valialkin 2020-12-22 19:48:27 +02:00
parent 675056c71e
commit fa3bcf220f
2 changed files with 68 additions and 1 deletions

View file

@ -2,6 +2,8 @@
# tip
* FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month.
# [v1.50.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.50.2)

View file

@ -167,6 +167,7 @@ type partition struct {
bigPartsMergerWG sync.WaitGroup
rawRowsFlusherWG sync.WaitGroup
inmemoryPartsFlusherWG sync.WaitGroup
stalePartsRemoverWG sync.WaitGroup
}
// partWrapper is a wrapper for the part.
@ -278,6 +279,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
pt.startMergeWorkers()
pt.startRawRowsFlusher()
pt.startInmemoryPartsFlusher()
pt.startStalePartsRemover()
return pt, nil
}
@ -641,8 +643,13 @@ func (pt *partition) PutParts(pws []*partWrapper) {
func (pt *partition) MustClose() {
close(pt.stopCh)
logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath)
logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath)
startTime := time.Now()
pt.stalePartsRemoverWG.Wait()
logger.Infof("stale parts remover stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath)
startTime = time.Now()
pt.inmemoryPartsFlusherWG.Wait()
logger.Infof("inmemory parts flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
@ -1289,6 +1296,64 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
return dst, removedParts
}
func (pt *partition) startStalePartsRemover() {
pt.stalePartsRemoverWG.Add(1)
go func() {
pt.stalePartsRemover()
pt.stalePartsRemoverWG.Done()
}()
}
func (pt *partition) stalePartsRemover() {
ticker := time.NewTicker(7 * time.Minute)
defer ticker.Stop()
for {
select {
case <-pt.stopCh:
return
case <-ticker.C:
pt.removeStaleParts()
}
}
}
func (pt *partition) removeStaleParts() {
m := make(map[*partWrapper]bool)
startTime := time.Now()
retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
pt.partsLock.Lock()
for _, pw := range pt.bigParts {
if pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount)
m[pw] = true
}
}
for _, pw := range pt.smallParts {
if pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount)
m[pw] = true
}
}
removedSmallParts := 0
removedBigParts := 0
if len(m) > 0 {
pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m, false)
pt.bigParts, removedBigParts = removeParts(pt.bigParts, m, true)
}
pt.partsLock.Unlock()
if removedSmallParts+removedBigParts != len(m) {
logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedSmallParts+removedBigParts, len(m))
}
// Remove partition references from removed parts, so they are eventually deleted when nobody reads from them.
for pw := range m {
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000)
pw.decRef()
}
}
// getPartsToMerge returns optimal parts to merge from pws.
//
// The returned parts will contain less than maxRows rows.