diff --git a/lib/fs/fs.go b/lib/fs/fs.go index 10b9e0cc9..925f16012 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -316,7 +316,7 @@ func CopyDirectory(srcPath, dstPath string) error { } src := filepath.Join(srcPath, de.Name()) dst := filepath.Join(dstPath, de.Name()) - if err := copyFile(src, dst); err != nil { + if err := CopyFile(src, dst); err != nil { return err } } @@ -324,7 +324,8 @@ func CopyDirectory(srcPath, dstPath string) error { return nil } -func copyFile(srcPath, dstPath string) error { +// CopyFile copies the file from srcPath to dstPath. +func CopyFile(srcPath, dstPath string) error { src, err := os.Open(srcPath) if err != nil { return err diff --git a/lib/storage/block_stream_merger.go b/lib/storage/block_stream_merger.go index cc5935aaa..77992295e 100644 --- a/lib/storage/block_stream_merger.go +++ b/lib/storage/block_stream_merger.go @@ -61,7 +61,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline bsm.nextBlockNoop = true } -func (bsm *blockStreamMerger) getRetentionDeadline(b *Block) int64 { +func (bsm *blockStreamMerger) getRetentionDeadline(bh *blockHeader) int64 { return bsm.retentionDeadline } diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 347f2c792..7207fd213 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -57,7 +57,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) continue } - retentionDeadline := bsm.getRetentionDeadline(b) + retentionDeadline := bsm.getRetentionDeadline(&b.bh) if b.bh.MaxTimestamp < retentionDeadline { // Skip blocks out of the given retention. atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) diff --git a/lib/storage/search.go b/lib/storage/search.go index 64b0a7cfb..1a3cdefe6 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -132,6 +132,9 @@ type Search struct { // idb is used for MetricName lookup for the found data blocks. idb *indexDB + // retentionDeadline is used for filtering out blocks outside the configured retention. + retentionDeadline int64 + ts tableSearch // tr contains time range used in the serach. @@ -157,6 +160,7 @@ func (s *Search) reset() { s.MetricBlockRef.BlockRef = nil s.idb = nil + s.retentionDeadline = 0 s.ts.reset() s.tr = TimeRange{} s.tfss = nil @@ -178,9 +182,11 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte if s.needClosing { logger.Panicf("BUG: missing MustClose call before the next call to Init") } + retentionDeadline := int64(fasttime.UnixTimestamp()*1e3) - storage.retentionMsecs s.reset() s.idb = storage.idb() + s.retentionDeadline = retentionDeadline s.tr = tr s.tfss = tfss s.deadline = deadline @@ -240,6 +246,10 @@ func (s *Search) NextMetricBlock() bool { s.loops++ tsid := &s.ts.BlockRef.bh.TSID if tsid.MetricID != s.prevMetricID { + if s.ts.BlockRef.bh.MaxTimestamp < s.retentionDeadline { + // Skip the block, since it contains only data outside the configured retention. + continue + } var err error s.MetricBlockRef.MetricName, err = s.idb.searchMetricNameWithCache(s.MetricBlockRef.MetricName[:0], tsid.MetricID, tsid.AccountID, tsid.ProjectID) if err != nil {