mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: small refactoring: move retentionDeadline to blockStreamMerger
This allows defining per-block retention in the future by updating the getRetentionDeadline function
This commit is contained in:
parent
187e294a53
commit
54f35c175c
2 changed files with 14 additions and 4 deletions
|
@ -13,6 +13,9 @@ type blockStreamMerger struct {
|
|||
|
||||
bsrHeap blockStreamReaderHeap
|
||||
|
||||
// Blocks with smaller timestamps are removed because of retention.
|
||||
retentionDeadline int64
|
||||
|
||||
// Whether the call to NextBlock must be no-op.
|
||||
nextBlockNoop bool
|
||||
|
||||
|
@ -26,13 +29,15 @@ func (bsm *blockStreamMerger) reset() {
|
|||
bsm.bsrHeap[i] = nil
|
||||
}
|
||||
bsm.bsrHeap = bsm.bsrHeap[:0]
|
||||
bsm.retentionDeadline = 0
|
||||
bsm.nextBlockNoop = false
|
||||
bsm.err = nil
|
||||
}
|
||||
|
||||
// Init initializes bsm with the given bsrs.
|
||||
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) {
|
||||
func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, retentionDeadline int64) {
|
||||
bsm.reset()
|
||||
bsm.retentionDeadline = retentionDeadline
|
||||
for _, bsr := range bsrs {
|
||||
if bsr.NextBlock() {
|
||||
bsm.bsrHeap = append(bsm.bsrHeap, bsr)
|
||||
|
@ -54,6 +59,10 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader) {
|
|||
bsm.nextBlockNoop = true
|
||||
}
|
||||
|
||||
func (bsm *blockStreamMerger) getRetentionDeadline(b *Block) int64 {
|
||||
return bsm.retentionDeadline
|
||||
}
|
||||
|
||||
// NextBlock stores the next block in bsm.Block.
|
||||
//
|
||||
// The blocks are sorted by (TDIS, MinTimestamp). Two subsequent blocks
|
||||
|
|
|
@ -20,8 +20,8 @@ func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStre
|
|||
ph.Reset()
|
||||
|
||||
bsm := bsmPool.Get().(*blockStreamMerger)
|
||||
bsm.Init(bsrs)
|
||||
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted)
|
||||
bsm.Init(bsrs, retentionDeadline)
|
||||
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
|
||||
bsm.reset()
|
||||
bsmPool.Put(bsm)
|
||||
bsw.MustClose()
|
||||
|
@ -40,7 +40,7 @@ var bsmPool = &sync.Pool{
|
|||
var errForciblyStopped = fmt.Errorf("forcibly stopped")
|
||||
|
||||
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
|
||||
dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
|
||||
dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
|
||||
pendingBlockIsEmpty := true
|
||||
pendingBlock := getBlock()
|
||||
defer putBlock(pendingBlock)
|
||||
|
@ -58,6 +58,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
|
|||
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
|
||||
continue
|
||||
}
|
||||
retentionDeadline := bsm.getRetentionDeadline(b)
|
||||
if b.bh.MaxTimestamp < retentionDeadline {
|
||||
// Skip blocks out of the given retention.
|
||||
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
|
||||
|
|
Loading…
Reference in a new issue