lib/logstorage: refactor storage format to be more efficient for querying wide events

It has been appeared that VictoriaLogs is frequently used for collecting logs with tens of fields.
For example, standard Kuberntes setup on top of Filebeat generates more than 20 fields per each log.
Such logs are also known as "wide events".

The previous storage format was optimized for logs with a few fields. When at least a single field
was referenced in the query, then the all the meta-information about all the log fields was unpacked
and parsed per each scanned block during the query. This could require a lot of additional disk IO
and CPU time when logs contain many fields. Resolve this issue by providing an (field -> metainfo_offset)
index per each field in every data block. This index allows reading and extracting only the needed
metainfo for fields used in the query. This index is stored in columnsHeaderIndexFilename ( columns_header_index.bin ).
This allows increasing performance for queries over wide events by 10x and more.

Another issue was that the data for bloom filters and field values across all the log fields except of _msg
was intermixed in two files - fieldBloomFilename ( field_bloom.bin ) and fieldValuesFilename ( field_values.bin ).
This could result in huge disk read IO overhead when some small field was referred in the query,
since the Operating System usually reads more data than requested. It reads the data from disk
in at least 4KiB blocks (usually the block size is much bigger in the range 64KiB - 512KiB).
So, if 512-byte bloom filter or values' block is read from the file, then the Operating System
reads up to 512KiB of data from disk, which results in 1000x disk read IO overhead. This overhead isn't visible
for recently accessed data, since this data is usually stored in RAM (aka Operating System page cache),
but this overhead may become very annoying when performing the query over large volumes of data
which isn't present in OS page cache.

The solution for this issue is to split bloom filters and field values across multiple shards.
This reduces the worst-case disk read IO overhead by at least Nx where N is the number of shards,
while the disk read IO overhead is completely removed in best case when the number of columns doesn't exceed N.
Currently the number of shards is 8 - see bloomValuesShardsCount . This solution increases
performance for queries over large volumes of newly ingested data by up to 1000x.

The new storage format is versioned as v1, while the old storage format is version as v0.
It is stored in the partHeader.FormatVersion.

Parts with the old storage format are converted into parts with the new storage format during background merge.
It is possible to force merge by querying /internal/force_merge HTTP endpoint - see https://docs.victoriametrics.com/victorialogs/#forced-merge .
This commit is contained in:
Aliaksandr Valialkin 2024-10-16 16:18:28 +02:00
parent 1d637667a6
commit 202eb429a7
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
26 changed files with 1766 additions and 368 deletions

View file

@ -15,8 +15,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
* FEATURE: optimize [LogsQL queries](https://docs.victoriametrics.com/victorialogs/logsql/), which need to scan big number of logs with big number of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) (aka `wide events`). The performance for such queries is improved by 10x and more depending on the number of log fields in the scanned logs. The performance improvement is visible when querying logs ingested after the upgrade to this release.
* FEATURE: add support for forced merge. See [these docs](https://docs.victoriametrics.com/victorialogs/#forced-merge). * FEATURE: add support for forced merge. See [these docs](https://docs.victoriametrics.com/victorialogs/#forced-merge).
* FEATURE: skip empty log fields in query results, since they are treated as non-existing fields in [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). * FEATURE: skip empty log fields in query results, since they are treated as non-existing fields in [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). This should reduce the level of confusion for end users when they see empty log fields.
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the ability to cancel running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7097). * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the ability to cancel running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7097).
* BUGFIX: avoid possible panic when logs for a new day are ingested during execution of concurrent queries. * BUGFIX: avoid possible panic when logs for a new day are ingested during execution of concurrent queries.

View file

@ -120,6 +120,13 @@ func (bm *bitmap) or(x *bitmap) {
} }
} }
func (bm *bitmap) setBit(i int) {
wordIdx := uint(i) / 64
wordOffset := uint(i) % 64
wordPtr := &bm.a[wordIdx]
*wordPtr |= (1 << wordOffset)
}
func (bm *bitmap) isSetBit(i int) bool { func (bm *bitmap) isSetBit(i int) bool {
wordIdx := uint(i) / 64 wordIdx := uint(i) / 64
wordOffset := uint(i) % 64 wordOffset := uint(i) % 64

View file

@ -27,7 +27,6 @@ func TestBitmap(t *testing.T) {
} }
bm.setBits() bm.setBits()
if n := bm.onesCount(); n != i { if n := bm.onesCount(); n != i {
t.Fatalf("unexpected number of set bits; got %d; want %d", n, i) t.Fatalf("unexpected number of set bits; got %d; want %d", n, i)
} }
@ -106,6 +105,23 @@ func TestBitmap(t *testing.T) {
t.Fatalf("unexpected non-zero number of set bits remained: %d", bitsCount) t.Fatalf("unexpected non-zero number of set bits remained: %d", bitsCount)
} }
// Set bits via setBit() call
for i := 0; i < bitsLen; i++ {
if n := bm.onesCount(); n != i {
t.Fatalf("unexpected number of ones set; got %d; want %d", n, i)
}
if bm.isSetBit(i) {
t.Fatalf("the bit %d mustn't be set", i)
}
bm.setBit(i)
if !bm.isSetBit(i) {
t.Fatalf("the bit %d must be set", i)
}
if n := bm.onesCount(); n != i+1 {
t.Fatalf("unexpected number of ones set; got %d; want %d", n, i+1)
}
}
putBitmap(bm) putBitmap(bm)
} }
} }

View file

@ -150,18 +150,13 @@ func (c *column) resizeValues(valuesLen int) []string {
// mustWriteTo writes c to sw and updates ch accordingly. // mustWriteTo writes c to sw and updates ch accordingly.
// //
// ch is valid until c is changed. // ch is valid until c is changed.
func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) {
ch.reset() ch.reset()
valuesWriter := &sw.fieldValuesWriter
bloomFilterWriter := &sw.fieldBloomFilterWriter
if c.name == "" {
valuesWriter = &sw.messageValuesWriter
bloomFilterWriter = &sw.messageBloomFilterWriter
}
ch.name = c.name ch.name = c.name
bloomValuesWriter := sw.getBloomValuesWriterForColumnName(ch.name)
// encode values // encode values
ve := getValuesEncoder() ve := getValuesEncoder()
ch.valueType, ch.minValue, ch.maxValue = ve.encode(c.values, &ch.valuesDict) ch.valueType, ch.minValue, ch.maxValue = ve.encode(c.values, &ch.valuesDict)
@ -176,15 +171,15 @@ func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
if ch.valuesSize > maxValuesBlockSize { if ch.valuesSize > maxValuesBlockSize {
logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize) logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
} }
ch.valuesOffset = valuesWriter.bytesWritten ch.valuesOffset = bloomValuesWriter.values.bytesWritten
valuesWriter.MustWrite(bb.B) bloomValuesWriter.values.MustWrite(bb.B)
// create and marshal bloom filter for c.values // create and marshal bloom filter for c.values
if ch.valueType != valueTypeDict { if ch.valueType != valueTypeDict {
tokensBuf := getTokensBuf() hashesBuf := encoding.GetUint64s(0)
tokensBuf.A = tokenizeStrings(tokensBuf.A[:0], c.values) hashesBuf.A = tokenizeHashes(hashesBuf.A[:0], c.values)
bb.B = bloomFilterMarshal(bb.B[:0], tokensBuf.A) bb.B = bloomFilterMarshalHashes(bb.B[:0], hashesBuf.A)
putTokensBuf(tokensBuf) encoding.PutUint64s(hashesBuf)
} else { } else {
// there is no need in ecoding bloom filter for dictionary type, // there is no need in ecoding bloom filter for dictionary type,
// since it isn't used during querying - all the dictionary values are available in ch.valuesDict // since it isn't used during querying - all the dictionary values are available in ch.valuesDict
@ -194,8 +189,8 @@ func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
if ch.bloomFilterSize > maxBloomFilterBlockSize { if ch.bloomFilterSize > maxBloomFilterBlockSize {
logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize) logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize)
} }
ch.bloomFilterOffset = bloomFilterWriter.bytesWritten ch.bloomFilterOffset = bloomValuesWriter.bloom.bytesWritten
bloomFilterWriter.MustWrite(bb.B) bloomValuesWriter.bloom.MustWrite(bb.B)
} }
func (b *block) assertValid() { func (b *block) assertValid() {
@ -436,7 +431,7 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v
} }
// mustWriteTo writes b with the given sid to sw and updates bh accordingly. // mustWriteTo writes b with the given sid to sw and updates bh accordingly.
func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) { func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
// Do not store the version used for encoding directly in the block data, since: // Do not store the version used for encoding directly in the block data, since:
// - all the blocks in the same part use the same encoding // - all the blocks in the same part use the same encoding
// - the block encoding version can be put in metadata file for the part (aka metadataFilename) // - the block encoding version can be put in metadata file for the part (aka metadataFilename)
@ -458,22 +453,13 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
chs := csh.resizeColumnHeaders(len(cs)) chs := csh.resizeColumnHeaders(len(cs))
for i := range cs { for i := range cs {
cs[i].mustWriteToNoArena(&chs[i], sw) cs[i].mustWriteTo(&chs[i], sw)
} }
csh.constColumns = append(csh.constColumns[:0], b.constColumns...) csh.constColumns = append(csh.constColumns[:0], b.constColumns...)
bb := longTermBufPool.Get() csh.mustWriteTo(bh, sw, g)
bb.B = csh.marshal(bb.B)
putColumnsHeader(csh) putColumnsHeader(csh)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(bb.B))
if bh.columnsHeaderSize > maxColumnsHeaderSize {
logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize)
}
sw.columnsHeaderWriter.MustWrite(bb.B)
longTermBufPool.Put(bb)
} }
// appendRowsTo appends log entries from b to dst. // appendRowsTo appends log entries from b to dst.

View file

@ -93,7 +93,7 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd *
} }
// mustWriteTo writes bd to sw and updates bh accordingly // mustWriteTo writes bd to sw and updates bh accordingly
func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) { func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
// Do not store the version used for encoding directly in the block data, since: // Do not store the version used for encoding directly in the block data, since:
// - all the blocks in the same part use the same encoding // - all the blocks in the same part use the same encoding
// - the block encoding version can be put in metadata file for the part (aka metadataFilename) // - the block encoding version can be put in metadata file for the part (aka metadataFilename)
@ -114,28 +114,19 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
chs := csh.resizeColumnHeaders(len(cds)) chs := csh.resizeColumnHeaders(len(cds))
for i := range cds { for i := range cds {
cds[i].mustWriteToNoArena(&chs[i], sw) cds[i].mustWriteTo(&chs[i], sw)
} }
csh.constColumns = append(csh.constColumns[:0], bd.constColumns...) csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)
bb := longTermBufPool.Get() csh.mustWriteTo(bh, sw, g)
bb.B = csh.marshal(bb.B)
putColumnsHeader(csh) putColumnsHeader(csh)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(bb.B))
if bh.columnsHeaderSize > maxColumnsHeaderSize {
logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize)
}
sw.columnsHeaderWriter.MustWrite(bb.B)
longTermBufPool.Put(bb)
} }
// mustReadFrom reads block data associated with bh from sr to bd. // mustReadFrom reads block data associated with bh from sr to bd.
// //
// The bd is valid until a.reset() is called. // The bd is valid until a.reset() is called.
func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders) { func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders, partFormatVersion uint, columnNames []string) {
bd.reset() bd.reset()
bd.streamID = bh.streamID bd.streamID = bh.streamID
@ -159,19 +150,46 @@ func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders)
sr.columnsHeaderReader.MustReadFull(bb.B) sr.columnsHeaderReader.MustReadFull(bb.B)
csh := getColumnsHeader() csh := getColumnsHeader()
if err := csh.unmarshalNoArena(bb.B); err != nil { if err := csh.unmarshalNoArena(bb.B, partFormatVersion); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err) logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err)
} }
if partFormatVersion >= 1 {
readColumnNamesFromColumnsHeaderIndex(bh, sr, csh, columnNames)
}
chs := csh.columnHeaders chs := csh.columnHeaders
cds := bd.resizeColumnsData(len(chs)) cds := bd.resizeColumnsData(len(chs))
for i := range chs { for i := range chs {
cds[i].mustReadFrom(a, &chs[i], sr) cds[i].mustReadFrom(a, &chs[i], sr, partFormatVersion)
} }
bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns) bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns)
putColumnsHeader(csh) putColumnsHeader(csh)
longTermBufPool.Put(bb) longTermBufPool.Put(bb)
} }
func readColumnNamesFromColumnsHeaderIndex(bh *blockHeader, sr *streamReaders, csh *columnsHeader, columnNames []string) {
bb := longTermBufPool.Get()
defer longTermBufPool.Put(bb)
n := bh.columnsHeaderIndexSize
if n > maxColumnsHeaderIndexSize {
logger.Panicf("BUG: %s: too big columnsHeaderIndexSize: %d bytes; mustn't exceed %d bytes", sr.columnsHeaderIndexReader.Path(), n, maxColumnsHeaderIndexSize)
}
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(n))
sr.columnsHeaderIndexReader.MustReadFull(bb.B)
cshIndex := getColumnsHeaderIndex()
if err := cshIndex.unmarshalNoArena(bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columnsHeaderIndex: %s", sr.columnsHeaderIndexReader.Path(), err)
}
if err := csh.setColumnNames(cshIndex, columnNames); err != nil {
logger.Panicf("FATAL: %s: %s", sr.columnsHeaderIndexReader.Path(), err)
}
putColumnsHeaderIndex(cshIndex)
}
// timestampsData contains the encoded timestamps data. // timestampsData contains the encoded timestamps data.
type timestampsData struct { type timestampsData struct {
// data contains packed timestamps data. // data contains packed timestamps data.
@ -307,16 +325,9 @@ func (cd *columnData) copyFrom(a *arena, src *columnData) {
// mustWriteTo writes cd to sw and updates ch accordingly. // mustWriteTo writes cd to sw and updates ch accordingly.
// //
// ch is valid until cd is changed. // ch is valid until cd is changed.
func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) {
ch.reset() ch.reset()
valuesWriter := &sw.fieldValuesWriter
bloomFilterWriter := &sw.fieldBloomFilterWriter
if cd.name == "" {
valuesWriter = &sw.messageValuesWriter
bloomFilterWriter = &sw.messageBloomFilterWriter
}
ch.name = cd.name ch.name = cd.name
ch.valueType = cd.valueType ch.valueType = cd.valueType
@ -324,36 +335,31 @@ func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
ch.maxValue = cd.maxValue ch.maxValue = cd.maxValue
ch.valuesDict.copyFromNoArena(&cd.valuesDict) ch.valuesDict.copyFromNoArena(&cd.valuesDict)
bloomValuesWriter := sw.getBloomValuesWriterForColumnName(ch.name)
// marshal values // marshal values
ch.valuesSize = uint64(len(cd.valuesData)) ch.valuesSize = uint64(len(cd.valuesData))
if ch.valuesSize > maxValuesBlockSize { if ch.valuesSize > maxValuesBlockSize {
logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize) logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize)
} }
ch.valuesOffset = valuesWriter.bytesWritten ch.valuesOffset = bloomValuesWriter.values.bytesWritten
valuesWriter.MustWrite(cd.valuesData) bloomValuesWriter.values.MustWrite(cd.valuesData)
// marshal bloom filter // marshal bloom filter
ch.bloomFilterSize = uint64(len(cd.bloomFilterData)) ch.bloomFilterSize = uint64(len(cd.bloomFilterData))
if ch.bloomFilterSize > maxBloomFilterBlockSize { if ch.bloomFilterSize > maxBloomFilterBlockSize {
logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize) logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize)
} }
ch.bloomFilterOffset = bloomFilterWriter.bytesWritten ch.bloomFilterOffset = bloomValuesWriter.bloom.bytesWritten
bloomFilterWriter.MustWrite(cd.bloomFilterData) bloomValuesWriter.bloom.MustWrite(cd.bloomFilterData)
} }
// mustReadFrom reads columns data associated with ch from sr to cd. // mustReadFrom reads columns data associated with ch from sr to cd.
// //
// cd is valid until a.reset() is called. // cd is valid until a.reset() is called.
func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders) { func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders, partFormatVersion uint) {
cd.reset() cd.reset()
valuesReader := &sr.fieldValuesReader
bloomFilterReader := &sr.fieldBloomFilterReader
if ch.name == "" {
valuesReader = &sr.messageValuesReader
bloomFilterReader = &sr.messageBloomFilterReader
}
cd.name = a.copyString(ch.name) cd.name = a.copyString(ch.name)
cd.valueType = ch.valueType cd.valueType = ch.valueType
@ -361,30 +367,32 @@ func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders
cd.maxValue = ch.maxValue cd.maxValue = ch.maxValue
cd.valuesDict.copyFrom(a, &ch.valuesDict) cd.valuesDict.copyFrom(a, &ch.valuesDict)
bloomValuesReader := sr.getBloomValuesReaderForColumnName(ch.name, partFormatVersion)
// read values // read values
if ch.valuesOffset != valuesReader.bytesRead { if ch.valuesOffset != bloomValuesReader.values.bytesRead {
logger.Panicf("FATAL: %s: unexpected columnHeader.valuesOffset=%d; must equal to the number of bytes read: %d", logger.Panicf("FATAL: %s: unexpected columnHeader.valuesOffset=%d; must equal to the number of bytes read: %d",
valuesReader.Path(), ch.valuesOffset, valuesReader.bytesRead) bloomValuesReader.values.Path(), ch.valuesOffset, bloomValuesReader.values.bytesRead)
} }
valuesSize := ch.valuesSize valuesSize := ch.valuesSize
if valuesSize > maxValuesBlockSize { if valuesSize > maxValuesBlockSize {
logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", valuesReader.Path(), maxValuesBlockSize, valuesSize) logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bloomValuesReader.values.Path(), maxValuesBlockSize, valuesSize)
} }
cd.valuesData = a.newBytes(int(valuesSize)) cd.valuesData = a.newBytes(int(valuesSize))
valuesReader.MustReadFull(cd.valuesData) bloomValuesReader.values.MustReadFull(cd.valuesData)
// read bloom filter // read bloom filter
// bloom filter is missing in valueTypeDict. // bloom filter is missing in valueTypeDict.
if ch.valueType != valueTypeDict { if ch.valueType != valueTypeDict {
if ch.bloomFilterOffset != bloomFilterReader.bytesRead { if ch.bloomFilterOffset != bloomValuesReader.bloom.bytesRead {
logger.Panicf("FATAL: %s: unexpected columnHeader.bloomFilterOffset=%d; must equal to the number of bytes read: %d", logger.Panicf("FATAL: %s: unexpected columnHeader.bloomFilterOffset=%d; must equal to the number of bytes read: %d",
bloomFilterReader.Path(), ch.bloomFilterOffset, bloomFilterReader.bytesRead) bloomValuesReader.bloom.Path(), ch.bloomFilterOffset, bloomValuesReader.bloom.bytesRead)
} }
bloomFilterSize := ch.bloomFilterSize bloomFilterSize := ch.bloomFilterSize
if bloomFilterSize > maxBloomFilterBlockSize { if bloomFilterSize > maxBloomFilterBlockSize {
logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bloomFilterReader.Path(), maxBloomFilterBlockSize, bloomFilterSize) logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bloomValuesReader.bloom.Path(), maxBloomFilterBlockSize, bloomFilterSize)
} }
cd.bloomFilterData = a.newBytes(int(bloomFilterSize)) cd.bloomFilterData = a.newBytes(int(bloomFilterSize))
bloomFilterReader.MustReadFull(cd.bloomFilterData) bloomValuesReader.bloom.MustReadFull(cd.bloomFilterData)
} }
} }

