mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/{mergeset,storage}: switch from sync.Pool to chan-based pool for inmemoryPart objects
This should reduce memory usage on systems with big number of CPU cores, since every inmemoryPart object occupies at least 64KB of memory and sync.Pool maintains a separate pool inmemoryPart objects per each CPU core. Though the new scheme for the pool worsens per-cpu cache locality, this should be amortized by big sizes of inmemoryPart objects.
This commit is contained in:
parent
7c6d3981bf
commit
8aa9bba9bd
2 changed files with 76 additions and 60 deletions
|
@ -1,9 +1,8 @@
|
|||
package mergeset
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
|
@ -27,86 +26,95 @@ type inmemoryPart struct {
|
|||
lensData bytesutil.ByteBuffer
|
||||
}
|
||||
|
||||
func (ip *inmemoryPart) Reset() {
|
||||
ip.ph.Reset()
|
||||
ip.sb.Reset()
|
||||
ip.bh.Reset()
|
||||
ip.mr.Reset()
|
||||
func (mp *inmemoryPart) Reset() {
|
||||
mp.ph.Reset()
|
||||
mp.sb.Reset()
|
||||
mp.bh.Reset()
|
||||
mp.mr.Reset()
|
||||
|
||||
ip.unpackedIndexBlockBuf = ip.unpackedIndexBlockBuf[:0]
|
||||
ip.packedIndexBlockBuf = ip.packedIndexBlockBuf[:0]
|
||||
mp.unpackedIndexBlockBuf = mp.unpackedIndexBlockBuf[:0]
|
||||
mp.packedIndexBlockBuf = mp.packedIndexBlockBuf[:0]
|
||||
|
||||
ip.unpackedMetaindexBuf = ip.unpackedMetaindexBuf[:0]
|
||||
ip.packedMetaindexBuf = ip.packedMetaindexBuf[:0]
|
||||
mp.unpackedMetaindexBuf = mp.unpackedMetaindexBuf[:0]
|
||||
mp.packedMetaindexBuf = mp.packedMetaindexBuf[:0]
|
||||
|
||||
ip.metaindexData.Reset()
|
||||
ip.indexData.Reset()
|
||||
ip.itemsData.Reset()
|
||||
ip.lensData.Reset()
|
||||
mp.metaindexData.Reset()
|
||||
mp.indexData.Reset()
|
||||
mp.itemsData.Reset()
|
||||
mp.lensData.Reset()
|
||||
}
|
||||
|
||||
// Init initializes ip from ib.
|
||||
func (ip *inmemoryPart) Init(ib *inmemoryBlock) {
|
||||
ip.Reset()
|
||||
// Init initializes mp from ib.
|
||||
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {
|
||||
mp.Reset()
|
||||
|
||||
// Use the minimum possible compressLevel for compressing inmemoryPart,
|
||||
// since it will be merged into file part soon.
|
||||
compressLevel := 0
|
||||
ip.bh.firstItem, ip.bh.commonPrefix, ip.bh.itemsCount, ip.bh.marshalType = ib.MarshalUnsortedData(&ip.sb, ip.bh.firstItem[:0], ip.bh.commonPrefix[:0], compressLevel)
|
||||
mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(&mp.sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)
|
||||
|
||||
ip.ph.itemsCount = uint64(len(ib.items))
|
||||
ip.ph.blocksCount = 1
|
||||
ip.ph.firstItem = append(ip.ph.firstItem[:0], ib.items[0].String(ib.data)...)
|
||||
ip.ph.lastItem = append(ip.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
|
||||
mp.ph.itemsCount = uint64(len(ib.items))
|
||||
mp.ph.blocksCount = 1
|
||||
mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
|
||||
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
|
||||
|
||||
fs.MustWriteData(&ip.itemsData, ip.sb.itemsData)
|
||||
ip.bh.itemsBlockOffset = 0
|
||||
ip.bh.itemsBlockSize = uint32(len(ip.sb.itemsData))
|
||||
fs.MustWriteData(&mp.itemsData, mp.sb.itemsData)
|
||||
mp.bh.itemsBlockOffset = 0
|
||||
mp.bh.itemsBlockSize = uint32(len(mp.sb.itemsData))
|
||||
|
||||
fs.MustWriteData(&ip.lensData, ip.sb.lensData)
|
||||
ip.bh.lensBlockOffset = 0
|
||||
ip.bh.lensBlockSize = uint32(len(ip.sb.lensData))
|
||||
fs.MustWriteData(&mp.lensData, mp.sb.lensData)
|
||||
mp.bh.lensBlockOffset = 0
|
||||
mp.bh.lensBlockSize = uint32(len(mp.sb.lensData))
|
||||
|
||||
ip.unpackedIndexBlockBuf = ip.bh.Marshal(ip.unpackedIndexBlockBuf[:0])
|
||||
ip.packedIndexBlockBuf = encoding.CompressZSTDLevel(ip.packedIndexBlockBuf[:0], ip.unpackedIndexBlockBuf, 0)
|
||||
fs.MustWriteData(&ip.indexData, ip.packedIndexBlockBuf)
|
||||
mp.unpackedIndexBlockBuf = mp.bh.Marshal(mp.unpackedIndexBlockBuf[:0])
|
||||
mp.packedIndexBlockBuf = encoding.CompressZSTDLevel(mp.packedIndexBlockBuf[:0], mp.unpackedIndexBlockBuf, 0)
|
||||
fs.MustWriteData(&mp.indexData, mp.packedIndexBlockBuf)
|
||||
|
||||
ip.mr.firstItem = append(ip.mr.firstItem[:0], ip.bh.firstItem...)
|
||||
ip.mr.blockHeadersCount = 1
|
||||
ip.mr.indexBlockOffset = 0
|
||||
ip.mr.indexBlockSize = uint32(len(ip.packedIndexBlockBuf))
|
||||
ip.unpackedMetaindexBuf = ip.mr.Marshal(ip.unpackedMetaindexBuf[:0])
|
||||
ip.packedMetaindexBuf = encoding.CompressZSTDLevel(ip.packedMetaindexBuf[:0], ip.unpackedMetaindexBuf, 0)
|
||||
fs.MustWriteData(&ip.metaindexData, ip.packedMetaindexBuf)
|
||||
mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
|
||||
mp.mr.blockHeadersCount = 1
|
||||
mp.mr.indexBlockOffset = 0
|
||||
mp.mr.indexBlockSize = uint32(len(mp.packedIndexBlockBuf))
|
||||
mp.unpackedMetaindexBuf = mp.mr.Marshal(mp.unpackedMetaindexBuf[:0])
|
||||
mp.packedMetaindexBuf = encoding.CompressZSTDLevel(mp.packedMetaindexBuf[:0], mp.unpackedMetaindexBuf, 0)
|
||||
fs.MustWriteData(&mp.metaindexData, mp.packedMetaindexBuf)
|
||||
}
|
||||
|
||||
// It is safe calling NewPart multiple times.
|
||||
// It is unsafe re-using ip while the returned part is in use.
|
||||
func (ip *inmemoryPart) NewPart() *part {
|
||||
ph := ip.ph
|
||||
size := ip.size()
|
||||
p, err := newPart(&ph, "", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData)
|
||||
// It is unsafe re-using mp while the returned part is in use.
|
||||
func (mp *inmemoryPart) NewPart() *part {
|
||||
ph := mp.ph
|
||||
size := mp.size()
|
||||
p, err := newPart(&ph, "", size, mp.metaindexData.NewReader(), &mp.indexData, &mp.itemsData, &mp.lensData)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot create a part from inmemoryPart: %s", err)
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (ip *inmemoryPart) size() uint64 {
|
||||
return uint64(len(ip.metaindexData.B) + len(ip.indexData.B) + len(ip.itemsData.B) + len(ip.lensData.B))
|
||||
func (mp *inmemoryPart) size() uint64 {
|
||||
return uint64(len(mp.metaindexData.B) + len(mp.indexData.B) + len(mp.itemsData.B) + len(mp.lensData.B))
|
||||
}
|
||||
|
||||
func getInmemoryPart() *inmemoryPart {
|
||||
v := ipPool.Get()
|
||||
if v == nil {
|
||||
select {
|
||||
case mp := <-mpPool:
|
||||
return mp
|
||||
default:
|
||||
return &inmemoryPart{}
|
||||
}
|
||||
return v.(*inmemoryPart)
|
||||
}
|
||||
|
||||
func putInmemoryPart(ip *inmemoryPart) {
|
||||
ip.Reset()
|
||||
ipPool.Put(ip)
|
||||
func putInmemoryPart(mp *inmemoryPart) {
|
||||
mp.Reset()
|
||||
select {
|
||||
case mpPool <- mp:
|
||||
default:
|
||||
// Drop mp in order to reduce memory usage.
|
||||
}
|
||||
}
|
||||
|
||||
var ipPool sync.Pool
|
||||
// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores,
|
||||
// since sync.Pool maintains per-CPU pool of inmemoryPart objects.
|
||||
//
|
||||
// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage.
|
||||
var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
@ -56,16 +55,25 @@ func (mp *inmemoryPart) NewPart() (*part, error) {
|
|||
}
|
||||
|
||||
func getInmemoryPart() *inmemoryPart {
|
||||
v := mpPool.Get()
|
||||
if v == nil {
|
||||
select {
|
||||
case mp := <-mpPool:
|
||||
return mp
|
||||
default:
|
||||
return &inmemoryPart{}
|
||||
}
|
||||
return v.(*inmemoryPart)
|
||||
}
|
||||
|
||||
func putInmemoryPart(mp *inmemoryPart) {
|
||||
mp.Reset()
|
||||
mpPool.Put(mp)
|
||||
select {
|
||||
case mpPool <- mp:
|
||||
default:
|
||||
// Drop mp in order to reduce memory usage.
|
||||
}
|
||||
}
|
||||
|
||||
var mpPool sync.Pool
|
||||
// Use chan instead of sync.Pool in order to reduce memory usage on systems with big number of CPU cores,
|
||||
// since sync.Pool maintains per-CPU pool of inmemoryPart objects.
|
||||
//
|
||||
// The inmemoryPart object size can exceed 64KB, so it is better to use chan instead of sync.Pool for reducing memory usage.
|
||||
var mpPool = make(chan *inmemoryPart, cgroup.AvailableCPUs())
|
||||
|
|
Loading…
Reference in a new issue