From b805a675f31fbe2e58b5fda735aa856000076b81 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 6 Jul 2021 16:28:39 +0300 Subject: [PATCH] lib/{mergeset,storage}: switch from sync.Pool to chan-based pool for inmemoryPart objects This should reduce memory usage on systems with big number of CPU cores, since every inmemoryPart object occupies at least 64KB of memory and sync.Pool maintains a separate pool inmemoryPart objects per each CPU core. Though the new scheme for the pool worsens per-cpu cache locality, this should be amortized by big sizes of inmemoryPart objects. --- lib/mergeset/inmemory_part.go | 114 ++++++++++++++++++---------------- lib/storage/inmemory_part.go | 22 ++++--- 2 files changed, 76 insertions(+), 60 deletions(-) diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go index e2f4f02bd8..de1a8078a4 100644 --- a/lib/mergeset/inmemory_part.go +++ b/lib/mergeset/inmemory_part.go @@ -1,9 +1,8 @@ package mergeset import ( - "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -27,86 +26,95 @@ type inmemoryPart struct { lensData bytesutil.ByteBuffer } -func (ip *inmemoryPart) Reset() { - ip.ph.Reset() - ip.sb.Reset() - ip.bh.Reset() - ip.mr.Reset() +func (mp *inmemoryPart) Reset() { + mp.ph.Reset() + mp.sb.Reset() + mp.bh.Reset() + mp.mr.Reset() - ip.unpackedIndexBlockBuf = ip.unpackedIndexBlockBuf[:0] - ip.packedIndexBlockBuf = ip.packedIndexBlockBuf[:0] + mp.unpackedIndexBlockBuf = mp.unpackedIndexBlockBuf[:0] + mp.packedIndexBlockBuf = mp.packedIndexBlockBuf[:0] - ip.unpackedMetaindexBuf = ip.unpackedMetaindexBuf[:0] - ip.packedMetaindexBuf = ip.packedMetaindexBuf[:0] + mp.unpackedMetaindexBuf = mp.unpackedMetaindexBuf[:0] + mp.packedMetaindexBuf = mp.packedMetaindexBuf[:0] - ip.metaindexData.Reset() - ip.indexData.Reset() - ip.itemsData.Reset() - ip.lensData.Reset() + mp.metaindexData.Reset() + mp.indexData.Reset() + mp.itemsData.Reset() + mp.lensData.Reset() } -// Init initializes ip from ib. -func (ip *inmemoryPart) Init(ib *inmemoryBlock) { - ip.Reset() +// Init initializes mp from ib. +func (mp *inmemoryPart) Init(ib *inmemoryBlock) { + mp.Reset() // Use the minimum possible compressLevel for compressing inmemoryPart, // since it will be merged into file part soon. compressLevel := 0 - ip.bh.firstItem, ip.bh.commonPrefix, ip.bh.itemsCount, ip.bh.marshalType = ib.MarshalUnsortedData(&ip.sb, ip.bh.firstItem[:0], ip.bh.commonPrefix[:0], compressLevel) + mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(&mp.sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel) - ip.ph.itemsCount = uint64(len(ib.items)) - ip.ph.blocksCount = 1 - ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...) - ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...) + mp.ph.itemsCount = uint64(len(ib.items)) + mp.ph.blocksCount = 1 + mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...) + mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...) - fs.MustWriteData(&ip.itemsData, ip.sb.itemsData) - ip.bh.itemsBlockOffset = 0 - ip.bh.itemsBlockSize = uint32(len(ip.sb.itemsData)) + fs.MustWriteData(&mp.itemsData, mp.sb.itemsData) + mp.bh.itemsBlockOffset = 0 + mp.bh.itemsBlockSize = uint32(len(mp.sb.itemsData)) - fs.MustWriteData(&ip.lensData, ip.sb.lensData) - ip.bh.lensBlockOffset = 0 - ip.bh.lensBlockSize = uint32(len(ip.sb.lensData)) + fs.MustWriteData(&mp.lensData, mp.sb.lensData) + mp.bh.lensBlockOffset = 0 + mp.bh.lensBlockSize = uint32(len(mp.sb.lensData)) - ip.unpackedIndexBlockBuf = ip.bh.Marshal(ip.unpackedIndexBlockBuf[:0]) - ip.packedIndexBlockBuf = encoding.CompressZSTDLevel(ip.packedIndexBlockBuf[:0], ip.unpackedIndexBlockBuf, 0) - fs.MustWriteData(&ip.indexData, ip.packedIndexBlockBuf) + mp.unpackedIndexBlockBuf = mp.bh.Marshal(mp.unpackedIndexBlockBuf[:0]) + mp.packedIndexBlockBuf = encoding.CompressZSTDLevel(mp.packedIndexBlockBuf[:0], mp.unpackedIndexBlockBuf, 0) + fs.MustWriteData(&mp.indexData, mp.packedIndexBlockBuf) - ip.mr.firstItem = append(ip.mr.firstItem[:0], ip.bh.firstItem...) - ip.mr.blockHeadersCount = 1 - ip.mr.indexBlockOffset = 0 - ip.mr.indexBlockSize = uint32(len(ip.packedIndexBlockBuf)) - ip.unpackedMetaindexBuf = ip.mr.Marshal(ip.unpackedMetaindexBuf[:0]) - ip.packedMetaindexBuf = encoding.CompressZSTDLevel(ip.packedMetaindexBuf[:0], ip.unpackedMetaindexBuf, 0) - fs.MustWriteData(&ip.metaindexData, ip.packedMetaindexBuf) + mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...) + mp.mr.blockHeadersCount = 1 + mp.mr.indexBlockOffset = 0 + mp.mr.indexBlockSize = uint32(len(mp.packedIndexBlockBuf)) + mp.unpackedMetaindexBuf = mp.mr.Marshal(mp.unpackedMetaindexBuf[:0]) + mp.packedMetaindexBuf = encoding.CompressZSTDLevel(mp.packedMetaindexBuf[:0], mp.unpackedMetaindexBuf, 0) + fs.MustWriteData(&mp.metaindexData, mp.packedMetaindexBuf) } // It is safe calling NewPart multiple times. -// It is unsafe re-using ip while the returned part is in use. -func (ip *inmemoryPart) NewPart() *part { - ph := ip.ph - size := ip.size() - p, err := newPart(&ph, "", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData) +// It is unsafe re-using mp while the returned part is in use. +func (mp *inmemoryPart) NewPart() *part { + ph := mp.ph + size := mp.size() + p, err := newPart(&ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData) if err != nil { logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err) } return p } -func (ip *inmemoryPart) size() uint64 { - return uint64(len(ip.metaindexData.B) + len(ip.indexData.B) + len(ip.itemsData.B) + len(ip.lensData.B)) +func (mp *inmemoryPart) size() uint64 { + return uint64(len(mp.metaindexData.B) + len(mp.indexData.B) + len(mp.itemsData.B) + len(mp.lensData.B)) } func getInmemoryPart() *inmemoryPart { - v := ipPool.Get() - if v == nil { + select { + case mp := <-mpPool: + return mp + default: return &inmemoryPart{} } - return v.(*inmemoryPart) } -func putInmemoryPart(ip *inmemoryPart) { - ip.Reset() - ipPool.Put(ip) +func putInmemoryPart(mp *inmemoryPart) { + mp.Reset() + select { + case mpPool <- mp: + default: + // Drop mp in order to reduce memory usage. + } } -var ipPool sync.Pool +// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores, +// since sync.Pool maintains per-CPU pool of inmemoryPart objects. +// +// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage. +var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs()) diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go index 788e3f1825..3f78e24b7b 100644 --- a/lib/storage/inmemory_part.go +++ b/lib/storage/inmemory_part.go @@ -1,9 +1,8 @@ package storage import ( - "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -56,16 +55,25 @@ func (mp *inmemoryPart) NewPart() (*part, error) { } func getInmemoryPart() *inmemoryPart { - v := mpPool.Get() - if v == nil { + select { + case mp := <-mpPool: + return mp + default: return &inmemoryPart{} } - return v.(*inmemoryPart) } func putInmemoryPart(mp *inmemoryPart) { mp.Reset() - mpPool.Put(mp) + select { + case mpPool <- mp: + default: + // Drop mp in order to reduce memory usage. + } } -var mpPool sync.Pool +// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores, +// since sync.Pool maintains per-CPU pool of inmemoryPart objects. +// +// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage. +var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())