View file

@ -27,6 +27,12 @@ type blockHeader struct {
// timestampsHeader contains information about timestamps for log entries in the block // timestampsHeader contains information about timestamps for log entries in the block
timestampsHeader timestampsHeader timestampsHeader timestampsHeader
// columnsHeaderIndexOffset is the offset of columnsHeaderIndex at columnsHeaderIndexFilename
columnsHeaderIndexOffset uint64
// columnsHeaderIndexSize is the size of columnsHeaderIndex at columnsHeaderIndexFilename
columnsHeaderIndexSize uint64
// columnsHeaderOffset is the offset of columnsHeader at columnsHeaderFilename // columnsHeaderOffset is the offset of columnsHeader at columnsHeaderFilename
columnsHeaderOffset uint64 columnsHeaderOffset uint64
@ -40,6 +46,8 @@ func (bh *blockHeader) reset() {
bh.uncompressedSizeBytes = 0 bh.uncompressedSizeBytes = 0
bh.rowsCount = 0 bh.rowsCount = 0
bh.timestampsHeader.reset() bh.timestampsHeader.reset()
bh.columnsHeaderIndexOffset = 0
bh.columnsHeaderIndexSize = 0
bh.columnsHeaderOffset = 0 bh.columnsHeaderOffset = 0
bh.columnsHeaderSize = 0 bh.columnsHeaderSize = 0
} }
@ -51,6 +59,8 @@ func (bh *blockHeader) copyFrom(src *blockHeader) {
bh.uncompressedSizeBytes = src.uncompressedSizeBytes bh.uncompressedSizeBytes = src.uncompressedSizeBytes
bh.rowsCount = src.rowsCount bh.rowsCount = src.rowsCount
bh.timestampsHeader.copyFrom(&src.timestampsHeader) bh.timestampsHeader.copyFrom(&src.timestampsHeader)
bh.columnsHeaderIndexOffset = src.columnsHeaderIndexOffset
bh.columnsHeaderIndexSize = src.columnsHeaderIndexSize
bh.columnsHeaderOffset = src.columnsHeaderOffset bh.columnsHeaderOffset = src.columnsHeaderOffset
bh.columnsHeaderSize = src.columnsHeaderSize bh.columnsHeaderSize = src.columnsHeaderSize
} }
@ -59,12 +69,14 @@ func (bh *blockHeader) copyFrom(src *blockHeader) {
func (bh *blockHeader) marshal(dst []byte) []byte { func (bh *blockHeader) marshal(dst []byte) []byte {
// Do not store the version used for encoding directly in the block header, since: // Do not store the version used for encoding directly in the block header, since:
// - all the block headers in the same part use the same encoding // - all the block headers in the same part use the same encoding
// - the block header encoding version can be put in metadata file for the part (aka metadataFilename) // - the format encoding version is stored in metadata file for the part (aka metadataFilename)
dst = bh.streamID.marshal(dst) dst = bh.streamID.marshal(dst)
dst = encoding.MarshalVarUint64(dst, bh.uncompressedSizeBytes) dst = encoding.MarshalVarUint64(dst, bh.uncompressedSizeBytes)
dst = encoding.MarshalVarUint64(dst, bh.rowsCount) dst = encoding.MarshalVarUint64(dst, bh.rowsCount)
dst = bh.timestampsHeader.marshal(dst) dst = bh.timestampsHeader.marshal(dst)
dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderIndexOffset)
dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderIndexSize)
dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderOffset) dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderOffset)
dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderSize) dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderSize)
@ -72,7 +84,7 @@ func (bh *blockHeader) marshal(dst []byte) []byte {
} }
// unmarshal unmarshals bh from src and returns the remaining tail. // unmarshal unmarshals bh from src and returns the remaining tail.
func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) { func (bh *blockHeader) unmarshal(src []byte, partFormatVersion uint) ([]byte, error) {
bh.reset() bh.reset()
srcOrig := src srcOrig := src
@ -110,6 +122,24 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) {
} }
src = tail src = tail
if partFormatVersion >= 1 {
// unmarshal columnsHeaderIndexOffset
n, nSize = encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderIndexOffset")
}
src = src[nSize:]
bh.columnsHeaderIndexOffset = n
// unmarshal columnsHeaderIndexSize
n, nSize = encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderIndexSize")
}
src = src[nSize:]
bh.columnsHeaderIndexSize = n
}
// unmarshal columnsHeaderOffset // unmarshal columnsHeaderOffset
n, nSize = encoding.UnmarshalVarUint64(src) n, nSize = encoding.UnmarshalVarUint64(src)
if nSize <= 0 { if nSize <= 0 {
@ -148,8 +178,8 @@ func putBlockHeader(bh *blockHeader) {
var blockHeaderPool sync.Pool var blockHeaderPool sync.Pool
// unmarshalBlockHeaders appends unmarshaled from src blockHeader entries to dst and returns the result. // unmarshalBlockHeaders appends unmarshaled from src blockHeader entries to dst and returns the result.
func unmarshalBlockHeaders(dst []blockHeader, src []byte) ([]blockHeader, error) { func unmarshalBlockHeaders(dst []blockHeader, src []byte, partFormatVersion uint) ([]blockHeader, error) {
dstOrig := dst dstLen := len(dst)
for len(src) > 0 { for len(src) > 0 {
if len(dst) < cap(dst) { if len(dst) < cap(dst) {
dst = dst[:len(dst)+1] dst = dst[:len(dst)+1]
@ -157,14 +187,14 @@ func unmarshalBlockHeaders(dst []blockHeader, src []byte) ([]blockHeader, error)
dst = append(dst, blockHeader{}) dst = append(dst, blockHeader{})
} }
bh := &dst[len(dst)-1] bh := &dst[len(dst)-1]
tail, err := bh.unmarshal(src) tail, err := bh.unmarshal(src, partFormatVersion)
if err != nil { if err != nil {
return dstOrig, fmt.Errorf("cannot unmarshal blockHeader entries: %w", err) return dst, fmt.Errorf("cannot unmarshal blockHeader entries: %w", err)
} }
src = tail src = tail
} }
if err := validateBlockHeaders(dst[len(dstOrig):]); err != nil { if err := validateBlockHeaders(dst[dstLen:]); err != nil {
return dstOrig, err return dst, err
} }
return dst, nil return dst, nil
} }
@ -195,6 +225,128 @@ func resetBlockHeaders(bhs []blockHeader) []blockHeader {
return bhs[:0] return bhs[:0]
} }
// columnHeaderRef references column header in the marshaled columnsHeader.
type columnHeaderRef struct {
// columnNameID is the ID of the column name. The column name can be obtained from part.columnNames.
columnNameID uint64
// offset is the offset of the the corresponding columnHeader inside marshaled columnsHeader.
offset uint64
}
// columnsHeaderIndex contains offsets for marshaled column headers.
type columnsHeaderIndex struct {
// columnHeadersRefs contains references to columnHeaders.
columnHeadersRefs []columnHeaderRef
// constColumnsRefs contains references to constColumns.
constColumnsRefs []columnHeaderRef
}
func getColumnsHeaderIndex() *columnsHeaderIndex {
v := columnsHeaderIndexPool.Get()
if v == nil {
return &columnsHeaderIndex{}
}
return v.(*columnsHeaderIndex)
}
func putColumnsHeaderIndex(cshIndex *columnsHeaderIndex) {
cshIndex.reset()
columnsHeaderIndexPool.Put(cshIndex)
}
var columnsHeaderIndexPool sync.Pool
func (cshIndex *columnsHeaderIndex) reset() {
clear(cshIndex.columnHeadersRefs)
cshIndex.columnHeadersRefs = cshIndex.columnHeadersRefs[:0]
clear(cshIndex.constColumnsRefs)
cshIndex.constColumnsRefs = cshIndex.constColumnsRefs[:0]
}
func (cshIndex *columnsHeaderIndex) resizeConstColumnsRefs(n int) []columnHeaderRef {
cshIndex.constColumnsRefs = slicesutil.SetLength(cshIndex.constColumnsRefs, n)
return cshIndex.constColumnsRefs
}
func (cshIndex *columnsHeaderIndex) resizeColumnHeadersRefs(n int) []columnHeaderRef {
cshIndex.columnHeadersRefs = slicesutil.SetLength(cshIndex.columnHeadersRefs, n)
return cshIndex.columnHeadersRefs
}
func (cshIndex *columnsHeaderIndex) marshal(dst []byte) []byte {
dst = marshalColumnHeadersRefs(dst, cshIndex.columnHeadersRefs)
dst = marshalColumnHeadersRefs(dst, cshIndex.constColumnsRefs)
return dst
}
// unmarshalNoArena unmarshals cshIndex from src.
//
// cshIndex is valid until src is changed.
func (cshIndex *columnsHeaderIndex) unmarshalNoArena(src []byte) error {
cshIndex.reset()
refs, tail, err := unmarshalColumnHeadersRefsNoArena(cshIndex.columnHeadersRefs[:0], src)
if err != nil {
return fmt.Errorf("cannot unmarshal columnHeadersRefs: %w", err)
}
cshIndex.columnHeadersRefs = refs
src = tail
refs, tail, err = unmarshalColumnHeadersRefsNoArena(cshIndex.constColumnsRefs[:0], src)
if err != nil {
return fmt.Errorf("cannot unmarshal constColumnsRefs: %w", err)
}
cshIndex.constColumnsRefs = refs
if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling columnsHeaderIndex; len(tail)=%d", len(tail))
}
return nil
}
func marshalColumnHeadersRefs(dst []byte, refs []columnHeaderRef) []byte {
dst = encoding.MarshalVarUint64(dst, uint64(len(refs)))
for _, r := range refs {
dst = encoding.MarshalVarUint64(dst, r.columnNameID)
dst = encoding.MarshalVarUint64(dst, r.offset)
}
return dst
}
func unmarshalColumnHeadersRefsNoArena(dst []columnHeaderRef, src []byte) ([]columnHeaderRef, []byte, error) {
srcOrig := src
n, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return dst, srcOrig, fmt.Errorf("cannot unmarshal the number of columnHeaderRef items")
}
src = src[nSize:]
for i := uint64(0); i < n; i++ {
columnNameID, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return dst, srcOrig, fmt.Errorf("cannot unmarshal column name ID number %d out of %d", i, n)
}
src = src[nSize:]
offset, nSize := encoding.UnmarshalVarUint64(src)
if nSize <= 0 {
return dst, srcOrig, fmt.Errorf("cannot unmarshal offset number %d out of %d", i, n)
}
src = src[nSize:]
dst = append(dst, columnHeaderRef{
columnNameID: columnNameID,
offset: offset,
})
}
return dst, src, nil
}
func getColumnsHeader() *columnsHeader { func getColumnsHeader() *columnsHeader {
v := columnsHeaderPool.Get() v := columnsHeaderPool.Get()
if v == nil { if v == nil {
@ -235,27 +387,98 @@ func (csh *columnsHeader) reset() {
csh.constColumns = ccs[:0] csh.constColumns = ccs[:0]
} }
func (csh *columnsHeader) resizeConstColumns(columnsLen int) []Field { func (csh *columnsHeader) resizeConstColumns(n int) []Field {
csh.constColumns = slicesutil.SetLength(csh.constColumns, columnsLen) csh.constColumns = slicesutil.SetLength(csh.constColumns, n)
return csh.constColumns return csh.constColumns
} }
func (csh *columnsHeader) resizeColumnHeaders(columnHeadersLen int) []columnHeader { func (csh *columnsHeader) resizeColumnHeaders(n int) []columnHeader {
csh.columnHeaders = slicesutil.SetLength(csh.columnHeaders, columnHeadersLen) csh.columnHeaders = slicesutil.SetLength(csh.columnHeaders, n)
return csh.columnHeaders return csh.columnHeaders
} }
func (csh *columnsHeader) marshal(dst []byte) []byte { func (csh *columnsHeader) setColumnNames(cshIndex *columnsHeaderIndex, columnNames []string) error {
if len(cshIndex.columnHeadersRefs) != len(csh.columnHeaders) {
return fmt.Errorf("unpexected number of column headers; got %d; want %d", len(cshIndex.columnHeadersRefs), len(csh.columnHeaders))
}
for i := range csh.columnHeaders {
columnNameID := cshIndex.columnHeadersRefs[i].columnNameID
if columnNameID >= uint64(len(columnNames)) {
return fmt.Errorf("unexpected columnNameID=%d in columnHeadersRef; len(columnNames)=%d; columnNames=%v", columnNameID, len(columnNames), columnNames)
}
csh.columnHeaders[i].name = columnNames[columnNameID]
}
if len(cshIndex.constColumnsRefs) != len(csh.constColumns) {
return fmt.Errorf("unexpected number of const columns; got %d; want %d", len(cshIndex.constColumnsRefs), len(csh.constColumns))
}
for i := range csh.constColumns {
columnNameID := cshIndex.constColumnsRefs[i].columnNameID
if columnNameID >= uint64(len(columnNames)) {
return fmt.Errorf("unexpected columnNameID=%d in constColumnsRefs; len(columnNames)=%d; columnNames=%v", columnNameID, len(columnNames), columnNames)
}
csh.constColumns[i].Name = columnNames[columnNameID]
}
return nil
}
func (csh *columnsHeader) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) {
bb := longTermBufPool.Get()
defer longTermBufPool.Put(bb)
cshIndex := getColumnsHeaderIndex()
bb.B = csh.marshal(bb.B, cshIndex, g)
columnsHeaderData := bb.B
bb.B = cshIndex.marshal(bb.B)
columnsHeaderIndexData := bb.B[len(columnsHeaderData):]
putColumnsHeaderIndex(cshIndex)
bh.columnsHeaderIndexOffset = sw.columnsHeaderIndexWriter.bytesWritten
bh.columnsHeaderIndexSize = uint64(len(columnsHeaderIndexData))
if bh.columnsHeaderIndexSize > maxColumnsHeaderIndexSize {
logger.Panicf("BUG: too big columnsHeaderIndexSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderIndexSize, maxColumnsHeaderIndexSize)
}
sw.columnsHeaderIndexWriter.MustWrite(columnsHeaderIndexData)
bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
bh.columnsHeaderSize = uint64(len(columnsHeaderData))
if bh.columnsHeaderSize > maxColumnsHeaderSize {
logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize)
}
sw.columnsHeaderWriter.MustWrite(columnsHeaderData)
}
func (csh *columnsHeader) marshal(dst []byte, cshIndex *columnsHeaderIndex, g *columnNameIDGenerator) []byte {
dstLen := len(dst)
chs := csh.columnHeaders chs := csh.columnHeaders
chsRefs := cshIndex.resizeColumnHeadersRefs(len(chs))
dst = encoding.MarshalVarUint64(dst, uint64(len(chs))) dst = encoding.MarshalVarUint64(dst, uint64(len(chs)))
for i := range chs { for i := range chs {
columnNameID := g.getColumnNameID(chs[i].name)
offset := len(dst) - dstLen
dst = chs[i].marshal(dst) dst = chs[i].marshal(dst)
chsRefs[i] = columnHeaderRef{
columnNameID: columnNameID,
offset: uint64(offset),
}
} }
ccs := csh.constColumns ccs := csh.constColumns
ccsRefs := cshIndex.resizeConstColumnsRefs(len(ccs))
dst = encoding.MarshalVarUint64(dst, uint64(len(ccs))) dst = encoding.MarshalVarUint64(dst, uint64(len(ccs)))
for i := range ccs { for i := range ccs {
dst = ccs[i].marshal(dst) columnNameID := g.getColumnNameID(ccs[i].Name)
offset := len(dst) - dstLen
dst = ccs[i].marshal(dst, false)
ccsRefs[i] = columnHeaderRef{
columnNameID: columnNameID,
offset: uint64(offset),
}
} }
return dst return dst
@ -264,7 +487,7 @@ func (csh *columnsHeader) marshal(dst []byte) []byte {
// unmarshalNoArena unmarshals csh from src. // unmarshalNoArena unmarshals csh from src.
// //
// csh is valid until src is changed. // csh is valid until src is changed.
func (csh *columnsHeader) unmarshalNoArena(src []byte) error { func (csh *columnsHeader) unmarshalNoArena(src []byte, partFormatVersion uint) error {
csh.reset() csh.reset()
// unmarshal columnHeaders // unmarshal columnHeaders
@ -279,7 +502,7 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
chs := csh.resizeColumnHeaders(int(n)) chs := csh.resizeColumnHeaders(int(n))
for i := range chs { for i := range chs {
tail, err := chs[i].unmarshalNoArena(src) tail, err := chs[i].unmarshalNoArena(src, partFormatVersion)
if err != nil { if err != nil {
return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err) return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err)
} }
@ -299,7 +522,7 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
ccs := csh.resizeConstColumns(int(n)) ccs := csh.resizeConstColumns(int(n))
for i := range ccs { for i := range ccs {
tail, err := ccs[i].unmarshalNoArena(src) tail, err := ccs[i].unmarshalNoArena(src, partFormatVersion < 1)
if err != nil { if err != nil {
return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err) return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err)
} }
@ -317,7 +540,8 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
// columnHeaders contains information for values, which belong to a single label in a single block. // columnHeaders contains information for values, which belong to a single label in a single block.
// //
// The main column with an empty name is stored in messageValuesFilename, // The main column with an empty name is stored in messageValuesFilename,
// while the rest of columns are stored in fieldValuesFilename. // while the rest of columns are stored in smallValuesFilename or bigValuesFilename depending
// on the block size (see maxSmallValuesBlockSize).
// This allows minimizing disk read IO when filtering by non-message columns. // This allows minimizing disk read IO when filtering by non-message columns.
// //
// Every block column contains also a bloom filter for all the tokens stored in the column. // Every block column contains also a bloom filter for all the tokens stored in the column.
@ -334,7 +558,8 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error {
// - valueTypeTimestampISO8601 stores encoded into uint64 timestamps // - valueTypeTimestampISO8601 stores encoded into uint64 timestamps
// //
// Bloom filters for main column with an empty name is stored in messageBloomFilename, // Bloom filters for main column with an empty name is stored in messageBloomFilename,
// while the rest of columns are stored in fieldBloomFilename. // while the rest of columns are stored in smallBloomFilename or bigBloomFilename depending on their size
// (see maxSmallBloomFilterBlockSize).
type columnHeader struct { type columnHeader struct {
// name contains column name aka label name // name contains column name aka label name
name string name string
@ -355,16 +580,16 @@ type columnHeader struct {
// valuesDict contains unique values for valueType = valueTypeDict // valuesDict contains unique values for valueType = valueTypeDict
valuesDict valuesDict valuesDict valuesDict
// valuesOffset contains the offset of the block in either messageValuesFilename or fieldValuesFilename // valuesOffset contains the offset of the block in either messageValuesFilename, smallValuesFilename or bigValuesFilename
valuesOffset uint64 valuesOffset uint64
// valuesSize contains the size of the block in either messageValuesFilename or fieldValuesFilename // valuesSize contains the size of the block in either messageValuesFilename, smallValuesFilename or bigValuesFilename
valuesSize uint64 valuesSize uint64
// bloomFilterOffset contains the offset of the bloom filter in either messageBloomFilename or fieldBloomFilename // bloomFilterOffset contains the offset of the bloom filter in messageBloomFilename, smallBloomFilename or bigBloomFilename
bloomFilterOffset uint64 bloomFilterOffset uint64
// bloomFilterSize contains the size of the bloom filter in either messageBloomFilename or fieldBloomFilename // bloomFilterSize contains the size of the bloom filter in messageBloomFilename, smallBloomFilename or bigBloomFilename
bloomFilterSize uint64 bloomFilterSize uint64
} }
@ -403,8 +628,9 @@ func (ch *columnHeader) marshal(dst []byte) []byte {
logger.Panicf("BUG: minValue=%d must be smaller than maxValue=%d for valueType=%d", ch.minValue, ch.maxValue, ch.valueType) logger.Panicf("BUG: minValue=%d must be smaller than maxValue=%d for valueType=%d", ch.minValue, ch.maxValue, ch.valueType)
} }
// Encode common fields - ch.name and ch.valueType // Do not encode ch.name, since it should be encoded at columnsHeaderIndex.columnHeadersRefs
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(ch.name))
// Encode common field - ch.valueType
dst = append(dst, byte(ch.valueType)) dst = append(dst, byte(ch.valueType))
// Encode other fields depending on ch.valueType // Encode other fields depending on ch.valueType
@ -472,18 +698,20 @@ func (ch *columnHeader) marshalBloomFilters(dst []byte) []byte {
// unmarshalNoArena unmarshals ch from src and returns the tail left after unmarshaling. // unmarshalNoArena unmarshals ch from src and returns the tail left after unmarshaling.
// //
// ch is valid until src is changed. // ch is valid until src is changed.
func (ch *columnHeader) unmarshalNoArena(src []byte) ([]byte, error) { func (ch *columnHeader) unmarshalNoArena(src []byte, partFormatVersion uint) ([]byte, error) {
ch.reset() ch.reset()
srcOrig := src srcOrig := src
// Unmarshal column name // Unmarshal column name
if partFormatVersion < 1 {
data, nSize := encoding.UnmarshalBytes(src) data, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 { if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal column name") return srcOrig, fmt.Errorf("cannot unmarshal column name")
} }
src = src[nSize:] src = src[nSize:]
ch.name = bytesutil.ToUnsafeString(data) ch.name = bytesutil.ToUnsafeString(data)
}
// Unmarshal value type // Unmarshal value type
if len(src) < 1 { if len(src) < 1 {

View file

@ -15,7 +15,7 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected lengths of the marshaled blockHeader; got %d; want %d", len(data), marshaledLen) t.Fatalf("unexpected lengths of the marshaled blockHeader; got %d; want %d", len(data), marshaledLen)
} }
bh2 := &blockHeader{} bh2 := &blockHeader{}
tail, err := bh2.unmarshal(data) tail, err := bh2.unmarshal(data, partFormatLatestVersion)
if err != nil { if err != nil {
t.Fatalf("unexpected error in unmarshal: %s", err) t.Fatalf("unexpected error in unmarshal: %s", err)
} }
@ -26,7 +26,7 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", bh2, bh) t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", bh2, bh)
} }
} }
f(&blockHeader{}, 61) f(&blockHeader{}, 63)
f(&blockHeader{ f(&blockHeader{
streamID: streamID{ streamID: streamID{
tenantID: TenantID{ tenantID: TenantID{
@ -47,24 +47,71 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) {
maxTimestamp: 23434, maxTimestamp: 23434,
marshalType: encoding.MarshalTypeNearestDelta2, marshalType: encoding.MarshalTypeNearestDelta2,
}, },
columnsHeaderIndexOffset: 8923481,
columnsHeaderIndexSize: 8989832,
columnsHeaderOffset: 4384, columnsHeaderOffset: 4384,
columnsHeaderSize: 894, columnsHeaderSize: 894,
}, 65) }, 73)
}
func TestColumnsHeaderIndexMarshalUnmarshal(t *testing.T) {
f := func(cshIndex *columnsHeaderIndex, marshaledLen int) {
t.Helper()
data := cshIndex.marshal(nil)
if len(data) != marshaledLen {
t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
}
cshIndex2 := &columnsHeaderIndex{}
if err := cshIndex2.unmarshalNoArena(data); err != nil {
t.Fatalf("unexpected error in unmarshal: %s", err)
}
if !reflect.DeepEqual(cshIndex, cshIndex2) {
t.Fatalf("unexpected blockHeaderIndex unmarshaled\ngot\n%v\nwant\n%v", cshIndex2, cshIndex)
}
}
f(&columnsHeaderIndex{}, 2)
f(&columnsHeaderIndex{
columnHeadersRefs: []columnHeaderRef{
{
columnNameID: 234,
offset: 123432,
},
{
columnNameID: 23898,
offset: 0,
},
},
constColumnsRefs: []columnHeaderRef{
{
columnNameID: 0,
offset: 8989,
},
},
}, 14)
} }
func TestColumnsHeaderMarshalUnmarshal(t *testing.T) { func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
f := func(csh *columnsHeader, marshaledLen int) { f := func(csh *columnsHeader, marshaledLen int) {
t.Helper() t.Helper()
data := csh.marshal(nil) cshIndex := getColumnsHeaderIndex()
g := &columnNameIDGenerator{}
data := csh.marshal(nil, cshIndex, g)
if len(data) != marshaledLen { if len(data) != marshaledLen {
t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen) t.Fatalf("unexpected length of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen)
} }
csh2 := &columnsHeader{} csh2 := &columnsHeader{}
err := csh2.unmarshalNoArena(data) if err := csh2.unmarshalNoArena(data, partFormatLatestVersion); err != nil {
if err != nil {
t.Fatalf("unexpected error in unmarshal: %s", err) t.Fatalf("unexpected error in unmarshal: %s", err)
} }
if err := csh2.setColumnNames(cshIndex, g.columnNames); err != nil {
t.Fatalf("cannot set column names: %s", err)
}
if !reflect.DeepEqual(csh, csh2) { if !reflect.DeepEqual(csh, csh2) {
t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", csh2, csh) t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", csh2, csh)
} }
@ -98,7 +145,7 @@ func TestColumnsHeaderMarshalUnmarshal(t *testing.T) {
Value: "bar", Value: "bar",
}, },
}, },
}, 50) }, 31)
} }
func TestBlockHeaderUnmarshalFailure(t *testing.T) { func TestBlockHeaderUnmarshalFailure(t *testing.T) {
@ -107,7 +154,7 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) {
dataOrig := append([]byte{}, data...) dataOrig := append([]byte{}, data...)
bh := getBlockHeader() bh := getBlockHeader()
defer putBlockHeader(bh) defer putBlockHeader(bh)
tail, err := bh.unmarshal(data) tail, err := bh.unmarshal(data, partFormatLatestVersion)
if err == nil { if err == nil {
t.Fatalf("expecting non-nil error") t.Fatalf("expecting non-nil error")
} }
@ -138,6 +185,8 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) {
maxTimestamp: 23434, maxTimestamp: 23434,
marshalType: encoding.MarshalTypeNearestDelta2, marshalType: encoding.MarshalTypeNearestDelta2,
}, },
columnsHeaderIndexOffset: 89434,
columnsHeaderIndexSize: 89123,
columnsHeaderOffset: 4384, columnsHeaderOffset: 4384,
columnsHeaderSize: 894, columnsHeaderSize: 894,
} }
@ -148,14 +197,52 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) {
} }
} }
func TestColumnsHeaderIndexUnmarshalFailure(t *testing.T) {
f := func(data []byte) {
t.Helper()
cshIndex := getColumnsHeaderIndex()
defer putColumnsHeaderIndex(cshIndex)
if err := cshIndex.unmarshalNoArena(data); err == nil {
t.Fatalf("expecting non-nil error")
}
}
f(nil)
f([]byte("foo"))
cshIndex := &columnsHeaderIndex{
columnHeadersRefs: []columnHeaderRef{
{
columnNameID: 0,
offset: 123,
},
},
constColumnsRefs: []columnHeaderRef{
{
columnNameID: 2,
offset: 89834,
},
{
columnNameID: 234,
offset: 8934,
},
},
}
data := cshIndex.marshal(nil)
for len(data) > 0 {
data = data[:len(data)-1]
f(data)
}
}
func TestColumnsHeaderUnmarshalFailure(t *testing.T) { func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
f := func(data []byte) { f := func(data []byte) {
t.Helper() t.Helper()
csh := getColumnsHeader() csh := getColumnsHeader()
defer putColumnsHeader(csh) defer putColumnsHeader(csh)
err := csh.unmarshalNoArena(data) if err := csh.unmarshalNoArena(data, partFormatLatestVersion); err == nil {
if err == nil {
t.Fatalf("expecting non-nil error") t.Fatalf("expecting non-nil error")
} }
} }
@ -163,7 +250,7 @@ func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
f(nil) f(nil)
f([]byte("foo")) f([]byte("foo"))
csh := columnsHeader{ csh := &columnsHeader{
columnHeaders: []columnHeader{ columnHeaders: []columnHeader{
{ {
name: "foobar", name: "foobar",
@ -191,11 +278,14 @@ func TestColumnsHeaderUnmarshalFailure(t *testing.T) {
}, },
}, },
} }
data := csh.marshal(nil) cshIndex := getColumnsHeaderIndex()
g := &columnNameIDGenerator{}
data := csh.marshal(nil, cshIndex, g)
for len(data) > 0 { for len(data) > 0 {
data = data[:len(data)-1] data = data[:len(data)-1]
f(data) f(data)
} }
putColumnsHeaderIndex(cshIndex)
} }
func TestBlockHeaderReset(t *testing.T) { func TestBlockHeaderReset(t *testing.T) {
@ -219,6 +309,8 @@ func TestBlockHeaderReset(t *testing.T) {
maxTimestamp: 23434, maxTimestamp: 23434,
marshalType: encoding.MarshalTypeNearestDelta2, marshalType: encoding.MarshalTypeNearestDelta2,
}, },
columnsHeaderIndexOffset: 18934,
columnsHeaderIndexSize: 8912,
columnsHeaderOffset: 12332, columnsHeaderOffset: 12332,
columnsHeaderSize: 234, columnsHeaderSize: 234,
} }
@ -229,6 +321,35 @@ func TestBlockHeaderReset(t *testing.T) {
} }
} }
func TestColumnsHeaderIndexReset(t *testing.T) {
cshIndex := &columnsHeaderIndex{
columnHeadersRefs: []columnHeaderRef{
{
columnNameID: 234,
offset: 1234,
},
},
constColumnsRefs: []columnHeaderRef{
{
columnNameID: 328,
offset: 21344,
},
{
columnNameID: 1,
offset: 234,
},
},
}
cshIndex.reset()
cshIndexZero := &columnsHeaderIndex{
columnHeadersRefs: []columnHeaderRef{},
constColumnsRefs: []columnHeaderRef{},
}
if !reflect.DeepEqual(cshIndex, cshIndexZero) {
t.Fatalf("unexpected non-zero columnsHeaderIndex after reset: %v", cshIndex)
}
}
func TestColumnsHeaderReset(t *testing.T) { func TestColumnsHeaderReset(t *testing.T) {
csh := &columnsHeader{ csh := &columnsHeader{
columnHeaders: []columnHeader{ columnHeaders: []columnHeader{
@ -278,7 +399,7 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) {
if len(data) != marshaledLen { if len(data) != marshaledLen {
t.Fatalf("unexpected length for marshaled blockHeader entries; got %d; want %d", len(data), marshaledLen) t.Fatalf("unexpected length for marshaled blockHeader entries; got %d; want %d", len(data), marshaledLen)
} }
bhs2, err := unmarshalBlockHeaders(nil, data) bhs2, err := unmarshalBlockHeaders(nil, data, partFormatLatestVersion)
if err != nil { if err != nil {
t.Fatalf("unexpected error when unmarshaling blockHeader entries: %s", err) t.Fatalf("unexpected error when unmarshaling blockHeader entries: %s", err)
} }
@ -287,7 +408,7 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) {
} }
} }
f(nil, 0) f(nil, 0)
f([]blockHeader{{}}, 61) f([]blockHeader{{}}, 63)
f([]blockHeader{ f([]blockHeader{
{}, {},
{ {
@ -310,10 +431,12 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) {
maxTimestamp: 23434, maxTimestamp: 23434,
marshalType: encoding.MarshalTypeNearestDelta2, marshalType: encoding.MarshalTypeNearestDelta2,
}, },
columnsHeaderIndexOffset: 1234,
columnsHeaderIndexSize: 89324,
columnsHeaderOffset: 12332, columnsHeaderOffset: 12332,
columnsHeaderSize: 234, columnsHeaderSize: 234,
}, },
}, 127) }, 134)
} }
func TestColumnHeaderMarshalUnmarshal(t *testing.T) { func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
@ -325,13 +448,17 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
t.Fatalf("unexpected marshaled length of columnHeader; got %d; want %d", len(data), marshaledLen) t.Fatalf("unexpected marshaled length of columnHeader; got %d; want %d", len(data), marshaledLen)
} }
var ch2 columnHeader var ch2 columnHeader
tail, err := ch2.unmarshalNoArena(data) tail, err := ch2.unmarshalNoArena(data, partFormatLatestVersion)
if err != nil { if err != nil {
t.Fatalf("unexpected error in umarshal(%v): %s", ch, err) t.Fatalf("unexpected error in umarshal(%v): %s", ch, err)
} }
if len(tail) > 0 { if len(tail) > 0 {
t.Fatalf("unexpected non-empty tail after unmarshal(%v): %X", ch, tail) t.Fatalf("unexpected non-empty tail after unmarshal(%v): %X", ch, tail)
} }
// columnHeader.name isn't marshaled, since it is marshaled via columnsHeaderIndex starting from part format v1.
ch2.name = ch.name
if !reflect.DeepEqual(ch, &ch2) { if !reflect.DeepEqual(ch, &ch2) {
t.Fatalf("unexpected columnHeader after unmarshal;\ngot\n%v\nwant\n%v", &ch2, ch) t.Fatalf("unexpected columnHeader after unmarshal;\ngot\n%v\nwant\n%v", &ch2, ch)
} }
@ -340,7 +467,7 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
f(&columnHeader{ f(&columnHeader{
name: "foo", name: "foo",
valueType: valueTypeUint8, valueType: valueTypeUint8,
}, 11) }, 7)
ch := &columnHeader{ ch := &columnHeader{
name: "foobar", name: "foobar",
valueType: valueTypeDict, valueType: valueTypeDict,
@ -349,7 +476,7 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) {
valuesSize: 254452, valuesSize: 254452,
} }
ch.valuesDict.getOrAdd("abc") ch.valuesDict.getOrAdd("abc")
f(ch, 18) f(ch, 11)
} }
func TestColumnHeaderUnmarshalFailure(t *testing.T) { func TestColumnHeaderUnmarshalFailure(t *testing.T) {
@ -358,7 +485,7 @@ func TestColumnHeaderUnmarshalFailure(t *testing.T) {
dataOrig := append([]byte{}, data...) dataOrig := append([]byte{}, data...)
var ch columnHeader var ch columnHeader
tail, err := ch.unmarshalNoArena(data) tail, err := ch.unmarshalNoArena(data, partFormatLatestVersion)
if err == nil { if err == nil {
t.Fatalf("expecting non-nil error") t.Fatalf("expecting non-nil error")
} }

View file

@ -1,11 +1,13 @@
package logstorage package logstorage
import ( import (
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
) )
// The number of blocks to search at once by a single worker // The number of blocks to search at once by a single worker
@ -113,17 +115,35 @@ type blockSearch struct {
// sbu is used for unmarshaling local columns // sbu is used for unmarshaling local columns
sbu stringsBlockUnmarshaler sbu stringsBlockUnmarshaler
// cshIndexBlockCache holds columnsHeaderIndex data for the given block.
//
// It is initialized lazily by calling getColumnsHeaderIndex().
cshIndexBlockCache []byte
// cshBlockCache holds columnsHeader data for the given block. // cshBlockCache holds columnsHeader data for the given block.
// //
// it is initialized lazily by calling getColumnsHeader(). // It is initialized lazily by calling getColumnsHeaderBlock().
cshBlockCache []byte cshBlockCache []byte
cshBlockInitialized bool
// cshCache is the columnsHeader associated with the given block // ccsCache is the cache for accessed const columns
ccsCache []Field
// chsCache is the cache for accessed column headers
chsCache []columnHeader
// cshIndexCache is the columnsHeaderIndex associated with the given block.
// //
// it is initialized lazily by calling getColumnsHeader(). // It is initialized lazily by calling getColumnsHeaderIndex().
cshIndexCache *columnsHeaderIndex
// cshCache is the columnsHeader associated with the given block.
//
// It is initialized lazily by calling getColumnsHeaderV0().
cshCache *columnsHeader cshCache *columnsHeader
// seenStreams contains seen streamIDs for the recent searches. // seenStreams contains seen streamIDs for the recent searches.
//
// It is used for speeding up fetching _stream column. // It is used for speeding up fetching _stream column.
seenStreams map[u128]string seenStreams map[u128]string
} }
@ -151,7 +171,27 @@ func (bs *blockSearch) reset() {
bs.sbu.reset() bs.sbu.reset()
bs.cshIndexBlockCache = bs.cshIndexBlockCache[:0]
bs.cshBlockCache = bs.cshBlockCache[:0] bs.cshBlockCache = bs.cshBlockCache[:0]
bs.cshBlockInitialized = false
ccsCache := bs.ccsCache
for i := range ccsCache {
ccsCache[i].Reset()
}
bs.ccsCache = ccsCache[:0]
chsCache := bs.chsCache
for i := range chsCache {
chsCache[i].reset()
}
bs.chsCache = chsCache[:0]
if bs.cshIndexCache != nil {
putColumnsHeaderIndex(bs.cshIndexCache)
bs.cshIndexCache = nil
}
if bs.cshCache != nil { if bs.cshCache != nil {
putColumnsHeader(bs.cshCache) putColumnsHeader(bs.cshCache)
@ -190,12 +230,17 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) {
} }
} }
func (bs *blockSearch) partFormatVersion() uint {
return bs.bsw.p.ph.FormatVersion
}
func (bs *blockSearch) getConstColumnValue(name string) string { func (bs *blockSearch) getConstColumnValue(name string) string {
if name == "_msg" { if name == "_msg" {
name = "" name = ""
} }
csh := bs.getColumnsHeader() if bs.partFormatVersion() < 1 {
csh := bs.getColumnsHeaderV0()
for _, cc := range csh.constColumns { for _, cc := range csh.constColumns {
if cc.Name == name { if cc.Name == name {
return cc.Value return cc.Value
@ -204,12 +249,46 @@ func (bs *blockSearch) getConstColumnValue(name string) string {
return "" return ""
} }
columnNameID, ok := bs.getColumnNameID(name)
if !ok {
return ""
}
for i := range bs.ccsCache {
if bs.ccsCache[i].Name == name {
return bs.ccsCache[i].Value
}
}
cshIndex := bs.getColumnsHeaderIndex()
for _, cr := range cshIndex.constColumnsRefs {
if cr.columnNameID != columnNameID {
continue
}
b := bs.getColumnsHeaderBlock()
if cr.offset > uint64(len(b)) {
logger.Panicf("FATAL: %s: header offset for const column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset)
}
b = b[cr.offset:]
bs.ccsCache = slicesutil.SetLength(bs.ccsCache, len(bs.ccsCache)+1)
cc := &bs.ccsCache[len(bs.ccsCache)-1]
if _, err := cc.unmarshalNoArena(b, false); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal header for const column %q: %s", bs.bsw.p.path, name, err)
}
cc.Name = strings.Clone(name)
return cc.Value
}
return ""
}
func (bs *blockSearch) getColumnHeader(name string) *columnHeader { func (bs *blockSearch) getColumnHeader(name string) *columnHeader {
if name == "_msg" { if name == "_msg" {
name = "" name = ""
} }
csh := bs.getColumnsHeader() if bs.partFormatVersion() < 1 {
csh := bs.getColumnsHeaderV0()
chs := csh.columnHeaders chs := csh.columnHeaders
for i := range chs { for i := range chs {
ch := &chs[i] ch := &chs[i]
@ -220,35 +299,146 @@ func (bs *blockSearch) getColumnHeader(name string) *columnHeader {
return nil return nil
} }
columnNameID, ok := bs.getColumnNameID(name)
if !ok {
return nil
}
for i := range bs.chsCache {
if bs.chsCache[i].name == name {
return &bs.chsCache[i]
}
}
cshIndex := bs.getColumnsHeaderIndex()
for _, cr := range cshIndex.columnHeadersRefs {
if cr.columnNameID != columnNameID {
continue
}
b := bs.getColumnsHeaderBlock()
if cr.offset > uint64(len(b)) {
logger.Panicf("FATAL: %s: header offset for column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset)
}
b = b[cr.offset:]
bs.chsCache = slicesutil.SetLength(bs.chsCache, len(bs.chsCache)+1)
ch := &bs.chsCache[len(bs.chsCache)-1]
if _, err := ch.unmarshalNoArena(b, partFormatLatestVersion); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal header for column %q: %s", bs.bsw.p.path, name, err)
}
ch.name = strings.Clone(name)
return ch
}
return nil
}
func (bs *blockSearch) getColumnNameID(name string) (uint64, bool) {
id, ok := bs.bsw.p.columnNameIDs[name]
return id, ok
}
func (bs *blockSearch) getColumnNameByID(id uint64) (string, bool) {
columnNames := bs.bsw.p.columnNames
if id >= uint64(len(columnNames)) {
return "", false
}
return columnNames[id], true
}
func (bs *blockSearch) getConstColumns() []Field { func (bs *blockSearch) getConstColumns() []Field {
csh := bs.getColumnsHeader() if bs.partFormatVersion() < 1 {
csh := bs.getColumnsHeaderV0()
return csh.constColumns return csh.constColumns
} }
chsIndex := bs.getColumnsHeaderIndex()
for _, cr := range chsIndex.constColumnsRefs {
columnName, ok := bs.getColumnNameByID(cr.columnNameID)
if !ok {
logger.Panicf("FATAL: %s: missing column name for id=%d", bs.bsw.p.path, cr.columnNameID)
}
_ = bs.getConstColumnValue(columnName)
}
return bs.ccsCache
}
func (bs *blockSearch) getColumnHeaders() []columnHeader { func (bs *blockSearch) getColumnHeaders() []columnHeader {
csh := bs.getColumnsHeader() if bs.partFormatVersion() < 1 {
csh := bs.getColumnsHeaderV0()
return csh.columnHeaders return csh.columnHeaders
} }
func (bs *blockSearch) getColumnsHeader() *columnsHeader { chsIndex := bs.getColumnsHeaderIndex()
for _, cr := range chsIndex.columnHeadersRefs {
columnName, ok := bs.getColumnNameByID(cr.columnNameID)
if !ok {
logger.Panicf("FATAL: %s: missing column name for id=%d", bs.bsw.p.path, cr.columnNameID)
}
_ = bs.getColumnHeader(columnName)
}
return bs.chsCache
}
func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex {
if bs.partFormatVersion() < 1 {
logger.Panicf("BUG: getColumnsHeaderIndex() can be called only for part encoding v1+, while it has been called for v%d", bs.partFormatVersion())
}
if bs.cshIndexCache == nil {
bs.cshIndexBlockCache = readColumnsHeaderIndexBlock(bs.cshIndexBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
bs.cshIndexCache = getColumnsHeaderIndex()
if err := bs.cshIndexCache.unmarshalNoArena(bs.cshIndexBlockCache); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columns header index: %s", bs.bsw.p.path, err)
}
}
return bs.cshIndexCache
}
func (bs *blockSearch) getColumnsHeaderV0() *columnsHeader {
if bs.partFormatVersion() >= 1 {
logger.Panicf("BUG: getColumnsHeaderV0() can be called only for part encoding v0, while it has been called for v%d", bs.partFormatVersion())
}
if bs.cshCache == nil { if bs.cshCache == nil {
bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh) b := bs.getColumnsHeaderBlock()
bs.cshCache = getColumnsHeader() bs.cshCache = getColumnsHeader()
if err := bs.cshCache.unmarshalNoArena(bs.cshBlockCache); err != nil { if err := bs.cshCache.unmarshalNoArena(b, 0); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.bsw.p.path, err) logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.bsw.p.path, err)
} }
} }
return bs.cshCache return bs.cshCache
} }
func (bs *blockSearch) getColumnsHeaderBlock() []byte {
if !bs.cshBlockInitialized {
bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
bs.cshBlockInitialized = true
}
return bs.cshBlockCache
}
func readColumnsHeaderIndexBlock(dst []byte, p *part, bh *blockHeader) []byte {
n := bh.columnsHeaderIndexSize
if n > maxColumnsHeaderIndexSize {
logger.Panicf("FATAL: %s: columns header index size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderIndexSize, n)
}
dstLen := len(dst)
dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(n)+dstLen)
p.columnsHeaderIndexFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderIndexOffset))
return dst
}
func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte { func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte {
columnsHeaderSize := bh.columnsHeaderSize n := bh.columnsHeaderSize
if columnsHeaderSize > maxColumnsHeaderSize { if n > maxColumnsHeaderSize {
logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, columnsHeaderSize) logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, n)
} }
dstLen := len(dst) dstLen := len(dst)
dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(columnsHeaderSize)+dstLen) dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(n)+dstLen)
p.columnsHeaderFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderOffset)) p.columnsHeaderFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderOffset))
return dst return dst
} }
@ -263,11 +453,7 @@ func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
} }
p := bs.bsw.p p := bs.bsw.p
bloomValuesFile := p.getBloomValuesFileForColumnName(ch.name)
bloomFilterFile := p.fieldBloomFilterFile
if ch.name == "" {
bloomFilterFile = p.messageBloomFilterFile
}
bb := longTermBufPool.Get() bb := longTermBufPool.Get()
bloomFilterSize := ch.bloomFilterSize bloomFilterSize := ch.bloomFilterSize
@ -275,7 +461,8 @@ func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxBloomFilterBlockSize, bloomFilterSize) logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxBloomFilterBlockSize, bloomFilterSize)
} }
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(bloomFilterSize)) bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(bloomFilterSize))
bloomFilterFile.MustReadAt(bb.B, int64(ch.bloomFilterOffset))
bloomValuesFile.bloom.MustReadAt(bb.B, int64(ch.bloomFilterOffset))
bf = getBloomFilter() bf = getBloomFilter()
if err := bf.unmarshal(bb.B); err != nil { if err := bf.unmarshal(bb.B); err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err) logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err)
@ -299,11 +486,7 @@ func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
} }
p := bs.bsw.p p := bs.bsw.p
bloomValuesFile := p.getBloomValuesFileForColumnName(ch.name)
valuesFile := p.fieldValuesFile
if ch.name == "" {
valuesFile = p.messageValuesFile
}
bb := longTermBufPool.Get() bb := longTermBufPool.Get()
valuesSize := ch.valuesSize valuesSize := ch.valuesSize
@ -311,7 +494,7 @@ func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxValuesBlockSize, valuesSize) logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxValuesBlockSize, valuesSize)
} }
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(valuesSize)) bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(valuesSize))
valuesFile.MustReadAt(bb.B, int64(ch.valuesOffset)) bloomValuesFile.values.MustReadAt(bb.B, int64(ch.valuesOffset))
values = getStringBucket() values = getStringBucket()
var err error var err error
@ -378,7 +561,7 @@ func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []b
logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err) logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)
} }
dst, err = unmarshalBlockHeaders(dst, bb.B) dst, err = unmarshalBlockHeaders(dst, bb.B, p.ph.FormatVersion)
longTermBufPool.Put(bb) longTermBufPool.Put(bb)
if err != nil { if err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal block headers read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err) logger.Panicf("FATAL: %s: cannot unmarshal block headers read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)

View file

@ -4,6 +4,9 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -43,68 +46,132 @@ func (r *readerWithStats) Read(p []byte) (int, error) {
} }
func (r *readerWithStats) MustClose() { func (r *readerWithStats) MustClose() {
if r.r != nil {
r.r.MustClose() r.r.MustClose()
r.r = nil r.r = nil
} }
}
// streamReaders contains readers for blockStreamReader // streamReaders contains readers for blockStreamReader
type streamReaders struct { type streamReaders struct {
columnNamesReader readerWithStats
metaindexReader readerWithStats metaindexReader readerWithStats
indexReader readerWithStats indexReader readerWithStats
columnsHeaderIndexReader readerWithStats
columnsHeaderReader readerWithStats columnsHeaderReader readerWithStats
timestampsReader readerWithStats timestampsReader readerWithStats
fieldValuesReader readerWithStats
fieldBloomFilterReader readerWithStats messageBloomValuesReader bloomValuesReader
messageValuesReader readerWithStats oldBloomValuesReader bloomValuesReader
messageBloomFilterReader readerWithStats bloomValuesShards [bloomValuesShardsCount]bloomValuesReader
}
type bloomValuesReader struct {
bloom readerWithStats
values readerWithStats
}
func (r *bloomValuesReader) reset() {
r.bloom.reset()
r.values.reset()
}
func (r *bloomValuesReader) init(sr bloomValuesStreamReader) {
r.bloom.init(sr.bloom)
r.values.init(sr.values)
}
func (r *bloomValuesReader) totalBytesRead() uint64 {
return r.bloom.bytesRead + r.values.bytesRead
}
func (r *bloomValuesReader) MustClose() {
r.bloom.MustClose()
r.values.MustClose()
}
type bloomValuesStreamReader struct {
bloom filestream.ReadCloser
values filestream.ReadCloser
} }
func (sr *streamReaders) reset() { func (sr *streamReaders) reset() {
sr.columnNamesReader.reset()
sr.metaindexReader.reset() sr.metaindexReader.reset()
sr.indexReader.reset() sr.indexReader.reset()
sr.columnsHeaderIndexReader.reset()
sr.columnsHeaderReader.reset() sr.columnsHeaderReader.reset()
sr.timestampsReader.reset() sr.timestampsReader.reset()
sr.fieldValuesReader.reset()
sr.fieldBloomFilterReader.reset() sr.messageBloomValuesReader.reset()
sr.messageValuesReader.reset() sr.oldBloomValuesReader.reset()
sr.messageBloomFilterReader.reset() for i := range sr.bloomValuesShards[:] {
sr.bloomValuesShards[i].reset()
}
} }
func (sr *streamReaders) init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader, fieldValuesReader, fieldBloomFilterReader, func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser,
messageValuesReader, messageBloomFilterReader filestream.ReadCloser, messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader,
) { ) {
sr.columnNamesReader.init(columnNamesReader)
sr.metaindexReader.init(metaindexReader) sr.metaindexReader.init(metaindexReader)
sr.indexReader.init(indexReader) sr.indexReader.init(indexReader)
sr.columnsHeaderIndexReader.init(columnsHeaderIndexReader)
sr.columnsHeaderReader.init(columnsHeaderReader) sr.columnsHeaderReader.init(columnsHeaderReader)
sr.timestampsReader.init(timestampsReader) sr.timestampsReader.init(timestampsReader)
sr.fieldValuesReader.init(fieldValuesReader)
sr.fieldBloomFilterReader.init(fieldBloomFilterReader) sr.messageBloomValuesReader.init(messageBloomValuesReader)
sr.messageValuesReader.init(messageValuesReader) sr.oldBloomValuesReader.init(oldBloomValuesReader)
sr.messageBloomFilterReader.init(messageBloomFilterReader) for i := range sr.bloomValuesShards[:] {
sr.bloomValuesShards[i].init(bloomValuesShards[i])
}
} }
func (sr *streamReaders) totalBytesRead() uint64 { func (sr *streamReaders) totalBytesRead() uint64 {
n := uint64(0) n := uint64(0)
n += sr.columnNamesReader.bytesRead
n += sr.metaindexReader.bytesRead n += sr.metaindexReader.bytesRead
n += sr.indexReader.bytesRead n += sr.indexReader.bytesRead
n += sr.columnsHeaderIndexReader.bytesRead
n += sr.columnsHeaderReader.bytesRead n += sr.columnsHeaderReader.bytesRead
n += sr.timestampsReader.bytesRead n += sr.timestampsReader.bytesRead
n += sr.fieldValuesReader.bytesRead
n += sr.fieldBloomFilterReader.bytesRead n += sr.messageBloomValuesReader.totalBytesRead()
n += sr.messageValuesReader.bytesRead n += sr.oldBloomValuesReader.totalBytesRead()
n += sr.messageBloomFilterReader.bytesRead for i := range sr.bloomValuesShards[:] {
n += sr.bloomValuesShards[i].totalBytesRead()
}
return n return n
} }
func (sr *streamReaders) MustClose() { func (sr *streamReaders) MustClose() {
sr.columnNamesReader.MustClose()
sr.metaindexReader.MustClose() sr.metaindexReader.MustClose()
sr.indexReader.MustClose() sr.indexReader.MustClose()
sr.columnsHeaderIndexReader.MustClose()
sr.columnsHeaderReader.MustClose() sr.columnsHeaderReader.MustClose()
sr.timestampsReader.MustClose() sr.timestampsReader.MustClose()
sr.fieldValuesReader.MustClose()
sr.fieldBloomFilterReader.MustClose() sr.messageBloomValuesReader.MustClose()
sr.messageValuesReader.MustClose() sr.oldBloomValuesReader.MustClose()
sr.messageBloomFilterReader.MustClose() for i := range sr.bloomValuesShards[:] {
sr.bloomValuesShards[i].MustClose()
}
}
func (sr *streamReaders) getBloomValuesReaderForColumnName(name string, partFormatVersion uint) *bloomValuesReader {
if name == "" {
return &sr.messageBloomValuesReader
}
if partFormatVersion < 1 {
return &sr.oldBloomValuesReader
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx := h % uint64(len(sr.bloomValuesShards))
return &sr.bloomValuesShards[idx]
} }
// blockStreamReader is used for reading blocks in streaming manner from a part. // blockStreamReader is used for reading blocks in streaming manner from a part.
@ -121,6 +188,12 @@ type blockStreamReader struct {
// streamReaders contains data readers in stream mode // streamReaders contains data readers in stream mode
streamReaders streamReaders streamReaders streamReaders
// columnNameIDs contains columnName->id mapping for all the column names seen in the part
columnNameIDs map[string]uint64
// columnNames constains id->columnName mapping for all the columns seen in the part
columnNames []string
// indexBlockHeaders contains the list of all the indexBlockHeader entries for the part // indexBlockHeaders contains the list of all the indexBlockHeader entries for the part
indexBlockHeaders []indexBlockHeader indexBlockHeaders []indexBlockHeader
@ -156,6 +229,9 @@ func (bsr *blockStreamReader) reset() {
bsr.ph.reset() bsr.ph.reset()
bsr.streamReaders.reset() bsr.streamReaders.reset()
bsr.columnNameIDs = nil
bsr.columnNames = nil
ihs := bsr.indexBlockHeaders ihs := bsr.indexBlockHeaders
if len(ihs) > 10e3 { if len(ihs) > 10e3 {
// The ihs len is unbound, so it is better to drop too long indexBlockHeaders in order to reduce memory usage // The ihs len is unbound, so it is better to drop too long indexBlockHeaders in order to reduce memory usage
@ -195,17 +271,25 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) {
bsr.ph = mp.ph bsr.ph = mp.ph
// Initialize streamReaders // Initialize streamReaders
columnNamesReader := mp.columnNames.NewReader()
metaindexReader := mp.metaindex.NewReader() metaindexReader := mp.metaindex.NewReader()
indexReader := mp.index.NewReader() indexReader := mp.index.NewReader()
columnsHeaderIndexReader := mp.columnsHeaderIndex.NewReader()
columnsHeaderReader := mp.columnsHeader.NewReader() columnsHeaderReader := mp.columnsHeader.NewReader()
timestampsReader := mp.timestamps.NewReader() timestampsReader := mp.timestamps.NewReader()
fieldValuesReader := mp.fieldValues.NewReader()
fieldBloomFilterReader := mp.fieldBloomFilter.NewReader()
messageValuesReader := mp.messageValues.NewReader()
messageBloomFilterReader := mp.messageBloomFilter.NewReader()
bsr.streamReaders.init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader, messageBloomValuesReader := mp.messageBloomValues.NewStreamReader()
fieldValuesReader, fieldBloomFilterReader, messageValuesReader, messageBloomFilterReader) var oldBloomValuesReader bloomValuesStreamReader
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader
for i := range bloomValuesShards[:] {
bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamReader()
}
bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards)
// Read columnNames data
bsr.columnNames, bsr.columnNameIDs = mustReadColumnNames(&bsr.streamReaders.columnNamesReader)
// Read metaindex data // Read metaindex data
bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader) bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader)
@ -219,30 +303,63 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) {
// since they are usually deleted after the merge. // since they are usually deleted after the merge.
const nocache = true const nocache = true
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
fieldValuesPath := filepath.Join(path, fieldValuesFilename)
fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
bsr.ph.mustReadMetadata(path) bsr.ph.mustReadMetadata(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename)
// Open data readers // Open data readers
var columnNamesReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
columnNamesReader = filestream.MustOpen(columnNamesPath, nocache)
}
metaindexReader := filestream.MustOpen(metaindexPath, nocache) metaindexReader := filestream.MustOpen(metaindexPath, nocache)
indexReader := filestream.MustOpen(indexPath, nocache) indexReader := filestream.MustOpen(indexPath, nocache)
var columnsHeaderIndexReader filestream.ReadCloser
if bsr.ph.FormatVersion >= 1 {
columnsHeaderIndexReader = filestream.MustOpen(columnsHeaderIndexPath, nocache)
}
columnsHeaderReader := filestream.MustOpen(columnsHeaderPath, nocache) columnsHeaderReader := filestream.MustOpen(columnsHeaderPath, nocache)
timestampsReader := filestream.MustOpen(timestampsPath, nocache) timestampsReader := filestream.MustOpen(timestampsPath, nocache)
fieldValuesReader := filestream.MustOpen(fieldValuesPath, nocache)
fieldBloomFilterReader := filestream.MustOpen(fieldBloomFilterPath, nocache) messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
messageValuesReader := filestream.MustOpen(messageValuesPath, nocache) messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterReader := filestream.MustOpen(messageBloomFilterPath, nocache) messageBloomValuesReader := bloomValuesStreamReader{
bloom: filestream.MustOpen(messageBloomFilterPath, nocache),
values: filestream.MustOpen(messageValuesPath, nocache),
}
var oldBloomValuesReader bloomValuesStreamReader
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader
if bsr.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
oldBloomValuesReader.bloom = filestream.MustOpen(bloomPath, nocache)
valuesPath := filepath.Join(path, oldValuesFilename)
oldBloomValuesReader.values = filestream.MustOpen(valuesPath, nocache)
} else {
for i := range bloomValuesShards[:] {
shard := &bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = filestream.MustOpen(bloomPath, nocache)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = filestream.MustOpen(valuesPath, nocache)
}
}
// Initialize streamReaders // Initialize streamReaders
bsr.streamReaders.init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader, bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader,
fieldValuesReader, fieldBloomFilterReader, messageValuesReader, messageBloomFilterReader) messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards)
if bsr.ph.FormatVersion >= 1 {
// Read columnNames data
bsr.columnNames, bsr.columnNameIDs = mustReadColumnNames(&bsr.streamReaders.columnNamesReader)
}
// Read metaindex data // Read metaindex data
bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader) bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader)
@ -282,7 +399,7 @@ func (bsr *blockStreamReader) NextBlock() bool {
// Read bsr.blockData // Read bsr.blockData
bsr.a.reset() bsr.a.reset()
bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders) bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders, bsr.ph.FormatVersion, bsr.columnNames)
bsr.globalUncompressedSizeBytes += bh.uncompressedSizeBytes bsr.globalUncompressedSizeBytes += bh.uncompressedSizeBytes
bsr.globalRowsCount += bh.rowsCount bsr.globalRowsCount += bh.rowsCount
@ -342,7 +459,7 @@ func (bsr *blockStreamReader) nextIndexBlock() bool {
bb.B = ih.mustReadNextIndexBlock(bb.B[:0], &bsr.streamReaders) bb.B = ih.mustReadNextIndexBlock(bb.B[:0], &bsr.streamReaders)
bsr.blockHeaders = resetBlockHeaders(bsr.blockHeaders) bsr.blockHeaders = resetBlockHeaders(bsr.blockHeaders)
var err error var err error
bsr.blockHeaders, err = unmarshalBlockHeaders(bsr.blockHeaders[:0], bb.B) bsr.blockHeaders, err = unmarshalBlockHeaders(bsr.blockHeaders[:0], bb.B, bsr.ph.FormatVersion)
longTermBufPool.Put(bb) longTermBufPool.Put(bb)
if err != nil { if err != nil {
logger.Panicf("FATAL: %s: cannot unmarshal blockHeader entries: %s", bsr.streamReaders.indexReader.Path(), err) logger.Panicf("FATAL: %s: cannot unmarshal blockHeader entries: %s", bsr.streamReaders.indexReader.Path(), err)

View file

@ -4,8 +4,9 @@ import (
"path/filepath" "path/filepath"
"sync" "sync"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -44,62 +45,116 @@ func (w *writerWithStats) MustClose() {
// streamWriters contain writers for blockStreamWriter // streamWriters contain writers for blockStreamWriter
type streamWriters struct { type streamWriters struct {
columnNamesWriter writerWithStats
metaindexWriter writerWithStats metaindexWriter writerWithStats
indexWriter writerWithStats indexWriter writerWithStats
columnsHeaderIndexWriter writerWithStats
columnsHeaderWriter writerWithStats columnsHeaderWriter writerWithStats
timestampsWriter writerWithStats timestampsWriter writerWithStats
fieldValuesWriter writerWithStats
fieldBloomFilterWriter writerWithStats messageBloomValuesWriter bloomValuesWriter
messageValuesWriter writerWithStats bloomValuesShards [bloomValuesShardsCount]bloomValuesWriter
messageBloomFilterWriter writerWithStats }
type bloomValuesWriter struct {
bloom writerWithStats
values writerWithStats
}
func (w *bloomValuesWriter) reset() {
w.bloom.reset()
w.values.reset()
}
func (w *bloomValuesWriter) init(sw bloomValuesStreamWriter) {
w.bloom.init(sw.bloom)
w.values.init(sw.values)
}
func (w *bloomValuesWriter) totalBytesWritten() uint64 {
return w.bloom.bytesWritten + w.values.bytesWritten
}
func (w *bloomValuesWriter) MustClose() {
w.bloom.MustClose()
w.values.MustClose()
}
type bloomValuesStreamWriter struct {
bloom filestream.WriteCloser
values filestream.WriteCloser
} }
func (sw *streamWriters) reset() { func (sw *streamWriters) reset() {
sw.columnNamesWriter.reset()
sw.metaindexWriter.reset() sw.metaindexWriter.reset()
sw.indexWriter.reset() sw.indexWriter.reset()
sw.columnsHeaderIndexWriter.reset()
sw.columnsHeaderWriter.reset() sw.columnsHeaderWriter.reset()
sw.timestampsWriter.reset() sw.timestampsWriter.reset()
sw.fieldValuesWriter.reset()
sw.fieldBloomFilterWriter.reset() sw.messageBloomValuesWriter.reset()
sw.messageValuesWriter.reset() for i := range sw.bloomValuesShards[:] {
sw.messageBloomFilterWriter.reset() sw.bloomValuesShards[i].reset()
}
} }
func (sw *streamWriters) init(metaindexWriter, indexWriter, columnsHeaderWriter, timestampsWriter, fieldValuesWriter, fieldBloomFilterWriter, func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter filestream.WriteCloser,
messageValuesWriter, messageBloomFilterWriter filestream.WriteCloser, messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter,
) { ) {
sw.columnNamesWriter.init(columnNamesWriter)
sw.metaindexWriter.init(metaindexWriter) sw.metaindexWriter.init(metaindexWriter)
sw.indexWriter.init(indexWriter) sw.indexWriter.init(indexWriter)
sw.columnsHeaderIndexWriter.init(columnsHeaderIndexWriter)
sw.columnsHeaderWriter.init(columnsHeaderWriter) sw.columnsHeaderWriter.init(columnsHeaderWriter)
sw.timestampsWriter.init(timestampsWriter) sw.timestampsWriter.init(timestampsWriter)
sw.fieldValuesWriter.init(fieldValuesWriter)
sw.fieldBloomFilterWriter.init(fieldBloomFilterWriter) sw.messageBloomValuesWriter.init(messageBloomValuesWriter)
sw.messageValuesWriter.init(messageValuesWriter) for i := range sw.bloomValuesShards[:] {
sw.messageBloomFilterWriter.init(messageBloomFilterWriter) sw.bloomValuesShards[i].init(bloomValuesShards[i])
}
} }
func (sw *streamWriters) totalBytesWritten() uint64 { func (sw *streamWriters) totalBytesWritten() uint64 {
n := uint64(0) n := uint64(0)
n += sw.columnNamesWriter.bytesWritten
n += sw.metaindexWriter.bytesWritten n += sw.metaindexWriter.bytesWritten
n += sw.indexWriter.bytesWritten n += sw.indexWriter.bytesWritten
n += sw.columnsHeaderIndexWriter.bytesWritten
n += sw.columnsHeaderWriter.bytesWritten n += sw.columnsHeaderWriter.bytesWritten
n += sw.timestampsWriter.bytesWritten n += sw.timestampsWriter.bytesWritten
n += sw.fieldValuesWriter.bytesWritten
n += sw.fieldBloomFilterWriter.bytesWritten n += sw.messageBloomValuesWriter.totalBytesWritten()
n += sw.messageValuesWriter.bytesWritten for i := range sw.bloomValuesShards[:] {
n += sw.messageBloomFilterWriter.bytesWritten n += sw.bloomValuesShards[i].totalBytesWritten()
}
return n return n
} }
func (sw *streamWriters) MustClose() { func (sw *streamWriters) MustClose() {
sw.columnNamesWriter.MustClose()
sw.metaindexWriter.MustClose() sw.metaindexWriter.MustClose()
sw.indexWriter.MustClose() sw.indexWriter.MustClose()
sw.columnsHeaderIndexWriter.MustClose()
sw.columnsHeaderWriter.MustClose() sw.columnsHeaderWriter.MustClose()
sw.timestampsWriter.MustClose() sw.timestampsWriter.MustClose()
sw.fieldValuesWriter.MustClose()
sw.fieldBloomFilterWriter.MustClose() sw.messageBloomValuesWriter.MustClose()
sw.messageValuesWriter.MustClose() for i := range sw.bloomValuesShards[:] {
sw.messageBloomFilterWriter.MustClose() sw.bloomValuesShards[i].MustClose()
}
}
func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomValuesWriter {
if name == "" {
return &sw.messageBloomValuesWriter
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx := h % uint64(len(sw.bloomValuesShards))
return &sw.bloomValuesShards[idx]
} }
// blockStreamWriter is used for writing blocks into the underlying storage in streaming manner. // blockStreamWriter is used for writing blocks into the underlying storage in streaming manner.
@ -148,6 +203,9 @@ type blockStreamWriter struct {
// indexBlockHeader is used for marshaling the data to metaindexData // indexBlockHeader is used for marshaling the data to metaindexData
indexBlockHeader indexBlockHeader indexBlockHeader indexBlockHeader
// columnNameIDGenerator is used for generating columnName->id mapping for all the columns seen in bsw
columnNameIDGenerator columnNameIDGenerator
} }
// reset resets bsw for subsequent re-use. // reset resets bsw for subsequent re-use.
@ -175,12 +233,22 @@ func (bsw *blockStreamWriter) reset() {
} }
bsw.indexBlockHeader.reset() bsw.indexBlockHeader.reset()
bsw.columnNameIDGenerator.reset()
} }
// MustInitForInmemoryPart initializes bsw from mp // MustInitForInmemoryPart initializes bsw from mp
func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) { func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) {
bsw.reset() bsw.reset()
bsw.streamWriters.init(&mp.metaindex, &mp.index, &mp.columnsHeader, &mp.timestamps, &mp.fieldValues, &mp.fieldBloomFilter, &mp.messageValues, &mp.messageBloomFilter)
messageBloomValues := mp.messageBloomValues.NewStreamWriter()
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter
for i := range bloomValuesShards[:] {
bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamWriter()
}
bsw.streamWriters.init(&mp.columnNames, &mp.metaindex, &mp.index, &mp.columnsHeaderIndex, &mp.columnsHeader, &mp.timestamps, messageBloomValues, bloomValuesShards)
} }
// MustInitForFilePart initializes bsw for writing data to file part located at path. // MustInitForFilePart initializes bsw for writing data to file part located at path.
@ -191,28 +259,43 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) {
fs.MustMkdirFailIfExist(path) fs.MustMkdirFailIfExist(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename) metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename) indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename) columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename) timestampsPath := filepath.Join(path, timestampsFilename)
fieldValuesPath := filepath.Join(path, fieldValuesFilename)
fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
// Always cache metaindex file, since it it re-read immediately after part creation // Always cache columnNames files, since it is re-read immediately after part creation
columnNamesWriter := filestream.MustCreate(columnNamesPath, false)
// Always cache metaindex file, since it is re-read immediately after part creation
metaindexWriter := filestream.MustCreate(metaindexPath, false) metaindexWriter := filestream.MustCreate(metaindexPath, false)
indexWriter := filestream.MustCreate(indexPath, nocache) indexWriter := filestream.MustCreate(indexPath, nocache)
columnsHeaderIndexWriter := filestream.MustCreate(columnsHeaderIndexPath, nocache)
columnsHeaderWriter := filestream.MustCreate(columnsHeaderPath, nocache) columnsHeaderWriter := filestream.MustCreate(columnsHeaderPath, nocache)
timestampsWriter := filestream.MustCreate(timestampsPath, nocache) timestampsWriter := filestream.MustCreate(timestampsPath, nocache)
fieldValuesWriter := filestream.MustCreate(fieldValuesPath, nocache)
fieldBloomFilterWriter := filestream.MustCreate(fieldBloomFilterPath, nocache)
messageValuesWriter := filestream.MustCreate(messageValuesPath, nocache)
messageBloomFilterWriter := filestream.MustCreate(messageBloomFilterPath, nocache)
bsw.streamWriters.init(metaindexWriter, indexWriter, columnsHeaderWriter, timestampsWriter, messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
fieldValuesWriter, fieldBloomFilterWriter, messageValuesWriter, messageBloomFilterWriter) messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomValuesWriter := bloomValuesStreamWriter{
bloom: filestream.MustCreate(messageBloomFilterPath, nocache),
values: filestream.MustCreate(messageValuesPath, nocache),
}
var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter
for i := range bloomValuesShards[:] {
shard := &bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = filestream.MustCreate(bloomPath, nocache)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = filestream.MustCreate(valuesPath, nocache)
}
bsw.streamWriters.init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter, messageBloomValuesWriter, bloomValuesShards)
} }
// MustWriteRows writes timestamps with rows under the given sid to bsw. // MustWriteRows writes timestamps with rows under the given sid to bsw.
@ -266,9 +349,9 @@ func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd
bh := getBlockHeader() bh := getBlockHeader()
if b != nil { if b != nil {
b.mustWriteTo(sid, bh, &bsw.streamWriters) b.mustWriteTo(sid, bh, &bsw.streamWriters, &bsw.columnNameIDGenerator)
} else { } else {
bd.mustWriteTo(bh, &bsw.streamWriters) bd.mustWriteTo(bh, &bsw.streamWriters, &bsw.columnNameIDGenerator)
} }
th := &bh.timestampsHeader th := &bh.timestampsHeader
if bsw.globalRowsCount == 0 || th.minTimestamp < bsw.globalMinTimestamp { if bsw.globalRowsCount == 0 || th.minTimestamp < bsw.globalMinTimestamp {
@ -318,6 +401,7 @@ func (bsw *blockStreamWriter) mustFlushIndexBlock(data []byte) {
// //
// bsw can be re-used after calling Finalize(). // bsw can be re-used after calling Finalize().
func (bsw *blockStreamWriter) Finalize(ph *partHeader) { func (bsw *blockStreamWriter) Finalize(ph *partHeader) {
ph.FormatVersion = partFormatLatestVersion
ph.UncompressedSizeBytes = bsw.globalUncompressedSizeBytes ph.UncompressedSizeBytes = bsw.globalUncompressedSizeBytes
ph.RowsCount = bsw.globalRowsCount ph.RowsCount = bsw.globalRowsCount
ph.BlocksCount = bsw.globalBlocksCount ph.BlocksCount = bsw.globalBlocksCount
@ -326,13 +410,11 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) {
bsw.mustFlushIndexBlock(bsw.indexBlockData) bsw.mustFlushIndexBlock(bsw.indexBlockData)
// Write columnNames data
mustWriteColumnNames(&bsw.streamWriters.columnNamesWriter, bsw.columnNameIDGenerator.columnNames)
// Write metaindex data // Write metaindex data
bb := longTermBufPool.Get() mustWriteIndexBlockHeaders(&bsw.streamWriters.metaindexWriter, bsw.metaindexData)
bb.B = encoding.CompressZSTDLevel(bb.B[:0], bsw.metaindexData, 1)
bsw.streamWriters.metaindexWriter.MustWrite(bb.B)
if len(bb.B) < 1024*1024 {
longTermBufPool.Put(bb)
}
ph.CompressedSizeBytes = bsw.streamWriters.totalBytesWritten() ph.CompressedSizeBytes = bsw.streamWriters.totalBytesWritten()

View file

@ -18,10 +18,19 @@ const bloomFilterHashesCount = 6
// bloomFilterBitsPerItem is the number of bits to use per each token. // bloomFilterBitsPerItem is the number of bits to use per each token.
const bloomFilterBitsPerItem = 16 const bloomFilterBitsPerItem = 16
// bloomFilterMarshal appends marshaled bloom filter for tokens to dst and returns the result. // bloomFilterMarshalTokens appends marshaled bloom filter for tokens to dst and returns the result.
func bloomFilterMarshal(dst []byte, tokens []string) []byte { func bloomFilterMarshalTokens(dst []byte, tokens []string) []byte {
bf := getBloomFilter() bf := getBloomFilter()
bf.mustInit(tokens) bf.mustInitTokens(tokens)
dst = bf.marshal(dst)
putBloomFilter(bf)
return dst
}
// bloomFilterMarshalHashes appends marshaled bloom filter for hashes to dst and returns the result.
func bloomFilterMarshalHashes(dst []byte, hashes []uint64) []byte {
bf := getBloomFilter()
bf.mustInitHashes(hashes)
dst = bf.marshal(dst) dst = bf.marshal(dst)
putBloomFilter(bf) putBloomFilter(bf)
return dst return dst
@ -61,23 +70,45 @@ func (bf *bloomFilter) unmarshal(src []byte) error {
return nil return nil
} }
// mustInit initializes bf with the given tokens // mustInitTokens initializes bf with the given tokens
func (bf *bloomFilter) mustInit(tokens []string) { func (bf *bloomFilter) mustInitTokens(tokens []string) {
bitsCount := len(tokens) * bloomFilterBitsPerItem bitsCount := len(tokens) * bloomFilterBitsPerItem
wordsCount := (bitsCount + 63) / 64 wordsCount := (bitsCount + 63) / 64
bits := slicesutil.SetLength(bf.bits, wordsCount) bits := slicesutil.SetLength(bf.bits, wordsCount)
bloomFilterAdd(bits, tokens) bloomFilterAddTokens(bits, tokens)
bf.bits = bits bf.bits = bits
} }
// bloomFilterAdd adds the given tokens to the bloom filter bits // mustInitHashes initializes bf with the given hashes
func bloomFilterAdd(bits []uint64, tokens []string) { func (bf *bloomFilter) mustInitHashes(hashes []uint64) {
bitsCount := len(hashes) * bloomFilterBitsPerItem
wordsCount := (bitsCount + 63) / 64
bits := slicesutil.SetLength(bf.bits, wordsCount)
bloomFilterAddHashes(bits, hashes)
bf.bits = bits
}
// bloomFilterAddTokens adds the given tokens to the bloom filter bits
func bloomFilterAddTokens(bits []uint64, tokens []string) {
hashesCount := len(tokens) * bloomFilterHashesCount hashesCount := len(tokens) * bloomFilterHashesCount
a := encoding.GetUint64s(hashesCount) a := encoding.GetUint64s(hashesCount)
a.A = appendTokensHashes(a.A[:0], tokens) a.A = appendTokensHashes(a.A[:0], tokens)
initBloomFilter(bits, a.A)
encoding.PutUint64s(a)
}
// bloomFilterAddHashes adds the given haehs to the bloom filter bits
func bloomFilterAddHashes(bits, hashes []uint64) {
hashesCount := len(hashes) * bloomFilterHashesCount
a := encoding.GetUint64s(hashesCount)
a.A = appendHashesHashes(a.A[:0], hashes)
initBloomFilter(bits, a.A)
encoding.PutUint64s(a)
}
func initBloomFilter(bits, hashes []uint64) {
maxBits := uint64(len(bits)) * 64 maxBits := uint64(len(bits)) * 64
for _, h := range a.A { for _, h := range hashes {
idx := h % maxBits idx := h % maxBits
i := idx / 64 i := idx / 64
j := idx % 64 j := idx % 64
@ -87,8 +118,6 @@ func bloomFilterAdd(bits []uint64, tokens []string) {
bits[i] = w | mask bits[i] = w | mask
} }
} }
encoding.PutUint64s(a)
} }
// appendTokensHashes appends hashes for the given tokens to dst and returns the result. // appendTokensHashes appends hashes for the given tokens to dst and returns the result.
@ -114,6 +143,29 @@ func appendTokensHashes(dst []uint64, tokens []string) []uint64 {
return dst return dst
} }
// appendHashesHashes appends hashes for the given hashes to dst and returns the result.
//
// the appended hashes can be then passed to bloomFilter.containsAll().
func appendHashesHashes(dst, hashes []uint64) []uint64 {
dstLen := len(dst)
hashesCount := len(hashes) * bloomFilterHashesCount
dst = slicesutil.SetLength(dst, dstLen+hashesCount)
dst = dst[:dstLen]
var buf [8]byte
hp := (*uint64)(unsafe.Pointer(&buf[0]))
for _, h := range hashes {
*hp = h
for i := 0; i < bloomFilterHashesCount; i++ {
h := xxhash.Sum64(buf[:])
(*hp)++
dst = append(dst, h)
}
}
return dst
}
// containsAll returns true if bf contains all the given tokens hashes generated by appendTokensHashes. // containsAll returns true if bf contains all the given tokens hashes generated by appendTokensHashes.
func (bf *bloomFilter) containsAll(hashes []uint64) bool { func (bf *bloomFilter) containsAll(hashes []uint64) bool {
bits := bf.bits bits := bf.bits

View file

@ -8,10 +8,16 @@ import (
func TestBloomFilter(t *testing.T) { func TestBloomFilter(t *testing.T) {
f := func(tokens []string) { f := func(tokens []string) {
t.Helper() t.Helper()
data := bloomFilterMarshal(nil, tokens) dataTokens := bloomFilterMarshalTokens(nil, tokens)
hashes := tokenizeHashes(nil, tokens)
dataHashes := bloomFilterMarshalHashes(nil, hashes)
if string(dataTokens) != string(dataHashes) {
t.Fatalf("unexpected marshaled bloom filters from hashes\ngot\n%X\nwant\n%X", dataHashes, dataTokens)
}
bf := getBloomFilter() bf := getBloomFilter()
defer putBloomFilter(bf) defer putBloomFilter(bf)
if err := bf.unmarshal(data); err != nil { if err := bf.unmarshal(dataTokens); err != nil {
t.Fatalf("unexpected error when unmarshaling bloom filter: %s", err) t.Fatalf("unexpected error when unmarshaling bloom filter: %s", err)
} }
tokensHashes := appendTokensHashes(nil, tokens) tokensHashes := appendTokensHashes(nil, tokens)
@ -57,7 +63,7 @@ func TestBloomFilterFalsePositive(t *testing.T) {
for i := range tokens { for i := range tokens {
tokens[i] = fmt.Sprintf("token_%d", i) tokens[i] = fmt.Sprintf("token_%d", i)
} }
data := bloomFilterMarshal(nil, tokens) data := bloomFilterMarshalTokens(nil, tokens)
bf := getBloomFilter() bf := getBloomFilter()
defer putBloomFilter(bf) defer putBloomFilter(bf)
if err := bf.unmarshal(data); err != nil { if err := bf.unmarshal(data); err != nil {
@ -79,3 +85,35 @@ func TestBloomFilterFalsePositive(t *testing.T) {
t.Fatalf("too high false positive rate; got %.4f; want %.4f max", p, maxFalsePositive) t.Fatalf("too high false positive rate; got %.4f; want %.4f max", p, maxFalsePositive)
} }
} }
func TestBloomFilterMarshal_TokensVSHashes(t *testing.T) {
tokens := make([]string, 100)
for i := range tokens {
tokens[i] = fmt.Sprintf("token_%d", i)
}
dataTokens := bloomFilterMarshalTokens(nil, tokens)
hashes := tokenizeHashes(nil, tokens)
dataHashes := bloomFilterMarshalHashes(nil, hashes)
if string(dataTokens) != string(dataHashes) {
t.Fatalf("unexpected bloom filter obtained from hashes\ngot\n%X\nwant\n%X", dataHashes, dataTokens)
}
}
func TestBloomFilterMarshalTokens(t *testing.T) {
f := func(tokens []string, resultExpected string) {
t.Helper()
result := bloomFilterMarshalTokens(nil, tokens)
if string(result) != resultExpected {
t.Fatalf("unexpected result\ngot\n%X\nwant\n%X", result, resultExpected)
}
}
f([]string{}, "")
f([]string{"foo"}, "\x00\x00\x00\x82\x40\x18\x00\x04")
f([]string{"foo", "bar", "baz"}, "\x00\x00\x81\xA3\x48\x5C\x10\x26")
f([]string{"foo", "bar", "baz", "foo"}, "\x00\x00\x81\xA3\x48\x5C\x10\x26")
}

View file

@ -0,0 +1,127 @@
package logstorage
import (
"fmt"
"io"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
func mustWriteColumnNames(w *writerWithStats, columnNames []string) {
data := marshalColumnNames(nil, columnNames)
w.MustWrite(data)
}
func mustReadColumnNames(r filestream.ReadCloser) ([]string, map[string]uint64) {
src, err := io.ReadAll(r)
if err != nil {
logger.Panicf("FATAL: %s: cannot read colum names: %s", r.Path(), err)
}
columnNames, err := unmarshalColumnNames(src)
if err != nil {
logger.Panicf("FATAL: %s: %s", r.Path(), err)
}
columnNameIDs, err := getColumnNameIDs(columnNames)
if err != nil {
logger.Panicf("BUG: %s: %s; columnNames=%v", r.Path(), err, columnNameIDs)
}
return columnNames, columnNameIDs
}
func getColumnNameIDs(columnNames []string) (map[string]uint64, error) {
m := make(map[uint64]string, len(columnNames))
columnNameIDs := make(map[string]uint64, len(columnNames))
for i, name := range columnNames {
id := uint64(i)
if prevName, ok := m[id]; ok {
return nil, fmt.Errorf("duplicate column name id=%d for columns %q and %q", id, prevName, name)
}
m[id] = name
columnNameIDs[name] = id
}
return columnNameIDs, nil
}
func marshalColumnNames(dst []byte, columnNames []string) []byte {
data := encoding.MarshalVarUint64(nil, uint64(len(columnNames)))
for _, name := range columnNames {
data = encoding.MarshalBytes(data, bytesutil.ToUnsafeBytes(name))
}
dst = encoding.CompressZSTDLevel(dst, data, 1)
return dst
}
func unmarshalColumnNames(src []byte) ([]string, error) {
data, err := encoding.DecompressZSTD(nil, src)
if err != nil {
return nil, fmt.Errorf("cannot decompress column names from len(src)=%d: %w", len(src), err)
}
src = data
n, nBytes := encoding.UnmarshalVarUint64(src)
if nBytes <= 0 {
return nil, fmt.Errorf("cannot parse the number of column names for len(src)=%d", len(src))
}
src = src[nBytes:]
m := make(map[string]uint64, n)
columnNames := make([]string, n)
for id := uint64(0); id < n; id++ {
name, nBytes := encoding.UnmarshalBytes(src)
if nBytes <= 0 {
return nil, fmt.Errorf("cannot parse colum name number %d out of %d", id, n)
}
src = src[nBytes:]
if idPrev, ok := m[string(name)]; ok {
return nil, fmt.Errorf("duplicate ids for column name %q: %d and %d", name, idPrev, id)
}
m[string(name)] = id
columnNames[id] = string(name)
}
if len(src) > 0 {
return nil, fmt.Errorf("unexpected non-empty tail left after unmarshaling column name ids; len(tail)=%d", len(src))
}
return columnNames, nil
}
type columnNameIDGenerator struct {
// columnNameIDs contains columnName->id mapping for already seen columns
columnNameIDs map[string]uint64
// columnNames contains id->columnName mapping for already seen columns
columnNames []string
}
func (g *columnNameIDGenerator) reset() {
g.columnNameIDs = nil
g.columnNames = nil
}
func (g *columnNameIDGenerator) getColumnNameID(name string) uint64 {
id, ok := g.columnNameIDs[name]
if !ok {
if g.columnNameIDs == nil {
g.columnNameIDs = make(map[string]uint64)
}
id = uint64(len(g.columnNames))
nameCopy := strings.Clone(name)
g.columnNameIDs[nameCopy] = id
g.columnNames = append(g.columnNames, nameCopy)
}
return id
}

View file

@ -0,0 +1,54 @@
package logstorage
import (
"reflect"
"testing"
)
func TestMarshalUnmarshalColumnNames(t *testing.T) {
f := func(columnNames []string) {
t.Helper()
data := marshalColumnNames(nil, columnNames)
result, err := unmarshalColumnNames(data)
if err != nil {
t.Fatalf("unexpected error when unmarshaling columnNames: %s", err)
}
if !reflect.DeepEqual(columnNames, result) {
t.Fatalf("unexpected umarshaled columnNames\ngot\n%v\nwant\n%v", result, columnNames)
}
}
f([]string{})
f([]string{"", "foo", "bar"})
f([]string{
"asdf.sdf.dsfds.f fds. fds ",
"foo",
"bar.sdfsdf.fd",
"",
"aso apaa",
})
}
func TestColumnNameIDGenerator(t *testing.T) {
a := []string{"", "foo", "bar.baz", "asdf dsf dfs"}
g := &columnNameIDGenerator{}
for i, s := range a {
id := g.getColumnNameID(s)
if id != uint64(i) {
t.Fatalf("first run: unexpected id generated for s=%q; got %d; want %d; g=%v", s, id, i, g)
}
}
// Repeat the loop
for i, s := range a {
id := g.getColumnNameID(s)
if id != uint64(i) {
t.Fatalf("second run: unexpected id generated for s=%q; got %d; want %d; g=%v", s, id, i, g)
}
}
}

View file

@ -1,5 +1,15 @@
package logstorage package logstorage
// partFormatLatestVersion is the latest format version for parts.
//
// See partHeader.FormatVersion for details.
const partFormatLatestVersion = 1
// bloomValuesShardsCount is the number of shards for bloomFilename and valuesFilename files.
//
// The partHeader.FormatVersion must be updated when this number changes.
const bloomValuesShardsCount = 8
// maxUncompressedIndexBlockSize contains the maximum length of uncompressed block with blockHeader entries aka index block. // maxUncompressedIndexBlockSize contains the maximum length of uncompressed block with blockHeader entries aka index block.
// //
// The real block length can exceed this value by a small percentage because of the block write details. // The real block length can exceed this value by a small percentage because of the block write details.
@ -46,6 +56,9 @@ const maxBloomFilterBlockSize = 8 * 1024 * 1024
// maxColumnsHeaderSize is the maximum size of columnsHeader block // maxColumnsHeaderSize is the maximum size of columnsHeader block
const maxColumnsHeaderSize = 8 * 1024 * 1024 const maxColumnsHeaderSize = 8 * 1024 * 1024
// maxColumnsHeaderIndexSize is the maximum size of columnsHeaderIndex block
const maxColumnsHeaderIndexSize = 8 * 1024 * 1024
// maxDictSizeBytes is the maximum length of all the keys in the valuesDict. // maxDictSizeBytes is the maximum length of all the keys in the valuesDict.
// //
// Dict is stored in columnsHeader, which is read every time the corresponding block is scanned during search qieries. // Dict is stored in columnsHeader, which is read every time the corresponding block is scanned during search qieries.

View file

@ -1,12 +1,16 @@
package logstorage package logstorage
const ( const (
columnNamesFilename = "column_names.bin"
metaindexFilename = "metaindex.bin" metaindexFilename = "metaindex.bin"
indexFilename = "index.bin" indexFilename = "index.bin"
columnsHeaderIndexFilename = "columns_header_index.bin"
columnsHeaderFilename = "columns_header.bin" columnsHeaderFilename = "columns_header.bin"
timestampsFilename = "timestamps.bin" timestampsFilename = "timestamps.bin"
fieldValuesFilename = "field_values.bin" oldValuesFilename = "field_values.bin"
fieldBloomFilename = "field_bloom.bin" oldBloomFilename = "field_bloom.bin"
valuesFilename = "values.bin"
bloomFilename = "bloom.bin"
messageValuesFilename = "message_values.bin" messageValuesFilename = "message_values.bin"
messageBloomFilename = "message_bloom.bin" messageBloomFilename = "message_bloom.bin"

View file

@ -0,0 +1,180 @@
package logstorage
import (
"sync"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// tokenizeHashes extracts word tokens from a, hashes them, appends hashes to dst and returns the result.
//
// The returned hashes can be used for building bloom filters.
func tokenizeHashes(dst []uint64, a []string) []uint64 {
t := getHashTokenizer()
for i, s := range a {
if i > 0 && s == a[i-1] {
// This string has been already tokenized
continue
}
dst = t.tokenizeString(dst, s)
}
putHashTokenizer(t)
return dst
}
const hashTokenizerBucketsCount = 1024
type hashTokenizer struct {
buckets [hashTokenizerBucketsCount]hashTokenizerBucket
bm bitmap
}
type hashTokenizerBucket struct {
v uint64
overflow []uint64
}
func (b *hashTokenizerBucket) reset() {
// do not spend CPU time on clearing v and b.overflow items,
// since they'll be overwritten with new items.
b.overflow = b.overflow[:0]
}
func newHashTokenizer() *hashTokenizer {
var t hashTokenizer
t.bm.init(len(t.buckets))
return &t
}
func (t *hashTokenizer) reset() {
if t.bm.onesCount() <= len(t.buckets)/4 {
t.bm.forEachSetBit(func(idx int) bool {
t.buckets[idx].reset()
return false
})
} else {
buckets := t.buckets[:]
for i := range buckets {
buckets[i].reset()
}
t.bm.init(len(t.buckets))
}
}
func (t *hashTokenizer) tokenizeString(dst []uint64, s string) []uint64 {
if !isASCII(s) {
// Slow path - s contains unicode chars
return t.tokenizeStringUnicode(dst, s)
}
// Fast path for ASCII s
i := 0
for i < len(s) {
// Search for the next token.
start := len(s)
for i < len(s) {
if !isTokenChar(s[i]) {
i++
continue
}
start = i
i++
break
}
// Search for the end of the token.
end := len(s)
for i < len(s) {
if isTokenChar(s[i]) {
i++
continue
}
end = i
i++
break
}
if end <= start {
break
}
// Register the token.
token := s[start:end]
if h, ok := t.addToken(token); ok {
dst = append(dst, h)
}
}
return dst
}
func (t *hashTokenizer) tokenizeStringUnicode(dst []uint64, s string) []uint64 {
for len(s) > 0 {
// Search for the next token.
n := len(s)
for offset, r := range s {
if isTokenRune(r) {
n = offset
break
}
}
s = s[n:]
// Search for the end of the token.
n = len(s)
for offset, r := range s {
if !isTokenRune(r) {
n = offset
break
}
}
if n == 0 {
break
}
// Register the token
token := s[:n]
s = s[n:]
if h, ok := t.addToken(token); ok {
dst = append(dst, h)
}
}
return dst
}
func (t *hashTokenizer) addToken(token string) (uint64, bool) {
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(token))
idx := int(h % uint64(len(t.buckets)))
b := &t.buckets[idx]
if !t.bm.isSetBit(idx) {
b.v = h
t.bm.setBit(idx)
return h, true
}
if b.v == h {
return h, false
}
for _, v := range b.overflow {
if v == h {
return h, false
}
}
b.overflow = append(b.overflow, h)
return h, true
}
func getHashTokenizer() *hashTokenizer {
v := hashTokenizerPool.Get()
if v == nil {
return newHashTokenizer()
}
return v.(*hashTokenizer)
}
func putHashTokenizer(t *hashTokenizer) {
t.reset()
hashTokenizerPool.Put(t)
}
var hashTokenizerPool sync.Pool

View file

@ -0,0 +1,24 @@
package logstorage
import (
"reflect"
"testing"
)
func TestTokenizeHashes(t *testing.T) {
f := func(a []string, hashesExpected []uint64) {
t.Helper()
hashes := tokenizeHashes(nil, a)
if !reflect.DeepEqual(hashes, hashesExpected) {
t.Fatalf("unexpected hashes\ngot\n%X\nwant\n%X", hashes, hashesExpected)
}
}
f(nil, nil)
f([]string{""}, nil)
f([]string{"foo"}, []uint64{0x33BF00A859C4BA3F})
f([]string{"foo foo", "!!foo //"}, []uint64{0x33BF00A859C4BA3F})
f([]string{"foo bar---.!!([baz]!!! %$# TaSte"}, []uint64{0x33BF00A859C4BA3F, 0x48A37C90AD27A659, 0x42598CF26A247404, 0x34709F40A3286E46})
f([]string{"foo bar---.!!([baz]!!! %$# baz foo TaSte"}, []uint64{0x33BF00A859C4BA3F, 0x48A37C90AD27A659, 0x42598CF26A247404, 0x34709F40A3286E46})
f([]string{"теСТ 1234 f12.34", "34 f12 AS"}, []uint64{0xFE846FA145CEABD1, 0xD8316E61D84F6BA4, 0x6D67BA71C4E03D10, 0x5E8D522CA93563ED, 0xED80AED10E029FC8})
}

View file

@ -0,0 +1,19 @@
package logstorage
import (
"strings"
"testing"
)
func BenchmarkTokenizeHashes(b *testing.B) {
a := strings.Split(benchLogs, "\n")
b.ReportAllocs()
b.SetBytes(int64(len(benchLogs)))
b.RunParallel(func(pb *testing.PB) {
var hashes []uint64
for pb.Next() {
hashes = tokenizeHashes(hashes[:0], a)
}
})
}

View file

@ -110,25 +110,36 @@ func (ih *indexBlockHeader) unmarshal(src []byte) ([]byte, error) {
return src[32:], nil return src[32:], nil
} }
// mustWriteIndexBlockHeaders writes metaindexData to w.
func mustWriteIndexBlockHeaders(w *writerWithStats, metaindexData []byte) {
bb := longTermBufPool.Get()
bb.B = encoding.CompressZSTDLevel(bb.B[:0], metaindexData, 1)
w.MustWrite(bb.B)
if len(bb.B) < 1024*1024 {
longTermBufPool.Put(bb)
}
}
// mustReadIndexBlockHeaders reads indexBlockHeader entries from r, appends them to dst and returns the result. // mustReadIndexBlockHeaders reads indexBlockHeader entries from r, appends them to dst and returns the result.
func mustReadIndexBlockHeaders(dst []indexBlockHeader, r *readerWithStats) []indexBlockHeader { func mustReadIndexBlockHeaders(dst []indexBlockHeader, r *readerWithStats) []indexBlockHeader {
data, err := io.ReadAll(r) data, err := io.ReadAll(r)
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot read indexBlockHeader entries from %s: %s", r.Path(), err) logger.Panicf("FATAL: %s: cannot read indexBlockHeader entries: %s", r.Path(), err)
} }
bb := longTermBufPool.Get() bb := longTermBufPool.Get()
bb.B, err = encoding.DecompressZSTD(bb.B[:0], data) bb.B, err = encoding.DecompressZSTD(bb.B[:0], data)
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot decompress indexBlockHeader entries from %s: %s", r.Path(), err) logger.Panicf("FATAL: %s: cannot decompress indexBlockHeader entries: %s", r.Path(), err)
} }
dst, err = unmarshalIndexBlockHeaders(dst, bb.B) dst, err = unmarshalIndexBlockHeaders(dst, bb.B)
if len(bb.B) < 1024*1024 { if len(bb.B) < 1024*1024 {
longTermBufPool.Put(bb) longTermBufPool.Put(bb)
} }
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot parse indexBlockHeader entries from %s: %s", r.Path(), err) logger.Panicf("FATAL: %s: cannot parse indexBlockHeader entries: %s", r.Path(), err)
} }
return dst return dst
} }

View file

@ -14,38 +14,62 @@ type inmemoryPart struct {
// ph contains partHeader information for the given in-memory part. // ph contains partHeader information for the given in-memory part.
ph partHeader ph partHeader
columnNames bytesutil.ByteBuffer
metaindex bytesutil.ByteBuffer metaindex bytesutil.ByteBuffer
index bytesutil.ByteBuffer index bytesutil.ByteBuffer
columnsHeaderIndex bytesutil.ByteBuffer
columnsHeader bytesutil.ByteBuffer columnsHeader bytesutil.ByteBuffer
timestamps bytesutil.ByteBuffer timestamps bytesutil.ByteBuffer
fieldValues bytesutil.ByteBuffer
fieldBloomFilter bytesutil.ByteBuffer messageBloomValues bloomValuesBuffer
messageValues bytesutil.ByteBuffer bloomValuesShards [bloomValuesShardsCount]bloomValuesBuffer
messageBloomFilter bytesutil.ByteBuffer }
type bloomValuesBuffer struct {
bloom bytesutil.ByteBuffer
values bytesutil.ByteBuffer
}
func (b *bloomValuesBuffer) reset() {
b.bloom.Reset()
b.values.Reset()
}
func (b *bloomValuesBuffer) NewStreamReader() bloomValuesStreamReader {
return bloomValuesStreamReader{
bloom: b.bloom.NewReader(),
values: b.values.NewReader(),
}
}
func (b *bloomValuesBuffer) NewStreamWriter() bloomValuesStreamWriter {
return bloomValuesStreamWriter{
bloom: &b.bloom,
values: &b.values,
}
} }
// reset resets mp, so it can be re-used // reset resets mp, so it can be re-used
func (mp *inmemoryPart) reset() { func (mp *inmemoryPart) reset() {
mp.ph.reset() mp.ph.reset()
mp.columnNames.Reset()
mp.metaindex.Reset() mp.metaindex.Reset()
mp.index.Reset() mp.index.Reset()
mp.columnsHeaderIndex.Reset()
mp.columnsHeader.Reset() mp.columnsHeader.Reset()
mp.timestamps.Reset() mp.timestamps.Reset()
mp.fieldValues.Reset()
mp.fieldBloomFilter.Reset() mp.messageBloomValues.reset()
mp.messageValues.Reset() for i := range mp.bloomValuesShards[:] {
mp.messageBloomFilter.Reset() mp.bloomValuesShards[i].reset()
}
} }
// mustInitFromRows initializes mp from lr. // mustInitFromRows initializes mp from lr.
func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) { func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) {
mp.reset() mp.reset()
if len(lr.timestamps) == 0 {
return
}
sort.Sort(lr) sort.Sort(lr)
bsw := getBlockStreamWriter() bsw := getBlockStreamWriter()
@ -75,6 +99,7 @@ func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) {
} }
bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows) bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows)
putTmpRows(trs) putTmpRows(trs)
bsw.Finalize(&mp.ph) bsw.Finalize(&mp.ph)
putBlockStreamWriter(bsw) putBlockStreamWriter(bsw)
} }
@ -83,23 +108,34 @@ func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) {
func (mp *inmemoryPart) MustStoreToDisk(path string) { func (mp *inmemoryPart) MustStoreToDisk(path string) {
fs.MustMkdirFailIfExist(path) fs.MustMkdirFailIfExist(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename) metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename) indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename) columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename) timestampsPath := filepath.Join(path, timestampsFilename)
fieldValuesPath := filepath.Join(path, fieldValuesFilename)
fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename)
messageValuesPath := filepath.Join(path, messageValuesFilename) messageValuesPath := filepath.Join(path, messageValuesFilename)
messageBloomFilterPath := filepath.Join(path, messageBloomFilename) messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
fs.MustWriteSync(columnNamesPath, mp.columnNames.B)
fs.MustWriteSync(metaindexPath, mp.metaindex.B) fs.MustWriteSync(metaindexPath, mp.metaindex.B)
fs.MustWriteSync(indexPath, mp.index.B) fs.MustWriteSync(indexPath, mp.index.B)
fs.MustWriteSync(columnsHeaderIndexPath, mp.columnsHeaderIndex.B)
fs.MustWriteSync(columnsHeaderPath, mp.columnsHeader.B) fs.MustWriteSync(columnsHeaderPath, mp.columnsHeader.B)
fs.MustWriteSync(timestampsPath, mp.timestamps.B) fs.MustWriteSync(timestampsPath, mp.timestamps.B)
fs.MustWriteSync(fieldValuesPath, mp.fieldValues.B)
fs.MustWriteSync(fieldBloomFilterPath, mp.fieldBloomFilter.B) fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomValues.bloom.B)
fs.MustWriteSync(messageValuesPath, mp.messageValues.B) fs.MustWriteSync(messageValuesPath, mp.messageBloomValues.values.B)
fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomFilter.B)
for i := range mp.bloomValuesShards[:] {
shard := &mp.bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
fs.MustWriteSync(bloomPath, shard.bloom.B)
valuesPath := getValuesFilePath(path, uint64(i))
fs.MustWriteSync(valuesPath, shard.values.B)
}
mp.ph.mustWriteMetadata(path) mp.ph.mustWriteMetadata(path)

