mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
dba218a8ce
Blocks outside the configured retention are eventually deleted during background merge. But such blocks may reside in the storage for long time until background merge. Previously VictoriaMetrics could spend additional CPU time on processing such blocks during search queries. Now these blocks are skipped.
217 lines
6.4 KiB
Go
217 lines
6.4 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// mergeBlockStreams merges bsrs into bsw and updates ph.
|
|
//
|
|
// mergeBlockStreams returns immediately if stopCh is closed.
|
|
//
|
|
// rowsMerged is atomically updated with the number of merged rows during the merge.
|
|
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, s *Storage, retentionDeadline int64,
|
|
rowsMerged, rowsDeleted *uint64) error {
|
|
ph.Reset()
|
|
|
|
bsm := bsmPool.Get().(*blockStreamMerger)
|
|
bsm.Init(bsrs, retentionDeadline)
|
|
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, s, rowsMerged, rowsDeleted)
|
|
bsm.reset()
|
|
bsmPool.Put(bsm)
|
|
bsw.MustClose()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("cannot merge %d streams: %s: %w", len(bsrs), bsrs, err)
|
|
}
|
|
|
|
var bsmPool = &sync.Pool{
|
|
New: func() interface{} {
|
|
return &blockStreamMerger{}
|
|
},
|
|
}
|
|
|
|
var errForciblyStopped = fmt.Errorf("forcibly stopped")
|
|
|
|
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, s *Storage, rowsMerged, rowsDeleted *uint64) error {
|
|
dmis := s.getDeletedMetricIDs()
|
|
pendingBlockIsEmpty := true
|
|
pendingBlock := getBlock()
|
|
defer putBlock(pendingBlock)
|
|
tmpBlock := getBlock()
|
|
defer putBlock(tmpBlock)
|
|
for bsm.NextBlock() {
|
|
select {
|
|
case <-stopCh:
|
|
return errForciblyStopped
|
|
default:
|
|
}
|
|
b := bsm.Block
|
|
if dmis.Has(b.bh.TSID.MetricID) {
|
|
// Skip blocks for deleted metrics.
|
|
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
|
|
continue
|
|
}
|
|
retentionDeadline := bsm.getRetentionDeadline(&b.bh)
|
|
if b.bh.MaxTimestamp < retentionDeadline {
|
|
// Skip blocks out of the given retention.
|
|
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
|
|
continue
|
|
}
|
|
if pendingBlockIsEmpty {
|
|
// Load the next block if pendingBlock is empty.
|
|
pendingBlock.CopyFrom(b)
|
|
pendingBlockIsEmpty = false
|
|
continue
|
|
}
|
|
|
|
// Verify whether pendingBlock may be merged with b (the current block).
|
|
if pendingBlock.bh.TSID.MetricID != b.bh.TSID.MetricID {
|
|
// Fast path - blocks belong to distinct time series.
|
|
// Write the pendingBlock and then deal with b.
|
|
if b.bh.TSID.Less(&pendingBlock.bh.TSID) {
|
|
logger.Panicf("BUG: the next TSID=%+v is smaller than the current TSID=%+v", &b.bh.TSID, &pendingBlock.bh.TSID)
|
|
}
|
|
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
|
|
pendingBlock.CopyFrom(b)
|
|
continue
|
|
}
|
|
if pendingBlock.tooBig() && pendingBlock.bh.MaxTimestamp <= b.bh.MinTimestamp {
|
|
// Fast path - pendingBlock is too big and it doesn't overlap with b.
|
|
// Write the pendingBlock and then deal with b.
|
|
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
|
|
pendingBlock.CopyFrom(b)
|
|
continue
|
|
}
|
|
|
|
// Slow path - pendingBlock and b belong to the same time series,
|
|
// so they must be merged.
|
|
if err := unmarshalAndCalibrateScale(pendingBlock, b); err != nil {
|
|
return fmt.Errorf("cannot unmarshal and calibrate scale for blocks to be merged: %w", err)
|
|
}
|
|
tmpBlock.Reset()
|
|
tmpBlock.bh.TSID = b.bh.TSID
|
|
tmpBlock.bh.Scale = b.bh.Scale
|
|
tmpBlock.bh.PrecisionBits = minUint8(pendingBlock.bh.PrecisionBits, b.bh.PrecisionBits)
|
|
mergeBlocks(tmpBlock, pendingBlock, b, retentionDeadline, rowsDeleted)
|
|
if len(tmpBlock.timestamps) <= maxRowsPerBlock {
|
|
// More entries may be added to tmpBlock. Swap it with pendingBlock,
|
|
// so more entries may be added to pendingBlock on the next iteration.
|
|
if len(tmpBlock.timestamps) > 0 {
|
|
tmpBlock.fixupTimestamps()
|
|
} else {
|
|
pendingBlockIsEmpty = true
|
|
}
|
|
pendingBlock, tmpBlock = tmpBlock, pendingBlock
|
|
continue
|
|
}
|
|
|
|
// Write the first maxRowsPerBlock of tmpBlock.timestamps to bsw,
|
|
// leave the rest in pendingBlock.
|
|
tmpBlock.nextIdx = maxRowsPerBlock
|
|
pendingBlock.CopyFrom(tmpBlock)
|
|
pendingBlock.fixupTimestamps()
|
|
tmpBlock.nextIdx = 0
|
|
tmpBlock.timestamps = tmpBlock.timestamps[:maxRowsPerBlock]
|
|
tmpBlock.values = tmpBlock.values[:maxRowsPerBlock]
|
|
tmpBlock.fixupTimestamps()
|
|
bsw.WriteExternalBlock(tmpBlock, ph, rowsMerged)
|
|
}
|
|
if err := bsm.Error(); err != nil {
|
|
return fmt.Errorf("cannot read block to be merged: %w", err)
|
|
}
|
|
if !pendingBlockIsEmpty {
|
|
bsw.WriteExternalBlock(pendingBlock, ph, rowsMerged)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mergeBlocks merges ib1 and ib2 to ob.
|
|
func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint64) {
|
|
ib1.assertMergeable(ib2)
|
|
ib1.assertUnmarshaled()
|
|
ib2.assertUnmarshaled()
|
|
|
|
skipSamplesOutsideRetention(ib1, retentionDeadline, rowsDeleted)
|
|
skipSamplesOutsideRetention(ib2, retentionDeadline, rowsDeleted)
|
|
|
|
if ib1.bh.MaxTimestamp < ib2.bh.MinTimestamp {
|
|
// Fast path - ib1 values have smaller timestamps than ib2 values.
|
|
appendRows(ob, ib1)
|
|
appendRows(ob, ib2)
|
|
return
|
|
}
|
|
if ib2.bh.MaxTimestamp < ib1.bh.MinTimestamp {
|
|
// Fast path - ib2 values have smaller timestamps than ib1 values.
|
|
appendRows(ob, ib2)
|
|
appendRows(ob, ib1)
|
|
return
|
|
}
|
|
if ib1.nextIdx >= len(ib1.timestamps) {
|
|
appendRows(ob, ib2)
|
|
return
|
|
}
|
|
if ib2.nextIdx >= len(ib2.timestamps) {
|
|
appendRows(ob, ib1)
|
|
return
|
|
}
|
|
for {
|
|
i := ib1.nextIdx
|
|
ts2 := ib2.timestamps[ib2.nextIdx]
|
|
for i < len(ib1.timestamps) && ib1.timestamps[i] <= ts2 {
|
|
i++
|
|
}
|
|
ob.timestamps = append(ob.timestamps, ib1.timestamps[ib1.nextIdx:i]...)
|
|
ob.values = append(ob.values, ib1.values[ib1.nextIdx:i]...)
|
|
ib1.nextIdx = i
|
|
if ib1.nextIdx >= len(ib1.timestamps) {
|
|
appendRows(ob, ib2)
|
|
return
|
|
}
|
|
ib1, ib2 = ib2, ib1
|
|
}
|
|
}
|
|
|
|
func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) {
|
|
timestamps := b.timestamps
|
|
nextIdx := b.nextIdx
|
|
nextIdxOrig := nextIdx
|
|
for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline {
|
|
nextIdx++
|
|
}
|
|
if n := nextIdx - nextIdxOrig; n > 0 {
|
|
atomic.AddUint64(rowsDeleted, uint64(n))
|
|
b.nextIdx = nextIdx
|
|
}
|
|
}
|
|
|
|
func appendRows(ob, ib *Block) {
|
|
ob.timestamps = append(ob.timestamps, ib.timestamps[ib.nextIdx:]...)
|
|
ob.values = append(ob.values, ib.values[ib.nextIdx:]...)
|
|
}
|
|
|
|
func unmarshalAndCalibrateScale(b1, b2 *Block) error {
|
|
if err := b1.UnmarshalData(); err != nil {
|
|
return err
|
|
}
|
|
if err := b2.UnmarshalData(); err != nil {
|
|
return err
|
|
}
|
|
|
|
scale := decimal.CalibrateScale(b1.values[b1.nextIdx:], b1.bh.Scale, b2.values[b2.nextIdx:], b2.bh.Scale)
|
|
b1.bh.Scale = scale
|
|
b2.bh.Scale = scale
|
|
return nil
|
|
}
|
|
|
|
func minUint8(a, b uint8) uint8 {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|