lib/storage: skip blocks outside the configured retention during search

Blocks outside the configured retention are eventually deleted during background merges.
However, such blocks may remain in the storage for a long time before a background merge removes them.
Previously VictoriaMetrics could spend additional CPU time on processing such blocks
during search queries. Now these blocks are skipped.
This commit is contained in:
Aliaksandr Valialkin 2022-10-24 02:52:38 +03:00
parent 2fc82b846e
commit 51f2e473f5
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
4 changed files with 15 additions and 4 deletions

View file

@ -316,7 +316,7 @@ func CopyDirectory(srcPath, dstPath string) error {
} }
src := filepath.Join(srcPath, de.Name()) src := filepath.Join(srcPath, de.Name())
dst := filepath.Join(dstPath, de.Name()) dst := filepath.Join(dstPath, de.Name())
if err := copyFile(src, dst); err != nil { if err := CopyFile(src, dst); err != nil {
return err return err
} }
} }
@ -324,7 +324,8 @@ func CopyDirectory(srcPath, dstPath string) error {
return nil return nil
} }
func copyFile(srcPath, dstPath string) error { // CopyFile copies the file from srcPath to dstPath.
func CopyFile(srcPath, dstPath string) error {
src, err := os.Open(srcPath) src, err := os.Open(srcPath)
if err != nil { if err != nil {
return err return err

View file

@ -61,7 +61,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline
bsm.nextBlockNoop = true bsm.nextBlockNoop = true
} }
func (bsm *blockStreamMerger) getRetentionDeadline(b *Block) int64 { func (bsm *blockStreamMerger) getRetentionDeadline(bh *blockHeader) int64 {
return bsm.retentionDeadline return bsm.retentionDeadline
} }

View file

@ -57,7 +57,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
continue continue
} }
retentionDeadline := bsm.getRetentionDeadline(b) retentionDeadline := bsm.getRetentionDeadline(&b.bh)
if b.bh.MaxTimestamp < retentionDeadline { if b.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention. // Skip blocks out of the given retention.
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))

View file

@ -132,6 +132,9 @@ type Search struct {
// idb is used for MetricName lookup for the found data blocks. // idb is used for MetricName lookup for the found data blocks.
idb *indexDB idb *indexDB
// retentionDeadline is used for filtering out blocks outside the configured retention.
retentionDeadline int64
ts tableSearch ts tableSearch
// tr contains time range used in the search.
@ -157,6 +160,7 @@ func (s *Search) reset() {
s.MetricBlockRef.BlockRef = nil s.MetricBlockRef.BlockRef = nil
s.idb = nil s.idb = nil
s.retentionDeadline = 0
s.ts.reset() s.ts.reset()
s.tr = TimeRange{} s.tr = TimeRange{}
s.tfss = nil s.tfss = nil
@ -178,9 +182,11 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
if s.needClosing { if s.needClosing {
logger.Panicf("BUG: missing MustClose call before the next call to Init") logger.Panicf("BUG: missing MustClose call before the next call to Init")
} }
retentionDeadline := int64(fasttime.UnixTimestamp()*1e3) - storage.retentionMsecs
s.reset() s.reset()
s.idb = storage.idb() s.idb = storage.idb()
s.retentionDeadline = retentionDeadline
s.tr = tr s.tr = tr
s.tfss = tfss s.tfss = tfss
s.deadline = deadline s.deadline = deadline
@ -240,6 +246,10 @@ func (s *Search) NextMetricBlock() bool {
s.loops++ s.loops++
tsid := &s.ts.BlockRef.bh.TSID tsid := &s.ts.BlockRef.bh.TSID
if tsid.MetricID != s.prevMetricID { if tsid.MetricID != s.prevMetricID {
if s.ts.BlockRef.bh.MaxTimestamp < s.retentionDeadline {
// Skip the block, since it contains only data outside the configured retention.
continue
}
var err error var err error
s.MetricBlockRef.MetricName, err = s.idb.searchMetricNameWithCache(s.MetricBlockRef.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID) s.MetricBlockRef.MetricName, err = s.idb.searchMetricNameWithCache(s.MetricBlockRef.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID)
if err != nil { if err != nil {