From 54f35c175cd8be9438657334ac4a3dd34ec16393 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 23 Oct 2022 14:30:16 +0300 Subject: [PATCH] lib/storage: small refactoring: move retentionDeadline to blockStreamMerger This allows defining per-block retention in the future by updating the getRetentionDeadline function --- lib/storage/block_stream_merger.go | 11 ++++++++++- lib/storage/merge.go | 7 ++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/lib/storage/block_stream_merger.go b/lib/storage/block_stream_merger.go index d48ad81a9..50d1e5f9a 100644 --- a/lib/storage/block_stream_merger.go +++ b/lib/storage/block_stream_merger.go @@ -13,6 +13,9 @@ type blockStreamMerger struct { bsrHeap blockStreamReaderHeap + // Blocks with smaller timestamps are removed because of retention. + retentionDeadline int64 + // Whether the call to NextBlock must be no-op. nextBlockNoop bool @@ -26,13 +29,15 @@ func (bsm *blockStreamMerger) reset() { bsm.bsrHeap[i] = nil } bsm.bsrHeap = bsm.bsrHeap[:0] + bsm.retentionDeadline = 0 bsm.nextBlockNoop = false bsm.err = nil } // Init initializes bsm with the given bsrs. -func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) { +func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline int64) { bsm.reset() + bsm.retentionDeadline = retentionDeadline for _, bsr := range bsrs { if bsr.NextBlock() { bsm.bsrHeap = append(bsm.bsrHeap, bsr) @@ -54,6 +59,10 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) { bsm.nextBlockNoop = true } +func (bsm *blockStreamMerger) getRetentionDeadline(b *Block) int64 { + return bsm.retentionDeadline +} + // NextBlock stores the next block in bsm.Block. // // The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 2e2df284c..8b326bf9d 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -20,8 +20,8 @@ func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStre ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) - bsm.Init(bsrs) - err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted) + bsm.Init(bsrs, retentionDeadline) + err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted) bsm.reset() bsmPool.Put(bsm) bsw.MustClose() @@ -40,7 +40,7 @@ var bsmPool = &sync.Pool{ var errForciblyStopped = fmt.Errorf("forcibly stopped") func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, - dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error { + dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { pendingBlockIsEmpty := true pendingBlock := getBlock() defer putBlock(pendingBlock) @@ -58,6 +58,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) continue } + retentionDeadline := bsm.getRetentionDeadline(b) if b.bh.MaxTimestamp < retentionDeadline { // Skip blocks out of the given retention. atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))