diff --git a/lib/mergeset/inmemory_part.go b/lib/mergeset/inmemory_part.go index e2f4f02bd..de1a8078a 100644 --- a/lib/mergeset/inmemory_part.go +++ b/lib/mergeset/inmemory_part.go @@ -1,9 +1,8 @@ package mergeset import ( - "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -27,86 +26,95 @@ type inmemoryPart struct { lensData bytesutil.ByteBuffer } -func (ip *inmemoryPart) Reset() { - ip.ph.Reset() - ip.sb.Reset() - ip.bh.Reset() - ip.mr.Reset() +func (mp *inmemoryPart) Reset() { + mp.ph.Reset() + mp.sb.Reset() + mp.bh.Reset() + mp.mr.Reset() - ip.unpackedIndexBlockBuf = ip.unpackedIndexBlockBuf[:0] - ip.packedIndexBlockBuf = ip.packedIndexBlockBuf[:0] + mp.unpackedIndexBlockBuf = mp.unpackedIndexBlockBuf[:0] + mp.packedIndexBlockBuf = mp.packedIndexBlockBuf[:0] - ip.unpackedMetaindexBuf = ip.unpackedMetaindexBuf[:0] - ip.packedMetaindexBuf = ip.packedMetaindexBuf[:0] + mp.unpackedMetaindexBuf = mp.unpackedMetaindexBuf[:0] + mp.packedMetaindexBuf = mp.packedMetaindexBuf[:0] - ip.metaindexData.Reset() - ip.indexData.Reset() - ip.itemsData.Reset() - ip.lensData.Reset() + mp.metaindexData.Reset() + mp.indexData.Reset() + mp.itemsData.Reset() + mp.lensData.Reset() } -// Init initializes ip from ib. -func (ip *inmemoryPart) Init(ib *inmemoryBlock) { - ip.Reset() +// Init initializes mp from ib. +func (mp *inmemoryPart) Init(ib *inmemoryBlock) { + mp.Reset() // Use the minimum possible compressLevel for compressing inmemoryPart, // since it will be merged into file part soon. compressLevel := 0 - ip.bh.firstItem, ip.bh.commonPrefix, ip.bh.itemsCount, ip.bh.marshalType = ib.MarshalUnsortedData(&ip.sb, ip.bh.firstItem[:0], ip.bh.commonPrefix[:0], compressLevel) + mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(&mp.sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel) - ip.ph.itemsCount = uint64(len(ib.items)) - ip.ph.blocksCount = 1 - ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...) - ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...) + mp.ph.itemsCount = uint64(len(ib.items)) + mp.ph.blocksCount = 1 + mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...) + mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...) - fs.MustWriteData(&ip.itemsData, ip.sb.itemsData) - ip.bh.itemsBlockOffset = 0 - ip.bh.itemsBlockSize = uint32(len(ip.sb.itemsData)) + fs.MustWriteData(&mp.itemsData, mp.sb.itemsData) + mp.bh.itemsBlockOffset = 0 + mp.bh.itemsBlockSize = uint32(len(mp.sb.itemsData)) - fs.MustWriteData(&ip.lensData, ip.sb.lensData) - ip.bh.lensBlockOffset = 0 - ip.bh.lensBlockSize = uint32(len(ip.sb.lensData)) + fs.MustWriteData(&mp.lensData, mp.sb.lensData) + mp.bh.lensBlockOffset = 0 + mp.bh.lensBlockSize = uint32(len(mp.sb.lensData)) - ip.unpackedIndexBlockBuf = ip.bh.Marshal(ip.unpackedIndexBlockBuf[:0]) - ip.packedIndexBlockBuf = encoding.CompressZSTDLevel(ip.packedIndexBlockBuf[:0], ip.unpackedIndexBlockBuf, 0) - fs.MustWriteData(&ip.indexData, ip.packedIndexBlockBuf) + mp.unpackedIndexBlockBuf = mp.bh.Marshal(mp.unpackedIndexBlockBuf[:0]) + mp.packedIndexBlockBuf = encoding.CompressZSTDLevel(mp.packedIndexBlockBuf[:0], mp.unpackedIndexBlockBuf, 0) + fs.MustWriteData(&mp.indexData, mp.packedIndexBlockBuf) - ip.mr.firstItem = append(ip.mr.firstItem[:0], ip.bh.firstItem...) - ip.mr.blockHeadersCount = 1 - ip.mr.indexBlockOffset = 0 - ip.mr.indexBlockSize = uint32(len(ip.packedIndexBlockBuf)) - ip.unpackedMetaindexBuf = ip.mr.Marshal(ip.unpackedMetaindexBuf[:0]) - ip.packedMetaindexBuf = encoding.CompressZSTDLevel(ip.packedMetaindexBuf[:0], ip.unpackedMetaindexBuf, 0) - fs.MustWriteData(&ip.metaindexData, ip.packedMetaindexBuf) + mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...) + mp.mr.blockHeadersCount = 1 + mp.mr.indexBlockOffset = 0 + mp.mr.indexBlockSize = uint32(len(mp.packedIndexBlockBuf)) + mp.unpackedMetaindexBuf = mp.mr.Marshal(mp.unpackedMetaindexBuf[:0]) + mp.packedMetaindexBuf = encoding.CompressZSTDLevel(mp.packedMetaindexBuf[:0], mp.unpackedMetaindexBuf, 0) + fs.MustWriteData(&mp.metaindexData, mp.packedMetaindexBuf) } // It is safe calling NewPart multiple times. -// It is unsafe re-using ip while the returned part is in use. -func (ip *inmemoryPart) NewPart() *part { - ph := ip.ph - size := ip.size() - p, err := newPart(&ph, "", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData) +// It is unsafe re-using mp while the returned part is in use. +func (mp *inmemoryPart) NewPart() *part { + ph := mp.ph + size := mp.size() + p, err := newPart(&ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData) if err != nil { logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err) } return p } -func (ip *inmemoryPart) size() uint64 { - return uint64(len(ip.metaindexData.B) + len(ip.indexData.B) + len(ip.itemsData.B) + len(ip.lensData.B)) +func (mp *inmemoryPart) size() uint64 { + return uint64(len(mp.metaindexData.B) + len(mp.indexData.B) + len(mp.itemsData.B) + len(mp.lensData.B)) } func getInmemoryPart() *inmemoryPart { - v := ipPool.Get() - if v == nil { + select { + case mp := <-mpPool: + return mp + default: return &inmemoryPart{} } - return v.(*inmemoryPart) } -func putInmemoryPart(ip *inmemoryPart) { - ip.Reset() - ipPool.Put(ip) +func putInmemoryPart(mp *inmemoryPart) { + mp.Reset() + select { + case mpPool <- mp: + default: + // Drop mp in order to reduce memory usage. + } } -var ipPool sync.Pool +// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores, +// since sync.Pool maintains per-CPU pool of inmemoryPart objects. +// +// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage. +var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs()) diff --git a/lib/storage/inmemory_part.go b/lib/storage/inmemory_part.go index 788e3f182..3f78e24b7 100644 --- a/lib/storage/inmemory_part.go +++ b/lib/storage/inmemory_part.go @@ -1,9 +1,8 @@ package storage import ( - "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -56,16 +55,25 @@ func (mp *inmemoryPart) NewPart() (*part, error) { } func getInmemoryPart() *inmemoryPart { - v := mpPool.Get() - if v == nil { + select { + case mp := <-mpPool: + return mp + default: return &inmemoryPart{} } - return v.(*inmemoryPart) } func putInmemoryPart(mp *inmemoryPart) { mp.Reset() - mpPool.Put(mp) + select { + case mpPool <- mp: + default: + // Drop mp in order to reduce memory usage. + } } -var mpPool sync.Pool +// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores, +// since sync.Pool maintains per-CPU pool of inmemoryPart objects. +// +// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage. +var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())