lib/{mergeset,storage}: do not use pools for indexBlock and inmemoryBlock during their caching, since this results in higher memory usage in production without any performance gains

This commit is contained in:
Aliaksandr Valialkin 2021-02-21 21:18:49 +02:00
parent 2cfb376945
commit cb311bb156
5 changed files with 12 additions and 59 deletions

View file

@ -3,6 +3,7 @@ package mergeset
import ( import (
"fmt" "fmt"
"sort" "sort"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -34,6 +35,10 @@ type blockHeader struct {
lensBlockSize uint32 lensBlockSize uint32
} }
func (bh *blockHeader) SizeBytes() int {
return int(unsafe.Sizeof(*bh)) + cap(bh.commonPrefix) + cap(bh.firstItem)
}
func (bh *blockHeader) Reset() { func (bh *blockHeader) Reset() {
bh.commonPrefix = bh.commonPrefix[:0] bh.commonPrefix = bh.commonPrefix[:0]
bh.firstItem = bh.firstItem[:0] bh.firstItem = bh.firstItem[:0]

View file

@ -138,24 +138,14 @@ type indexBlock struct {
} }
func (idxb *indexBlock) SizeBytes() int { func (idxb *indexBlock) SizeBytes() int {
return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{})) bhs := idxb.bhs[:cap(idxb.bhs)]
} n := int(unsafe.Sizeof(*idxb))
for i := range bhs {
func getIndexBlock() *indexBlock { n += bhs[i].SizeBytes()
v := indexBlockPool.Get()
if v == nil {
return &indexBlock{}
} }
return v.(*indexBlock) return n
} }
func putIndexBlock(idxb *indexBlock) {
idxb.bhs = idxb.bhs[:0]
indexBlockPool.Put(idxb)
}
var indexBlockPool sync.Pool
type indexBlockCache struct { type indexBlockCache struct {
// Atomically updated counters must go first in the struct, so they are properly // Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures. // aligned to 8 bytes on 32-bit architectures.
@ -194,12 +184,6 @@ func newIndexBlockCache() *indexBlockCache {
func (idxbc *indexBlockCache) MustClose() { func (idxbc *indexBlockCache) MustClose() {
close(idxbc.cleanerStopCh) close(idxbc.cleanerStopCh)
idxbc.cleanerWG.Wait() idxbc.cleanerWG.Wait()
// It is safe returning idxbc.m to pool, since the MustClose can be called
// when the idxbc entries are no longer accessed by concurrent goroutines.
for _, idxbe := range idxbc.m {
putIndexBlock(idxbe.idxb)
}
idxbc.m = nil idxbc.m = nil
} }
@ -223,8 +207,6 @@ func (idxbc *indexBlockCache) cleanByTimeout() {
for k, idxbe := range idxbc.m { for k, idxbe := range idxbc.m {
// Delete items accessed more than two minutes ago. // Delete items accessed more than two minutes ago.
if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 2*60 { if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 2*60 {
// do not call putIndexBlock(ibxbc.m[k]), since it
// may be used by concurrent goroutines.
delete(idxbc.m, k) delete(idxbc.m, k)
} }
} }
@ -257,8 +239,6 @@ func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) {
// Remove 10% of items from the cache. // Remove 10% of items from the cache.
overflow = int(float64(len(idxbc.m)) * 0.1) overflow = int(float64(len(idxbc.m)) * 0.1)
for k := range idxbc.m { for k := range idxbc.m {
// do not call putIndexBlock(ibxbc.m[k]), since it
// may be used by concurrent goroutines.
delete(idxbc.m, k) delete(idxbc.m, k)
overflow-- overflow--
if overflow == 0 { if overflow == 0 {
@ -348,12 +328,6 @@ func newInmemoryBlockCache() *inmemoryBlockCache {
func (ibc *inmemoryBlockCache) MustClose() { func (ibc *inmemoryBlockCache) MustClose() {
close(ibc.cleanerStopCh) close(ibc.cleanerStopCh)
ibc.cleanerWG.Wait() ibc.cleanerWG.Wait()
// It is safe returning ibc.m entries to pool, since the MustClose can be called
// only if no other goroutines access ibc entries.
for _, ibe := range ibc.m {
putInmemoryBlock(ibe.ib)
}
ibc.m = nil ibc.m = nil
} }
@ -377,8 +351,6 @@ func (ibc *inmemoryBlockCache) cleanByTimeout() {
for k, ibe := range ibc.m { for k, ibe := range ibc.m {
// Delete items accessed more than a two minutes ago. // Delete items accessed more than a two minutes ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 { if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 {
// do not call putInmemoryBlock(ibc.m[k]), since it
// may be used by concurrent goroutines.
delete(ibc.m, k) delete(ibc.m, k)
} }
} }
@ -412,8 +384,6 @@ func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) {
// Remove 10% of items from the cache. // Remove 10% of items from the cache.
overflow = int(float64(len(ibc.m)) * 0.1) overflow = int(float64(len(ibc.m)) * 0.1)
for k := range ibc.m { for k := range ibc.m {
// do not call putInmemoryBlock(ib), since the ib
// may be used by concurrent goroutines.
delete(ibc.m, k) delete(ibc.m, k)
overflow-- overflow--
if overflow == 0 { if overflow == 0 {

View file

@ -279,7 +279,7 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot decompress index block: %w", err) return nil, fmt.Errorf("cannot decompress index block: %w", err)
} }
idxb := getIndexBlock() idxb := &indexBlock{}
idxb.bhs, err = unmarshalBlockHeaders(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount)) idxb.bhs, err = unmarshalBlockHeaders(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount))
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot unmarshal block headers from index block (offset=%d, size=%d): %w", mr.indexBlockOffset, mr.indexBlockSize, err) return nil, fmt.Errorf("cannot unmarshal block headers from index block (offset=%d, size=%d): %w", mr.indexBlockOffset, mr.indexBlockSize, err)

View file

@ -145,21 +145,6 @@ func (idxb *indexBlock) SizeBytes() int {
return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{})) return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{}))
} }
func getIndexBlock() *indexBlock {
v := indexBlockPool.Get()
if v == nil {
return &indexBlock{}
}
return v.(*indexBlock)
}
func putIndexBlock(ib *indexBlock) {
ib.bhs = ib.bhs[:0]
indexBlockPool.Put(ib)
}
var indexBlockPool sync.Pool
type indexBlockCache struct { type indexBlockCache struct {
// Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures. // Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
@ -198,12 +183,6 @@ func newIndexBlockCache() *indexBlockCache {
func (ibc *indexBlockCache) MustClose(isBig bool) { func (ibc *indexBlockCache) MustClose(isBig bool) {
close(ibc.cleanerStopCh) close(ibc.cleanerStopCh)
ibc.cleanerWG.Wait() ibc.cleanerWG.Wait()
// It is safe returning ibc.m itemst to the pool, since Reset must
// be called only when no other goroutines access ibc entries.
for _, ibe := range ibc.m {
putIndexBlock(ibe.ib)
}
ibc.m = nil ibc.m = nil
} }
@ -259,7 +238,6 @@ func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) {
// Remove 10% of items from the cache. // Remove 10% of items from the cache.
overflow = int(float64(len(ibc.m)) * 0.1) overflow = int(float64(len(ibc.m)) * 0.1)
for k := range ibc.m { for k := range ibc.m {
// Do not call putIndexBlock on ibc.m entries, since they may be used by concurrent goroutines.
delete(ibc.m, k) delete(ibc.m, k)
overflow-- overflow--
if overflow == 0 { if overflow == 0 {

View file

@ -218,7 +218,7 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot decompress index block: %w", err) return nil, fmt.Errorf("cannot decompress index block: %w", err)
} }
ib := getIndexBlock() ib := &indexBlock{}
ib.bhs, err = unmarshalBlockHeaders(ib.bhs[:0], ps.indexBuf, int(mr.BlockHeadersCount)) ib.bhs, err = unmarshalBlockHeaders(ib.bhs[:0], ps.indexBuf, int(mr.BlockHeadersCount))
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot unmarshal index block: %w", err) return nil, fmt.Errorf("cannot unmarshal index block: %w", err)