mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
316 lines
8.6 KiB
Go
316 lines
8.6 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
)
|
|
|
|
// mustMergeBlockStreams merges bsrs to bsw and updates ph accordingly.
|
|
//
|
|
// Finalize() is guaranteed to be called on bsrs and bsw before returning from the func.
|
|
func mustMergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}) {
|
|
bsm := getBlockStreamMerger()
|
|
bsm.mustInit(bsw, bsrs)
|
|
for len(bsm.readersHeap) > 0 {
|
|
if needStop(stopCh) {
|
|
break
|
|
}
|
|
bsr := bsm.readersHeap[0]
|
|
bsm.mustWriteBlock(&bsr.blockData, bsw)
|
|
if bsr.NextBlock() {
|
|
heap.Fix(&bsm.readersHeap, 0)
|
|
} else {
|
|
heap.Pop(&bsm.readersHeap)
|
|
}
|
|
}
|
|
bsm.mustFlushRows()
|
|
putBlockStreamMerger(bsm)
|
|
|
|
bsw.Finalize(ph)
|
|
mustCloseBlockStreamReaders(bsrs)
|
|
}
|
|
|
|
// blockStreamMerger merges block streams
|
|
type blockStreamMerger struct {
|
|
// bsw is the block stream writer to write the merged blocks.
|
|
bsw *blockStreamWriter
|
|
|
|
// bsrs contains the original readers passed to mustInit().
|
|
// They are used by ReadersPaths()
|
|
bsrs []*blockStreamReader
|
|
|
|
// readersHeap contains a heap of readers to read blocks to merge.
|
|
readersHeap blockStreamReadersHeap
|
|
|
|
// streamID is the stream ID for the pending data.
|
|
streamID streamID
|
|
|
|
// sbu is the unmarshaler for strings in rows and rowsTmp.
|
|
sbu *stringsBlockUnmarshaler
|
|
|
|
// vd is the decoder for unmarshaled strings.
|
|
vd *valuesDecoder
|
|
|
|
// bd is the pending blockData.
|
|
// bd is unpacked into rows when needed.
|
|
bd blockData
|
|
|
|
// a holds bd data.
|
|
a arena
|
|
|
|
// rows is pending log entries.
|
|
rows rows
|
|
|
|
// rowsTmp is temporary storage for log entries during merge.
|
|
rowsTmp rows
|
|
|
|
// uncompressedRowsSizeBytes is the current size of uncompressed rows.
|
|
//
|
|
// It is used for flushing rows to blocks when their size reaches maxUncompressedBlockSize
|
|
uncompressedRowsSizeBytes uint64
|
|
|
|
// uniqueFields is an upper bound estimation for the number of unique fields in either rows or bd
|
|
//
|
|
// It is used for limiting the number of columns written per block
|
|
uniqueFields int
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) reset() {
|
|
bsm.bsw = nil
|
|
|
|
rhs := bsm.readersHeap
|
|
for i := range rhs {
|
|
rhs[i] = nil
|
|
}
|
|
bsm.readersHeap = rhs[:0]
|
|
|
|
bsm.streamID.reset()
|
|
bsm.resetRows()
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) resetRows() {
|
|
if bsm.sbu != nil {
|
|
putStringsBlockUnmarshaler(bsm.sbu)
|
|
bsm.sbu = nil
|
|
}
|
|
if bsm.vd != nil {
|
|
putValuesDecoder(bsm.vd)
|
|
bsm.vd = nil
|
|
}
|
|
bsm.bd.reset()
|
|
bsm.a.reset()
|
|
|
|
bsm.rows.reset()
|
|
bsm.rowsTmp.reset()
|
|
|
|
bsm.uncompressedRowsSizeBytes = 0
|
|
bsm.uniqueFields = 0
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStreamReader) {
|
|
bsm.reset()
|
|
|
|
bsm.bsw = bsw
|
|
bsm.bsrs = bsrs
|
|
|
|
rsh := bsm.readersHeap[:0]
|
|
for _, bsr := range bsrs {
|
|
if bsr.NextBlock() {
|
|
rsh = append(rsh, bsr)
|
|
}
|
|
}
|
|
bsm.readersHeap = rsh
|
|
heap.Init(&bsm.readersHeap)
|
|
}
|
|
|
|
// mustWriteBlock writes bd to bsm
|
|
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
|
|
bsm.checkNextBlock(bd)
|
|
uniqueFields := len(bd.columnsData) + len(bd.constColumns)
|
|
switch {
|
|
case !bd.streamID.equal(&bsm.streamID):
|
|
// The bd contains another streamID.
|
|
// Write the current log entries under the current streamID, then process the bd.
|
|
bsm.mustFlushRows()
|
|
bsm.streamID = bd.streamID
|
|
if bd.uncompressedSizeBytes >= maxUncompressedBlockSize {
|
|
// Fast path - write full bd to the output without extracting log entries from it.
|
|
bsw.MustWriteBlockData(bd)
|
|
} else {
|
|
// Slow path - copy the bd to the curr bd.
|
|
bsm.a.reset()
|
|
bsm.bd.copyFrom(&bsm.a, bd)
|
|
bsm.uniqueFields = uniqueFields
|
|
}
|
|
case bsm.uniqueFields+uniqueFields >= maxColumnsPerBlock:
|
|
// Cannot merge bd with bsm.rows, because too many columns will be created.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
|
|
//
|
|
// Flush bsm.rows and copy the bd to the curr bd.
|
|
bsm.mustFlushRows()
|
|
if uniqueFields >= maxColumnsPerBlock {
|
|
bsw.MustWriteBlockData(bd)
|
|
} else {
|
|
bsm.a.reset()
|
|
bsm.bd.copyFrom(&bsm.a, bd)
|
|
bsm.uniqueFields = uniqueFields
|
|
}
|
|
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
|
|
// The bd contains the same streamID and it is full,
|
|
// so it can be written next after the current log entries
|
|
// without the need to merge the bd with the current log entries.
|
|
// Write the current log entries and then the bd.
|
|
bsm.mustFlushRows()
|
|
bsw.MustWriteBlockData(bd)
|
|
default:
|
|
// The bd contains the same streamID and it isn't full,
|
|
// so it must be merged with the current log entries.
|
|
bsm.mustMergeRows(bd)
|
|
bsm.uniqueFields += uniqueFields
|
|
}
|
|
}
|
|
|
|
// checkNextBlock checks whether the bd can be written next after the current data.
|
|
func (bsm *blockStreamMerger) checkNextBlock(bd *blockData) {
|
|
if len(bsm.rows.timestamps) > 0 && bsm.bd.rowsCount > 0 {
|
|
logger.Panicf("BUG: bsm.bd must be empty when bsm.rows isn't empty! got %d log entries in bsm.bd", bsm.bd.rowsCount)
|
|
}
|
|
if bd.streamID.less(&bsm.streamID) {
|
|
logger.Panicf("FATAL: cannot merge %s: the streamID=%s for the next block is smaller than the streamID=%s for the current block",
|
|
bsm.ReadersPaths(), &bd.streamID, &bsm.streamID)
|
|
}
|
|
if !bd.streamID.equal(&bsm.streamID) {
|
|
return
|
|
}
|
|
// streamID at bd equals streamID at bsm. Check that minTimestamp in bd is bigger or equal to the minTimestmap at bsm.
|
|
if bd.rowsCount == 0 {
|
|
return
|
|
}
|
|
nextMinTimestamp := bd.timestampsData.minTimestamp
|
|
if len(bsm.rows.timestamps) == 0 {
|
|
if bsm.bd.rowsCount == 0 {
|
|
return
|
|
}
|
|
minTimestamp := bsm.bd.timestampsData.minTimestamp
|
|
if nextMinTimestamp < minTimestamp {
|
|
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for the current block",
|
|
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
|
|
}
|
|
return
|
|
}
|
|
minTimestamp := bsm.rows.timestamps[0]
|
|
if nextMinTimestamp < minTimestamp {
|
|
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for log entries for the current block",
|
|
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
|
|
}
|
|
}
|
|
|
|
// ReadersPaths returns paths for input blockStreamReaders
|
|
func (bsm *blockStreamMerger) ReadersPaths() string {
|
|
paths := make([]string, len(bsm.bsrs))
|
|
for i, bsr := range bsm.bsrs {
|
|
paths[i] = bsr.Path()
|
|
}
|
|
return fmt.Sprintf("[%s]", strings.Join(paths, ","))
|
|
}
|
|
|
|
// mustMergeRows merges the current log entries inside bsm with bd log entries.
|
|
func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
|
|
if bsm.bd.rowsCount > 0 {
|
|
// Unmarshal log entries from bsm.bd
|
|
bsm.mustUnmarshalRows(&bsm.bd)
|
|
bsm.bd.reset()
|
|
bsm.a.reset()
|
|
}
|
|
|
|
// Unmarshal log entries from bd
|
|
rowsLen := len(bsm.rows.timestamps)
|
|
bsm.mustUnmarshalRows(bd)
|
|
|
|
// Merge unmarshaled log entries
|
|
timestamps := bsm.rows.timestamps
|
|
rows := bsm.rows.rows
|
|
bsm.rowsTmp.mergeRows(timestamps[:rowsLen], timestamps[rowsLen:], rows[:rowsLen], rows[rowsLen:])
|
|
bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
|
|
bsm.rowsTmp.reset()
|
|
|
|
if bsm.uncompressedRowsSizeBytes >= maxUncompressedBlockSize {
|
|
bsm.mustFlushRows()
|
|
}
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustUnmarshalRows(bd *blockData) {
|
|
rowsLen := len(bsm.rows.timestamps)
|
|
if bsm.sbu == nil {
|
|
bsm.sbu = getStringsBlockUnmarshaler()
|
|
}
|
|
if bsm.vd == nil {
|
|
bsm.vd = getValuesDecoder()
|
|
}
|
|
if err := bd.unmarshalRows(&bsm.rows, bsm.sbu, bsm.vd); err != nil {
|
|
logger.Panicf("FATAL: cannot merge %s: cannot unmarshal log entries from blockData: %s", bsm.ReadersPaths(), err)
|
|
}
|
|
bsm.uncompressedRowsSizeBytes += uncompressedRowsSizeBytes(bsm.rows.rows[rowsLen:])
|
|
}
|
|
|
|
func (bsm *blockStreamMerger) mustFlushRows() {
|
|
if len(bsm.rows.timestamps) == 0 {
|
|
bsm.bsw.MustWriteBlockData(&bsm.bd)
|
|
} else {
|
|
bsm.bsw.MustWriteRows(&bsm.streamID, bsm.rows.timestamps, bsm.rows.rows)
|
|
}
|
|
bsm.resetRows()
|
|
}
|
|
|
|
func getBlockStreamMerger() *blockStreamMerger {
|
|
v := blockStreamMergerPool.Get()
|
|
if v == nil {
|
|
return &blockStreamMerger{}
|
|
}
|
|
return v.(*blockStreamMerger)
|
|
}
|
|
|
|
func putBlockStreamMerger(bsm *blockStreamMerger) {
|
|
bsm.reset()
|
|
blockStreamMergerPool.Put(bsm)
|
|
}
|
|
|
|
var blockStreamMergerPool sync.Pool
|
|
|
|
type blockStreamReadersHeap []*blockStreamReader
|
|
|
|
func (h *blockStreamReadersHeap) Len() int {
|
|
return len(*h)
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Less(i, j int) bool {
|
|
x := *h
|
|
a := &x[i].blockData
|
|
b := &x[j].blockData
|
|
if !a.streamID.equal(&b.streamID) {
|
|
return a.streamID.less(&b.streamID)
|
|
}
|
|
return a.timestampsData.minTimestamp < b.timestampsData.minTimestamp
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Swap(i, j int) {
|
|
x := *h
|
|
x[i], x[j] = x[j], x[i]
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Push(v interface{}) {
|
|
bsr := v.(*blockStreamReader)
|
|
*h = append(*h, bsr)
|
|
}
|
|
|
|
func (h *blockStreamReadersHeap) Pop() interface{} {
|
|
x := *h
|
|
bsr := x[len(x)-1]
|
|
x[len(x)-1] = nil
|
|
*h = x[:len(x)-1]
|
|
return bsr
|
|
}
|