lib/logstorage: avoid redundant copying of column names and column values for dictionary-encoded columns during querying

Refer to the original byte slice with the marshaled columnsHeader for column names and dictionary-encoded column values.
This improves query performance a bit when a big number of blocks with a big number of columns is scanned during the query.

(cherry picked from commit 279e25e7c8)
This commit is contained in:
Aliaksandr Valialkin 2024-10-13 13:25:36 +02:00 committed by hagen1778
parent fe9e6b7495
commit beeb80e4f8
No known key found for this signature in database
GPG key ID: E92986095E0DD614
6 changed files with 40 additions and 52 deletions

View file

@ -158,12 +158,10 @@ func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders)
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
sr.columnsHeaderReader.MustReadFull(bb.B)
cshA := getArena()
csh := getColumnsHeader()
if err := csh.unmarshal(cshA, bb.B); err != nil {
if err := csh.unmarshalNoArena(bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
}
longTermBufPool.Put(bb)
chs := csh.columnHeaders
cds := bd.resizeColumnsData(len(chs))
for i := range chs {
@ -171,7 +169,7 @@ func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders)
}
bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns)
putColumnsHeader(csh)
putArena(cshA)
longTermBufPool.Put(bb)
}
// timestampsData contains the encoded timestamps data.

View file

