VictoriaMetrics/lib/logstorage/inmemory_part.go
Aliaksandr Valialkin 7a62eefa34
lib/logstorage: dynamically adjust the number of (bloom, values) shards in a part depending on the number of non-const columns
This allows reducing the amounts of data, which must be read during queries over logs with big number of fields (aka "wide events").
This, in turn, improves query performance when the data, which needs to be scanned during the query, doesn't fit OS page cache.
2024-10-29 16:44:45 +01:00

185 lines
4.5 KiB
Go

package logstorage
import (
"path/filepath"
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
// inmemoryPart is an in-memory part.
type inmemoryPart struct {
// ph contains partHeader information for the given in-memory part.
ph partHeader
columnNames bytesutil.ByteBuffer
metaindex bytesutil.ByteBuffer
index bytesutil.ByteBuffer
columnsHeaderIndex bytesutil.ByteBuffer
columnsHeader bytesutil.ByteBuffer
timestamps bytesutil.ByteBuffer
messageBloomValues bloomValuesBuffer
fieldBloomValues bloomValuesBuffer
}
type bloomValuesBuffer struct {
bloom bytesutil.ByteBuffer
values bytesutil.ByteBuffer
}
func (b *bloomValuesBuffer) reset() {
b.bloom.Reset()
b.values.Reset()
}
func (b *bloomValuesBuffer) NewStreamReader() bloomValuesStreamReader {
return bloomValuesStreamReader{
bloom: b.bloom.NewReader(),
values: b.values.NewReader(),
}
}
func (b *bloomValuesBuffer) NewStreamWriter() bloomValuesStreamWriter {
return bloomValuesStreamWriter{
bloom: &b.bloom,
values: &b.values,
}
}
// reset resets mp, so it can be re-used
func (mp *inmemoryPart) reset() {
mp.ph.reset()
mp.columnNames.Reset()
mp.metaindex.Reset()
mp.index.Reset()
mp.columnsHeaderIndex.Reset()
mp.columnsHeader.Reset()
mp.timestamps.Reset()
mp.messageBloomValues.reset()
mp.fieldBloomValues.reset()
}
// mustInitFromRows initializes mp from lr.
func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) {
mp.reset()
sort.Sort(lr)
bsw := getBlockStreamWriter()
bsw.MustInitForInmemoryPart(mp)
trs := getTmpRows()
var sidPrev *streamID
uncompressedBlockSizeBytes := uint64(0)
timestamps := lr.timestamps
rows := lr.rows
streamIDs := lr.streamIDs
for i := range timestamps {
streamID := &streamIDs[i]
if sidPrev == nil {
sidPrev = streamID
}
if uncompressedBlockSizeBytes >= maxUncompressedBlockSize || !streamID.equal(sidPrev) {
bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)
trs.reset()
sidPrev = streamID
uncompressedBlockSizeBytes = 0
}
fields := rows[i]
trs.timestamps = append(trs.timestamps, timestamps[i])
trs.rows = append(trs.rows, fields)
uncompressedBlockSizeBytes += uncompressedRowSizeBytes(fields)
}
bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)
putTmpRows(trs)
bsw.Finalize(&mp.ph)
putBlockStreamWriter(bsw)
}
// MustStoreToDisk stores mp to disk at the given path.
func (mp *inmemoryPart) MustStoreToDisk(path string) {
fs.MustMkdirFailIfExist(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
fs.MustWriteSync(columnNamesPath, mp.columnNames.B)
fs.MustWriteSync(metaindexPath, mp.metaindex.B)
fs.MustWriteSync(indexPath, mp.index.B)
fs.MustWriteSync(columnsHeaderIndexPath, mp.columnsHeaderIndex.B)
fs.MustWriteSync(columnsHeaderPath, mp.columnsHeader.B)
fs.MustWriteSync(timestampsPath, mp.timestamps.B)
fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomValues.bloom.B)
fs.MustWriteSync(messageValuesPath, mp.messageBloomValues.values.B)
bloomPath := getBloomFilePath(path, 0)
fs.MustWriteSync(bloomPath, mp.fieldBloomValues.bloom.B)
valuesPath := getValuesFilePath(path, 0)
fs.MustWriteSync(valuesPath, mp.fieldBloomValues.values.B)
mp.ph.mustWriteMetadata(path)
fs.MustSyncPath(path)
// Do not sync parent directory - it must be synced by the caller.
}
// tmpRows is used as a helper for inmemoryPart.mustInitFromRows()
type tmpRows struct {
timestamps []int64
rows [][]Field
}
func (trs *tmpRows) reset() {
trs.timestamps = trs.timestamps[:0]
rows := trs.rows
for i := range rows {
rows[i] = nil
}
trs.rows = rows[:0]
}
func getTmpRows() *tmpRows {
v := tmpRowsPool.Get()
if v == nil {
return &tmpRows{}
}
return v.(*tmpRows)
}
func putTmpRows(trs *tmpRows) {
trs.reset()
tmpRowsPool.Put(trs)
}
var tmpRowsPool sync.Pool
func getInmemoryPart() *inmemoryPart {
v := inmemoryPartPool.Get()
if v == nil {
return &inmemoryPart{}
}
return v.(*inmemoryPart)
}
func putInmemoryPart(mp *inmemoryPart) {
mp.reset()
inmemoryPartPool.Put(mp)
}
var inmemoryPartPool sync.Pool