mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/logstorage: dynamically adjust the number of (bloom, values) shards in a part depending on the number of non-const columns
This allows reducing the amounts of data, which must be read during queries over logs with big number of fields (aka "wide events").
This, in turn, improves query performance when the data, which needs to be scanned during the query, doesn't fit OS page cache.
(cherry picked from commit 7a62eefa34
)
This commit is contained in:
parent
40be393f5a
commit
fe5f16b662
11 changed files with 131 additions and 71 deletions
|
@ -16,6 +16,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
## tip
|
||||
|
||||
* FEATURE: added ability to receive systemd (journald) logs over network. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4618).
|
||||
* FEATURE: improve performance for queries over large volume of logs with big number of [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) (aka `wide events`).
|
||||
* FEATURE: improve performance for [`/select/logsql/field_values` HTTP endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values).
|
||||
* FEATURE: improve performance for [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe) when it is applied directly to [log filter](https://docs.victoriametrics.com/victorialogs/logsql/#filters).
|
||||
|
||||
* BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): fix various glitches with updating query responses. The issue was introduced in [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7279).
|
||||
|
|
|
@ -432,10 +432,6 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v
|
|||
|
||||
// mustWriteTo writes b with the given sid to sw and updates bh accordingly.
|
||||
func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
|
||||
// Do not store the version used for encoding directly in the block data, since:
|
||||
// - all the blocks in the same part use the same encoding
|
||||
// - the block encoding version can be put in metadata file for the part (aka metadataFilename)
|
||||
|
||||
b.assertValid()
|
||||
bh.reset()
|
||||
|
||||
|
|
|
@ -94,10 +94,6 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *
|
|||
|
||||
// mustWriteTo writes bd to sw and updates bh accordingly
|
||||
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
|
||||
// Do not store the version used for encoding directly in the block data, since:
|
||||
// - all the blocks in the same part use the same encoding
|
||||
// - the block encoding version can be put in metadata file for the part (aka metadataFilename)
|
||||
|
||||
bh.reset()
|
||||
|
||||
bh.streamID = bd.streamID
|
||||
|
|
|
@ -67,10 +67,6 @@ func (bh *blockHeader) copyFrom(src *blockHeader) {
|
|||
|
||||
// marshal appends the marshaled bh to dst and returns the result.
|
||||
func (bh *blockHeader) marshal(dst []byte) []byte {
|
||||
// Do not store the version used for encoding directly in the block header, since:
|
||||
// - all the block headers in the same part use the same encoding
|
||||
// - the format encoding version is stored in metadata file for the part (aka metadataFilename)
|
||||
|
||||
dst = bh.streamID.marshal(dst)
|
||||
dst = encoding.MarshalVarUint64(dst, bh.uncompressedSizeBytes)
|
||||
dst = encoding.MarshalVarUint64(dst, bh.rowsCount)
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
)
|
||||
|
||||
type readerWithStats struct {
|
||||
|
@ -63,7 +64,7 @@ type streamReaders struct {
|
|||
|
||||
messageBloomValuesReader bloomValuesReader
|
||||
oldBloomValuesReader bloomValuesReader
|
||||
bloomValuesShards [bloomValuesShardsCount]bloomValuesReader
|
||||
bloomValuesShards []bloomValuesReader
|
||||
}
|
||||
|
||||
type bloomValuesReader struct {
|
||||
|
@ -105,13 +106,14 @@ func (sr *streamReaders) reset() {
|
|||
|
||||
sr.messageBloomValuesReader.reset()
|
||||
sr.oldBloomValuesReader.reset()
|
||||
for i := range sr.bloomValuesShards[:] {
|
||||
for i := range sr.bloomValuesShards {
|
||||
sr.bloomValuesShards[i].reset()
|
||||
}
|
||||
sr.bloomValuesShards = sr.bloomValuesShards[:0]
|
||||
}
|
||||
|
||||
func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser,
|
||||
messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader,
|
||||
messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards []bloomValuesStreamReader,
|
||||
) {
|
||||
sr.columnNamesReader.init(columnNamesReader)
|
||||
sr.metaindexReader.init(metaindexReader)
|
||||
|
@ -122,7 +124,9 @@ func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, c
|
|||
|
||||
sr.messageBloomValuesReader.init(messageBloomValuesReader)
|
||||
sr.oldBloomValuesReader.init(oldBloomValuesReader)
|
||||
for i := range sr.bloomValuesShards[:] {
|
||||
|
||||
sr.bloomValuesShards = slicesutil.SetLength(sr.bloomValuesShards, len(bloomValuesShards))
|
||||
for i := range sr.bloomValuesShards {
|
||||
sr.bloomValuesShards[i].init(bloomValuesShards[i])
|
||||
}
|
||||
}
|
||||
|
@ -139,7 +143,7 @@ func (sr *streamReaders) totalBytesRead() uint64 {
|
|||
|
||||
n += sr.messageBloomValuesReader.totalBytesRead()
|
||||
n += sr.oldBloomValuesReader.totalBytesRead()
|
||||
for i := range sr.bloomValuesShards[:] {
|
||||
for i := range sr.bloomValuesShards {
|
||||
n += sr.bloomValuesShards[i].totalBytesRead()
|
||||
}
|
||||
|
||||
|
@ -156,7 +160,7 @@ func (sr *streamReaders) MustClose() {
|
|||
|
||||
sr.messageBloomValuesReader.MustClose()
|
||||
sr.oldBloomValuesReader.MustClose()
|
||||
for i := range sr.bloomValuesShards[:] {
|
||||
for i := range sr.bloomValuesShards {
|
||||
sr.bloomValuesShards[i].MustClose()
|
||||
}
|
||||
}
|
||||
|
@ -169,8 +173,12 @@ func (sr *streamReaders) getBloomValuesReaderForColumnName(name string, partForm
|
|||
return &sr.oldBloomValuesReader
|
||||
}
|
||||
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
|
||||
idx := h % uint64(len(sr.bloomValuesShards))
|
||||
n := len(sr.bloomValuesShards)
|
||||
idx := uint64(0)
|
||||
if n > 1 {
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
|
||||
idx = h % uint64(n)
|
||||
}
|
||||
return &sr.bloomValuesShards[idx]
|
||||
}
|
||||
|
||||
|
@ -280,9 +288,8 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
|
|||
|
||||
messageBloomValuesReader := mp.messageBloomValues.NewStreamReader()
|
||||
var oldBloomValuesReader bloomValuesStreamReader
|
||||
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader
|
||||
for i := range bloomValuesShards[:] {
|
||||
bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamReader()
|
||||
bloomValuesShards := []bloomValuesStreamReader{
|
||||
mp.fieldBloomValues.NewStreamReader(),
|
||||
}
|
||||
|
||||
bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
|
||||
|
@ -333,7 +340,7 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
|
|||
values: filestream.MustOpen(messageValuesPath, nocache),
|
||||
}
|
||||
var oldBloomValuesReader bloomValuesStreamReader
|
||||
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader
|
||||
var bloomValuesShards []bloomValuesStreamReader
|
||||
if bsr.ph.FormatVersion < 1 {
|
||||
bloomPath := filepath.Join(path, oldBloomFilename)
|
||||
oldBloomValuesReader.bloom = filestream.MustOpen(bloomPath, nocache)
|
||||
|
@ -341,7 +348,8 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
|
|||
valuesPath := filepath.Join(path, oldValuesFilename)
|
||||
oldBloomValuesReader.values = filestream.MustOpen(valuesPath, nocache)
|
||||
} else {
|
||||
for i := range bloomValuesShards[:] {
|
||||
bloomValuesShards = make([]bloomValuesStreamReader, bsr.ph.BloomValuesShardsCount)
|
||||
for i := range bloomValuesShards {
|
||||
shard := &bloomValuesShards[i]
|
||||
|
||||
bloomPath := getBloomFilePath(path, uint64(i))
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"math/bits"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
|
@ -10,6 +11,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
)
|
||||
|
||||
// writerWithStats writes data to w and tracks the total amounts of data written at bytesWritten.
|
||||
|
@ -53,7 +55,7 @@ type streamWriters struct {
|
|||
timestampsWriter writerWithStats
|
||||
|
||||
messageBloomValuesWriter bloomValuesWriter
|
||||
bloomValuesShards [bloomValuesShardsCount]bloomValuesWriter
|
||||
bloomValuesShards []bloomValuesWriter
|
||||
}
|
||||
|
||||
type bloomValuesWriter struct {
|
||||
|
@ -94,13 +96,14 @@ func (sw *streamWriters) reset() {
|
|||
sw.timestampsWriter.reset()
|
||||
|
||||
sw.messageBloomValuesWriter.reset()
|
||||
for i := range sw.bloomValuesShards[:] {
|
||||
for i := range sw.bloomValuesShards {
|
||||
sw.bloomValuesShards[i].reset()
|
||||
}
|
||||
sw.bloomValuesShards = sw.bloomValuesShards[:0]
|
||||
}
|
||||
|
||||
func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter filestream.WriteCloser,
|
||||
messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter,
|
||||
messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards []bloomValuesStreamWriter,
|
||||
) {
|
||||
sw.columnNamesWriter.init(columnNamesWriter)
|
||||
sw.metaindexWriter.init(metaindexWriter)
|
||||
|
@ -110,7 +113,8 @@ func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, c
|
|||
sw.timestampsWriter.init(timestampsWriter)
|
||||
|
||||
sw.messageBloomValuesWriter.init(messageBloomValuesWriter)
|
||||
for i := range sw.bloomValuesShards[:] {
|
||||
sw.bloomValuesShards = slicesutil.SetLength(sw.bloomValuesShards, len(bloomValuesShards))
|
||||
for i := range sw.bloomValuesShards {
|
||||
sw.bloomValuesShards[i].init(bloomValuesShards[i])
|
||||
}
|
||||
}
|
||||
|
@ -126,7 +130,7 @@ func (sw *streamWriters) totalBytesWritten() uint64 {
|
|||
n += sw.timestampsWriter.bytesWritten
|
||||
|
||||
n += sw.messageBloomValuesWriter.totalBytesWritten()
|
||||
for i := range sw.bloomValuesShards[:] {
|
||||
for i := range sw.bloomValuesShards {
|
||||
n += sw.bloomValuesShards[i].totalBytesWritten()
|
||||
}
|
||||
|
||||
|
@ -142,7 +146,7 @@ func (sw *streamWriters) MustClose() {
|
|||
sw.timestampsWriter.MustClose()
|
||||
|
||||
sw.messageBloomValuesWriter.MustClose()
|
||||
for i := range sw.bloomValuesShards[:] {
|
||||
for i := range sw.bloomValuesShards {
|
||||
sw.bloomValuesShards[i].MustClose()
|
||||
}
|
||||
}
|
||||
|
@ -152,8 +156,12 @@ func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomVa
|
|||
return &sw.messageBloomValuesWriter
|
||||
}
|
||||
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
|
||||
idx := h % uint64(len(sw.bloomValuesShards))
|
||||
n := len(sw.bloomValuesShards)
|
||||
idx := uint64(0)
|
||||
if n > 1 {
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
|
||||
idx = h % uint64(n)
|
||||
}
|
||||
return &sw.bloomValuesShards[idx]
|
||||
}
|
||||
|
||||
|
@ -168,6 +176,9 @@ type blockStreamWriter struct {
|
|||
// sidFirst is the streamID for the first block in the current indexBlock
|
||||
sidFirst streamID
|
||||
|
||||
// bloomValuesFieldsCount is the number of fields with (bloom, values) pairs in the output part.
|
||||
bloomValuesFieldsCount uint64
|
||||
|
||||
// minTimestampLast is the minimum timestamp seen for the last written block
|
||||
minTimestampLast int64
|
||||
|
||||
|
@ -213,6 +224,7 @@ func (bsw *blockStreamWriter) reset() {
|
|||
bsw.streamWriters.reset()
|
||||
bsw.sidLast.reset()
|
||||
bsw.sidFirst.reset()
|
||||
bsw.bloomValuesFieldsCount = 0
|
||||
bsw.minTimestampLast = 0
|
||||
bsw.minTimestamp = 0
|
||||
bsw.maxTimestamp = 0
|
||||
|
@ -243,9 +255,8 @@ func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) {
|
|||
|
||||
messageBloomValues := mp.messageBloomValues.NewStreamWriter()
|
||||
|
||||
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter
|
||||
for i := range bloomValuesShards[:] {
|
||||
bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamWriter()
|
||||
bloomValuesShards := []bloomValuesStreamWriter{
|
||||
mp.fieldBloomValues.NewStreamWriter(),
|
||||
}
|
||||
|
||||
bsw.streamWriters.init(&mp.columnNames, &mp.metaindex, &mp.index, &mp.columnsHeaderIndex, &mp.columnsHeader, &mp.timestamps, messageBloomValues, bloomValuesShards)
|
||||
|
@ -254,7 +265,7 @@ func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) {
|
|||
// MustInitForFilePart initializes bsw for writing data to file part located at path.
|
||||
//
|
||||
// if nocache is true, then the written data doesn't go to OS page cache.
|
||||
func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
|
||||
func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool, bloomValuesShardsCount uint64) {
|
||||
bsw.reset()
|
||||
|
||||
fs.MustMkdirFailIfExist(path)
|
||||
|
@ -284,8 +295,9 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
|
|||
values: filestream.MustCreate(messageValuesPath, nocache),
|
||||
}
|
||||
|
||||
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter
|
||||
for i := range bloomValuesShards[:] {
|
||||
bloomValuesShardsCount = adjustBloomValuesShardsCount(bloomValuesShardsCount)
|
||||
bloomValuesShards := make([]bloomValuesStreamWriter, bloomValuesShardsCount)
|
||||
for i := range bloomValuesShards {
|
||||
shard := &bloomValuesShards[i]
|
||||
|
||||
bloomPath := getBloomFilePath(path, uint64(i))
|
||||
|
@ -298,6 +310,18 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
|
|||
bsw.streamWriters.init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter, messageBloomValuesWriter, bloomValuesShards)
|
||||
}
|
||||
|
||||
func adjustBloomValuesShardsCount(n uint64) uint64 {
|
||||
if n == 0 {
|
||||
return n
|
||||
}
|
||||
|
||||
n = 1 << bits.Len64(n-1)
|
||||
if n > bloomValuesMaxShardsCount {
|
||||
n = bloomValuesMaxShardsCount
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// MustWriteRows writes timestamps with rows under the given sid to bsw.
|
||||
//
|
||||
// timestamps must be sorted.
|
||||
|
@ -348,11 +372,18 @@ func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd
|
|||
bsw.sidLast = *sid
|
||||
|
||||
bh := getBlockHeader()
|
||||
columnsLen := 0
|
||||
if b != nil {
|
||||
b.mustWriteTo(sid, bh, &bsw.streamWriters, &bsw.columnNameIDGenerator)
|
||||
columnsLen = len(b.columns)
|
||||
} else {
|
||||
bd.mustWriteTo(bh, &bsw.streamWriters, &bsw.columnNameIDGenerator)
|
||||
columnsLen = len(bd.columnsData)
|
||||
}
|
||||
if bsw.bloomValuesFieldsCount < uint64(columnsLen) {
|
||||
bsw.bloomValuesFieldsCount = uint64(columnsLen)
|
||||
}
|
||||
|
||||
th := &bh.timestampsHeader
|
||||
if bsw.globalRowsCount == 0 || th.minTimestamp < bsw.globalMinTimestamp {
|
||||
bsw.globalMinTimestamp = th.minTimestamp
|
||||
|
@ -407,6 +438,8 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) {
|
|||
ph.BlocksCount = bsw.globalBlocksCount
|
||||
ph.MinTimestamp = bsw.globalMinTimestamp
|
||||
ph.MaxTimestamp = bsw.globalMaxTimestamp
|
||||
ph.BloomValuesShardsCount = uint64(len(bsw.streamWriters.bloomValuesShards))
|
||||
ph.BloomValuesFieldsCount = bsw.bloomValuesFieldsCount
|
||||
|
||||
bsw.mustFlushIndexBlock(bsw.indexBlockData)
|
||||
|
||||
|
|
|
@ -3,12 +3,12 @@ package logstorage
|
|||
// partFormatLatestVersion is the latest format version for parts.
|
||||
//
|
||||
// See partHeader.FormatVersion for details.
|
||||
const partFormatLatestVersion = 1
|
||||
const partFormatLatestVersion = 2
|
||||
|
||||
// bloomValuesShardsCount is the number of shards for bloomFilename and valuesFilename files.
|
||||
// bloomValuesMaxShardsCount is the number of shards for bloomFilename and valuesFilename files.
|
||||
//
|
||||
// The partHeader.FormatVersion must be updated when this number changes.
|
||||
const bloomValuesShardsCount = 8
|
||||
// The partHeader.FormatVersion and partFormatLatestVersion must be updated when this number changes.
|
||||
const bloomValuesMaxShardsCount = 128
|
||||
|
||||
// maxUncompressedIndexBlockSize contains the maximum length of uncompressed block with blockHeader entries aka index block.
|
||||
//
|
||||
|
|
|
@ -530,10 +530,15 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
|
|||
srcSize := uint64(0)
|
||||
srcRowsCount := uint64(0)
|
||||
srcBlocksCount := uint64(0)
|
||||
bloomValuesShardsCount := uint64(0)
|
||||
for _, pw := range pws {
|
||||
srcSize += pw.p.ph.CompressedSizeBytes
|
||||
srcRowsCount += pw.p.ph.RowsCount
|
||||
srcBlocksCount += pw.p.ph.BlocksCount
|
||||
ph := &pw.p.ph
|
||||
srcSize += ph.CompressedSizeBytes
|
||||
srcRowsCount += ph.RowsCount
|
||||
srcBlocksCount += ph.BlocksCount
|
||||
if ph.BloomValuesFieldsCount > bloomValuesShardsCount {
|
||||
bloomValuesShardsCount = ph.BloomValuesFieldsCount
|
||||
}
|
||||
}
|
||||
bsw := getBlockStreamWriter()
|
||||
var mpNew *inmemoryPart
|
||||
|
@ -542,7 +547,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
|
|||
bsw.MustInitForInmemoryPart(mpNew)
|
||||
} else {
|
||||
nocache := dstPartType == partBig
|
||||
bsw.MustInitForFilePart(dstPartPath, nocache)
|
||||
bsw.MustInitForFilePart(dstPartPath, nocache, bloomValuesShardsCount)
|
||||
}
|
||||
|
||||
// Merge source parts to destination part.
|
||||
|
|
|
@ -22,7 +22,7 @@ type inmemoryPart struct {
|
|||
timestamps bytesutil.ByteBuffer
|
||||
|
||||
messageBloomValues bloomValuesBuffer
|
||||
bloomValuesShards [bloomValuesShardsCount]bloomValuesBuffer
|
||||
fieldBloomValues bloomValuesBuffer
|
||||
}
|
||||
|
||||
type bloomValuesBuffer struct {
|
||||
|
@ -61,9 +61,7 @@ func (mp *inmemoryPart) reset() {
|
|||
mp.timestamps.Reset()
|
||||
|
||||
mp.messageBloomValues.reset()
|
||||
for i := range mp.bloomValuesShards[:] {
|
||||
mp.bloomValuesShards[i].reset()
|
||||
}
|
||||
mp.fieldBloomValues.reset()
|
||||
}
|
||||
|
||||
// mustInitFromRows initializes mp from lr.
|
||||
|
@ -127,15 +125,11 @@ func (mp *inmemoryPart) MustStoreToDisk(path string) {
|
|||
fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomValues.bloom.B)
|
||||
fs.MustWriteSync(messageValuesPath, mp.messageBloomValues.values.B)
|
||||
|
||||
for i := range mp.bloomValuesShards[:] {
|
||||
shard := &mp.bloomValuesShards[i]
|
||||
bloomPath := getBloomFilePath(path, 0)
|
||||
fs.MustWriteSync(bloomPath, mp.fieldBloomValues.bloom.B)
|
||||
|
||||
bloomPath := getBloomFilePath(path, uint64(i))
|
||||
fs.MustWriteSync(bloomPath, shard.bloom.B)
|
||||
|
||||
valuesPath := getValuesFilePath(path, uint64(i))
|
||||
fs.MustWriteSync(valuesPath, shard.values.B)
|
||||
}
|
||||
valuesPath := getValuesFilePath(path, 0)
|
||||
fs.MustWriteSync(valuesPath, mp.fieldBloomValues.values.B)
|
||||
|
||||
mp.ph.mustWriteMetadata(path)
|
||||
|
||||
|
|
|
@ -41,7 +41,8 @@ type part struct {
|
|||
|
||||
messageBloomValues bloomValuesReaderAt
|
||||
oldBloomValues bloomValuesReaderAt
|
||||
bloomValuesShards [bloomValuesShardsCount]bloomValuesReaderAt
|
||||
|
||||
bloomValuesShards []bloomValuesReaderAt
|
||||
}
|
||||
|
||||
type bloomValuesReaderAt struct {
|
||||
|
@ -76,14 +77,15 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
|
|||
p.columnsHeaderFile = &mp.columnsHeader
|
||||
p.timestampsFile = &mp.timestamps
|
||||
|
||||
// Open files with bloom filters and column values
|
||||
p.messageBloomValues.bloom = &mp.messageBloomValues.bloom
|
||||
p.messageBloomValues.values = &mp.messageBloomValues.values
|
||||
|
||||
// Open files with bloom filters and column values
|
||||
for i := range p.bloomValuesShards[:] {
|
||||
shard := &p.bloomValuesShards[i]
|
||||
shard.bloom = &mp.bloomValuesShards[i].bloom
|
||||
shard.values = &mp.bloomValuesShards[i].values
|
||||
p.bloomValuesShards = []bloomValuesReaderAt{
|
||||
{
|
||||
bloom: &mp.fieldBloomValues.bloom,
|
||||
values: &mp.fieldBloomValues.values,
|
||||
},
|
||||
}
|
||||
|
||||
return &p
|
||||
|
@ -140,7 +142,8 @@ func mustOpenFilePart(pt *partition, path string) *part {
|
|||
valuesPath := filepath.Join(path, oldValuesFilename)
|
||||
p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath)
|
||||
} else {
|
||||
for i := range p.bloomValuesShards[:] {
|
||||
p.bloomValuesShards = make([]bloomValuesReaderAt, p.ph.BloomValuesShardsCount)
|
||||
for i := range p.bloomValuesShards {
|
||||
shard := &p.bloomValuesShards[i]
|
||||
|
||||
bloomPath := getBloomFilePath(path, uint64(i))
|
||||
|
@ -166,7 +169,7 @@ func mustClosePart(p *part) {
|
|||
if p.ph.FormatVersion < 1 {
|
||||
p.oldBloomValues.MustClose()
|
||||
} else {
|
||||
for i := range p.bloomValuesShards[:] {
|
||||
for i := range p.bloomValuesShards {
|
||||
p.bloomValuesShards[i].MustClose()
|
||||
}
|
||||
}
|
||||
|
@ -183,8 +186,12 @@ func (p *part) getBloomValuesFileForColumnName(name string) *bloomValuesReaderAt
|
|||
return &p.oldBloomValues
|
||||
}
|
||||
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
|
||||
idx := h % uint64(len(p.bloomValuesShards))
|
||||
n := len(p.bloomValuesShards)
|
||||
idx := uint64(0)
|
||||
if n > 1 {
|
||||
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
|
||||
idx = h % uint64(n)
|
||||
}
|
||||
return &p.bloomValuesShards[idx]
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,12 @@ type partHeader struct {
|
|||
|
||||
// MaxTimestamp is the maximum timestamp seen in the part
|
||||
MaxTimestamp int64
|
||||
|
||||
// BloomValuesShardsCount is the number of (bloom, values) shards in the part.
|
||||
BloomValuesShardsCount uint64
|
||||
|
||||
// BloomValuesFieldsCount is the number of fields with (bloom, values) pairs in the given part.
|
||||
BloomValuesFieldsCount uint64
|
||||
}
|
||||
|
||||
// reset resets ph for subsequent re-use
|
||||
|
@ -45,12 +51,16 @@ func (ph *partHeader) reset() {
|
|||
ph.BlocksCount = 0
|
||||
ph.MinTimestamp = 0
|
||||
ph.MaxTimestamp = 0
|
||||
ph.BloomValuesShardsCount = 0
|
||||
ph.BloomValuesFieldsCount = 0
|
||||
}
|
||||
|
||||
// String returns string represenation for ph.
|
||||
func (ph *partHeader) String() string {
|
||||
return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}",
|
||||
ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp))
|
||||
return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, "+
|
||||
"MinTimestamp=%s, MaxTimestamp=%s, BloomValuesShardsCount=%d, BloomValuesFieldsCount=%d}",
|
||||
ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount,
|
||||
timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp), ph.BloomValuesShardsCount, ph.BloomValuesFieldsCount)
|
||||
}
|
||||
|
||||
func (ph *partHeader) mustReadMetadata(partPath string) {
|
||||
|
@ -65,6 +75,19 @@ func (ph *partHeader) mustReadMetadata(partPath string) {
|
|||
logger.Panicf("FATAL: cannot parse %q: %s", metadataPath, err)
|
||||
}
|
||||
|
||||
if ph.FormatVersion <= 1 {
|
||||
if ph.BloomValuesShardsCount != 0 {
|
||||
logger.Panicf("FATAL: unexpected BloomValuesShardsCount for FormatVersion<=1; got %d; want 0", ph.BloomValuesShardsCount)
|
||||
}
|
||||
if ph.BloomValuesFieldsCount != 0 {
|
||||
logger.Panicf("FATAL: unexpected BloomValuesFieldsCount for FormatVersion<=1; got %d; want 0", ph.BloomValuesFieldsCount)
|
||||
}
|
||||
if ph.FormatVersion == 1 {
|
||||
ph.BloomValuesShardsCount = 8
|
||||
ph.BloomValuesFieldsCount = bloomValuesMaxShardsCount
|
||||
}
|
||||
}
|
||||
|
||||
// Perform various checks
|
||||
if ph.FormatVersion > partFormatLatestVersion {
|
||||
logger.Panicf("FATAL: unsupported part format version; got %d; mustn't exceed %d", partFormatLatestVersion)
|
||||
|
|
Loading…
Reference in a new issue