diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index be1ca475a..f1beac2c8 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -22,6 +22,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 
 * FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): expose `vl_bytes_ingested_total` [counter](https://docs.victoriametrics.com/keyconcepts/#counter) at `/metrics` page. This counter tracks an estimated number of bytes processed when parsing the ingested logs. This counter is exposed individually per every [supported data ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/) - the protocol name is exposed in the `type` label. For example, `vl_bytes_ingested_total{type="jsonline"}` tracks an estimated number of bytes processed when reading the ingested logs via [json line protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api). Thanks to @tenmozes for the idea and [the initial implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7682).
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix for `showLegend` and `alias` flags in predefined panels. [See this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7565)
+* BUGFIX: fix `too big number of columns detected in the block` panic when the ingested logs contain more than 2000 fields with different names per every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7568) for details.
 
 ## [v1.0.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.0.0-victorialogs)
 
diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go
index 5623f3e44..85c250f2b 100644
--- a/lib/logstorage/block.go
+++ b/lib/logstorage/block.go
@@ -218,21 +218,28 @@ func (b *block) assertValid() {
 // It is expected that timestamps are sorted.
 //
 // b is valid until rows are changed.
-func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) {
+//
+// Returns the offset of the processed timestamps and rows.
+func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) (offset int) {
 	b.reset()
 
 	assertTimestampsSorted(timestamps)
-	b.timestamps = append(b.timestamps, timestamps...)
-	b.mustInitFromRows(rows)
+	if len(timestamps) != len(rows) {
+		logger.Panicf("BUG: len of timestamps %d and rows %d must be equal", len(timestamps), len(rows))
+	}
+	offset = b.mustInitFromRows(timestamps, rows)
 	b.sortColumnsByName()
+	return
 }
 
 // mustInitFromRows initializes b from rows.
 //
 // b is valid until rows are changed.
-func (b *block) mustInitFromRows(rows [][]Field) {
-	rowsLen := len(rows)
-	if rowsLen == 0 {
+//
+// Returns the offset of the processed timestamps and rows.
+func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int) {
+	offset = len(rows)
+	if offset == 0 {
 		// Nothing to do
 		return
 	}
@@ -249,35 +256,48 @@ func (b *block) mustInitFromRows(rows [][]Field) {
 			} else {
 				c := b.extendColumns()
 				c.name = f.Name
-				values := c.resizeValues(rowsLen)
+				values := c.resizeValues(offset)
 				for j := range rows {
 					values[j] = rows[j][i].Value
 				}
 			}
 		}
+		b.timestamps = append(b.timestamps, timestamps...)
 		return
 	}
 
 	// Slow path - log entries contain different set of fields
 
 	// Determine indexes for columns
+
+	offset = 0
 	columnIdxs := getColumnIdxs()
 	for i := range rows {
 		fields := rows[i]
+		if len(columnIdxs)+len(fields) > maxColumnsPerBlock {
+			break
+		}
 		for j := range fields {
 			name := fields[j].Name
 			if _, ok := columnIdxs[name]; !ok {
 				columnIdxs[name] = len(columnIdxs)
 			}
 		}
+		offset++
 	}
 
+	// keep only rows that fit maxColumnsPerBlock limit
+	rows = rows[:offset]
+	timestamps = timestamps[:offset]
+
+	b.timestamps = append(b.timestamps, timestamps...)
+
 	// Initialize columns
 	cs := b.resizeColumns(len(columnIdxs))
 	for name, idx := range columnIdxs {
 		c := &cs[idx]
 		c.name = name
-		c.resizeValues(rowsLen)
+		c.resizeValues(offset)
 	}
 
 	// Write rows to block
@@ -306,6 +326,7 @@ func (b *block) mustInitFromRows(rows [][]Field) {
 		cs = cs[:len(cs)-1]
 	}
 	b.columns = cs
+	return
 }
 
 func swapColumns(a, b *column) {
diff --git a/lib/logstorage/block_stream_writer.go b/lib/logstorage/block_stream_writer.go
index fcffb3eab..54ec642bf 100644
--- a/lib/logstorage/block_stream_writer.go
+++ b/lib/logstorage/block_stream_writer.go
@@ -335,8 +335,12 @@ func (bsw *blockStreamWriter) MustWriteRows(sid *streamID, timestamps []int64, r
 	}
 
 	b := getBlock()
-	b.MustInitFromRows(timestamps, rows)
-	bsw.MustWriteBlock(sid, b)
+	for len(rows) > 0 {
+		rowsOffset := b.MustInitFromRows(timestamps, rows)
+		bsw.MustWriteBlock(sid, b)
+		timestamps, rows = timestamps[rowsOffset:], rows[rowsOffset:]
+	}
+
 	putBlock(b)
 }
 
diff --git a/lib/logstorage/block_test.go b/lib/logstorage/block_test.go
index 13efcf4a8..163e53c5c 100644
--- a/lib/logstorage/block_test.go
+++ b/lib/logstorage/block_test.go
@@ -12,7 +12,10 @@ func TestBlockMustInitFromRows(t *testing.T) {
 
 		b := getBlock()
 		defer putBlock(b)
-		b.MustInitFromRows(timestamps, rows)
+		offset := b.MustInitFromRows(timestamps, rows)
+		if offset != len(rows) {
+			t.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
+		}
 		if b.uncompressedSizeBytes() >= maxUncompressedBlockSize {
 			t.Fatalf("expecting non-full block")
 		}
@@ -168,7 +171,10 @@ func TestBlockMustInitFromRowsFullBlock(t *testing.T) {
 
 	b := getBlock()
 	defer putBlock(b)
-	b.MustInitFromRows(timestamps, rows)
+	offset := b.MustInitFromRows(timestamps, rows)
+	if offset != len(rows) {
+		t.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
+	}
 	b.assertValid()
 	if n := b.Len(); n != len(rows) {
 		t.Fatalf("unexpected total log entries; got %d; want %d", n, len(rows))
@@ -177,3 +183,35 @@ func TestBlockMustInitFromRowsFullBlock(t *testing.T) {
 		t.Fatalf("expecting full block with %d bytes; got %d bytes", maxUncompressedBlockSize, n)
 	}
 }
+
+func TestBlockMustInitWithNonEmptyOffset(t *testing.T) {
+	f := func(rowsCount int, fieldsPerRow int, expectedOffset int) {
+		t.Helper()
+		timestamps := make([]int64, rowsCount)
+		rows := make([][]Field, rowsCount)
+		for i := range timestamps {
+			fields := make([]Field, fieldsPerRow)
+			for j := range fields {
+				fields[j] = Field{
+					Name:  fmt.Sprintf("field_%d_%d", i, j),
+					Value: "very very looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong value",
+				}
+			}
+			rows[i] = fields
+		}
+		b := getBlock()
+		defer putBlock(b)
+		offset := b.MustInitFromRows(timestamps, rows)
+		if offset != expectedOffset {
+			t.Fatalf("unexpected processed rows offset; got %d; want: %d", offset, expectedOffset)
+		}
+		b.assertValid()
+		if n := b.Len(); n != len(rows[:offset]) {
+			t.Fatalf("unexpected total log entries; got %d; want %d", n, len(rows[:offset]))
+		}
+	}
+	f(10, 300, 6)
+	f(10, 10, 10)
+	f(15, 30, 15)
+	f(maxColumnsPerBlock+1000, 1, maxColumnsPerBlock)
+}
diff --git a/lib/logstorage/block_timing_test.go b/lib/logstorage/block_timing_test.go
index 9d2a5e15a..7929dbf4f 100644
--- a/lib/logstorage/block_timing_test.go
+++ b/lib/logstorage/block_timing_test.go
@@ -21,7 +21,10 @@ func benchmarkBlockMustInitFromRows(b *testing.B, rowsPerBlock int) {
 		block := getBlock()
 		defer putBlock(block)
 		for pb.Next() {
-			block.MustInitFromRows(timestamps, rows)
+			offset := block.MustInitFromRows(timestamps, rows)
+			if offset != len(rows) {
+				b.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
+			}
 			if n := block.Len(); n != len(timestamps) {
 				panic(fmt.Errorf("unexpected block length; got %d; want %d", n, len(timestamps)))
 			}
diff --git a/lib/logstorage/inmemory_part_test.go b/lib/logstorage/inmemory_part_test.go
index ce17fa149..a57fb1f65 100644
--- a/lib/logstorage/inmemory_part_test.go
+++ b/lib/logstorage/inmemory_part_test.go
@@ -7,6 +7,7 @@ import (
 	"reflect"
 	"sort"
 	"testing"
+	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
@@ -88,6 +89,13 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) {
 	f(newTestLogRows(10, 5, 0), 10, 1.5)
 	f(newTestLogRows(10, 1000, 0), 10, 7.2)
 	f(newTestLogRows(100, 100, 0), 100, 5.0)
+
+	// check block overflow with unique tag rows
+	f(newTestLogRowsUniqTags(5, 21, 100), 10, 0.4)
+	f(newTestLogRowsUniqTags(5, 10, 100), 5, 0.5)
+	f(newTestLogRowsUniqTags(1, 2001, 1), 2, 1.4)
+	f(newTestLogRowsUniqTags(15, 20, 250), 45, 0.6)
+
 }
 
 func checkCompressionRate(t *testing.T, ph *partHeader, compressionRateExpected float64) {
@@ -341,3 +349,33 @@ func (mp *inmemoryPart) readLogRows(sbu *stringsBlockUnmarshaler, vd *valuesDeco
 	}
 	return lr
 }
+
+func newTestLogRowsUniqTags(streams, rowsPerStream, uniqFieldsPerRow int) *LogRows {
+	streamTags := []string{
+		"some-stream-tag",
+	}
+	lr := GetLogRows(streamTags, nil, nil, "")
+	var fields []Field
+	for i := 0; i < streams; i++ {
+		tenantID := TenantID{
+			AccountID: 0,
+			ProjectID: 0,
+		}
+		for j := 0; j < rowsPerStream; j++ {
+			// Add stream tags
+			fields = append(fields[:0], Field{
+				Name:  "some-stream-tag",
+				Value: fmt.Sprintf("some-stream-value-%d", i),
+			})
+			// Add the remaining unique tags
+			for k := 0; k < uniqFieldsPerRow; k++ {
+				fields = append(fields, Field{
+					Name:  fmt.Sprintf("field_%d_%d_%d", i, j, k),
+					Value: fmt.Sprintf("value_%d_%d_%d", i, j, k),
+				})
+			}
+			lr.MustAdd(tenantID, time.Now().UnixMilli(), fields)
+		}
+	}
+	return lr
+}
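For reference, a small self-contained sketch (not part of the patch) of the row-splitting idea the diff introduces: `MustInitFromRows` now reports how many leading rows fit into a single block without exceeding the per-block column limit, and `MustWriteRows` keeps calling it in a loop until all rows are flushed. The `field` type, the helper name `rowsOffset` and the standalone `maxColumnsPerBlock` constant below are stand-ins for illustration only; only the splitting logic mirrors the patch.

package main

import "fmt"

// field is a stand-in for logstorage.Field, for illustration only.
type field struct {
	Name  string
	Value string
}

// maxColumnsPerBlock mimics the per-block limit mentioned in the changelog
// entry (2000 fields with different names); the real constant lives in
// lib/logstorage.
const maxColumnsPerBlock = 2000

// rowsOffset returns how many leading rows can go into a single block so
// that the number of distinct column names stays within maxColumnsPerBlock.
// It mirrors the conservative check added to mustInitFromRows: the next row
// is accepted only if all of its fields could be new columns.
func rowsOffset(rows [][]field) int {
	columns := make(map[string]struct{})
	offset := 0
	for _, fields := range rows {
		if len(columns)+len(fields) > maxColumnsPerBlock {
			break
		}
		for _, f := range fields {
			columns[f.Name] = struct{}{}
		}
		offset++
	}
	return offset
}

func main() {
	// 10 rows with 300 unique field names each: only 6 rows fit into the
	// first block (6*300 = 1800 <= 2000), matching f(10, 300, 6) in the
	// new TestBlockMustInitWithNonEmptyOffset test.
	rows := make([][]field, 10)
	for i := range rows {
		fields := make([]field, 300)
		for j := range fields {
			fields[j] = field{Name: fmt.Sprintf("field_%d_%d", i, j), Value: "v"}
		}
		rows[i] = fields
	}

	// Consume the rows chunk by chunk, like the loop added to MustWriteRows.
	for len(rows) > 0 {
		n := rowsOffset(rows)
		if n == 0 {
			// A single row with more fields than the limit; the sketch just
			// stops here instead of looping forever.
			break
		}
		fmt.Printf("writing block with %d rows\n", n)
		rows = rows[n:]
	}
}

Running the sketch prints two blocks (6 rows, then 4 rows), which is the behavior the patch relies on to avoid the panic: instead of packing more than maxColumnsPerBlock distinct columns into one block, the overflow rows simply start a new block.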