lib/{mergeset,storage}: switch from sync.Pool to chan-based pool for inmemoryPart objects

This should reduce memory usage on systems with big number of CPU cores,
since every inmemoryPart object occupies at least 64KB of memory and sync.Pool maintains
a separate pool inmemoryPart objects per each CPU core.

Though the new scheme for the pool worsens per-cpu cache locality, this should be amortized
by big sizes of inmemoryPart objects.
This commit is contained in:
Aliaksandr Valialkin 2021-07-06 16:28:39 +03:00
parent d8e7c1ef27
commit b805a675f3
2 changed files with 76 additions and 60 deletions

View file

@ -1,9 +1,8 @@
package mergeset
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -27,86 +26,95 @@ type inmemoryPart struct {
lensData bytesutil.ByteBuffer
}
func (ip *inmemoryPart) Reset() {
ip.ph.Reset()
ip.sb.Reset()
ip.bh.Reset()
ip.mr.Reset()
func (mp *inmemoryPart) Reset() {
mp.ph.Reset()
mp.sb.Reset()
mp.bh.Reset()
mp.mr.Reset()
ip.unpackedIndexBlockBuf = ip.unpackedIndexBlockBuf[:0]
ip.packedIndexBlockBuf = ip.packedIndexBlockBuf[:0]
mp.unpackedIndexBlockBuf = mp.unpackedIndexBlockBuf[:0]
mp.packedIndexBlockBuf = mp.packedIndexBlockBuf[:0]
ip.unpackedMetaindexBuf = ip.unpackedMetaindexBuf[:0]
ip.packedMetaindexBuf = ip.packedMetaindexBuf[:0]
mp.unpackedMetaindexBuf = mp.unpackedMetaindexBuf[:0]
mp.packedMetaindexBuf = mp.packedMetaindexBuf[:0]
ip.metaindexData.Reset()
ip.indexData.Reset()
ip.itemsData.Reset()
ip.lensData.Reset()
mp.metaindexData.Reset()
mp.indexData.Reset()
mp.itemsData.Reset()
mp.lensData.Reset()
}
// Init initializes ip from ib.
func (ip *inmemoryPart) Init(ib *inmemoryBlock) {
ip.Reset()
// Init initializes mp from ib.
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
mp.Reset()
// Use the minimum possible compressLevel for compressing inmemoryPart,
// since it will be merged into file part soon.
compressLevel := 0
ip.bh.firstItem, ip.bh.commonPrefix, ip.bh.itemsCount, ip.bh.marshalType = ib.MarshalUnsortedData(&ip.sb, ip.bh.firstItem[:0], ip.bh.commonPrefix[:0], compressLevel)
mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(&mp.sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)
ip.ph.itemsCount = uint64(len(ib.items))
ip.ph.blocksCount = 1
ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...)
ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
mp.ph.itemsCount = uint64(len(ib.items))
mp.ph.blocksCount = 1
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
fs.MustWriteData(&ip.itemsData, ip.sb.itemsData)
ip.bh.itemsBlockOffset = 0
ip.bh.itemsBlockSize = uint32(len(ip.sb.itemsData))
fs.MustWriteData(&mp.itemsData, mp.sb.itemsData)
mp.bh.itemsBlockOffset = 0
mp.bh.itemsBlockSize = uint32(len(mp.sb.itemsData))
fs.MustWriteData(&ip.lensData, ip.sb.lensData)
ip.bh.lensBlockOffset = 0
ip.bh.lensBlockSize = uint32(len(ip.sb.lensData))
fs.MustWriteData(&mp.lensData, mp.sb.lensData)
mp.bh.lensBlockOffset = 0
mp.bh.lensBlockSize = uint32(len(mp.sb.lensData))
ip.unpackedIndexBlockBuf = ip.bh.Marshal(ip.unpackedIndexBlockBuf[:0])
ip.packedIndexBlockBuf = encoding.CompressZSTDLevel(ip.packedIndexBlockBuf[:0], ip.unpackedIndexBlockBuf, 0)
fs.MustWriteData(&ip.indexData, ip.packedIndexBlockBuf)
mp.unpackedIndexBlockBuf = mp.bh.Marshal(mp.unpackedIndexBlockBuf[:0])
mp.packedIndexBlockBuf = encoding.CompressZSTDLevel(mp.packedIndexBlockBuf[:0], mp.unpackedIndexBlockBuf, 0)
fs.MustWriteData(&mp.indexData, mp.packedIndexBlockBuf)
ip.mr.firstItem = append(ip.mr.firstItem[:0], ip.bh.firstItem...)
ip.mr.blockHeadersCount = 1
ip.mr.indexBlockOffset = 0
ip.mr.indexBlockSize = uint32(len(ip.packedIndexBlockBuf))
ip.unpackedMetaindexBuf = ip.mr.Marshal(ip.unpackedMetaindexBuf[:0])
ip.packedMetaindexBuf = encoding.CompressZSTDLevel(ip.packedMetaindexBuf[:0], ip.unpackedMetaindexBuf, 0)
fs.MustWriteData(&ip.metaindexData, ip.packedMetaindexBuf)
mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
mp.mr.blockHeadersCount = 1
mp.mr.indexBlockOffset = 0
mp.mr.indexBlockSize = uint32(len(mp.packedIndexBlockBuf))
mp.unpackedMetaindexBuf = mp.mr.Marshal(mp.unpackedMetaindexBuf[:0])
mp.packedMetaindexBuf = encoding.CompressZSTDLevel(mp.packedMetaindexBuf[:0], mp.unpackedMetaindexBuf, 0)
fs.MustWriteData(&mp.metaindexData, mp.packedMetaindexBuf)
}
// It is safe calling NewPart multiple times.
// It is unsafe re-using ip while the returned part is in use.
func (ip *inmemoryPart) NewPart() *part {
ph := ip.ph
size := ip.size()
p, err := newPart(&ph, "", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
// It is unsafe re-using mp while the returned part is in use.
func (mp *inmemoryPart) NewPart() *part {
ph := mp.ph
size := mp.size()
p, err := newPart(&ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
if err != nil {
logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
}
return p
}
func (ip *inmemoryPart) size() uint64 {
return uint64(len(ip.metaindexData.B) + len(ip.indexData.B) + len(ip.itemsData.B) + len(ip.lensData.B))
func (mp *inmemoryPart) size() uint64 {
return uint64(len(mp.metaindexData.B) + len(mp.indexData.B) + len(mp.itemsData.B) + len(mp.lensData.B))
}
func getInmemoryPart() *inmemoryPart {
v := ipPool.Get()
if v == nil {
select {
case mp := <-mpPool:
return mp
default:
return &inmemoryPart{}
}
return v.(*inmemoryPart)
}
func putInmemoryPart(ip *inmemoryPart) {
ip.Reset()
ipPool.Put(ip)
func putInmemoryPart(mp *inmemoryPart) {
mp.Reset()
select {
case mpPool <- mp:
default:
// Drop mp in order to reduce memory usage.
}
}
var ipPool sync.Pool
// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores,
// since sync.Pool maintains per-CPU pool of inmemoryPart objects.
//
// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage.
var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())

View file

@ -1,9 +1,8 @@
package storage
import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@ -56,16 +55,25 @@ func (mp *inmemoryPart) NewPart() (*part, error) {
}
func getInmemoryPart() *inmemoryPart {
v := mpPool.Get()
if v == nil {
select {
case mp := <-mpPool:
return mp
default:
return &inmemoryPart{}
}
return v.(*inmemoryPart)
}
func putInmemoryPart(mp *inmemoryPart) {
mp.Reset()
mpPool.Put(mp)
select {
case mpPool <- mp:
default:
// Drop mp in order to reduce memory usage.
}
}
var mpPool sync.Pool
// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores,
// since sync.Pool maintains per-CPU pool of inmemoryPart objects.
//
// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage.
var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())