From 29e4e7f4229fc7b7660a6d88fb37aa9a1609b534 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sat, 31 Oct 2020 20:42:13 +0200
Subject: [PATCH] lib/storage: drop more samples outside the given retention
 during background merge

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/17
---
 lib/storage/block.go |  2 +-
 lib/storage/merge.go | 65 ++++++++++++++++++++++----------------------
 2 files changed, 33 insertions(+), 34 deletions(-)

diff --git a/lib/storage/block.go b/lib/storage/block.go
index ec0b3fdeb..c446c370e 100644
--- a/lib/storage/block.go
+++ b/lib/storage/block.go
@@ -23,7 +23,7 @@ const (
 type Block struct {
 	bh blockHeader
 
-	// nextIdx is the next row index for timestamps and values.
+	// nextIdx is the next index for reading timestamps and values.
 	nextIdx int
 
 	timestamps []int64
diff --git a/lib/storage/merge.go b/lib/storage/merge.go
index 6a1845ce9..0fc16d434 100644
--- a/lib/storage/merge.go
+++ b/lib/storage/merge.go
@@ -40,8 +40,11 @@ var errForciblyStopped = fmt.Errorf("forcibly stopped")
 
 func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
 	dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
-	// Search for the first block to merge
-	var pendingBlock *Block
+	pendingBlockIsEmpty := true
+	pendingBlock := getBlock()
+	defer putBlock(pendingBlock)
+	tmpBlock := getBlock()
+	defer putBlock(tmpBlock)
 	for bsm.NextBlock() {
 		select {
 		case <-stopCh:
@@ -58,31 +61,10 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
 			*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
 			continue
 		}
-		pendingBlock = getBlock()
-		pendingBlock.CopyFrom(bsm.Block)
-		break
-	}
-	if pendingBlock != nil {
-		defer putBlock(pendingBlock)
-	}
-
-	// Merge blocks.
-	tmpBlock := getBlock()
-	defer putBlock(tmpBlock)
-	for bsm.NextBlock() {
-		select {
-		case <-stopCh:
-			return errForciblyStopped
-		default:
-		}
-		if dmis.Has(bsm.Block.bh.TSID.MetricID) {
-			// Skip blocks for deleted metrics.
-			*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
-			continue
-		}
-		if bsm.Block.bh.MaxTimestamp < retentionDeadline {
-			// skip blocks out of the given retention.
-			*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
+		if pendingBlockIsEmpty {
+			// Load the next block if pendingBlock is empty.
+			pendingBlock.CopyFrom(bsm.Block)
+			pendingBlockIsEmpty = false
 			continue
 		}
 
@@ -114,16 +96,20 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
 		tmpBlock.bh.TSID = bsm.Block.bh.TSID
 		tmpBlock.bh.Scale = bsm.Block.bh.Scale
 		tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, bsm.Block.bh.PrecisionBits)
-		mergeBlocks(tmpBlock, pendingBlock, bsm.Block)
+		mergeBlocks(tmpBlock, pendingBlock, bsm.Block, retentionDeadline, rowsDeleted)
 		if len(tmpBlock.timestamps) <= maxRowsPerBlock {
 			// More entries may be added to tmpBlock. Swap it with pendingBlock,
 			// so more entries may be added to pendingBlock on the next iteration.
-			tmpBlock.fixupTimestamps()
+			if len(tmpBlock.timestamps) > 0 {
+				tmpBlock.fixupTimestamps()
+			} else {
+				pendingBlockIsEmpty = true
+			}
 			pendingBlock, tmpBlock = tmpBlock, pendingBlock
 			continue
 		}
 
-		// Write the first len(maxRowsPerBlock) of tmpBlock.timestamps to bsw,
+		// Write the first maxRowsPerBlock of tmpBlock.timestamps to bsw,
 		// leave the rest in pendingBlock.
 		tmpBlock.nextIdx = maxRowsPerBlock
 		pendingBlock.CopyFrom(tmpBlock)
@@ -137,18 +123,21 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
 	if err := bsm.Error(); err != nil {
 		return fmt.Errorf("cannot read block to be merged: %w", err)
 	}
-	if pendingBlock != nil {
+	if !pendingBlockIsEmpty {
 		bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
 	}
 	return nil
 }
 
 // mergeBlocks merges ib1 and ib2 to ob.
-func mergeBlocks(ob, ib1, ib2 *Block) {
+func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint64) {
 	ib1.assertMergeable(ib2)
 	ib1.assertUnmarshaled()
 	ib2.assertUnmarshaled()
 
+	skipSamplesOutsideRetention(ib1, retentionDeadline, rowsDeleted)
+	skipSamplesOutsideRetention(ib2, retentionDeadline, rowsDeleted)
+
 	if ib1.bh.MaxTimestamp < ib2.bh.MinTimestamp {
 		// Fast path - ib1 values have smaller timestamps than ib2 values.
 		appendRows(ob, ib1)
@@ -186,6 +175,16 @@ func mergeBlocks(ob, ib1, ib2 *Block) {
 	}
 }
 
+func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) {
+	timestamps := b.timestamps
+	nextIdx := b.nextIdx
+	for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline {
+		nextIdx++
+	}
+	*rowsDeleted += uint64(nextIdx - b.nextIdx)
+	b.nextIdx = nextIdx
+}
+
 func appendRows(ob, ib *Block) {
 	ob.timestamps = append(ob.timestamps, ib.timestamps[ib.nextIdx:]...)
 	ob.values = append(ob.values, ib.values[ib.nextIdx:]...)
@@ -199,7 +198,7 @@ func unmarshalAndCalibrateScale(b1, b2 *Block) error {
 		return err
 	}
 
-	scale := decimal.CalibrateScale(b1.values, b1.bh.Scale, b2.values, b2.bh.Scale)
+	scale := decimal.CalibrateScale(b1.values[b1.nextIdx:], b1.bh.Scale, b2.values[b2.nextIdx:], b2.bh.Scale)
 	b1.bh.Scale = scale
 	b2.bh.Scale = scale
 	return nil
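
Note (not part of the patch): below is a minimal standalone Go sketch of the index-advance idea behind skipSamplesOutsideRetention. Timestamps within a block are stored in ascending order, so samples older than the retention deadline can be dropped by moving the read index forward instead of copying data. The function name dropSamplesBeforeDeadline and the sample values are illustrative assumptions, not part of the VictoriaMetrics code.

// Standalone illustration of dropping samples older than a retention
// deadline from a sorted timestamps slice by advancing an index.
package main

import "fmt"

// dropSamplesBeforeDeadline returns how many leading samples of the sorted
// timestamps slice fall before retentionDeadline and should be skipped.
func dropSamplesBeforeDeadline(timestamps []int64, retentionDeadline int64) int {
	n := 0
	for n < len(timestamps) && timestamps[n] < retentionDeadline {
		n++
	}
	return n
}

func main() {
	timestamps := []int64{100, 200, 300, 400}
	retentionDeadline := int64(250)
	n := dropSamplesBeforeDeadline(timestamps, retentionDeadline)
	fmt.Printf("dropped %d samples; remaining: %v\n", n, timestamps[n:])
	// Output: dropped 2 samples; remaining: [300 400]
}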