diff --git a/lib/mergeset/block_header.go b/lib/mergeset/block_header.go index 5404c2af6..49e35aa09 100644 --- a/lib/mergeset/block_header.go +++ b/lib/mergeset/block_header.go @@ -3,6 +3,7 @@ package mergeset import ( "fmt" "sort" + "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -34,6 +35,10 @@ type blockHeader struct { lensBlockSize uint32 } +func (bh *blockHeader) SizeBytes() int { + return int(unsafe.Sizeof(*bh)) + cap(bh.commonPrefix) + cap(bh.firstItem) +} + func (bh *blockHeader) Reset() { bh.commonPrefix = bh.commonPrefix[:0] bh.firstItem = bh.firstItem[:0] diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index 2b01d94be..7da60ceb3 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -138,24 +138,14 @@ type indexBlock struct { } func (idxb *indexBlock) SizeBytes() int { - return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{})) -} - -func getIndexBlock() *indexBlock { - v := indexBlockPool.Get() - if v == nil { - return &indexBlock{} + bhs := idxb.bhs[:cap(idxb.bhs)] + n := int(unsafe.Sizeof(*idxb)) + for i := range bhs { + n += bhs[i].SizeBytes() } - return v.(*indexBlock) + return n } -func putIndexBlock(idxb *indexBlock) { - idxb.bhs = idxb.bhs[:0] - indexBlockPool.Put(idxb) -} - -var indexBlockPool sync.Pool - type indexBlockCache struct { // Atomically updated counters must go first in the struct, so they are properly // aligned to 8 bytes on 32-bit architectures. @@ -194,12 +184,6 @@ func newIndexBlockCache() *indexBlockCache { func (idxbc *indexBlockCache) MustClose() { close(idxbc.cleanerStopCh) idxbc.cleanerWG.Wait() - - // It is safe returning idxbc.m to pool, since the MustClose can be called - // when the idxbc entries are no longer accessed by concurrent goroutines. - for _, idxbe := range idxbc.m { - putIndexBlock(idxbe.idxb) - } idxbc.m = nil } @@ -223,8 +207,6 @@ func (idxbc *indexBlockCache) cleanByTimeout() { for k, idxbe := range idxbc.m { // Delete items accessed more than two minutes ago. if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 2*60 { - // do not call putIndexBlock(ibxbc.m[k]), since it - // may be used by concurrent goroutines. delete(idxbc.m, k) } } @@ -257,8 +239,6 @@ func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) { // Remove 10% of items from the cache. overflow = int(float64(len(idxbc.m)) * 0.1) for k := range idxbc.m { - // do not call putIndexBlock(ibxbc.m[k]), since it - // may be used by concurrent goroutines. delete(idxbc.m, k) overflow-- if overflow == 0 { @@ -348,12 +328,6 @@ func newInmemoryBlockCache() *inmemoryBlockCache { func (ibc *inmemoryBlockCache) MustClose() { close(ibc.cleanerStopCh) ibc.cleanerWG.Wait() - - // It is safe returning ibc.m entries to pool, since the MustClose can be called - // only if no other goroutines access ibc entries. - for _, ibe := range ibc.m { - putInmemoryBlock(ibe.ib) - } ibc.m = nil } @@ -377,8 +351,6 @@ func (ibc *inmemoryBlockCache) cleanByTimeout() { for k, ibe := range ibc.m { // Delete items accessed more than a two minutes ago. if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 { - // do not call putInmemoryBlock(ibc.m[k]), since it - // may be used by concurrent goroutines. delete(ibc.m, k) } } @@ -412,8 +384,6 @@ func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) { // Remove 10% of items from the cache. overflow = int(float64(len(ibc.m)) * 0.1) for k := range ibc.m { - // do not call putInmemoryBlock(ib), since the ib - // may be used by concurrent goroutines. delete(ibc.m, k) overflow-- if overflow == 0 { diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index 0c7b17855..ea4b5ca25 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -279,7 +279,7 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) { if err != nil { return nil, fmt.Errorf("cannot decompress index block: %w", err) } - idxb := getIndexBlock() + idxb := &indexBlock{} idxb.bhs, err = unmarshalBlockHeaders(idxb.bhs[:0], ps.indexBuf, int(mr.blockHeadersCount)) if err != nil { return nil, fmt.Errorf("cannot unmarshal block headers from index block (offset=%d, size=%d): %w", mr.indexBlockOffset, mr.indexBlockSize, err) diff --git a/lib/storage/part.go b/lib/storage/part.go index 52b0ec41a..378cdc869 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -145,21 +145,6 @@ func (idxb *indexBlock) SizeBytes() int { return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{})) } -func getIndexBlock() *indexBlock { - v := indexBlockPool.Get() - if v == nil { - return &indexBlock{} - } - return v.(*indexBlock) -} - -func putIndexBlock(ib *indexBlock) { - ib.bhs = ib.bhs[:0] - indexBlockPool.Put(ib) -} - -var indexBlockPool sync.Pool - type indexBlockCache struct { // Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 @@ -198,12 +183,6 @@ func newIndexBlockCache() *indexBlockCache { func (ibc *indexBlockCache) MustClose(isBig bool) { close(ibc.cleanerStopCh) ibc.cleanerWG.Wait() - - // It is safe returning ibc.m itemst to the pool, since Reset must - // be called only when no other goroutines access ibc entries. - for _, ibe := range ibc.m { - putIndexBlock(ibe.ib) - } ibc.m = nil } @@ -259,7 +238,6 @@ func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) { // Remove 10% of items from the cache. overflow = int(float64(len(ibc.m)) * 0.1) for k := range ibc.m { - // Do not call putIndexBlock on ibc.m entries, since they may be used by concurrent goroutines. delete(ibc.m, k) overflow-- if overflow == 0 { diff --git a/lib/storage/part_search.go b/lib/storage/part_search.go index 705595b53..5acc087a4 100644 --- a/lib/storage/part_search.go +++ b/lib/storage/part_search.go @@ -218,7 +218,7 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) { if err != nil { return nil, fmt.Errorf("cannot decompress index block: %w", err) } - ib := getIndexBlock() + ib := &indexBlock{} ib.bhs, err = unmarshalBlockHeaders(ib.bhs[:0], ps.indexBuf, int(mr.BlockHeadersCount)) if err != nil { return nil, fmt.Errorf("cannot unmarshal index block: %w", err)