lib/logstorage: prevent from panic during background merge (#4969)

* lib/logstorage: prevent from panic during background merge

Fixes panic during background merge when resulting block would contain more columns than maxColumnsPerBlock.
Buffered data will be flushed and replaced by the next block.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* lib/logstorage: clarify field description and comment

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
Zakhar Bessarab 2023-09-29 13:58:20 +04:00 committed by GitHub
parent 8a23d08c21
commit 94627113db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 37 additions and 0 deletions

View file

@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
* BUGFIX: fix possible panic when no data is written to VictoriaLogs for a long time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4895). Thanks to @crossoverJie for filing and fixing the issue.
* BUGFIX: add `/insert/loky/ready` endpoint, which is used by Promtail for healthchecks. This should remove `unsupported path requested: /insert/loki/ready` warning logs. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762#issuecomment-1690966722).
* BUGFIX: prevent panic during background merge when amount of columns in resulting block exceeds max number of columns per block. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762).
## [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs)

View file

@ -505,6 +505,7 @@ func (b *block) appendRows(dst *rows) {
dst.rows = append(dst.rows, fieldsBuf[fieldsLen:])
}
dst.fieldsBuf = fieldsBuf
dst.uniqueFields += len(ccs) + len(cs)
}
func areSameFieldsInRows(rows [][]Field) bool {

View file

@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@ -117,6 +118,14 @@ func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStre
heap.Init(&bsm.readersHeap)
}
var mergeStreamsExceedLogger = logger.WithThrottler("mergeStreamsExceed", 10*time.Second)
func (bsm *blockStreamMerger) mergeStreamsLimitWarn(bd *blockData) {
attempted := bsm.rows.uniqueFields + len(bd.columnsData) + len(bd.constColumns)
mergeStreamsExceedLogger.Warnf("cannot perform background merge: too many columns for block after merge: %d, max columns: %d; "+
"check ingestion configuration; see: https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting", attempted, maxColumnsPerBlock)
}
// mustWriteBlock writes bd to bsm
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
bsm.checkNextBlock(bd)
@ -133,6 +142,12 @@ func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWrit
// Slow path - copy the bd to the curr bd.
bsm.bd.copyFrom(bd)
}
case !bsm.rows.hasCapacityFor(bd):
// Cannot merge bd with bsm.rows as too many columns will be created.
// Flush bsm.rows and write bd as is.
bsm.mergeStreamsLimitWarn(bd)
bsm.mustFlushRows()
bsw.MustWriteBlockData(bd)
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
// The bd contains the same streamID and it is full,
// so it can be written next after the current log entries
@ -199,6 +214,15 @@ func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
bsm.bd.reset()
}
if !bsm.rows.hasCapacityFor(bd) {
// Cannot merge bd with bsm.rows as too many columns will be created.
// Flush bsm.rows and write bd as is.
bsm.mergeStreamsLimitWarn(bd)
bsm.mustFlushRows()
bsm.bsw.MustWriteBlockData(bd)
return
}
// Unmarshal log entries from bd
rowsLen := len(bsm.rows.timestamps)
bsm.mustUnmarshalRows(bd)
@ -208,6 +232,7 @@ func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
rows := bsm.rows.rows
bsm.rowsTmp.mergeRows(timestamps[:rowsLen], timestamps[rowsLen:], rows[:rowsLen], rows[rowsLen:])
bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
bsm.rows.uniqueFields = bsm.rowsTmp.uniqueFields
bsm.rowsTmp.reset()
if bsm.uncompressedRowsSizeBytes >= maxUncompressedBlockSize {

View file

@ -65,6 +65,10 @@ func (f *Field) unmarshal(src []byte) ([]byte, error) {
type rows struct {
fieldsBuf []Field
// uniqueFields is the maximum estimated number of unique fields which are currently stored in fieldsBuf.
// it is used to perform worst case estimation when merging rows.
uniqueFields int
timestamps []int64
rows [][]Field
@ -121,3 +125,9 @@ func (rs *rows) mergeRows(timestampsA, timestampsB []int64, fieldsA, fieldsB [][
rs.appendRows(timestampsA, fieldsA)
}
}
// hasCapacityFor returns true if merging bd with rs won't create too many columns
// for creating a new block.
func (rs *rows) hasCapacityFor(bd *blockData) bool {
return rs.uniqueFields+len(bd.columnsData)+len(bd.constColumns) < maxColumnsPerBlock
}