mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00

The storage isn't designed to work efficiently with logs containing too many log fields.
It is better to emit a warning to the user and ignore such logs instead of trying to store them.
This will allow fixing the issue by the user ASAP, and won't lead to excess resource usage
at VictoriaLogs side, such as RAM, CPU, disk IO and disk space.
While at it, ignore too long logs with the size exceeding the maximum block size during data ingestion.
This should prevent from possible issues when dealing with such long logs if they were stored in the storage.
Emit a warning in this case, so the user could identify and fix the issue ASAP.
This is a follow-up for 22e6385f56
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7568
46 lines
1.2 KiB
Go
46 lines
1.2 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
)
|
|
|
|
func BenchmarkBlock_MustInitFromRows(b *testing.B) {
|
|
for _, rowsPerBlock := range []int{1, 10, 100, 1000, 10000} {
|
|
b.Run(fmt.Sprintf("rowsPerBlock_%d", rowsPerBlock), func(b *testing.B) {
|
|
benchmarkBlockMustInitFromRows(b, rowsPerBlock)
|
|
})
|
|
}
|
|
}
|
|
|
|
func benchmarkBlockMustInitFromRows(b *testing.B, rowsPerBlock int) {
|
|
timestamps, rows := newTestRows(rowsPerBlock, 10)
|
|
b.ReportAllocs()
|
|
b.SetBytes(int64(len(timestamps)))
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
block := getBlock()
|
|
defer putBlock(block)
|
|
for pb.Next() {
|
|
block.MustInitFromRows(timestamps, rows)
|
|
if n := block.Len(); n != len(timestamps) {
|
|
panic(fmt.Errorf("unexpected block length; got %d; want %d", n, len(timestamps)))
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func newTestRows(rowsCount, fieldsPerRow int) ([]int64, [][]Field) {
|
|
timestamps := make([]int64, rowsCount)
|
|
rows := make([][]Field, rowsCount)
|
|
for i := range timestamps {
|
|
timestamps[i] = int64(i) * 1e9
|
|
fields := make([]Field, fieldsPerRow)
|
|
for j := range fields {
|
|
f := &fields[j]
|
|
f.Name = fmt.Sprintf("field_%d", j)
|
|
f.Value = fmt.Sprintf("value_%d_%d", i, j)
|
|
}
|
|
rows[i] = fields
|
|
}
|
|
return timestamps, rows
|
|
}
|