View file

@ -75,7 +75,7 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) {
f(GetLogRows(nil, nil), 0, 0) f(GetLogRows(nil, nil), 0, 0)
// Check how inmemoryPart works with a single stream // Check how inmemoryPart works with a single stream
f(newTestLogRows(1, 1, 0), 1, 0.8) f(newTestLogRows(1, 1, 0), 1, 0.7)
f(newTestLogRows(1, 2, 0), 1, 0.9) f(newTestLogRows(1, 2, 0), 1, 0.9)
f(newTestLogRows(1, 10, 0), 1, 2.0) f(newTestLogRows(1, 10, 0), 1, 2.0)
f(newTestLogRows(1, 1000, 0), 1, 7.1) f(newTestLogRows(1, 1000, 0), 1, 7.1)
@ -83,9 +83,9 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) {
// Check how inmemoryPart works with multiple streams // Check how inmemoryPart works with multiple streams
f(newTestLogRows(2, 1, 0), 2, 0.8) f(newTestLogRows(2, 1, 0), 2, 0.8)
f(newTestLogRows(10, 1, 0), 10, 0.9) f(newTestLogRows(10, 1, 0), 10, 1.1)
f(newTestLogRows(100, 1, 0), 100, 1.0) f(newTestLogRows(100, 1, 0), 100, 1.2)
f(newTestLogRows(10, 5, 0), 10, 1.4) f(newTestLogRows(10, 5, 0), 10, 1.5)
f(newTestLogRows(10, 1000, 0), 10, 7.2) f(newTestLogRows(10, 1000, 0), 10, 7.2)
f(newTestLogRows(100, 100, 0), 100, 5.0) f(newTestLogRows(100, 100, 0), 100, 5.0)
} }
@ -192,14 +192,14 @@ func TestInmemoryPartInitFromBlockStreamReaders(t *testing.T) {
f([]*LogRows{GetLogRows(nil, nil), GetLogRows(nil, nil)}, 0, 0) f([]*LogRows{GetLogRows(nil, nil), GetLogRows(nil, nil)}, 0, 0)
// Check merge with a single reader // Check merge with a single reader
f([]*LogRows{newTestLogRows(1, 1, 0)}, 1, 0.8) f([]*LogRows{newTestLogRows(1, 1, 0)}, 1, 0.7)
f([]*LogRows{newTestLogRows(1, 10, 0)}, 1, 2.0) f([]*LogRows{newTestLogRows(1, 10, 0)}, 1, 2.0)
f([]*LogRows{newTestLogRows(1, 100, 0)}, 1, 4.9) f([]*LogRows{newTestLogRows(1, 100, 0)}, 1, 4.9)
f([]*LogRows{newTestLogRows(1, 1000, 0)}, 1, 7.1) f([]*LogRows{newTestLogRows(1, 1000, 0)}, 1, 7.1)
f([]*LogRows{newTestLogRows(1, 10000, 0)}, 1, 7.4) f([]*LogRows{newTestLogRows(1, 10000, 0)}, 1, 7.4)
f([]*LogRows{newTestLogRows(10, 1, 0)}, 10, 0.9) f([]*LogRows{newTestLogRows(10, 1, 0)}, 10, 1.1)
f([]*LogRows{newTestLogRows(100, 1, 0)}, 100, 1.0) f([]*LogRows{newTestLogRows(100, 1, 0)}, 100, 1.3)
f([]*LogRows{newTestLogRows(1000, 1, 0)}, 1000, 1.0) f([]*LogRows{newTestLogRows(1000, 1, 0)}, 1000, 1.2)
f([]*LogRows{newTestLogRows(10, 10, 0)}, 10, 2.1) f([]*LogRows{newTestLogRows(10, 10, 0)}, 10, 2.1)
f([]*LogRows{newTestLogRows(10, 100, 0)}, 10, 4.9) f([]*LogRows{newTestLogRows(10, 100, 0)}, 10, 4.9)

View file

@ -1,8 +1,12 @@
package logstorage package logstorage
import ( import (
"fmt"
"path/filepath" "path/filepath"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
) )
@ -19,16 +23,35 @@ type part struct {
// ph contains partHeader for the given part. // ph contains partHeader for the given part.
ph partHeader ph partHeader
// columnNameIDs is a mapping from column names seen in the given part to internal IDs.
// The internal IDs are used in columnHeaderRef.
columnNameIDs map[string]uint64
// columnNames is a mapping from internal IDs to column names.
// The internal IDs are used in columnHeaderRef.
columnNames []string
// indexBlockHeaders contains a list of indexBlockHeader entries for the given part. // indexBlockHeaders contains a list of indexBlockHeader entries for the given part.
indexBlockHeaders []indexBlockHeader indexBlockHeaders []indexBlockHeader
indexFile fs.MustReadAtCloser indexFile fs.MustReadAtCloser
columnsHeaderIndexFile fs.MustReadAtCloser
columnsHeaderFile fs.MustReadAtCloser columnsHeaderFile fs.MustReadAtCloser
timestampsFile fs.MustReadAtCloser timestampsFile fs.MustReadAtCloser
fieldValuesFile fs.MustReadAtCloser
fieldBloomFilterFile fs.MustReadAtCloser messageBloomValues bloomValuesReaderAt
messageValuesFile fs.MustReadAtCloser oldBloomValues bloomValuesReaderAt
messageBloomFilterFile fs.MustReadAtCloser bloomValuesShards [bloomValuesShardsCount]bloomValuesReaderAt
}
type bloomValuesReaderAt struct {
bloom fs.MustReadAtCloser
values fs.MustReadAtCloser
}
func (r *bloomValuesReaderAt) MustClose() {
r.bloom.MustClose()
r.values.MustClose()
} }
func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part { func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
@ -37,6 +60,10 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
p.path = "" p.path = ""
p.ph = mp.ph p.ph = mp.ph
// Read columnNames
columnNamesReader := mp.columnNames.NewReader()
p.columnNames, p.columnNameIDs = mustReadColumnNames(columnNamesReader)
// Read metaindex // Read metaindex
metaindexReader := mp.metaindex.NewReader() metaindexReader := mp.metaindex.NewReader()
var mrs readerWithStats var mrs readerWithStats
@ -45,12 +72,19 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part {
// Open data files // Open data files
p.indexFile = &mp.index p.indexFile = &mp.index
p.columnsHeaderIndexFile = &mp.columnsHeaderIndex
p.columnsHeaderFile = &mp.columnsHeader p.columnsHeaderFile = &mp.columnsHeader
p.timestampsFile = &mp.timestamps p.timestampsFile = &mp.timestamps
p.fieldValuesFile = &mp.fieldValues
p.fieldBloomFilterFile = &mp.fieldBloomFilter p.messageBloomValues.bloom = &mp.messageBloomValues.bloom
p.messageValuesFile = &mp.messageValues p.messageBloomValues.values = &mp.messageBloomValues.values
p.messageBloomFilterFile = &mp.messageBloomFilter
// Open files with bloom filters and column values
for i := range p.bloomValuesShards[:] {
shard := &p.bloomValuesShards[i]
shard.bloom = &mp.bloomValuesShards[i].bloom
shard.values = &mp.bloomValuesShards[i].values
}
return &p return &p
} }
@ -61,14 +95,21 @@ func mustOpenFilePart(pt *partition, path string) *part {
p.path = path p.path = path
p.ph.mustReadMetadata(path) p.ph.mustReadMetadata(path)
columnNamesPath := filepath.Join(path, columnNamesFilename)
metaindexPath := filepath.Join(path, metaindexFilename) metaindexPath := filepath.Join(path, metaindexFilename)
indexPath := filepath.Join(path, indexFilename) indexPath := filepath.Join(path, indexFilename)
columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename)
columnsHeaderPath := filepath.Join(path, columnsHeaderFilename) columnsHeaderPath := filepath.Join(path, columnsHeaderFilename)
timestampsPath := filepath.Join(path, timestampsFilename) timestampsPath := filepath.Join(path, timestampsFilename)
fieldValuesPath := filepath.Join(path, fieldValuesFilename)
fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename) // Read columnNames
messageValuesPath := filepath.Join(path, messageValuesFilename) if p.ph.FormatVersion >= 1 {
messageBloomFilterPath := filepath.Join(path, messageBloomFilename) columnNamesReader := filestream.MustOpen(columnNamesPath, true)
var crs readerWithStats
crs.init(columnNamesReader)
p.columnNames, p.columnNameIDs = mustReadColumnNames(columnNamesReader)
crs.MustClose()
}
// Read metaindex // Read metaindex
metaindexReader := filestream.MustOpen(metaindexPath, true) metaindexReader := filestream.MustOpen(metaindexPath, true)
@ -79,24 +120,78 @@ func mustOpenFilePart(pt *partition, path string) *part {
// Open data files // Open data files
p.indexFile = fs.MustOpenReaderAt(indexPath) p.indexFile = fs.MustOpenReaderAt(indexPath)
if p.ph.FormatVersion >= 1 {
p.columnsHeaderIndexFile = fs.MustOpenReaderAt(columnsHeaderIndexPath)
}
p.columnsHeaderFile = fs.MustOpenReaderAt(columnsHeaderPath) p.columnsHeaderFile = fs.MustOpenReaderAt(columnsHeaderPath)
p.timestampsFile = fs.MustOpenReaderAt(timestampsPath) p.timestampsFile = fs.MustOpenReaderAt(timestampsPath)
p.fieldValuesFile = fs.MustOpenReaderAt(fieldValuesPath)
p.fieldBloomFilterFile = fs.MustOpenReaderAt(fieldBloomFilterPath) // Open files with bloom filters and column values
p.messageValuesFile = fs.MustOpenReaderAt(messageValuesPath) messageBloomFilterPath := filepath.Join(path, messageBloomFilename)
p.messageBloomFilterFile = fs.MustOpenReaderAt(messageBloomFilterPath) p.messageBloomValues.bloom = fs.MustOpenReaderAt(messageBloomFilterPath)
messageValuesPath := filepath.Join(path, messageValuesFilename)
p.messageBloomValues.values = fs.MustOpenReaderAt(messageValuesPath)
if p.ph.FormatVersion < 1 {
bloomPath := filepath.Join(path, oldBloomFilename)
p.oldBloomValues.bloom = fs.MustOpenReaderAt(bloomPath)
valuesPath := filepath.Join(path, oldValuesFilename)
p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath)
} else {
for i := range p.bloomValuesShards[:] {
shard := &p.bloomValuesShards[i]
bloomPath := getBloomFilePath(path, uint64(i))
shard.bloom = fs.MustOpenReaderAt(bloomPath)
valuesPath := getValuesFilePath(path, uint64(i))
shard.values = fs.MustOpenReaderAt(valuesPath)
}
}
return &p return &p
} }
func mustClosePart(p *part) { func mustClosePart(p *part) {
p.indexFile.MustClose() p.indexFile.MustClose()
if p.ph.FormatVersion >= 1 {
p.columnsHeaderIndexFile.MustClose()
}
p.columnsHeaderFile.MustClose() p.columnsHeaderFile.MustClose()
p.timestampsFile.MustClose() p.timestampsFile.MustClose()
p.fieldValuesFile.MustClose() p.messageBloomValues.MustClose()
p.fieldBloomFilterFile.MustClose()
p.messageValuesFile.MustClose() if p.ph.FormatVersion < 1 {
p.messageBloomFilterFile.MustClose() p.oldBloomValues.MustClose()
} else {
for i := range p.bloomValuesShards[:] {
p.bloomValuesShards[i].MustClose()
}
}
p.pt = nil p.pt = nil
} }
func (p *part) getBloomValuesFileForColumnName(name string) *bloomValuesReaderAt {
if name == "" {
return &p.messageBloomValues
}
if p.ph.FormatVersion < 1 {
return &p.oldBloomValues
}
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name))
idx := h % uint64(len(p.bloomValuesShards))
return &p.bloomValuesShards[idx]
}
func getBloomFilePath(partPath string, shardNum uint64) string {
return filepath.Join(partPath, bloomFilename) + fmt.Sprintf("%d", shardNum)
}
func getValuesFilePath(partPath string, shardNum uint64) string {
return filepath.Join(partPath, valuesFilename) + fmt.Sprintf("%d", shardNum)
}

