lib/logstorage: make sure that the data for every log field is stored in a separate file until the number of fields exceeds 256

This should improve query performance for logs with hundreds of fields (aka wide events).
Previously there was a high chance that the data for multiple log fields was stored in the same file.
This could result in query performance slowdown and/or increased disk read IO,
since the operating system could read unnecessary data for the fields, which aren't used in the query.

Now log fields are guaranteed to be stored in separate files until the number of fields exceeds 256.
After that multiple log fields start sharing files.

(cherry picked from commit 9bb5ba5d2f)
This commit is contained in:
Aliaksandr Valialkin 2025-02-19 01:40:39 +01:00 committed by hagen1778
parent 0cd8591700
commit a842114070
No known key found for this signature in database
GPG key ID: E92986095E0DD614
13 changed files with 277 additions and 147 deletions

View file

@ -16,6 +16,11 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
**Update note: this release changes data storage format in backwards-incompatible way, so it is impossible to downgrade to the previous releases after upgrading to this release.
It is safe upgrading to this release from older releases.**
* FEATURE: improve per-field data locality on disk. This reduces overhead related to reading data from unrelated fields during queries. This improves query performance over structured logs with big number of fields (aka [wide events](https://jeremymorrell.dev/blog/a-practitioners-guide-to-wide-events/)) when only a small portion of fields are used in the query.
## [v1.10.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.10.1-victorialogs)
Released at 2025-02-14

View file

@ -473,7 +473,7 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v
}
// mustWriteTo writes b with the given sid to sw and updates bh accordingly.
func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
b.assertValid()
bh.reset()
@ -485,17 +485,18 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g
mustWriteTimestampsTo(&bh.timestampsHeader, b.timestamps, sw)
// Marshal columns
cs := b.columns
csh := getColumnsHeader()
cs := b.columns
chs := csh.resizeColumnHeaders(len(cs))
for i := range cs {
cs[i].mustWriteTo(&chs[i], sw)
}
csh.constColumns = append(csh.constColumns[:0], b.constColumns...)
csh.mustWriteTo(bh, sw, g)
csh.mustWriteTo(bh, sw)
putColumnsHeader(csh)
}

View file

