diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go index 85c250f2b..5aae71a1b 100644 --- a/lib/logstorage/block.go +++ b/lib/logstorage/block.go @@ -219,33 +219,37 @@ func (b *block) assertValid() { // // b is valid until rows are changed. // -// Returns offset of the processed timestamps and rows -func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) (offset int) { +// Returns the number of the processed timestamps and rows. +// If the returned number is smaller than len(rows), then the rest of rows aren't processed. +func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) int { b.reset() assertTimestampsSorted(timestamps) - if len(timestamps) != len(rows) { - logger.Panicf("BUG: len of timestamps %d and rows %d must be equal", len(timestamps), len(rows)) - } - offset = b.mustInitFromRows(timestamps, rows) + rowsProcessed := b.mustInitFromRows(timestamps, rows) b.sortColumnsByName() - return + return rowsProcessed } -// mustInitFromRows initializes b from rows. +// mustInitFromRows initializes b from the given timestamps and rows. // // b is valid until rows are changed. // -// Returns offset of processed timestamps and rows -func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int) { - offset = len(rows) - if offset == 0 { +// Returns the number of the processed timestamps and rows. +// If the returned number is smaller than len(rows), then the rest of rows aren't processed. +func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) int { + if len(timestamps) != len(rows) { + logger.Panicf("BUG: len of timestamps %d and rows %d must be equal", len(timestamps), len(rows)) + } + + rowsLen := len(rows) + if rowsLen == 0 { // Nothing to do - return + return 0 } if areSameFieldsInRows(rows) { // Fast path - all the log entries have the same fields + b.timestamps = append(b.timestamps, timestamps...) 
fields := rows[0] for i := range fields { f := &fields[i] @@ -256,23 +260,22 @@ func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int } else { c := b.extendColumns() c.name = f.Name - values := c.resizeValues(offset) + values := c.resizeValues(rowsLen) for j := range rows { values[j] = rows[j][i].Value } } } - b.timestamps = append(b.timestamps, timestamps...) - return + return rowsLen } // Slow path - log entries contain different set of fields // Determine indexes for columns - offset = 0 columnIdxs := getColumnIdxs() - for i := range rows { + i := 0 + for i < len(rows) { fields := rows[i] if len(columnIdxs)+len(fields) > maxColumnsPerBlock { break @@ -283,12 +286,13 @@ func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int columnIdxs[name] = len(columnIdxs) } } - offset++ + i++ } + rowsProcessed := i // keep only rows that fit maxColumnsPerBlock limit - rows = rows[:offset] - timestamps = timestamps[:offset] + rows = rows[:rowsProcessed] + timestamps = timestamps[:rowsProcessed] b.timestamps = append(b.timestamps, timestamps...) 
@@ -297,7 +301,7 @@ func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int for name, idx := range columnIdxs { c := &cs[idx] c.name = name - c.resizeValues(offset) + c.resizeValues(len(rows)) } // Write rows to block @@ -326,7 +330,7 @@ func (b *block) mustInitFromRows(timestamps []int64, rows [][]Field) (offset int cs = cs[:len(cs)-1] } b.columns = cs - return + return rowsProcessed } func swapColumns(a, b *column) { diff --git a/lib/logstorage/block_stream_writer.go b/lib/logstorage/block_stream_writer.go index 54ec642bf..ca37c3c42 100644 --- a/lib/logstorage/block_stream_writer.go +++ b/lib/logstorage/block_stream_writer.go @@ -336,11 +336,10 @@ func (bsw *blockStreamWriter) MustWriteRows(sid *streamID, timestamps []int64, r b := getBlock() for len(rows) > 0 { - rowsOffset := b.MustInitFromRows(timestamps, rows) + rowsProcessed := b.MustInitFromRows(timestamps, rows) bsw.MustWriteBlock(sid, b) - timestamps, rows = timestamps[rowsOffset:], rows[rowsOffset:] + timestamps, rows = timestamps[rowsProcessed:], rows[rowsProcessed:] } - putBlock(b) } diff --git a/lib/logstorage/block_test.go b/lib/logstorage/block_test.go index 163e53c5c..ec2ca6086 100644 --- a/lib/logstorage/block_test.go +++ b/lib/logstorage/block_test.go @@ -12,9 +12,9 @@ func TestBlockMustInitFromRows(t *testing.T) { b := getBlock() defer putBlock(b) - offset := b.MustInitFromRows(timestamps, rows) - if offset != len(rows) { - t.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows)) + rowsProcessed := b.MustInitFromRows(timestamps, rows) + if rowsProcessed != len(rows) { + t.Fatalf("unexpected rowsProcessed; got %d; want %d", rowsProcessed, len(rows)) } if b.uncompressedSizeBytes() >= maxUncompressedBlockSize { t.Fatalf("expecting non-full block") @@ -171,9 +171,9 @@ func TestBlockMustInitFromRowsFullBlock(t *testing.T) { b := getBlock() defer putBlock(b) - offset := b.MustInitFromRows(timestamps, rows) - if offset != len(rows) { - t.Fatalf("expected 
offset: %d to match processed rows: %d", offset, len(rows)) + rowsProcessed := b.MustInitFromRows(timestamps, rows) + if rowsProcessed != len(rows) { + t.Fatalf("unexpected rowsProcessed; got %d; want %d", rowsProcessed, len(rows)) } b.assertValid() if n := b.Len(); n != len(rows) { @@ -185,7 +185,7 @@ func TestBlockMustInitFromRowsFullBlock(t *testing.T) { } func TestBlockMustInitWithNonEmptyOffset(t *testing.T) { - f := func(rowsCount int, fieldsPerRow int, expectedOffset int) { + f := func(rowsCount int, fieldsPerRow int, expectedRowsProcessed int) { t.Helper() timestamps := make([]int64, rowsCount) rows := make([][]Field, rowsCount) @@ -201,13 +201,13 @@ func TestBlockMustInitWithNonEmptyOffset(t *testing.T) { } b := getBlock() defer putBlock(b) - offset := b.MustInitFromRows(timestamps, rows) - if offset != expectedOffset { - t.Fatalf("unexpected processed rows offset; got %d; want: %d", offset, expectedOffset) + rowsProcessed := b.MustInitFromRows(timestamps, rows) + if rowsProcessed != expectedRowsProcessed { + t.Fatalf("unexpected rowsProcessed; got %d; want %d", rowsProcessed, expectedRowsProcessed) } b.assertValid() - if n := b.Len(); n != len(rows[:offset]) { - t.Fatalf("unexpected total log entries; got %d; want %d", n, len(rows[:offset])) + if n := b.Len(); n != rowsProcessed { + t.Fatalf("unexpected total log entries; got %d; want %d", n, rowsProcessed) } } f(10, 300, 6) diff --git a/lib/logstorage/block_timing_test.go b/lib/logstorage/block_timing_test.go index 7929dbf4f..cb1be227b 100644 --- a/lib/logstorage/block_timing_test.go +++ b/lib/logstorage/block_timing_test.go @@ -21,9 +21,9 @@ func benchmarkBlockMustInitFromRows(b *testing.B, rowsPerBlock int) { block := getBlock() defer putBlock(block) for pb.Next() { - offset := block.MustInitFromRows(timestamps, rows) - if offset != len(rows) { - b.Fatalf("expected offset: %d to match processed rows: %d", offset, len(rows)) + rowsProcessed := block.MustInitFromRows(timestamps, rows) + if rowsProcessed 
!= len(rows) { + b.Fatalf("unexpected rowsProcessed; got %d; want %d", rowsProcessed, len(rows)) } if n := block.Len(); n != len(timestamps) { panic(fmt.Errorf("unexpected block length; got %d; want %d", n, len(timestamps)))