This commit is contained in:
Aliaksandr Valialkin 2024-05-11 00:39:12 +02:00
parent 790f4db6f0
commit 4c457cf20f
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
15 changed files with 307 additions and 96 deletions

View file

@ -1,9 +1,26 @@
package logstorage
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
func getArena() *arena {
v := arenaPool.Get()
if v == nil {
return &arena{}
}
return v.(*arena)
}
func putArena(a *arena) {
a.reset()
arenaPool.Put(a)
}
var arenaPool sync.Pool
type arena struct {
b []byte
}
@ -12,6 +29,10 @@ func (a *arena) reset() {
a.b = a.b[:0]
}
func (a *arena) sizeBytes() int {
return len(a.b)
}
func (a *arena) copyBytes(b []byte) []byte {
ab := a.b
abLen := len(ab)
@ -21,6 +42,16 @@ func (a *arena) copyBytes(b []byte) []byte {
return result
}
func (a *arena) copyBytesToString(b []byte) string {
bCopy := a.copyBytes(b)
return bytesutil.ToUnsafeString(bCopy)
}
func (a *arena) copyString(s string) string {
b := bytesutil.ToUnsafeBytes(s)
return a.copyBytesToString(b)
}
func (a *arena) newBytes(size int) []byte {
ab := a.b
abLen := len(ab)

View file

@ -0,0 +1,80 @@
package logstorage
import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
func TestArena(t *testing.T) {
values := []string{"foo", "bar", "", "adsfjkljsdfdsf", "dsfsopq", "io234"}
for i := 0; i < 10; i++ {
a := getArena()
if n := a.sizeBytes(); n != 0 {
t.Fatalf("unexpected non-zero size of empty arena: %d", n)
}
// add values to arena
valuesCopy := make([]string, len(values))
valuesLen := 0
for j, v := range values {
vCopy := a.copyString(v)
if vCopy != v {
t.Fatalf("unexpected value; got %q; want %q", vCopy, v)
}
valuesCopy[j] = vCopy
valuesLen += len(v)
}
// verify that the values returned from arena match the original values
for j, v := range values {
vCopy := valuesCopy[j]
if vCopy != v {
t.Fatalf("unexpected value; got %q; want %q", vCopy, v)
}
}
if n := a.sizeBytes(); n != valuesLen {
t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen)
}
// Try allocating slices with different lengths
bs := make([]string, 100)
for j := range bs {
b := a.newBytes(j)
if len(b) != j {
t.Fatalf("unexpected len(b); got %d; want %d", len(b), j)
}
valuesLen += j
if n := a.sizeBytes(); n != valuesLen {
t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen)
}
for k := range b {
b[k] = byte(k)
}
bs[j] = bytesutil.ToUnsafeString(b)
}
// verify that the allocated slices didn't change
for j, v := range bs {
b := make([]byte, j)
for k := 0; k < j; k++ {
b[k] = byte(k)
}
if v != string(b) {
t.Fatalf("unexpected value at index %d; got %X; want %X", j, v, b)
}
}
// verify that the values returned from arena match the original values
for j, v := range values {
vCopy := valuesCopy[j]
if vCopy != v {
t.Fatalf("unexpected value; got %q; want %q", vCopy, v)
}
}
putArena(a)
}
}

View file

