diff --git a/lib/logstorage/block_data.go b/lib/logstorage/block_data.go
index 9f009c098..0ad517c02 100644
--- a/lib/logstorage/block_data.go
+++ b/lib/logstorage/block_data.go
@@ -158,12 +158,10 @@ func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders)
 	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
 	sr.columnsHeaderReader.MustReadFull(bb.B)
 
-	cshA := getArena()
 	csh := getColumnsHeader()
-	if err := csh.unmarshal(cshA, bb.B); err != nil {
+	if err := csh.unmarshalNoArena(bb.B); err != nil {
 		logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
 	}
-	longTermBufPool.Put(bb)
 	chs := csh.columnHeaders
 	cds := bd.resizeColumnsData(len(chs))
 	for i := range chs {
@@ -171,7 +169,7 @@ func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders)
 	}
 	bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns)
 	putColumnsHeader(csh)
-	putArena(cshA)
+	longTermBufPool.Put(bb)
 }
 
 // timestampsData contains the encoded timestamps data.
diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go
index 578cae535..a2c771e56 100644
--- a/lib/logstorage/block_header.go
+++ b/lib/logstorage/block_header.go
@@ -289,10 +289,10 @@ func (csh *columnsHeader) marshal(dst []byte) []byte {
 	return dst
 }
 
-// unmarshal unmarshals csh from src.
+// unmarshalNoArena unmarshals csh from src.
 //
-// csh is valid until a.reset() is called.
-func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
+// csh is valid until src is changed.
+func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
 	csh.reset()
 
 	// unmarshal columnHeaders
@@ -307,7 +307,7 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
 
 	chs := csh.resizeColumnHeaders(int(n))
 	for i := range chs {
-		tail, err := chs[i].unmarshal(a, src)
+		tail, err := chs[i].unmarshalNoArena(src)
 		if err != nil {
 			return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
 		}
@@ -327,7 +327,7 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
 
 	ccs := csh.resizeConstColumns(int(n))
 	for i := range ccs {
-		tail, err := ccs[i].unmarshal(a, src)
+		tail, err := ccs[i].unmarshalNoArena(src)
 		if err != nil {
 			return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
 		}
@@ -497,10 +497,10 @@ func (ch *columnHeader) marshalBloomFilters(dst []byte) []byte {
 	return dst
 }
 
-// unmarshal unmarshals ch from src and returns the tail left after unmarshaling.
+// unmarshalNoArena unmarshals ch from src and returns the tail left after unmarshaling.
 //
-// ch is valid until a.reset() is called.
-func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
+// ch is valid until src is changed.
+func (ch *columnHeader) unmarshalNoArena(src []byte) ([]byte, error) {
 	ch.reset()
 
 	srcOrig := src
@@ -511,7 +511,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
 		return srcOrig, fmt.Errorf("cannot unmarshal column name")
 	}
 	src = src[nSize:]
-	ch.name = a.copyBytesToString(data)
+	ch.name = bytesutil.ToUnsafeString(data)
 
 	// Unmarshal value type
 	if len(src) < 1 {
@@ -529,7 +529,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
 		}
 		src = tail
 	case valueTypeDict:
-		tail, err := ch.valuesDict.unmarshal(a, src)
+		tail, err := ch.valuesDict.unmarshalNoArena(src)
 		if err != nil {
 			return srcOrig, fmt.Errorf("cannot unmarshal dict at valueTypeDict for column %q: %w", ch.name, err)
 		}
diff --git a/lib/logstorage/block_header_test.go b/lib/logstorage/block_header_test.go
index bec76b3de..a358f25cc 100644
--- a/lib/logstorage/block_header_test.go
+++ b/lib/logstorage/block_header_test.go
@@ -56,15 +56,12 @@ func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
 	f := func(csh *columnsHeader, marshaledLen int) {
 		t.Helper()
 
-		a := getArena()
-		defer putArena(a)
-
 		data := csh.marshal(nil)
 		if len(data) != marshaledLen {
 			t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
 		}
 		csh2 := &columnsHeader{}
-		err := csh2.unmarshal(a, data)
+		err := csh2.unmarshalNoArena(data)
 		if err != nil {
 			t.Fatalf("unexpected error in unmarshal: %s", err)
 		}
@@ -155,12 +152,9 @@ func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
 	f := func(data []byte) {
 		t.Helper()
 
-		a := getArena()
-		defer putArena(a)
-
 		csh := getColumnsHeader()
 		defer putColumnsHeader(csh)
-		err := csh.unmarshal(a, data)
+		err := csh.unmarshalNoArena(data)
 		if err == nil {
 			t.Fatalf("expecting non-nil error")
 		}
@@ -326,15 +320,12 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
 	f := func(ch *columnHeader, marshaledLen int) {
 		t.Helper()
 
-		a := getArena()
-		defer putArena(a)
-
 		data := ch.marshal(nil)
 		if len(data) != marshaledLen {
 			t.Fatalf("unexpected marshaled length of columnHeader; got %d; want %d", len(data), marshaledLen)
 		}
 		var ch2 columnHeader
-		tail, err := ch2.unmarshal(a, data)
+		tail, err := ch2.unmarshalNoArena(data)
 		if err != nil {
 			t.Fatalf("unexpected error in umarshal(%v): %s", ch, err)
 		}
@@ -365,12 +356,9 @@ func TestColumnHeaderUnmarshalFailure(t *testing.T) {
 	f := func(data []byte) {
 		t.Helper()
 
-		a := getArena()
-		defer putArena(a)
-
 		dataOrig := append([]byte{}, data...)
 		var ch columnHeader
-		tail, err := ch.unmarshal(a, data)
+		tail, err := ch.unmarshalNoArena(data)
 		if err == nil {
 			t.Fatalf("expecting non-nil error")
 		}
diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go
index 48029155b..3b4ed9ed3 100644
--- a/lib/logstorage/block_search.go
+++ b/lib/logstorage/block_search.go
@@ -113,14 +113,16 @@ type blockSearch struct {
 	// sbu is used for unmarshaling local columns
 	sbu stringsBlockUnmarshaler
 
+	// cshBlockCache holds columnsHeader data for the given block.
+	//
+	// it is initialized lazily by calling getColumnsHeader().
+	cshBlockCache []byte
+
 	// cshCache is the columnsHeader associated with the given block
 	//
 	// it is initialized lazily by calling getColumnsHeader().
 	cshCache *columnsHeader
 
-	// a is used for storing unmarshaled data in cshCached
-	a arena
-
 	// seenStreams contains seen streamIDs for the recent searches.
 	// It is used for speeding up fetching _stream column.
 	seenStreams map[u128]string
@@ -149,13 +151,13 @@ func (bs *blockSearch) reset() {
 
 	bs.sbu.reset()
 
+	bs.cshBlockCache = bs.cshBlockCache[:0]
+
 	if bs.cshCache != nil {
 		putColumnsHeader(bs.cshCache)
 		bs.cshCache = nil
 	}
 
-	bs.a.reset()
-
 	// Do not reset seenStreams, since its' lifetime is managed by blockResult.addStreamColumn() code.
 }
 
@@ -190,25 +192,25 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) {
 
 func (bs *blockSearch) getColumnsHeader() *columnsHeader {
 	if bs.cshCache == nil {
+		bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
+
 		bs.cshCache = getColumnsHeader()
-		bs.cshCache.initFromBlockHeader(&bs.a, bs.bsw.p, &bs.bsw.bh)
+		if err := bs.cshCache.unmarshalNoArena(bs.cshBlockCache); err != nil {
+			logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.bsw.p.path, err)
+		}
 	}
 	return bs.cshCache
 }
 
-func (csh *columnsHeader) initFromBlockHeader(a *arena, p *part, bh *blockHeader) {
-	bb := longTermBufPool.Get()
+func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte {
 	columnsHeaderSize := bh.columnsHeaderSize
 	if columnsHeaderSize > maxColumnsHeaderSize {
 		logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, columnsHeaderSize)
 	}
-	bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
-	p.columnsHeaderFile.MustReadAt(bb.B, int64(bh.columnsHeaderOffset))
-
-	if err := csh.unmarshal(a, bb.B); err != nil {
-		logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", p.path, err)
-	}
-	longTermBufPool.Put(bb)
+	dstLen := len(dst)
+	dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(columnsHeaderSize)+dstLen)
+	p.columnsHeaderFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderOffset))
+	return dst
 }
 
 // getBloomFilterForColumn returns bloom filter for the given ch.
diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go
index 9d0d77e9c..a90e54a08 100644
--- a/lib/logstorage/rows.go
+++ b/lib/logstorage/rows.go
@@ -36,7 +36,7 @@ func (f *Field) marshal(dst []byte) []byte {
 	return dst
 }
 
-func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
+func (f *Field) unmarshalNoArena(src []byte) ([]byte, error) {
 	srcOrig := src
 
 	// Unmarshal field name
@@ -45,7 +45,7 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
 		return srcOrig, fmt.Errorf("cannot unmarshal field name")
 	}
 	src = src[nSize:]
-	f.Name = a.copyBytesToString(b)
+	f.Name = bytesutil.ToUnsafeString(b)
 
 	// Unmarshal field value
 	b, nSize = encoding.UnmarshalBytes(src)
@@ -53,7 +53,7 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
 		return srcOrig, fmt.Errorf("cannot unmarshal field value")
 	}
 	src = src[nSize:]
-	f.Value = a.copyBytesToString(b)
+	f.Value = bytesutil.ToUnsafeString(b)
 
 	return src, nil
 }
diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go
index dc7dfebee..9185f8a74 100644
--- a/lib/logstorage/values_encoder.go
+++ b/lib/logstorage/values_encoder.go
@@ -1116,10 +1116,10 @@ func (vd *valuesDict) marshal(dst []byte) []byte {
 	return dst
 }
 
-// unmarshal unmarshals vd from src.
+// unmarshalNoArena unmarshals vd from src.
 //
-// vd is valid until a.reset() is called.
-func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) {
+// vd is valid until src is changed.
+func (vd *valuesDict) unmarshalNoArena(src []byte) ([]byte, error) {
 	vd.reset()
 
 	srcOrig := src
@@ -1135,7 +1135,7 @@ func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) {
 		}
 		src = src[nSize:]
 
-		v := a.copyBytesToString(data)
+		v := bytesutil.ToUnsafeString(data)
 		vd.values = append(vd.values, v)
 	}
 	return src, nil
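
Below is a minimal standalone Go sketch (not part of the patch) of the zero-copy decoding pattern the diff switches to: instead of copying bytes into an arena, the unmarshaled struct aliases the source buffer through an unsafe string conversion, so the caller must keep that buffer alive and unmodified for as long as the struct is in use. This is why blockSearch now stores the raw cshBlockCache bytes next to cshCache. The toUnsafeString helper is a local stand-in for bytesutil.ToUnsafeString, and the header type and its field are illustrative only.

package main

import (
	"fmt"
	"unsafe"
)

// toUnsafeString reinterprets b as a string without copying (requires Go 1.20+),
// similar in spirit to bytesutil.ToUnsafeString. The returned string is valid only
// while b stays alive and unmodified.
func toUnsafeString(b []byte) string {
	return unsafe.String(unsafe.SliceData(b), len(b))
}

// header is a stand-in for columnsHeader/columnHeader: its field aliases the source buffer.
type header struct {
	name string
}

// unmarshalNoArena-style decoding: no copy, name points directly into src.
func (h *header) unmarshalNoArena(src []byte) {
	h.name = toUnsafeString(src)
}

func main() {
	buf := []byte("foo")

	var h header
	h.unmarshalNoArena(buf)
	fmt.Println(h.name) // foo

	// The caller owns buf's lifetime: mutating or recycling it changes h.name too,
	// which is why the raw columnsHeader block bytes must be kept around for as long
	// as the unmarshaled columnsHeader is used.
	copy(buf, "bar")
	fmt.Println(h.name) // bar
}

The upside of this layout is that the per-block arena copies disappear; the cost is a lifetime contract ("valid until src is changed") that the new doc comments spell out and that callers such as blockSearch enforce by caching the source block alongside the decoded header.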