@ -93,7 +93,7 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *
}
// mustWriteTo writes bd to sw and updates bh accordingly
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
bh.reset()
bh.streamID = bd.streamID
@ -114,7 +114,7 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNa
}
csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)
csh.mustWriteTo(bh, sw, g)
csh.mustWriteTo(bh, sw)
putColumnsHeader(csh)
}
@ -122,7 +122,7 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNa
// mustReadFrom reads block data associated with bh from sr to bd.
//
// The bd is valid until a.reset() is called.
func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders, partFormatVersion uint, columnNames []string) {
func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders) {
bd.reset()
bd.streamID = bh.streamID
@ -146,24 +146,24 @@ func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders,
sr.columnsHeaderReader.MustReadFull(bb.B)
csh := getColumnsHeader()
if err := csh.unmarshalNoArena(bb.B, partFormatVersion); err != nil {
if err := csh.unmarshalNoArena(bb.B, sr.partFormatVersion); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
}
if partFormatVersion >= 1 {
readColumnNamesFromColumnsHeaderIndex(bh, sr, csh, columnNames)
if sr.partFormatVersion >= 1 {
readColumnNamesFromColumnsHeaderIndex(bh, sr, csh)
}
chs := csh.columnHeaders
cds := bd.resizeColumnsData(len(chs))
for i := range chs {
cds[i].mustReadFrom(a, &chs[i], sr, partFormatVersion)
cds[i].mustReadFrom(a, &chs[i], sr)
}
bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns)
putColumnsHeader(csh)
longTermBufPool.Put(bb)
}
func readColumnNamesFromColumnsHeaderIndex(bh *blockHeader, sr *streamReaders, csh *columnsHeader, columnNames []string) {
func readColumnNamesFromColumnsHeaderIndex(bh *blockHeader, sr *streamReaders, csh *columnsHeader) {
bb := longTermBufPool.Get()
defer longTermBufPool.Put(bb)
@ -179,7 +179,7 @@ func readColumnNamesFromColumnsHeaderIndex(bh *blockHeader, sr *streamReaders, c
if err := cshIndex.unmarshalNoArena(bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columnsHeaderIndex: %s", sr.columnsHeaderIndexReader.Path(), err)
}
if err := csh.setColumnNames(cshIndex, columnNames); err != nil {
if err := csh.setColumnNames(cshIndex, sr.columnNames); err != nil {
logger.Panicf("FATAL: %s: %s", sr.columnsHeaderIndexReader.Path(), err)
}
@ -353,7 +353,7 @@ func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
// mustReadFrom reads columns data associated with ch from sr to cd.
//
// cd is valid until a.reset() is called.
func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders, partFormatVersion uint) {
func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders) {
cd.reset()
cd.name = a.copyString(ch.name)
@ -363,7 +363,7 @@ func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders
cd.maxValue = ch.maxValue
cd.valuesDict.copyFrom(a, &ch.valuesDict)
bloomValuesReader := sr.getBloomValuesReaderForColumnName(ch.name, partFormatVersion)
bloomValuesReader := sr.getBloomValuesReaderForColumnName(ch.name)
// read values
if ch.valuesOffset != bloomValuesReader.values.bytesRead {

View file

@ -419,13 +419,13 @@ func (csh *columnsHeader) setColumnNames(cshIndex *columnsHeaderIndex, columnNam
return nil
}
func (csh *columnsHeader) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
func (csh *columnsHeader) mustWriteTo(bh *blockHeader, sw *streamWriters) {
bb := longTermBufPool.Get()
defer longTermBufPool.Put(bb)
cshIndex := getColumnsHeaderIndex()
bb.B = csh.marshal(bb.B, cshIndex, g)
bb.B = csh.marshal(bb.B, cshIndex, &sw.columnNameIDGenerator)
columnsHeaderData := bb.B
bb.B = cshIndex.marshal(bb.B)

View file

@ -55,7 +55,10 @@ func (r *readerWithStats) MustClose() {
// streamReaders contains readers for blockStreamReader
type streamReaders struct {
partFormatVersion uint
columnNamesReader readerWithStats
columnIdxsReader readerWithStats
metaindexReader readerWithStats
indexReader readerWithStats
columnsHeaderIndexReader readerWithStats
@ -65,6 +68,12 @@ type streamReaders struct {
messageBloomValuesReader bloomValuesReader
oldBloomValuesReader bloomValuesReader
bloomValuesShards []bloomValuesReader
// columnIdxs contains bloomValuesShards indexes for column names seen in the part
columnIdxs map[string]uint64
// columnNames contains id->columnName mapping for all the columns seen in the part
columnNames []string
}
type bloomValuesReader struct {
@ -97,7 +106,10 @@ type bloomValuesStreamReader struct {
}
func (sr *streamReaders) reset() {
sr.partFormatVersion = 0
sr.columnNamesReader.reset()
sr.columnIdxsReader.reset()
sr.metaindexReader.reset()
sr.indexReader.reset()
sr.columnsHeaderIndexReader.reset()
@ -110,12 +122,19 @@ func (sr *streamReaders) reset() {
sr.bloomValuesShards[i].reset()
}
sr.bloomValuesShards = sr.bloomValuesShards[:0]
sr.columnIdxs = nil
sr.columnNames = nil
}
func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser,
func (sr *streamReaders) init(partFormatVersion uint, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,
columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser,
messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards []bloomValuesStreamReader,
) {
sr.partFormatVersion = partFormatVersion
sr.columnNamesReader.init(columnNamesReader)
sr.columnIdxsReader.init(columnIdxsReader)
sr.metaindexReader.init(metaindexReader)
sr.indexReader.init(indexReader)
sr.columnsHeaderIndexReader.init(columnsHeaderIndexReader)
@ -129,12 +148,20 @@ func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, c
for i := range sr.bloomValuesShards {
sr.bloomValuesShards[i].init(bloomValuesShards[i])
}
if partFormatVersion >= 1 {
sr.columnNames, _ = mustReadColumnNames(&sr.columnNamesReader)
}
if partFormatVersion >= 3 {
sr.columnIdxs = mustReadColumnIdxs(&sr.columnIdxsReader, sr.columnNames, uint64(len(bloomValuesShards)))
}
}
func (sr *streamReaders) totalBytesRead() uint64 {
n := uint64(0)
n += sr.columnNamesReader.bytesRead
n += sr.columnIdxsReader.bytesRead
n += sr.metaindexReader.bytesRead
n += sr.indexReader.bytesRead
n += sr.columnsHeaderIndexReader.bytesRead
@ -152,6 +179,7 @@ func (sr *streamReaders) totalBytesRead() uint64 {
func (sr *streamReaders) MustClose() {
sr.columnNamesReader.MustClose()
sr.columnIdxsReader.MustClose()
sr.metaindexReader.MustClose()
sr.indexReader.MustClose()
sr.columnsHeaderIndexReader.MustClose()
@ -165,21 +193,28 @@ func (sr *streamReaders) MustClose() {
}
}
func (sr *streamReaders) getBloomValuesReaderForColumnName(name string, partFormatVersion uint) *bloomValuesReader {
func (sr *streamReaders) getBloomValuesReaderForColumnName(name string) *bloomValuesReader {
if name == "" {
return &sr.messageBloomValuesReader
}
if partFormatVersion < 1 {
if sr.partFormatVersion < 1 {
return &sr.oldBloomValuesReader
}
n := len(sr.bloomValuesShards)
idx := uint64(0)
if n > 1 {
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx = h % uint64(n)
if sr.partFormatVersion < 3 {
n := len(sr.bloomValuesShards)
shardIdx := uint64(0)
if n > 1 {
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
shardIdx = h % uint64(n)
}
return &sr.bloomValuesShards[shardIdx]
}
return &sr.bloomValuesShards[idx]
shardIdx, ok := sr.columnIdxs[name]
if !ok {
logger.Panicf("BUG: missing column index for %q; columnIdxs=%v", name, sr.columnIdxs)
}
return &sr.bloomValuesShards[shardIdx]
}
// blockStreamReader is used for reading blocks in streaming manner from a part.
@ -196,12 +231,6 @@ type blockStreamReader struct {
// streamReaders contains data readers in stream mode
streamReaders streamReaders
// columnNameIDs contains columnName->id mapping for all the column names seen in the part
columnNameIDs map[string]uint64
// columnNames contains id->columnName mapping for all the columns seen in the part
columnNames []string
// indexBlockHeaders contains the list of all the indexBlockHeader entries for the part
indexBlockHeaders []indexBlockHeader
@ -237,9 +266,6 @@ func (bsr *blockStreamReader) reset() {
bsr.ph.reset()
bsr.streamReaders.reset()
bsr.columnNameIDs = nil
bsr.columnNames = nil
ihs := bsr.indexBlockHeaders
if len(ihs) > 10e3 {
// The ihs len is unbound, so it is better to drop too long indexBlockHeaders in order to reduce memory usage
@ -280,6 +306,7 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
// Initialize streamReaders
columnNamesReader := mp.columnNames.NewReader()
columnIdxsReader := mp.columnIdxs.NewReader()
metaindexReader := mp.metaindex.NewReader()
indexReader := mp.index.NewReader()
columnsHeaderIndexReader := mp.columnsHeaderIndex.NewReader()
@ -292,12 +319,10 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
mp.fieldBloomValues.NewStreamReader(),
}
bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
bsr.streamReaders.init(bsr.ph.FormatVersion, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,
columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards)
// Read columnNames data
bsr.columnNames, bsr.columnNameIDs = mustReadColumnNames(&bsr.streamReaders.columnNamesReader)
// Read metaindex data
bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader)
}
@ -313,6 +338,7 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
bsr.ph.mustReadMetadata(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
columnIdxsPath := filepath.Join(path, columnIdxsFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
@ -324,6 +350,10 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
if bsr.ph.FormatVersion >= 1 {
columnNamesReader = filestream.MustOpen(columnNamesPath, nocache)
}
var columnIdxsReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 3 {
columnIdxsReader = filestream.MustOpen(columnIdxsPath, nocache)
}
metaindexReader := filestream.MustOpen(metaindexPath, nocache)
indexReader := filestream.MustOpen(indexPath, nocache)
var columnsHeaderIndexReader filestream.ReadCloser
@ -361,14 +391,10 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
}
// Initialize streamReaders
bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
bsr.streamReaders.init(bsr.ph.FormatVersion, columnNamesReader, columnIdxsReader, metaindexReader, indexReader,
columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards)
if bsr.ph.FormatVersion >= 1 {
// Read columnNames data
bsr.columnNames, bsr.columnNameIDs = mustReadColumnNames(&bsr.streamReaders.columnNamesReader)
}
// Read metaindex data
bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader)
}
@ -407,7 +433,7 @@ func (bsr *blockStreamReader) NextBlock() bool {
// Read bsr.blockData
bsr.a.reset()
bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders, bsr.ph.FormatVersion, bsr.columnNames)
bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders)
bsr.globalUncompressedSizeBytes += bh.uncompressedSizeBytes
bsr.globalRowsCount += bh.rowsCount

View file

@ -1,12 +1,9 @@
package logstorage
import (
"math/bits"
"path/filepath"
"sync"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@ -48,6 +45,7 @@ func (w *writerWithStats) MustClose() {
// streamWriters contain writers for blockStreamWriter
type streamWriters struct {
columnNamesWriter writerWithStats
columnIdxsWriter writerWithStats
metaindexWriter writerWithStats
indexWriter writerWithStats
columnsHeaderIndexWriter writerWithStats
@ -55,7 +53,16 @@ type streamWriters struct {
timestampsWriter writerWithStats
messageBloomValuesWriter bloomValuesWriter
bloomValuesShards []bloomValuesWriter
createBloomValuesWriter func(shardIdx uint64) bloomValuesStreamWriter
maxShards uint64
bloomValuesShards []bloomValuesWriter
// columnNameIDGenerator is used for generating columnName->id mapping for all the columns seen in bsw
columnNameIDGenerator columnNameIDGenerator
columnIdxs map[uint64]uint64
nextColumnIdx uint64
}
type bloomValuesWriter struct {
@ -89,6 +96,7 @@ type bloomValuesStreamWriter struct {
func (sw *streamWriters) reset() {
sw.columnNamesWriter.reset()
sw.columnIdxsWriter.reset()
sw.metaindexWriter.reset()
sw.indexWriter.reset()
sw.columnsHeaderIndexWriter.reset()
@ -99,13 +107,22 @@ func (sw *streamWriters) reset() {
for i := range sw.bloomValuesShards {
sw.bloomValuesShards[i].reset()
}
sw.createBloomValuesWriter = nil
sw.maxShards = 0
sw.bloomValuesShards = sw.bloomValuesShards[:0]
sw.columnNameIDGenerator.reset()
sw.columnIdxs = nil
sw.nextColumnIdx = 0
}
func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter filestream.WriteCloser,
messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards []bloomValuesStreamWriter,
func (sw *streamWriters) init(columnNamesWriter, columnIdxsWriter, metaindexWriter, indexWriter,
columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter filestream.WriteCloser,
messageBloomValuesWriter bloomValuesStreamWriter, createBloomValuesWriter func(shardIdx uint64) bloomValuesStreamWriter, maxShards uint64,
) {
sw.columnNamesWriter.init(columnNamesWriter)
sw.columnIdxsWriter.init(columnIdxsWriter)
sw.metaindexWriter.init(metaindexWriter)
sw.indexWriter.init(indexWriter)
sw.columnsHeaderIndexWriter.init(columnsHeaderIndexWriter)
@ -113,16 +130,16 @@ func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, c
sw.timestampsWriter.init(timestampsWriter)
sw.messageBloomValuesWriter.init(messageBloomValuesWriter)
sw.bloomValuesShards = slicesutil.SetLength(sw.bloomValuesShards, len(bloomValuesShards))
for i := range sw.bloomValuesShards {
sw.bloomValuesShards[i].init(bloomValuesShards[i])
}
sw.createBloomValuesWriter = createBloomValuesWriter
sw.maxShards = maxShards
}
func (sw *streamWriters) totalBytesWritten() uint64 {
n := uint64(0)
n += sw.columnNamesWriter.bytesWritten
n += sw.columnIdxsWriter.bytesWritten
n += sw.metaindexWriter.bytesWritten
n += sw.indexWriter.bytesWritten
n += sw.columnsHeaderIndexWriter.bytesWritten
@ -139,6 +156,7 @@ func (sw *streamWriters) totalBytesWritten() uint64 {
func (sw *streamWriters) MustClose() {
sw.columnNamesWriter.MustClose()
sw.columnIdxsWriter.MustClose()
sw.metaindexWriter.MustClose()
sw.indexWriter.MustClose()
sw.columnsHeaderIndexWriter.MustClose()
@ -156,13 +174,29 @@ func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomVa
return &sw.messageBloomValuesWriter
}
n := len(sw.bloomValuesShards)
idx := uint64(0)
if n > 1 {
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx = h % uint64(n)
columnID := sw.columnNameIDGenerator.getColumnNameID(name)
shardIdx, ok := sw.columnIdxs[columnID]
if ok {
return &sw.bloomValuesShards[shardIdx]
}
return &sw.bloomValuesShards[idx]
shardIdx = sw.nextColumnIdx % sw.maxShards
sw.nextColumnIdx++
if sw.columnIdxs == nil {
sw.columnIdxs = make(map[uint64]uint64)
}
sw.columnIdxs[columnID] = shardIdx
if shardIdx >= uint64(len(sw.bloomValuesShards)) {
if shardIdx > uint64(len(sw.bloomValuesShards)) {
logger.Panicf("BUG: shardIdx must equal %d; got %d; maxShards=%d; columnIdxs=%v", len(sw.bloomValuesShards), shardIdx, sw.maxShards, sw.columnIdxs)
}
sws := sw.createBloomValuesWriter(shardIdx)
sw.bloomValuesShards = slicesutil.SetLength(sw.bloomValuesShards, len(sw.bloomValuesShards)+1)
sw.bloomValuesShards[len(sw.bloomValuesShards)-1].init(sws)
}
return &sw.bloomValuesShards[shardIdx]
}
// blockStreamWriter is used for writing blocks into the underlying storage in streaming manner.
@ -176,9 +210,6 @@ type blockStreamWriter struct {
// sidFirst is the streamID for the first block in the current indexBlock
sidFirst streamID
// bloomValuesFieldsCount is the number of fields with (bloom, values) pairs in the output part.
bloomValuesFieldsCount uint64
// minTimestampLast is the minimum timestamp seen for the last written block
minTimestampLast int64
@ -214,9 +245,6 @@ type blockStreamWriter struct {
// indexBlockHeader is used for marshaling the data to metaindexData
indexBlockHeader indexBlockHeader
// columnNameIDGenerator is used for generating columnName->id mapping for all the columns seen in bsw
columnNameIDGenerator columnNameIDGenerator
}
// reset resets bsw for subsequent re-use.
@ -224,7 +252,6 @@ func (bsw *blockStreamWriter) reset() {
bsw.streamWriters.reset()
bsw.sidLast.reset()
bsw.sidFirst.reset()
bsw.bloomValuesFieldsCount = 0
bsw.minTimestampLast = 0
bsw.minTimestamp = 0
bsw.maxTimestamp = 0
@ -245,8 +272,6 @@ func (bsw *blockStreamWriter) reset() {
}
bsw.indexBlockHeader.reset()
bsw.columnNameIDGenerator.reset()
}
// MustInitForInmemoryPart initializes bsw from mp
@ -254,32 +279,35 @@ func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) {
bsw.reset()
messageBloomValues := mp.messageBloomValues.NewStreamWriter()
bloomValuesShards := []bloomValuesStreamWriter{
mp.fieldBloomValues.NewStreamWriter(),
createBloomValuesWriter := func(_ uint64) bloomValuesStreamWriter {
return mp.fieldBloomValues.NewStreamWriter()
}
bsw.streamWriters.init(&mp.columnNames, &mp.metaindex, &mp.index, &mp.columnsHeaderIndex, &mp.columnsHeader, &mp.timestamps, messageBloomValues, bloomValuesShards)
bsw.streamWriters.init(&mp.columnNames, &mp.columnIdxs, &mp.metaindex, &mp.index, &mp.columnsHeaderIndex, &mp.columnsHeader, &mp.timestamps, messageBloomValues, createBloomValuesWriter, 1)
}
// MustInitForFilePart initializes bsw for writing data to file part located at path.
//
// if nocache is true, then the written data doesn't go to OS page cache.
func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool, bloomValuesShardsCount uint64) {
func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
bsw.reset()
fs.MustMkdirFailIfExist(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
columnIdxsPath := filepath.Join(path, columnIdxsFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
// Always cache columnNames files, since it is re-read immediately after part creation
// Always cache columnNames file, since it is re-read immediately after part creation
columnNamesWriter := filestream.MustCreate(columnNamesPath, false)
// Always cache columnIdxs file, since it is re-read immediately after part creation
columnIdxsWriter := filestream.MustCreate(columnIdxsPath, false)
// Always cache metaindex file, since it is re-read immediately after part creation
metaindexWriter := filestream.MustCreate(metaindexPath, false)
@ -295,34 +323,22 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool, blo
values: filestream.MustCreate(messageValuesPath, nocache),
}
bloomValuesShardsCount = adjustBloomValuesShardsCount(bloomValuesShardsCount)
bloomValuesShards := make([]bloomValuesStreamWriter, bloomValuesShardsCount)
for i := range bloomValuesShards {
shard := &bloomValuesShards[i]
createBloomValuesWriter := func(shardIdx uint64) bloomValuesStreamWriter {
bloomPath := getBloomFilePath(path, shardIdx)
bloom := filestream.MustCreate(bloomPath, nocache)
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = filestream.MustCreate(bloomPath, nocache)
valuesPath := getValuesFilePath(path, shardIdx)
values := filestream.MustCreate(valuesPath, nocache)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = filestream.MustCreate(valuesPath, nocache)
return bloomValuesStreamWriter{
bloom: bloom,
values: values,
}
}
bsw.streamWriters.init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter, messageBloomValuesWriter, bloomValuesShards)
}
func adjustBloomValuesShardsCount(n uint64) uint64 {
if n == 0 {
// At least a single shard is needed for writing potential non-const fields,
// which can appear after merging of const fields.
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7391
return 1
}
n = 1 << bits.Len64(n-1)
if n > bloomValuesMaxShardsCount {
n = bloomValuesMaxShardsCount
}
return n
bsw.streamWriters.init(columnNamesWriter, columnIdxsWriter, metaindexWriter, indexWriter,
columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter, messageBloomValuesWriter,
createBloomValuesWriter, bloomValuesMaxShardsCount)
}
// MustWriteRows writes timestamps with rows under the given sid to bsw.
@ -375,16 +391,10 @@ func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd
bsw.sidLast = *sid
bh := getBlockHeader()
columnsLen := 0
if b != nil {
b.mustWriteTo(sid, bh, &bsw.streamWriters, &bsw.columnNameIDGenerator)
columnsLen = len(b.columns)
b.mustWriteTo(sid, bh, &bsw.streamWriters)
} else {
bd.mustWriteTo(bh, &bsw.streamWriters, &bsw.columnNameIDGenerator)
columnsLen = len(bd.columnsData)
}
if bsw.bloomValuesFieldsCount < uint64(columnsLen) {
bsw.bloomValuesFieldsCount = uint64(columnsLen)
bd.mustWriteTo(bh, &bsw.streamWriters)
}
th := &bh.timestampsHeader
@ -442,12 +452,14 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) {
ph.MinTimestamp = bsw.globalMinTimestamp
ph.MaxTimestamp = bsw.globalMaxTimestamp
ph.BloomValuesShardsCount = uint64(len(bsw.streamWriters.bloomValuesShards))
ph.BloomValuesFieldsCount = bsw.bloomValuesFieldsCount
bsw.mustFlushIndexBlock(bsw.indexBlockData)
// Write columnNames data
mustWriteColumnNames(&bsw.streamWriters.columnNamesWriter, bsw.columnNameIDGenerator.columnNames)
mustWriteColumnNames(&bsw.streamWriters.columnNamesWriter, bsw.streamWriters.columnNameIDGenerator.columnNames)
// Write columnIdxs data
mustWriteColumnIdxs(&bsw.streamWriters.columnIdxsWriter, bsw.streamWriters.columnIdxs)
// Write metaindex data
mustWriteIndexBlockHeaders(&bsw.streamWriters.metaindexWriter, bsw.metaindexData)

View file

@ -12,6 +12,74 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// mustWriteColumnIdxs serializes the given columnID->shardIdx mapping and writes it to w.
//
// Write failures are fatal (see writerWithStats.MustWrite), so the function never returns an error.
func mustWriteColumnIdxs(w *writerWithStats, columnIdxs map[uint64]uint64) {
	w.MustWrite(marshalColumnIdxs(nil, columnIdxs))
}
// mustReadColumnIdxs reads a columnName->shardIdx mapping from r.
//
// columnNames resolves the columnIDs stored in the data to column names,
// while shardsCount is the exclusive upper bound used for validating shard indexes.
// The function panics if the data cannot be read or parsed.
func mustReadColumnIdxs(r filestream.ReadCloser, columnNames []string, shardsCount uint64) map[string]uint64 {
	data, err := io.ReadAll(r)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot read column indexes: %s", r.Path(), err)
	}
	m, err := unmarshalColumnIdxs(data, columnNames, shardsCount)
	if err != nil {
		logger.Panicf("FATAL: %s: cannot parse column indexes: %s", r.Path(), err)
	}
	return m
}
// marshalColumnIdxs appends the marshaled representation of columnIdxs to dst
// and returns the result.
//
// The format is the entry count followed by (columnID, shardIdx) varint pairs.
// Entry order follows Go map iteration and is therefore not deterministic;
// unmarshalColumnIdxs rebuilds a map, so it does not depend on the order.
func marshalColumnIdxs(dst []byte, columnIdxs map[uint64]uint64) []byte {
	dst = encoding.MarshalVarUint64(dst, uint64(len(columnIdxs)))
	for id, shard := range columnIdxs {
		dst = encoding.MarshalVarUint64(dst, id)
		dst = encoding.MarshalVarUint64(dst, shard)
	}
	return dst
}
// unmarshalColumnIdxs parses a columnName->shardIdx mapping from src.
//
// src must contain the number of entries followed by (columnID, shardIdx)
// varint pairs - see marshalColumnIdxs.
//
// columnNames maps columnID to column name; shardsCount is the exclusive
// upper bound for valid shardIdx values.
func unmarshalColumnIdxs(src []byte, columnNames []string, shardsCount uint64) (map[string]uint64, error) {
	n, nBytes := encoding.UnmarshalVarUint64(src)
	if nBytes <= 0 {
		return nil, fmt.Errorf("cannot parse the number of entries from len(src)=%d", len(src))
	}
	src = src[nBytes:]
	if n > math.MaxInt {
		return nil, fmt.Errorf("too many entries: %d; mustn't exceed %d", n, math.MaxInt)
	}
	// Cap the map size hint by the maximum number of entries that may fit src
	// (every entry takes at least two bytes), so a corrupted entry count cannot
	// trigger a huge allocation before the per-entry parsing below fails.
	shardIdxs := make(map[string]uint64, min(n, uint64(len(src))/2))
	for i := uint64(0); i < n; i++ {
		columnID, nBytes := encoding.UnmarshalVarUint64(src)
		if nBytes <= 0 {
			return nil, fmt.Errorf("cannot parse columnID #%d", i)
		}
		src = src[nBytes:]
		shardIdx, nBytes := encoding.UnmarshalVarUint64(src)
		if nBytes <= 0 {
			return nil, fmt.Errorf("cannot parse shardIdx #%d", i)
		}
		if shardIdx >= shardsCount {
			return nil, fmt.Errorf("too big shardIdx=%d; must be smaller than %d", shardIdx, shardsCount)
		}
		src = src[nBytes:]
		if columnID >= uint64(len(columnNames)) {
			return nil, fmt.Errorf("too big columnID; got %d; must be smaller than %d", columnID, len(columnNames))
		}
		columnName := columnNames[columnID]
		shardIdxs[columnName] = shardIdx
	}
	if len(src) > 0 {
		return nil, fmt.Errorf("unexpected tail left after reading column indexes; len(tail)=%d", len(src))
	}
	return shardIdxs, nil
}
func mustWriteColumnNames(w *writerWithStats, columnNames []string) {
data := marshalColumnNames(nil, columnNames)
w.MustWrite(data)
@ -101,14 +169,15 @@ func (g *columnNameIDGenerator) reset() {
func (g *columnNameIDGenerator) getColumnNameID(name string) uint64 {
id, ok := g.columnNameIDs[name]
if !ok {
if g.columnNameIDs == nil {
g.columnNameIDs = make(map[string]uint64)
}
id = uint64(len(g.columnNames))
nameCopy := strings.Clone(name)
g.columnNameIDs[nameCopy] = id
g.columnNames = append(g.columnNames, nameCopy)
if ok {
return id
}
if g.columnNameIDs == nil {
g.columnNameIDs = make(map[string]uint64)
}
id = uint64(len(g.columnNames))
nameCopy := strings.Clone(name)
g.columnNameIDs[nameCopy] = id
g.columnNames = append(g.columnNames, nameCopy)
return id
}

View file

@ -3,12 +3,12 @@ package logstorage
// partFormatLatestVersion is the latest format version for parts.
//
// See partHeader.FormatVersion for details.
const partFormatLatestVersion = 2
const partFormatLatestVersion = 3
// bloomValuesMaxShardsCount is the number of shards for bloomFilename and valuesFilename files.
//
// The partHeader.FormatVersion and partFormatLatestVersion must be updated when this number changes.
const bloomValuesMaxShardsCount = 128
const bloomValuesMaxShardsCount = 256
// maxUncompressedIndexBlockSize contains the maximum length of uncompressed block with blockHeader entries aka index block.
//

View file

@ -530,15 +530,11 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
srcSize := uint64(0)
srcRowsCount := uint64(0)
srcBlocksCount := uint64(0)
bloomValuesShardsCount := uint64(0)
for _, pw := range pws {
ph := &pw.p.ph
srcSize += ph.CompressedSizeBytes
srcRowsCount += ph.RowsCount
srcBlocksCount += ph.BlocksCount
if ph.BloomValuesFieldsCount > bloomValuesShardsCount {
bloomValuesShardsCount = ph.BloomValuesFieldsCount
}
}
bsw := getBlockStreamWriter()
var mpNew *inmemoryPart
@ -547,7 +543,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
bsw.MustInitForInmemoryPart(mpNew)
} else {
nocache := dstPartType == partBig
bsw.MustInitForFilePart(dstPartPath, nocache, bloomValuesShardsCount)
bsw.MustInitForFilePart(dstPartPath, nocache)
}
// Merge source parts to destination part.

View file

@ -2,6 +2,7 @@ package logstorage
const (
columnNamesFilename = "column_names.bin"
columnIdxsFilename = "column_idxs.bin"
metaindexFilename = "metaindex.bin"
indexFilename = "index.bin"
columnsHeaderIndexFilename = "columns_header_index.bin"

View file

@ -15,6 +15,7 @@ type inmemoryPart struct {
ph partHeader
columnNames bytesutil.ByteBuffer
columnIdxs bytesutil.ByteBuffer
metaindex bytesutil.ByteBuffer
index bytesutil.ByteBuffer
columnsHeaderIndex bytesutil.ByteBuffer
@ -54,6 +55,7 @@ func (mp *inmemoryPart) reset() {
mp.ph.reset()
mp.columnNames.Reset()
mp.columnIdxs.Reset()
mp.metaindex.Reset()
mp.index.Reset()
mp.columnsHeaderIndex.Reset()
@ -108,6 +110,7 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
fs.MustMkdirFailIfExist(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
columnIdxsPath := filepath.Join(path, columnIdxsFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
@ -117,6 +120,7 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
fs.MustWriteSync(columnNamesPath, mp.columnNames.B)
fs.MustWriteSync(columnIdxsPath, mp.columnIdxs.B)
fs.MustWriteSync(metaindexPath, mp.metaindex.B)
fs.MustWriteSync(indexPath, mp.index.B)
fs.MustWriteSync(columnsHeaderIndexPath, mp.columnsHeaderIndex.B)

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type part struct {
@ -31,6 +32,9 @@ type part struct {
// The internal IDs are used in columnHeaderRef.
columnNames []string
// columnIdxs is a mapping from column name to the corresponding item at bloomValuesShards
columnIdxs map[string]uint64
// indexBlockHeaders contains a list of indexBlockHeader entries for the given part.
indexBlockHeaders []indexBlockHeader
@ -64,12 +68,19 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
// Read columnNames
columnNamesReader := mp.columnNames.NewReader()
p.columnNames, p.columnNameIDs = mustReadColumnNames(columnNamesReader)
columnNamesReader.MustClose()
// Read columnIdxs
columnIdxsReader := mp.columnIdxs.NewReader()
p.columnIdxs = mustReadColumnIdxs(columnIdxsReader, p.columnNames, p.ph.BloomValuesShardsCount)
columnIdxsReader.MustClose()
// Read metaindex
metaindexReader := mp.metaindex.NewReader()
var mrs readerWithStats
mrs.init(metaindexReader)
p.indexBlockHeaders = mustReadIndexBlockHeaders(p.indexBlockHeaders[:0], &mrs)
metaindexReader.MustClose()
// Open data files
p.indexFile = &mp.index
@ -98,6 +109,7 @@ func mustOpenFilePart(pt *partition, path string) *part {
p.ph.mustReadMetadata(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
columnIdxsPath := filepath.Join(path, columnIdxsFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
@ -110,6 +122,11 @@ func mustOpenFilePart(pt *partition, path string) *part {
p.columnNames, p.columnNameIDs = mustReadColumnNames(columnNamesReader)
columnNamesReader.MustClose()
}
if p.ph.FormatVersion >= 3 {
columnIdxsReader := filestream.MustOpen(columnIdxsPath, true)
p.columnIdxs = mustReadColumnIdxs(columnIdxsReader, p.columnNames, p.ph.BloomValuesShardsCount)
columnIdxsReader.MustClose()
}
// Read metaindex
metaindexReader := filestream.MustOpen(metaindexPath, true)
@ -183,20 +200,27 @@ func (p *part) getBloomValuesFileForColumnName(name string) *bloomValuesReaderAt
if p.ph.FormatVersion < 1 {
return &p.oldBloomValues
}
n := len(p.bloomValuesShards)
idx := uint64(0)
if n > 1 {
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx = h % uint64(n)
if p.ph.FormatVersion < 3 {
n := len(p.bloomValuesShards)
shardIdx := uint64(0)
if n > 1 {
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
shardIdx = h % uint64(n)
}
return &p.bloomValuesShards[shardIdx]
}
return &p.bloomValuesShards[idx]
shardIdx, ok := p.columnIdxs[name]
if !ok {
logger.Panicf("BUG: unknown shard index for column %q; columnIdxs=%v", name, p.columnIdxs)
}
return &p.bloomValuesShards[shardIdx]
}
func getBloomFilePath(partPath string, shardNum uint64) string {
return filepath.Join(partPath, bloomFilename) + fmt.Sprintf("%d", shardNum)
func getBloomFilePath(partPath string, shardIdx uint64) string {
return filepath.Join(partPath, bloomFilename) + fmt.Sprintf("%d", shardIdx)
}
func getValuesFilePath(partPath string, shardNum uint64) string {
return filepath.Join(partPath, valuesFilename) + fmt.Sprintf("%d", shardNum)
func getValuesFilePath(partPath string, shardIdx uint64) string {
return filepath.Join(partPath, valuesFilename) + fmt.Sprintf("%d", shardIdx)
}

View file

@ -37,9 +37,6 @@ type partHeader struct {
// BloomValuesShardsCount is the number of (bloom, values) shards in the part.
BloomValuesShardsCount uint64
// BloomValuesFieldsCount is the number of fields with (bloom, values) pairs in the given part.
BloomValuesFieldsCount uint64
}
// reset resets ph for subsequent re-use
@ -52,15 +49,14 @@ func (ph *partHeader) reset() {
ph.MinTimestamp = 0
ph.MaxTimestamp = 0
ph.BloomValuesShardsCount = 0
ph.BloomValuesFieldsCount = 0
}
// String returns string representation for ph.
func (ph *partHeader) String() string {
return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, "+
"MinTimestamp=%s, MaxTimestamp=%s, BloomValuesShardsCount=%d, BloomValuesFieldsCount=%d}",
"MinTimestamp=%s, MaxTimestamp=%s, BloomValuesShardsCount=%d}",
ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount,
timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp), ph.BloomValuesShardsCount, ph.BloomValuesFieldsCount)
timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp), ph.BloomValuesShardsCount)
}
func (ph *partHeader) mustReadMetadata(partPath string) {
@ -79,12 +75,8 @@ func (ph *partHeader) mustReadMetadata(partPath string) {
if ph.BloomValuesShardsCount != 0 {
logger.Panicf("FATAL: %s: unexpected BloomValuesShardsCount for FormatVersion<=1; got %d; want 0", metadataPath, ph.BloomValuesShardsCount)
}
if ph.BloomValuesFieldsCount != 0 {
logger.Panicf("FATAL: %s: unexpected BloomValuesFieldsCount for FormatVersion<=1; got %d; want 0", metadataPath, ph.BloomValuesFieldsCount)
}
if ph.FormatVersion == 1 {
ph.BloomValuesShardsCount = 8
ph.BloomValuesFieldsCount = bloomValuesMaxShardsCount
}
}