@ -120,11 +120,8 @@ type column struct {
func (c *column) reset() {
c.name = ""
values := c.values
for i := range values {
values[i] = ""
}
c.values = values[:0]
clear(c.values)
c.values = c.values[:0]
}
func (c *column) canStoreInConstColumn() bool {
@ -155,7 +152,9 @@ func (c *column) resizeValues(valuesLen int) []string {
}
// mustWriteTo writes c to sw and updates ch accordingly.
func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
//
// ch is valid until a.reset() is called.
func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
ch.reset()
valuesWriter := &sw.fieldValuesWriter
@ -165,7 +164,7 @@ func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
bloomFilterWriter = &sw.messageBloomFilterWriter
}
ch.name = c.name
ch.name = a.copyString(c.name)
// encode values
ve := getValuesEncoder()
@ -226,6 +225,8 @@ func (b *block) assertValid() {
// MustInitFromRows initializes b from the given timestamps and rows.
//
// It is expected that timestamps are sorted.
//
// b is valid until timestamps and rows are changed.
func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) {
b.reset()
@ -235,6 +236,9 @@ func (b *block) MustInitFromRows(timestamps []int64, rows [][]Field) {
b.sortColumnsByName()
}
// mustInitiFromRows initializes b from rows.
//
// b is valid until rows are changed.
func (b *block) mustInitFromRows(rows [][]Field) {
rowsLen := len(rows)
if rowsLen == 0 {
@ -424,7 +428,7 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v
for i := range cds {
cd := &cds[i]
c := &cs[i]
c.name = cd.name
c.name = sbu.copyString(cd.name)
c.values, err = sbu.unmarshal(c.values[:0], cd.valuesData, uint64(rowsCount))
if err != nil {
return fmt.Errorf("cannot unmarshal column %d: %w", i, err)
@ -435,12 +439,12 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v
}
// unmarshal constColumns
b.constColumns = append(b.constColumns[:0], bd.constColumns...)
b.constColumns = sbu.appendFields(b.constColumns[:0], bd.constColumns)
return nil
}
// mustWriteTo writes b with the given sid to sw and updates bh accordingly
// mustWriteTo writes b with the given sid to sw and updates bh accordingly.
func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
// Do not store the version used for encoding directly in the block data, since:
// - all the blocks in the same part use the same encoding
@ -458,16 +462,22 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
// Marshal columns
cs := b.columns
a := getArena()
csh := getColumnsHeader()
chs := csh.resizeColumnHeaders(len(cs))
for i := range cs {
cs[i].mustWriteTo(&chs[i], sw)
cs[i].mustWriteTo(a, &chs[i], sw)
}
csh.constColumns = append(csh.constColumns[:0], b.constColumns...)
csh.constColumns = appendFields(a, csh.constColumns[:0], b.constColumns)
bb := longTermBufPool.Get()
bb.B = csh.marshal(bb.B)
putColumnsHeader(csh)
putArena(a)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(bb.B))
if bh.columnsHeaderSize > maxColumnsHeaderSize {
@ -489,13 +499,7 @@ func (b *block) appendRowsTo(dst *rows) {
for i := range b.timestamps {
fieldsLen := len(fieldsBuf)
// copy const columns
for j := range ccs {
cc := &ccs[j]
fieldsBuf = append(fieldsBuf, Field{
Name: cc.Name,
Value: cc.Value,
})
}
fieldsBuf = append(fieldsBuf, ccs...)
// copy other columns
for j := range cs {
c := &cs[j]

View file

@ -29,11 +29,6 @@ type blockData struct {
// constColumns contains data for const columns across the block
constColumns []Field
// a is used for storing byte slices for timestamps and columns.
//
// It reduces fragmentation for them.
a arena
}
// reset resets bd for subsequent re-use
@ -54,8 +49,6 @@ func (bd *blockData) reset() {
ccs[i].Reset()
}
bd.constColumns = ccs[:0]
bd.a.reset()
}
func (bd *blockData) resizeColumnsData(columnsDataLen int) []columnData {
@ -69,27 +62,29 @@ func (bd *blockData) resizeColumnsData(columnsDataLen int) []columnData {
}
// copyFrom copies src to bd.
func (bd *blockData) copyFrom(src *blockData) {
//
// bd is valid until a.reset() is called.
func (bd *blockData) copyFrom(a *arena, src *blockData) {
bd.reset()
bd.streamID = src.streamID
bd.uncompressedSizeBytes = src.uncompressedSizeBytes
bd.rowsCount = src.rowsCount
bd.timestampsData.copyFrom(&src.timestampsData, &bd.a)
bd.timestampsData.copyFrom(a, &src.timestampsData)
cdsSrc := src.columnsData
cds := bd.resizeColumnsData(len(cdsSrc))
for i := range cds {
cds[i].copyFrom(&cdsSrc[i], &bd.a)
cds[i].copyFrom(a, &cdsSrc[i])
}
bd.columnsData = cds
bd.constColumns = append(bd.constColumns[:0], src.constColumns...)
bd.constColumns = appendFields(a, bd.constColumns[:0], src.constColumns)
}
// unmarshalRows appends unmarshaled from bd log entries to dst.
//
// The returned log entries are valid until sbu and vd are valid.
// The unmarshaled log entries are valid until sbu and vd are reset.
func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *valuesDecoder) error {
b := getBlock()
defer putBlock(b)
@ -101,7 +96,7 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *
return nil
}
// mustWriteTo writes bd with the given sid to sw and updates bh accordingly
// mustWriteTo writes bd to sw and updates bh accordingly
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
// Do not store the version used for encoding directly in the block data, since:
// - all the blocks in the same part use the same encoding
@ -118,16 +113,20 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
// Marshal columns
cds := bd.columnsData
a := getArena()
csh := getColumnsHeader()
chs := csh.resizeColumnHeaders(len(cds))
for i := range cds {
cds[i].mustWriteTo(&chs[i], sw)
cds[i].mustWriteTo(a, &chs[i], sw)
}
csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)
csh.constColumns = appendFields(a, csh.constColumns[:0], bd.constColumns)
bb := longTermBufPool.Get()
bb.B = csh.marshal(bb.B)
putColumnsHeader(csh)
putArena(a)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(bb.B))
if bh.columnsHeaderSize > maxColumnsHeaderSize {
@ -138,7 +137,9 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
}
// mustReadFrom reads block data associated with bh from sr to bd.
func (bd *blockData) mustReadFrom(bh *blockHeader, sr *streamReaders) {
//
// The bd is valid until a.reset() is called.
func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders) {
bd.reset()
bd.streamID = bh.streamID
@ -146,7 +147,7 @@ func (bd *blockData) mustReadFrom(bh *blockHeader, sr *streamReaders) {
bd.rowsCount = bh.rowsCount
// Read timestamps
bd.timestampsData.mustReadFrom(&bh.timestampsHeader, sr, &bd.a)
bd.timestampsData.mustReadFrom(a, &bh.timestampsHeader, sr)
// Read columns
if bh.columnsHeaderOffset != sr.columnsHeaderReader.bytesRead {
@ -161,18 +162,20 @@ func (bd *blockData) mustReadFrom(bh *blockHeader, sr *streamReaders) {
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
sr.columnsHeaderReader.MustReadFull(bb.B)
cshA := getArena()
csh := getColumnsHeader()
if err := csh.unmarshal(bb.B); err != nil {
if err := csh.unmarshal(cshA, bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
}
longTermBufPool.Put(bb)
chs := csh.columnHeaders
cds := bd.resizeColumnsData(len(chs))
for i := range chs {
cds[i].mustReadFrom(&chs[i], sr, &bd.a)
cds[i].mustReadFrom(a, &chs[i], sr)
}
bd.constColumns = append(bd.constColumns[:0], csh.constColumns...)
bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns)
putColumnsHeader(csh)
putArena(cshA)
}
// timestampsData contains the encoded timestamps data.
@ -199,7 +202,9 @@ func (td *timestampsData) reset() {
}
// copyFrom copies src to td.
func (td *timestampsData) copyFrom(src *timestampsData, a *arena) {
//
// td is valid until a.reset() is called.
func (td *timestampsData) copyFrom(a *arena, src *timestampsData) {
td.reset()
td.data = a.copyBytes(src.data)
@ -224,7 +229,9 @@ func (td *timestampsData) mustWriteTo(th *timestampsHeader, sw *streamWriters) {
}
// mustReadFrom reads timestamps data associated with th from sr to td.
func (td *timestampsData) mustReadFrom(th *timestampsHeader, sr *streamReaders, a *arena) {
//
// td is valid until a.reset() is called.
func (td *timestampsData) mustReadFrom(a *arena, th *timestampsHeader, sr *streamReaders) {
td.reset()
td.marshalType = th.marshalType
@ -287,22 +294,26 @@ func (cd *columnData) reset() {
}
// copyFrom copies src to cd.
func (cd *columnData) copyFrom(src *columnData, a *arena) {
//
// cd is valid until a.reset() is called.
func (cd *columnData) copyFrom(a *arena, src *columnData) {
cd.reset()
cd.name = src.name
cd.name = a.copyString(src.name)
cd.valueType = src.valueType
cd.minValue = src.minValue
cd.maxValue = src.maxValue
cd.valuesDict.copyFrom(&src.valuesDict)
cd.valuesDict.copyFrom(a, &src.valuesDict)
cd.valuesData = a.copyBytes(src.valuesData)
cd.bloomFilterData = a.copyBytes(src.bloomFilterData)
}
// mustWriteTo writes cd to sw and updates ch accordingly.
func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
//
// ch is valid until a.reset() is called.
func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
ch.reset()
valuesWriter := &sw.fieldValuesWriter
@ -312,12 +323,12 @@ func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
bloomFilterWriter = &sw.messageBloomFilterWriter
}
ch.name = cd.name
ch.name = a.copyString(cd.name)
ch.valueType = cd.valueType
ch.minValue = cd.minValue
ch.maxValue = cd.maxValue
ch.valuesDict.copyFrom(&cd.valuesDict)
ch.valuesDict.copyFrom(a, &cd.valuesDict)
// marshal values
ch.valuesSize = uint64(len(cd.valuesData))
@ -337,7 +348,9 @@ func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
}
// mustReadFrom reads columns data associated with ch from sr to cd.
func (cd *columnData) mustReadFrom(ch *columnHeader, sr *streamReaders, a *arena) {
//
// cd is valid until a.reset() is called.
func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders) {
cd.reset()
valuesReader := &sr.fieldValuesReader
@ -347,12 +360,12 @@ func (cd *columnData) mustReadFrom(ch *columnHeader, sr *streamReaders, a *arena
bloomFilterReader = &sr.messageBloomFilterReader
}
cd.name = ch.name
cd.name = a.copyString(ch.name)
cd.valueType = ch.valueType
cd.minValue = ch.minValue
cd.maxValue = ch.maxValue
cd.valuesDict.copyFrom(&ch.valuesDict)
cd.valuesDict.copyFrom(a, &ch.valuesDict)
// read values
if ch.valuesOffset != valuesReader.bytesRead {

View file

@ -51,20 +51,23 @@ func TestBlockDataReset(t *testing.T) {
func TestBlockDataCopyFrom(t *testing.T) {
f := func(bd *blockData) {
t.Helper()
a := getArena()
defer putArena(a)
var bd2 blockData
bd2.copyFrom(bd)
bd2.a.b = nil
bd2.copyFrom(a, bd)
if !reflect.DeepEqual(bd, &bd2) {
t.Fatalf("unexpected blockData copy\ngot\n%v\nwant\n%v", &bd2, bd)
}
// Try copying it again to the same destination
bd2.copyFrom(bd)
bd2.a.b = nil
bd2.copyFrom(a, bd)
if !reflect.DeepEqual(bd, &bd2) {
t.Fatalf("unexpected blockData copy to the same destination\ngot\n%v\nwant\n%v", &bd2, bd)
}
}
f(&blockData{})
bd := &blockData{

View file

@ -298,7 +298,10 @@ func (csh *columnsHeader) marshal(dst []byte) []byte {
return dst
}
func (csh *columnsHeader) unmarshal(src []byte) error {
// unmarshal unmarshals csh from src.
//
// csh is valid until a.reset() is called.
func (csh *columnsHeader) unmarshal(a *arena, src []byte) error {
csh.reset()
// unmarshal columnHeaders
@ -312,7 +315,7 @@ func (csh *columnsHeader) unmarshal(src []byte) error {
src = tail
chs := csh.resizeColumnHeaders(int(n))
for i := range chs {
tail, err = chs[i].unmarshal(src)
tail, err = chs[i].unmarshal(a, src)
if err != nil {
return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
}
@ -331,7 +334,7 @@ func (csh *columnsHeader) unmarshal(src []byte) error {
src = tail
ccs := csh.resizeConstColumns(int(n))
for i := range ccs {
tail, err = ccs[i].unmarshal(src)
tail, err = ccs[i].unmarshal(a, src)
if err != nil {
return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
}
@ -357,7 +360,7 @@ func (csh *columnsHeader) unmarshal(src []byte) error {
//
// Tokens in bloom filter depend on valueType:
//
// - valueTypeString stores lowercased tokens seen in all the values
// - valueTypeString stores tokens seen in all the values
// - valueTypeDict doesn't store anything in the bloom filter, since all the encoded values
// are available directly in the valuesDict field
// - valueTypeUint8, valueTypeUint16, valueTypeUint32 and valueTypeUint64 stores encoded uint values
@ -502,7 +505,9 @@ func (ch *columnHeader) marshalBloomFilters(dst []byte) []byte {
}
// unmarshal unmarshals ch from src and returns the tail left after unmarshaling.
func (ch *columnHeader) unmarshal(src []byte) ([]byte, error) {
//
// ch is valid until a.reset() is called.
func (ch *columnHeader) unmarshal(a *arena, src []byte) ([]byte, error) {
ch.reset()
srcOrig := src
@ -512,8 +517,7 @@ func (ch *columnHeader) unmarshal(src []byte) ([]byte, error) {
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal column name: %w", err)
}
// Do not use bytesutil.InternBytes(data) here, since it works slower than the string(data) in prod
ch.name = string(data)
ch.name = a.copyBytesToString(data)
src = tail
// Unmarshal value type
@ -532,7 +536,7 @@ func (ch *columnHeader) unmarshal(src []byte) ([]byte, error) {
}
src = tail
case valueTypeDict:
tail, err = ch.valuesDict.unmarshal(src)
tail, err = ch.valuesDict.unmarshal(a, src)
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal dict at valueTypeDict for column %q: %w", ch.name, err)
}

View file

@ -55,12 +55,16 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
f := func(csh *columnsHeader, marshaledLen int) {
t.Helper()
a := getArena()
defer putArena(a)
data := csh.marshal(nil)
if len(data) != marshaledLen {
t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
}
csh2 := &columnsHeader{}
err := csh2.unmarshal(data)
err := csh2.unmarshal(a, data)
if err != nil {
t.Fatalf("unexpected error in unmarshal: %s", err)
}
@ -68,6 +72,7 @@ func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", csh2, csh)
}
}
f(&columnsHeader{}, 2)
f(&columnsHeader{
columnHeaders: []columnHeader{
@ -149,13 +154,18 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) {
func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
f := func(data []byte) {
t.Helper()
a := getArena()
defer putArena(a)
csh := getColumnsHeader()
defer putColumnsHeader(csh)
err := csh.unmarshal(data)
err := csh.unmarshal(a, data)
if err == nil {
t.Fatalf("expecting non-nil error")
}
}
f(nil)
f([]byte("foo"))
@ -315,12 +325,16 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) {
func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
f := func(ch *columnHeader, marshaledLen int) {
t.Helper()
a := getArena()
defer putArena(a)
data := ch.marshal(nil)
if len(data) != marshaledLen {
t.Fatalf("unexpected marshaled length of columnHeader; got %d; want %d", len(data), marshaledLen)
}
var ch2 columnHeader
tail, err := ch2.unmarshal(data)
tail, err := ch2.unmarshal(a, data)
if err != nil {
t.Fatalf("unexpected error in umarshal(%v): %s", ch, err)
}
@ -331,6 +345,7 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected columnHeader after unmarshal;\ngot\n%v\nwant\n%v", &ch2, ch)
}
}
f(&columnHeader{
name: "foo",
valueType: valueTypeUint8,
@ -349,9 +364,13 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
func TestColumnHeaderUnmarshalFailure(t *testing.T) {
f := func(data []byte) {
t.Helper()
a := getArena()
defer putArena(a)
dataOrig := append([]byte{}, data...)
var ch columnHeader
tail, err := ch.unmarshal(data)
tail, err := ch.unmarshal(a, data)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -359,6 +378,7 @@ func TestColumnHeaderUnmarshalFailure(t *testing.T) {
t.Fatalf("unexpected tail left; got %q; want %q", tail, dataOrig)
}
}
f(nil)
f([]byte("foo"))

View file

@ -120,6 +120,12 @@ func (br *blockResult) cloneValues(values []string) []string {
return valuesBuf[valuesBufLen:]
}
func (br *blockResult) copyString(s string) string {
bufLen := len(br.buf)
br.buf = append(br.buf, s...)
return bytesutil.ToUnsafeString(br.buf[bufLen:])
}
// sizeBytes returns the size of br in bytes.
func (br *blockResult) sizeBytes() int {
n := int(unsafe.Sizeof(*br))
@ -149,13 +155,13 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
if areConstValues(rc.values) {
// This optimization allows reducing memory usage after br cloning
csBuf = append(csBuf, blockResultColumn{
name: rc.name,
name: br.copyString(rc.name),
isConst: true,
encodedValues: rc.values[:1],
})
} else {
csBuf = append(csBuf, blockResultColumn{
name: rc.name,
name: br.copyString(rc.name),
valueType: valueTypeString,
encodedValues: rc.values,
})
@ -373,9 +379,13 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap)
}
dictValues = valuesBuf[valuesBufLen:]
name := getCanonicalColumnName(ch.name)
// copy ch.name to buf
bufLen := len(buf)
buf = append(buf, ch.name...)
name := bytesutil.ToUnsafeString(buf[bufLen:])
br.csBuf = append(br.csBuf, blockResultColumn{
name: name,
name: getCanonicalColumnName(name),
valueType: ch.valueType,
dictValues: dictValues,
encodedValues: encodedValues,
@ -417,21 +427,18 @@ func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
}
func (br *blockResult) addConstColumn(name, value string) {
buf := br.buf
bufLen := len(buf)
buf = append(buf, value...)
s := bytesutil.ToUnsafeString(buf[bufLen:])
br.buf = buf
value = br.copyString(value)
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
valuesBuf = append(valuesBuf, s)
valuesBuf = append(valuesBuf, value)
br.valuesBuf = valuesBuf
encodedValues := valuesBuf[valuesBufLen:]
br.csBuf = append(br.csBuf, blockResultColumn{
name: name,
name: br.copyString(name),
isConst: true,
encodedValues: valuesBuf[valuesBufLen:],
encodedValues: encodedValues,
})
br.csInitialized = false
}
@ -1265,7 +1272,7 @@ type blockResultColumn struct {
func (c *blockResultColumn) clone(br *blockResult) blockResultColumn {
var cNew blockResultColumn
cNew.name = c.name
cNew.name = br.copyString(c.name)
cNew.isConst = c.isConst
cNew.isTime = c.isTime
cNew.valueType = c.valueType

View file

@ -63,6 +63,9 @@ type blockSearch struct {
// csh is the columnsHeader associated with the given block
csh columnsHeader
// a is used for storing unmarshaled data in csh
a arena
}
func (bs *blockSearch) reset() {
@ -88,6 +91,7 @@ func (bs *blockSearch) reset() {
bs.sbu.reset()
bs.csh.reset()
bs.a.reset()
}
func (bs *blockSearch) partPath() string {
@ -99,7 +103,7 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
bs.bsw = bsw
bs.csh.initFromBlockHeader(bsw.p, &bsw.bh)
bs.csh.initFromBlockHeader(&bs.a, bsw.p, &bsw.bh)
// search rows matching the given filter
bm := getBitmap(int(bsw.bh.rowsCount))
@ -122,7 +126,7 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
}
}
func (csh *columnsHeader) initFromBlockHeader(p *part, bh *blockHeader) {
func (csh *columnsHeader) initFromBlockHeader(a *arena, p *part, bh *blockHeader) {
bb := longTermBufPool.Get()
columnsHeaderSize := bh.columnsHeaderSize
if columnsHeaderSize > maxColumnsHeaderSize {
@ -131,7 +135,7 @@ func (csh *columnsHeader) initFromBlockHeader(p *part, bh *blockHeader) {
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(columnsHeaderSize))
p.columnsHeaderFile.MustReadAt(bb.B, int64(bh.columnsHeaderOffset))
if err := csh.unmarshal(bb.B); err != nil {
if err := csh.unmarshal(a, bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", p.path, err)
}
longTermBufPool.Put(bb)

View file

@ -59,6 +59,9 @@ type blockStreamMerger struct {
// bd is unpacked into rows when needed.
bd blockData
// a holds bd data.
a arena
// rows is pending log entries.
rows rows
@ -99,6 +102,7 @@ func (bsm *blockStreamMerger) resetRows() {
bsm.vd = nil
}
bsm.bd.reset()
bsm.a.reset()
bsm.rows.reset()
bsm.rowsTmp.reset()
@ -138,7 +142,8 @@ func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWrit
bsw.MustWriteBlockData(bd)
} else {
// Slow path - copy the bd to the curr bd.
bsm.bd.copyFrom(bd)
bsm.a.reset()
bsm.bd.copyFrom(&bsm.a, bd)
bsm.uniqueFields = uniqueFields
}
case bsm.uniqueFields+uniqueFields >= maxColumnsPerBlock:
@ -150,7 +155,8 @@ func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWrit
if uniqueFields >= maxColumnsPerBlock {
bsw.MustWriteBlockData(bd)
} else {
bsm.bd.copyFrom(bd)
bsm.a.reset()
bsm.bd.copyFrom(&bsm.a, bd)
bsm.uniqueFields = uniqueFields
}
case bd.uncompressedSizeBytes >= maxUncompressedBlockSize:
@ -218,6 +224,7 @@ func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
// Unmarshal log entries from bsm.bd
bsm.mustUnmarshalRows(&bsm.bd)
bsm.bd.reset()
bsm.a.reset()
}
// Unmarshal log entries from bd

View file

@ -112,6 +112,9 @@ type blockStreamReader struct {
// blockData contains the data for the last read block
blockData blockData
// a contains data for blockData
a arena
// ph is the header for the part
ph partHeader
@ -149,6 +152,7 @@ type blockStreamReader struct {
// reset resets bsr, so it can be re-used
func (bsr *blockStreamReader) reset() {
bsr.blockData.reset()
bsr.a.reset()
bsr.ph.reset()
bsr.streamReaders.reset()
@ -247,6 +251,8 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// NextBlock reads the next block from bsr and puts it into bsr.blockData.
//
// false is returned if there are no other blocks.
//
// bsr.blockData is valid until the next call to NextBlock().
func (bsr *blockStreamReader) NextBlock() bool {
for bsr.nextBlockIdx >= len(bsr.blockHeaders) {
if !bsr.nextIndexBlock() {
@ -275,7 +281,8 @@ func (bsr *blockStreamReader) NextBlock() bool {
}
// Read bsr.blockData
bsr.blockData.mustReadFrom(bh, &bsr.streamReaders)
bsr.a.reset()
bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders)
bsr.globalUncompressedSizeBytes += bh.uncompressedSizeBytes
bsr.globalRowsCount += bh.rowsCount

View file

@ -14,7 +14,7 @@ const maxUncompressedBlockSize = 2 * 1024 * 1024
const maxRowsPerBlock = 8 * 1024 * 1024
// maxColumnsPerBlock is the maximum number of columns per block.
const maxColumnsPerBlock = 2_000
const maxColumnsPerBlock = 1_000
// MaxFieldNameSize is the maximum size in bytes for field name.
//

View file

@ -47,6 +47,22 @@ func (sbu *stringsBlockUnmarshaler) reset() {
sbu.data = sbu.data[:0]
}
func (sbu *stringsBlockUnmarshaler) copyString(s string) string {
dataLen := len(sbu.data)
sbu.data = append(sbu.data, s...)
return bytesutil.ToUnsafeString(sbu.data[dataLen:])
}
func (sbu *stringsBlockUnmarshaler) appendFields(dst, src []Field) []Field {
for _, f := range src {
dst = append(dst, Field{
Name: sbu.copyString(f.Name),
Value: sbu.copyString(f.Value),
})
}
return dst
}
// unmarshal unmarshals itemsCount strings from src, appends them to dst and returns the result.
//
// The returned strings are valid until sbu.reset() call.

View file

@ -34,7 +34,7 @@ func (f *Field) marshal(dst []byte) []byte {
return dst
}
func (f *Field) unmarshal(src []byte) ([]byte, error) {
func (f *Field) unmarshal(a *arena, src []byte) ([]byte, error) {
srcOrig := src
// Unmarshal field name
@ -42,8 +42,7 @@ func (f *Field) unmarshal(src []byte) ([]byte, error) {
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal field name: %w", err)
}
// Do not use bytesutil.InternBytes(b) here, since it works slower than the string(b) in prod
f.Name = string(b)
f.Name = a.copyBytesToString(b)
src = tail
// Unmarshal field value
@ -51,13 +50,22 @@ func (f *Field) unmarshal(src []byte) ([]byte, error) {
if err != nil {
return srcOrig, fmt.Errorf("cannot unmarshal field value: %w", err)
}
// Do not use bytesutil.InternBytes(b) here, since it works slower than the string(b) in prod
f.Value = string(b)
f.Value = a.copyBytesToString(b)
src = tail
return src, nil
}
func appendFields(a *arena, dst, src []Field) []Field {
for _, f := range src {
dst = append(dst, Field{
Name: a.copyString(f.Name),
Value: a.copyString(f.Value),
})
}
return dst
}
// rows is an aux structure used during rows merge
type rows struct {
fieldsBuf []Field

View file

@ -1074,10 +1074,15 @@ func (vd *valuesDict) reset() {
vd.values = vd.values[:0]
}
func (vd *valuesDict) copyFrom(src *valuesDict) {
func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) {
vd.reset()
vd.values = append(vd.values[:0], src.values...)
dstValues := vd.values
for _, v := range src.values {
v = a.copyString(v)
dstValues = append(dstValues, v)
}
vd.values = dstValues
}
func (vd *valuesDict) getOrAdd(k string) (byte, bool) {
@ -1113,7 +1118,10 @@ func (vd *valuesDict) marshal(dst []byte) []byte {
return dst
}
func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) {
// unmarshal unmarshals vd from src.
//
// vd is valid until a.reset() is called.
func (vd *valuesDict) unmarshal(a *arena, src []byte) ([]byte, error) {
vd.reset()
srcOrig := src
@ -1129,8 +1137,7 @@ func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) {
}
src = tail
// Do not use bytesutil.InternBytes(data) here, since it works slower than the string(data) in prod
v := string(data)
v := a.copyBytesToString(data)
vd.values = append(vd.values, v)
}
return src, nil