From 51f2e473f5459a07f5af2e82f79744c890e8030e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 24 Oct 2022 02:52:38 +0300 Subject: [PATCH] lib/storage: skip blocks outside the configured retention during search Blocks outside the configured retention are eventually deleted during background merge. But such blocks may reside in the storage for long time until background merge. Previously VictoriaMetrics could spend additional CPU time on processing such blocks during search queries. Now these blocks are skipped. --- lib/fs/fs.go | 5 +++-- lib/storage/block_stream_merger.go | 2 +- lib/storage/merge.go | 2 +- lib/storage/search.go | 10 ++++++++++ 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 10b9e0cc9..925f16012 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -316,7 +316,7 @@ func CopyDirectory(srcPath, dstPath string) error { } src := filepath.Join(srcPath, de.Name()) dst := filepath.Join(dstPath, de.Name()) - if err := copyFile(src, dst); err != nil { + if err := CopyFile(src, dst); err != nil { return err } } @@ -324,7 +324,8 @@ func CopyDirectory(srcPath, dstPath string) error { return nil } -func copyFile(srcPath, dstPath string) error { +// CopyFile copies the file from srcPath to dstPath. +func CopyFile(srcPath, dstPath string) error { src, err := os.Open(srcPath) if err != nil { return err diff --git a/lib/storage/block_stream_merger.go b/lib/storage/block_stream_merger.go index cc5935aaa..77992295e 100644 --- a/lib/storage/block_stream_merger.go +++ b/lib/storage/block_stream_merger.go @@ -61,7 +61,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline bsm.nextBlockNoop = true } -func (bsm *blockStreamMerger) getRetentionDeadline(b *Block) int64 { +func (bsm *blockStreamMerger) getRetentionDeadline(bh *blockHeader) int64 { return bsm.retentionDeadline } diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 347f2c792..7207fd213 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -57,7 +57,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) continue } - retentionDeadline := bsm.getRetentionDeadline(b) + retentionDeadline := bsm.getRetentionDeadline(&b.bh) if b.bh.MaxTimestamp < retentionDeadline { // Skip blocks out of the given retention. atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) diff --git a/lib/storage/search.go b/lib/storage/search.go index 64b0a7cfb..1a3cdefe6 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -132,6 +132,9 @@ type Search struct { // idb is used for MetricName lookup for the found data blocks. idb *indexDB + // retentionDeadline is used for filtering out blocks outside the configured retention. + retentionDeadline int64 + ts tableSearch // tr contains time range used in the serach. @@ -157,6 +160,7 @@ func (s *Search) reset() { s.MetricBlockRef.BlockRef = nil s.idb = nil + s.retentionDeadline = 0 s.ts.reset() s.tr = TimeRange{} s.tfss = nil @@ -178,9 +182,11 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte if s.needClosing { logger.Panicf("BUG: missing MustClose call before the next call to Init") } + retentionDeadline := int64(fasttime.UnixTimestamp()*1e3) - storage.retentionMsecs s.reset() s.idb = storage.idb() + s.retentionDeadline = retentionDeadline s.tr = tr s.tfss = tfss s.deadline = deadline @@ -240,6 +246,10 @@ func (s *Search) NextMetricBlock() bool { s.loops++ tsid := &s.ts.BlockRef.bh.TSID if tsid.MetricID != s.prevMetricID { + if s.ts.BlockRef.bh.MaxTimestamp < s.retentionDeadline { + // Skip the block, since it contains only data outside the configured retention. + continue + } var err error s.MetricBlockRef.MetricName, err = s.idb.searchMetricNameWithCache(s.MetricBlockRef.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID) if err != nil {