lib/{mergeset,storage}: pass compressLevel to blockStreamWriter.InitFromInmemoryPart
This allows packing in-memory blocks with different compression levels depending on their contents. This may reduce memory usage.
Commit 343c69fc15 (parent 6d87462f4b)
10 changed files with 29 additions and 28 deletions
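The diff below changes the signature of blockStreamWriter.InitFromInmemoryPart in both lib/mergeset and lib/storage so that the caller passes the zstd compression level instead of relying on a hard-coded -5. As a rough, non-authoritative sketch of the new call shape (the identifiers mirror the test hunks further down and are package-internal, not a public API; bsrs stands for a previously prepared []*blockStreamReader):

// Minimal sketch of the new calling convention; mirrors the test code in the
// hunks below and is shown for orientation only.
var dstIP inmemoryPart
var bsw blockStreamWriter

// The caller now picks the zstd level. Negative levels (down to -5) are the
// cheapest and suit short-lived in-memory blocks that will be re-compressed
// during later merges into file-based parts.
bsw.InitFromInmemoryPart(&dstIP, -5)

var itemsMerged uint64
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
	// handle the merge error
}

The tests keep fixed levels; the production merge paths instead derive the level from the amount of data being merged, as the Table and partition merge hunks show.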
@@ -63,13 +63,10 @@ func (bsw *blockStreamWriter) reset() {
 	bsw.mrFirstItemCaught = false
 }
 
-func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart) {
+func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
 	bsw.reset()
 
-	// Use the minimum compression level for in-memory blocks,
-	// since they are going to be re-compressed during the merge into file-based blocks.
-	bsw.compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
-
+	bsw.compressLevel = compressLevel
 	bsw.metaindexWriter = &mp.metaindexData
 	bsw.indexWriter = &mp.indexData
 	bsw.itemsWriter = &mp.itemsData
@@ -30,14 +30,14 @@ func TestMultilevelMerge(t *testing.T) {
 	// First level merge
 	var dstIP1 inmemoryPart
 	var bsw1 blockStreamWriter
-	bsw1.InitFromInmemoryPart(&dstIP1)
+	bsw1.InitFromInmemoryPart(&dstIP1, -5)
 	if err := mergeBlockStreams(&dstIP1.ph, &bsw1, bsrs[:5], nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge first level part 1: %s", err)
 	}
 
 	var dstIP2 inmemoryPart
 	var bsw2 blockStreamWriter
-	bsw2.InitFromInmemoryPart(&dstIP2)
+	bsw2.InitFromInmemoryPart(&dstIP2, -5)
 	if err := mergeBlockStreams(&dstIP2.ph, &bsw2, bsrs[5:], nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge first level part 2: %s", err)
 	}
@@ -54,7 +54,7 @@ func TestMultilevelMerge(t *testing.T) {
 		newTestBlockStreamReader(&dstIP1),
 		newTestBlockStreamReader(&dstIP2),
 	}
-	bsw.InitFromInmemoryPart(&dstIP)
+	bsw.InitFromInmemoryPart(&dstIP, 1)
 	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil {
 		t.Fatalf("cannot merge second level: %s", err)
 	}
@@ -73,7 +73,7 @@ func TestMergeForciblyStop(t *testing.T) {
 	bsrs, _ := newTestInmemoryBlockStreamReaders(20, 4000)
 	var dstIP inmemoryPart
 	var bsw blockStreamWriter
-	bsw.InitFromInmemoryPart(&dstIP)
+	bsw.InitFromInmemoryPart(&dstIP, 1)
 	ch := make(chan struct{})
 	var itemsMerged uint64
 	close(ch)
@@ -120,7 +120,7 @@ func testMergeBlockStreamsSerial(blocksToMerge, maxItemsPerBlock int) error {
 	var itemsMerged uint64
 	var dstIP inmemoryPart
 	var bsw blockStreamWriter
-	bsw.InitFromInmemoryPart(&dstIP)
+	bsw.InitFromInmemoryPart(&dstIP, -4)
 	if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return fmt.Errorf("cannot merge block streams: %w", err)
 	}
@@ -149,7 +149,7 @@ func newTestPart(blocksCount, maxItemsPerBlock int) (*part, []string, error) {
 	var itemsMerged uint64
 	var ip inmemoryPart
 	var bsw blockStreamWriter
-	bsw.InitFromInmemoryPart(&ip)
+	bsw.InitFromInmemoryPart(&ip, -3)
 	if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil {
 		return nil, nil, fmt.Errorf("cannot merge blocks: %w", err)
 	}
@@ -661,6 +661,11 @@ func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
 	atomic.AddUint64(&tb.activeMerges, 1)
 	defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))
 
+	outItemsCount := uint64(0)
+	for _, ib := range ibs {
+		outItemsCount += uint64(ib.Len())
+	}
+
 	// Prepare blockStreamReaders for source blocks.
 	bsrs := make([]*blockStreamReader, 0, len(ibs))
 	for _, ib := range ibs {
@@ -688,9 +693,10 @@ func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
 	}
 
 	// Prepare blockStreamWriter for destination part.
+	compressLevel := getCompressLevel(outItemsCount)
 	bsw := getBlockStreamWriter()
 	mpDst := &inmemoryPart{}
-	bsw.InitFromInmemoryPart(mpDst)
+	bsw.InitFromInmemoryPart(mpDst, compressLevel)
 
 	// Merge parts.
 	// The merge shouldn't be interrupted by stopCh,
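Taken together, the two Table.mergeInmemoryBlocks hunks above boil down to a simple pattern: sum the items across the source in-memory blocks, derive a compression level from that total, and hand the level to the destination writer. A condensed restatement of that flow, using the same identifiers as the diff (not standalone code):

// Condensed from the mergeInmemoryBlocks hunks above; not a standalone program.
outItemsCount := uint64(0)
for _, ib := range ibs {
	outItemsCount += uint64(ib.Len())
}

// Bigger merges get a higher (more expensive, better-compressing) level.
compressLevel := getCompressLevel(outItemsCount)
bsw := getBlockStreamWriter()
mpDst := &inmemoryPart{}
bsw.InitFromInmemoryPart(mpDst, compressLevel)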
@@ -869,7 +875,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 	mergeIdx := tb.nextMergeIdx()
 	tmpPartPath := fmt.Sprintf("%s/tmp/%016X", tb.path, mergeIdx)
 	bsw := getBlockStreamWriter()
-	compressLevel := getCompressLevelForPartItems(outItemsCount, outBlocksCount)
+	compressLevel := getCompressLevel(outItemsCount)
 	if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
 		return fmt.Errorf("cannot create destination part %q: %w", tmpPartPath, err)
 	}
@@ -958,9 +964,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
 	return nil
 }
 
-func getCompressLevelForPartItems(itemsCount, blocksCount uint64) int {
-	// There is no need in using blocksCount here, since mergeset blocks are usually full.
-
+func getCompressLevel(itemsCount uint64) int {
 	if itemsCount <= 1<<16 {
 		// -5 is the minimum supported compression for zstd.
 		// See https://github.com/facebook/zstd/releases/tag/v1.3.4
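The hunk above cuts off the body of the renamed mergeset getCompressLevel. For orientation only: the first branch, given its comment, presumably returns -5 for small merges; the remaining thresholds and return values in the sketch below are hypothetical placeholders, not the values from this commit.

// Illustrative sketch only. The signature and the first threshold come from
// the diff; every other tier is a hypothetical placeholder.
func getCompressLevel(itemsCount uint64) int {
	if itemsCount <= 1<<16 {
		// -5 is the minimum supported compression for zstd.
		// See https://github.com/facebook/zstd/releases/tag/v1.3.4
		return -5
	}
	if itemsCount <= 1<<20 {
		return -1 // hypothetical intermediate tier
	}
	return 1 // hypothetical tier for large merges
}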
@@ -80,13 +80,10 @@ func (bsw *blockStreamWriter) reset() {
 }
 
 // InitFromInmemoryPart initialzes bsw from inmemory part.
-func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart) {
+func (bsw *blockStreamWriter) InitFromInmemoryPart(mp *inmemoryPart, compressLevel int) {
 	bsw.reset()
 
-	// Use the minimum compression level for in-memory blocks,
-	// since they are going to be re-compressed during the merge into file-based blocks.
-	bsw.compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
-
+	bsw.compressLevel = compressLevel
 	bsw.timestampsWriter = &mp.timestampsData
 	bsw.valuesWriter = &mp.valuesData
 	bsw.indexWriter = &mp.indexData
@@ -47,7 +47,7 @@ func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeR
 		}
 	}
 
-	bsw.InitFromInmemoryPart(&mp)
+	bsw.InitFromInmemoryPart(&mp, -5)
 	for i := range ebsCopy {
 		bsw.WriteExternalBlock(&ebsCopy[i], &ph, &rowsMerged)
 	}
@@ -361,7 +361,7 @@ func TestMergeForciblyStop(t *testing.T) {
 
 	var mp inmemoryPart
 	var bsw blockStreamWriter
-	bsw.InitFromInmemoryPart(&mp)
+	bsw.InitFromInmemoryPart(&mp, -5)
 	ch := make(chan struct{})
 	var rowsMerged, rowsDeleted uint64
 	close(ch)
@@ -384,7 +384,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
 	var mp inmemoryPart
 
 	var bsw blockStreamWriter
-	bsw.InitFromInmemoryPart(&mp)
+	bsw.InitFromInmemoryPart(&mp, -5)
 
 	strg := newTestStorage()
 	var rowsMerged, rowsDeleted uint64
@@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
 			bsrs[i].InitFromInmemoryPart(mp)
 		}
 		mpOut.Reset()
-		bsw.InitFromInmemoryPart(&mpOut)
+		bsw.InitFromInmemoryPart(&mpOut, -5)
 		if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
 			panic(fmt.Errorf("cannot merge block streams: %w", err))
 		}
@@ -1181,7 +1181,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	mergeIdx := pt.nextMergeIdx()
 	tmpPartPath := fmt.Sprintf("%s/tmp/%016X", ptPath, mergeIdx)
 	bsw := getBlockStreamWriter()
-	compressLevel := getCompressLevelForRowsCount(outRowsCount, outBlocksCount)
+	compressLevel := getCompressLevel(outRowsCount, outBlocksCount)
 	if err := bsw.InitFromFilePart(tmpPartPath, nocache, compressLevel); err != nil {
 		return fmt.Errorf("cannot create destination part %q: %w", tmpPartPath, err)
 	}
@@ -1301,7 +1301,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 	return nil
 }
 
-func getCompressLevelForRowsCount(rowsCount, blocksCount uint64) int {
+func getCompressLevel(rowsCount, blocksCount uint64) int {
 	avgRowsPerBlock := rowsCount / blocksCount
 	// See https://github.com/facebook/zstd/releases/tag/v1.3.4 about negative compression levels.
 	if avgRowsPerBlock <= 10 {
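As with the mergeset variant, the body of the storage getCompressLevel is truncated above. The sketch below is illustrative only: the signature, the avgRowsPerBlock computation, and the first threshold are taken from the diff, while the returned levels and the extra tiers are hypothetical placeholders (a real caller would also have to guarantee blocksCount > 0 to avoid division by zero).

// Illustrative sketch only; see the caveats in the paragraph above.
func getCompressLevel(rowsCount, blocksCount uint64) int {
	avgRowsPerBlock := rowsCount / blocksCount
	// See https://github.com/facebook/zstd/releases/tag/v1.3.4 about negative compression levels.
	if avgRowsPerBlock <= 10 {
		return -5 // hypothetical: cheapest level for nearly empty blocks
	}
	if avgRowsPerBlock <= 1000 {
		return -1 // hypothetical intermediate tier
	}
	return 1 // hypothetical tier for well-filled blocks
}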
@@ -86,7 +86,10 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
 		logger.Panicf("BUG: rows count must be smaller than 2^32; got %d", len(rows))
 	}
 
-	rrm.bsw.InitFromInmemoryPart(mp)
+	// Use the minimum compression level for first-level in-memory blocks,
+	// since they are going to be re-compressed during subsequent merges.
+	const compressLevel = -5 // See https://github.com/facebook/zstd/releases/tag/v1.3.4
+	rrm.bsw.InitFromInmemoryPart(mp, compressLevel)
 
 	ph := &mp.ph
 	ph.Reset()