diff --git a/lib/storage/block_stream_merger.go b/lib/storage/block_stream_merger.go index 50d1e5f9a9..cc5935aaa5 100644 --- a/lib/storage/block_stream_merger.go +++ b/lib/storage/block_stream_merger.go @@ -25,10 +25,12 @@ type blockStreamMerger struct { func (bsm *blockStreamMerger) reset() { bsm.Block = nil + for i := range bsm.bsrHeap { bsm.bsrHeap[i] = nil } bsm.bsrHeap = bsm.bsrHeap[:0] + bsm.retentionDeadline = 0 bsm.nextBlockNoop = false bsm.err = nil diff --git a/lib/storage/partition.go b/lib/storage/partition.go index d5b38a62d3..d0c5d99116 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -859,7 +859,9 @@ func (pt *partition) ForceMergeAllParts() error { return nil } - // If len(pws) == 1, then the merge must run anyway. This allows removing the deleted series and performing de-duplication if needed. + // If len(pws) == 1, then the merge must run anyway. + // This allows applying the configured retention, removing the deleted series + // and performing de-duplication if needed. if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil { return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err) } @@ -1604,8 +1606,11 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) { continue } fn := fi.Name() - if fn == "tmp" || fn == "txn" || fn == "snapshots" { + if fn == "snapshots" { // "snapshots" dir is skipped for backwards compatibility. Now it is unused. + continue + } + if fn == "tmp" || fn == "txn" { // Skip special dirs. continue }