mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
e7959094f6
The prioritizing could lead to starvation of big merges, which could result in too many parts waiting to be merged into big parts. Multiple big merges may be initiated after migrating from v1.39.0 or v1.39.1. This is OK - these merges should finish soon, after which CPU and disk IO usage should return to normal levels. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/618
206 lines
5.9 KiB
Go
206 lines
5.9 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
|
)
|
|
|
|
// mergeBlockStreams merges bsrs into bsw and updates ph.
|
|
//
|
|
// mergeBlockStreams returns immediately if stopCh is closed.
|
|
//
|
|
// rowsMerged is atomically updated with the number of merged rows during the merge.
|
|
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
|
|
dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
|
|
ph.Reset()
|
|
|
|
bsm := bsmPool.Get().(*blockStreamMerger)
|
|
bsm.Init(bsrs)
|
|
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
|
|
bsm.reset()
|
|
bsmPool.Put(bsm)
|
|
bsw.MustClose()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if err == errForciblyStopped {
|
|
return err
|
|
}
|
|
return fmt.Errorf("cannot merge %d streams: %s: %w", len(bsrs), bsrs, err)
|
|
}
|
|
|
|
var bsmPool = &sync.Pool{
|
|
New: func() interface{} {
|
|
return &blockStreamMerger{}
|
|
},
|
|
}
|
|
|
|
// errForciblyStopped is returned by mergeBlockStreamsInternal when stopCh
// becomes readable during the merge. mergeBlockStreams passes it to the caller
// without wrapping, so it can be compared directly.
var errForciblyStopped = fmt.Errorf("forcibly stopped")
|
|
|
|
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
|
|
dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
|
|
// Search for the first block to merge
|
|
var pendingBlock *Block
|
|
for bsm.NextBlock() {
|
|
select {
|
|
case <-stopCh:
|
|
return errForciblyStopped
|
|
default:
|
|
}
|
|
if dmis.Has(bsm.Block.bh.TSID.MetricID) {
|
|
// Skip blocks for deleted metrics.
|
|
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
|
|
continue
|
|
}
|
|
pendingBlock = getBlock()
|
|
pendingBlock.CopyFrom(bsm.Block)
|
|
break
|
|
}
|
|
if pendingBlock != nil {
|
|
defer putBlock(pendingBlock)
|
|
}
|
|
|
|
// Merge blocks.
|
|
tmpBlock := getBlock()
|
|
defer putBlock(tmpBlock)
|
|
for bsm.NextBlock() {
|
|
select {
|
|
case <-stopCh:
|
|
return errForciblyStopped
|
|
default:
|
|
}
|
|
if dmis.Has(bsm.Block.bh.TSID.MetricID) {
|
|
// Skip blocks for deleted metrics.
|
|
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
|
|
continue
|
|
}
|
|
|
|
// Verify whether pendingBlock may be merged with bsm.Block (the current block).
|
|
if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID {
|
|
// Fast path - blocks belong to distinct time series.
|
|
// Write the pendingBlock and then deal with bsm.Block.
|
|
if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) {
|
|
logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID)
|
|
}
|
|
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
|
|
pendingBlock.CopyFrom(bsm.Block)
|
|
continue
|
|
}
|
|
if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp {
|
|
// Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block.
|
|
// Write the pendingBlock and then deal with bsm.Block.
|
|
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
|
|
pendingBlock.CopyFrom(bsm.Block)
|
|
continue
|
|
}
|
|
|
|
// Slow path - pendingBlock and bsm.Block belong to the same time series,
|
|
// so they must be merged.
|
|
if err := unmarshalAndCalibrateScale(pendingBlock, bsm.Block); err != nil {
|
|
return fmt.Errorf("cannot unmarshal and calibrate scale for blocks to be merged: %w", err)
|
|
}
|
|
tmpBlock.Reset()
|
|
tmpBlock.bh.TSID = bsm.Block.bh.TSID
|
|
tmpBlock.bh.Scale = bsm.Block.bh.Scale
|
|
tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, bsm.Block.bh.PrecisionBits)
|
|
mergeBlocks(tmpBlock, pendingBlock, bsm.Block)
|
|
if len(tmpBlock.timestamps) <= maxRowsPerBlock {
|
|
// More entries may be added to tmpBlock. Swap it with pendingBlock,
|
|
// so more entries may be added to pendingBlock on the next iteration.
|
|
tmpBlock.fixupTimestamps()
|
|
pendingBlock, tmpBlock = tmpBlock, pendingBlock
|
|
continue
|
|
}
|
|
|
|
// Write the first len(maxRowsPerBlock) of tmpBlock.timestamps to bsw,
|
|
// leave the rest in pendingBlock.
|
|
tmpBlock.nextIdx = maxRowsPerBlock
|
|
pendingBlock.CopyFrom(tmpBlock)
|
|
pendingBlock.fixupTimestamps()
|
|
tmpBlock.nextIdx = 0
|
|
tmpBlock.timestamps = tmpBlock.timestamps[:maxRowsPerBlock]
|
|
tmpBlock.values = tmpBlock.values[:maxRowsPerBlock]
|
|
tmpBlock.fixupTimestamps()
|
|
bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged)
|
|
}
|
|
if err := bsm.Error(); err != nil {
|
|
return fmt.Errorf("cannot read block to be merged: %w", err)
|
|
}
|
|
if pendingBlock != nil {
|
|
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mergeBlocks merges ib1 and ib2 to ob.
|
|
func mergeBlocks(ob, ib1, ib2 *Block) {
|
|
ib1.assertMergeable(ib2)
|
|
ib1.assertUnmarshaled()
|
|
ib2.assertUnmarshaled()
|
|
|
|
if ib1.bh.MaxTimestamp < ib2.bh.MinTimestamp {
|
|
// Fast path - ib1 values have smaller timestamps than ib2 values.
|
|
appendRows(ob, ib1)
|
|
appendRows(ob, ib2)
|
|
return
|
|
}
|
|
if ib2.bh.MaxTimestamp < ib1.bh.MinTimestamp {
|
|
// Fast path - ib2 values have smaller timestamps than ib1 values.
|
|
appendRows(ob, ib2)
|
|
appendRows(ob, ib1)
|
|
return
|
|
}
|
|
if ib1.nextIdx >= len(ib1.timestamps) {
|
|
appendRows(ob, ib2)
|
|
return
|
|
}
|
|
if ib2.nextIdx >= len(ib2.timestamps) {
|
|
appendRows(ob, ib1)
|
|
return
|
|
}
|
|
for {
|
|
i := ib1.nextIdx
|
|
ts2 := ib2.timestamps[ib2.nextIdx]
|
|
for i < len(ib1.timestamps) && ib1.timestamps[i] <= ts2 {
|
|
i++
|
|
}
|
|
ob.timestamps = append(ob.timestamps, ib1.timestamps[ib1.nextIdx:i]...)
|
|
ob.values = append(ob.values, ib1.values[ib1.nextIdx:i]...)
|
|
ib1.nextIdx = i
|
|
if ib1.nextIdx >= len(ib1.timestamps) {
|
|
appendRows(ob, ib2)
|
|
return
|
|
}
|
|
ib1, ib2 = ib2, ib1
|
|
}
|
|
}
|
|
|
|
func appendRows(ob, ib *Block) {
|
|
ob.timestamps = append(ob.timestamps, ib.timestamps[ib.nextIdx:]...)
|
|
ob.values = append(ob.values, ib.values[ib.nextIdx:]...)
|
|
}
|
|
|
|
func unmarshalAndCalibrateScale(b1, b2 *Block) error {
|
|
if err := b1.UnmarshalData(); err != nil {
|
|
return err
|
|
}
|
|
if err := b2.UnmarshalData(); err != nil {
|
|
return err
|
|
}
|
|
|
|
scale := decimal.CalibrateScale(b1.values, b1.bh.Scale, b2.values, b2.bh.Scale)
|
|
b1.bh.Scale = scale
|
|
b2.bh.Scale = scale
|
|
return nil
|
|
}
|
|
|
|
// minUint8 returns the smaller of a and b.
func minUint8(a, b uint8) uint8 {
	if b <= a {
		return b
	}
	return a
}
|