@ -289,10 +289,10 @@ func (csh *columnsHeader) marshal(dst []byte) []byte {
return dst
}
// unmarshal unmarshals csh from src.
// unmarshalNoArena unmarshals csh from src.
//
// csh is valid until a.reset() is called.
func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
// csh is valid until src is changed.
func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
csh.reset()
// unmarshal columnHeaders
@ -307,7 +307,7 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
chs := csh.resizeColumnHeaders(int(n))
for i := range chs {
tail, err := chs[i].unmarshal(a, src)
tail, err := chs[i].unmarshalNoArena(src)
if err != nil {
return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
}
@ -327,7 +327,7 @@ func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
ccs := csh.resizeConstColumns(int(n))
for i := range ccs {
tail, err := ccs[i].unmarshal(a, src)
tail, err := ccs[i].unmarshalNoArena(src)
if err != nil {
return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
}
@ -497,10 +497,10 @@ func (ch *columnHeader) marshalBloomFilters(dst []byte) []byte {
return dst
}
// unmarshal unmarshals ch from src and returns the tail left after unmarshaling.
// unmarshalNoArena unmarshals ch from src and returns the tail left after unmarshaling.
//
// ch is valid until a.reset() is called.
func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
// ch is valid until src is changed.
func (ch *columnHeader) unmarshalNoArena(src []byte) ([]byte, error) {
ch.reset()
srcOrig := src
@ -511,7 +511,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
return srcOrig, fmt.Errorf("cannot unmarshal column name")
}
src = src[nSize:]
ch.name = a.copyBytesToString(data)
ch.name = bytesutil.ToUnsafeString(data)
// Unmarshal value type
if len(src) < 1 {
@ -529,7 +529,7 @@ func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
}
src = tail
case valueTypeDict:
tail, err := ch.valuesDict.unmarshal(a, src)
tail, err := ch.valuesDict.unmarshalNoArena(src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal dict at valueTypeDict for column %q: %w", ch.name, err)
}

View file

@ -56,15 +56,12 @@ func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
f := func(csh *columnsHeader, marshaledLen int) {
t.Helper()
a := getArena()
defer putArena(a)
data := csh.marshal(nil)
if len(data) != marshaledLen {
t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
}
csh2 := &columnsHeader{}
err := csh2.unmarshal(a, data)
err := csh2.unmarshalNoArena(data)
if err != nil {
t.Fatalf("unexpected error in unmarshal: %s", err)
}
@ -155,12 +152,9 @@ func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
f := func(data []byte) {
t.Helper()
a := getArena()
defer putArena(a)
csh := getColumnsHeader()
defer putColumnsHeader(csh)
err := csh.unmarshal(a, data)
err := csh.unmarshalNoArena(data)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -326,15 +320,12 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
f := func(ch *columnHeader, marshaledLen int) {
t.Helper()
a := getArena()
defer putArena(a)
data := ch.marshal(nil)
if len(data) != marshaledLen {
t.Fatalf("unexpected marshaled length of columnHeader; got %d; want %d", len(data), marshaledLen)
}
var ch2 columnHeader
tail, err := ch2.unmarshal(a, data)
tail, err := ch2.unmarshalNoArena(data)
if err != nil {
t.Fatalf("unexpected error in umarshal(%v): %s", ch, err)
}
@ -365,12 +356,9 @@ func TestColumnHeaderUnmarshalFailure(t *testing.T) {
f := func(data []byte) {
t.Helper()
a := getArena()
defer putArena(a)
dataOrig := append([]byte{}, data...)
var ch columnHeader
tail, err := ch.unmarshal(a, data)
tail, err := ch.unmarshalNoArena(data)
if err == nil {
t.Fatalf("expecting non-nil error")
}

View file

@ -113,14 +113,16 @@ type blockSearch struct {
// sbu is used for unmarshaling local columns
sbu stringsBlockUnmarshaler
// cshBlockCache holds columnsHeader data for the given block.
//
// it is initialized lazily by calling getColumnsHeader().
cshBlockCache []byte
// cshCache is the columnsHeader associated with the given block
//
// it is initialized lazily by calling getColumnsHeader().
cshCache *columnsHeader
// a is used for storing unmarshaled data in cshCached
a arena
// seenStreams contains seen streamIDs for the recent searches.
// It is used for speeding up fetching _stream column.
seenStreams map[u128]string
@ -149,13 +151,13 @@ func (bs *blockSearch) reset() {
bs.sbu.reset()
bs.cshBlockCache = bs.cshBlockCache[:0]
if bs.cshCache != nil {
putColumnsHeader(bs.cshCache)
bs.cshCache = nil
}
bs.a.reset()
// Do not reset seenStreams, since its' lifetime is managed by blockResult.addStreamColumn() code.
}
@ -190,25 +192,25 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) {
func (bs *blockSearch) getColumnsHeader() *columnsHeader {
if bs.cshCache == nil {
bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
bs.cshCache = getColumnsHeader()
bs.cshCache.initFromBlockHeader(&bs.a, bs.bsw.p, &bs.bsw.bh)
if err := bs.cshCache.unmarshalNoArena(bs.cshBlockCache); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.bsw.p.path, err)
}
}
return bs.cshCache
}
func (csh *columnsHeader) initFromBlockHeader(a *arena, p *part, bh *blockHeader) {
bb := longTermBufPool.Get()
func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte {
columnsHeaderSize := bh.columnsHeaderSize
if columnsHeaderSize > maxColumnsHeaderSize {
logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, columnsHeaderSize)
}
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
p.columnsHeaderFile.MustReadAt(bb.B, int64(bh.columnsHeaderOffset))
if err := csh.unmarshal(a, bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", p.path, err)
}
longTermBufPool.Put(bb)
dstLen := len(dst)
dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(columnsHeaderSize)+dstLen)
p.columnsHeaderFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderOffset))
return dst
}
// getBloomFilterForColumn returns bloom filter for the given ch.

View file

@ -36,7 +36,7 @@ func (f *Field) marshal(dst []byte) []byte {
return dst
}
func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
func (f *Field) unmarshalNoArena(src []byte) ([]byte, error) {
srcOrig := src
// Unmarshal field name
@ -45,7 +45,7 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
return srcOrig, fmt.Errorf("cannot unmarshal field name")
}
src = src[nSize:]
f.Name = a.copyBytesToString(b)
f.Name = bytesutil.ToUnsafeString(b)
// Unmarshal field value
b, nSize = encoding.UnmarshalBytes(src)
@ -53,7 +53,7 @@ func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
return srcOrig, fmt.Errorf("cannot unmarshal field value")
}
src = src[nSize:]
f.Value = a.copyBytesToString(b)
f.Value = bytesutil.ToUnsafeString(b)
return src, nil
}

View file

@ -1116,10 +1116,10 @@ func (vd *valuesDict) marshal(dst []byte) []byte {
return dst
}
// unmarshal unmarshals vd from src.
// unmarshalNoArena unmarshals vd from src.
//
// vd is valid until a.reset() is called.
func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) {
// vd is valid until src is changed.
func (vd *valuesDict) unmarshalNoArena(src []byte) ([]byte, error) {
vd.reset()
srcOrig := src
@ -1135,7 +1135,7 @@ func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) {
}
src = src[nSize:]
v := a.copyBytesToString(data)
v := bytesutil.ToUnsafeString(data)
vd.values = append(vd.values, v)
}
return src, nil