VictoriaMetrics/lib/logstorage/block_timing_test.go
Nikolay e45556fc05
lib/logstorage: fixes panic at Block.MustInitFromRows (#7695)
Previously Block columns weren't properly limited by maxColumnsPerBlock,
so it was possible for more columns to be added to a block than
expected.
 For example, this happens when the ingested log stream has many unique fields
and their total number exceeds maxColumnsPerBlock.
 We only enforce the fieldsPerBlock limit during row parsing, which
isn't enough to mitigate this issue. Also, it
would be very expensive to apply the maxColumnsPerBlock limit during
ingestion, since that would require tracking all possible field tag
combinations.

 This commit adds a check for the maxColumnsPerBlock limit to the
MustInitFromRows function, which now returns the offset of the rows and
timestamps added to the block.
 The caller must create another block and ingest the remaining rows
into it.

Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7568

### Describe Your Changes

Please provide a brief description of the changes you made. Be as
specific as possible to help others understand the purpose and impact of
your modifications.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to the [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2024-11-30 18:06:58 +01:00

49 lines
1.3 KiB
Go

package logstorage
import (
"fmt"
"testing"
)
// BenchmarkBlock_MustInitFromRows runs benchmarkBlockMustInitFromRows as a
// separate sub-benchmark for each rows-per-block size in a fixed list.
func BenchmarkBlock_MustInitFromRows(b *testing.B) {
	blockSizes := []int{1, 10, 100, 1000, 10000}
	for i := range blockSizes {
		size := blockSizes[i]
		name := fmt.Sprintf("rowsPerBlock_%d", size)
		b.Run(name, func(sb *testing.B) {
			benchmarkBlockMustInitFromRows(sb, size)
		})
	}
}
// benchmarkBlockMustInitFromRows repeatedly calls MustInitFromRows on a block
// obtained from getBlock, feeding it the same rowsPerBlock test rows
// (10 fields per row) on every iteration.
//
// The per-operation size passed to b.SetBytes is the number of rows, so the
// reported throughput reflects rows rather than actual bytes.
func benchmarkBlockMustInitFromRows(b *testing.B, rowsPerBlock int) {
	timestamps, rows := newTestRows(rowsPerBlock, 10)
	b.ReportAllocs()
	b.SetBytes(int64(len(timestamps)))
	b.RunParallel(func(pb *testing.PB) {
		block := getBlock()
		defer putBlock(block)
		for pb.Next() {
			offset := block.MustInitFromRows(timestamps, rows)
			// Failures are reported via panic instead of b.Fatalf: this closure runs
			// on worker goroutines started by RunParallel, while FailNow-based
			// methods may only be called from the goroutine running the benchmark.
			if offset != len(rows) {
				panic(fmt.Errorf("expected offset: %d to match processed rows: %d", offset, len(rows)))
			}
			if n := block.Len(); n != len(timestamps) {
				panic(fmt.Errorf("unexpected block length; got %d; want %d", n, len(timestamps)))
			}
		}
	})
}
// newTestRows builds rowsCount synthetic rows with fieldsPerRow fields each.
//
// Row i gets the timestamp i*1e9. Its j-th field is named "field_<j>" and
// carries the value "value_<i>_<j>". Both returned slices have rowsCount
// entries.
func newTestRows(rowsCount, fieldsPerRow int) ([]int64, [][]Field) {
	timestamps := make([]int64, rowsCount)
	rows := make([][]Field, rowsCount)
	for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
		row := make([]Field, fieldsPerRow)
		for fieldIdx := 0; fieldIdx < fieldsPerRow; fieldIdx++ {
			row[fieldIdx].Name = fmt.Sprintf("field_%d", fieldIdx)
			row[fieldIdx].Value = fmt.Sprintf("value_%d_%d", rowIdx, fieldIdx)
		}
		timestamps[rowIdx] = int64(rowIdx) * 1e9
		rows[rowIdx] = row
	}
	return timestamps, rows
}