From 4c457cf20f5d29d3a529184065f7756786a3255f Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sat, 11 May 2024 00:39:12 +0200
Subject: [PATCH] wip

---
 lib/logstorage/arena.go               | 31 +++++++++++
 lib/logstorage/arena_test.go          | 80 +++++++++++++++++++++++++++
 lib/logstorage/block.go               | 42 +++++++-------
 lib/logstorage/block_data.go          | 75 ++++++++++++++-----------
 lib/logstorage/block_data_test.go     | 11 ++--
 lib/logstorage/block_header.go        | 20 ++++---
 lib/logstorage/block_header_test.go   | 28 ++++++++--
 lib/logstorage/block_result.go        | 33 ++++++-----
 lib/logstorage/block_search.go        | 10 +++-
 lib/logstorage/block_stream_merger.go | 11 +++-
 lib/logstorage/block_stream_reader.go |  9 ++-
 lib/logstorage/consts.go              |  2 +-
 lib/logstorage/encoding.go            | 16 ++++++
 lib/logstorage/rows.go                | 18 ++++--
 lib/logstorage/values_encoder.go      | 17 ++++--
 15 files changed, 307 insertions(+), 96 deletions(-)
 create mode 100644 lib/logstorage/arena_test.go

diff --git a/lib/logstorage/arena.go b/lib/logstorage/arena.go
index ab68b747b..f6b37375a 100644
--- a/lib/logstorage/arena.go
+++ b/lib/logstorage/arena.go
@@ -1,9 +1,26 @@
 package logstorage
 
 import (
+	"sync"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
 
+func getArena() *arena {
+	v := arenaPool.Get()
+	if v == nil {
+		return &arena{}
+	}
+	return v.(*arena)
+}
+
+func putArena(a *arena) {
+	a.reset()
+	arenaPool.Put(a)
+}
+
+var arenaPool sync.Pool
+
 type arena struct {
 	b []byte
 }
@@ -12,6 +29,10 @@ func (a *arena) reset() {
 	a.b = a.b[:0]
 }
 
+func (a *arena) sizeBytes() int {
+	return len(a.b)
+}
+
 func (a *arena) copyBytes(b []byte) []byte {
 	ab := a.b
 	abLen := len(ab)
@@ -21,6 +42,16 @@ func (a *arena) copyBytes(b []byte) []byte {
 	return result
 }
 
+func (a *arena) copyBytesToString(b []byte) string {
+	bCopy := a.copyBytes(b)
+	return bytesutil.ToUnsafeString(bCopy)
+}
+
+func (a *arena) copyString(s string) string {
+	b := bytesutil.ToUnsafeBytes(s)
+	return a.copyBytesToString(b)
+}
+
 func (a *arena) newBytes(size int) []byte {
 	ab := a.b
 	abLen := len(ab)
diff --git a/lib/logstorage/arena_test.go b/lib/logstorage/arena_test.go
new file mode 100644
index 000000000..0e2072bb1
--- /dev/null
+++ b/lib/logstorage/arena_test.go
@@ -0,0 +1,80 @@
+package logstorage
+
+import (
+	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+)
+
+func TestArena(t *testing.T) {
+	values := []string{"foo", "bar", "", "adsfjkljsdfdsf", "dsfsopq", "io234"}
+
+	for i := 0; i < 10; i++ {
+		a := getArena()
+		if n := a.sizeBytes(); n != 0 {
+			t.Fatalf("unexpected non-zero size of empty arena: %d", n)
+		}
+
+		// add values to arena
+		valuesCopy := make([]string, len(values))
+		valuesLen := 0
+		for j, v := range values {
+			vCopy := a.copyString(v)
+			if vCopy != v {
+				t.Fatalf("unexpected value; got %q; want %q", vCopy, v)
+			}
+			valuesCopy[j] = vCopy
+			valuesLen += len(v)
+		}
+
+		// verify that the values returned from arena match the original values
+		for j, v := range values {
+			vCopy := valuesCopy[j]
+			if vCopy != v {
+				t.Fatalf("unexpected value; got %q; want %q", vCopy, v)
+			}
+		}
+
+		if n := a.sizeBytes(); n != valuesLen {
+			t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen)
+		}
+
+		// Try allocating slices with different lengths
+		bs := make([]string, 100)
+		for j := range bs {
+			b := a.newBytes(j)
+			if len(b) != j {
+				t.Fatalf("unexpected len(b); got %d; want %d", len(b), j)
+			}
+			valuesLen += j
+			if n := a.sizeBytes(); n != valuesLen {
+				t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen)
+			}
+			for k := range b {
+				b[k] = byte(k)
+			}
+			bs[j] = bytesutil.ToUnsafeString(b)
+		}
+
+		// verify that the allocated slices didn't change
+		for j, v := range bs {
+			b := make([]byte, j)
+			for k := 0; k < j; k++ {
+				b[k] = byte(k)
+			}
+			if v != string(b) {
+				t.Fatalf("unexpected value at index %d; got %X; want %X", j, v, b)
+			}
+		}
+
+		// verify that the values returned from arena match the original values
+		for j, v := range values {
+			vCopy := valuesCopy[j]
+			if vCopy != v {
+				t.Fatalf("unexpected value; got %q; want %q", vCopy, v)
+			}
+		}
+
+		putArena(a)
+	}
+}
diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go
index 593e8e7a4..3c14ecb31 100644
--- a/lib/logstorage/block.go
+++ b/lib/logstorage/block.go
@@ -120,11 +120,8 @@ type column struct {
 
 func (c *column) reset() {
 	c.name = ""
-	values := c.values
-	for i := range values {
-		values[i] = ""
-	}
-	c.values = values[:0]
+	clear(c.values)
+	c.values = c.values[:0]
 }
 
 func (c *column) canStoreInConstColumn() bool {
@@ -155,7 +152,9 @@ func (c *column) resizeValues(valuesLen int) []string {
 }
 
 // mustWriteTo writes c to sw and updates ch accordingly.
-func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
+//
+// ch is valid until a.reset() is called.
+func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
 	ch.reset()
 
 	valuesWriter := &sw.fieldValuesWriter
@@ -165,7 +164,7 @@ func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
 		bloomFilterWriter = &sw.messageBloomFilterWriter
 	}
 
-	ch.name = c.name
+	ch.name = a.copyString(c.name)
 
 	// encode values
 	ve := getValuesEncoder()
@@ -226,6 +225,8 @@ func (b *block) assertValid() {
 // MustInitFromRows initializes b from the given timestamps and rows.
 //
 // It is expected that timestamps are sorted.
+//
+// b is valid until timestamps and rows are changed.
 func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) {
 	b.reset()
 
@@ -235,6 +236,9 @@ func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) {
 	b.sortColumnsByName()
 }
 
+// mustInitFromRows initializes b from rows.
+//
+// b is valid until rows are changed.
 func (b *block) mustInitFromRows(rows [][]Field) {
 	rowsLen := len(rows)
 	if rowsLen == 0 {
@@ -424,7 +428,7 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v
 	for i := range cds {
 		cd := &cds[i]
 		c := &cs[i]
-		c.name = cd.name
+		c.name = sbu.copyString(cd.name)
 		c.values, err = sbu.unmarshal(c.values[:0], cd.valuesData, uint64(rowsCount))
 		if err != nil {
 			return fmt.Errorf("cannot unmarshal column %d: %w", i, err)
@@ -435,12 +439,12 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v
 	}
 
 	// unmarshal constColumns
-	b.constColumns = append(b.constColumns[:0], bd.constColumns...)
+	b.constColumns = sbu.appendFields(b.constColumns[:0], bd.constColumns)
 
 	return nil
 }
 
-// mustWriteTo writes b with the given sid to sw and updates bh accordingly
+// mustWriteTo writes b with the given sid to sw and updates bh accordingly.
 func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
 	// Do not store the version used for encoding directly in the block data, since:
 	// - all the blocks in the same part use the same encoding
@@ -458,16 +462,22 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
 
 	// Marshal columns
 	cs := b.columns
+
+	a := getArena()
 	csh := getColumnsHeader()
+
 	chs := csh.resizeColumnHeaders(len(cs))
 	for i := range cs {
-		cs[i].mustWriteTo(&chs[i], sw)
+		cs[i].mustWriteTo(a, &chs[i], sw)
 	}
-	csh.constColumns = append(csh.constColumns[:0], b.constColumns...)
+	csh.constColumns = appendFields(a, csh.constColumns[:0], b.constColumns)
 
 	bb := longTermBufPool.Get()
 	bb.B = csh.marshal(bb.B)
+
 	putColumnsHeader(csh)
+	putArena(a)
+
 	bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
 	bh.columnsHeaderSize = uint64(len(bb.B))
 	if bh.columnsHeaderSize > maxColumnsHeaderSize {
@@ -489,13 +499,7 @@ func (b *block) appendRowsTo(dst *rows) {
 	for i := range b.timestamps {
 		fieldsLen := len(fieldsBuf)
 		// copy const columns
-		for j := range ccs {
-			cc := &ccs[j]
-			fieldsBuf = append(fieldsBuf, Field{
-				Name:  cc.Name,
-				Value: cc.Value,
-			})
-		}
+		fieldsBuf = append(fieldsBuf, ccs...)
 		// copy other columns
 		for j := range cs {
 			c := &cs[j]
diff --git a/lib/logstorage/block_data.go b/lib/logstorage/block_data.go
index f3e3a112f..43f296fb2 100644
--- a/lib/logstorage/block_data.go
+++ b/lib/logstorage/block_data.go
@@ -29,11 +29,6 @@ type blockData struct {
 
 	// constColumns contains data for const columns across the block
 	constColumns []Field
-
-	// a is used for storing byte slices for timestamps and columns.
-	//
-	// It reduces fragmentation for them.
-	a arena
 }
 
 // reset resets bd for subsequent re-use
@@ -54,8 +49,6 @@ func (bd *blockData) reset() {
 		ccs[i].Reset()
 	}
 	bd.constColumns = ccs[:0]
-
-	bd.a.reset()
 }
 
 func (bd *blockData) resizeColumnsData(columnsDataLen int) []columnData {
@@ -69,27 +62,29 @@ func (bd *blockData) resizeColumnsData(columnsDataLen int) []columnData {
 }
 
 // copyFrom copies src to bd.
-func (bd *blockData) copyFrom(src *blockData) {
+//
+// bd is valid until a.reset() is called.
+func (bd *blockData) copyFrom(a *arena, src *blockData) {
 	bd.reset()
 
 	bd.streamID = src.streamID
 	bd.uncompressedSizeBytes = src.uncompressedSizeBytes
 	bd.rowsCount = src.rowsCount
-	bd.timestampsData.copyFrom(&src.timestampsData, &bd.a)
+	bd.timestampsData.copyFrom(a, &src.timestampsData)
 
 	cdsSrc := src.columnsData
 	cds := bd.resizeColumnsData(len(cdsSrc))
 	for i := range cds {
-		cds[i].copyFrom(&cdsSrc[i], &bd.a)
+		cds[i].copyFrom(a, &cdsSrc[i])
 	}
 	bd.columnsData = cds
 
-	bd.constColumns = append(bd.constColumns[:0], src.constColumns...)
+	bd.constColumns = appendFields(a, bd.constColumns[:0], src.constColumns)
 }
 
 // unmarshalRows appends unmarshaled from bd log entries to dst.
 //
-// The returned log entries are valid until sbu and vd are valid.
+// The unmarshaled log entries are valid until sbu and vd are reset.
 func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *valuesDecoder) error {
 	b := getBlock()
 	defer putBlock(b)
@@ -101,7 +96,7 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *
 	return nil
 }
 
-// mustWriteTo writes bd with the given sid to sw and updates bh accordingly
+// mustWriteTo writes bd to sw and updates bh accordingly
 func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
 	// Do not store the version used for encoding directly in the block data, since:
 	// - all the blocks in the same part use the same encoding
@@ -118,16 +113,20 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
 
 	// Marshal columns
 	cds := bd.columnsData
+
+	a := getArena()
 	csh := getColumnsHeader()
 	chs := csh.resizeColumnHeaders(len(cds))
 	for i := range cds {
-		cds[i].mustWriteTo(&chs[i], sw)
+		cds[i].mustWriteTo(a, &chs[i], sw)
 	}
-	csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)
+	csh.constColumns = appendFields(a, csh.constColumns[:0], bd.constColumns)
 
 	bb := longTermBufPool.Get()
 	bb.B = csh.marshal(bb.B)
 	putColumnsHeader(csh)
+	putArena(a)
+
 	bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
 	bh.columnsHeaderSize = uint64(len(bb.B))
 	if bh.columnsHeaderSize > maxColumnsHeaderSize {
@@ -138,7 +137,9 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
 }
 
 // mustReadFrom reads block data associated with bh from sr to bd.
-func (bd *blockData) mustReadFrom(bh *blockHeader, sr *streamReaders) {
+//
+// bd is valid until a.reset() is called.
+func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders) {
 	bd.reset()
 
 	bd.streamID = bh.streamID
@@ -146,7 +147,7 @@ func (bd *blockData) mustReadFrom(bh *blockHeader, sr *streamReaders) {
 	bd.rowsCount = bh.rowsCount
 
 	// Read timestamps
-	bd.timestampsData.mustReadFrom(&bh.timestampsHeader, sr, &bd.a)
+	bd.timestampsData.mustReadFrom(a, &bh.timestampsHeader, sr)
 
 	// Read columns
 	if bh.columnsHeaderOffset != sr.columnsHeaderReader.bytesRead {
@@ -161,18 +162,20 @@ func (bd *blockData) mustReadFrom(bh *blockHeader, sr *streamReaders) {
 	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
 	sr.columnsHeaderReader.MustReadFull(bb.B)
 
+	cshA := getArena()
 	csh := getColumnsHeader()
-	if err := csh.unmarshal(bb.B); err != nil {
+	if err := csh.unmarshal(cshA, bb.B); err != nil {
 		logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
 	}
 	longTermBufPool.Put(bb)
 	chs := csh.columnHeaders
 	cds := bd.resizeColumnsData(len(chs))
 	for i := range chs {
-		cds[i].mustReadFrom(&chs[i], sr, &bd.a)
+		cds[i].mustReadFrom(a, &chs[i], sr)
 	}
-	bd.constColumns = append(bd.constColumns[:0], csh.constColumns...)
+	bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns)
 	putColumnsHeader(csh)
+	putArena(cshA)
 }
 
 // timestampsData contains the encoded timestamps data.
@@ -199,7 +202,9 @@ func (td *timestampsData) reset() {
 }
 
 // copyFrom copies src to td.
-func (td *timestampsData) copyFrom(src *timestampsData, a *arena) {
+//
+// td is valid until a.reset() is called.
+func (td *timestampsData) copyFrom(a *arena, src *timestampsData) {
 	td.reset()
 
 	td.data = a.copyBytes(src.data)
@@ -224,7 +229,9 @@ func (td *timestampsData) mustWriteTo(th *timestampsHeader, sw *streamWriters) {
 }
 
 // mustReadFrom reads timestamps data associated with th from sr to td.
-func (td *timestampsData) mustReadFrom(th *timestampsHeader, sr *streamReaders, a *arena) {
+//
+// td is valid until a.reset() is called.
+func (td *timestampsData) mustReadFrom(a *arena, th *timestampsHeader, sr *streamReaders) {
 	td.reset()
 
 	td.marshalType = th.marshalType
@@ -287,22 +294,26 @@ func (cd *columnData) reset() {
 }
 
 // copyFrom copies src to cd.
-func (cd *columnData) copyFrom(src *columnData, a *arena) {
+//
+// cd is valid until a.reset() is called.
+func (cd *columnData) copyFrom(a *arena, src *columnData) {
 	cd.reset()
 
-	cd.name = src.name
+	cd.name = a.copyString(src.name)
 	cd.valueType = src.valueType
 
 	cd.minValue = src.minValue
 	cd.maxValue = src.maxValue
-	cd.valuesDict.copyFrom(&src.valuesDict)
+	cd.valuesDict.copyFrom(a, &src.valuesDict)
 	cd.valuesData = a.copyBytes(src.valuesData)
 	cd.bloomFilterData = a.copyBytes(src.bloomFilterData)
 }
 
 // mustWriteTo writes cd to sw and updates ch accordingly.
-func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
+//
+// ch is valid until a.reset() is called.
+func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
 	ch.reset()
 
 	valuesWriter := &sw.fieldValuesWriter
@@ -312,12 +323,12 @@ func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
 		bloomFilterWriter = &sw.messageBloomFilterWriter
 	}
 
-	ch.name = cd.name
+	ch.name = a.copyString(cd.name)
 	ch.valueType = cd.valueType
 
 	ch.minValue = cd.minValue
 	ch.maxValue = cd.maxValue
-	ch.valuesDict.copyFrom(&cd.valuesDict)
+	ch.valuesDict.copyFrom(a, &cd.valuesDict)
 
 	// marshal values
 	ch.valuesSize = uint64(len(cd.valuesData))
@@ -337,7 +348,9 @@ func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
 }
 
 // mustReadFrom reads columns data associated with ch from sr to cd.
-func (cd *columnData) mustReadFrom(ch *columnHeader, sr *streamReaders, a *arena) {
+//
+// cd is valid until a.reset() is called.
+func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders) {
 	cd.reset()
 
 	valuesReader := &sr.fieldValuesReader
@@ -347,12 +360,12 @@ func (cd *columnData) mustReadFrom(ch *columnHeader, sr *streamReaders, a *arena
 		bloomFilterReader = &sr.messageBloomFilterReader
 	}
 
-	cd.name = ch.name
+	cd.name = a.copyString(ch.name)
 	cd.valueType = ch.valueType
 
 	cd.minValue = ch.minValue
 	cd.maxValue = ch.maxValue
-	cd.valuesDict.copyFrom(&ch.valuesDict)
+	cd.valuesDict.copyFrom(a, &ch.valuesDict)
 
 	// read values
 	if ch.valuesOffset != valuesReader.bytesRead {
diff --git a/lib/logstorage/block_data_test.go b/lib/logstorage/block_data_test.go
index 975d8c486..d92eb430f 100644
--- a/lib/logstorage/block_data_test.go
+++ b/lib/logstorage/block_data_test.go
@@ -51,20 +51,23 @@ func TestBlockDataReset(t *testing.T) {
 func TestBlockDataCopyFrom(t *testing.T) {
 	f := func(bd *blockData) {
 		t.Helper()
+
+		a := getArena()
+		defer putArena(a)
+
 		var bd2 blockData
-		bd2.copyFrom(bd)
-		bd2.a.b = nil
+		bd2.copyFrom(a, bd)
 		if !reflect.DeepEqual(bd, &bd2) {
 			t.Fatalf("unexpected blockData copy\ngot\n%v\nwant\n%v", &bd2, bd)
 		}
 
 		// Try copying it again to the same destination
-		bd2.copyFrom(bd)
-		bd2.a.b = nil
+		bd2.copyFrom(a, bd)
 		if !reflect.DeepEqual(bd, &bd2) {
 			t.Fatalf("unexpected blockData copy to the same destination\ngot\n%v\nwant\n%v", &bd2, bd)
 		}
 	}
+
 	f(&blockData{})
 
 	bd := &blockData{
diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go
index 454e27eaf..4ba286b2c 100644
--- a/lib/logstorage/block_header.go
+++ b/lib/logstorage/block_header.go
@@ -298,7 +298,10 @@ func (csh *columnsHeader) marshal(dst []byte) []byte {
 	return dst
 }
 
-func (csh *columnsHeader) unmarshal(src []byte) error {
+// unmarshal unmarshals csh from src.
+//
+// csh is valid until a.reset() is called.
+func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
 	csh.reset()
 
 	// unmarshal columnHeaders
@@ -312,7 +315,7 @@ func (csh *columnsHeader) unmarshal(src []byte) error {
 	src = tail
 	chs := csh.resizeColumnHeaders(int(n))
 	for i := range chs {
-		tail, err = chs[i].unmarshal(src)
+		tail, err = chs[i].unmarshal(a, src)
 		if err != nil {
 			return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
 		}
@@ -331,7 +334,7 @@ func (csh *columnsHeader) unmarshal(src []byte) error {
 	src = tail
 	ccs := csh.resizeConstColumns(int(n))
 	for i := range ccs {
-		tail, err = ccs[i].unmarshal(src)
+		tail, err = ccs[i].unmarshal(a, src)
 		if err != nil {
 			return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
 		}
@@ -357,7 +360,7 @@ func (csh *columnsHeader) unmarshal(src []byte) error {
 //
 // Tokens in bloom filter depend on valueType:
 //
-//   - valueTypeString stores lowercased tokens seen in all the values
+//   - valueTypeString stores tokens seen in all the values
 //   - valueTypeDict doesn't store anything in the bloom filter, since all the encoded values
 //     are available directly in the valuesDict field
 //   - valueTypeUint8, valueTypeUint16, valueTypeUint32 and valueTypeUint64 stores encoded uint values
@@ -502,7 +505,9 @@ func (ch *columnHeader) marshalBloomFilters(dst []byte) []byte {
 }
 
 // unmarshal unmarshals ch from src and returns the tail left after unmarshaling.
-func (ch *columnHeader) unmarshal(src []byte) ([]byte, error) {
+//
+// ch is valid until a.reset() is called.
+func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
 	ch.reset()
 
 	srcOrig := src
@@ -512,8 +517,7 @@ func (ch *columnHeader) unmarshal(src []byte) ([]byte, error) {
 	if err != nil {
 		return srcOrig, fmt.Errorf("cannot unmarshal column name: %w", err)
 	}
-	// Do not use bytesutil.InternBytes(data) here, since it works slower than the string(data) in prod
-	ch.name = string(data)
+	ch.name = a.copyBytesToString(data)
 	src = tail
 
 	// Unmarshal value type
@@ -532,7 +536,7 @@ func (ch *columnHeader) unmarshal(src []byte) ([]byte, error) {
 		}
 		src = tail
 	case valueTypeDict:
-		tail, err = ch.valuesDict.unmarshal(src)
+		tail, err = ch.valuesDict.unmarshal(a, src)
 		if err != nil {
 			return srcOrig, fmt.Errorf("cannot unmarshal dict at valueTypeDict for column %q: %w", ch.name, err)
 		}
diff --git a/lib/logstorage/block_header_test.go b/lib/logstorage/block_header_test.go
index d6df322f6..bec76b3de 100644
--- a/lib/logstorage/block_header_test.go
+++ b/lib/logstorage/block_header_test.go
@@ -55,12 +55,16 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
 func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
 	f := func(csh *columnsHeader, marshaledLen int) {
 		t.Helper()
+
+		a := getArena()
+		defer putArena(a)
+
 		data := csh.marshal(nil)
 		if len(data) != marshaledLen {
 			t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
 		}
 		csh2 := &columnsHeader{}
-		err := csh2.unmarshal(data)
+		err := csh2.unmarshal(a, data)
 		if err != nil {
 			t.Fatalf("unexpected error in unmarshal: %s", err)
 		}
@@ -68,6 +72,7 @@ func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
 			t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", csh2, csh)
 		}
 	}
+
 	f(&columnsHeader{}, 2)
 	f(&columnsHeader{
 		columnHeaders: []columnHeader{
@@ -149,13 +154,18 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) {
 func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
 	f := func(data []byte) {
 		t.Helper()
+
+		a := getArena()
+		defer putArena(a)
+
 		csh := getColumnsHeader()
 		defer putColumnsHeader(csh)
-		err := csh.unmarshal(data)
+		err := csh.unmarshal(a, data)
 		if err == nil {
 			t.Fatalf("expecting non-nil error")
 		}
 	}
+
 	f(nil)
 	f([]byte("foo"))
 
@@ -315,12 +325,16 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) {
 func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
 	f := func(ch *columnHeader, marshaledLen int) {
 		t.Helper()
+
+		a := getArena()
+		defer putArena(a)
+
 		data := ch.marshal(nil)
 		if len(data) != marshaledLen {
 			t.Fatalf("unexpected marshaled length of columnHeader; got %d; want %d", len(data), marshaledLen)
 		}
 		var ch2 columnHeader
-		tail, err := ch2.unmarshal(data)
+		tail, err := ch2.unmarshal(a, data)
 		if err != nil {
 			t.Fatalf("unexpected error in umarshal(%v): %s", ch, err)
 		}
@@ -331,6 +345,7 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
 			t.Fatalf("unexpected columnHeader after unmarshal;\ngot\n%v\nwant\n%v", &ch2, ch)
 		}
 	}
+
 	f(&columnHeader{
 		name:      "foo",
 		valueType: valueTypeUint8,
@@ -349,9 +364,13 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
 func TestColumnHeaderUnmarshalFailure(t *testing.T) {
 	f := func(data []byte) {
 		t.Helper()
+
+		a := getArena()
+		defer putArena(a)
+
 		dataOrig := append([]byte{}, data...)
 		var ch columnHeader
-		tail, err := ch.unmarshal(data)
+		tail, err := ch.unmarshal(a, data)
 		if err == nil {
 			t.Fatalf("expecting non-nil error")
 		}
@@ -359,6 +378,7 @@ func TestColumnHeaderUnmarshalFailure(t *testing.T) {
 			t.Fatalf("unexpected tail left; got %q; want %q", tail, dataOrig)
 		}
 	}
+
 	f(nil)
 	f([]byte("foo"))
 
diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go
index c850cee73..94071eaff 100644
--- a/lib/logstorage/block_result.go
+++ b/lib/logstorage/block_result.go
@@ -120,6 +120,12 @@ func (br *blockResult) cloneValues(values []string) []string {
 	return valuesBuf[valuesBufLen:]
 }
 
+func (br *blockResult) copyString(s string) string {
+	bufLen := len(br.buf)
+	br.buf = append(br.buf, s...)
+	return bytesutil.ToUnsafeString(br.buf[bufLen:])
+}
+
 // sizeBytes returns the size of br in bytes.
 func (br *blockResult) sizeBytes() int {
 	n := int(unsafe.Sizeof(*br))
@@ -149,13 +155,13 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
 		if areConstValues(rc.values) {
 			// This optimization allows reducing memory usage after br cloning
 			csBuf = append(csBuf, blockResultColumn{
-				name:          rc.name,
+				name:          br.copyString(rc.name),
 				isConst:       true,
 				encodedValues: rc.values[:1],
 			})
 		} else {
 			csBuf = append(csBuf, blockResultColumn{
-				name:          rc.name,
+				name:          br.copyString(rc.name),
 				valueType:     valueTypeString,
 				encodedValues: rc.values,
 			})
@@ -373,9 +379,13 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap)
 	}
 	dictValues = valuesBuf[valuesBufLen:]
 
-	name := getCanonicalColumnName(ch.name)
+	// copy ch.name to buf
+	bufLen := len(buf)
+	buf = append(buf, ch.name...)
+	name := bytesutil.ToUnsafeString(buf[bufLen:])
+
 	br.csBuf = append(br.csBuf, blockResultColumn{
-		name:          name,
+		name:          getCanonicalColumnName(name),
 		valueType:     ch.valueType,
 		dictValues:    dictValues,
 		encodedValues: encodedValues,
@@ -417,21 +427,18 @@ func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
 }
 
 func (br *blockResult) addConstColumn(name, value string) {
-	buf := br.buf
-	bufLen := len(buf)
-	buf = append(buf, value...)
-	s := bytesutil.ToUnsafeString(buf[bufLen:])
-	br.buf = buf
+	value = br.copyString(value)
 
 	valuesBuf := br.valuesBuf
 	valuesBufLen := len(valuesBuf)
-	valuesBuf = append(valuesBuf, s)
+	valuesBuf = append(valuesBuf, value)
 	br.valuesBuf = valuesBuf
+
+	encodedValues := valuesBuf[valuesBufLen:]
 
 	br.csBuf = append(br.csBuf, blockResultColumn{
-		name:          name,
+		name:          br.copyString(name),
 		isConst:       true,
-		encodedValues: valuesBuf[valuesBufLen:],
+		encodedValues: encodedValues,
 	})
 	br.csInitialized = false
 }
@@ -1265,7 +1272,7 @@ type blockResultColumn struct {
 func (c *blockResultColumn) clone(br *blockResult) blockResultColumn {
 	var cNew blockResultColumn
 
-	cNew.name = c.name
+	cNew.name = br.copyString(c.name)
 	cNew.isConst = c.isConst
 	cNew.isTime = c.isTime
 	cNew.valueType = c.valueType
diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go
index 6ea32a8d5..502e51941 100644
--- a/lib/logstorage/block_search.go
+++ b/lib/logstorage/block_search.go
@@ -63,6 +63,9 @@ type blockSearch struct {
 
 	// csh is the columnsHeader associated with the given block
 	csh columnsHeader
+
+	// a is used for storing unmarshaled data in csh
+	a arena
 }
 
 func (bs *blockSearch) reset() {
@@ -88,6 +91,7 @@ func (bs *blockSearch) reset() {
 	bs.sbu.reset()
 
 	bs.csh.reset()
+	bs.a.reset()
 }
 
 func (bs *blockSearch) partPath() string {
@@ -99,7 +103,7 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
 
 	bs.bsw = bsw
 
-	bs.csh.initFromBlockHeader(bsw.p, &bsw.bh)
+	bs.csh.initFromBlockHeader(&bs.a, bsw.p, &bsw.bh)
 
 	// search rows matching the given filter
 	bm := getBitmap(int(bsw.bh.rowsCount))
@@ -122,7 +126,7 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
 	}
 }
 
-func (csh *columnsHeader) initFromBlockHeader(p *part, bh *blockHeader) {
+func (csh *columnsHeader) initFromBlockHeader(a *arena, p *part, bh *blockHeader) {
 	bb := longTermBufPool.Get()
 	columnsHeaderSize := bh.columnsHeaderSize
 	if columnsHeaderSize > maxColumnsHeaderSize {
@@ -131,7 +135,7 @@ func (csh *columnsHeader) initFromBlockHeader(p *part, bh *blockHeader) {
 	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
 	p.columnsHeaderFile.MustReadAt(bb.B, int64(bh.columnsHeaderOffset))
 
-	if err := csh.unmarshal(bb.B); err != nil {
+	if err := csh.unmarshal(a, bb.B); err != nil {
 		logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", p.path, err)
 	}
 	longTermBufPool.Put(bb)
diff --git a/lib/logstorage/block_stream_merger.go b/lib/logstorage/block_stream_merger.go
index 3f3f7d844..1ff7ec69d 100644
--- a/lib/logstorage/block_stream_merger.go
+++ b/lib/logstorage/block_stream_merger.go
@@ -59,6 +59,9 @@ type blockStreamMerger struct {
 	// bd is unpacked into rows when needed.
 	bd blockData
 
+	// a holds bd data.
+	a arena
+
 	// rows is pending log entries.
 	rows rows
 
@@ -99,6 +102,7 @@ func (bsm *blockStreamMerger) resetRows() {
 		bsm.vd = nil
 	}
 	bsm.bd.reset()
+	bsm.a.reset()
 
 	bsm.rows.reset()
 	bsm.rowsTmp.reset()
@@ -138,7 +142,8 @@ func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWrit
 			bsw.MustWriteBlockData(bd)
 		} else {
 			// Slow path - copy the bd to the curr bd.
-			bsm.bd.copyFrom(bd)
+			bsm.a.reset()
+			bsm.bd.copyFrom(&bsm.a, bd)
 			bsm.uniqueFields = uniqueFields
 		}
 	case bsm.uniqueFields+uniqueFields >= maxColumnsPerBlock:
@@ -150,7 +155,8 @@ func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWrit
 		if uniqueFields >= maxColumnsPerBlock {
 			bsw.MustWriteBlockData(bd)
 		} else {
-			bsm.bd.copyFrom(bd)
+			bsm.a.reset()
+			bsm.bd.copyFrom(&bsm.a, bd)
 			bsm.uniqueFields = uniqueFields
 		}
 	case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
@@ -218,6 +224,7 @@ func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
 		// Unmarshal log entries from bsm.bd
 		bsm.mustUnmarshalRows(&bsm.bd)
 		bsm.bd.reset()
+		bsm.a.reset()
 	}
 
 	// Unmarshal log entries from bd
diff --git a/lib/logstorage/block_stream_reader.go b/lib/logstorage/block_stream_reader.go
index 00bcbc4e5..018df187d 100644
--- a/lib/logstorage/block_stream_reader.go
+++ b/lib/logstorage/block_stream_reader.go
@@ -112,6 +112,9 @@ type blockStreamReader struct {
 	// blockData contains the data for the last read block
 	blockData blockData
 
+	// a contains data for blockData
+	a arena
+
 	// ph is the header for the part
 	ph partHeader
 
@@ -149,6 +152,7 @@ type blockStreamReader struct {
 // reset resets bsr, so it can be re-used
 func (bsr *blockStreamReader) reset() {
 	bsr.blockData.reset()
+	bsr.a.reset()
 	bsr.ph.reset()
 	bsr.streamReaders.reset()
 
@@ -247,6 +251,8 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
 // NextBlock reads the next block from bsr and puts it into bsr.blockData.
 //
 // false is returned if there are no other blocks.
+//
+// bsr.blockData is valid until the next call to NextBlock().
 func (bsr *blockStreamReader) NextBlock() bool {
 	for bsr.nextBlockIdx >= len(bsr.blockHeaders) {
 		if !bsr.nextIndexBlock() {
@@ -275,7 +281,8 @@ func (bsr *blockStreamReader) NextBlock() bool {
 	}
 
 	// Read bsr.blockData
-	bsr.blockData.mustReadFrom(bh, &bsr.streamReaders)
+	bsr.a.reset()
+	bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders)
 
 	bsr.globalUncompressedSizeBytes += bh.uncompressedSizeBytes
 	bsr.globalRowsCount += bh.rowsCount
diff --git a/lib/logstorage/consts.go b/lib/logstorage/consts.go
index 1bd17af95..1ef655749 100644
--- a/lib/logstorage/consts.go
+++ b/lib/logstorage/consts.go
@@ -14,7 +14,7 @@ const maxUncompressedBlockSize = 2 * 1024 * 1024
 const maxRowsPerBlock = 8 * 1024 * 1024
 
 // maxColumnsPerBlock is the maximum number of columns per block.
-const maxColumnsPerBlock = 2_000
+const maxColumnsPerBlock = 1_000
 
 // MaxFieldNameSize is the maximum size in bytes for field name.
 //
diff --git a/lib/logstorage/encoding.go b/lib/logstorage/encoding.go
index 48f05154d..ccce895f7 100644
--- a/lib/logstorage/encoding.go
+++ b/lib/logstorage/encoding.go
@@ -47,6 +47,22 @@ func (sbu *stringsBlockUnmarshaler) reset() {
 	sbu.data = sbu.data[:0]
 }
 
+func (sbu *stringsBlockUnmarshaler) copyString(s string) string {
+	dataLen := len(sbu.data)
+	sbu.data = append(sbu.data, s...)
+	return bytesutil.ToUnsafeString(sbu.data[dataLen:])
+}
+
+func (sbu *stringsBlockUnmarshaler) appendFields(dst, src []Field) []Field {
+	for _, f := range src {
+		dst = append(dst, Field{
+			Name:  sbu.copyString(f.Name),
+			Value: sbu.copyString(f.Value),
+		})
+	}
+	return dst
+}
+
 // unmarshal unmarshals itemsCount strings from src, appends them to dst and returns the result.
 //
 // The returned strings are valid until sbu.reset() call.
diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go
index d8d61b015..62ab3c53f 100644
--- a/lib/logstorage/rows.go
+++ b/lib/logstorage/rows.go
@@ -34,7 +34,7 @@ func (f *Field) marshal(dst []byte) []byte {
 	return dst
 }
 
-func (f *Field) unmarshal(src []byte) ([]byte, error) {
+func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
 	srcOrig := src
 
 	// Unmarshal field name
@@ -42,8 +42,7 @@ func (f *Field) unmarshal(src []byte) ([]byte, error) {
 	if err != nil {
 		return srcOrig, fmt.Errorf("cannot unmarshal field name: %w", err)
 	}
-	// Do not use bytesutil.InternBytes(b) here, since it works slower than the string(b) in prod
-	f.Name = string(b)
+	f.Name = a.copyBytesToString(b)
 	src = tail
 
 	// Unmarshal field value
@@ -51,13 +50,22 @@ func (f *Field) unmarshal(src []byte) ([]byte, error) {
 	if err != nil {
 		return srcOrig, fmt.Errorf("cannot unmarshal field value: %w", err)
 	}
-	// Do not use bytesutil.InternBytes(b) here, since it works slower than the string(b) in prod
-	f.Value = string(b)
+	f.Value = a.copyBytesToString(b)
 	src = tail
 
 	return src, nil
 }
 
+func appendFields(a *arena, dst, src []Field) []Field {
+	for _, f := range src {
+		dst = append(dst, Field{
+			Name:  a.copyString(f.Name),
+			Value: a.copyString(f.Value),
+		})
+	}
+	return dst
+}
+
 // rows is an aux structure used during rows merge
 type rows struct {
 	fieldsBuf []Field
diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go
index 659849c7a..9ccb0164d 100644
--- a/lib/logstorage/values_encoder.go
+++ b/lib/logstorage/values_encoder.go
@@ -1074,10 +1074,15 @@ func (vd *valuesDict) reset() {
 	vd.values = vd.values[:0]
}
 
-func (vd *valuesDict) copyFrom(src *valuesDict) {
+func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) {
 	vd.reset()
 
-	vd.values = append(vd.values[:0], src.values...)
+	dstValues := vd.values
+	for _, v := range src.values {
+		v = a.copyString(v)
+		dstValues = append(dstValues, v)
+	}
+	vd.values = dstValues
 }
 
 func (vd *valuesDict) getOrAdd(k string) (byte, bool) {
@@ -1113,7 +1118,10 @@ func (vd *valuesDict) marshal(dst []byte) []byte {
 	return dst
 }
 
-func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) {
+// unmarshal unmarshals vd from src.
+//
+// vd is valid until a.reset() is called.
+func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) {
 	vd.reset()
 
 	srcOrig := src
@@ -1129,8 +1137,7 @@ func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) {
 	}
 	src = tail
 
-	// Do not use bytesutil.InternBytes(data) here, since it works slower than the string(data) in prod
-	v := string(data)
+	v := a.copyBytesToString(data)
 	vd.values = append(vd.values, v)
 	}
 	return src, nil
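
A note on the lifetime contract threaded through the call sites above: everything copied via an arena stays valid until that arena is reset, and putArena() resets the arena before returning it to the pool. Below is a minimal sketch of the intended usage pattern, not part of the applied diff; `example` and `processFields` are illustrative names, while `getArena`, `putArena`, `appendFields` and `Field` are the identifiers introduced or used by this patch:

	// example copies src into arena-backed memory, hands the copies to a
	// consumer, and only then returns the arena to the pool.
	func example(src []Field, processFields func([]Field)) {
		a := getArena()

		// The Name and Value strings in fields alias the arena buffer,
		// so they remain valid only until a.reset() is called.
		fields := appendFields(a, nil, src)

		processFields(fields)

		// putArena calls a.reset() before pooling the arena, so fields
		// must not be used past this point.
		putArena(a)
	}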