View file

@ -14,6 +14,9 @@ import (
// partHeader contains the information about a single part // partHeader contains the information about a single part
type partHeader struct { type partHeader struct {
// FormatVersion is the version of the part format
FormatVersion uint
// CompressedSizeBytes is physical size of the part // CompressedSizeBytes is physical size of the part
CompressedSizeBytes uint64 CompressedSizeBytes uint64
@ -35,6 +38,7 @@ type partHeader struct {
// reset resets ph for subsequent re-use // reset resets ph for subsequent re-use
func (ph *partHeader) reset() { func (ph *partHeader) reset() {
ph.FormatVersion = 0
ph.CompressedSizeBytes = 0 ph.CompressedSizeBytes = 0
ph.UncompressedSizeBytes = 0 ph.UncompressedSizeBytes = 0
ph.RowsCount = 0 ph.RowsCount = 0
@ -45,8 +49,8 @@ func (ph *partHeader) reset() {
// String returns string represenation for ph. // String returns string represenation for ph.
func (ph *partHeader) String() string { func (ph *partHeader) String() string {
return fmt.Sprintf("{CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}", return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}",
ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp)) ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp))
} }
func (ph *partHeader) mustReadMetadata(partPath string) { func (ph *partHeader) mustReadMetadata(partPath string) {
@ -62,9 +66,15 @@ func (ph *partHeader) mustReadMetadata(partPath string) {
} }
// Perform various checks // Perform various checks
if ph.FormatVersion > partFormatLatestVersion {
logger.Panicf("FATAL: unsupported part format version; got %d; mustn't exceed %d", partFormatLatestVersion)
}
if ph.MinTimestamp > ph.MaxTimestamp { if ph.MinTimestamp > ph.MaxTimestamp {
logger.Panicf("FATAL: MinTimestamp cannot exceed MaxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp) logger.Panicf("FATAL: MinTimestamp cannot exceed MaxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp)
} }
if ph.BlocksCount > ph.RowsCount {
logger.Panicf("FATAL: BlocksCount=%d cannot exceed RowsCount=%d", ph.BlocksCount, ph.RowsCount)
}
} }
func (ph *partHeader) mustWriteMetadata(partPath string) { func (ph *partHeader) mustWriteMetadata(partPath string) {

View file

@ -30,30 +30,34 @@ func (f *Field) String() string {
return string(x) return string(x)
} }
func (f *Field) marshal(dst []byte) []byte { func (f *Field) marshal(dst []byte, marshalFieldName bool) []byte {
if marshalFieldName {
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Name)) dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Name))
}
dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Value)) dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Value))
return dst return dst
} }
func (f *Field) unmarshalNoArena(src []byte) ([]byte, error) { func (f *Field) unmarshalNoArena(src []byte, unmarshalFieldName bool) ([]byte, error) {
srcOrig := src srcOrig := src
// Unmarshal field name // Unmarshal field name
b, nSize := encoding.UnmarshalBytes(src) if unmarshalFieldName {
name, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 { if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal field name") return srcOrig, fmt.Errorf("cannot unmarshal field name")
} }
src = src[nSize:] src = src[nSize:]
f.Name = bytesutil.ToUnsafeString(b) f.Name = bytesutil.ToUnsafeString(name)
}
// Unmarshal field value // Unmarshal field value
b, nSize = encoding.UnmarshalBytes(src) value, nSize := encoding.UnmarshalBytes(src)
if nSize <= 0 { if nSize <= 0 {
return srcOrig, fmt.Errorf("cannot unmarshal field value") return srcOrig, fmt.Errorf("cannot unmarshal field value")
} }
src = src[nSize:] src = src[nSize:]
f.Value = bytesutil.ToUnsafeString(b) f.Value = bytesutil.ToUnsafeString(value)
return src, nil return src, nil
} }

View file

@ -8,7 +8,7 @@ import (
// tokenizeStrings extracts word tokens from a, appends them to dst and returns the result. // tokenizeStrings extracts word tokens from a, appends them to dst and returns the result.
// //
// the order of returned tokens is unspecified. // The order of returned tokens equals the order of tokens seen in a.
func tokenizeStrings(dst, a []string) []string { func tokenizeStrings(dst, a []string) []string {
t := getTokenizer() t := getTokenizer()
for i, s := range a { for i, s := range a {
@ -145,27 +145,3 @@ func putTokenizer(t *tokenizer) {
} }
var tokenizerPool sync.Pool var tokenizerPool sync.Pool
type tokensBuf struct {
A []string
}
func (tb *tokensBuf) reset() {
clear(tb.A)
tb.A = tb.A[:0]
}
func getTokensBuf() *tokensBuf {
v := tokensBufPool.Get()
if v == nil {
return &tokensBuf{}
}
return v.(*tokensBuf)
}
func putTokensBuf(tb *tokensBuf) {
tb.reset()
tokensBufPool.Put(tb)
}
var tokensBufPool sync.Pool