mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
lib/logstorage: follow-up for 94627113db
- Move uniqueFields from rows to blockStreamMerger struct. This allows localizing all the references to uniqueFields inside blockStreamMerger.mustWriteBlock(), which should improve readability and maintainability of the code. - Remove logging of the event when blocks cannot be merged because they contain more than maxColumnsPerBlock, since the provided logging didn't provide the solution for the issue with too many columns. I couldn't figure out the proper solution, which could be helpful for end user, so decided to remove the logging until we find the solution. This commit also contains the following additional changes: - It truncates field names longer than 128 chars during logs ingestion. This should prevent from ingesting bogus field names. This also should prevent from too big columnsHeader blocks, which could negatively affect search query performance, since columnsHeader is read on every scan of the corresponding data block. - It limits the maximum length of const column value to 256. Longer values are stored in an ordinary columns. This helps limiting the size of columnsHeader blocks and improving search query performance by avoiding reading too long const columns on every scan of the corresponding data block. - It deduplicates columns with identical names during data ingestion and background merging. Previously it was possible to pass columns with duplicate names to block.mustInitFromRows(), and they were stored as is in the block. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4969
This commit is contained in:
parent
e7b35455bd
commit
7bb5f75a2a
8 changed files with 81 additions and 57 deletions
|
@ -15,7 +15,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
|
|||
|
||||
* BUGFIX: fix possible panic when no data is written to VictoriaLogs for a long time. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4895). Thanks to @crossoverJie for filing and fixing the issue.
|
||||
* BUGFIX: add `/insert/loky/ready` endpoint, which is used by Promtail for healthchecks. This should remove `unsupported path requested: /insert/loki/ready` warning logs. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762#issuecomment-1690966722).
|
||||
* BUGFIX: prevent panic during background merge when amount of columns in resulting block exceeds max number of columns per block. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762).
|
||||
* BUGFIX: prevent from panic during background merge when the number of columns in the resulting block exceeds the maximum allowed number of columns per block. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762).
|
||||
|
||||
## [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs)
|
||||
|
||||
|
|
|
@ -127,12 +127,15 @@ func (c *column) reset() {
|
|||
c.values = values[:0]
|
||||
}
|
||||
|
||||
func (c *column) areSameValues() bool {
|
||||
func (c *column) canStoreInConstColumn() bool {
|
||||
values := c.values
|
||||
if len(values) < 2 {
|
||||
if len(values) == 0 {
|
||||
return true
|
||||
}
|
||||
value := values[0]
|
||||
if len(value) > maxConstColumnValueSize {
|
||||
return false
|
||||
}
|
||||
for _, v := range values[1:] {
|
||||
if value != v {
|
||||
return false
|
||||
|
@ -244,7 +247,7 @@ func (b *block) mustInitFromRows(rows [][]Field) {
|
|||
fields := rows[0]
|
||||
for i := range fields {
|
||||
f := &fields[i]
|
||||
if areSameValuesForColumn(rows, i) {
|
||||
if canStoreInConstColumn(rows, i) {
|
||||
cc := b.extendConstColumns()
|
||||
cc.Name = f.Name
|
||||
cc.Value = f.Value
|
||||
|
@ -294,7 +297,7 @@ func (b *block) mustInitFromRows(rows [][]Field) {
|
|||
// Detect const columns
|
||||
for i := len(cs) - 1; i >= 0; i-- {
|
||||
c := &cs[i]
|
||||
if !c.areSameValues() {
|
||||
if !c.canStoreInConstColumn() {
|
||||
continue
|
||||
}
|
||||
cc := b.extendConstColumns()
|
||||
|
@ -314,11 +317,14 @@ func swapColumns(a, b *column) {
|
|||
*a, *b = *b, *a
|
||||
}
|
||||
|
||||
func areSameValuesForColumn(rows [][]Field, colIdx int) bool {
|
||||
if len(rows) < 2 {
|
||||
func canStoreInConstColumn(rows [][]Field, colIdx int) bool {
|
||||
if len(rows) == 0 {
|
||||
return true
|
||||
}
|
||||
value := rows[0][colIdx].Value
|
||||
if len(value) > maxConstColumnValueSize {
|
||||
return false
|
||||
}
|
||||
rows = rows[1:]
|
||||
for i := range rows {
|
||||
if value != rows[i][colIdx].Value {
|
||||
|
@ -471,8 +477,8 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
|
|||
longTermBufPool.Put(bb)
|
||||
}
|
||||
|
||||
// appendRows appends log entries from b to dst.
|
||||
func (b *block) appendRows(dst *rows) {
|
||||
// appendRowsTo appends log entries from b to dst.
|
||||
func (b *block) appendRowsTo(dst *rows) {
|
||||
// copy timestamps
|
||||
dst.timestamps = append(dst.timestamps, b.timestamps...)
|
||||
|
||||
|
@ -505,7 +511,6 @@ func (b *block) appendRows(dst *rows) {
|
|||
dst.rows = append(dst.rows, fieldsBuf[fieldsLen:])
|
||||
}
|
||||
dst.fieldsBuf = fieldsBuf
|
||||
dst.uniqueFields += len(ccs) + len(cs)
|
||||
}
|
||||
|
||||
func areSameFieldsInRows(rows [][]Field) bool {
|
||||
|
@ -513,6 +518,19 @@ func areSameFieldsInRows(rows [][]Field) bool {
|
|||
return true
|
||||
}
|
||||
fields := rows[0]
|
||||
|
||||
// Verify that all the field names are unique
|
||||
m := make(map[string]struct{}, len(fields))
|
||||
for i := range fields {
|
||||
f := &fields[i]
|
||||
if _, ok := m[f.Name]; ok {
|
||||
// Field name isn't unique
|
||||
return false
|
||||
}
|
||||
m[f.Name] = struct{}{}
|
||||
}
|
||||
|
||||
// Verify that all the fields are the same across rows
|
||||
rows = rows[1:]
|
||||
for i := range rows {
|
||||
leFields := rows[i]
|
||||
|
|
|
@ -24,10 +24,10 @@ type blockData struct {
|
|||
// timestampsData contains the encoded timestamps data for the block
|
||||
timestampsData timestampsData
|
||||
|
||||
// columnsData contains packed per-column data.
|
||||
// columnsData contains packed per-column data
|
||||
columnsData []columnData
|
||||
|
||||
// constColumns contains data for const columns across the block.
|
||||
// constColumns contains data for const columns across the block
|
||||
constColumns []Field
|
||||
|
||||
// a is used for storing byte slices for timestamps and columns.
|
||||
|
@ -97,7 +97,7 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *
|
|||
if err := b.InitFromBlockData(bd, sbu, vd); err != nil {
|
||||
return err
|
||||
}
|
||||
b.appendRows(dst)
|
||||
b.appendRowsTo(dst)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
@ -70,6 +69,11 @@ type blockStreamMerger struct {
|
|||
//
|
||||
// It is used for flushing rows to blocks when their size reaches maxUncompressedBlockSize
|
||||
uncompressedRowsSizeBytes uint64
|
||||
|
||||
// uniqueFields is an upper bound estimation for the number of unique fields in either rows or bd
|
||||
//
|
||||
// It is used for limiting the number of columns written per block
|
||||
uniqueFields int
|
||||
}
|
||||
|
||||
func (bsm *blockStreamMerger) reset() {
|
||||
|
@ -100,6 +104,7 @@ func (bsm *blockStreamMerger) resetRows() {
|
|||
bsm.rowsTmp.reset()
|
||||
|
||||
bsm.uncompressedRowsSizeBytes = 0
|
||||
bsm.uniqueFields = 0
|
||||
}
|
||||
|
||||
func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStreamReader) {
|
||||
|
@ -118,17 +123,10 @@ func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStre
|
|||
heap.Init(&bsm.readersHeap)
|
||||
}
|
||||
|
||||
var mergeStreamsExceedLogger = logger.WithThrottler("mergeStreamsExceed", 10*time.Second)
|
||||
|
||||
func (bsm *blockStreamMerger) mergeStreamsLimitWarn(bd *blockData) {
|
||||
attempted := bsm.rows.uniqueFields + len(bd.columnsData) + len(bd.constColumns)
|
||||
mergeStreamsExceedLogger.Warnf("cannot perform background merge: too many columns for block after merge: %d, max columns: %d; "+
|
||||
"check ingestion configuration; see: https://docs.victoriametrics.com/VictoriaLogs/data-ingestion/#troubleshooting", attempted, maxColumnsPerBlock)
|
||||
}
|
||||
|
||||
// mustWriteBlock writes bd to bsm
|
||||
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
|
||||
bsm.checkNextBlock(bd)
|
||||
uniqueFields := len(bd.columnsData) + len(bd.constColumns)
|
||||
switch {
|
||||
case !bd.streamID.equal(&bsm.streamID):
|
||||
// The bd contains another streamID.
|
||||
|
@ -141,13 +139,20 @@ func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWrit
|
|||
} else {
|
||||
// Slow path - copy the bd to the curr bd.
|
||||
bsm.bd.copyFrom(bd)
|
||||
bsm.uniqueFields = uniqueFields
|
||||
}
|
||||
case !bsm.rows.hasCapacityFor(bd):
|
||||
// Cannot merge bd with bsm.rows as too many columns will be created.
|
||||
// Flush bsm.rows and write bd as is.
|
||||
bsm.mergeStreamsLimitWarn(bd)
|
||||
case bsm.uniqueFields+uniqueFields >= maxColumnsPerBlock:
|
||||
// Cannot merge bd with bsm.rows, because too many columns will be created.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4762
|
||||
//
|
||||
// Flush bsm.rows and copy the bd to the curr bd.
|
||||
bsm.mustFlushRows()
|
||||
bsw.MustWriteBlockData(bd)
|
||||
if uniqueFields >= maxColumnsPerBlock {
|
||||
bsw.MustWriteBlockData(bd)
|
||||
} else {
|
||||
bsm.bd.copyFrom(bd)
|
||||
bsm.uniqueFields = uniqueFields
|
||||
}
|
||||
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
|
||||
// The bd contains the same streamID and it is full,
|
||||
// so it can be written next after the current log entries
|
||||
|
@ -159,6 +164,7 @@ func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWrit
|
|||
// The bd contains the same streamID and it isn't full,
|
||||
// so it must be merged with the current log entries.
|
||||
bsm.mustMergeRows(bd)
|
||||
bsm.uniqueFields += uniqueFields
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -214,15 +220,6 @@ func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
|
|||
bsm.bd.reset()
|
||||
}
|
||||
|
||||
if !bsm.rows.hasCapacityFor(bd) {
|
||||
// Cannot merge bd with bsm.rows as too many columns will be created.
|
||||
// Flush bsm.rows and write bd as is.
|
||||
bsm.mergeStreamsLimitWarn(bd)
|
||||
bsm.mustFlushRows()
|
||||
bsm.bsw.MustWriteBlockData(bd)
|
||||
return
|
||||
}
|
||||
|
||||
// Unmarshal log entries from bd
|
||||
rowsLen := len(bsm.rows.timestamps)
|
||||
bsm.mustUnmarshalRows(bd)
|
||||
|
@ -232,7 +229,6 @@ func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
|
|||
rows := bsm.rows.rows
|
||||
bsm.rowsTmp.mergeRows(timestamps[:rowsLen], timestamps[rowsLen:], rows[:rowsLen], rows[rowsLen:])
|
||||
bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
|
||||
bsm.rows.uniqueFields = bsm.rowsTmp.uniqueFields
|
||||
bsm.rowsTmp.reset()
|
||||
|
||||
if bsm.uncompressedRowsSizeBytes >= maxUncompressedBlockSize {
|
||||
|
|
|
@ -16,6 +16,17 @@ const maxRowsPerBlock = 8 * 1024 * 1024
|
|||
// maxColumnsPerBlock is the maximum number of columns per block.
|
||||
const maxColumnsPerBlock = 10000
|
||||
|
||||
// MaxFieldNameSize is the maximum size in bytes for field name.
|
||||
//
|
||||
// Longer field names are truncated during data ingestion to MaxFieldNameSize length.
|
||||
const MaxFieldNameSize = 128
|
||||
|
||||
// maxConstColumnValueSize is the maximum size in bytes for const column value.
|
||||
//
|
||||
// Const column values are stored in columnsHeader, which is read every time the corresponding block is scanned during search queries.
|
||||
// So it is better to store bigger values in regular columns in order to speed up search speed.
|
||||
const maxConstColumnValueSize = 256
|
||||
|
||||
// maxIndexBlockSize is the maximum size of the block with blockHeader entries (aka indexBlock)
|
||||
const maxIndexBlockSize = 8 * 1024 * 1024
|
||||
|
||||
|
@ -30,3 +41,14 @@ const maxBloomFilterBlockSize = 8 * 1024 * 1024
|
|||
|
||||
// maxColumnsHeaderSize is the maximum size of columnsHeader block
|
||||
const maxColumnsHeaderSize = 8 * 1024 * 1024
|
||||
|
||||
// maxDictSizeBytes is the maximum length of all the keys in the valuesDict.
|
||||
//
|
||||
// Dict is stored in columnsHeader, which is read every time the corresponding block is scanned during search qieries.
|
||||
// So it is better to store bigger values in regular columns in order to speed up search speed.
|
||||
const maxDictSizeBytes = 256
|
||||
|
||||
// maxDictLen is the maximum number of entries in the valuesDict.
|
||||
//
|
||||
// it shouldn't exceed 255, since the dict len is marshaled into a single byte.
|
||||
const maxDictLen = 8
|
||||
|
|
|
@ -135,6 +135,8 @@ func (lr *LogRows) NeedFlush() bool {
|
|||
//
|
||||
// It is OK to modify the args after returning from the function,
|
||||
// since lr copies all the args to internal data.
|
||||
//
|
||||
// field names longer than MaxFieldNameSize are automatically truncated to MaxFieldNameSize length.
|
||||
func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) {
|
||||
// Compose StreamTags from fields according to lr.streamFields
|
||||
sfs := lr.streamFields
|
||||
|
@ -190,8 +192,12 @@ func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field
|
|||
dstField := &fb[len(fb)-1]
|
||||
|
||||
bufLen = len(buf)
|
||||
if f.Name != "_msg" {
|
||||
buf = append(buf, f.Name...)
|
||||
fieldName := f.Name
|
||||
if len(fieldName) > MaxFieldNameSize {
|
||||
fieldName = fieldName[:MaxFieldNameSize]
|
||||
}
|
||||
if fieldName != "_msg" {
|
||||
buf = append(buf, fieldName...)
|
||||
}
|
||||
dstField.Name = bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
|
||||
|
|
|
@ -65,10 +65,6 @@ func (f *Field) unmarshal(src []byte) ([]byte, error) {
|
|||
type rows struct {
|
||||
fieldsBuf []Field
|
||||
|
||||
// uniqueFields is the maximum estimated number of unique fields which are currently stored in fieldsBuf.
|
||||
// it is used to perform worst case estimation when merging rows.
|
||||
uniqueFields int
|
||||
|
||||
timestamps []int64
|
||||
|
||||
rows [][]Field
|
||||
|
@ -125,9 +121,3 @@ func (rs *rows) mergeRows(timestampsA, timestampsB []int64, fieldsA, fieldsB [][
|
|||
rs.appendRows(timestampsA, fieldsA)
|
||||
}
|
||||
}
|
||||
|
||||
// hasCapacityFor returns true if merging bd with rs won't create too many columns
|
||||
// for creating a new block.
|
||||
func (rs *rows) hasCapacityFor(bd *blockData) bool {
|
||||
return rs.uniqueFields+len(bd.columnsData)+len(bd.constColumns) < maxColumnsPerBlock
|
||||
}
|
||||
|
|
|
@ -732,11 +732,3 @@ func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) {
|
|||
}
|
||||
return src, nil
|
||||
}
|
||||
|
||||
// maxDictSizeBytes is the maximum length of all the keys in the valuesDict
|
||||
const maxDictSizeBytes = 256
|
||||
|
||||
// maxDictLen is the maximum number of entries in the valuesDict.
|
||||
//
|
||||
// it shouldn't exceed 255, since the dict len is marshaled into a single byte.
|
||||
const maxDictLen = 8
|
||||
|
|
Loading…
Reference in a new issue