VictoriaMetrics/lib/logstorage/block_stream_merger.go
2023-07-06 17:30:05 -07:00

288 lines
7.8 KiB
Go

package logstorage
import (
"container/heap"
"fmt"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// mustMergeBlockStreams merges bsrs to bsw and updates ph accordingly.
//
// Finalize() is guaranteed to be called on bsrs and bsw before returning from the func.
func mustMergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}) {
bsm := getBlockStreamMerger()
bsm.mustInit(bsw, bsrs)
for len(bsm.readersHeap) > 0 {
if needStop(stopCh) {
break
}
bsr := bsm.readersHeap[0]
bsm.mustWriteBlock(&bsr.blockData, bsw)
if bsr.NextBlock() {
heap.Fix(&bsm.readersHeap, 0)
} else {
heap.Pop(&bsm.readersHeap)
}
}
bsm.mustFlushRows()
putBlockStreamMerger(bsm)
bsw.Finalize(ph)
mustCloseBlockStreamReaders(bsrs)
}
// blockStreamMerger merges block streams
type blockStreamMerger struct {
// bsw is the block stream writer to write the merged blocks.
bsw *blockStreamWriter
// bsrs contains the original readers passed to mustInit().
// They are used by ReadersPaths()
bsrs []*blockStreamReader
// readersHeap contains a heap of readers to read blocks to merge.
readersHeap blockStreamReadersHeap
// streamID is the stream ID for the pending data.
streamID streamID
// sbu is the unmarshaler for strings in rows and rowsTmp.
sbu *stringsBlockUnmarshaler
// vd is the decoder for unmarshaled strings.
vd *valuesDecoder
// bd is the pending blockData.
// bd is unpacked into rows when needed.
bd blockData
// rows is pending log entries.
rows rows
// rowsTmp is temporary storage for log entries during merge.
rowsTmp rows
// uncompressedRowsSizeBytes is the current size of uncompressed rows.
//
// It is used for flushing rows to blocks when their size reaches maxUncompressedBlockSize
uncompressedRowsSizeBytes uint64
}
func (bsm *blockStreamMerger) reset() {
bsm.bsw = nil
rhs := bsm.readersHeap
for i := range rhs {
rhs[i] = nil
}
bsm.readersHeap = rhs[:0]
bsm.streamID.reset()
bsm.resetRows()
}
func (bsm *blockStreamMerger) resetRows() {
if bsm.sbu != nil {
putStringsBlockUnmarshaler(bsm.sbu)
bsm.sbu = nil
}
if bsm.vd != nil {
putValuesDecoder(bsm.vd)
bsm.vd = nil
}
bsm.bd.reset()
bsm.rows.reset()
bsm.rowsTmp.reset()
bsm.uncompressedRowsSizeBytes = 0
}
func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStreamReader) {
bsm.reset()
bsm.bsw = bsw
bsm.bsrs = bsrs
rsh := bsm.readersHeap[:0]
for _, bsr := range bsrs {
if bsr.NextBlock() {
rsh = append(rsh, bsr)
}
}
bsm.readersHeap = rsh
heap.Init(&bsm.readersHeap)
}
// mustWriteBlock writes bd to bsm
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
bsm.checkNextBlock(bd)
switch {
case !bd.streamID.equal(&bsm.streamID):
// The bd contains another streamID.
// Write the current log entries under the current streamID, then process the bd.
bsm.mustFlushRows()
bsm.streamID = bd.streamID
if bd.uncompressedSizeBytes >= maxUncompressedBlockSize {
// Fast path - write full bd to the output without extracting log entries from it.
bsw.MustWriteBlockData(bd)
} else {
// Slow path - copy the bd to the curr bd.
bsm.bd.copyFrom(bd)
}
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
// The bd contains the same streamID and it is full,
// so it can be written next after the current log entries
// without the need to merge the bd with the current log entries.
// Write the current log entries and then the bd.
bsm.mustFlushRows()
bsw.MustWriteBlockData(bd)
default:
// The bd contains the same streamID and it isn't full,
// so it must be merged with the current log entries.
bsm.mustMergeRows(bd)
}
}
// checkNextBlock checks whether the bd can be written next after the current data.
func (bsm *blockStreamMerger) checkNextBlock(bd *blockData) {
if len(bsm.rows.timestamps) > 0 && bsm.bd.rowsCount > 0 {
logger.Panicf("BUG: bsm.bd must be empty when bsm.rows isn't empty! got %d log entries in bsm.bd", bsm.bd.rowsCount)
}
if bd.streamID.less(&bsm.streamID) {
logger.Panicf("FATAL: cannot merge %s: the streamID=%s for the next block is smaller than the streamID=%s for the current block",
bsm.ReadersPaths(), &bd.streamID, &bsm.streamID)
}
if !bd.streamID.equal(&bsm.streamID) {
return
}
// streamID at bd equals streamID at bsm. Check that minTimestamp in bd is bigger or equal to the minTimestmap at bsm.
if bd.rowsCount == 0 {
return
}
nextMinTimestamp := bd.timestampsData.minTimestamp
if len(bsm.rows.timestamps) == 0 {
if bsm.bd.rowsCount == 0 {
return
}
minTimestamp := bsm.bd.timestampsData.minTimestamp
if nextMinTimestamp < minTimestamp {
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for the current block",
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
}
return
}
minTimestamp := bsm.rows.timestamps[0]
if nextMinTimestamp < minTimestamp {
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for log entries for the current block",
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
}
}
// ReadersPaths returns paths for input blockStreamReaders
func (bsm *blockStreamMerger) ReadersPaths() string {
paths := make([]string, len(bsm.bsrs))
for i, bsr := range bsm.bsrs {
paths[i] = bsr.Path()
}
return fmt.Sprintf("[%s]", strings.Join(paths, ","))
}
// mustMergeRows merges the current log entries inside bsm with bd log entries.
func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
if bsm.bd.rowsCount > 0 {
// Unmarshal log entries from bsm.bd
bsm.mustUnmarshalRows(&bsm.bd)
bsm.bd.reset()
}
// Unmarshal log entries from bd
rowsLen := len(bsm.rows.timestamps)
bsm.mustUnmarshalRows(bd)
// Merge unmarshaled log entries
timestamps := bsm.rows.timestamps
rows := bsm.rows.rows
bsm.rowsTmp.mergeRows(timestamps[:rowsLen], timestamps[rowsLen:], rows[:rowsLen], rows[rowsLen:])
bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
bsm.rowsTmp.reset()
if bsm.uncompressedRowsSizeBytes >= maxUncompressedBlockSize {
bsm.mustFlushRows()
}
}
func (bsm *blockStreamMerger) mustUnmarshalRows(bd *blockData) {
rowsLen := len(bsm.rows.timestamps)
if bsm.sbu == nil {
bsm.sbu = getStringsBlockUnmarshaler()
}
if bsm.vd == nil {
bsm.vd = getValuesDecoder()
}
if err := bd.unmarshalRows(&bsm.rows, bsm.sbu, bsm.vd); err != nil {
logger.Panicf("FATAL: cannot merge %s: cannot unmarshal log entries from blockData: %s", bsm.ReadersPaths(), err)
}
bsm.uncompressedRowsSizeBytes += uncompressedRowsSizeBytes(bsm.rows.rows[rowsLen:])
}
func (bsm *blockStreamMerger) mustFlushRows() {
if len(bsm.rows.timestamps) == 0 {
bsm.bsw.MustWriteBlockData(&bsm.bd)
} else {
bsm.bsw.MustWriteRows(&bsm.streamID, bsm.rows.timestamps, bsm.rows.rows)
}
bsm.resetRows()
}
func getBlockStreamMerger() *blockStreamMerger {
v := blockStreamMergerPool.Get()
if v == nil {
return &blockStreamMerger{}
}
return v.(*blockStreamMerger)
}
func putBlockStreamMerger(bsm *blockStreamMerger) {
bsm.reset()
blockStreamMergerPool.Put(bsm)
}
var blockStreamMergerPool sync.Pool
type blockStreamReadersHeap []*blockStreamReader
func (h *blockStreamReadersHeap) Len() int {
return len(*h)
}
func (h *blockStreamReadersHeap) Less(i, j int) bool {
x := *h
a := &x[i].blockData
b := &x[j].blockData
if !a.streamID.equal(&b.streamID) {
return a.streamID.less(&b.streamID)
}
return a.timestampsData.minTimestamp < b.timestampsData.minTimestamp
}
func (h *blockStreamReadersHeap) Swap(i, j int) {
x := *h
x[i], x[j] = x[j], x[i]
}
func (h *blockStreamReadersHeap) Push(v interface{}) {
bsr := v.(*blockStreamReader)
*h = append(*h, bsr)
}
func (h *blockStreamReadersHeap) Pop() interface{} {
x := *h
bsr := x[len(x)-1]
x[len(x)-1] = nil
*h = x[:len(x)-1]
return bsr
}