mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
lib/logstorage: fixes panic at Block.MustInitFromRows (#7695)
Previously Block columns wasn't properly limited by maxColumnsPerBlock. And it was possible a case, when more columns per block added than expected. For example, if ingested log stream has many unuqie fields and it's sum exceed maxColumnsPerBlock. We only enforce fieldsPerBlock limit during row parsing, which limits isn't enough to mitigate this issue. Also it would be very expensive to apply maxColumnsPerBlock limit during ingestion, since it requires to track all possible field tags combinations. This commit adds check for maxColumnsPerBlock limit during MustInitFromRows function call. And it returns offset of the rows and timestamps added to the block. Function caller must create another block and ingest remaining rows into it. Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7568 ### Describe Your Changes Please provide a brief description of the changes you made. Be as specific as possible to help others understand the purpose and impact of your modifications. ### Checklist The following checks are **mandatory**: - [ ] My change adheres [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: f41gh7 <nik@victoriametrics.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
50bfa689c9
commit
22e6385f56
6 changed files with 118 additions and 13 deletions
|
@ -22,6 +22,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
* FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): expose `vl_bytes_ingested_total` [counter](https://docs.victoriametrics.com/keyconcepts/#counter) at `/metrics` page. This counter tracks an estimated number of bytes processed when parsing the ingested logs. This counter is exposed individually per every [supported data ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/) - the protocol name is exposed in the `type` label. For example, `vl_bytes_ingested_total{type="jsonline"}` tracks an estimated number of bytes processed when reading the ingested logs via [json line protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api). Thanks to @tenmozes for the idea and [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7682).
|
||||
|
||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix for `showLegend` and `alias` flags in predefined panels. [See this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7565)
|
||||
* BUGFIX: fix `oo big number of columns detected in the block` panic when the ingested logs contain more than 2000 fields with different names per every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7568) for details.
|
||||
|
||||
## [v1.0.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.0.0-victorialogs)
|
||||
|
||||
|
|
|
@ -218,21 +218,28 @@ func (b *block) assertValid() {
|
|||
// It is expected that timestamps are sorted.
|
||||
//
|
||||
// b is valid until rows are changed.
|
||||
func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) {
|
||||
//
|
||||
// Returns offset of the processed timestamps and rows
|
||||
func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) (offset int) {
|
||||
b.reset()
|
||||
|
||||
assertTimestampsSorted(timestamps)
|
||||
b.timestamps = append(b.timestamps, timestamps...)
|
||||
b.mustInitFromRows(rows)
|
||||
if len(timestamps) != len(rows) {
|
||||
logger.Panicf("BUG: len of timestamps %d and rows %d must be equal", len(timestamps), len(rows))
|
||||
}
|
||||
offset = b.mustInitFromRows(timestamps, rows)
|
||||
b.sortColumnsByName()
|
||||
return
|
||||
}
|
||||
|
||||
// mustInitFromRows initializes b from rows.
|
||||
//
|
||||
// b is valid until rows are changed.
|
||||
func (b *block) mustInitFromRows(rows [][]Field) {
|
||||
rowsLen := len(rows)
|
||||
if rowsLen == 0 {
|
||||
//
|
||||
// Returns offset of processed timestamps and rows
|
||||
func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int) {
|
||||
offset = len(rows)
|
||||
if offset == 0 {
|
||||
// Nothing to do
|
||||
return
|
||||
}
|
||||
|
@ -249,35 +256,48 @@ func (b *block) mustInitFromRows(rows [][]Field) {
|
|||
} else {
|
||||
c := b.extendColumns()
|
||||
c.name = f.Name
|
||||
values := c.resizeValues(rowsLen)
|
||||
values := c.resizeValues(offset)
|
||||
for j := range rows {
|
||||
values[j] = rows[j][i].Value
|
||||
}
|
||||
}
|
||||
}
|
||||
b.timestamps = append(b.timestamps, timestamps...)
|
||||
return
|
||||
}
|
||||
|
||||
// Slow path - log entries contain different set of fields
|
||||
|
||||
// Determine indexes for columns
|
||||
|
||||
offset = 0
|
||||
columnIdxs := getColumnIdxs()
|
||||
for i := range rows {
|
||||
fields := rows[i]
|
||||
if len(columnIdxs)+len(fields) > maxColumnsPerBlock {
|
||||
break
|
||||
}
|
||||
for j := range fields {
|
||||
name := fields[j].Name
|
||||
if _, ok := columnIdxs[name]; !ok {
|
||||
columnIdxs[name] = len(columnIdxs)
|
||||
}
|
||||
}
|
||||
offset++
|
||||
}
|
||||
|
||||
// keep only rows that fit maxColumnsPerBlock limit
|
||||
rows = rows[:offset]
|
||||
timestamps = timestamps[:offset]
|
||||
|
||||
b.timestamps = append(b.timestamps, timestamps...)
|
||||
|
||||
// Initialize columns
|
||||
cs := b.resizeColumns(len(columnIdxs))
|
||||
for name, idx := range columnIdxs {
|
||||
c := &cs[idx]
|
||||
c.name = name
|
||||
c.resizeValues(rowsLen)
|
||||
c.resizeValues(offset)
|
||||
}
|
||||
|
||||
// Write rows to block
|
||||
|
@ -306,6 +326,7 @@ func (b *block) mustInitFromRows(rows [][]Field) {
|
|||
cs = cs[:len(cs)-1]
|
||||
}
|
||||
b.columns = cs
|
||||
return
|
||||
}
|
||||
|
||||
func swapColumns(a, b *column) {
|
||||
|
|
|
@ -335,8 +335,12 @@ func (bsw *blockStreamWriter) MustWriteRows(sid *streamID, timestamps []int64, r
|
|||
}
|
||||
|
||||
b := getBlock()
|
||||
b.MustInitFromRows(timestamps, rows)
|
||||
bsw.MustWriteBlock(sid, b)
|
||||
for len(rows) > 0 {
|
||||
rowsOffset := b.MustInitFromRows(timestamps, rows)
|
||||
bsw.MustWriteBlock(sid, b)
|
||||
timestamps, rows = timestamps[rowsOffset:], rows[rowsOffset:]
|
||||
}
|
||||
|
||||
putBlock(b)
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,10 @@ func TestBlockMustInitFromRows(t *testing.T) {
|
|||
b := getBlock()
|
||||
defer putBlock(b)
|
||||
|
||||
b.MustInitFromRows(timestamps, rows)
|
||||
offset := b.MustInitFromRows(timestamps, rows)
|
||||
if offset != len(rows) {
|
||||
t.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
|
||||
}
|
||||
if b.uncompressedSizeBytes() >= maxUncompressedBlockSize {
|
||||
t.Fatalf("expecting non-full block")
|
||||
}
|
||||
|
@ -168,7 +171,10 @@ func TestBlockMustInitFromRowsFullBlock(t *testing.T) {
|
|||
|
||||
b := getBlock()
|
||||
defer putBlock(b)
|
||||
b.MustInitFromRows(timestamps, rows)
|
||||
offset := b.MustInitFromRows(timestamps, rows)
|
||||
if offset != len(rows) {
|
||||
t.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
|
||||
}
|
||||
b.assertValid()
|
||||
if n := b.Len(); n != len(rows) {
|
||||
t.Fatalf("unexpected total log entries; got %d; want %d", n, len(rows))
|
||||
|
@ -177,3 +183,35 @@ func TestBlockMustInitFromRowsFullBlock(t *testing.T) {
|
|||
t.Fatalf("expecting full block with %d bytes; got %d bytes", maxUncompressedBlockSize, n)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockMustInitWithNonEmptyOffset(t *testing.T) {
|
||||
f := func(rowsCount int, fieldsPerRow int, expectedOffset int) {
|
||||
t.Helper()
|
||||
timestamps := make([]int64, rowsCount)
|
||||
rows := make([][]Field, rowsCount)
|
||||
for i := range timestamps {
|
||||
fields := make([]Field, fieldsPerRow)
|
||||
for j := range fields {
|
||||
fields[j] = Field{
|
||||
Name: fmt.Sprintf("field_%d_%d", i, j),
|
||||
Value: "very very looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong value",
|
||||
}
|
||||
}
|
||||
rows[i] = fields
|
||||
}
|
||||
b := getBlock()
|
||||
defer putBlock(b)
|
||||
offset := b.MustInitFromRows(timestamps, rows)
|
||||
if offset != expectedOffset {
|
||||
t.Fatalf("unexpected processed rows offset; got %d; want: %d", offset, expectedOffset)
|
||||
}
|
||||
b.assertValid()
|
||||
if n := b.Len(); n != len(rows[:offset]) {
|
||||
t.Fatalf("unexpected total log entries; got %d; want %d", n, len(rows[:offset]))
|
||||
}
|
||||
}
|
||||
f(10, 300, 6)
|
||||
f(10, 10, 10)
|
||||
f(15, 30, 15)
|
||||
f(maxColumnsPerBlock+1000, 1, maxColumnsPerBlock)
|
||||
}
|
||||
|
|
|
@ -21,7 +21,10 @@ func benchmarkBlockMustInitFromRows(b *testing.B, rowsPerBlock int) {
|
|||
block := getBlock()
|
||||
defer putBlock(block)
|
||||
for pb.Next() {
|
||||
block.MustInitFromRows(timestamps, rows)
|
||||
offset := block.MustInitFromRows(timestamps, rows)
|
||||
if offset != len(rows) {
|
||||
b.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
|
||||
}
|
||||
if n := block.Len(); n != len(timestamps) {
|
||||
panic(fmt.Errorf("unexpected block length; got %d; want %d", n, len(timestamps)))
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
@ -88,6 +89,13 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) {
|
|||
f(newTestLogRows(10, 5, 0), 10, 1.5)
|
||||
f(newTestLogRows(10, 1000, 0), 10, 7.2)
|
||||
f(newTestLogRows(100, 100, 0), 100, 5.0)
|
||||
|
||||
// check block overflow with unique tag rows
|
||||
f(newTestLogRowsUniqTags(5, 21, 100), 10, 0.4)
|
||||
f(newTestLogRowsUniqTags(5, 10, 100), 5, 0.5)
|
||||
f(newTestLogRowsUniqTags(1, 2001, 1), 2, 1.4)
|
||||
f(newTestLogRowsUniqTags(15, 20, 250), 45, 0.6)
|
||||
|
||||
}
|
||||
|
||||
func checkCompressionRate(t *testing.T, ph *partHeader, compressionRateExpected float64) {
|
||||
|
@ -341,3 +349,33 @@ func (mp *inmemoryPart) readLogRows(sbu *stringsBlockUnmarshaler, vd *valuesDeco
|
|||
}
|
||||
return lr
|
||||
}
|
||||
|
||||
func newTestLogRowsUniqTags(streams, rowsPerStream, uniqFieldsPerRow int) *LogRows {
|
||||
streamTags := []string{
|
||||
"some-stream-tag",
|
||||
}
|
||||
lr := GetLogRows(streamTags, nil, nil, "")
|
||||
var fields []Field
|
||||
for i := 0; i < streams; i++ {
|
||||
tenantID := TenantID{
|
||||
AccountID: 0,
|
||||
ProjectID: 0,
|
||||
}
|
||||
for j := 0; j < rowsPerStream; j++ {
|
||||
// Add stream tags
|
||||
fields = append(fields[:0], Field{
|
||||
Name: "some-stream-tag",
|
||||
Value: fmt.Sprintf("some-stream-value-%d", i),
|
||||
})
|
||||
// Add the remaining unique tags
|
||||
for k := 0; k < uniqFieldsPerRow; k++ {
|
||||
fields = append(fields, Field{
|
||||
Name: fmt.Sprintf("field_%d_%d_%d", i, j, k),
|
||||
Value: fmt.Sprintf("value_%d_%d_%d", i, j, k),
|
||||
})
|
||||
}
|
||||
lr.MustAdd(tenantID, time.Now().UnixMilli(), fields)
|
||||
}
|
||||
}
|
||||
return lr
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue