From 202eb429a7d0022def274d3ff905c8c4012bbbca Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 16 Oct 2024 16:18:28 +0200 Subject: [PATCH] lib/logstorage: refactor storage format to be more efficient for querying wide events It has been appeared that VictoriaLogs is frequently used for collecting logs with tens of fields. For example, standard Kuberntes setup on top of Filebeat generates more than 20 fields per each log. Such logs are also known as "wide events". The previous storage format was optimized for logs with a few fields. When at least a single field was referenced in the query, then the all the meta-information about all the log fields was unpacked and parsed per each scanned block during the query. This could require a lot of additional disk IO and CPU time when logs contain many fields. Resolve this issue by providing an (field -> metainfo_offset) index per each field in every data block. This index allows reading and extracting only the needed metainfo for fields used in the query. This index is stored in columnsHeaderIndexFilename ( columns_header_index.bin ). This allows increasing performance for queries over wide events by 10x and more. Another issue was that the data for bloom filters and field values across all the log fields except of _msg was intermixed in two files - fieldBloomFilename ( field_bloom.bin ) and fieldValuesFilename ( field_values.bin ). This could result in huge disk read IO overhead when some small field was referred in the query, since the Operating System usually reads more data than requested. It reads the data from disk in at least 4KiB blocks (usually the block size is much bigger in the range 64KiB - 512KiB). So, if 512-byte bloom filter or values' block is read from the file, then the Operating System reads up to 512KiB of data from disk, which results in 1000x disk read IO overhead. This overhead isn't visible for recently accessed data, since this data is usually stored in RAM (aka Operating System page cache), but this overhead may become very annoying when performing the query over large volumes of data which isn't present in OS page cache. The solution for this issue is to split bloom filters and field values across multiple shards. This reduces the worst-case disk read IO overhead by at least Nx where N is the number of shards, while the disk read IO overhead is completely removed in best case when the number of columns doesn't exceed N. Currently the number of shards is 8 - see bloomValuesShardsCount . This solution increases performance for queries over large volumes of newly ingested data by up to 1000x. The new storage format is versioned as v1, while the old storage format is version as v0. It is stored in the partHeader.FormatVersion. Parts with the old storage format are converted into parts with the new storage format during background merge. It is possible to force merge by querying /internal/force_merge HTTP endpoint - see https://docs.victoriametrics.com/victorialogs/#forced-merge . --- docs/VictoriaLogs/CHANGELOG.md | 3 +- lib/logstorage/bitmap.go | 7 + lib/logstorage/bitmap_test.go | 18 +- lib/logstorage/block.go | 42 +-- lib/logstorage/block_data.go | 94 +++--- lib/logstorage/block_header.go | 290 +++++++++++++++++-- lib/logstorage/block_header_test.go | 183 ++++++++++-- lib/logstorage/block_search.go | 259 ++++++++++++++--- lib/logstorage/block_stream_reader.go | 211 +++++++++++--- lib/logstorage/block_stream_writer.go | 168 ++++++++--- lib/logstorage/bloomfilter.go | 74 ++++- lib/logstorage/bloomfilter_test.go | 44 ++- lib/logstorage/column_names.go | 127 ++++++++ lib/logstorage/column_names_test.go | 54 ++++ lib/logstorage/consts.go | 13 + lib/logstorage/filenames.go | 20 +- lib/logstorage/hash_tokenizer.go | 180 ++++++++++++ lib/logstorage/hash_tokenizer_test.go | 24 ++ lib/logstorage/hash_tokenizer_timing_test.go | 19 ++ lib/logstorage/index_block_header.go | 17 +- lib/logstorage/inmemory_part.go | 72 +++-- lib/logstorage/inmemory_part_test.go | 16 +- lib/logstorage/part.go | 135 +++++++-- lib/logstorage/part_header.go | 14 +- lib/logstorage/rows.go | 24 +- lib/logstorage/tokenizer.go | 26 +- 26 files changed, 1766 insertions(+), 368 deletions(-) create mode 100644 lib/logstorage/column_names.go create mode 100644 lib/logstorage/column_names_test.go create mode 100644 lib/logstorage/hash_tokenizer.go create mode 100644 lib/logstorage/hash_tokenizer_test.go create mode 100644 lib/logstorage/hash_tokenizer_timing_test.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index df9a49666..193f446ff 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,8 +15,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: optimize [LogsQL queries](https://docs.victoriametrics.com/victorialogs/logsql/), which need to scan big number of logs with big number of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) (aka `wide events`). The performance for such queries is improved by 10x and more depending on the number of log fields in the scanned logs. The performance improvement is visible when querying logs ingested after the upgrade to this release. * FEATURE: add support for forced merge. See [these docs](https://docs.victoriametrics.com/victorialogs/#forced-merge). -* FEATURE: skip empty log fields in query results, since they are treated as non-existing fields in [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +* FEATURE: skip empty log fields in query results, since they are treated as non-existing fields in [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). This should reduce the level of confusion for end users when they see empty log fields. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the ability to cancel running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7097). * BUGFIX: avoid possible panic when logs for a new day are ingested during execution of concurrent queries. diff --git a/lib/logstorage/bitmap.go b/lib/logstorage/bitmap.go index 350ea0cc8..39fae9b5c 100644 --- a/lib/logstorage/bitmap.go +++ b/lib/logstorage/bitmap.go @@ -120,6 +120,13 @@ func (bm *bitmap) or(x *bitmap) { } } +func (bm *bitmap) setBit(i int) { + wordIdx := uint(i) / 64 + wordOffset := uint(i) % 64 + wordPtr := &bm.a[wordIdx] + *wordPtr |= (1 << wordOffset) +} + func (bm *bitmap) isSetBit(i int) bool { wordIdx := uint(i) / 64 wordOffset := uint(i) % 64 diff --git a/lib/logstorage/bitmap_test.go b/lib/logstorage/bitmap_test.go index 41cce6974..8304b427e 100644 --- a/lib/logstorage/bitmap_test.go +++ b/lib/logstorage/bitmap_test.go @@ -27,7 +27,6 @@ func TestBitmap(t *testing.T) { } bm.setBits() - if n := bm.onesCount(); n != i { t.Fatalf("unexpected number of set bits; got %d; want %d", n, i) } @@ -106,6 +105,23 @@ func TestBitmap(t *testing.T) { t.Fatalf("unexpected non-zero number of set bits remained: %d", bitsCount) } + // Set bits via setBit() call + for i := 0; i < bitsLen; i++ { + if n := bm.onesCount(); n != i { + t.Fatalf("unexpected number of ones set; got %d; want %d", n, i) + } + if bm.isSetBit(i) { + t.Fatalf("the bit %d mustn't be set", i) + } + bm.setBit(i) + if !bm.isSetBit(i) { + t.Fatalf("the bit %d must be set", i) + } + if n := bm.onesCount(); n != i+1 { + t.Fatalf("unexpected number of ones set; got %d; want %d", n, i+1) + } + } + putBitmap(bm) } } diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go index 0d1f9026a..3670991d1 100644 --- a/lib/logstorage/block.go +++ b/lib/logstorage/block.go @@ -150,18 +150,13 @@ func (c *column) resizeValues(valuesLen int) []string { // mustWriteTo writes c to sw and updates ch accordingly. // // ch is valid until c is changed. -func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { +func (c *column) mustWriteTo(ch *columnHeader, sw *streamWriters) { ch.reset() - valuesWriter := &sw.fieldValuesWriter - bloomFilterWriter := &sw.fieldBloomFilterWriter - if c.name == "" { - valuesWriter = &sw.messageValuesWriter - bloomFilterWriter = &sw.messageBloomFilterWriter - } - ch.name = c.name + bloomValuesWriter := sw.getBloomValuesWriterForColumnName(ch.name) + // encode values ve := getValuesEncoder() ch.valueType, ch.minValue, ch.maxValue = ve.encode(c.values, &ch.valuesDict) @@ -176,15 +171,15 @@ func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { if ch.valuesSize > maxValuesBlockSize { logger.Panicf("BUG: too valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize) } - ch.valuesOffset = valuesWriter.bytesWritten - valuesWriter.MustWrite(bb.B) + ch.valuesOffset = bloomValuesWriter.values.bytesWritten + bloomValuesWriter.values.MustWrite(bb.B) // create and marshal bloom filter for c.values if ch.valueType != valueTypeDict { - tokensBuf := getTokensBuf() - tokensBuf.A = tokenizeStrings(tokensBuf.A[:0], c.values) - bb.B = bloomFilterMarshal(bb.B[:0], tokensBuf.A) - putTokensBuf(tokensBuf) + hashesBuf := encoding.GetUint64s(0) + hashesBuf.A = tokenizeHashes(hashesBuf.A[:0], c.values) + bb.B = bloomFilterMarshalHashes(bb.B[:0], hashesBuf.A) + encoding.PutUint64s(hashesBuf) } else { // there is no need in ecoding bloom filter for dictionary type, // since it isn't used during querying - all the dictionary values are available in ch.valuesDict @@ -194,8 +189,8 @@ func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { if ch.bloomFilterSize > maxBloomFilterBlockSize { logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize) } - ch.bloomFilterOffset = bloomFilterWriter.bytesWritten - bloomFilterWriter.MustWrite(bb.B) + ch.bloomFilterOffset = bloomValuesWriter.bloom.bytesWritten + bloomValuesWriter.bloom.MustWrite(bb.B) } func (b *block) assertValid() { @@ -436,7 +431,7 @@ func (b *block) InitFromBlockData(bd *blockData, sbu *stringsBlockUnmarshaler, v } // mustWriteTo writes b with the given sid to sw and updates bh accordingly. -func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) { +func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) { // Do not store the version used for encoding directly in the block data, since: // - all the blocks in the same part use the same encoding // - the block encoding version can be put in metadata file for the part (aka metadataFilename) @@ -458,22 +453,13 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) { chs := csh.resizeColumnHeaders(len(cs)) for i := range cs { - cs[i].mustWriteToNoArena(&chs[i], sw) + cs[i].mustWriteTo(&chs[i], sw) } csh.constColumns = append(csh.constColumns[:0], b.constColumns...) - bb := longTermBufPool.Get() - bb.B = csh.marshal(bb.B) + csh.mustWriteTo(bh, sw, g) putColumnsHeader(csh) - - bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten - bh.columnsHeaderSize = uint64(len(bb.B)) - if bh.columnsHeaderSize > maxColumnsHeaderSize { - logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize) - } - sw.columnsHeaderWriter.MustWrite(bb.B) - longTermBufPool.Put(bb) } // appendRowsTo appends log entries from b to dst. diff --git a/lib/logstorage/block_data.go b/lib/logstorage/block_data.go index 0ad517c02..5c76ec3f0 100644 --- a/lib/logstorage/block_data.go +++ b/lib/logstorage/block_data.go @@ -93,7 +93,7 @@ func (bd *blockData) unmarshalRows(dst *rows, sbu *stringsBlockUnmarshaler, vd * } // mustWriteTo writes bd to sw and updates bh accordingly -func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) { +func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) { // Do not store the version used for encoding directly in the block data, since: // - all the blocks in the same part use the same encoding // - the block encoding version can be put in metadata file for the part (aka metadataFilename) @@ -114,28 +114,19 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) { chs := csh.resizeColumnHeaders(len(cds)) for i := range cds { - cds[i].mustWriteToNoArena(&chs[i], sw) + cds[i].mustWriteTo(&chs[i], sw) } csh.constColumns = append(csh.constColumns[:0], bd.constColumns...) - bb := longTermBufPool.Get() - bb.B = csh.marshal(bb.B) + csh.mustWriteTo(bh, sw, g) putColumnsHeader(csh) - - bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten - bh.columnsHeaderSize = uint64(len(bb.B)) - if bh.columnsHeaderSize > maxColumnsHeaderSize { - logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize) - } - sw.columnsHeaderWriter.MustWrite(bb.B) - longTermBufPool.Put(bb) } // mustReadFrom reads block data associated with bh from sr to bd. // // The bd is valid until a.reset() is called. -func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders) { +func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders, partFormatVersion uint, columnNames []string) { bd.reset() bd.streamID = bh.streamID @@ -159,19 +150,46 @@ func (bd *blockData) mustReadFrom(a *arena, bh *blockHeader, sr *streamReaders) sr.columnsHeaderReader.MustReadFull(bb.B) csh := getColumnsHeader() - if err := csh.unmarshalNoArena(bb.B); err != nil { + if err := csh.unmarshalNoArena(bb.B, partFormatVersion); err != nil { logger.Panicf("FATAL: %s: cannot unmarshal columnsHeader: %s", sr.columnsHeaderReader.Path(), err) } + if partFormatVersion >= 1 { + readColumnNamesFromColumnsHeaderIndex(bh, sr, csh, columnNames) + } + chs := csh.columnHeaders cds := bd.resizeColumnsData(len(chs)) for i := range chs { - cds[i].mustReadFrom(a, &chs[i], sr) + cds[i].mustReadFrom(a, &chs[i], sr, partFormatVersion) } bd.constColumns = appendFields(a, bd.constColumns[:0], csh.constColumns) putColumnsHeader(csh) longTermBufPool.Put(bb) } +func readColumnNamesFromColumnsHeaderIndex(bh *blockHeader, sr *streamReaders, csh *columnsHeader, columnNames []string) { + bb := longTermBufPool.Get() + defer longTermBufPool.Put(bb) + + n := bh.columnsHeaderIndexSize + if n > maxColumnsHeaderIndexSize { + logger.Panicf("BUG: %s: too big columnsHeaderIndexSize: %d bytes; mustn't exceed %d bytes", sr.columnsHeaderIndexReader.Path(), n, maxColumnsHeaderIndexSize) + } + + bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(n)) + sr.columnsHeaderIndexReader.MustReadFull(bb.B) + + cshIndex := getColumnsHeaderIndex() + if err := cshIndex.unmarshalNoArena(bb.B); err != nil { + logger.Panicf("FATAL: %s: cannot unmarshal columnsHeaderIndex: %s", sr.columnsHeaderIndexReader.Path(), err) + } + if err := csh.setColumnNames(cshIndex, columnNames); err != nil { + logger.Panicf("FATAL: %s: %s", sr.columnsHeaderIndexReader.Path(), err) + } + + putColumnsHeaderIndex(cshIndex) +} + // timestampsData contains the encoded timestamps data. type timestampsData struct { // data contains packed timestamps data. @@ -307,16 +325,9 @@ func (cd *columnData) copyFrom(a *arena, src *columnData) { // mustWriteTo writes cd to sw and updates ch accordingly. // // ch is valid until cd is changed. -func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { +func (cd *columnData) mustWriteTo(ch *columnHeader, sw *streamWriters) { ch.reset() - valuesWriter := &sw.fieldValuesWriter - bloomFilterWriter := &sw.fieldBloomFilterWriter - if cd.name == "" { - valuesWriter = &sw.messageValuesWriter - bloomFilterWriter = &sw.messageBloomFilterWriter - } - ch.name = cd.name ch.valueType = cd.valueType @@ -324,36 +335,31 @@ func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { ch.maxValue = cd.maxValue ch.valuesDict.copyFromNoArena(&cd.valuesDict) + bloomValuesWriter := sw.getBloomValuesWriterForColumnName(ch.name) + // marshal values ch.valuesSize = uint64(len(cd.valuesData)) if ch.valuesSize > maxValuesBlockSize { logger.Panicf("BUG: too big valuesSize: %d bytes; mustn't exceed %d bytes", ch.valuesSize, maxValuesBlockSize) } - ch.valuesOffset = valuesWriter.bytesWritten - valuesWriter.MustWrite(cd.valuesData) + ch.valuesOffset = bloomValuesWriter.values.bytesWritten + bloomValuesWriter.values.MustWrite(cd.valuesData) // marshal bloom filter ch.bloomFilterSize = uint64(len(cd.bloomFilterData)) if ch.bloomFilterSize > maxBloomFilterBlockSize { logger.Panicf("BUG: too big bloomFilterSize: %d bytes; mustn't exceed %d bytes", ch.bloomFilterSize, maxBloomFilterBlockSize) } - ch.bloomFilterOffset = bloomFilterWriter.bytesWritten - bloomFilterWriter.MustWrite(cd.bloomFilterData) + ch.bloomFilterOffset = bloomValuesWriter.bloom.bytesWritten + bloomValuesWriter.bloom.MustWrite(cd.bloomFilterData) } // mustReadFrom reads columns data associated with ch from sr to cd. // // cd is valid until a.reset() is called. -func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders) { +func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders, partFormatVersion uint) { cd.reset() - valuesReader := &sr.fieldValuesReader - bloomFilterReader := &sr.fieldBloomFilterReader - if ch.name == "" { - valuesReader = &sr.messageValuesReader - bloomFilterReader = &sr.messageBloomFilterReader - } - cd.name = a.copyString(ch.name) cd.valueType = ch.valueType @@ -361,30 +367,32 @@ func (cd *columnData) mustReadFrom(a *arena, ch *columnHeader, sr *streamReaders cd.maxValue = ch.maxValue cd.valuesDict.copyFrom(a, &ch.valuesDict) + bloomValuesReader := sr.getBloomValuesReaderForColumnName(ch.name, partFormatVersion) + // read values - if ch.valuesOffset != valuesReader.bytesRead { + if ch.valuesOffset != bloomValuesReader.values.bytesRead { logger.Panicf("FATAL: %s: unexpected columnHeader.valuesOffset=%d; must equal to the number of bytes read: %d", - valuesReader.Path(), ch.valuesOffset, valuesReader.bytesRead) + bloomValuesReader.values.Path(), ch.valuesOffset, bloomValuesReader.values.bytesRead) } valuesSize := ch.valuesSize if valuesSize > maxValuesBlockSize { - logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", valuesReader.Path(), maxValuesBlockSize, valuesSize) + logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bloomValuesReader.values.Path(), maxValuesBlockSize, valuesSize) } cd.valuesData = a.newBytes(int(valuesSize)) - valuesReader.MustReadFull(cd.valuesData) + bloomValuesReader.values.MustReadFull(cd.valuesData) // read bloom filter // bloom filter is missing in valueTypeDict. if ch.valueType != valueTypeDict { - if ch.bloomFilterOffset != bloomFilterReader.bytesRead { + if ch.bloomFilterOffset != bloomValuesReader.bloom.bytesRead { logger.Panicf("FATAL: %s: unexpected columnHeader.bloomFilterOffset=%d; must equal to the number of bytes read: %d", - bloomFilterReader.Path(), ch.bloomFilterOffset, bloomFilterReader.bytesRead) + bloomValuesReader.bloom.Path(), ch.bloomFilterOffset, bloomValuesReader.bloom.bytesRead) } bloomFilterSize := ch.bloomFilterSize if bloomFilterSize > maxBloomFilterBlockSize { - logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bloomFilterReader.Path(), maxBloomFilterBlockSize, bloomFilterSize) + logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bloomValuesReader.bloom.Path(), maxBloomFilterBlockSize, bloomFilterSize) } cd.bloomFilterData = a.newBytes(int(bloomFilterSize)) - bloomFilterReader.MustReadFull(cd.bloomFilterData) + bloomValuesReader.bloom.MustReadFull(cd.bloomFilterData) } } diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go index 96ae05282..03ab0ebb3 100644 --- a/lib/logstorage/block_header.go +++ b/lib/logstorage/block_header.go @@ -27,6 +27,12 @@ type blockHeader struct { // timestampsHeader contains information about timestamps for log entries in the block timestampsHeader timestampsHeader + // columnsHeaderIndexOffset is the offset of columnsHeaderIndex at columnsHeaderIndexFilename + columnsHeaderIndexOffset uint64 + + // columnsHeaderIndexSize is the size of columnsHeaderIndex at columnsHeaderIndexFilename + columnsHeaderIndexSize uint64 + // columnsHeaderOffset is the offset of columnsHeader at columnsHeaderFilename columnsHeaderOffset uint64 @@ -40,6 +46,8 @@ func (bh *blockHeader) reset() { bh.uncompressedSizeBytes = 0 bh.rowsCount = 0 bh.timestampsHeader.reset() + bh.columnsHeaderIndexOffset = 0 + bh.columnsHeaderIndexSize = 0 bh.columnsHeaderOffset = 0 bh.columnsHeaderSize = 0 } @@ -51,6 +59,8 @@ func (bh *blockHeader) copyFrom(src *blockHeader) { bh.uncompressedSizeBytes = src.uncompressedSizeBytes bh.rowsCount = src.rowsCount bh.timestampsHeader.copyFrom(&src.timestampsHeader) + bh.columnsHeaderIndexOffset = src.columnsHeaderIndexOffset + bh.columnsHeaderIndexSize = src.columnsHeaderIndexSize bh.columnsHeaderOffset = src.columnsHeaderOffset bh.columnsHeaderSize = src.columnsHeaderSize } @@ -59,12 +69,14 @@ func (bh *blockHeader) copyFrom(src *blockHeader) { func (bh *blockHeader) marshal(dst []byte) []byte { // Do not store the version used for encoding directly in the block header, since: // - all the block headers in the same part use the same encoding - // - the block header encoding version can be put in metadata file for the part (aka metadataFilename) + // - the format encoding version is stored in metadata file for the part (aka metadataFilename) dst = bh.streamID.marshal(dst) dst = encoding.MarshalVarUint64(dst, bh.uncompressedSizeBytes) dst = encoding.MarshalVarUint64(dst, bh.rowsCount) dst = bh.timestampsHeader.marshal(dst) + dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderIndexOffset) + dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderIndexSize) dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderOffset) dst = encoding.MarshalVarUint64(dst, bh.columnsHeaderSize) @@ -72,7 +84,7 @@ func (bh *blockHeader) marshal(dst []byte) []byte { } // unmarshal unmarshals bh from src and returns the remaining tail. -func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) { +func (bh *blockHeader) unmarshal(src []byte, partFormatVersion uint) ([]byte, error) { bh.reset() srcOrig := src @@ -110,6 +122,24 @@ func (bh *blockHeader) unmarshal(src []byte) ([]byte, error) { } src = tail + if partFormatVersion >= 1 { + // unmarshal columnsHeaderIndexOffset + n, nSize = encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderIndexOffset") + } + src = src[nSize:] + bh.columnsHeaderIndexOffset = n + + // unmarshal columnsHeaderIndexSize + n, nSize = encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal columnsHeaderIndexSize") + } + src = src[nSize:] + bh.columnsHeaderIndexSize = n + } + // unmarshal columnsHeaderOffset n, nSize = encoding.UnmarshalVarUint64(src) if nSize <= 0 { @@ -148,8 +178,8 @@ func putBlockHeader(bh *blockHeader) { var blockHeaderPool sync.Pool // unmarshalBlockHeaders appends unmarshaled from src blockHeader entries to dst and returns the result. -func unmarshalBlockHeaders(dst []blockHeader, src []byte) ([]blockHeader, error) { - dstOrig := dst +func unmarshalBlockHeaders(dst []blockHeader, src []byte, partFormatVersion uint) ([]blockHeader, error) { + dstLen := len(dst) for len(src) > 0 { if len(dst) < cap(dst) { dst = dst[:len(dst)+1] @@ -157,14 +187,14 @@ func unmarshalBlockHeaders(dst []blockHeader, src []byte) ([]blockHeader, error) dst = append(dst, blockHeader{}) } bh := &dst[len(dst)-1] - tail, err := bh.unmarshal(src) + tail, err := bh.unmarshal(src, partFormatVersion) if err != nil { - return dstOrig, fmt.Errorf("cannot unmarshal blockHeader entries: %w", err) + return dst, fmt.Errorf("cannot unmarshal blockHeader entries: %w", err) } src = tail } - if err := validateBlockHeaders(dst[len(dstOrig):]); err != nil { - return dstOrig, err + if err := validateBlockHeaders(dst[dstLen:]); err != nil { + return dst, err } return dst, nil } @@ -195,6 +225,128 @@ func resetBlockHeaders(bhs []blockHeader) []blockHeader { return bhs[:0] } +// columnHeaderRef references column header in the marshaled columnsHeader. +type columnHeaderRef struct { + // columnNameID is the ID of the column name. The column name can be obtained from part.columnNames. + columnNameID uint64 + + // offset is the offset of the the corresponding columnHeader inside marshaled columnsHeader. + offset uint64 +} + +// columnsHeaderIndex contains offsets for marshaled column headers. +type columnsHeaderIndex struct { + // columnHeadersRefs contains references to columnHeaders. + columnHeadersRefs []columnHeaderRef + + // constColumnsRefs contains references to constColumns. + constColumnsRefs []columnHeaderRef +} + +func getColumnsHeaderIndex() *columnsHeaderIndex { + v := columnsHeaderIndexPool.Get() + if v == nil { + return &columnsHeaderIndex{} + } + return v.(*columnsHeaderIndex) +} + +func putColumnsHeaderIndex(cshIndex *columnsHeaderIndex) { + cshIndex.reset() + columnsHeaderIndexPool.Put(cshIndex) +} + +var columnsHeaderIndexPool sync.Pool + +func (cshIndex *columnsHeaderIndex) reset() { + clear(cshIndex.columnHeadersRefs) + cshIndex.columnHeadersRefs = cshIndex.columnHeadersRefs[:0] + + clear(cshIndex.constColumnsRefs) + cshIndex.constColumnsRefs = cshIndex.constColumnsRefs[:0] +} + +func (cshIndex *columnsHeaderIndex) resizeConstColumnsRefs(n int) []columnHeaderRef { + cshIndex.constColumnsRefs = slicesutil.SetLength(cshIndex.constColumnsRefs, n) + return cshIndex.constColumnsRefs +} + +func (cshIndex *columnsHeaderIndex) resizeColumnHeadersRefs(n int) []columnHeaderRef { + cshIndex.columnHeadersRefs = slicesutil.SetLength(cshIndex.columnHeadersRefs, n) + return cshIndex.columnHeadersRefs +} + +func (cshIndex *columnsHeaderIndex) marshal(dst []byte) []byte { + dst = marshalColumnHeadersRefs(dst, cshIndex.columnHeadersRefs) + dst = marshalColumnHeadersRefs(dst, cshIndex.constColumnsRefs) + return dst +} + +// unmarshalNoArena unmarshals cshIndex from src. +// +// cshIndex is valid until src is changed. +func (cshIndex *columnsHeaderIndex) unmarshalNoArena(src []byte) error { + cshIndex.reset() + + refs, tail, err := unmarshalColumnHeadersRefsNoArena(cshIndex.columnHeadersRefs[:0], src) + if err != nil { + return fmt.Errorf("cannot unmarshal columnHeadersRefs: %w", err) + } + cshIndex.columnHeadersRefs = refs + src = tail + + refs, tail, err = unmarshalColumnHeadersRefsNoArena(cshIndex.constColumnsRefs[:0], src) + if err != nil { + return fmt.Errorf("cannot unmarshal constColumnsRefs: %w", err) + } + cshIndex.constColumnsRefs = refs + if len(tail) > 0 { + return fmt.Errorf("unexpected non-empty tail left after unmarshaling columnsHeaderIndex; len(tail)=%d", len(tail)) + } + + return nil +} + +func marshalColumnHeadersRefs(dst []byte, refs []columnHeaderRef) []byte { + dst = encoding.MarshalVarUint64(dst, uint64(len(refs))) + for _, r := range refs { + dst = encoding.MarshalVarUint64(dst, r.columnNameID) + dst = encoding.MarshalVarUint64(dst, r.offset) + } + return dst +} + +func unmarshalColumnHeadersRefsNoArena(dst []columnHeaderRef, src []byte) ([]columnHeaderRef, []byte, error) { + srcOrig := src + + n, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return dst, srcOrig, fmt.Errorf("cannot unmarshal the number of columnHeaderRef items") + } + src = src[nSize:] + + for i := uint64(0); i < n; i++ { + columnNameID, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return dst, srcOrig, fmt.Errorf("cannot unmarshal column name ID number %d out of %d", i, n) + } + src = src[nSize:] + + offset, nSize := encoding.UnmarshalVarUint64(src) + if nSize <= 0 { + return dst, srcOrig, fmt.Errorf("cannot unmarshal offset number %d out of %d", i, n) + } + src = src[nSize:] + + dst = append(dst, columnHeaderRef{ + columnNameID: columnNameID, + offset: offset, + }) + } + + return dst, src, nil +} + func getColumnsHeader() *columnsHeader { v := columnsHeaderPool.Get() if v == nil { @@ -235,27 +387,98 @@ func (csh *columnsHeader) reset() { csh.constColumns = ccs[:0] } -func (csh *columnsHeader) resizeConstColumns(columnsLen int) []Field { - csh.constColumns = slicesutil.SetLength(csh.constColumns, columnsLen) +func (csh *columnsHeader) resizeConstColumns(n int) []Field { + csh.constColumns = slicesutil.SetLength(csh.constColumns, n) return csh.constColumns } -func (csh *columnsHeader) resizeColumnHeaders(columnHeadersLen int) []columnHeader { - csh.columnHeaders = slicesutil.SetLength(csh.columnHeaders, columnHeadersLen) +func (csh *columnsHeader) resizeColumnHeaders(n int) []columnHeader { + csh.columnHeaders = slicesutil.SetLength(csh.columnHeaders, n) return csh.columnHeaders } -func (csh *columnsHeader) marshal(dst []byte) []byte { +func (csh *columnsHeader) setColumnNames(cshIndex *columnsHeaderIndex, columnNames []string) error { + if len(cshIndex.columnHeadersRefs) != len(csh.columnHeaders) { + return fmt.Errorf("unpexected number of column headers; got %d; want %d", len(cshIndex.columnHeadersRefs), len(csh.columnHeaders)) + } + for i := range csh.columnHeaders { + columnNameID := cshIndex.columnHeadersRefs[i].columnNameID + if columnNameID >= uint64(len(columnNames)) { + return fmt.Errorf("unexpected columnNameID=%d in columnHeadersRef; len(columnNames)=%d; columnNames=%v", columnNameID, len(columnNames), columnNames) + } + csh.columnHeaders[i].name = columnNames[columnNameID] + } + + if len(cshIndex.constColumnsRefs) != len(csh.constColumns) { + return fmt.Errorf("unexpected number of const columns; got %d; want %d", len(cshIndex.constColumnsRefs), len(csh.constColumns)) + } + for i := range csh.constColumns { + columnNameID := cshIndex.constColumnsRefs[i].columnNameID + if columnNameID >= uint64(len(columnNames)) { + return fmt.Errorf("unexpected columnNameID=%d in constColumnsRefs; len(columnNames)=%d; columnNames=%v", columnNameID, len(columnNames), columnNames) + } + csh.constColumns[i].Name = columnNames[columnNameID] + } + + return nil +} + +func (csh *columnsHeader) mustWriteTo(bh *blockHeader, sw *streamWriters, g *columnNameIDGenerator) { + bb := longTermBufPool.Get() + defer longTermBufPool.Put(bb) + + cshIndex := getColumnsHeaderIndex() + + bb.B = csh.marshal(bb.B, cshIndex, g) + columnsHeaderData := bb.B + + bb.B = cshIndex.marshal(bb.B) + columnsHeaderIndexData := bb.B[len(columnsHeaderData):] + + putColumnsHeaderIndex(cshIndex) + + bh.columnsHeaderIndexOffset = sw.columnsHeaderIndexWriter.bytesWritten + bh.columnsHeaderIndexSize = uint64(len(columnsHeaderIndexData)) + if bh.columnsHeaderIndexSize > maxColumnsHeaderIndexSize { + logger.Panicf("BUG: too big columnsHeaderIndexSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderIndexSize, maxColumnsHeaderIndexSize) + } + sw.columnsHeaderIndexWriter.MustWrite(columnsHeaderIndexData) + + bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten + bh.columnsHeaderSize = uint64(len(columnsHeaderData)) + if bh.columnsHeaderSize > maxColumnsHeaderSize { + logger.Panicf("BUG: too big columnsHeaderSize: %d bytes; mustn't exceed %d bytes", bh.columnsHeaderSize, maxColumnsHeaderSize) + } + sw.columnsHeaderWriter.MustWrite(columnsHeaderData) +} + +func (csh *columnsHeader) marshal(dst []byte, cshIndex *columnsHeaderIndex, g *columnNameIDGenerator) []byte { + dstLen := len(dst) + chs := csh.columnHeaders + chsRefs := cshIndex.resizeColumnHeadersRefs(len(chs)) dst = encoding.MarshalVarUint64(dst, uint64(len(chs))) for i := range chs { + columnNameID := g.getColumnNameID(chs[i].name) + offset := len(dst) - dstLen dst = chs[i].marshal(dst) + chsRefs[i] = columnHeaderRef{ + columnNameID: columnNameID, + offset: uint64(offset), + } } ccs := csh.constColumns + ccsRefs := cshIndex.resizeConstColumnsRefs(len(ccs)) dst = encoding.MarshalVarUint64(dst, uint64(len(ccs))) for i := range ccs { - dst = ccs[i].marshal(dst) + columnNameID := g.getColumnNameID(ccs[i].Name) + offset := len(dst) - dstLen + dst = ccs[i].marshal(dst, false) + ccsRefs[i] = columnHeaderRef{ + columnNameID: columnNameID, + offset: uint64(offset), + } } return dst @@ -264,7 +487,7 @@ func (csh *columnsHeader) marshal(dst []byte) []byte { // unmarshalNoArena unmarshals csh from src. // // csh is valid until src is changed. -func (csh *columnsHeader) unmarshalNoArena(src []byte) error { +func (csh *columnsHeader) unmarshalNoArena(src []byte, partFormatVersion uint) error { csh.reset() // unmarshal columnHeaders @@ -279,7 +502,7 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error { chs := csh.resizeColumnHeaders(int(n)) for i := range chs { - tail, err := chs[i].unmarshalNoArena(src) + tail, err := chs[i].unmarshalNoArena(src, partFormatVersion) if err != nil { return fmt.Errorf("cannot unmarshal columnHeader %d out of %d columnHeaders: %w", i, len(chs), err) } @@ -299,7 +522,7 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error { ccs := csh.resizeConstColumns(int(n)) for i := range ccs { - tail, err := ccs[i].unmarshalNoArena(src) + tail, err := ccs[i].unmarshalNoArena(src, partFormatVersion < 1) if err != nil { return fmt.Errorf("cannot unmarshal constColumn %d out of %d columns: %w", i, len(ccs), err) } @@ -317,7 +540,8 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error { // columnHeaders contains information for values, which belong to a single label in a single block. // // The main column with an empty name is stored in messageValuesFilename, -// while the rest of columns are stored in fieldValuesFilename. +// while the rest of columns are stored in smallValuesFilename or bigValuesFilename depending +// on the block size (see maxSmallValuesBlockSize). // This allows minimizing disk read IO when filtering by non-message columns. // // Every block column contains also a bloom filter for all the tokens stored in the column. @@ -334,7 +558,8 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte) error { // - valueTypeTimestampISO8601 stores encoded into uint64 timestamps // // Bloom filters for main column with an empty name is stored in messageBloomFilename, -// while the rest of columns are stored in fieldBloomFilename. +// while the rest of columns are stored in smallBloomFilename or bigBloomFilename depending on their size +// (see maxSmallBloomFilterBlockSize). type columnHeader struct { // name contains column name aka label name name string @@ -355,16 +580,16 @@ type columnHeader struct { // valuesDict contains unique values for valueType = valueTypeDict valuesDict valuesDict - // valuesOffset contains the offset of the block in either messageValuesFilename or fieldValuesFilename + // valuesOffset contains the offset of the block in either messageValuesFilename, smallValuesFilename or bigValuesFilename valuesOffset uint64 - // valuesSize contains the size of the block in either messageValuesFilename or fieldValuesFilename + // valuesSize contains the size of the block in either messageValuesFilename, smallValuesFilename or bigValuesFilename valuesSize uint64 - // bloomFilterOffset contains the offset of the bloom filter in either messageBloomFilename or fieldBloomFilename + // bloomFilterOffset contains the offset of the bloom filter in messageBloomFilename, smallBloomFilename or bigBloomFilename bloomFilterOffset uint64 - // bloomFilterSize contains the size of the bloom filter in either messageBloomFilename or fieldBloomFilename + // bloomFilterSize contains the size of the bloom filter in messageBloomFilename, smallBloomFilename or bigBloomFilename bloomFilterSize uint64 } @@ -403,8 +628,9 @@ func (ch *columnHeader) marshal(dst []byte) []byte { logger.Panicf("BUG: minValue=%d must be smaller than maxValue=%d for valueType=%d", ch.minValue, ch.maxValue, ch.valueType) } - // Encode common fields - ch.name and ch.valueType - dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(ch.name)) + // Do not encode ch.name, since it should be encoded at columnsHeaderIndex.columnHeadersRefs + + // Encode common field - ch.valueType dst = append(dst, byte(ch.valueType)) // Encode other fields depending on ch.valueType @@ -472,18 +698,20 @@ func (ch *columnHeader) marshalBloomFilters(dst []byte) []byte { // unmarshalNoArena unmarshals ch from src and returns the tail left after unmarshaling. // // ch is valid until src is changed. -func (ch *columnHeader) unmarshalNoArena(src []byte) ([]byte, error) { +func (ch *columnHeader) unmarshalNoArena(src []byte, partFormatVersion uint) ([]byte, error) { ch.reset() srcOrig := src // Unmarshal column name - data, nSize := encoding.UnmarshalBytes(src) - if nSize <= 0 { - return srcOrig, fmt.Errorf("cannot unmarshal column name") + if partFormatVersion < 1 { + data, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal column name") + } + src = src[nSize:] + ch.name = bytesutil.ToUnsafeString(data) } - src = src[nSize:] - ch.name = bytesutil.ToUnsafeString(data) // Unmarshal value type if len(src) < 1 { diff --git a/lib/logstorage/block_header_test.go b/lib/logstorage/block_header_test.go index a358f25cc..fbe77ae71 100644 --- a/lib/logstorage/block_header_test.go +++ b/lib/logstorage/block_header_test.go @@ -15,7 +15,7 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) { t.Fatalf("unexpected lengths of the marshaled blockHeader; got %d; want %d", len(data), marshaledLen) } bh2 := &blockHeader{} - tail, err := bh2.unmarshal(data) + tail, err := bh2.unmarshal(data, partFormatLatestVersion) if err != nil { t.Fatalf("unexpected error in unmarshal: %s", err) } @@ -26,7 +26,7 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) { t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", bh2, bh) } } - f(&blockHeader{}, 61) + f(&blockHeader{}, 63) f(&blockHeader{ streamID: streamID{ tenantID: TenantID{ @@ -47,24 +47,71 @@ func TestBlockHeaderMarshalUnmarshal(t *testing.T) { maxTimestamp: 23434, marshalType: encoding.MarshalTypeNearestDelta2, }, - columnsHeaderOffset: 4384, - columnsHeaderSize: 894, - }, 65) + columnsHeaderIndexOffset: 8923481, + columnsHeaderIndexSize: 8989832, + columnsHeaderOffset: 4384, + columnsHeaderSize: 894, + }, 73) +} + +func TestColumnsHeaderIndexMarshalUnmarshal(t *testing.T) { + f := func(cshIndex *columnsHeaderIndex, marshaledLen int) { + t.Helper() + + data := cshIndex.marshal(nil) + if len(data) != marshaledLen { + t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen) + } + cshIndex2 := &columnsHeaderIndex{} + if err := cshIndex2.unmarshalNoArena(data); err != nil { + t.Fatalf("unexpected error in unmarshal: %s", err) + } + + if !reflect.DeepEqual(cshIndex, cshIndex2) { + t.Fatalf("unexpected blockHeaderIndex unmarshaled\ngot\n%v\nwant\n%v", cshIndex2, cshIndex) + } + } + + f(&columnsHeaderIndex{}, 2) + f(&columnsHeaderIndex{ + columnHeadersRefs: []columnHeaderRef{ + { + columnNameID: 234, + offset: 123432, + }, + { + columnNameID: 23898, + offset: 0, + }, + }, + constColumnsRefs: []columnHeaderRef{ + { + columnNameID: 0, + offset: 8989, + }, + }, + }, 14) } func TestColumnsHeaderMarshalUnmarshal(t *testing.T) { f := func(csh *columnsHeader, marshaledLen int) { t.Helper() - data := csh.marshal(nil) + cshIndex := getColumnsHeaderIndex() + g := &columnNameIDGenerator{} + + data := csh.marshal(nil, cshIndex, g) if len(data) != marshaledLen { - t.Fatalf("unexpected lengths of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen) + t.Fatalf("unexpected length of the marshaled columnsHeader; got %d; want %d", len(data), marshaledLen) } csh2 := &columnsHeader{} - err := csh2.unmarshalNoArena(data) - if err != nil { + if err := csh2.unmarshalNoArena(data, partFormatLatestVersion); err != nil { t.Fatalf("unexpected error in unmarshal: %s", err) } + if err := csh2.setColumnNames(cshIndex, g.columnNames); err != nil { + t.Fatalf("cannot set column names: %s", err) + } + if !reflect.DeepEqual(csh, csh2) { t.Fatalf("unexpected blockHeader unmarshaled\ngot\n%v\nwant\n%v", csh2, csh) } @@ -98,7 +145,7 @@ func TestColumnsHeaderMarshalUnmarshal(t *testing.T) { Value: "bar", }, }, - }, 50) + }, 31) } func TestBlockHeaderUnmarshalFailure(t *testing.T) { @@ -107,7 +154,7 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) { dataOrig := append([]byte{}, data...) bh := getBlockHeader() defer putBlockHeader(bh) - tail, err := bh.unmarshal(data) + tail, err := bh.unmarshal(data, partFormatLatestVersion) if err == nil { t.Fatalf("expecting non-nil error") } @@ -138,8 +185,10 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) { maxTimestamp: 23434, marshalType: encoding.MarshalTypeNearestDelta2, }, - columnsHeaderOffset: 4384, - columnsHeaderSize: 894, + columnsHeaderIndexOffset: 89434, + columnsHeaderIndexSize: 89123, + columnsHeaderOffset: 4384, + columnsHeaderSize: 894, } data := bh.marshal(nil) for len(data) > 0 { @@ -148,14 +197,52 @@ func TestBlockHeaderUnmarshalFailure(t *testing.T) { } } +func TestColumnsHeaderIndexUnmarshalFailure(t *testing.T) { + f := func(data []byte) { + t.Helper() + + cshIndex := getColumnsHeaderIndex() + defer putColumnsHeaderIndex(cshIndex) + if err := cshIndex.unmarshalNoArena(data); err == nil { + t.Fatalf("expecting non-nil error") + } + } + + f(nil) + f([]byte("foo")) + + cshIndex := &columnsHeaderIndex{ + columnHeadersRefs: []columnHeaderRef{ + { + columnNameID: 0, + offset: 123, + }, + }, + constColumnsRefs: []columnHeaderRef{ + { + columnNameID: 2, + offset: 89834, + }, + { + columnNameID: 234, + offset: 8934, + }, + }, + } + data := cshIndex.marshal(nil) + for len(data) > 0 { + data = data[:len(data)-1] + f(data) + } +} + func TestColumnsHeaderUnmarshalFailure(t *testing.T) { f := func(data []byte) { t.Helper() csh := getColumnsHeader() defer putColumnsHeader(csh) - err := csh.unmarshalNoArena(data) - if err == nil { + if err := csh.unmarshalNoArena(data, partFormatLatestVersion); err == nil { t.Fatalf("expecting non-nil error") } } @@ -163,7 +250,7 @@ func TestColumnsHeaderUnmarshalFailure(t *testing.T) { f(nil) f([]byte("foo")) - csh := columnsHeader{ + csh := &columnsHeader{ columnHeaders: []columnHeader{ { name: "foobar", @@ -191,11 +278,14 @@ func TestColumnsHeaderUnmarshalFailure(t *testing.T) { }, }, } - data := csh.marshal(nil) + cshIndex := getColumnsHeaderIndex() + g := &columnNameIDGenerator{} + data := csh.marshal(nil, cshIndex, g) for len(data) > 0 { data = data[:len(data)-1] f(data) } + putColumnsHeaderIndex(cshIndex) } func TestBlockHeaderReset(t *testing.T) { @@ -219,8 +309,10 @@ func TestBlockHeaderReset(t *testing.T) { maxTimestamp: 23434, marshalType: encoding.MarshalTypeNearestDelta2, }, - columnsHeaderOffset: 12332, - columnsHeaderSize: 234, + columnsHeaderIndexOffset: 18934, + columnsHeaderIndexSize: 8912, + columnsHeaderOffset: 12332, + columnsHeaderSize: 234, } bh.reset() bhZero := &blockHeader{} @@ -229,6 +321,35 @@ func TestBlockHeaderReset(t *testing.T) { } } +func TestColumnsHeaderIndexReset(t *testing.T) { + cshIndex := &columnsHeaderIndex{ + columnHeadersRefs: []columnHeaderRef{ + { + columnNameID: 234, + offset: 1234, + }, + }, + constColumnsRefs: []columnHeaderRef{ + { + columnNameID: 328, + offset: 21344, + }, + { + columnNameID: 1, + offset: 234, + }, + }, + } + cshIndex.reset() + cshIndexZero := &columnsHeaderIndex{ + columnHeadersRefs: []columnHeaderRef{}, + constColumnsRefs: []columnHeaderRef{}, + } + if !reflect.DeepEqual(cshIndex, cshIndexZero) { + t.Fatalf("unexpected non-zero columnsHeaderIndex after reset: %v", cshIndex) + } +} + func TestColumnsHeaderReset(t *testing.T) { csh := &columnsHeader{ columnHeaders: []columnHeader{ @@ -278,7 +399,7 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) { if len(data) != marshaledLen { t.Fatalf("unexpected length for marshaled blockHeader entries; got %d; want %d", len(data), marshaledLen) } - bhs2, err := unmarshalBlockHeaders(nil, data) + bhs2, err := unmarshalBlockHeaders(nil, data, partFormatLatestVersion) if err != nil { t.Fatalf("unexpected error when unmarshaling blockHeader entries: %s", err) } @@ -287,7 +408,7 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) { } } f(nil, 0) - f([]blockHeader{{}}, 61) + f([]blockHeader{{}}, 63) f([]blockHeader{ {}, { @@ -310,10 +431,12 @@ func TestMarshalUnmarshalBlockHeaders(t *testing.T) { maxTimestamp: 23434, marshalType: encoding.MarshalTypeNearestDelta2, }, - columnsHeaderOffset: 12332, - columnsHeaderSize: 234, + columnsHeaderIndexOffset: 1234, + columnsHeaderIndexSize: 89324, + columnsHeaderOffset: 12332, + columnsHeaderSize: 234, }, - }, 127) + }, 134) } func TestColumnHeaderMarshalUnmarshal(t *testing.T) { @@ -325,13 +448,17 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) { t.Fatalf("unexpected marshaled length of columnHeader; got %d; want %d", len(data), marshaledLen) } var ch2 columnHeader - tail, err := ch2.unmarshalNoArena(data) + tail, err := ch2.unmarshalNoArena(data, partFormatLatestVersion) if err != nil { t.Fatalf("unexpected error in umarshal(%v): %s", ch, err) } if len(tail) > 0 { t.Fatalf("unexpected non-empty tail after unmarshal(%v): %X", ch, tail) } + + // columnHeader.name isn't marshaled, since it is marshaled via columnsHeaderIndex starting from part format v1. + ch2.name = ch.name + if !reflect.DeepEqual(ch, &ch2) { t.Fatalf("unexpected columnHeader after unmarshal;\ngot\n%v\nwant\n%v", &ch2, ch) } @@ -340,7 +467,7 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) { f(&columnHeader{ name: "foo", valueType: valueTypeUint8, - }, 11) + }, 7) ch := &columnHeader{ name: "foobar", valueType: valueTypeDict, @@ -349,7 +476,7 @@ func TestColumnHeaderMarshalUnmarshal(t *testing.T) { valuesSize: 254452, } ch.valuesDict.getOrAdd("abc") - f(ch, 18) + f(ch, 11) } func TestColumnHeaderUnmarshalFailure(t *testing.T) { @@ -358,7 +485,7 @@ func TestColumnHeaderUnmarshalFailure(t *testing.T) { dataOrig := append([]byte{}, data...) var ch columnHeader - tail, err := ch.unmarshalNoArena(data) + tail, err := ch.unmarshalNoArena(data, partFormatLatestVersion) if err == nil { t.Fatalf("expecting non-nil error") } diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index d954bf0fb..cd7804938 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -1,11 +1,13 @@ package logstorage import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // The number of blocks to search at once by a single worker @@ -113,17 +115,35 @@ type blockSearch struct { // sbu is used for unmarshaling local columns sbu stringsBlockUnmarshaler + // cshIndexBlockCache holds columnsHeaderIndex data for the given block. + // + // It is initialized lazily by calling getColumnsHeaderIndex(). + cshIndexBlockCache []byte + // cshBlockCache holds columnsHeader data for the given block. // - // it is initialized lazily by calling getColumnsHeader(). - cshBlockCache []byte + // It is initialized lazily by calling getColumnsHeaderBlock(). + cshBlockCache []byte + cshBlockInitialized bool - // cshCache is the columnsHeader associated with the given block + // ccsCache is the cache for accessed const columns + ccsCache []Field + + // chsCache is the cache for accessed column headers + chsCache []columnHeader + + // cshIndexCache is the columnsHeaderIndex associated with the given block. // - // it is initialized lazily by calling getColumnsHeader(). + // It is initialized lazily by calling getColumnsHeaderIndex(). + cshIndexCache *columnsHeaderIndex + + // cshCache is the columnsHeader associated with the given block. + // + // It is initialized lazily by calling getColumnsHeaderV0(). cshCache *columnsHeader // seenStreams contains seen streamIDs for the recent searches. + // // It is used for speeding up fetching _stream column. seenStreams map[u128]string } @@ -151,7 +171,27 @@ func (bs *blockSearch) reset() { bs.sbu.reset() + bs.cshIndexBlockCache = bs.cshIndexBlockCache[:0] + bs.cshBlockCache = bs.cshBlockCache[:0] + bs.cshBlockInitialized = false + + ccsCache := bs.ccsCache + for i := range ccsCache { + ccsCache[i].Reset() + } + bs.ccsCache = ccsCache[:0] + + chsCache := bs.chsCache + for i := range chsCache { + chsCache[i].reset() + } + bs.chsCache = chsCache[:0] + + if bs.cshIndexCache != nil { + putColumnsHeaderIndex(bs.cshIndexCache) + bs.cshIndexCache = nil + } if bs.cshCache != nil { putColumnsHeader(bs.cshCache) @@ -190,16 +230,54 @@ func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) { } } +func (bs *blockSearch) partFormatVersion() uint { + return bs.bsw.p.ph.FormatVersion +} + func (bs *blockSearch) getConstColumnValue(name string) string { if name == "_msg" { name = "" } - csh := bs.getColumnsHeader() - for _, cc := range csh.constColumns { - if cc.Name == name { - return cc.Value + if bs.partFormatVersion() < 1 { + csh := bs.getColumnsHeaderV0() + for _, cc := range csh.constColumns { + if cc.Name == name { + return cc.Value + } } + return "" + } + + columnNameID, ok := bs.getColumnNameID(name) + if !ok { + return "" + } + + for i := range bs.ccsCache { + if bs.ccsCache[i].Name == name { + return bs.ccsCache[i].Value + } + } + + cshIndex := bs.getColumnsHeaderIndex() + for _, cr := range cshIndex.constColumnsRefs { + if cr.columnNameID != columnNameID { + continue + } + + b := bs.getColumnsHeaderBlock() + if cr.offset > uint64(len(b)) { + logger.Panicf("FATAL: %s: header offset for const column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset) + } + b = b[cr.offset:] + bs.ccsCache = slicesutil.SetLength(bs.ccsCache, len(bs.ccsCache)+1) + cc := &bs.ccsCache[len(bs.ccsCache)-1] + if _, err := cc.unmarshalNoArena(b, false); err != nil { + logger.Panicf("FATAL: %s: cannot unmarshal header for const column %q: %s", bs.bsw.p.path, name, err) + } + cc.Name = strings.Clone(name) + return cc.Value } return "" } @@ -209,46 +287,158 @@ func (bs *blockSearch) getColumnHeader(name string) *columnHeader { name = "" } - csh := bs.getColumnsHeader() - chs := csh.columnHeaders - for i := range chs { - ch := &chs[i] - if ch.name == name { - return ch + if bs.partFormatVersion() < 1 { + csh := bs.getColumnsHeaderV0() + chs := csh.columnHeaders + for i := range chs { + ch := &chs[i] + if ch.name == name { + return ch + } } + return nil + } + + columnNameID, ok := bs.getColumnNameID(name) + if !ok { + return nil + } + + for i := range bs.chsCache { + if bs.chsCache[i].name == name { + return &bs.chsCache[i] + } + } + + cshIndex := bs.getColumnsHeaderIndex() + for _, cr := range cshIndex.columnHeadersRefs { + if cr.columnNameID != columnNameID { + continue + } + + b := bs.getColumnsHeaderBlock() + if cr.offset > uint64(len(b)) { + logger.Panicf("FATAL: %s: header offset for column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset) + } + b = b[cr.offset:] + bs.chsCache = slicesutil.SetLength(bs.chsCache, len(bs.chsCache)+1) + ch := &bs.chsCache[len(bs.chsCache)-1] + if _, err := ch.unmarshalNoArena(b, partFormatLatestVersion); err != nil { + logger.Panicf("FATAL: %s: cannot unmarshal header for column %q: %s", bs.bsw.p.path, name, err) + } + ch.name = strings.Clone(name) + return ch } return nil } +func (bs *blockSearch) getColumnNameID(name string) (uint64, bool) { + id, ok := bs.bsw.p.columnNameIDs[name] + return id, ok +} + +func (bs *blockSearch) getColumnNameByID(id uint64) (string, bool) { + columnNames := bs.bsw.p.columnNames + if id >= uint64(len(columnNames)) { + return "", false + } + return columnNames[id], true +} + func (bs *blockSearch) getConstColumns() []Field { - csh := bs.getColumnsHeader() - return csh.constColumns + if bs.partFormatVersion() < 1 { + csh := bs.getColumnsHeaderV0() + return csh.constColumns + } + + chsIndex := bs.getColumnsHeaderIndex() + for _, cr := range chsIndex.constColumnsRefs { + columnName, ok := bs.getColumnNameByID(cr.columnNameID) + if !ok { + logger.Panicf("FATAL: %s: missing column name for id=%d", bs.bsw.p.path, cr.columnNameID) + } + _ = bs.getConstColumnValue(columnName) + } + return bs.ccsCache } func (bs *blockSearch) getColumnHeaders() []columnHeader { - csh := bs.getColumnsHeader() - return csh.columnHeaders + if bs.partFormatVersion() < 1 { + csh := bs.getColumnsHeaderV0() + return csh.columnHeaders + } + + chsIndex := bs.getColumnsHeaderIndex() + for _, cr := range chsIndex.columnHeadersRefs { + columnName, ok := bs.getColumnNameByID(cr.columnNameID) + if !ok { + logger.Panicf("FATAL: %s: missing column name for id=%d", bs.bsw.p.path, cr.columnNameID) + } + _ = bs.getColumnHeader(columnName) + } + return bs.chsCache } -func (bs *blockSearch) getColumnsHeader() *columnsHeader { +func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex { + if bs.partFormatVersion() < 1 { + logger.Panicf("BUG: getColumnsHeaderIndex() can be called only for part encoding v1+, while it has been called for v%d", bs.partFormatVersion()) + } + + if bs.cshIndexCache == nil { + bs.cshIndexBlockCache = readColumnsHeaderIndexBlock(bs.cshIndexBlockCache[:0], bs.bsw.p, &bs.bsw.bh) + + bs.cshIndexCache = getColumnsHeaderIndex() + if err := bs.cshIndexCache.unmarshalNoArena(bs.cshIndexBlockCache); err != nil { + logger.Panicf("FATAL: %s: cannot unmarshal columns header index: %s", bs.bsw.p.path, err) + } + } + return bs.cshIndexCache +} + +func (bs *blockSearch) getColumnsHeaderV0() *columnsHeader { + if bs.partFormatVersion() >= 1 { + logger.Panicf("BUG: getColumnsHeaderV0() can be called only for part encoding v0, while it has been called for v%d", bs.partFormatVersion()) + } + if bs.cshCache == nil { - bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh) + b := bs.getColumnsHeaderBlock() bs.cshCache = getColumnsHeader() - if err := bs.cshCache.unmarshalNoArena(bs.cshBlockCache); err != nil { + if err := bs.cshCache.unmarshalNoArena(b, 0); err != nil { logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.bsw.p.path, err) } } return bs.cshCache } +func (bs *blockSearch) getColumnsHeaderBlock() []byte { + if !bs.cshBlockInitialized { + bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh) + bs.cshBlockInitialized = true + } + return bs.cshBlockCache +} + +func readColumnsHeaderIndexBlock(dst []byte, p *part, bh *blockHeader) []byte { + n := bh.columnsHeaderIndexSize + if n > maxColumnsHeaderIndexSize { + logger.Panicf("FATAL: %s: columns header index size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderIndexSize, n) + } + + dstLen := len(dst) + dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(n)+dstLen) + p.columnsHeaderIndexFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderIndexOffset)) + + return dst +} + func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte { - columnsHeaderSize := bh.columnsHeaderSize - if columnsHeaderSize > maxColumnsHeaderSize { - logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, columnsHeaderSize) + n := bh.columnsHeaderSize + if n > maxColumnsHeaderSize { + logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, n) } dstLen := len(dst) - dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(columnsHeaderSize)+dstLen) + dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(n)+dstLen) p.columnsHeaderFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderOffset)) return dst } @@ -263,11 +453,7 @@ func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter { } p := bs.bsw.p - - bloomFilterFile := p.fieldBloomFilterFile - if ch.name == "" { - bloomFilterFile = p.messageBloomFilterFile - } + bloomValuesFile := p.getBloomValuesFileForColumnName(ch.name) bb := longTermBufPool.Get() bloomFilterSize := ch.bloomFilterSize @@ -275,7 +461,8 @@ func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter { logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxBloomFilterBlockSize, bloomFilterSize) } bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(bloomFilterSize)) - bloomFilterFile.MustReadAt(bb.B, int64(ch.bloomFilterOffset)) + + bloomValuesFile.bloom.MustReadAt(bb.B, int64(ch.bloomFilterOffset)) bf = getBloomFilter() if err := bf.unmarshal(bb.B); err != nil { logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err) @@ -299,11 +486,7 @@ func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string { } p := bs.bsw.p - - valuesFile := p.fieldValuesFile - if ch.name == "" { - valuesFile = p.messageValuesFile - } + bloomValuesFile := p.getBloomValuesFileForColumnName(ch.name) bb := longTermBufPool.Get() valuesSize := ch.valuesSize @@ -311,7 +494,7 @@ func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string { logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxValuesBlockSize, valuesSize) } bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(valuesSize)) - valuesFile.MustReadAt(bb.B, int64(ch.valuesOffset)) + bloomValuesFile.values.MustReadAt(bb.B, int64(ch.valuesOffset)) values = getStringBucket() var err error @@ -378,7 +561,7 @@ func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []b logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err) } - dst, err = unmarshalBlockHeaders(dst, bb.B) + dst, err = unmarshalBlockHeaders(dst, bb.B, p.ph.FormatVersion) longTermBufPool.Put(bb) if err != nil { logger.Panicf("FATAL: %s: cannot unmarshal block headers read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err) diff --git a/lib/logstorage/block_stream_reader.go b/lib/logstorage/block_stream_reader.go index 018df187d..f8f3af015 100644 --- a/lib/logstorage/block_stream_reader.go +++ b/lib/logstorage/block_stream_reader.go @@ -4,6 +4,9 @@ import ( "path/filepath" "sync" + "github.com/cespare/xxhash/v2" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -43,68 +46,132 @@ func (r *readerWithStats) Read(p []byte) (int, error) { } func (r *readerWithStats) MustClose() { - r.r.MustClose() - r.r = nil + if r.r != nil { + r.r.MustClose() + r.r = nil + } } // streamReaders contains readers for blockStreamReader type streamReaders struct { + columnNamesReader readerWithStats metaindexReader readerWithStats indexReader readerWithStats + columnsHeaderIndexReader readerWithStats columnsHeaderReader readerWithStats timestampsReader readerWithStats - fieldValuesReader readerWithStats - fieldBloomFilterReader readerWithStats - messageValuesReader readerWithStats - messageBloomFilterReader readerWithStats + + messageBloomValuesReader bloomValuesReader + oldBloomValuesReader bloomValuesReader + bloomValuesShards [bloomValuesShardsCount]bloomValuesReader +} + +type bloomValuesReader struct { + bloom readerWithStats + values readerWithStats +} + +func (r *bloomValuesReader) reset() { + r.bloom.reset() + r.values.reset() +} + +func (r *bloomValuesReader) init(sr bloomValuesStreamReader) { + r.bloom.init(sr.bloom) + r.values.init(sr.values) +} + +func (r *bloomValuesReader) totalBytesRead() uint64 { + return r.bloom.bytesRead + r.values.bytesRead +} + +func (r *bloomValuesReader) MustClose() { + r.bloom.MustClose() + r.values.MustClose() +} + +type bloomValuesStreamReader struct { + bloom filestream.ReadCloser + values filestream.ReadCloser } func (sr *streamReaders) reset() { + sr.columnNamesReader.reset() sr.metaindexReader.reset() sr.indexReader.reset() + sr.columnsHeaderIndexReader.reset() sr.columnsHeaderReader.reset() sr.timestampsReader.reset() - sr.fieldValuesReader.reset() - sr.fieldBloomFilterReader.reset() - sr.messageValuesReader.reset() - sr.messageBloomFilterReader.reset() + + sr.messageBloomValuesReader.reset() + sr.oldBloomValuesReader.reset() + for i := range sr.bloomValuesShards[:] { + sr.bloomValuesShards[i].reset() + } } -func (sr *streamReaders) init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader, fieldValuesReader, fieldBloomFilterReader, - messageValuesReader, messageBloomFilterReader filestream.ReadCloser, +func (sr *streamReaders) init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader filestream.ReadCloser, + messageBloomValuesReader, oldBloomValuesReader bloomValuesStreamReader, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader, ) { + sr.columnNamesReader.init(columnNamesReader) sr.metaindexReader.init(metaindexReader) sr.indexReader.init(indexReader) + sr.columnsHeaderIndexReader.init(columnsHeaderIndexReader) sr.columnsHeaderReader.init(columnsHeaderReader) sr.timestampsReader.init(timestampsReader) - sr.fieldValuesReader.init(fieldValuesReader) - sr.fieldBloomFilterReader.init(fieldBloomFilterReader) - sr.messageValuesReader.init(messageValuesReader) - sr.messageBloomFilterReader.init(messageBloomFilterReader) + + sr.messageBloomValuesReader.init(messageBloomValuesReader) + sr.oldBloomValuesReader.init(oldBloomValuesReader) + for i := range sr.bloomValuesShards[:] { + sr.bloomValuesShards[i].init(bloomValuesShards[i]) + } } func (sr *streamReaders) totalBytesRead() uint64 { n := uint64(0) + + n += sr.columnNamesReader.bytesRead n += sr.metaindexReader.bytesRead n += sr.indexReader.bytesRead + n += sr.columnsHeaderIndexReader.bytesRead n += sr.columnsHeaderReader.bytesRead n += sr.timestampsReader.bytesRead - n += sr.fieldValuesReader.bytesRead - n += sr.fieldBloomFilterReader.bytesRead - n += sr.messageValuesReader.bytesRead - n += sr.messageBloomFilterReader.bytesRead + + n += sr.messageBloomValuesReader.totalBytesRead() + n += sr.oldBloomValuesReader.totalBytesRead() + for i := range sr.bloomValuesShards[:] { + n += sr.bloomValuesShards[i].totalBytesRead() + } + return n } func (sr *streamReaders) MustClose() { + sr.columnNamesReader.MustClose() sr.metaindexReader.MustClose() sr.indexReader.MustClose() + sr.columnsHeaderIndexReader.MustClose() sr.columnsHeaderReader.MustClose() sr.timestampsReader.MustClose() - sr.fieldValuesReader.MustClose() - sr.fieldBloomFilterReader.MustClose() - sr.messageValuesReader.MustClose() - sr.messageBloomFilterReader.MustClose() + + sr.messageBloomValuesReader.MustClose() + sr.oldBloomValuesReader.MustClose() + for i := range sr.bloomValuesShards[:] { + sr.bloomValuesShards[i].MustClose() + } +} + +func (sr *streamReaders) getBloomValuesReaderForColumnName(name string, partFormatVersion uint) *bloomValuesReader { + if name == "" { + return &sr.messageBloomValuesReader + } + if partFormatVersion < 1 { + return &sr.oldBloomValuesReader + } + + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + idx := h % uint64(len(sr.bloomValuesShards)) + return &sr.bloomValuesShards[idx] } // blockStreamReader is used for reading blocks in streaming manner from a part. @@ -121,6 +188,12 @@ type blockStreamReader struct { // streamReaders contains data readers in stream mode streamReaders streamReaders + // columnNameIDs contains columnName->id mapping for all the column names seen in the part + columnNameIDs map[string]uint64 + + // columnNames constains id->columnName mapping for all the columns seen in the part + columnNames []string + // indexBlockHeaders contains the list of all the indexBlockHeader entries for the part indexBlockHeaders []indexBlockHeader @@ -156,6 +229,9 @@ func (bsr *blockStreamReader) reset() { bsr.ph.reset() bsr.streamReaders.reset() + bsr.columnNameIDs = nil + bsr.columnNames = nil + ihs := bsr.indexBlockHeaders if len(ihs) > 10e3 { // The ihs len is unbound, so it is better to drop too long indexBlockHeaders in order to reduce memory usage @@ -195,17 +271,25 @@ func (bsr *blockStreamReader) MustInitFromInmemoryPart(mp *inmemoryPart) { bsr.ph = mp.ph // Initialize streamReaders + columnNamesReader := mp.columnNames.NewReader() metaindexReader := mp.metaindex.NewReader() indexReader := mp.index.NewReader() + columnsHeaderIndexReader := mp.columnsHeaderIndex.NewReader() columnsHeaderReader := mp.columnsHeader.NewReader() timestampsReader := mp.timestamps.NewReader() - fieldValuesReader := mp.fieldValues.NewReader() - fieldBloomFilterReader := mp.fieldBloomFilter.NewReader() - messageValuesReader := mp.messageValues.NewReader() - messageBloomFilterReader := mp.messageBloomFilter.NewReader() - bsr.streamReaders.init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader, - fieldValuesReader, fieldBloomFilterReader, messageValuesReader, messageBloomFilterReader) + messageBloomValuesReader := mp.messageBloomValues.NewStreamReader() + var oldBloomValuesReader bloomValuesStreamReader + var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader + for i := range bloomValuesShards[:] { + bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamReader() + } + + bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader, + messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards) + + // Read columnNames data + bsr.columnNames, bsr.columnNameIDs = mustReadColumnNames(&bsr.streamReaders.columnNamesReader) // Read metaindex data bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader) @@ -219,30 +303,63 @@ func (bsr *blockStreamReader) MustInitFromFilePart(path string) { // since they are usually deleted after the merge. const nocache = true - metaindexPath := filepath.Join(path, metaindexFilename) - indexPath := filepath.Join(path, indexFilename) - columnsHeaderPath := filepath.Join(path, columnsHeaderFilename) - timestampsPath := filepath.Join(path, timestampsFilename) - fieldValuesPath := filepath.Join(path, fieldValuesFilename) - fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename) - messageValuesPath := filepath.Join(path, messageValuesFilename) - messageBloomFilterPath := filepath.Join(path, messageBloomFilename) - bsr.ph.mustReadMetadata(path) + columnNamesPath := filepath.Join(path, columnNamesFilename) + metaindexPath := filepath.Join(path, metaindexFilename) + indexPath := filepath.Join(path, indexFilename) + columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename) + columnsHeaderPath := filepath.Join(path, columnsHeaderFilename) + timestampsPath := filepath.Join(path, timestampsFilename) + // Open data readers + var columnNamesReader filestream.ReadCloser + if bsr.ph.FormatVersion >= 1 { + columnNamesReader = filestream.MustOpen(columnNamesPath, nocache) + } metaindexReader := filestream.MustOpen(metaindexPath, nocache) indexReader := filestream.MustOpen(indexPath, nocache) + var columnsHeaderIndexReader filestream.ReadCloser + if bsr.ph.FormatVersion >= 1 { + columnsHeaderIndexReader = filestream.MustOpen(columnsHeaderIndexPath, nocache) + } columnsHeaderReader := filestream.MustOpen(columnsHeaderPath, nocache) timestampsReader := filestream.MustOpen(timestampsPath, nocache) - fieldValuesReader := filestream.MustOpen(fieldValuesPath, nocache) - fieldBloomFilterReader := filestream.MustOpen(fieldBloomFilterPath, nocache) - messageValuesReader := filestream.MustOpen(messageValuesPath, nocache) - messageBloomFilterReader := filestream.MustOpen(messageBloomFilterPath, nocache) + + messageBloomFilterPath := filepath.Join(path, messageBloomFilename) + messageValuesPath := filepath.Join(path, messageValuesFilename) + messageBloomValuesReader := bloomValuesStreamReader{ + bloom: filestream.MustOpen(messageBloomFilterPath, nocache), + values: filestream.MustOpen(messageValuesPath, nocache), + } + var oldBloomValuesReader bloomValuesStreamReader + var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamReader + if bsr.ph.FormatVersion < 1 { + bloomPath := filepath.Join(path, oldBloomFilename) + oldBloomValuesReader.bloom = filestream.MustOpen(bloomPath, nocache) + + valuesPath := filepath.Join(path, oldValuesFilename) + oldBloomValuesReader.values = filestream.MustOpen(valuesPath, nocache) + } else { + for i := range bloomValuesShards[:] { + shard := &bloomValuesShards[i] + + bloomPath := getBloomFilePath(path, uint64(i)) + shard.bloom = filestream.MustOpen(bloomPath, nocache) + + valuesPath := getValuesFilePath(path, uint64(i)) + shard.values = filestream.MustOpen(valuesPath, nocache) + } + } // Initialize streamReaders - bsr.streamReaders.init(metaindexReader, indexReader, columnsHeaderReader, timestampsReader, - fieldValuesReader, fieldBloomFilterReader, messageValuesReader, messageBloomFilterReader) + bsr.streamReaders.init(columnNamesReader, metaindexReader, indexReader, columnsHeaderIndexReader, columnsHeaderReader, timestampsReader, + messageBloomValuesReader, oldBloomValuesReader, bloomValuesShards) + + if bsr.ph.FormatVersion >= 1 { + // Read columnNames data + bsr.columnNames, bsr.columnNameIDs = mustReadColumnNames(&bsr.streamReaders.columnNamesReader) + } // Read metaindex data bsr.indexBlockHeaders = mustReadIndexBlockHeaders(bsr.indexBlockHeaders[:0], &bsr.streamReaders.metaindexReader) @@ -282,7 +399,7 @@ func (bsr *blockStreamReader) NextBlock() bool { // Read bsr.blockData bsr.a.reset() - bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders) + bsr.blockData.mustReadFrom(&bsr.a, bh, &bsr.streamReaders, bsr.ph.FormatVersion, bsr.columnNames) bsr.globalUncompressedSizeBytes += bh.uncompressedSizeBytes bsr.globalRowsCount += bh.rowsCount @@ -342,7 +459,7 @@ func (bsr *blockStreamReader) nextIndexBlock() bool { bb.B = ih.mustReadNextIndexBlock(bb.B[:0], &bsr.streamReaders) bsr.blockHeaders = resetBlockHeaders(bsr.blockHeaders) var err error - bsr.blockHeaders, err = unmarshalBlockHeaders(bsr.blockHeaders[:0], bb.B) + bsr.blockHeaders, err = unmarshalBlockHeaders(bsr.blockHeaders[:0], bb.B, bsr.ph.FormatVersion) longTermBufPool.Put(bb) if err != nil { logger.Panicf("FATAL: %s: cannot unmarshal blockHeader entries: %s", bsr.streamReaders.indexReader.Path(), err) diff --git a/lib/logstorage/block_stream_writer.go b/lib/logstorage/block_stream_writer.go index b37f1337b..a95a419ec 100644 --- a/lib/logstorage/block_stream_writer.go +++ b/lib/logstorage/block_stream_writer.go @@ -4,8 +4,9 @@ import ( "path/filepath" "sync" + "github.com/cespare/xxhash/v2" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -44,62 +45,116 @@ func (w *writerWithStats) MustClose() { // streamWriters contain writers for blockStreamWriter type streamWriters struct { + columnNamesWriter writerWithStats metaindexWriter writerWithStats indexWriter writerWithStats + columnsHeaderIndexWriter writerWithStats columnsHeaderWriter writerWithStats timestampsWriter writerWithStats - fieldValuesWriter writerWithStats - fieldBloomFilterWriter writerWithStats - messageValuesWriter writerWithStats - messageBloomFilterWriter writerWithStats + + messageBloomValuesWriter bloomValuesWriter + bloomValuesShards [bloomValuesShardsCount]bloomValuesWriter +} + +type bloomValuesWriter struct { + bloom writerWithStats + values writerWithStats +} + +func (w *bloomValuesWriter) reset() { + w.bloom.reset() + w.values.reset() +} + +func (w *bloomValuesWriter) init(sw bloomValuesStreamWriter) { + w.bloom.init(sw.bloom) + w.values.init(sw.values) +} + +func (w *bloomValuesWriter) totalBytesWritten() uint64 { + return w.bloom.bytesWritten + w.values.bytesWritten +} + +func (w *bloomValuesWriter) MustClose() { + w.bloom.MustClose() + w.values.MustClose() +} + +type bloomValuesStreamWriter struct { + bloom filestream.WriteCloser + values filestream.WriteCloser } func (sw *streamWriters) reset() { + sw.columnNamesWriter.reset() sw.metaindexWriter.reset() sw.indexWriter.reset() + sw.columnsHeaderIndexWriter.reset() sw.columnsHeaderWriter.reset() sw.timestampsWriter.reset() - sw.fieldValuesWriter.reset() - sw.fieldBloomFilterWriter.reset() - sw.messageValuesWriter.reset() - sw.messageBloomFilterWriter.reset() + + sw.messageBloomValuesWriter.reset() + for i := range sw.bloomValuesShards[:] { + sw.bloomValuesShards[i].reset() + } } -func (sw *streamWriters) init(metaindexWriter, indexWriter, columnsHeaderWriter, timestampsWriter, fieldValuesWriter, fieldBloomFilterWriter, - messageValuesWriter, messageBloomFilterWriter filestream.WriteCloser, +func (sw *streamWriters) init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter filestream.WriteCloser, + messageBloomValuesWriter bloomValuesStreamWriter, bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter, ) { + sw.columnNamesWriter.init(columnNamesWriter) sw.metaindexWriter.init(metaindexWriter) sw.indexWriter.init(indexWriter) + sw.columnsHeaderIndexWriter.init(columnsHeaderIndexWriter) sw.columnsHeaderWriter.init(columnsHeaderWriter) sw.timestampsWriter.init(timestampsWriter) - sw.fieldValuesWriter.init(fieldValuesWriter) - sw.fieldBloomFilterWriter.init(fieldBloomFilterWriter) - sw.messageValuesWriter.init(messageValuesWriter) - sw.messageBloomFilterWriter.init(messageBloomFilterWriter) + + sw.messageBloomValuesWriter.init(messageBloomValuesWriter) + for i := range sw.bloomValuesShards[:] { + sw.bloomValuesShards[i].init(bloomValuesShards[i]) + } } func (sw *streamWriters) totalBytesWritten() uint64 { n := uint64(0) + + n += sw.columnNamesWriter.bytesWritten n += sw.metaindexWriter.bytesWritten n += sw.indexWriter.bytesWritten + n += sw.columnsHeaderIndexWriter.bytesWritten n += sw.columnsHeaderWriter.bytesWritten n += sw.timestampsWriter.bytesWritten - n += sw.fieldValuesWriter.bytesWritten - n += sw.fieldBloomFilterWriter.bytesWritten - n += sw.messageValuesWriter.bytesWritten - n += sw.messageBloomFilterWriter.bytesWritten + + n += sw.messageBloomValuesWriter.totalBytesWritten() + for i := range sw.bloomValuesShards[:] { + n += sw.bloomValuesShards[i].totalBytesWritten() + } + return n } func (sw *streamWriters) MustClose() { + sw.columnNamesWriter.MustClose() sw.metaindexWriter.MustClose() sw.indexWriter.MustClose() + sw.columnsHeaderIndexWriter.MustClose() sw.columnsHeaderWriter.MustClose() sw.timestampsWriter.MustClose() - sw.fieldValuesWriter.MustClose() - sw.fieldBloomFilterWriter.MustClose() - sw.messageValuesWriter.MustClose() - sw.messageBloomFilterWriter.MustClose() + + sw.messageBloomValuesWriter.MustClose() + for i := range sw.bloomValuesShards[:] { + sw.bloomValuesShards[i].MustClose() + } +} + +func (sw *streamWriters) getBloomValuesWriterForColumnName(name string) *bloomValuesWriter { + if name == "" { + return &sw.messageBloomValuesWriter + } + + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + idx := h % uint64(len(sw.bloomValuesShards)) + return &sw.bloomValuesShards[idx] } // blockStreamWriter is used for writing blocks into the underlying storage in streaming manner. @@ -148,6 +203,9 @@ type blockStreamWriter struct { // indexBlockHeader is used for marshaling the data to metaindexData indexBlockHeader indexBlockHeader + + // columnNameIDGenerator is used for generating columnName->id mapping for all the columns seen in bsw + columnNameIDGenerator columnNameIDGenerator } // reset resets bsw for subsequent re-use. @@ -175,12 +233,22 @@ func (bsw *blockStreamWriter) reset() { } bsw.indexBlockHeader.reset() + + bsw.columnNameIDGenerator.reset() } // MustInitForInmemoryPart initializes bsw from mp func (bsw *blockStreamWriter) MustInitForInmemoryPart(mp *inmemoryPart) { bsw.reset() - bsw.streamWriters.init(&mp.metaindex, &mp.index, &mp.columnsHeader, &mp.timestamps, &mp.fieldValues, &mp.fieldBloomFilter, &mp.messageValues, &mp.messageBloomFilter) + + messageBloomValues := mp.messageBloomValues.NewStreamWriter() + + var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter + for i := range bloomValuesShards[:] { + bloomValuesShards[i] = mp.bloomValuesShards[i].NewStreamWriter() + } + + bsw.streamWriters.init(&mp.columnNames, &mp.metaindex, &mp.index, &mp.columnsHeaderIndex, &mp.columnsHeader, &mp.timestamps, messageBloomValues, bloomValuesShards) } // MustInitForFilePart initializes bsw for writing data to file part located at path. @@ -191,28 +259,43 @@ func (bsw *blockStreamWriter) MustInitForFilePart(path string, nocache bool) { fs.MustMkdirFailIfExist(path) + columnNamesPath := filepath.Join(path, columnNamesFilename) metaindexPath := filepath.Join(path, metaindexFilename) indexPath := filepath.Join(path, indexFilename) + columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename) columnsHeaderPath := filepath.Join(path, columnsHeaderFilename) timestampsPath := filepath.Join(path, timestampsFilename) - fieldValuesPath := filepath.Join(path, fieldValuesFilename) - fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename) - messageValuesPath := filepath.Join(path, messageValuesFilename) - messageBloomFilterPath := filepath.Join(path, messageBloomFilename) - // Always cache metaindex file, since it it re-read immediately after part creation + // Always cache columnNames files, since it is re-read immediately after part creation + columnNamesWriter := filestream.MustCreate(columnNamesPath, false) + + // Always cache metaindex file, since it is re-read immediately after part creation metaindexWriter := filestream.MustCreate(metaindexPath, false) indexWriter := filestream.MustCreate(indexPath, nocache) + columnsHeaderIndexWriter := filestream.MustCreate(columnsHeaderIndexPath, nocache) columnsHeaderWriter := filestream.MustCreate(columnsHeaderPath, nocache) timestampsWriter := filestream.MustCreate(timestampsPath, nocache) - fieldValuesWriter := filestream.MustCreate(fieldValuesPath, nocache) - fieldBloomFilterWriter := filestream.MustCreate(fieldBloomFilterPath, nocache) - messageValuesWriter := filestream.MustCreate(messageValuesPath, nocache) - messageBloomFilterWriter := filestream.MustCreate(messageBloomFilterPath, nocache) - bsw.streamWriters.init(metaindexWriter, indexWriter, columnsHeaderWriter, timestampsWriter, - fieldValuesWriter, fieldBloomFilterWriter, messageValuesWriter, messageBloomFilterWriter) + messageBloomFilterPath := filepath.Join(path, messageBloomFilename) + messageValuesPath := filepath.Join(path, messageValuesFilename) + messageBloomValuesWriter := bloomValuesStreamWriter{ + bloom: filestream.MustCreate(messageBloomFilterPath, nocache), + values: filestream.MustCreate(messageValuesPath, nocache), + } + + var bloomValuesShards [bloomValuesShardsCount]bloomValuesStreamWriter + for i := range bloomValuesShards[:] { + shard := &bloomValuesShards[i] + + bloomPath := getBloomFilePath(path, uint64(i)) + shard.bloom = filestream.MustCreate(bloomPath, nocache) + + valuesPath := getValuesFilePath(path, uint64(i)) + shard.values = filestream.MustCreate(valuesPath, nocache) + } + + bsw.streamWriters.init(columnNamesWriter, metaindexWriter, indexWriter, columnsHeaderIndexWriter, columnsHeaderWriter, timestampsWriter, messageBloomValuesWriter, bloomValuesShards) } // MustWriteRows writes timestamps with rows under the given sid to bsw. @@ -266,9 +349,9 @@ func (bsw *blockStreamWriter) mustWriteBlockInternal(sid *streamID, b *block, bd bh := getBlockHeader() if b != nil { - b.mustWriteTo(sid, bh, &bsw.streamWriters) + b.mustWriteTo(sid, bh, &bsw.streamWriters, &bsw.columnNameIDGenerator) } else { - bd.mustWriteTo(bh, &bsw.streamWriters) + bd.mustWriteTo(bh, &bsw.streamWriters, &bsw.columnNameIDGenerator) } th := &bh.timestampsHeader if bsw.globalRowsCount == 0 || th.minTimestamp < bsw.globalMinTimestamp { @@ -318,6 +401,7 @@ func (bsw *blockStreamWriter) mustFlushIndexBlock(data []byte) { // // bsw can be re-used after calling Finalize(). func (bsw *blockStreamWriter) Finalize(ph *partHeader) { + ph.FormatVersion = partFormatLatestVersion ph.UncompressedSizeBytes = bsw.globalUncompressedSizeBytes ph.RowsCount = bsw.globalRowsCount ph.BlocksCount = bsw.globalBlocksCount @@ -326,13 +410,11 @@ func (bsw *blockStreamWriter) Finalize(ph *partHeader) { bsw.mustFlushIndexBlock(bsw.indexBlockData) + // Write columnNames data + mustWriteColumnNames(&bsw.streamWriters.columnNamesWriter, bsw.columnNameIDGenerator.columnNames) + // Write metaindex data - bb := longTermBufPool.Get() - bb.B = encoding.CompressZSTDLevel(bb.B[:0], bsw.metaindexData, 1) - bsw.streamWriters.metaindexWriter.MustWrite(bb.B) - if len(bb.B) < 1024*1024 { - longTermBufPool.Put(bb) - } + mustWriteIndexBlockHeaders(&bsw.streamWriters.metaindexWriter, bsw.metaindexData) ph.CompressedSizeBytes = bsw.streamWriters.totalBytesWritten() diff --git a/lib/logstorage/bloomfilter.go b/lib/logstorage/bloomfilter.go index e012ac145..3a7d9034a 100644 --- a/lib/logstorage/bloomfilter.go +++ b/lib/logstorage/bloomfilter.go @@ -18,10 +18,19 @@ const bloomFilterHashesCount = 6 // bloomFilterBitsPerItem is the number of bits to use per each token. const bloomFilterBitsPerItem = 16 -// bloomFilterMarshal appends marshaled bloom filter for tokens to dst and returns the result. -func bloomFilterMarshal(dst []byte, tokens []string) []byte { +// bloomFilterMarshalTokens appends marshaled bloom filter for tokens to dst and returns the result. +func bloomFilterMarshalTokens(dst []byte, tokens []string) []byte { bf := getBloomFilter() - bf.mustInit(tokens) + bf.mustInitTokens(tokens) + dst = bf.marshal(dst) + putBloomFilter(bf) + return dst +} + +// bloomFilterMarshalHashes appends marshaled bloom filter for hashes to dst and returns the result. +func bloomFilterMarshalHashes(dst []byte, hashes []uint64) []byte { + bf := getBloomFilter() + bf.mustInitHashes(hashes) dst = bf.marshal(dst) putBloomFilter(bf) return dst @@ -61,23 +70,45 @@ func (bf *bloomFilter) unmarshal(src []byte) error { return nil } -// mustInit initializes bf with the given tokens -func (bf *bloomFilter) mustInit(tokens []string) { +// mustInitTokens initializes bf with the given tokens +func (bf *bloomFilter) mustInitTokens(tokens []string) { bitsCount := len(tokens) * bloomFilterBitsPerItem wordsCount := (bitsCount + 63) / 64 bits := slicesutil.SetLength(bf.bits, wordsCount) - bloomFilterAdd(bits, tokens) + bloomFilterAddTokens(bits, tokens) bf.bits = bits } -// bloomFilterAdd adds the given tokens to the bloom filter bits -func bloomFilterAdd(bits []uint64, tokens []string) { +// mustInitHashes initializes bf with the given hashes +func (bf *bloomFilter) mustInitHashes(hashes []uint64) { + bitsCount := len(hashes) * bloomFilterBitsPerItem + wordsCount := (bitsCount + 63) / 64 + bits := slicesutil.SetLength(bf.bits, wordsCount) + bloomFilterAddHashes(bits, hashes) + bf.bits = bits +} + +// bloomFilterAddTokens adds the given tokens to the bloom filter bits +func bloomFilterAddTokens(bits []uint64, tokens []string) { hashesCount := len(tokens) * bloomFilterHashesCount a := encoding.GetUint64s(hashesCount) a.A = appendTokensHashes(a.A[:0], tokens) + initBloomFilter(bits, a.A) + encoding.PutUint64s(a) +} +// bloomFilterAddHashes adds the given haehs to the bloom filter bits +func bloomFilterAddHashes(bits, hashes []uint64) { + hashesCount := len(hashes) * bloomFilterHashesCount + a := encoding.GetUint64s(hashesCount) + a.A = appendHashesHashes(a.A[:0], hashes) + initBloomFilter(bits, a.A) + encoding.PutUint64s(a) +} + +func initBloomFilter(bits, hashes []uint64) { maxBits := uint64(len(bits)) * 64 - for _, h := range a.A { + for _, h := range hashes { idx := h % maxBits i := idx / 64 j := idx % 64 @@ -87,8 +118,6 @@ func bloomFilterAdd(bits []uint64, tokens []string) { bits[i] = w | mask } } - - encoding.PutUint64s(a) } // appendTokensHashes appends hashes for the given tokens to dst and returns the result. @@ -114,6 +143,29 @@ func appendTokensHashes(dst []uint64, tokens []string) []uint64 { return dst } +// appendHashesHashes appends hashes for the given hashes to dst and returns the result. +// +// the appended hashes can be then passed to bloomFilter.containsAll(). +func appendHashesHashes(dst, hashes []uint64) []uint64 { + dstLen := len(dst) + hashesCount := len(hashes) * bloomFilterHashesCount + + dst = slicesutil.SetLength(dst, dstLen+hashesCount) + dst = dst[:dstLen] + + var buf [8]byte + hp := (*uint64)(unsafe.Pointer(&buf[0])) + for _, h := range hashes { + *hp = h + for i := 0; i < bloomFilterHashesCount; i++ { + h := xxhash.Sum64(buf[:]) + (*hp)++ + dst = append(dst, h) + } + } + return dst +} + // containsAll returns true if bf contains all the given tokens hashes generated by appendTokensHashes. func (bf *bloomFilter) containsAll(hashes []uint64) bool { bits := bf.bits diff --git a/lib/logstorage/bloomfilter_test.go b/lib/logstorage/bloomfilter_test.go index 4242b4890..083e745a6 100644 --- a/lib/logstorage/bloomfilter_test.go +++ b/lib/logstorage/bloomfilter_test.go @@ -8,10 +8,16 @@ import ( func TestBloomFilter(t *testing.T) { f := func(tokens []string) { t.Helper() - data := bloomFilterMarshal(nil, tokens) + dataTokens := bloomFilterMarshalTokens(nil, tokens) + hashes := tokenizeHashes(nil, tokens) + dataHashes := bloomFilterMarshalHashes(nil, hashes) + if string(dataTokens) != string(dataHashes) { + t.Fatalf("unexpected marshaled bloom filters from hashes\ngot\n%X\nwant\n%X", dataHashes, dataTokens) + } + bf := getBloomFilter() defer putBloomFilter(bf) - if err := bf.unmarshal(data); err != nil { + if err := bf.unmarshal(dataTokens); err != nil { t.Fatalf("unexpected error when unmarshaling bloom filter: %s", err) } tokensHashes := appendTokensHashes(nil, tokens) @@ -57,7 +63,7 @@ func TestBloomFilterFalsePositive(t *testing.T) { for i := range tokens { tokens[i] = fmt.Sprintf("token_%d", i) } - data := bloomFilterMarshal(nil, tokens) + data := bloomFilterMarshalTokens(nil, tokens) bf := getBloomFilter() defer putBloomFilter(bf) if err := bf.unmarshal(data); err != nil { @@ -79,3 +85,35 @@ func TestBloomFilterFalsePositive(t *testing.T) { t.Fatalf("too high false positive rate; got %.4f; want %.4f max", p, maxFalsePositive) } } + +func TestBloomFilterMarshal_TokensVSHashes(t *testing.T) { + tokens := make([]string, 100) + for i := range tokens { + tokens[i] = fmt.Sprintf("token_%d", i) + } + + dataTokens := bloomFilterMarshalTokens(nil, tokens) + + hashes := tokenizeHashes(nil, tokens) + dataHashes := bloomFilterMarshalHashes(nil, hashes) + + if string(dataTokens) != string(dataHashes) { + t.Fatalf("unexpected bloom filter obtained from hashes\ngot\n%X\nwant\n%X", dataHashes, dataTokens) + } +} + +func TestBloomFilterMarshalTokens(t *testing.T) { + f := func(tokens []string, resultExpected string) { + t.Helper() + + result := bloomFilterMarshalTokens(nil, tokens) + if string(result) != resultExpected { + t.Fatalf("unexpected result\ngot\n%X\nwant\n%X", result, resultExpected) + } + } + + f([]string{}, "") + f([]string{"foo"}, "\x00\x00\x00\x82\x40\x18\x00\x04") + f([]string{"foo", "bar", "baz"}, "\x00\x00\x81\xA3\x48\x5C\x10\x26") + f([]string{"foo", "bar", "baz", "foo"}, "\x00\x00\x81\xA3\x48\x5C\x10\x26") +} diff --git a/lib/logstorage/column_names.go b/lib/logstorage/column_names.go new file mode 100644 index 000000000..e11fab68a --- /dev/null +++ b/lib/logstorage/column_names.go @@ -0,0 +1,127 @@ +package logstorage + +import ( + "fmt" + "io" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +func mustWriteColumnNames(w *writerWithStats, columnNames []string) { + data := marshalColumnNames(nil, columnNames) + w.MustWrite(data) +} + +func mustReadColumnNames(r filestream.ReadCloser) ([]string, map[string]uint64) { + src, err := io.ReadAll(r) + if err != nil { + logger.Panicf("FATAL: %s: cannot read colum names: %s", r.Path(), err) + } + + columnNames, err := unmarshalColumnNames(src) + if err != nil { + logger.Panicf("FATAL: %s: %s", r.Path(), err) + } + + columnNameIDs, err := getColumnNameIDs(columnNames) + if err != nil { + logger.Panicf("BUG: %s: %s; columnNames=%v", r.Path(), err, columnNameIDs) + } + + return columnNames, columnNameIDs +} + +func getColumnNameIDs(columnNames []string) (map[string]uint64, error) { + m := make(map[uint64]string, len(columnNames)) + columnNameIDs := make(map[string]uint64, len(columnNames)) + for i, name := range columnNames { + id := uint64(i) + if prevName, ok := m[id]; ok { + return nil, fmt.Errorf("duplicate column name id=%d for columns %q and %q", id, prevName, name) + } + m[id] = name + columnNameIDs[name] = id + } + + return columnNameIDs, nil +} + +func marshalColumnNames(dst []byte, columnNames []string) []byte { + data := encoding.MarshalVarUint64(nil, uint64(len(columnNames))) + + for _, name := range columnNames { + data = encoding.MarshalBytes(data, bytesutil.ToUnsafeBytes(name)) + } + + dst = encoding.CompressZSTDLevel(dst, data, 1) + + return dst +} + +func unmarshalColumnNames(src []byte) ([]string, error) { + data, err := encoding.DecompressZSTD(nil, src) + if err != nil { + return nil, fmt.Errorf("cannot decompress column names from len(src)=%d: %w", len(src), err) + } + src = data + + n, nBytes := encoding.UnmarshalVarUint64(src) + if nBytes <= 0 { + return nil, fmt.Errorf("cannot parse the number of column names for len(src)=%d", len(src)) + } + src = src[nBytes:] + + m := make(map[string]uint64, n) + columnNames := make([]string, n) + for id := uint64(0); id < n; id++ { + name, nBytes := encoding.UnmarshalBytes(src) + if nBytes <= 0 { + return nil, fmt.Errorf("cannot parse colum name number %d out of %d", id, n) + } + src = src[nBytes:] + + if idPrev, ok := m[string(name)]; ok { + return nil, fmt.Errorf("duplicate ids for column name %q: %d and %d", name, idPrev, id) + } + + m[string(name)] = id + columnNames[id] = string(name) + } + + if len(src) > 0 { + return nil, fmt.Errorf("unexpected non-empty tail left after unmarshaling column name ids; len(tail)=%d", len(src)) + } + + return columnNames, nil +} + +type columnNameIDGenerator struct { + // columnNameIDs contains columnName->id mapping for already seen columns + columnNameIDs map[string]uint64 + + // columnNames contains id->columnName mapping for already seen columns + columnNames []string +} + +func (g *columnNameIDGenerator) reset() { + g.columnNameIDs = nil + g.columnNames = nil +} + +func (g *columnNameIDGenerator) getColumnNameID(name string) uint64 { + id, ok := g.columnNameIDs[name] + if !ok { + if g.columnNameIDs == nil { + g.columnNameIDs = make(map[string]uint64) + } + id = uint64(len(g.columnNames)) + nameCopy := strings.Clone(name) + g.columnNameIDs[nameCopy] = id + g.columnNames = append(g.columnNames, nameCopy) + } + return id +} diff --git a/lib/logstorage/column_names_test.go b/lib/logstorage/column_names_test.go new file mode 100644 index 000000000..f034d20d3 --- /dev/null +++ b/lib/logstorage/column_names_test.go @@ -0,0 +1,54 @@ +package logstorage + +import ( + "reflect" + "testing" +) + +func TestMarshalUnmarshalColumnNames(t *testing.T) { + f := func(columnNames []string) { + t.Helper() + + data := marshalColumnNames(nil, columnNames) + result, err := unmarshalColumnNames(data) + if err != nil { + t.Fatalf("unexpected error when unmarshaling columnNames: %s", err) + } + if !reflect.DeepEqual(columnNames, result) { + t.Fatalf("unexpected umarshaled columnNames\ngot\n%v\nwant\n%v", result, columnNames) + } + } + + f([]string{}) + + f([]string{"", "foo", "bar"}) + + f([]string{ + "asdf.sdf.dsfds.f fds. fds ", + "foo", + "bar.sdfsdf.fd", + "", + "aso apaa", + }) +} + +func TestColumnNameIDGenerator(t *testing.T) { + a := []string{"", "foo", "bar.baz", "asdf dsf dfs"} + + g := &columnNameIDGenerator{} + + for i, s := range a { + id := g.getColumnNameID(s) + if id != uint64(i) { + t.Fatalf("first run: unexpected id generated for s=%q; got %d; want %d; g=%v", s, id, i, g) + } + } + + // Repeat the loop + for i, s := range a { + id := g.getColumnNameID(s) + if id != uint64(i) { + t.Fatalf("second run: unexpected id generated for s=%q; got %d; want %d; g=%v", s, id, i, g) + } + } +} diff --git a/lib/logstorage/consts.go b/lib/logstorage/consts.go index e20dc5502..49a38470d 100644 --- a/lib/logstorage/consts.go +++ b/lib/logstorage/consts.go @@ -1,5 +1,15 @@ package logstorage +// partFormatLatestVersion is the latest format version for parts. +// +// See partHeader.FormatVersion for details. +const partFormatLatestVersion = 1 + +// bloomValuesShardsCount is the number of shards for bloomFilename and valuesFilename files. +// +// The partHeader.FormatVersion must be updated when this number changes. +const bloomValuesShardsCount = 8 + // maxUncompressedIndexBlockSize contains the maximum length of uncompressed block with blockHeader entries aka index block. // // The real block length can exceed this value by a small percentage because of the block write details. @@ -46,6 +56,9 @@ const maxBloomFilterBlockSize = 8 * 1024 * 1024 // maxColumnsHeaderSize is the maximum size of columnsHeader block const maxColumnsHeaderSize = 8 * 1024 * 1024 +// maxColumnsHeaderIndexSize is the maximum size of columnsHeaderIndex block +const maxColumnsHeaderIndexSize = 8 * 1024 * 1024 + // maxDictSizeBytes is the maximum length of all the keys in the valuesDict. // // Dict is stored in columnsHeader, which is read every time the corresponding block is scanned during search qieries. diff --git a/lib/logstorage/filenames.go b/lib/logstorage/filenames.go index cd4ac99c5..909286ba2 100644 --- a/lib/logstorage/filenames.go +++ b/lib/logstorage/filenames.go @@ -1,14 +1,18 @@ package logstorage const ( - metaindexFilename = "metaindex.bin" - indexFilename = "index.bin" - columnsHeaderFilename = "columns_header.bin" - timestampsFilename = "timestamps.bin" - fieldValuesFilename = "field_values.bin" - fieldBloomFilename = "field_bloom.bin" - messageValuesFilename = "message_values.bin" - messageBloomFilename = "message_bloom.bin" + columnNamesFilename = "column_names.bin" + metaindexFilename = "metaindex.bin" + indexFilename = "index.bin" + columnsHeaderIndexFilename = "columns_header_index.bin" + columnsHeaderFilename = "columns_header.bin" + timestampsFilename = "timestamps.bin" + oldValuesFilename = "field_values.bin" + oldBloomFilename = "field_bloom.bin" + valuesFilename = "values.bin" + bloomFilename = "bloom.bin" + messageValuesFilename = "message_values.bin" + messageBloomFilename = "message_bloom.bin" metadataFilename = "metadata.json" partsFilename = "parts.json" diff --git a/lib/logstorage/hash_tokenizer.go b/lib/logstorage/hash_tokenizer.go new file mode 100644 index 000000000..c693208fa --- /dev/null +++ b/lib/logstorage/hash_tokenizer.go @@ -0,0 +1,180 @@ +package logstorage + +import ( + "sync" + + "github.com/cespare/xxhash/v2" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +// tokenizeHashes extracts word tokens from a, hashes them, appends hashes to dst and returns the result. +// +// The returned hashes can be used for building bloom filters. +func tokenizeHashes(dst []uint64, a []string) []uint64 { + t := getHashTokenizer() + for i, s := range a { + if i > 0 && s == a[i-1] { + // This string has been already tokenized + continue + } + dst = t.tokenizeString(dst, s) + } + putHashTokenizer(t) + + return dst +} + +const hashTokenizerBucketsCount = 1024 + +type hashTokenizer struct { + buckets [hashTokenizerBucketsCount]hashTokenizerBucket + bm bitmap +} + +type hashTokenizerBucket struct { + v uint64 + overflow []uint64 +} + +func (b *hashTokenizerBucket) reset() { + // do not spend CPU time on clearing v and b.overflow items, + // since they'll be overwritten with new items. + b.overflow = b.overflow[:0] +} + +func newHashTokenizer() *hashTokenizer { + var t hashTokenizer + t.bm.init(len(t.buckets)) + return &t +} + +func (t *hashTokenizer) reset() { + if t.bm.onesCount() <= len(t.buckets)/4 { + t.bm.forEachSetBit(func(idx int) bool { + t.buckets[idx].reset() + return false + }) + } else { + buckets := t.buckets[:] + for i := range buckets { + buckets[i].reset() + } + t.bm.init(len(t.buckets)) + } +} + +func (t *hashTokenizer) tokenizeString(dst []uint64, s string) []uint64 { + if !isASCII(s) { + // Slow path - s contains unicode chars + return t.tokenizeStringUnicode(dst, s) + } + + // Fast path for ASCII s + i := 0 + for i < len(s) { + // Search for the next token. + start := len(s) + for i < len(s) { + if !isTokenChar(s[i]) { + i++ + continue + } + start = i + i++ + break + } + // Search for the end of the token. + end := len(s) + for i < len(s) { + if isTokenChar(s[i]) { + i++ + continue + } + end = i + i++ + break + } + if end <= start { + break + } + + // Register the token. + token := s[start:end] + if h, ok := t.addToken(token); ok { + dst = append(dst, h) + } + } + return dst +} + +func (t *hashTokenizer) tokenizeStringUnicode(dst []uint64, s string) []uint64 { + for len(s) > 0 { + // Search for the next token. + n := len(s) + for offset, r := range s { + if isTokenRune(r) { + n = offset + break + } + } + s = s[n:] + // Search for the end of the token. + n = len(s) + for offset, r := range s { + if !isTokenRune(r) { + n = offset + break + } + } + if n == 0 { + break + } + + // Register the token + token := s[:n] + s = s[n:] + if h, ok := t.addToken(token); ok { + dst = append(dst, h) + } + } + return dst +} + +func (t *hashTokenizer) addToken(token string) (uint64, bool) { + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(token)) + idx := int(h % uint64(len(t.buckets))) + + b := &t.buckets[idx] + if !t.bm.isSetBit(idx) { + b.v = h + t.bm.setBit(idx) + return h, true + } + + if b.v == h { + return h, false + } + for _, v := range b.overflow { + if v == h { + return h, false + } + } + b.overflow = append(b.overflow, h) + return h, true +} + +func getHashTokenizer() *hashTokenizer { + v := hashTokenizerPool.Get() + if v == nil { + return newHashTokenizer() + } + return v.(*hashTokenizer) +} + +func putHashTokenizer(t *hashTokenizer) { + t.reset() + hashTokenizerPool.Put(t) +} + +var hashTokenizerPool sync.Pool diff --git a/lib/logstorage/hash_tokenizer_test.go b/lib/logstorage/hash_tokenizer_test.go new file mode 100644 index 000000000..ff18c0558 --- /dev/null +++ b/lib/logstorage/hash_tokenizer_test.go @@ -0,0 +1,24 @@ +package logstorage + +import ( + "reflect" + "testing" +) + +func TestTokenizeHashes(t *testing.T) { + f := func(a []string, hashesExpected []uint64) { + t.Helper() + hashes := tokenizeHashes(nil, a) + if !reflect.DeepEqual(hashes, hashesExpected) { + t.Fatalf("unexpected hashes\ngot\n%X\nwant\n%X", hashes, hashesExpected) + } + } + + f(nil, nil) + f([]string{""}, nil) + f([]string{"foo"}, []uint64{0x33BF00A859C4BA3F}) + f([]string{"foo foo", "!!foo //"}, []uint64{0x33BF00A859C4BA3F}) + f([]string{"foo bar---.!!([baz]!!! %$# TaSte"}, []uint64{0x33BF00A859C4BA3F, 0x48A37C90AD27A659, 0x42598CF26A247404, 0x34709F40A3286E46}) + f([]string{"foo bar---.!!([baz]!!! %$# baz foo TaSte"}, []uint64{0x33BF00A859C4BA3F, 0x48A37C90AD27A659, 0x42598CF26A247404, 0x34709F40A3286E46}) + f([]string{"теСТ 1234 f12.34", "34 f12 AS"}, []uint64{0xFE846FA145CEABD1, 0xD8316E61D84F6BA4, 0x6D67BA71C4E03D10, 0x5E8D522CA93563ED, 0xED80AED10E029FC8}) +} diff --git a/lib/logstorage/hash_tokenizer_timing_test.go b/lib/logstorage/hash_tokenizer_timing_test.go new file mode 100644 index 000000000..b9a930031 --- /dev/null +++ b/lib/logstorage/hash_tokenizer_timing_test.go @@ -0,0 +1,19 @@ +package logstorage + +import ( + "strings" + "testing" +) + +func BenchmarkTokenizeHashes(b *testing.B) { + a := strings.Split(benchLogs, "\n") + + b.ReportAllocs() + b.SetBytes(int64(len(benchLogs))) + b.RunParallel(func(pb *testing.PB) { + var hashes []uint64 + for pb.Next() { + hashes = tokenizeHashes(hashes[:0], a) + } + }) +} diff --git a/lib/logstorage/index_block_header.go b/lib/logstorage/index_block_header.go index c0654b10b..e22bfba9e 100644 --- a/lib/logstorage/index_block_header.go +++ b/lib/logstorage/index_block_header.go @@ -110,25 +110,36 @@ func (ih *indexBlockHeader) unmarshal(src []byte) ([]byte, error) { return src[32:], nil } +// mustWriteIndexBlockHeaders writes metaindexData to w. +func mustWriteIndexBlockHeaders(w *writerWithStats, metaindexData []byte) { + bb := longTermBufPool.Get() + bb.B = encoding.CompressZSTDLevel(bb.B[:0], metaindexData, 1) + w.MustWrite(bb.B) + if len(bb.B) < 1024*1024 { + longTermBufPool.Put(bb) + } +} + // mustReadIndexBlockHeaders reads indexBlockHeader entries from r, appends them to dst and returns the result. func mustReadIndexBlockHeaders(dst []indexBlockHeader, r *readerWithStats) []indexBlockHeader { data, err := io.ReadAll(r) if err != nil { - logger.Panicf("FATAL: cannot read indexBlockHeader entries from %s: %s", r.Path(), err) + logger.Panicf("FATAL: %s: cannot read indexBlockHeader entries: %s", r.Path(), err) } bb := longTermBufPool.Get() bb.B, err = encoding.DecompressZSTD(bb.B[:0], data) if err != nil { - logger.Panicf("FATAL: cannot decompress indexBlockHeader entries from %s: %s", r.Path(), err) + logger.Panicf("FATAL: %s: cannot decompress indexBlockHeader entries: %s", r.Path(), err) } dst, err = unmarshalIndexBlockHeaders(dst, bb.B) if len(bb.B) < 1024*1024 { longTermBufPool.Put(bb) } if err != nil { - logger.Panicf("FATAL: cannot parse indexBlockHeader entries from %s: %s", r.Path(), err) + logger.Panicf("FATAL: %s: cannot parse indexBlockHeader entries: %s", r.Path(), err) } + return dst } diff --git a/lib/logstorage/inmemory_part.go b/lib/logstorage/inmemory_part.go index 2afd970ec..fd50b7ccd 100644 --- a/lib/logstorage/inmemory_part.go +++ b/lib/logstorage/inmemory_part.go @@ -14,38 +14,62 @@ type inmemoryPart struct { // ph contains partHeader information for the given in-memory part. ph partHeader + columnNames bytesutil.ByteBuffer metaindex bytesutil.ByteBuffer index bytesutil.ByteBuffer + columnsHeaderIndex bytesutil.ByteBuffer columnsHeader bytesutil.ByteBuffer timestamps bytesutil.ByteBuffer - fieldValues bytesutil.ByteBuffer - fieldBloomFilter bytesutil.ByteBuffer - messageValues bytesutil.ByteBuffer - messageBloomFilter bytesutil.ByteBuffer + + messageBloomValues bloomValuesBuffer + bloomValuesShards [bloomValuesShardsCount]bloomValuesBuffer +} + +type bloomValuesBuffer struct { + bloom bytesutil.ByteBuffer + values bytesutil.ByteBuffer +} + +func (b *bloomValuesBuffer) reset() { + b.bloom.Reset() + b.values.Reset() +} + +func (b *bloomValuesBuffer) NewStreamReader() bloomValuesStreamReader { + return bloomValuesStreamReader{ + bloom: b.bloom.NewReader(), + values: b.values.NewReader(), + } +} + +func (b *bloomValuesBuffer) NewStreamWriter() bloomValuesStreamWriter { + return bloomValuesStreamWriter{ + bloom: &b.bloom, + values: &b.values, + } } // reset resets mp, so it can be re-used func (mp *inmemoryPart) reset() { mp.ph.reset() + mp.columnNames.Reset() mp.metaindex.Reset() mp.index.Reset() + mp.columnsHeaderIndex.Reset() mp.columnsHeader.Reset() mp.timestamps.Reset() - mp.fieldValues.Reset() - mp.fieldBloomFilter.Reset() - mp.messageValues.Reset() - mp.messageBloomFilter.Reset() + + mp.messageBloomValues.reset() + for i := range mp.bloomValuesShards[:] { + mp.bloomValuesShards[i].reset() + } } // mustInitFromRows initializes mp from lr. func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) { mp.reset() - if len(lr.timestamps) == 0 { - return - } - sort.Sort(lr) bsw := getBlockStreamWriter() @@ -75,6 +99,7 @@ func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) { } bsw.MustWriteRows(sidPrev, trs.timestamps, trs.rows) putTmpRows(trs) + bsw.Finalize(&mp.ph) putBlockStreamWriter(bsw) } @@ -83,23 +108,34 @@ func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) { func (mp *inmemoryPart) MustStoreToDisk(path string) { fs.MustMkdirFailIfExist(path) + columnNamesPath := filepath.Join(path, columnNamesFilename) metaindexPath := filepath.Join(path, metaindexFilename) indexPath := filepath.Join(path, indexFilename) + columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename) columnsHeaderPath := filepath.Join(path, columnsHeaderFilename) timestampsPath := filepath.Join(path, timestampsFilename) - fieldValuesPath := filepath.Join(path, fieldValuesFilename) - fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename) messageValuesPath := filepath.Join(path, messageValuesFilename) messageBloomFilterPath := filepath.Join(path, messageBloomFilename) + fs.MustWriteSync(columnNamesPath, mp.columnNames.B) fs.MustWriteSync(metaindexPath, mp.metaindex.B) fs.MustWriteSync(indexPath, mp.index.B) + fs.MustWriteSync(columnsHeaderIndexPath, mp.columnsHeaderIndex.B) fs.MustWriteSync(columnsHeaderPath, mp.columnsHeader.B) fs.MustWriteSync(timestampsPath, mp.timestamps.B) - fs.MustWriteSync(fieldValuesPath, mp.fieldValues.B) - fs.MustWriteSync(fieldBloomFilterPath, mp.fieldBloomFilter.B) - fs.MustWriteSync(messageValuesPath, mp.messageValues.B) - fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomFilter.B) + + fs.MustWriteSync(messageBloomFilterPath, mp.messageBloomValues.bloom.B) + fs.MustWriteSync(messageValuesPath, mp.messageBloomValues.values.B) + + for i := range mp.bloomValuesShards[:] { + shard := &mp.bloomValuesShards[i] + + bloomPath := getBloomFilePath(path, uint64(i)) + fs.MustWriteSync(bloomPath, shard.bloom.B) + + valuesPath := getValuesFilePath(path, uint64(i)) + fs.MustWriteSync(valuesPath, shard.values.B) + } mp.ph.mustWriteMetadata(path) diff --git a/lib/logstorage/inmemory_part_test.go b/lib/logstorage/inmemory_part_test.go index 8bf59eeb5..c2f62c99c 100644 --- a/lib/logstorage/inmemory_part_test.go +++ b/lib/logstorage/inmemory_part_test.go @@ -75,7 +75,7 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) { f(GetLogRows(nil, nil), 0, 0) // Check how inmemoryPart works with a single stream - f(newTestLogRows(1, 1, 0), 1, 0.8) + f(newTestLogRows(1, 1, 0), 1, 0.7) f(newTestLogRows(1, 2, 0), 1, 0.9) f(newTestLogRows(1, 10, 0), 1, 2.0) f(newTestLogRows(1, 1000, 0), 1, 7.1) @@ -83,9 +83,9 @@ func TestInmemoryPartMustInitFromRows(t *testing.T) { // Check how inmemoryPart works with multiple streams f(newTestLogRows(2, 1, 0), 2, 0.8) - f(newTestLogRows(10, 1, 0), 10, 0.9) - f(newTestLogRows(100, 1, 0), 100, 1.0) - f(newTestLogRows(10, 5, 0), 10, 1.4) + f(newTestLogRows(10, 1, 0), 10, 1.1) + f(newTestLogRows(100, 1, 0), 100, 1.2) + f(newTestLogRows(10, 5, 0), 10, 1.5) f(newTestLogRows(10, 1000, 0), 10, 7.2) f(newTestLogRows(100, 100, 0), 100, 5.0) } @@ -192,14 +192,14 @@ func TestInmemoryPartInitFromBlockStreamReaders(t *testing.T) { f([]*LogRows{GetLogRows(nil, nil), GetLogRows(nil, nil)}, 0, 0) // Check merge with a single reader - f([]*LogRows{newTestLogRows(1, 1, 0)}, 1, 0.8) + f([]*LogRows{newTestLogRows(1, 1, 0)}, 1, 0.7) f([]*LogRows{newTestLogRows(1, 10, 0)}, 1, 2.0) f([]*LogRows{newTestLogRows(1, 100, 0)}, 1, 4.9) f([]*LogRows{newTestLogRows(1, 1000, 0)}, 1, 7.1) f([]*LogRows{newTestLogRows(1, 10000, 0)}, 1, 7.4) - f([]*LogRows{newTestLogRows(10, 1, 0)}, 10, 0.9) - f([]*LogRows{newTestLogRows(100, 1, 0)}, 100, 1.0) - f([]*LogRows{newTestLogRows(1000, 1, 0)}, 1000, 1.0) + f([]*LogRows{newTestLogRows(10, 1, 0)}, 10, 1.1) + f([]*LogRows{newTestLogRows(100, 1, 0)}, 100, 1.3) + f([]*LogRows{newTestLogRows(1000, 1, 0)}, 1000, 1.2) f([]*LogRows{newTestLogRows(10, 10, 0)}, 10, 2.1) f([]*LogRows{newTestLogRows(10, 100, 0)}, 10, 4.9) diff --git a/lib/logstorage/part.go b/lib/logstorage/part.go index 90dce4287..6598ebdbc 100644 --- a/lib/logstorage/part.go +++ b/lib/logstorage/part.go @@ -1,8 +1,12 @@ package logstorage import ( + "fmt" "path/filepath" + "github.com/cespare/xxhash/v2" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" ) @@ -19,16 +23,35 @@ type part struct { // ph contains partHeader for the given part. ph partHeader + // columnNameIDs is a mapping from column names seen in the given part to internal IDs. + // The internal IDs are used in columnHeaderRef. + columnNameIDs map[string]uint64 + + // columnNames is a mapping from internal IDs to column names. + // The internal IDs are used in columnHeaderRef. + columnNames []string + // indexBlockHeaders contains a list of indexBlockHeader entries for the given part. indexBlockHeaders []indexBlockHeader indexFile fs.MustReadAtCloser + columnsHeaderIndexFile fs.MustReadAtCloser columnsHeaderFile fs.MustReadAtCloser timestampsFile fs.MustReadAtCloser - fieldValuesFile fs.MustReadAtCloser - fieldBloomFilterFile fs.MustReadAtCloser - messageValuesFile fs.MustReadAtCloser - messageBloomFilterFile fs.MustReadAtCloser + + messageBloomValues bloomValuesReaderAt + oldBloomValues bloomValuesReaderAt + bloomValuesShards [bloomValuesShardsCount]bloomValuesReaderAt +} + +type bloomValuesReaderAt struct { + bloom fs.MustReadAtCloser + values fs.MustReadAtCloser +} + +func (r *bloomValuesReaderAt) MustClose() { + r.bloom.MustClose() + r.values.MustClose() } func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part { @@ -37,6 +60,10 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part { p.path = "" p.ph = mp.ph + // Read columnNames + columnNamesReader := mp.columnNames.NewReader() + p.columnNames, p.columnNameIDs = mustReadColumnNames(columnNamesReader) + // Read metaindex metaindexReader := mp.metaindex.NewReader() var mrs readerWithStats @@ -45,12 +72,19 @@ func mustOpenInmemoryPart(pt *partition, mp *inmemoryPart) *part { // Open data files p.indexFile = &mp.index + p.columnsHeaderIndexFile = &mp.columnsHeaderIndex p.columnsHeaderFile = &mp.columnsHeader p.timestampsFile = &mp.timestamps - p.fieldValuesFile = &mp.fieldValues - p.fieldBloomFilterFile = &mp.fieldBloomFilter - p.messageValuesFile = &mp.messageValues - p.messageBloomFilterFile = &mp.messageBloomFilter + + p.messageBloomValues.bloom = &mp.messageBloomValues.bloom + p.messageBloomValues.values = &mp.messageBloomValues.values + + // Open files with bloom filters and column values + for i := range p.bloomValuesShards[:] { + shard := &p.bloomValuesShards[i] + shard.bloom = &mp.bloomValuesShards[i].bloom + shard.values = &mp.bloomValuesShards[i].values + } return &p } @@ -61,14 +95,21 @@ func mustOpenFilePart(pt *partition, path string) *part { p.path = path p.ph.mustReadMetadata(path) + columnNamesPath := filepath.Join(path, columnNamesFilename) metaindexPath := filepath.Join(path, metaindexFilename) indexPath := filepath.Join(path, indexFilename) + columnsHeaderIndexPath := filepath.Join(path, columnsHeaderIndexFilename) columnsHeaderPath := filepath.Join(path, columnsHeaderFilename) timestampsPath := filepath.Join(path, timestampsFilename) - fieldValuesPath := filepath.Join(path, fieldValuesFilename) - fieldBloomFilterPath := filepath.Join(path, fieldBloomFilename) - messageValuesPath := filepath.Join(path, messageValuesFilename) - messageBloomFilterPath := filepath.Join(path, messageBloomFilename) + + // Read columnNames + if p.ph.FormatVersion >= 1 { + columnNamesReader := filestream.MustOpen(columnNamesPath, true) + var crs readerWithStats + crs.init(columnNamesReader) + p.columnNames, p.columnNameIDs = mustReadColumnNames(columnNamesReader) + crs.MustClose() + } // Read metaindex metaindexReader := filestream.MustOpen(metaindexPath, true) @@ -79,24 +120,78 @@ func mustOpenFilePart(pt *partition, path string) *part { // Open data files p.indexFile = fs.MustOpenReaderAt(indexPath) + if p.ph.FormatVersion >= 1 { + p.columnsHeaderIndexFile = fs.MustOpenReaderAt(columnsHeaderIndexPath) + } p.columnsHeaderFile = fs.MustOpenReaderAt(columnsHeaderPath) p.timestampsFile = fs.MustOpenReaderAt(timestampsPath) - p.fieldValuesFile = fs.MustOpenReaderAt(fieldValuesPath) - p.fieldBloomFilterFile = fs.MustOpenReaderAt(fieldBloomFilterPath) - p.messageValuesFile = fs.MustOpenReaderAt(messageValuesPath) - p.messageBloomFilterFile = fs.MustOpenReaderAt(messageBloomFilterPath) + + // Open files with bloom filters and column values + messageBloomFilterPath := filepath.Join(path, messageBloomFilename) + p.messageBloomValues.bloom = fs.MustOpenReaderAt(messageBloomFilterPath) + + messageValuesPath := filepath.Join(path, messageValuesFilename) + p.messageBloomValues.values = fs.MustOpenReaderAt(messageValuesPath) + + if p.ph.FormatVersion < 1 { + bloomPath := filepath.Join(path, oldBloomFilename) + p.oldBloomValues.bloom = fs.MustOpenReaderAt(bloomPath) + + valuesPath := filepath.Join(path, oldValuesFilename) + p.oldBloomValues.values = fs.MustOpenReaderAt(valuesPath) + } else { + for i := range p.bloomValuesShards[:] { + shard := &p.bloomValuesShards[i] + + bloomPath := getBloomFilePath(path, uint64(i)) + shard.bloom = fs.MustOpenReaderAt(bloomPath) + + valuesPath := getValuesFilePath(path, uint64(i)) + shard.values = fs.MustOpenReaderAt(valuesPath) + } + } return &p } func mustClosePart(p *part) { p.indexFile.MustClose() + if p.ph.FormatVersion >= 1 { + p.columnsHeaderIndexFile.MustClose() + } p.columnsHeaderFile.MustClose() p.timestampsFile.MustClose() - p.fieldValuesFile.MustClose() - p.fieldBloomFilterFile.MustClose() - p.messageValuesFile.MustClose() - p.messageBloomFilterFile.MustClose() + p.messageBloomValues.MustClose() + + if p.ph.FormatVersion < 1 { + p.oldBloomValues.MustClose() + } else { + for i := range p.bloomValuesShards[:] { + p.bloomValuesShards[i].MustClose() + } + } p.pt = nil } + +func (p *part) getBloomValuesFileForColumnName(name string) *bloomValuesReaderAt { + if name == "" { + return &p.messageBloomValues + } + + if p.ph.FormatVersion < 1 { + return &p.oldBloomValues + } + + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(name)) + idx := h % uint64(len(p.bloomValuesShards)) + return &p.bloomValuesShards[idx] +} + +func getBloomFilePath(partPath string, shardNum uint64) string { + return filepath.Join(partPath, bloomFilename) + fmt.Sprintf("%d", shardNum) +} + +func getValuesFilePath(partPath string, shardNum uint64) string { + return filepath.Join(partPath, valuesFilename) + fmt.Sprintf("%d", shardNum) +} diff --git a/lib/logstorage/part_header.go b/lib/logstorage/part_header.go index 65157e5ca..05baf4195 100644 --- a/lib/logstorage/part_header.go +++ b/lib/logstorage/part_header.go @@ -14,6 +14,9 @@ import ( // partHeader contains the information about a single part type partHeader struct { + // FormatVersion is the version of the part format + FormatVersion uint + // CompressedSizeBytes is physical size of the part CompressedSizeBytes uint64 @@ -35,6 +38,7 @@ type partHeader struct { // reset resets ph for subsequent re-use func (ph *partHeader) reset() { + ph.FormatVersion = 0 ph.CompressedSizeBytes = 0 ph.UncompressedSizeBytes = 0 ph.RowsCount = 0 @@ -45,8 +49,8 @@ func (ph *partHeader) reset() { // String returns string represenation for ph. func (ph *partHeader) String() string { - return fmt.Sprintf("{CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}", - ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp)) + return fmt.Sprintf("{FormatVersion=%d, CompressedSizeBytes=%d, UncompressedSizeBytes=%d, RowsCount=%d, BlocksCount=%d, MinTimestamp=%s, MaxTimestamp=%s}", + ph.FormatVersion, ph.CompressedSizeBytes, ph.UncompressedSizeBytes, ph.RowsCount, ph.BlocksCount, timestampToString(ph.MinTimestamp), timestampToString(ph.MaxTimestamp)) } func (ph *partHeader) mustReadMetadata(partPath string) { @@ -62,9 +66,15 @@ func (ph *partHeader) mustReadMetadata(partPath string) { } // Perform various checks + if ph.FormatVersion > partFormatLatestVersion { + logger.Panicf("FATAL: unsupported part format version; got %d; mustn't exceed %d", partFormatLatestVersion) + } if ph.MinTimestamp > ph.MaxTimestamp { logger.Panicf("FATAL: MinTimestamp cannot exceed MaxTimestamp; got %d vs %d", ph.MinTimestamp, ph.MaxTimestamp) } + if ph.BlocksCount > ph.RowsCount { + logger.Panicf("FATAL: BlocksCount=%d cannot exceed RowsCount=%d", ph.BlocksCount, ph.RowsCount) + } } func (ph *partHeader) mustWriteMetadata(partPath string) { diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index ca1638f86..7f70b122e 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -30,30 +30,34 @@ func (f *Field) String() string { return string(x) } -func (f *Field) marshal(dst []byte) []byte { - dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Name)) +func (f *Field) marshal(dst []byte, marshalFieldName bool) []byte { + if marshalFieldName { + dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Name)) + } dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(f.Value)) return dst } -func (f *Field) unmarshalNoArena(src []byte) ([]byte, error) { +func (f *Field) unmarshalNoArena(src []byte, unmarshalFieldName bool) ([]byte, error) { srcOrig := src // Unmarshal field name - b, nSize := encoding.UnmarshalBytes(src) - if nSize <= 0 { - return srcOrig, fmt.Errorf("cannot unmarshal field name") + if unmarshalFieldName { + name, nSize := encoding.UnmarshalBytes(src) + if nSize <= 0 { + return srcOrig, fmt.Errorf("cannot unmarshal field name") + } + src = src[nSize:] + f.Name = bytesutil.ToUnsafeString(name) } - src = src[nSize:] - f.Name = bytesutil.ToUnsafeString(b) // Unmarshal field value - b, nSize = encoding.UnmarshalBytes(src) + value, nSize := encoding.UnmarshalBytes(src) if nSize <= 0 { return srcOrig, fmt.Errorf("cannot unmarshal field value") } src = src[nSize:] - f.Value = bytesutil.ToUnsafeString(b) + f.Value = bytesutil.ToUnsafeString(value) return src, nil } diff --git a/lib/logstorage/tokenizer.go b/lib/logstorage/tokenizer.go index fa3ec904f..16f0b5c95 100644 --- a/lib/logstorage/tokenizer.go +++ b/lib/logstorage/tokenizer.go @@ -8,7 +8,7 @@ import ( // tokenizeStrings extracts word tokens from a, appends them to dst and returns the result. // -// the order of returned tokens is unspecified. +// The order of returned tokens equals the order of tokens seen in a. func tokenizeStrings(dst, a []string) []string { t := getTokenizer() for i, s := range a { @@ -145,27 +145,3 @@ func putTokenizer(t *tokenizer) { } var tokenizerPool sync.Pool - -type tokensBuf struct { - A []string -} - -func (tb *tokensBuf) reset() { - clear(tb.A) - tb.A = tb.A[:0] -} - -func getTokensBuf() *tokensBuf { - v := tokensBufPool.Get() - if v == nil { - return &tokensBuf{} - } - return v.(*tokensBuf) -} - -func putTokensBuf(tb *tokensBuf) { - tb.reset() - tokensBufPool.Put(tb) -} - -var tokensBufPool sync.Pool