VictoriaMetrics/lib/storage/merge.go
Aliaksandr Valialkin e7959094f6 lib/storage: remove prioritizing of merging small parts over merging big parts, since it doesn't work as expected
The prioritizing could lead to big merge starvation, which could end up in too big number of parts that must be merged into big parts.

Multiple big merges may be initiated after the migration from v1.39.0 or v1.39.1. It is OK - these merges should be finished soon,
which should return CPU and disk IO usage to normal levels.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/618
2020-07-30 19:57:27 +03:00

206 lines
5.9 KiB
Go

package storage
import (
"fmt"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
// mergeBlockStreams merges bsrs into bsw and updates ph.
//
// mergeBlockStreams returns immediately if stopCh is closed.
//
// rowsMerged is atomically updated with the number of merged rows during the merge.
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
ph.Reset()
bsm := bsmPool.Get().(*blockStreamMerger)
bsm.Init(bsrs)
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
bsm.reset()
bsmPool.Put(bsm)
bsw.MustClose()
if err == nil {
return nil
}
if err == errForciblyStopped {
return err
}
return fmt.Errorf("cannot merge %d streams: %s: %w", len(bsrs), bsrs, err)
}
var bsmPool = &sync.Pool{
New: func() interface{} {
return &blockStreamMerger{}
},
}
var errForciblyStopped = fmt.Errorf("forcibly stopped")
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
// Search for the first block to merge
var pendingBlock *Block
for bsm.NextBlock() {
select {
case <-stopCh:
return errForciblyStopped
default:
}
if dmis.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
}
pendingBlock = getBlock()
pendingBlock.CopyFrom(bsm.Block)
break
}
if pendingBlock != nil {
defer putBlock(pendingBlock)
}
// Merge blocks.
tmpBlock := getBlock()
defer putBlock(tmpBlock)
for bsm.NextBlock() {
select {
case <-stopCh:
return errForciblyStopped
default:
}
if dmis.Has(bsm.Block.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
}
// Verify whether pendingBlock may be merged with bsm.Block (the current block).
if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID {
// Fast path - blocks belong to distinct time series.
// Write the pendingBlock and then deal with bsm.Block.
if bsm.Block.bh.TSID.Less(&pendingBlock.bh.TSID) {
logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &bsm.Block.bh.TSID, &pendingBlock.bh.TSID)
}
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
pendingBlock.CopyFrom(bsm.Block)
continue
}
if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= bsm.Block.bh.MinTimestamp {
// Fast path - pendingBlock is too big and it doesn't overlap with bsm.Block.
// Write the pendingBlock and then deal with bsm.Block.
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
pendingBlock.CopyFrom(bsm.Block)
continue
}
// Slow path - pendingBlock and bsm.Block belong to the same time series,
// so they must be merged.
if err := unmarshalAndCalibrateScale(pendingBlock, bsm.Block); err != nil {
return fmt.Errorf("cannot unmarshal and calibrate scale for blocks to be merged: %w", err)
}
tmpBlock.Reset()
tmpBlock.bh.TSID = bsm.Block.bh.TSID
tmpBlock.bh.Scale = bsm.Block.bh.Scale
tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, bsm.Block.bh.PrecisionBits)
mergeBlocks(tmpBlock, pendingBlock, bsm.Block)
if len(tmpBlock.timestamps) <= maxRowsPerBlock {
// More entries may be added to tmpBlock. Swap it with pendingBlock,
// so more entries may be added to pendingBlock on the next iteration.
tmpBlock.fixupTimestamps()
pendingBlock, tmpBlock = tmpBlock, pendingBlock
continue
}
// Write the first len(maxRowsPerBlock) of tmpBlock.timestamps to bsw,
// leave the rest in pendingBlock.
tmpBlock.nextIdx = maxRowsPerBlock
pendingBlock.CopyFrom(tmpBlock)
pendingBlock.fixupTimestamps()
tmpBlock.nextIdx = 0
tmpBlock.timestamps = tmpBlock.timestamps[:maxRowsPerBlock]
tmpBlock.values = tmpBlock.values[:maxRowsPerBlock]
tmpBlock.fixupTimestamps()
bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged)
}
if err := bsm.Error(); err != nil {
return fmt.Errorf("cannot read block to be merged: %w", err)
}
if pendingBlock != nil {
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
}
return nil
}
// mergeBlocks merges ib1 and ib2 to ob.
func mergeBlocks(ob, ib1, ib2 *Block) {
ib1.assertMergeable(ib2)
ib1.assertUnmarshaled()
ib2.assertUnmarshaled()
if ib1.bh.MaxTimestamp < ib2.bh.MinTimestamp {
// Fast path - ib1 values have smaller timestamps than ib2 values.
appendRows(ob, ib1)
appendRows(ob, ib2)
return
}
if ib2.bh.MaxTimestamp < ib1.bh.MinTimestamp {
// Fast path - ib2 values have smaller timestamps than ib1 values.
appendRows(ob, ib2)
appendRows(ob, ib1)
return
}
if ib1.nextIdx >= len(ib1.timestamps) {
appendRows(ob, ib2)
return
}
if ib2.nextIdx >= len(ib2.timestamps) {
appendRows(ob, ib1)
return
}
for {
i := ib1.nextIdx
ts2 := ib2.timestamps[ib2.nextIdx]
for i < len(ib1.timestamps) && ib1.timestamps[i] <= ts2 {
i++
}
ob.timestamps = append(ob.timestamps, ib1.timestamps[ib1.nextIdx:i]...)
ob.values = append(ob.values, ib1.values[ib1.nextIdx:i]...)
ib1.nextIdx = i
if ib1.nextIdx >= len(ib1.timestamps) {
appendRows(ob, ib2)
return
}
ib1, ib2 = ib2, ib1
}
}
func appendRows(ob, ib *Block) {
ob.timestamps = append(ob.timestamps, ib.timestamps[ib.nextIdx:]...)
ob.values = append(ob.values, ib.values[ib.nextIdx:]...)
}
func unmarshalAndCalibrateScale(b1, b2 *Block) error {
if err := b1.UnmarshalData(); err != nil {
return err
}
if err := b2.UnmarshalData(); err != nil {
return err
}
scale := decimal.CalibrateScale(b1.values, b1.bh.Scale, b2.values, b2.bh.Scale)
b1.bh.Scale = scale
b2.bh.Scale = scale
return nil
}
func minUint8(a, b uint8) uint8 {
if a < b {
return a
}
return b
}