mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/logstorage: follow-up for 22e6385f56
Make variable names and comments more clear. This should simplify code maintenance in the future. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7568
This commit is contained in:
parent
22e6385f56
commit
d2cd004710
4 changed files with 44 additions and 41 deletions
|
@ -219,33 +219,37 @@ func (b *block) assertValid() {
|
|||
//
|
||||
// b is valid until rows are changed.
|
||||
//
|
||||
// Returns offset of the processed timestamps and rows
|
||||
func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) (offset int) {
|
||||
// Returns the number of the processed timestamps and rows.
|
||||
// If the returned number is smaller than len(rows), then the rest of rows aren't processed.
|
||||
func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) int {
|
||||
b.reset()
|
||||
|
||||
assertTimestampsSorted(timestamps)
|
||||
if len(timestamps) != len(rows) {
|
||||
logger.Panicf("BUG: len of timestamps %d and rows %d must be equal", len(timestamps), len(rows))
|
||||
}
|
||||
offset = b.mustInitFromRows(timestamps, rows)
|
||||
rowsProcessed := b.mustInitFromRows(timestamps, rows)
|
||||
b.sortColumnsByName()
|
||||
return
|
||||
return rowsProcessed
|
||||
}
|
||||
|
||||
// mustInitFromRows initializes b from rows.
|
||||
// mustInitFromRows initializes b from the given timestamps and rows.
|
||||
//
|
||||
// b is valid until rows are changed.
|
||||
//
|
||||
// Returns offset of processed timestamps and rows
|
||||
func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int) {
|
||||
offset = len(rows)
|
||||
if offset == 0 {
|
||||
// Returns the number of the processed timestamps and rows.
|
||||
// If the returned number is smaller than len(rows), then the rest of rows aren't processed.
|
||||
func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) int {
|
||||
if len(timestamps) != len(rows) {
|
||||
logger.Panicf("BUG: len of timestamps %d and rows %d must be equal", len(timestamps), len(rows))
|
||||
}
|
||||
|
||||
rowsLen := len(rows)
|
||||
if rowsLen == 0 {
|
||||
// Nothing to do
|
||||
return
|
||||
return 0
|
||||
}
|
||||
|
||||
if areSameFieldsInRows(rows) {
|
||||
// Fast path - all the log entries have the same fields
|
||||
b.timestamps = append(b.timestamps, timestamps...)
|
||||
fields := rows[0]
|
||||
for i := range fields {
|
||||
f := &fields[i]
|
||||
|
@ -256,23 +260,22 @@ func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int
|
|||
} else {
|
||||
c := b.extendColumns()
|
||||
c.name = f.Name
|
||||
values := c.resizeValues(offset)
|
||||
values := c.resizeValues(rowsLen)
|
||||
for j := range rows {
|
||||
values[j] = rows[j][i].Value
|
||||
}
|
||||
}
|
||||
}
|
||||
b.timestamps = append(b.timestamps, timestamps...)
|
||||
return
|
||||
return rowsLen
|
||||
}
|
||||
|
||||
// Slow path - log entries contain different set of fields
|
||||
|
||||
// Determine indexes for columns
|
||||
|
||||
offset = 0
|
||||
columnIdxs := getColumnIdxs()
|
||||
for i := range rows {
|
||||
i := 0
|
||||
for i < len(rows) {
|
||||
fields := rows[i]
|
||||
if len(columnIdxs)+len(fields) > maxColumnsPerBlock {
|
||||
break
|
||||
|
@ -283,12 +286,13 @@ func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int
|
|||
columnIdxs[name] = len(columnIdxs)
|
||||
}
|
||||
}
|
||||
offset++
|
||||
i++
|
||||
}
|
||||
rowsProcessed := i
|
||||
|
||||
// keep only rows that fit maxColumnsPerBlock limit
|
||||
rows = rows[:offset]
|
||||
timestamps = timestamps[:offset]
|
||||
rows = rows[:rowsProcessed]
|
||||
timestamps = timestamps[:rowsProcessed]
|
||||
|
||||
b.timestamps = append(b.timestamps, timestamps...)
|
||||
|
||||
|
@ -297,7 +301,7 @@ func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int
|
|||
for name, idx := range columnIdxs {
|
||||
c := &cs[idx]
|
||||
c.name = name
|
||||
c.resizeValues(offset)
|
||||
c.resizeValues(len(rows))
|
||||
}
|
||||
|
||||
// Write rows to block
|
||||
|
@ -326,7 +330,7 @@ func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int
|
|||
cs = cs[:len(cs)-1]
|
||||
}
|
||||
b.columns = cs
|
||||
return
|
||||
return rowsProcessed
|
||||
}
|
||||
|
||||
func swapColumns(a, b *column) {
|
||||
|
|
|
@ -336,11 +336,10 @@ func (bsw *blockStreamWriter) MustWriteRows(sid *streamID, timestamps []int64, r
|
|||
|
||||
b := getBlock()
|
||||
for len(rows) > 0 {
|
||||
rowsOffset := b.MustInitFromRows(timestamps, rows)
|
||||
rowsProcessed := b.MustInitFromRows(timestamps, rows)
|
||||
bsw.MustWriteBlock(sid, b)
|
||||
timestamps, rows = timestamps[rowsOffset:], rows[rowsOffset:]
|
||||
timestamps, rows = timestamps[rowsProcessed:], rows[rowsProcessed:]
|
||||
}
|
||||
|
||||
putBlock(b)
|
||||
}
|
||||
|
||||
|
|
|
@ -12,9 +12,9 @@ func TestBlockMustInitFromRows(t *testing.T) {
|
|||
b := getBlock()
|
||||
defer putBlock(b)
|
||||
|
||||
offset := b.MustInitFromRows(timestamps, rows)
|
||||
if offset != len(rows) {
|
||||
t.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
|
||||
rowsProcessed := b.MustInitFromRows(timestamps, rows)
|
||||
if rowsProcessed != len(rows) {
|
||||
t.Fatalf("unexpected rowsProcessed; got %d; want %d", rowsProcessed, len(rows))
|
||||
}
|
||||
if b.uncompressedSizeBytes() >= maxUncompressedBlockSize {
|
||||
t.Fatalf("expecting non-full block")
|
||||
|
@ -171,9 +171,9 @@ func TestBlockMustInitFromRowsFullBlock(t *testing.T) {
|
|||
|
||||
b := getBlock()
|
||||
defer putBlock(b)
|
||||
offset := b.MustInitFromRows(timestamps, rows)
|
||||
if offset != len(rows) {
|
||||
t.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
|
||||
rowsProcessed := b.MustInitFromRows(timestamps, rows)
|
||||
if rowsProcessed != len(rows) {
|
||||
t.Fatalf("unexpected rowsProcessed; got %d; want %d", rowsProcessed, len(rows))
|
||||
}
|
||||
b.assertValid()
|
||||
if n := b.Len(); n != len(rows) {
|
||||
|
@ -185,7 +185,7 @@ func TestBlockMustInitFromRowsFullBlock(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestBlockMustInitWithNonEmptyOffset(t *testing.T) {
|
||||
f := func(rowsCount int, fieldsPerRow int, expectedOffset int) {
|
||||
f := func(rowsCount int, fieldsPerRow int, expectedRowsProcessed int) {
|
||||
t.Helper()
|
||||
timestamps := make([]int64, rowsCount)
|
||||
rows := make([][]Field, rowsCount)
|
||||
|
@ -201,13 +201,13 @@ func TestBlockMustInitWithNonEmptyOffset(t *testing.T) {
|
|||
}
|
||||
b := getBlock()
|
||||
defer putBlock(b)
|
||||
offset := b.MustInitFromRows(timestamps, rows)
|
||||
if offset != expectedOffset {
|
||||
t.Fatalf("unexpected processed rows offset; got %d; want: %d", offset, expectedOffset)
|
||||
rowsProcessed := b.MustInitFromRows(timestamps, rows)
|
||||
if rowsProcessed != expectedRowsProcessed {
|
||||
t.Fatalf("unexpected rowsProcessed; got %d; want %d", rowsProcessed, expectedRowsProcessed)
|
||||
}
|
||||
b.assertValid()
|
||||
if n := b.Len(); n != len(rows[:offset]) {
|
||||
t.Fatalf("unexpected total log entries; got %d; want %d", n, len(rows[:offset]))
|
||||
if n := b.Len(); n != rowsProcessed {
|
||||
t.Fatalf("unexpected total log entries; got %d; want %d", n, rowsProcessed)
|
||||
}
|
||||
}
|
||||
f(10, 300, 6)
|
||||
|
|
|
@ -21,9 +21,9 @@ func benchmarkBlockMustInitFromRows(b *testing.B, rowsPerBlock int) {
|
|||
block := getBlock()
|
||||
defer putBlock(block)
|
||||
for pb.Next() {
|
||||
offset := block.MustInitFromRows(timestamps, rows)
|
||||
if offset != len(rows) {
|
||||
b.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows))
|
||||
rowsProcessed := block.MustInitFromRows(timestamps, rows)
|
||||
if rowsProcessed != len(rows) {
|
||||
b.Fatalf("expected rowsProcessed; got %d; want %d", rowsProcessed, len(rows))
|
||||
}
|
||||
if n := block.Len(); n != len(timestamps) {
|
||||
panic(fmt.Errorf("unexpected block length; got %d; want %d", n, len(timestamps)))
|
||||
|
|
Loading…
Reference in a new issue