VictoriaMetrics/lib/logstorage/block_stream_merger.go
Zakhar Bessarab 876bce5a57
lib/logstorage: prevent from panic during background merge (#4969)
* lib/logstorage: prevent from panic during background merge

Fixes panic during background merge when resulting block would contain more columns than maxColumnsPerBlock.
Buffered data will be flushed and replaced by the next block.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/logstorage: clarify field description and comment

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
2023-10-02 19:29:31 +02:00

313 lines
8.8 KiB
Go

package logstorage
import (
"container/heap"
"fmt"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// mustMergeBlockStreams merges bsrs to bsw and updates ph accordingly.
//
// Finalize() is guaranteed to be called on bsrs and bsw before returning from the func.
func mustMergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}) {
bsm := getBlockStreamMerger()
bsm.mustInit(bsw, bsrs)
for len(bsm.readersHeap) > 0 {
if needStop(stopCh) {
break
}
bsr := bsm.readersHeap[0]
bsm.mustWriteBlock(&bsr.blockData, bsw)
if bsr.NextBlock() {
heap.Fix(&bsm.readersHeap, 0)
} else {
heap.Pop(&bsm.readersHeap)
}
}
bsm.mustFlushRows()
putBlockStreamMerger(bsm)
bsw.Finalize(ph)
mustCloseBlockStreamReaders(bsrs)
}
// blockStreamMerger merges block streams
type blockStreamMerger struct {
// bsw is the block stream writer to write the merged blocks.
bsw *blockStreamWriter
// bsrs contains the original readers passed to mustInit().
// They are used by ReadersPaths()
bsrs []*blockStreamReader
// readersHeap contains a heap of readers to read blocks to merge.
readersHeap blockStreamReadersHeap
// streamID is the stream ID for the pending data.
streamID streamID
// sbu is the unmarshaler for strings in rows and rowsTmp.
sbu *stringsBlockUnmarshaler
// vd is the decoder for unmarshaled strings.
vd *valuesDecoder
// bd is the pending blockData.
// bd is unpacked into rows when needed.
bd blockData
// rows is pending log entries.
rows rows
// rowsTmp is temporary storage for log entries during merge.
rowsTmp rows
// uncompressedRowsSizeBytes is the current size of uncompressed rows.
//
// It is used for flushing rows to blocks when their size reaches maxUncompressedBlockSize
uncompressedRowsSizeBytes uint64
}
func (bsm *blockStreamMerger) reset() {
bsm.bsw = nil
rhs := bsm.readersHeap
for i := range rhs {
rhs[i] = nil
}
bsm.readersHeap = rhs[:0]
bsm.streamID.reset()
bsm.resetRows()
}
func (bsm *blockStreamMerger) resetRows() {
if bsm.sbu != nil {
putStringsBlockUnmarshaler(bsm.sbu)
bsm.sbu = nil
}
if bsm.vd != nil {
putValuesDecoder(bsm.vd)
bsm.vd = nil
}
bsm.bd.reset()
bsm.rows.reset()
bsm.rowsTmp.reset()
bsm.uncompressedRowsSizeBytes = 0
}
func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStreamReader) {
bsm.reset()
bsm.bsw = bsw
bsm.bsrs = bsrs
rsh := bsm.readersHeap[:0]
for _, bsr := range bsrs {
if bsr.NextBlock() {
rsh = append(rsh, bsr)
}
}
bsm.readersHeap = rsh
heap.Init(&bsm.readersHeap)
}
var mergeStreamsExceedLogger = logger.WithThrottler("mergeStreamsExceed", 10*time.Second)
func (bsm *blockStreamMerger) mergeStreamsLimitWarn(bd *blockData) {
attempted := bsm.rows.uniqueFields + len(bd.columnsData) + len(bd.constColumns)
mergeStreamsExceedLogger.Warnf("cannot perform background merge: too many columns for block after merge: %d, max columns: %d; "+
"check ingestion configuration; see: https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting", attempted, maxColumnsPerBlock)
}
// mustWriteBlock writes bd to bsm
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
bsm.checkNextBlock(bd)
switch {
case !bd.streamID.equal(&bsm.streamID):
// The bd contains another streamID.
// Write the current log entries under the current streamID, then process the bd.
bsm.mustFlushRows()
bsm.streamID = bd.streamID
if bd.uncompressedSizeBytes >= maxUncompressedBlockSize {
// Fast path - write full bd to the output without extracting log entries from it.
bsw.MustWriteBlockData(bd)
} else {
// Slow path - copy the bd to the curr bd.
bsm.bd.copyFrom(bd)
}
case !bsm.rows.hasCapacityFor(bd):
// Cannot merge bd with bsm.rows as too many columns will be created.
// Flush bsm.rows and write bd as is.
bsm.mergeStreamsLimitWarn(bd)
bsm.mustFlushRows()
bsw.MustWriteBlockData(bd)
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
// The bd contains the same streamID and it is full,
// so it can be written next after the current log entries
// without the need to merge the bd with the current log entries.
// Write the current log entries and then the bd.
bsm.mustFlushRows()
bsw.MustWriteBlockData(bd)
default:
// The bd contains the same streamID and it isn't full,
// so it must be merged with the current log entries.
bsm.mustMergeRows(bd)
}
}
// checkNextBlock checks whether the bd can be written next after the current data.
func (bsm *blockStreamMerger) checkNextBlock(bd *blockData) {
if len(bsm.rows.timestamps) > 0 && bsm.bd.rowsCount > 0 {
logger.Panicf("BUG: bsm.bd must be empty when bsm.rows isn't empty! got %d log entries in bsm.bd", bsm.bd.rowsCount)
}
if bd.streamID.less(&bsm.streamID) {
logger.Panicf("FATAL: cannot merge %s: the streamID=%s for the next block is smaller than the streamID=%s for the current block",
bsm.ReadersPaths(), &bd.streamID, &bsm.streamID)
}
if !bd.streamID.equal(&bsm.streamID) {
return
}
// streamID at bd equals streamID at bsm. Check that minTimestamp in bd is bigger or equal to the minTimestmap at bsm.
if bd.rowsCount == 0 {
return
}
nextMinTimestamp := bd.timestampsData.minTimestamp
if len(bsm.rows.timestamps) == 0 {
if bsm.bd.rowsCount == 0 {
return
}
minTimestamp := bsm.bd.timestampsData.minTimestamp
if nextMinTimestamp < minTimestamp {
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for the current block",
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
}
return
}
minTimestamp := bsm.rows.timestamps[0]
if nextMinTimestamp < minTimestamp {
logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for log entries for the current block",
bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
}
}
// ReadersPaths returns paths for input blockStreamReaders
func (bsm *blockStreamMerger) ReadersPaths() string {
paths := make([]string, len(bsm.bsrs))
for i, bsr := range bsm.bsrs {
paths[i] = bsr.Path()
}
return fmt.Sprintf("[%s]", strings.Join(paths, ","))
}
// mustMergeRows merges the current log entries inside bsm with bd log entries.
func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
if bsm.bd.rowsCount > 0 {
// Unmarshal log entries from bsm.bd
bsm.mustUnmarshalRows(&bsm.bd)
bsm.bd.reset()
}
if !bsm.rows.hasCapacityFor(bd) {
// Cannot merge bd with bsm.rows as too many columns will be created.
// Flush bsm.rows and write bd as is.
bsm.mergeStreamsLimitWarn(bd)
bsm.mustFlushRows()
bsm.bsw.MustWriteBlockData(bd)
return
}
// Unmarshal log entries from bd
rowsLen := len(bsm.rows.timestamps)
bsm.mustUnmarshalRows(bd)
// Merge unmarshaled log entries
timestamps := bsm.rows.timestamps
rows := bsm.rows.rows
bsm.rowsTmp.mergeRows(timestamps[:rowsLen], timestamps[rowsLen:], rows[:rowsLen], rows[rowsLen:])
bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
bsm.rows.uniqueFields = bsm.rowsTmp.uniqueFields
bsm.rowsTmp.reset()
if bsm.uncompressedRowsSizeBytes >= maxUncompressedBlockSize {
bsm.mustFlushRows()
}
}
func (bsm *blockStreamMerger) mustUnmarshalRows(bd *blockData) {
rowsLen := len(bsm.rows.timestamps)
if bsm.sbu == nil {
bsm.sbu = getStringsBlockUnmarshaler()
}
if bsm.vd == nil {
bsm.vd = getValuesDecoder()
}
if err := bd.unmarshalRows(&bsm.rows, bsm.sbu, bsm.vd); err != nil {
logger.Panicf("FATAL: cannot merge %s: cannot unmarshal log entries from blockData: %s", bsm.ReadersPaths(), err)
}
bsm.uncompressedRowsSizeBytes += uncompressedRowsSizeBytes(bsm.rows.rows[rowsLen:])
}
func (bsm *blockStreamMerger) mustFlushRows() {
if len(bsm.rows.timestamps) == 0 {
bsm.bsw.MustWriteBlockData(&bsm.bd)
} else {
bsm.bsw.MustWriteRows(&bsm.streamID, bsm.rows.timestamps, bsm.rows.rows)
}
bsm.resetRows()
}
func getBlockStreamMerger() *blockStreamMerger {
v := blockStreamMergerPool.Get()
if v == nil {
return &blockStreamMerger{}
}
return v.(*blockStreamMerger)
}
func putBlockStreamMerger(bsm *blockStreamMerger) {
bsm.reset()
blockStreamMergerPool.Put(bsm)
}
var blockStreamMergerPool sync.Pool
type blockStreamReadersHeap []*blockStreamReader
func (h *blockStreamReadersHeap) Len() int {
return len(*h)
}
func (h *blockStreamReadersHeap) Less(i, j int) bool {
x := *h
a := &x[i].blockData
b := &x[j].blockData
if !a.streamID.equal(&b.streamID) {
return a.streamID.less(&b.streamID)
}
return a.timestampsData.minTimestamp < b.timestampsData.minTimestamp
}
func (h *blockStreamReadersHeap) Swap(i, j int) {
x := *h
x[i], x[j] = x[j], x[i]
}
func (h *blockStreamReadersHeap) Push(v interface{}) {
bsr := v.(*blockStreamReader)
*h = append(*h, bsr)
}
func (h *blockStreamReadersHeap) Pop() interface{} {
x := *h
bsr := x[len(x)-1]
x[len(x)-1] = nil
*h = x[:len(x)-1]
return bsr
}