diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 4ef383797..2e2df284c 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -52,52 +52,53 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc return errForciblyStopped default: } - if dmis.Has(bsm.Block.bh.TSID.MetricID) { + b := bsm.Block + if dmis.Has(b.bh.TSID.MetricID) { // Skip blocks for deleted metrics. - atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) + atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) continue } - if bsm.Block.bh.MaxTimestamp < retentionDeadline { + if b.bh.MaxTimestamp < retentionDeadline { // Skip blocks out of the given retention. - atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) + atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) continue } if pendingBlockIsEmpty { // Load the next block if pendingBlock is empty. - pendingBlock.CopyFrom(bsm.Block) + pendingBlock.CopyFrom(b) pendingBlockIsEmpty = false continue } - // Verify whether pendingBlock may be merged with bsm.Block (the current block). - if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID { + // Verify whether pendingBlock may be merged with b (the current block). + if pendingBlock.bh.TSID.MetricID != b.bh.TSID.MetricID { // Fast path - blocks belong to distinct time series. - // Write the pendingBlock and then deal with bsm.Block. - if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) { - logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID) + // Write the pendingBlock and then deal with b. + if b.bh.TSID.Less(&pendingBlock.bh.TSID) { + logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &b.bh.TSID, &pendingBlock.bh.TSID) } bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged) - pendingBlock.CopyFrom(bsm.Block) + pendingBlock.CopyFrom(b) continue } - if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp { - // Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block. - // Write the pendingBlock and then deal with bsm.Block. + if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= b.bh.MinTimestamp { + // Fast path - pendingBlock is too big and it doesn't overlap with b. + // Write the pendingBlock and then deal with b. bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged) - pendingBlock.CopyFrom(bsm.Block) + pendingBlock.CopyFrom(b) continue } - // Slow path - pendingBlock and bsm.Block belong to the same time series, + // Slow path - pendingBlock and b belong to the same time series, // so they must be merged. - if err := unmarshalAndCalibrateScale(pendingBlock, bsm.Block); err != nil { + if err := unmarshalAndCalibrateScale(pendingBlock, b); err != nil { return fmt.Errorf("cannot unmarshal and calibrate scale for blocks to be merged: %w", err) } tmpBlock.Reset() - tmpBlock.bh.TSID = bsm.Block.bh.TSID - tmpBlock.bh.Scale = bsm.Block.bh.Scale - tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, bsm.Block.bh.PrecisionBits) - mergeBlocks(tmpBlock, pendingBlock, bsm.Block, retentionDeadline, rowsDeleted) + tmpBlock.bh.TSID = b.bh.TSID + tmpBlock.bh.Scale = b.bh.Scale + tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, b.bh.PrecisionBits) + mergeBlocks(tmpBlock, pendingBlock, b, retentionDeadline, rowsDeleted) if len(tmpBlock.timestamps) <= maxRowsPerBlock { // More entries may be added to tmpBlock. Swap it with pendingBlock, // so more entries may be added to pendingBlock on the next iteration.