lib/storage: use a single reference to the currently merged block - bsm.Block during the block merge loop

This commit is contained in:
Aliaksandr Valialkin 2022-10-23 14:08:26 +03:00
parent d0a9ca1bc2
commit 187e294a53
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -52,52 +52,53 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
return errForciblyStopped
default:
}
if dmis.Has(bsm.Block.bh.TSID.MetricID) {
b := bsm.Block
if dmis.Has(b.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount))
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
continue
}
if bsm.Block.bh.MaxTimestamp < retentionDeadline {
if b.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention.
atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount))
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
continue
}
if pendingBlockIsEmpty {
// Load the next block if pendingBlock is empty.
pendingBlock.CopyFrom(bsm.Block)
pendingBlock.CopyFrom(b)
pendingBlockIsEmpty = false
continue
}
// Verify whether pendingBlock may be merged with bsm.Block (the current block).
if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID {
// Verify whether pendingBlock may be merged with b (the current block).
if pendingBlock.bh.TSID.MetricID != b.bh.TSID.MetricID {
// Fast path - blocks belong to distinct time series.
// Write the pendingBlock and then deal with bsm.Block.
if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) {
logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID)
// Write the pendingBlock and then deal with b.
if b.bh.TSID.Less(&pendingBlock.bh.TSID) {
logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &b.bh.TSID, &pendingBlock.bh.TSID)
}
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
pendingBlock.CopyFrom(bsm.Block)
pendingBlock.CopyFrom(b)
continue
}
if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp {
// Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block.
// Write the pendingBlock and then deal with bsm.Block.
if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= b.bh.MinTimestamp {
// Fast path - pendingBlock is too big and it doesn't overlap with b.
// Write the pendingBlock and then deal with b.
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
pendingBlock.CopyFrom(bsm.Block)
pendingBlock.CopyFrom(b)
continue
}
// Slow path - pendingBlock and bsm.Block belong to the same time series,
// Slow path - pendingBlock and b belong to the same time series,
// so they must be merged.
if err := unmarshalAndCalibrateScale(pendingBlock, bsm.Block); err != nil {
if err := unmarshalAndCalibrateScale(pendingBlock, b); err != nil {
return fmt.Errorf("cannot unmarshal and calibrate scale for blocks to be merged: %w", err)
}
tmpBlock.Reset()
tmpBlock.bh.TSID = bsm.Block.bh.TSID
tmpBlock.bh.Scale = bsm.Block.bh.Scale
tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, bsm.Block.bh.PrecisionBits)
mergeBlocks(tmpBlock, pendingBlock, bsm.Block, retentionDeadline, rowsDeleted)
tmpBlock.bh.TSID = b.bh.TSID
tmpBlock.bh.Scale = b.bh.Scale
tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, b.bh.PrecisionBits)
mergeBlocks(tmpBlock, pendingBlock, b, retentionDeadline, rowsDeleted)
if len(tmpBlock.timestamps) <= maxRowsPerBlock {
// More entries may be added to tmpBlock. Swap it with pendingBlock,
// so more entries may be added to pendingBlock on the next iteration.