From 7a62eefa341b89b8471654f35045135480580e36 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 29 Oct 2024 11:28:41 +0100 Subject: [PATCH] lib/logstorage: dynamically adjust the number of (bloom, values) shards in a part depending on the number of non-const columns This allows reducing the amounts of data, which must be read during queries over logs with big number of fields (aka "wide events"). This, in turn, improves query performance when the data, which needs to be scanned during the query, doesn't fit OS page cache. --- docs/VictoriaLogs/CHANGELOG.md | 2 + lib/logstorage/block.go | 4 -- lib/logstorage/block_data.go | 4 -- lib/logstorage/block_header.go | 4 -- lib/logstorage/block_stream_reader.go | 34 +++++++++------ lib/logstorage/block_stream_writer.go | 61 +++++++++++++++++++++------ lib/logstorage/consts.go | 8 ++-- lib/logstorage/datadb.go | 13 ++++-- lib/logstorage/inmemory_part.go | 18 +++----- lib/logstorage/part.go | 27 +++++++----- lib/logstorage/part_header.go | 27 +++++++++++- 11 files changed, 131 insertions(+), 71 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 7cdaa08b3..36affa5b7 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -16,6 +16,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip * FEATURE: added ability to receive systemd (journald) logs over network. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4618). +* FEATURE: improve performance for queries over large volume of logs with big number of [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) (aka `wide events`). +* FEATURE: improve performance for [`/select/logsql/field_values` HTTP endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values). * FEATURE: improve performance for [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe) when it is applied directly to [log filter](https://docs.victoriametrics.com/victorialogs/logsql/#filters). * BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): fix various glitches with updating query responses. The issue was introduced in [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7279). diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go index 3670991d1..3c23e17e0 100644 --- a/lib/logstorage/block.go +++ b/lib/logstorage/block.go @@ -432,10 +432,6 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v // mustWriteTo writes b with the given sid to sw and updates bh accordingly. func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) { - // Do not store the version used for encoding directly in the block data, since: - // - all the blocks in the same part use the same encoding - // - the block encoding version can be put in metadata file for the part (aka metadataFilename) - b.assertValid() bh.reset() diff --git a/lib/logstorage/block_data.go b/lib/logstorage/block_data.go index 5c76ec3f0..fd315659b 100644 --- a/lib/logstorage/block_data.go +++ b/lib/logstorage/block_data.go @@ -94,10 +94,6 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd * // mustWriteTo writes bd to sw and updates bh accordingly func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) { - // Do not store the version used for encoding directly in the block data, since: - // - all the blocks in the same part use the same encoding - // - the block encoding version can be put in metadata file for the part (aka metadataFilename) - bh.reset() bh.streamID = bd.streamID diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go index 03ab0ebb3..da620e3c0 100644 --- a/lib/logstorage/block_header.go +++ b/lib/logstorage/block_header.go @@ -67,10 +67,6 @@ func (bh *blockHeader) copyFrom(src *blockHeader) { // marshal appends the marshaled bh to dst and returns the result. func (bh *blockHeader) marshal(dst []byte) []byte { - // Do not store the version used for encoding directly in the block header, since: - // - all the block headers in the same part use the same encoding - // - the format encoding version is stored in metadata file for the part (aka metadataFilename) - dst = bh.streamID.marshal(dst) dst = encoding.MarshalVarUint64(dst, bh.uncompressedSizeBytes) dst = encoding.MarshalVarUint64(dst, bh.rowsCount) diff --git a/lib/logstorage/block_stream_reader.go b/lib/logstorage/block_stream_reader.go index f8f3af015..7b706954c 100644 --- a/lib/logstorage/block_stream_reader.go +++ b/lib/logstorage/block_stream_reader.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) type readerWithStats struct { @@ -63,7 +64,7 @@ type streamReaders struct { messageBloomValuesReader bloomValuesReader oldBloomValuesReader bloomValuesReader - bloomValuesShards [bloomValuesShardsCount]bloomValuesReader + bloomValuesShards []bloomValuesReader } type bloomValuesReader struct { @@ -105,13 +106,14 @@ func (sr *streamReaders) reset() { sr.messageBloomValuesReader.reset() sr.oldBloomValuesReader.reset() - for i := range sr.bloomValuesShards[:] { + for i := range sr.bloomValuesShards { sr.bloomValuesShards[i].reset() } + sr.bloomValuesShards = sr.bloomValuesShards[:0] } func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser, - messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader, + messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards []bloomValuesStreamReader, ) { sr.columnNamesReader.init(columnNamesReader) sr.metaindexReader.init(metaindexReader) @@ -122,7 +124,9 @@ func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, c sr.messageBloomValuesReader.init(messageBloomValuesReader) sr.oldBloomValuesReader.init(oldBloomValuesReader) - for i := range sr.bloomValuesShards[:] { + + sr.bloomValuesShards = slicesutil.SetLength(sr.bloomValuesShards, len(bloomValuesShards)) + for i := range sr.bloomValuesShards { sr.bloomValuesShards[i].init(bloomValuesShards[i]) } } @@ -139,7 +143,7 @@ func (sr *streamReaders) totalBytesRead() uint64 { n += sr.messageBloomValuesReader.totalBytesRead() n += sr.oldBloomValuesReader.totalBytesRead() - for i := range sr.bloomValuesShards[:] { + for i := range sr.bloomValuesShards { n += sr.bloomValuesShards[i].totalBytesRead() } @@ -156,7 +160,7 @@ func (sr *streamReaders) MustClose() { sr.messageBloomValuesReader.MustClose() sr.oldBloomValuesReader.MustClose() - for i := range sr.bloomValuesShards[:] { + for i := range sr.bloomValuesShards { sr.bloomValuesShards[i].MustClose() } } @@ -169,8 +173,12 @@ func (sr *streamReaders) getBloomValuesReaderForColumnName(name string, partForm return &sr.oldBloomValuesReader } - h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) - idx := h % uint64(len(sr.bloomValuesShards)) + n := len(sr.bloomValuesShards) + idx := uint64(0) + if n > 1 { + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + idx = h % uint64(n) + } return &sr.bloomValuesShards[idx] } @@ -280,9 +288,8 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) { messageBloomValuesReader := mp.messageBloomValues.NewStreamReader() var oldBloomValuesReader bloomValuesStreamReader - var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader - for i := range bloomValuesShards[:] { - bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamReader() + bloomValuesShards := []bloomValuesStreamReader{ + mp.fieldBloomValues.NewStreamReader(), } bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader, @@ -333,7 +340,7 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) { values: filestream.MustOpen(messageValuesPath, nocache), } var oldBloomValuesReader bloomValuesStreamReader - var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader + var bloomValuesShards []bloomValuesStreamReader if bsr.ph.FormatVersion < 1 { bloomPath := filepath.Join(path, oldBloomFilename) oldBloomValuesReader.bloom = filestream.MustOpen(bloomPath, nocache) @@ -341,7 +348,8 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) { valuesPath := filepath.Join(path, oldValuesFilename) oldBloomValuesReader.values = filestream.MustOpen(valuesPath, nocache) } else { - for i := range bloomValuesShards[:] { + bloomValuesShards = make([]bloomValuesStreamReader, bsr.ph.BloomValuesShardsCount) + for i := range bloomValuesShards { shard := &bloomValuesShards[i] bloomPath := getBloomFilePath(path, uint64(i)) diff --git a/lib/logstorage/block_stream_writer.go b/lib/logstorage/block_stream_writer.go index a95a419ec..3d3a54382 100644 --- a/lib/logstorage/block_stream_writer.go +++ b/lib/logstorage/block_stream_writer.go @@ -1,6 +1,7 @@ package logstorage import ( + "math/bits" "path/filepath" "sync" @@ -10,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // writerWithStats writes data to w and tracks the total amounts of data written at bytesWritten. @@ -53,7 +55,7 @@ type streamWriters struct { timestampsWriter writerWithStats messageBloomValuesWriter bloomValuesWriter - bloomValuesShards [bloomValuesShardsCount]bloomValuesWriter + bloomValuesShards []bloomValuesWriter } type bloomValuesWriter struct { @@ -94,13 +96,14 @@ func (sw *streamWriters) reset() { sw.timestampsWriter.reset() sw.messageBloomValuesWriter.reset() - for i := range sw.bloomValuesShards[:] { + for i := range sw.bloomValuesShards { sw.bloomValuesShards[i].reset() } + sw.bloomValuesShards = sw.bloomValuesShards[:0] } func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter filestream.WriteCloser, - messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter, + messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards []bloomValuesStreamWriter, ) { sw.columnNamesWriter.init(columnNamesWriter) sw.metaindexWriter.init(metaindexWriter) @@ -110,7 +113,8 @@ func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, c sw.timestampsWriter.init(timestampsWriter) sw.messageBloomValuesWriter.init(messageBloomValuesWriter) - for i := range sw.bloomValuesShards[:] { + sw.bloomValuesShards = slicesutil.SetLength(sw.bloomValuesShards, len(bloomValuesShards)) + for i := range sw.bloomValuesShards { sw.bloomValuesShards[i].init(bloomValuesShards[i]) } } @@ -126,7 +130,7 @@ func (sw *streamWriters) totalBytesWritten() uint64 { n += sw.timestampsWriter.bytesWritten n += sw.messageBloomValuesWriter.totalBytesWritten() - for i := range sw.bloomValuesShards[:] { + for i := range sw.bloomValuesShards { n += sw.bloomValuesShards[i].totalBytesWritten() } @@ -142,7 +146,7 @@ func (sw *streamWriters) MustClose() { sw.timestampsWriter.MustClose() sw.messageBloomValuesWriter.MustClose() - for i := range sw.bloomValuesShards[:] { + for i := range sw.bloomValuesShards { sw.bloomValuesShards[i].MustClose() } } @@ -152,8 +156,12 @@ func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomVa return &sw.messageBloomValuesWriter } - h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) - idx := h % uint64(len(sw.bloomValuesShards)) + n := len(sw.bloomValuesShards) + idx := uint64(0) + if n > 1 { + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + idx = h % uint64(n) + } return &sw.bloomValuesShards[idx] } @@ -168,6 +176,9 @@ type blockStreamWriter struct { // sidFirst is the streamID for the first block in the current indexBlock sidFirst streamID + // bloomValuesFieldsCount is the number of fields with (bloom, values) pairs in the output part. + bloomValuesFieldsCount uint64 + // minTimestampLast is the minimum timestamp seen for the last written block minTimestampLast int64 @@ -213,6 +224,7 @@ func (bsw *blockStreamWriter) reset() { bsw.streamWriters.reset() bsw.sidLast.reset() bsw.sidFirst.reset() + bsw.bloomValuesFieldsCount = 0 bsw.minTimestampLast = 0 bsw.minTimestamp = 0 bsw.maxTimestamp = 0 @@ -243,9 +255,8 @@ func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) { messageBloomValues := mp.messageBloomValues.NewStreamWriter() - var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter - for i := range bloomValuesShards[:] { - bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamWriter() + bloomValuesShards := []bloomValuesStreamWriter{ + mp.fieldBloomValues.NewStreamWriter(), } bsw.streamWriters.init(&mp.columnNames, &mp.metaindex, &mp.index, &mp.columnsHeaderIndex, &mp.columnsHeader, &mp.timestamps, messageBloomValues, bloomValuesShards) @@ -254,7 +265,7 @@ func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) { // MustInitForFilePart initializes bsw for writing data to file part located at path. // // if nocache is true, then the written data doesn't go to OS page cache. -func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) { +func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool, bloomValuesShardsCount uint64) { bsw.reset() fs.MustMkdirFailIfExist(path) @@ -284,8 +295,9 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) { values: filestream.MustCreate(messageValuesPath, nocache), } - var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter - for i := range bloomValuesShards[:] { + bloomValuesShardsCount = adjustBloomValuesShardsCount(bloomValuesShardsCount) + bloomValuesShards := make([]bloomValuesStreamWriter, bloomValuesShardsCount) + for i := range bloomValuesShards { shard := &bloomValuesShards[i] bloomPath := getBloomFilePath(path, uint64(i)) @@ -298,6 +310,18 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) { bsw.streamWriters.init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter, messageBloomValuesWriter, bloomValuesShards) } +func adjustBloomValuesShardsCount(n uint64) uint64 { + if n == 0 { + return n + } + + n = 1 << bits.Len64(n-1) + if n > bloomValuesMaxShardsCount { + n = bloomValuesMaxShardsCount + } + return n +} + // MustWriteRows writes timestamps with rows under the given sid to bsw. // // timestamps must be sorted. @@ -348,11 +372,18 @@ func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd bsw.sidLast = *sid bh := getBlockHeader() + columnsLen := 0 if b != nil { b.mustWriteTo(sid, bh, &bsw.streamWriters, &bsw.columnNameIDGenerator) + columnsLen = len(b.columns) } else { bd.mustWriteTo(bh, &bsw.streamWriters, &bsw.columnNameIDGenerator) + columnsLen = len(bd.columnsData) } + if bsw.bloomValuesFieldsCount < uint64(columnsLen) { + bsw.bloomValuesFieldsCount = uint64(columnsLen) + } + th := &bh.timestampsHeader if bsw.globalRowsCount == 0 || th.minTimestamp < bsw.globalMinTimestamp { bsw.globalMinTimestamp = th.minTimestamp @@ -407,6 +438,8 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) { ph.BlocksCount = bsw.globalBlocksCount ph.MinTimestamp = bsw.globalMinTimestamp ph.MaxTimestamp = bsw.globalMaxTimestamp + ph.BloomValuesShardsCount = uint64(len(bsw.streamWriters.bloomValuesShards)) + ph.BloomValuesFieldsCount = bsw.bloomValuesFieldsCount bsw.mustFlushIndexBlock(bsw.indexBlockData) diff --git a/lib/logstorage/consts.go b/lib/logstorage/consts.go index 49a38470d..d1de8faea 100644 --- a/lib/logstorage/consts.go +++ b/lib/logstorage/consts.go @@ -3,12 +3,12 @@ package logstorage // partFormatLatestVersion is the latest format version for parts. // // See partHeader.FormatVersion for details. -const partFormatLatestVersion = 1 +const partFormatLatestVersion = 2 -// bloomValuesShardsCount is the number of shards for bloomFilename and valuesFilename files. +// bloomValuesMaxShardsCount is the number of shards for bloomFilename and valuesFilename files. // -// The partHeader.FormatVersion must be updated when this number changes. -const bloomValuesShardsCount = 8 +// The partHeader.FormatVersion and partFormatLatestVersion must be updated when this number changes. +const bloomValuesMaxShardsCount = 128 // maxUncompressedIndexBlockSize contains the maximum length of uncompressed block with blockHeader entries aka index block. // diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go index 99a1d6f7a..aaf006200 100644 --- a/lib/logstorage/datadb.go +++ b/lib/logstorage/datadb.go @@ -530,10 +530,15 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { srcSize := uint64(0) srcRowsCount := uint64(0) srcBlocksCount := uint64(0) + bloomValuesShardsCount := uint64(0) for _, pw := range pws { - srcSize += pw.p.ph.CompressedSizeBytes - srcRowsCount += pw.p.ph.RowsCount - srcBlocksCount += pw.p.ph.BlocksCount + ph := &pw.p.ph + srcSize += ph.CompressedSizeBytes + srcRowsCount += ph.RowsCount + srcBlocksCount += ph.BlocksCount + if ph.BloomValuesFieldsCount > bloomValuesShardsCount { + bloomValuesShardsCount = ph.BloomValuesFieldsCount + } } bsw := getBlockStreamWriter() var mpNew *inmemoryPart @@ -542,7 +547,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { bsw.MustInitForInmemoryPart(mpNew) } else { nocache := dstPartType == partBig - bsw.MustInitForFilePart(dstPartPath, nocache) + bsw.MustInitForFilePart(dstPartPath, nocache, bloomValuesShardsCount) } // Merge source parts to destination part. diff --git a/lib/logstorage/inmemory_part.go b/lib/logstorage/inmemory_part.go index fd50b7ccd..ff98bb1cf 100644 --- a/lib/logstorage/inmemory_part.go +++ b/lib/logstorage/inmemory_part.go @@ -22,7 +22,7 @@ type inmemoryPart struct { timestamps bytesutil.ByteBuffer messageBloomValues bloomValuesBuffer - bloomValuesShards [bloomValuesShardsCount]bloomValuesBuffer + fieldBloomValues bloomValuesBuffer } type bloomValuesBuffer struct { @@ -61,9 +61,7 @@ func (mp *inmemoryPart) reset() { mp.timestamps.Reset() mp.messageBloomValues.reset() - for i := range mp.bloomValuesShards[:] { - mp.bloomValuesShards[i].reset() - } + mp.fieldBloomValues.reset() } // mustInitFromRows initializes mp from lr. @@ -127,15 +125,11 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) { fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomValues.bloom.B) fs.MustWriteSync(messageValuesPath, mp.messageBloomValues.values.B) - for i := range mp.bloomValuesShards[:] { - shard := &mp.bloomValuesShards[i] + bloomPath := getBloomFilePath(path, 0) + fs.MustWriteSync(bloomPath, mp.fieldBloomValues.bloom.B) - bloomPath := getBloomFilePath(path, uint64(i)) - fs.MustWriteSync(bloomPath, shard.bloom.B) - - valuesPath := getValuesFilePath(path, uint64(i)) - fs.MustWriteSync(valuesPath, shard.values.B) - } + valuesPath := getValuesFilePath(path, 0) + fs.MustWriteSync(valuesPath, mp.fieldBloomValues.values.B) mp.ph.mustWriteMetadata(path) diff --git a/lib/logstorage/part.go b/lib/logstorage/part.go index 6598ebdbc..4fe325abf 100644 --- a/lib/logstorage/part.go +++ b/lib/logstorage/part.go @@ -41,7 +41,8 @@ type part struct { messageBloomValues bloomValuesReaderAt oldBloomValues bloomValuesReaderAt - bloomValuesShards [bloomValuesShardsCount]bloomValuesReaderAt + + bloomValuesShards []bloomValuesReaderAt } type bloomValuesReaderAt struct { @@ -76,14 +77,15 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part { p.columnsHeaderFile = &mp.columnsHeader p.timestampsFile = &mp.timestamps + // Open files with bloom filters and column values p.messageBloomValues.bloom = &mp.messageBloomValues.bloom p.messageBloomValues.values = &mp.messageBloomValues.values - // Open files with bloom filters and column values - for i := range p.bloomValuesShards[:] { - shard := &p.bloomValuesShards[i] - shard.bloom = &mp.bloomValuesShards[i].bloom - shard.values = &mp.bloomValuesShards[i].values + p.bloomValuesShards = []bloomValuesReaderAt{ + { + bloom: &mp.fieldBloomValues.bloom, + values: &mp.fieldBloomValues.values, + }, } return &p @@ -140,7 +142,8 @@ func mustOpenFilePart(pt *partition, path string) *part { valuesPath := filepath.Join(path, oldValuesFilename) p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath) } else { - for i := range p.bloomValuesShards[:] { + p.bloomValuesShards = make([]bloomValuesReaderAt, p.ph.BloomValuesShardsCount) + for i := range p.bloomValuesShards { shard := &p.bloomValuesShards[i] bloomPath := getBloomFilePath(path, uint64(i)) @@ -166,7 +169,7 @@ func mustClosePart(p *part) { if p.ph.FormatVersion < 1 { p.oldBloomValues.MustClose() } else { - for i := range p.bloomValuesShards[:] { + for i := range p.bloomValuesShards { p.bloomValuesShards[i].MustClose() } } @@ -183,8 +186,12 @@ func (p *part) getBloomValuesFileForColumnName(name string) *bloomValuesReaderAt return &p.oldBloomValues } - h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) - idx := h % uint64(len(p.bloomValuesShards)) + n := len(p.bloomValuesShards) + idx := uint64(0) + if n > 1 { + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + idx = h % uint64(n) + } return &p.bloomValuesShards[idx] } diff --git a/lib/logstorage/part_header.go b/lib/logstorage/part_header.go index 05baf4195..f6d782749 100644 --- a/lib/logstorage/part_header.go +++ b/lib/logstorage/part_header.go @@ -34,6 +34,12 @@ type partHeader struct { // MaxTimestamp is the maximum timestamp seen in the part MaxTimestamp int64 + + // BloomValuesShardsCount is the number of (bloom, values) shards in the part. + BloomValuesShardsCount uint64 + + // BloomValuesFieldsCount is the number of fields with (bloom, values) pairs in the given part. + BloomValuesFieldsCount uint64 } // reset resets ph for subsequent re-use @@ -45,12 +51,16 @@ func (ph *partHeader) reset() { ph.BlocksCount = 0 ph.MinTimestamp = 0 ph.MaxTimestamp = 0 + ph.BloomValuesShardsCount = 0 + ph.BloomValuesFieldsCount = 0 } // String returns string represenation for ph. func (ph *partHeader) String() string { - return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}", - ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp)) + return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, "+ + "MinTimestamp=%s, MaxTimestamp=%s, BloomValuesShardsCount=%d, BloomValuesFieldsCount=%d}", + ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, + timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp), ph.BloomValuesShardsCount, ph.BloomValuesFieldsCount) } func (ph *partHeader) mustReadMetadata(partPath string) { @@ -65,6 +75,19 @@ func (ph *partHeader) mustReadMetadata(partPath string) { logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err) } + if ph.FormatVersion <= 1 { + if ph.BloomValuesShardsCount != 0 { + logger.Panicf("FATAL: unexpected BloomValuesShardsCount for FormatVersion<=1; got %d; want 0", ph.BloomValuesShardsCount) + } + if ph.BloomValuesFieldsCount != 0 { + logger.Panicf("FATAL: unexpected BloomValuesFieldsCount for FormatVersion<=1; got %d; want 0", ph.BloomValuesFieldsCount) + } + if ph.FormatVersion == 1 { + ph.BloomValuesShardsCount = 8 + ph.BloomValuesFieldsCount = bloomValuesMaxShardsCount + } + } + // Perform various checks if ph.FormatVersion > partFormatLatestVersion { logger.Panicf("FATAL: unsupported part format version; got %d; mustn't exceed %d", partFormatLatestVersion)