lib/{storage,mergeset}: do not create index blocks with sizes exceeding 64Kb in common case

This should reduce memory fragmentation and memory usage for indexdb/indexBlocks and storage/indexBlocks caches
This commit is contained in:
Aliaksandr Valialkin 2024-02-08 13:51:06 +02:00
parent 1c3eac5c1e
commit 950b126a09
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 11 additions and 5 deletions

View file

@ -148,12 +148,15 @@ func (bsw *blockStreamWriter) WriteBlock(ib *inmemoryBlock) {
bsw.lensBlockOffset += uint64(bsw.bh.lensBlockSize) bsw.lensBlockOffset += uint64(bsw.bh.lensBlockSize)
// Write blockHeader // Write blockHeader
unpackedIndexBlockBufLen := len(bsw.unpackedIndexBlockBuf)
bsw.unpackedIndexBlockBuf = bsw.bh.Marshal(bsw.unpackedIndexBlockBuf) bsw.unpackedIndexBlockBuf = bsw.bh.Marshal(bsw.unpackedIndexBlockBuf)
if len(bsw.unpackedIndexBlockBuf) > maxIndexBlockSize {
bsw.unpackedIndexBlockBuf = bsw.unpackedIndexBlockBuf[:unpackedIndexBlockBufLen]
bsw.flushIndexData()
bsw.unpackedIndexBlockBuf = bsw.bh.Marshal(bsw.unpackedIndexBlockBuf)
}
bsw.bh.Reset() bsw.bh.Reset()
bsw.mr.blockHeadersCount++ bsw.mr.blockHeadersCount++
if len(bsw.unpackedIndexBlockBuf) >= maxIndexBlockSize {
bsw.flushIndexData()
}
} }
// The maximum size of index block with multiple blockHeaders. // The maximum size of index block with multiple blockHeaders.

View file

@ -143,11 +143,14 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM
atomic.AddUint64(&timestampsBlocksMerged, 1) atomic.AddUint64(&timestampsBlocksMerged, 1)
atomic.AddUint64(&timestampsBytesSaved, uint64(len(timestampsData))) atomic.AddUint64(&timestampsBytesSaved, uint64(len(timestampsData)))
} }
indexDataLen := len(bsw.indexData)
bsw.indexData = append(bsw.indexData, headerData...) bsw.indexData = append(bsw.indexData, headerData...)
bsw.mr.RegisterBlockHeader(&b.bh) if len(bsw.indexData) > maxBlockSize {
if len(bsw.indexData) >= maxBlockSize { bsw.indexData = bsw.indexData[:indexDataLen]
bsw.flushIndexData() bsw.flushIndexData()
bsw.indexData = append(bsw.indexData, headerData...)
} }
bsw.mr.RegisterBlockHeader(&b.bh)
if !usePrevTimestamps { if !usePrevTimestamps {
bsw.prevTimestampsData = append(bsw.prevTimestampsData[:0], timestampsData...) bsw.prevTimestampsData = append(bsw.prevTimestampsData[:0], timestampsData...)
bsw.prevTimestampsBlockOffset = bsw.timestampsBlockOffset bsw.prevTimestampsBlockOffset = bsw.timestampsBlockOffset