Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2022-10-23 14:09:38 +03:00
commit 33343695a9
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -52,52 +52,53 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
return errForciblyStopped return errForciblyStopped
default: default:
} }
if dmis.Has(bsm.Block.bh.TSID.MetricID) { b := bsm.Block
if dmis.Has(b.bh.TSID.MetricID) {
// Skip blocks for deleted metrics. // Skip blocks for deleted metrics.
atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
continue continue
} }
if bsm.Block.bh.MaxTimestamp < retentionDeadline { if b.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention. // Skip blocks out of the given retention.
atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
continue continue
} }
if pendingBlockIsEmpty { if pendingBlockIsEmpty {
// Load the next block if pendingBlock is empty. // Load the next block if pendingBlock is empty.
pendingBlock.CopyFrom(bsm.Block) pendingBlock.CopyFrom(b)
pendingBlockIsEmpty = false pendingBlockIsEmpty = false
continue continue
} }
// Verify whether pendingBlock may be merged with bsm.Block (the current block). // Verify whether pendingBlock may be merged with b (the current block).
if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID { if pendingBlock.bh.TSID.MetricID != b.bh.TSID.MetricID {
// Fast path - blocks belong to distinct time series. // Fast path - blocks belong to distinct time series.
// Write the pendingBlock and then deal with bsm.Block. // Write the pendingBlock and then deal with b.
if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) { if b.bh.TSID.Less(&pendingBlock.bh.TSID) {
logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID) logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &b.bh.TSID, &pendingBlock.bh.TSID)
} }
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged) bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
pendingBlock.CopyFrom(bsm.Block) pendingBlock.CopyFrom(b)
continue continue
} }
if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp { if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= b.bh.MinTimestamp {
// Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block. // Fast path - pendingBlock is too big and it doesn't overlap with b.
// Write the pendingBlock and then deal with bsm.Block. // Write the pendingBlock and then deal with b.
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged) bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
pendingBlock.CopyFrom(bsm.Block) pendingBlock.CopyFrom(b)
continue continue
} }
// Slow path - pendingBlock and bsm.Block belong to the same time series, // Slow path - pendingBlock and b belong to the same time series,
// so they must be merged. // so they must be merged.
if err := unmarshalAndCalibrateScale(pendingBlock, bsm.Block); err != nil { if err := unmarshalAndCalibrateScale(pendingBlock, b); err != nil {
return fmt.Errorf("cannot unmarshal and calibrate scale for blocks to be merged: %w", err) return fmt.Errorf("cannot unmarshal and calibrate scale for blocks to be merged: %w", err)
} }
tmpBlock.Reset() tmpBlock.Reset()
tmpBlock.bh.TSID = bsm.Block.bh.TSID tmpBlock.bh.TSID = b.bh.TSID
tmpBlock.bh.Scale = bsm.Block.bh.Scale tmpBlock.bh.Scale = b.bh.Scale
tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, bsm.Block.bh.PrecisionBits) tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, b.bh.PrecisionBits)
mergeBlocks(tmpBlock, pendingBlock, bsm.Block, retentionDeadline, rowsDeleted) mergeBlocks(tmpBlock, pendingBlock, b, retentionDeadline, rowsDeleted)
if len(tmpBlock.timestamps) <= maxRowsPerBlock { if len(tmpBlock.timestamps) <= maxRowsPerBlock {
// More entries may be added to tmpBlock. Swap it with pendingBlock, // More entries may be added to tmpBlock. Swap it with pendingBlock,
// so more entries may be added to pendingBlock on the next iteration. // so more entries may be added to pendingBlock on the next iteration.