diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 7cdaa08b3..36affa5b7 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -16,6 +16,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip * FEATURE: added ability to receive systemd (journald) logs over network. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4618). +* FEATURE: improve performance for queries over large volumes of logs with a big number of [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) (aka `wide events`). +* FEATURE: improve performance for [`/select/logsql/field_values` HTTP endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values). * FEATURE: improve performance for [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe) when it is applied directly to [log filter](https://docs.victoriametrics.com/victorialogs/logsql/#filters). * BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): fix various glitches with updating query responses. The issue was introduced in [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7279). diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go index 3670991d1..3c23e17e0 100644 --- a/lib/logstorage/block.go +++ b/lib/logstorage/block.go @@ -432,10 +432,6 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v // mustWriteTo writes b with the given sid to sw and updates bh accordingly. func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) { - // Do not store the version used for encoding directly in the block data, since: - // - all the blocks in the same part use the same encoding - // - the block encoding version can be put in metadata file for the part (aka metadataFilename) - b.assertValid() bh.reset() diff --git a/lib/logstorage/block_data.go b/lib/logstorage/block_data.go index 5c76ec3f0..fd315659b 100644 --- a/lib/logstorage/block_data.go +++ b/lib/logstorage/block_data.go @@ -94,10 +94,6 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd * // mustWriteTo writes bd to sw and updates bh accordingly func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) { - // Do not store the version used for encoding directly in the block data, since: - // - all the blocks in the same part use the same encoding - // - the block encoding version can be put in metadata file for the part (aka metadataFilename) - bh.reset() bh.streamID = bd.streamID diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go index 03ab0ebb3..da620e3c0 100644 --- a/lib/logstorage/block_header.go +++ b/lib/logstorage/block_header.go @@ -67,10 +67,6 @@ func (bh *blockHeader) copyFrom(src *blockHeader) { // marshal appends the marshaled bh to dst and returns the result.
func (bh *blockHeader) marshal(dst []byte) []byte { - // Do not store the version used for encoding directly in the block header, since: - // - all the block headers in the same part use the same encoding - // - the format encoding version is stored in metadata file for the part (aka metadataFilename) - dst = bh.streamID.marshal(dst) dst = encoding.MarshalVarUint64(dst, bh.uncompressedSizeBytes) dst = encoding.MarshalVarUint64(dst, bh.rowsCount) diff --git a/lib/logstorage/block_stream_reader.go b/lib/logstorage/block_stream_reader.go index f8f3af015..7b706954c 100644 --- a/lib/logstorage/block_stream_reader.go +++ b/lib/logstorage/block_stream_reader.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) type readerWithStats struct { @@ -63,7 +64,7 @@ type streamReaders struct { messageBloomValuesReader bloomValuesReader oldBloomValuesReader bloomValuesReader - bloomValuesShards [bloomValuesShardsCount]bloomValuesReader + bloomValuesShards []bloomValuesReader } type bloomValuesReader struct { @@ -105,13 +106,14 @@ func (sr *streamReaders) reset() { sr.messageBloomValuesReader.reset() sr.oldBloomValuesReader.reset() - for i := range sr.bloomValuesShards[:] { + for i := range sr.bloomValuesShards { sr.bloomValuesShards[i].reset() } + sr.bloomValuesShards = sr.bloomValuesShards[:0] } func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser, - messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader, + messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards []bloomValuesStreamReader, ) { sr.columnNamesReader.init(columnNamesReader) sr.metaindexReader.init(metaindexReader) @@ -122,7 +124,9 @@ func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, c sr.messageBloomValuesReader.init(messageBloomValuesReader) sr.oldBloomValuesReader.init(oldBloomValuesReader) - for i := range sr.bloomValuesShards[:] { + + sr.bloomValuesShards = slicesutil.SetLength(sr.bloomValuesShards, len(bloomValuesShards)) + for i := range sr.bloomValuesShards { sr.bloomValuesShards[i].init(bloomValuesShards[i]) } } @@ -139,7 +143,7 @@ func (sr *streamReaders) totalBytesRead() uint64 { n += sr.messageBloomValuesReader.totalBytesRead() n += sr.oldBloomValuesReader.totalBytesRead() - for i := range sr.bloomValuesShards[:] { + for i := range sr.bloomValuesShards { n += sr.bloomValuesShards[i].totalBytesRead() } @@ -156,7 +160,7 @@ func (sr *streamReaders) MustClose() { sr.messageBloomValuesReader.MustClose() sr.oldBloomValuesReader.MustClose() - for i := range sr.bloomValuesShards[:] { + for i := range sr.bloomValuesShards { sr.bloomValuesShards[i].MustClose() } } @@ -169,8 +173,12 @@ func (sr *streamReaders) getBloomValuesReaderForColumnName(name string, partForm return &sr.oldBloomValuesReader } - h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) - idx := h % uint64(len(sr.bloomValuesShards)) + n := len(sr.bloomValuesShards) + idx := uint64(0) + if n > 1 { + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + idx = h % uint64(n) + } return &sr.bloomValuesShards[idx] } @@ -280,9 +288,8 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) { 
messageBloomValuesReader := mp.messageBloomValues.NewStreamReader() var oldBloomValuesReader bloomValuesStreamReader - var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader - for i := range bloomValuesShards[:] { - bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamReader() + bloomValuesShards := []bloomValuesStreamReader{ + mp.fieldBloomValues.NewStreamReader(), } bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader, @@ -333,7 +340,7 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) { values: filestream.MustOpen(messageValuesPath, nocache), } var oldBloomValuesReader bloomValuesStreamReader - var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader + var bloomValuesShards []bloomValuesStreamReader if bsr.ph.FormatVersion < 1 { bloomPath := filepath.Join(path, oldBloomFilename) oldBloomValuesReader.bloom = filestream.MustOpen(bloomPath, nocache) @@ -341,7 +348,8 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) { valuesPath := filepath.Join(path, oldValuesFilename) oldBloomValuesReader.values = filestream.MustOpen(valuesPath, nocache) } else { - for i := range bloomValuesShards[:] { + bloomValuesShards = make([]bloomValuesStreamReader, bsr.ph.BloomValuesShardsCount) + for i := range bloomValuesShards { shard := &bloomValuesShards[i] bloomPath := getBloomFilePath(path, uint64(i)) diff --git a/lib/logstorage/block_stream_writer.go b/lib/logstorage/block_stream_writer.go index a95a419ec..3d3a54382 100644 --- a/lib/logstorage/block_stream_writer.go +++ b/lib/logstorage/block_stream_writer.go @@ -1,6 +1,7 @@ package logstorage import ( + "math/bits" "path/filepath" "sync" @@ -10,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // writerWithStats writes data to w and tracks the total amounts of data written at bytesWritten. 
@@ -53,7 +55,7 @@ type streamWriters struct { timestampsWriter writerWithStats messageBloomValuesWriter bloomValuesWriter - bloomValuesShards [bloomValuesShardsCount]bloomValuesWriter + bloomValuesShards []bloomValuesWriter } type bloomValuesWriter struct { @@ -94,13 +96,14 @@ func (sw *streamWriters) reset() { sw.timestampsWriter.reset() sw.messageBloomValuesWriter.reset() - for i := range sw.bloomValuesShards[:] { + for i := range sw.bloomValuesShards { sw.bloomValuesShards[i].reset() } + sw.bloomValuesShards = sw.bloomValuesShards[:0] } func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter filestream.WriteCloser, - messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter, + messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards []bloomValuesStreamWriter, ) { sw.columnNamesWriter.init(columnNamesWriter) sw.metaindexWriter.init(metaindexWriter) @@ -110,7 +113,8 @@ func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, c sw.timestampsWriter.init(timestampsWriter) sw.messageBloomValuesWriter.init(messageBloomValuesWriter) - for i := range sw.bloomValuesShards[:] { + sw.bloomValuesShards = slicesutil.SetLength(sw.bloomValuesShards, len(bloomValuesShards)) + for i := range sw.bloomValuesShards { sw.bloomValuesShards[i].init(bloomValuesShards[i]) } } @@ -126,7 +130,7 @@ func (sw *streamWriters) totalBytesWritten() uint64 { n += sw.timestampsWriter.bytesWritten n += sw.messageBloomValuesWriter.totalBytesWritten() - for i := range sw.bloomValuesShards[:] { + for i := range sw.bloomValuesShards { n += sw.bloomValuesShards[i].totalBytesWritten() } @@ -142,7 +146,7 @@ func (sw *streamWriters) MustClose() { sw.timestampsWriter.MustClose() sw.messageBloomValuesWriter.MustClose() - for i := range sw.bloomValuesShards[:] { + for i := range sw.bloomValuesShards { sw.bloomValuesShards[i].MustClose() } } @@ -152,8 +156,12 @@ func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomVa return &sw.messageBloomValuesWriter } - h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) - idx := h % uint64(len(sw.bloomValuesShards)) + n := len(sw.bloomValuesShards) + idx := uint64(0) + if n > 1 { + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + idx = h % uint64(n) + } return &sw.bloomValuesShards[idx] } @@ -168,6 +176,9 @@ type blockStreamWriter struct { // sidFirst is the streamID for the first block in the current indexBlock sidFirst streamID + // bloomValuesFieldsCount is the number of fields with (bloom, values) pairs in the output part. 
+ bloomValuesFieldsCount uint64 + // minTimestampLast is the minimum timestamp seen for the last written block minTimestampLast int64 @@ -213,6 +224,7 @@ func (bsw *blockStreamWriter) reset() { bsw.streamWriters.reset() bsw.sidLast.reset() bsw.sidFirst.reset() + bsw.bloomValuesFieldsCount = 0 bsw.minTimestampLast = 0 bsw.minTimestamp = 0 bsw.maxTimestamp = 0 @@ -243,9 +255,8 @@ func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) { messageBloomValues := mp.messageBloomValues.NewStreamWriter() - var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter - for i := range bloomValuesShards[:] { - bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamWriter() + bloomValuesShards := []bloomValuesStreamWriter{ + mp.fieldBloomValues.NewStreamWriter(), } bsw.streamWriters.init(&mp.columnNames, &mp.metaindex, &mp.index, &mp.columnsHeaderIndex, &mp.columnsHeader, &mp.timestamps, messageBloomValues, bloomValuesShards) @@ -254,7 +265,7 @@ func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) { // MustInitForFilePart initializes bsw for writing data to file part located at path. // // if nocache is true, then the written data doesn't go to OS page cache. -func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) { +func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool, bloomValuesShardsCount uint64) { bsw.reset() fs.MustMkdirFailIfExist(path) @@ -284,8 +295,9 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) { values: filestream.MustCreate(messageValuesPath, nocache), } - var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter - for i := range bloomValuesShards[:] { + bloomValuesShardsCount = adjustBloomValuesShardsCount(bloomValuesShardsCount) + bloomValuesShards := make([]bloomValuesStreamWriter, bloomValuesShardsCount) + for i := range bloomValuesShards { shard := &bloomValuesShards[i] bloomPath := getBloomFilePath(path, uint64(i)) @@ -298,6 +310,18 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) { bsw.streamWriters.init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter, messageBloomValuesWriter, bloomValuesShards) } +func adjustBloomValuesShardsCount(n uint64) uint64 { + if n == 0 { + return n + } + + n = 1 << bits.Len64(n-1) + if n > bloomValuesMaxShardsCount { + n = bloomValuesMaxShardsCount + } + return n +} + // MustWriteRows writes timestamps with rows under the given sid to bsw. // // timestamps must be sorted. 
@@ -348,11 +372,18 @@ func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd bsw.sidLast = *sid bh := getBlockHeader() + columnsLen := 0 if b != nil { b.mustWriteTo(sid, bh, &bsw.streamWriters, &bsw.columnNameIDGenerator) + columnsLen = len(b.columns) } else { bd.mustWriteTo(bh, &bsw.streamWriters, &bsw.columnNameIDGenerator) + columnsLen = len(bd.columnsData) } + if bsw.bloomValuesFieldsCount < uint64(columnsLen) { + bsw.bloomValuesFieldsCount = uint64(columnsLen) + } + th := &bh.timestampsHeader if bsw.globalRowsCount == 0 || th.minTimestamp < bsw.globalMinTimestamp { bsw.globalMinTimestamp = th.minTimestamp @@ -407,6 +438,8 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) { ph.BlocksCount = bsw.globalBlocksCount ph.MinTimestamp = bsw.globalMinTimestamp ph.MaxTimestamp = bsw.globalMaxTimestamp + ph.BloomValuesShardsCount = uint64(len(bsw.streamWriters.bloomValuesShards)) + ph.BloomValuesFieldsCount = bsw.bloomValuesFieldsCount bsw.mustFlushIndexBlock(bsw.indexBlockData) diff --git a/lib/logstorage/consts.go b/lib/logstorage/consts.go index 49a38470d..d1de8faea 100644 --- a/lib/logstorage/consts.go +++ b/lib/logstorage/consts.go @@ -3,12 +3,12 @@ package logstorage // partFormatLatestVersion is the latest format version for parts. // // See partHeader.FormatVersion for details. -const partFormatLatestVersion = 1 +const partFormatLatestVersion = 2 -// bloomValuesShardsCount is the number of shards for bloomFilename and valuesFilename files. +// bloomValuesMaxShardsCount is the maximum number of shards for bloomFilename and valuesFilename files. // -// The partHeader.FormatVersion must be updated when this number changes. -const bloomValuesShardsCount = 8 +// The partHeader.FormatVersion and partFormatLatestVersion must be updated when this number changes. +const bloomValuesMaxShardsCount = 128 // maxUncompressedIndexBlockSize contains the maximum length of uncompressed block with blockHeader entries aka index block. // diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go index 99a1d6f7a..aaf006200 100644 --- a/lib/logstorage/datadb.go +++ b/lib/logstorage/datadb.go @@ -530,10 +530,15 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { srcSize := uint64(0) srcRowsCount := uint64(0) srcBlocksCount := uint64(0) + bloomValuesShardsCount := uint64(0) for _, pw := range pws { - srcSize += pw.p.ph.CompressedSizeBytes - srcRowsCount += pw.p.ph.RowsCount - srcBlocksCount += pw.p.ph.BlocksCount + ph := &pw.p.ph + srcSize += ph.CompressedSizeBytes + srcRowsCount += ph.RowsCount + srcBlocksCount += ph.BlocksCount + if ph.BloomValuesFieldsCount > bloomValuesShardsCount { + bloomValuesShardsCount = ph.BloomValuesFieldsCount + } } bsw := getBlockStreamWriter() var mpNew *inmemoryPart @@ -542,7 +547,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { bsw.MustInitForInmemoryPart(mpNew) } else { nocache := dstPartType == partBig - bsw.MustInitForFilePart(dstPartPath, nocache) + bsw.MustInitForFilePart(dstPartPath, nocache, bloomValuesShardsCount) } // Merge source parts to destination part.
diff --git a/lib/logstorage/inmemory_part.go b/lib/logstorage/inmemory_part.go index fd50b7ccd..ff98bb1cf 100644 --- a/lib/logstorage/inmemory_part.go +++ b/lib/logstorage/inmemory_part.go @@ -22,7 +22,7 @@ type inmemoryPart struct { timestamps bytesutil.ByteBuffer messageBloomValues bloomValuesBuffer - bloomValuesShards [bloomValuesShardsCount]bloomValuesBuffer + fieldBloomValues bloomValuesBuffer } type bloomValuesBuffer struct { @@ -61,9 +61,7 @@ func (mp *inmemoryPart) reset() { mp.timestamps.Reset() mp.messageBloomValues.reset() - for i := range mp.bloomValuesShards[:] { - mp.bloomValuesShards[i].reset() - } + mp.fieldBloomValues.reset() } // mustInitFromRows initializes mp from lr. @@ -127,15 +125,11 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) { fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomValues.bloom.B) fs.MustWriteSync(messageValuesPath, mp.messageBloomValues.values.B) - for i := range mp.bloomValuesShards[:] { - shard := &mp.bloomValuesShards[i] + bloomPath := getBloomFilePath(path, 0) + fs.MustWriteSync(bloomPath, mp.fieldBloomValues.bloom.B) - bloomPath := getBloomFilePath(path, uint64(i)) - fs.MustWriteSync(bloomPath, shard.bloom.B) - - valuesPath := getValuesFilePath(path, uint64(i)) - fs.MustWriteSync(valuesPath, shard.values.B) - } + valuesPath := getValuesFilePath(path, 0) + fs.MustWriteSync(valuesPath, mp.fieldBloomValues.values.B) mp.ph.mustWriteMetadata(path) diff --git a/lib/logstorage/part.go b/lib/logstorage/part.go index 6598ebdbc..4fe325abf 100644 --- a/lib/logstorage/part.go +++ b/lib/logstorage/part.go @@ -41,7 +41,8 @@ type part struct { messageBloomValues bloomValuesReaderAt oldBloomValues bloomValuesReaderAt - bloomValuesShards [bloomValuesShardsCount]bloomValuesReaderAt + + bloomValuesShards []bloomValuesReaderAt } type bloomValuesReaderAt struct { @@ -76,14 +77,15 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part { p.columnsHeaderFile = &mp.columnsHeader p.timestampsFile = &mp.timestamps + // Open files with bloom filters and column values p.messageBloomValues.bloom = &mp.messageBloomValues.bloom p.messageBloomValues.values = &mp.messageBloomValues.values - // Open files with bloom filters and column values - for i := range p.bloomValuesShards[:] { - shard := &p.bloomValuesShards[i] - shard.bloom = &mp.bloomValuesShards[i].bloom - shard.values = &mp.bloomValuesShards[i].values + p.bloomValuesShards = []bloomValuesReaderAt{ + { + bloom: &mp.fieldBloomValues.bloom, + values: &mp.fieldBloomValues.values, + }, } return &p @@ -140,7 +142,8 @@ func mustOpenFilePart(pt *partition, path string) *part { valuesPath := filepath.Join(path, oldValuesFilename) p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath) } else { - for i := range p.bloomValuesShards[:] { + p.bloomValuesShards = make([]bloomValuesReaderAt, p.ph.BloomValuesShardsCount) + for i := range p.bloomValuesShards { shard := &p.bloomValuesShards[i] bloomPath := getBloomFilePath(path, uint64(i)) @@ -166,7 +169,7 @@ func mustClosePart(p *part) { if p.ph.FormatVersion < 1 { p.oldBloomValues.MustClose() } else { - for i := range p.bloomValuesShards[:] { + for i := range p.bloomValuesShards { p.bloomValuesShards[i].MustClose() } } @@ -183,8 +186,12 @@ func (p *part) getBloomValuesFileForColumnName(name string) *bloomValuesReaderAt return &p.oldBloomValues } - h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) - idx := h % uint64(len(p.bloomValuesShards)) + n := len(p.bloomValuesShards) + idx := uint64(0) + if n > 1 { + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + 
idx = h % uint64(n) + } return &p.bloomValuesShards[idx] } diff --git a/lib/logstorage/part_header.go b/lib/logstorage/part_header.go index 05baf4195..f6d782749 100644 --- a/lib/logstorage/part_header.go +++ b/lib/logstorage/part_header.go @@ -34,6 +34,12 @@ type partHeader struct { // MaxTimestamp is the maximum timestamp seen in the part MaxTimestamp int64 + + // BloomValuesShardsCount is the number of (bloom, values) shards in the part. + BloomValuesShardsCount uint64 + + // BloomValuesFieldsCount is the number of fields with (bloom, values) pairs in the given part. + BloomValuesFieldsCount uint64 } // reset resets ph for subsequent re-use @@ -45,12 +51,16 @@ func (ph *partHeader) reset() { ph.BlocksCount = 0 ph.MinTimestamp = 0 ph.MaxTimestamp = 0 + ph.BloomValuesShardsCount = 0 + ph.BloomValuesFieldsCount = 0 } // String returns string representation for ph. func (ph *partHeader) String() string { - return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}", - ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp)) + return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, "+ "MinTimestamp=%s, MaxTimestamp=%s, BloomValuesShardsCount=%d, BloomValuesFieldsCount=%d}", + ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, + timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp), ph.BloomValuesShardsCount, ph.BloomValuesFieldsCount) } func (ph *partHeader) mustReadMetadata(partPath string) { @@ -65,6 +75,19 @@ logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err) } + if ph.FormatVersion <= 1 { + if ph.BloomValuesShardsCount != 0 { + logger.Panicf("FATAL: unexpected BloomValuesShardsCount for FormatVersion<=1; got %d; want 0", ph.BloomValuesShardsCount) + } + if ph.BloomValuesFieldsCount != 0 { + logger.Panicf("FATAL: unexpected BloomValuesFieldsCount for FormatVersion<=1; got %d; want 0", ph.BloomValuesFieldsCount) + } + if ph.FormatVersion == 1 { + ph.BloomValuesShardsCount = 8 + ph.BloomValuesFieldsCount = bloomValuesMaxShardsCount + } + } + // Perform various checks if ph.FormatVersion > partFormatLatestVersion { logger.Panicf("FATAL: unsupported part format version; got %d; mustn't exceed %d", ph.FormatVersion, partFormatLatestVersion)
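
Reviewer note (not part of the patch): a minimal, standalone Go sketch of the sharding scheme this diff introduces, assuming the illustrative helper names `adjustShardsCount` and `pickShard` (they are not in the codebase). The destination part's number of (bloom, values) shards is derived from the largest per-block field count among the source parts (`BloomValuesFieldsCount`), rounded up to the next power of two by `adjustBloomValuesShardsCount` and capped at `bloomValuesMaxShardsCount` (128); each column name is then routed to a shard by hashing it, as `getBloomValuesWriterForColumnName` and `getBloomValuesReaderForColumnName` do.

```go
package main

import (
	"fmt"
	"math/bits"

	"github.com/cespare/xxhash/v2"
)

const bloomValuesMaxShardsCount = 128

// adjustShardsCount mirrors adjustBloomValuesShardsCount from the patch:
// round n up to the nearest power of two and cap it at the maximum shard count.
func adjustShardsCount(n uint64) uint64 {
	if n == 0 {
		return 0
	}
	n = 1 << bits.Len64(n-1)
	if n > bloomValuesMaxShardsCount {
		n = bloomValuesMaxShardsCount
	}
	return n
}

// pickShard mirrors getBloomValuesWriterForColumnName: column names are spread
// across shards by hashing; with a single shard the hash is skipped entirely.
func pickShard(name string, shardsCount int) uint64 {
	if shardsCount <= 1 {
		return 0
	}
	return xxhash.Sum64String(name) % uint64(shardsCount)
}

func main() {
	// A part whose blocks contain at most 5 fields gets 8 shards (the next power of two),
	// while a "wide events" part with 1000 fields is capped at 128 shards.
	fmt.Println(adjustShardsCount(5))    // 8
	fmt.Println(adjustShardsCount(1000)) // 128

	// Bloom filters and values for a given column always land in the same shard.
	fmt.Println(pickShard("trace_id", int(adjustShardsCount(1000))))
}
```

The single-shard fast path matches the in-memory part layout in this diff (one `fieldBloomValues` buffer), so small parts avoid both the hashing and the fan-out across many files, while wide-event parts spread their per-field bloom filters and values over up to 128 file pairs.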