lib/{mergeset,storage}: properly limit cache sizes for indexdb

Previously these caches could exceed limits set via `-memory.allowedPercent` and/or `-memory.allowedBytes`,
since limits were set independently per each data part. If the number of data parts was big, then limits could be exceeded,
which could result to out of memory errors.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007
This commit is contained in:
Aliaksandr Valialkin 2022-01-20 18:34:59 +02:00
parent 84f6b3014c
commit 145337792d
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
8 changed files with 93 additions and 670 deletions

View file

@ -654,11 +654,8 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_entries{type="storage/next_day_metric_ids"}`, func() float64 {
return float64(m().NextDayMetricIDCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="storage/smallIndexBlocks"}`, func() float64 {
return float64(tm().SmallIndexBlocksCacheSize)
metrics.NewGauge(`vm_cache_entries{type="storage/indexBlocks"}`, func() float64 {
return float64(tm().IndexBlocksCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="indexdb/dataBlocks"}`, func() float64 {
return float64(idbm().DataBlocksCacheSize)
@ -685,11 +682,8 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/smallIndexBlocks"}`, func() float64 {
return float64(tm().SmallIndexBlocksCacheSizeBytes)
metrics.NewGauge(`vm_cache_size_bytes{type="storage/indexBlocks"}`, func() float64 {
return float64(tm().IndexBlocksCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/dataBlocks"}`, func() float64 {
return float64(idbm().DataBlocksCacheSizeBytes)
@ -722,11 +716,8 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/smallIndexBlocks"}`, func() float64 {
return float64(tm().SmallIndexBlocksCacheSizeMaxBytes)
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/indexBlocks"}`, func() float64 {
return float64(tm().IndexBlocksCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/dataBlocks"}`, func() float64 {
return float64(idbm().DataBlocksCacheSizeMaxBytes)
@ -747,11 +738,8 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_requests_total{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/smallIndexBlocks"}`, func() float64 {
return float64(tm().SmallIndexBlocksCacheRequests)
metrics.NewGauge(`vm_cache_requests_total{type="storage/indexBlocks"}`, func() float64 {
return float64(tm().IndexBlocksCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/dataBlocks"}`, func() float64 {
return float64(idbm().DataBlocksCacheRequests)
@ -775,11 +763,8 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_misses_total{type="storage/metricName"}`, func() float64 {
return float64(m().MetricNameCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="storage/bigIndexBlocks"}`, func() float64 {
return float64(tm().BigIndexBlocksCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="storage/smallIndexBlocks"}`, func() float64 {
return float64(tm().SmallIndexBlocksCacheMisses)
metrics.NewGauge(`vm_cache_misses_total{type="storage/indexBlocks"}`, func() float64 {
return float64(tm().IndexBlocksCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/dataBlocks"}`, func() float64 {
return float64(idbm().DataBlocksCacheMisses)

View file

@ -7,6 +7,7 @@ sort: 15
## tip
* BUGFIX: return proper results from `highestMax()` function at [Graphite render API](https://docs.victoriametrics.com/#graphite-render-api-usage). Previously it was incorrectly returning timeseries with min peaks instead of max peaks.
* BUGFIX: properly limit indexdb cache sizes. Previously they could exceed values set via `-memory.allowedPercent` and/or `-memory.allowedBytes` when `indexdb` contained many data parts. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007).
## [v1.72.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.72.0)

View file

@ -4,46 +4,39 @@ import (
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
func getMaxCachedIndexBlocksPerPart() int {
maxCachedIndexBlocksPerPartOnce.Do(func() {
n := memory.Allowed() / 1024 / 1024 / 4
if n == 0 {
n = 10
}
maxCachedIndexBlocksPerPart = n
var idxbCache = blockcache.NewCache(getMaxIndexBlocksCacheSize)
var ibCache = blockcache.NewCache(getMaxInmemoryBlocksCacheSize)
func getMaxIndexBlocksCacheSize() int {
maxIndexBlockCacheSizeOnce.Do(func() {
maxIndexBlockCacheSize = memory.Allowed() / 10
})
return maxCachedIndexBlocksPerPart
return maxIndexBlockCacheSize
}
var (
maxCachedIndexBlocksPerPart int
maxCachedIndexBlocksPerPartOnce sync.Once
maxIndexBlockCacheSize int
maxIndexBlockCacheSizeOnce sync.Once
)
func getMaxCachedInmemoryBlocksPerPart() int {
maxCachedInmemoryBlocksPerPartOnce.Do(func() {
n := memory.Allowed() / 1024 / 1024 / 4
if n == 0 {
n = 10
}
maxCachedInmemoryBlocksPerPart = n
func getMaxInmemoryBlocksCacheSize() int {
maxInmemoryBlockCacheSizeOnce.Do(func() {
maxInmemoryBlockCacheSize = memory.Allowed() / 4
})
return maxCachedInmemoryBlocksPerPart
return maxInmemoryBlockCacheSize
}
var (
maxCachedInmemoryBlocksPerPart int
maxCachedInmemoryBlocksPerPartOnce sync.Once
maxInmemoryBlockCacheSize int
maxInmemoryBlockCacheSizeOnce sync.Once
)
type part struct {
@ -58,9 +51,6 @@ type part struct {
indexFile fs.MustReadAtCloser
itemsFile fs.MustReadAtCloser
lensFile fs.MustReadAtCloser
idxbCache *indexBlockCache
ibCache *inmemoryBlockCache
}
func openFilePart(path string) (*part, error) {
@ -112,9 +102,6 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
p.lensFile = lensFile
p.ph.CopyFrom(ph)
p.idxbCache = newIndexBlockCache()
p.ibCache = newInmemoryBlockCache()
if len(errors) > 0 {
// Return only the first error, since it has no sense in returning all errors.
err := fmt.Errorf("error opening part %s: %w", p.path, errors[0])
@ -129,8 +116,8 @@ func (p *part) MustClose() {
p.itemsFile.MustClose()
p.lensFile.MustClose()
p.idxbCache.MustClose()
p.ibCache.MustClose()
idxbCache.RemoveBlocksForPart(p)
ibCache.RemoveBlocksForPart(p)
}
type indexBlock struct {
@ -145,346 +132,3 @@ func (idxb *indexBlock) SizeBytes() int {
}
return n
}
type indexBlockCache struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
m map[uint64]*indexBlockCacheEntry
mu sync.RWMutex
perKeyMisses map[uint64]int
perKeyMissesLock sync.Mutex
cleanerStopCh chan struct{}
cleanerWG sync.WaitGroup
}
type indexBlockCacheEntry struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
lastAccessTime uint64
idxb *indexBlock
}
func newIndexBlockCache() *indexBlockCache {
var idxbc indexBlockCache
idxbc.m = make(map[uint64]*indexBlockCacheEntry)
idxbc.perKeyMisses = make(map[uint64]int)
idxbc.cleanerStopCh = make(chan struct{})
idxbc.cleanerWG.Add(1)
go func() {
defer idxbc.cleanerWG.Done()
idxbc.cleaner()
}()
return &idxbc
}
func (idxbc *indexBlockCache) MustClose() {
close(idxbc.cleanerStopCh)
idxbc.cleanerWG.Wait()
idxbc.m = nil
idxbc.perKeyMisses = nil
}
// cleaner periodically cleans least recently used items.
func (idxbc *indexBlockCache) cleaner() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
perKeyMissesTicker := time.NewTicker(2 * time.Minute)
defer perKeyMissesTicker.Stop()
for {
select {
case <-ticker.C:
idxbc.cleanByTimeout()
case <-perKeyMissesTicker.C:
idxbc.perKeyMissesLock.Lock()
idxbc.perKeyMisses = make(map[uint64]int, len(idxbc.perKeyMisses))
idxbc.perKeyMissesLock.Unlock()
case <-idxbc.cleanerStopCh:
return
}
}
}
func (idxbc *indexBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
idxbc.mu.Lock()
for k, idxbe := range idxbc.m {
// Delete items accessed more than two minutes ago.
// This time should be enough for repeated queries.
if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 2*60 {
delete(idxbc.m, k)
}
}
idxbc.mu.Unlock()
}
func (idxbc *indexBlockCache) Get(k uint64) *indexBlock {
atomic.AddUint64(&idxbc.requests, 1)
idxbc.mu.RLock()
idxbe := idxbc.m[k]
idxbc.mu.RUnlock()
if idxbe != nil {
currentTime := fasttime.UnixTimestamp()
if atomic.LoadUint64(&idxbe.lastAccessTime) != currentTime {
atomic.StoreUint64(&idxbe.lastAccessTime, currentTime)
}
return idxbe.idxb
}
idxbc.perKeyMissesLock.Lock()
idxbc.perKeyMisses[k]++
idxbc.perKeyMissesLock.Unlock()
atomic.AddUint64(&idxbc.misses, 1)
return nil
}
// Put puts idxb under the key k into idxbc.
func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) {
idxbc.perKeyMissesLock.Lock()
doNotCache := idxbc.perKeyMisses[k] == 1
idxbc.perKeyMissesLock.Unlock()
if doNotCache {
// Do not cache ib if it has been requested only once (aka one-time-wonders items).
// This should reduce memory usage for the ibc cache.
return
}
idxbc.mu.Lock()
// Remove superfluous entries.
if overflow := len(idxbc.m) - getMaxCachedIndexBlocksPerPart(); overflow > 0 {
// Remove 10% of items from the cache.
overflow = int(float64(len(idxbc.m)) * 0.1)
for k := range idxbc.m {
delete(idxbc.m, k)
overflow--
if overflow == 0 {
break
}
}
}
// Store idxb in the cache.
idxbe := &indexBlockCacheEntry{
lastAccessTime: fasttime.UnixTimestamp(),
idxb: idxb,
}
idxbc.m[k] = idxbe
idxbc.mu.Unlock()
}
func (idxbc *indexBlockCache) Len() uint64 {
idxbc.mu.RLock()
n := len(idxbc.m)
idxbc.mu.RUnlock()
return uint64(n)
}
func (idxbc *indexBlockCache) SizeBytes() uint64 {
n := 0
idxbc.mu.RLock()
for _, e := range idxbc.m {
n += e.idxb.SizeBytes()
}
idxbc.mu.RUnlock()
return uint64(n)
}
func (idxbc *indexBlockCache) SizeMaxBytes() uint64 {
avgBlockSize := float64(64 * 1024)
blocksCount := idxbc.Len()
if blocksCount > 0 {
avgBlockSize = float64(idxbc.SizeBytes()) / float64(blocksCount)
}
return uint64(avgBlockSize * float64(getMaxCachedIndexBlocksPerPart()))
}
func (idxbc *indexBlockCache) Requests() uint64 {
return atomic.LoadUint64(&idxbc.requests)
}
func (idxbc *indexBlockCache) Misses() uint64 {
return atomic.LoadUint64(&idxbc.misses)
}
type inmemoryBlockCache struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
m map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry
mu sync.RWMutex
perKeyMisses map[inmemoryBlockCacheKey]int
perKeyMissesLock sync.Mutex
cleanerStopCh chan struct{}
cleanerWG sync.WaitGroup
}
type inmemoryBlockCacheKey struct {
itemsBlockOffset uint64
}
func (ibck *inmemoryBlockCacheKey) Init(bh *blockHeader) {
ibck.itemsBlockOffset = bh.itemsBlockOffset
}
type inmemoryBlockCacheEntry struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
lastAccessTime uint64
ib *inmemoryBlock
}
func newInmemoryBlockCache() *inmemoryBlockCache {
var ibc inmemoryBlockCache
ibc.m = make(map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry)
ibc.perKeyMisses = make(map[inmemoryBlockCacheKey]int)
ibc.cleanerStopCh = make(chan struct{})
ibc.cleanerWG.Add(1)
go func() {
defer ibc.cleanerWG.Done()
ibc.cleaner()
}()
return &ibc
}
func (ibc *inmemoryBlockCache) MustClose() {
close(ibc.cleanerStopCh)
ibc.cleanerWG.Wait()
ibc.m = nil
ibc.perKeyMisses = nil
}
// cleaner periodically cleans least recently used items.
func (ibc *inmemoryBlockCache) cleaner() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
perKeyMissesTicker := time.NewTicker(2 * time.Minute)
defer perKeyMissesTicker.Stop()
for {
select {
case <-ticker.C:
ibc.cleanByTimeout()
case <-perKeyMissesTicker.C:
ibc.perKeyMissesLock.Lock()
ibc.perKeyMisses = make(map[inmemoryBlockCacheKey]int, len(ibc.perKeyMisses))
ibc.perKeyMissesLock.Unlock()
case <-ibc.cleanerStopCh:
return
}
}
}
func (ibc *inmemoryBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
ibc.mu.Lock()
for k, ibe := range ibc.m {
// Delete items accessed more than two minutes ago.
// This time should be enough for repeated queries.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 {
delete(ibc.m, k)
}
}
ibc.mu.Unlock()
}
func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock {
atomic.AddUint64(&ibc.requests, 1)
ibc.mu.RLock()
ibe := ibc.m[k]
ibc.mu.RUnlock()
if ibe != nil {
currentTime := fasttime.UnixTimestamp()
if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime {
atomic.StoreUint64(&ibe.lastAccessTime, currentTime)
}
return ibe.ib
}
ibc.perKeyMissesLock.Lock()
ibc.perKeyMisses[k]++
ibc.perKeyMissesLock.Unlock()
atomic.AddUint64(&ibc.misses, 1)
return nil
}
// Put puts ib under key k into ibc.
func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) {
ibc.perKeyMissesLock.Lock()
doNotCache := ibc.perKeyMisses[k] == 1
ibc.perKeyMissesLock.Unlock()
if doNotCache {
// Do not cache ib if it has been requested only once (aka one-time-wonders items).
// This should reduce memory usage for the ibc cache.
return
}
ibc.mu.Lock()
// Clean superfluous entries in cache.
if overflow := len(ibc.m) - getMaxCachedInmemoryBlocksPerPart(); overflow > 0 {
// Remove 10% of items from the cache.
overflow = int(float64(len(ibc.m)) * 0.1)
for k := range ibc.m {
delete(ibc.m, k)
overflow--
if overflow == 0 {
break
}
}
}
// Store ib in the cache.
ibe := &inmemoryBlockCacheEntry{
lastAccessTime: fasttime.UnixTimestamp(),
ib: ib,
}
ibc.m[k] = ibe
ibc.mu.Unlock()
}
func (ibc *inmemoryBlockCache) Len() uint64 {
ibc.mu.RLock()
n := len(ibc.m)
ibc.mu.RUnlock()
return uint64(n)
}
func (ibc *inmemoryBlockCache) SizeBytes() uint64 {
n := 0
ibc.mu.RLock()
for _, e := range ibc.m {
n += e.ib.SizeBytes()
}
ibc.mu.RUnlock()
return uint64(n)
}
func (ibc *inmemoryBlockCache) SizeMaxBytes() uint64 {
avgBlockSize := float64(128 * 1024)
blocksCount := ibc.Len()
if blocksCount > 0 {
avgBlockSize = float64(ibc.SizeBytes()) / float64(blocksCount)
}
return uint64(avgBlockSize * float64(getMaxCachedInmemoryBlocksPerPart()))
}
func (ibc *inmemoryBlockCache) Requests() uint64 {
return atomic.LoadUint64(&ibc.requests)
}
func (ibc *inmemoryBlockCache) Misses() uint64 {
return atomic.LoadUint64(&ibc.misses)
}

View file

@ -5,6 +5,7 @@ import (
"io"
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -25,9 +26,6 @@ type partSearch struct {
// The remaining block headers to scan in the current metaindexRow.
bhs []blockHeader
idxbCache *indexBlockCache
ibCache *inmemoryBlockCache
// err contains the last error.
err error
@ -45,8 +43,6 @@ func (ps *partSearch) reset() {
ps.p = nil
ps.mrs = nil
ps.bhs = nil
ps.idxbCache = nil
ps.ibCache = nil
ps.err = nil
ps.indexBuf = ps.indexBuf[:0]
@ -65,8 +61,6 @@ func (ps *partSearch) Init(p *part) {
ps.reset()
ps.p = p
ps.idxbCache = p.idxbCache
ps.ibCache = p.ibCache
}
// Seek seeks for the first item greater or equal to k in ps.
@ -261,16 +255,20 @@ func (ps *partSearch) nextBHS() error {
}
mr := &ps.mrs[0]
ps.mrs = ps.mrs[1:]
idxbKey := mr.indexBlockOffset
idxb := ps.idxbCache.Get(idxbKey)
if idxb == nil {
var err error
idxb, err = ps.readIndexBlock(mr)
idxbKey := blockcache.Key{
Part: ps.p,
Offset: mr.indexBlockOffset,
}
b := idxbCache.GetBlock(idxbKey)
if b == nil {
idxb, err := ps.readIndexBlock(mr)
if err != nil {
return fmt.Errorf("cannot read index block: %w", err)
}
ps.idxbCache.Put(idxbKey, idxb)
b = idxb
idxbCache.PutBlock(idxbKey, b)
}
idxb := b.(*indexBlock)
ps.bhs = idxb.bhs
return nil
}
@ -293,17 +291,20 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) {
}
func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) {
var ibKey inmemoryBlockCacheKey
ibKey.Init(bh)
ib := ps.ibCache.Get(ibKey)
if ib != nil {
return ib, nil
ibKey := blockcache.Key{
Part: ps.p,
Offset: bh.itemsBlockOffset,
}
ib, err := ps.readInmemoryBlock(bh)
if err != nil {
return nil, err
b := ibCache.GetBlock(ibKey)
if b == nil {
ib, err := ps.readInmemoryBlock(bh)
if err != nil {
return nil, err
}
b = ib
ibCache.PutBlock(ibKey, b)
}
ps.ibCache.Put(ibKey, ib)
ib := b.(*inmemoryBlock)
return ib, nil
}

View file

@ -22,16 +22,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
)
// These are global counters for cache requests and misses for parts
// which were already merged into another parts.
var (
historicalDataBlockCacheRequests uint64
historicalDataBlockCacheMisses uint64
historicalIndexBlockCacheRequests uint64
historicalIndexBlockCacheMisses uint64
)
// maxParts is the maximum number of parts in the table.
//
// This number may be reached when the insertion pace outreaches merger pace.
@ -443,27 +433,21 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.ItemsCount += p.ph.itemsCount
m.SizeBytes += p.size
m.DataBlocksCacheSize += p.ibCache.Len()
m.DataBlocksCacheSizeBytes += p.ibCache.SizeBytes()
m.DataBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes()
m.DataBlocksCacheRequests += p.ibCache.Requests()
m.DataBlocksCacheMisses += p.ibCache.Misses()
m.IndexBlocksCacheSize += p.idxbCache.Len()
m.IndexBlocksCacheSizeBytes += p.idxbCache.SizeBytes()
m.IndexBlocksCacheSizeMaxBytes += p.idxbCache.SizeMaxBytes()
m.IndexBlocksCacheRequests += p.idxbCache.Requests()
m.IndexBlocksCacheMisses += p.idxbCache.Misses()
m.PartsRefCount += atomic.LoadUint64(&pw.refCount)
}
tb.partsLock.Unlock()
m.DataBlocksCacheRequests = atomic.LoadUint64(&historicalDataBlockCacheRequests)
m.DataBlocksCacheMisses = atomic.LoadUint64(&historicalDataBlockCacheMisses)
m.DataBlocksCacheSize = uint64(ibCache.Len())
m.DataBlocksCacheSizeBytes = uint64(ibCache.SizeBytes())
m.DataBlocksCacheSizeMaxBytes = uint64(ibCache.SizeMaxBytes())
m.DataBlocksCacheRequests = ibCache.Requests()
m.DataBlocksCacheMisses = ibCache.Misses()
m.IndexBlocksCacheRequests = atomic.LoadUint64(&historicalIndexBlockCacheRequests)
m.IndexBlocksCacheMisses = atomic.LoadUint64(&historicalIndexBlockCacheMisses)
m.IndexBlocksCacheSize = uint64(idxbCache.Len())
m.IndexBlocksCacheSizeBytes = uint64(idxbCache.SizeBytes())
m.IndexBlocksCacheSizeMaxBytes = uint64(idxbCache.SizeMaxBytes())
m.IndexBlocksCacheRequests = idxbCache.Requests()
m.IndexBlocksCacheMisses = idxbCache.Misses()
}
// AddItems adds the given items to the tb.
@ -1466,10 +1450,6 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*pa
dst = append(dst, pw)
continue
}
atomic.AddUint64(&historicalDataBlockCacheRequests, pw.p.ibCache.Requests())
atomic.AddUint64(&historicalDataBlockCacheMisses, pw.p.ibCache.Misses())
atomic.AddUint64(&historicalIndexBlockCacheRequests, pw.p.idxbCache.Requests())
atomic.AddUint64(&historicalIndexBlockCacheMisses, pw.p.idxbCache.Misses())
removedParts++
}
return dst, removedParts

View file

@ -4,30 +4,26 @@ import (
"fmt"
"path/filepath"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
func getMaxCachedIndexBlocksPerPart() int {
maxCachedIndexBlocksPerPartOnce.Do(func() {
n := memory.Allowed() / 1024 / 1024 / 8
if n < 16 {
n = 16
}
maxCachedIndexBlocksPerPart = n
var ibCache = blockcache.NewCache(getMaxIndexBlocksCacheSize)
func getMaxIndexBlocksCacheSize() int {
maxIndexBlockCacheSizeOnce.Do(func() {
maxIndexBlockCacheSize = memory.Allowed() / 10
})
return maxCachedIndexBlocksPerPart
return maxIndexBlockCacheSize
}
var (
maxCachedIndexBlocksPerPart int
maxCachedIndexBlocksPerPartOnce sync.Once
maxIndexBlockCacheSize int
maxIndexBlockCacheSizeOnce sync.Once
)
// part represents a searchable part containing time series data.
@ -47,8 +43,6 @@ type part struct {
indexFile fs.MustReadAtCloser
metaindex []metaindexRow
ibCache *indexBlockCache
}
// openFilePart opens file-based part from the given path.
@ -105,9 +99,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
p.timestampsFile = timestampsFile
p.valuesFile = valuesFile
p.indexFile = indexFile
p.metaindex = metaindex
p.ibCache = newIndexBlockCache()
if len(errors) > 0 {
// Return only the first error, since it has no sense in returning all errors.
@ -133,8 +125,7 @@ func (p *part) MustClose() {
p.valuesFile.MustClose()
p.indexFile.MustClose()
isBig := p.size > maxSmallPartSize()
p.ibCache.MustClose(isBig)
ibCache.RemoveBlocksForPart(p)
}
type indexBlock struct {
@ -144,148 +135,3 @@ type indexBlock struct {
func (idxb *indexBlock) SizeBytes() int {
return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{}))
}
type indexBlockCache struct {
// Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
m map[uint64]*indexBlockCacheEntry
mu sync.RWMutex
cleanerStopCh chan struct{}
cleanerWG sync.WaitGroup
}
type indexBlockCacheEntry struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
lastAccessTime uint64
ib *indexBlock
}
func newIndexBlockCache() *indexBlockCache {
var ibc indexBlockCache
ibc.m = make(map[uint64]*indexBlockCacheEntry)
ibc.cleanerStopCh = make(chan struct{})
ibc.cleanerWG.Add(1)
go func() {
defer ibc.cleanerWG.Done()
ibc.cleaner()
}()
return &ibc
}
func (ibc *indexBlockCache) MustClose(isBig bool) {
close(ibc.cleanerStopCh)
ibc.cleanerWG.Wait()
ibc.m = nil
}
// cleaner periodically cleans least recently used items.
func (ibc *indexBlockCache) cleaner() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ibc.cleanByTimeout()
case <-ibc.cleanerStopCh:
return
}
}
}
func (ibc *indexBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
ibc.mu.Lock()
for k, ibe := range ibc.m {
// Delete items accessed more than two minutes ago.
// This time should be enough for repeated queries.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 {
delete(ibc.m, k)
}
}
ibc.mu.Unlock()
}
func (ibc *indexBlockCache) Get(k uint64) *indexBlock {
atomic.AddUint64(&ibc.requests, 1)
ibc.mu.RLock()
ibe := ibc.m[k]
ibc.mu.RUnlock()
if ibe != nil {
currentTime := fasttime.UnixTimestamp()
if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime {
atomic.StoreUint64(&ibe.lastAccessTime, currentTime)
}
return ibe.ib
}
atomic.AddUint64(&ibc.misses, 1)
return nil
}
func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) {
ibc.mu.Lock()
// Clean superfluous cache entries.
if overflow := len(ibc.m) - getMaxCachedIndexBlocksPerPart(); overflow > 0 {
// Remove 10% of items from the cache.
overflow = int(float64(len(ibc.m)) * 0.1)
for k := range ibc.m {
delete(ibc.m, k)
overflow--
if overflow == 0 {
break
}
}
}
// Store frequently requested ib in the cache.
ibe := &indexBlockCacheEntry{
lastAccessTime: fasttime.UnixTimestamp(),
ib: ib,
}
ibc.m[k] = ibe
ibc.mu.Unlock()
}
func (ibc *indexBlockCache) Requests() uint64 {
return atomic.LoadUint64(&ibc.requests)
}
func (ibc *indexBlockCache) Misses() uint64 {
return atomic.LoadUint64(&ibc.misses)
}
func (ibc *indexBlockCache) Len() uint64 {
ibc.mu.Lock()
n := uint64(len(ibc.m))
ibc.mu.Unlock()
return n
}
func (ibc *indexBlockCache) SizeBytes() uint64 {
n := 0
ibc.mu.Lock()
for _, e := range ibc.m {
n += e.ib.SizeBytes()
}
ibc.mu.Unlock()
return uint64(n)
}
func (ibc *indexBlockCache) SizeMaxBytes() uint64 {
avgBlockSize := float64(64 * 1024)
blocksCount := ibc.Len()
if blocksCount > 0 {
avgBlockSize = float64(ibc.SizeBytes()) / float64(blocksCount)
}
return uint64(avgBlockSize * float64(getMaxCachedIndexBlocksPerPart()))
}

View file

@ -7,6 +7,7 @@ import (
"sort"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -32,8 +33,6 @@ type partSearch struct {
metaindex []metaindexRow
ibCache *indexBlockCache
bhs []blockHeader
compressedIndexBuf []byte
@ -48,7 +47,6 @@ func (ps *partSearch) reset() {
ps.tsids = nil
ps.tsidIdx = 0
ps.metaindex = nil
ps.ibCache = nil
ps.bhs = nil
ps.compressedIndexBuf = ps.compressedIndexBuf[:0]
ps.indexBuf = ps.indexBuf[:0]
@ -76,7 +74,6 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) {
}
ps.tr = tr
ps.metaindex = p.metaindex
ps.ibCache = p.ibCache
// Advance to the first tsid. There is no need in checking
// the returned result, since it will be checked in NextBlock.
@ -154,19 +151,23 @@ func (ps *partSearch) nextBHS() bool {
// Found the index block which may contain the required data
// for the ps.BlockRef.bh.TSID and the given timestamp range.
indexBlockKey := mr.IndexBlockOffset
ib := ps.ibCache.Get(indexBlockKey)
if ib == nil {
indexBlockKey := blockcache.Key{
Part: ps.p,
Offset: mr.IndexBlockOffset,
}
b := ibCache.GetBlock(indexBlockKey)
if b == nil {
// Slow path - actually read and unpack the index block.
var err error
ib, err = ps.readIndexBlock(mr)
ib, err := ps.readIndexBlock(mr)
if err != nil {
ps.err = fmt.Errorf("cannot read index block for part %q at offset %d with size %d: %w",
&ps.p.ph, mr.IndexBlockOffset, mr.IndexBlockSize, err)
return false
}
ps.ibCache.Put(indexBlockKey, ib)
b = ib
ibCache.PutBlock(indexBlockKey, b)
}
ib := b.(*indexBlock)
ps.bhs = ib.bhs
return true
}

View file

@ -25,16 +25,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
// These are global counters for cache requests and misses for parts
// which were already merged into another parts.
var (
historicalBigIndexBlocksCacheRequests uint64
historicalBigIndexBlocksCacheMisses uint64
historicalSmallIndexBlocksCacheRequests uint64
historicalSmallIndexBlocksCacheMisses uint64
)
func maxSmallPartSize() uint64 {
// Small parts are cached in the OS page cache,
// so limit their size by the remaining free RAM.
@ -306,17 +296,11 @@ func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs
type partitionMetrics struct {
PendingRows uint64
BigIndexBlocksCacheSize uint64
BigIndexBlocksCacheSizeBytes uint64
BigIndexBlocksCacheSizeMaxBytes uint64
BigIndexBlocksCacheRequests uint64
BigIndexBlocksCacheMisses uint64
SmallIndexBlocksCacheSize uint64
SmallIndexBlocksCacheSizeBytes uint64
SmallIndexBlocksCacheSizeMaxBytes uint64
SmallIndexBlocksCacheRequests uint64
SmallIndexBlocksCacheMisses uint64
IndexBlocksCacheSize uint64
IndexBlocksCacheSizeBytes uint64
IndexBlocksCacheSizeMaxBytes uint64
IndexBlocksCacheRequests uint64
IndexBlocksCacheMisses uint64
BigSizeBytes uint64
SmallSizeBytes uint64
@ -362,11 +346,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
for _, pw := range pt.bigParts {
p := pw.p
m.BigIndexBlocksCacheSize += p.ibCache.Len()
m.BigIndexBlocksCacheSizeBytes += p.ibCache.SizeBytes()
m.BigIndexBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes()
m.BigIndexBlocksCacheRequests += p.ibCache.Requests()
m.BigIndexBlocksCacheMisses += p.ibCache.Misses()
m.BigRowsCount += p.ph.RowsCount
m.BigBlocksCount += p.ph.BlocksCount
m.BigSizeBytes += p.size
@ -376,11 +355,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
for _, pw := range pt.smallParts {
p := pw.p
m.SmallIndexBlocksCacheSize += p.ibCache.Len()
m.SmallIndexBlocksCacheSizeBytes += p.ibCache.SizeBytes()
m.SmallIndexBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes()
m.SmallIndexBlocksCacheRequests += p.ibCache.Requests()
m.SmallIndexBlocksCacheMisses += p.ibCache.Misses()
m.SmallRowsCount += p.ph.RowsCount
m.SmallBlocksCount += p.ph.BlocksCount
m.SmallSizeBytes += p.size
@ -392,11 +366,11 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
pt.partsLock.Unlock()
m.BigIndexBlocksCacheRequests = atomic.LoadUint64(&historicalBigIndexBlocksCacheRequests)
m.BigIndexBlocksCacheMisses = atomic.LoadUint64(&historicalBigIndexBlocksCacheMisses)
m.SmallIndexBlocksCacheRequests = atomic.LoadUint64(&historicalSmallIndexBlocksCacheRequests)
m.SmallIndexBlocksCacheMisses = atomic.LoadUint64(&historicalSmallIndexBlocksCacheMisses)
m.IndexBlocksCacheSize = uint64(ibCache.Len())
m.IndexBlocksCacheSizeBytes = uint64(ibCache.SizeBytes())
m.IndexBlocksCacheSizeMaxBytes = uint64(ibCache.SizeMaxBytes())
m.IndexBlocksCacheRequests = ibCache.Requests()
m.IndexBlocksCacheMisses = ibCache.Misses()
m.ActiveBigMerges += atomic.LoadUint64(&pt.activeBigMerges)
m.ActiveSmallMerges += atomic.LoadUint64(&pt.activeSmallMerges)
@ -1311,15 +1285,6 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
dst = append(dst, pw)
continue
}
requests := pw.p.ibCache.Requests()
misses := pw.p.ibCache.Misses()
if isBig {
atomic.AddUint64(&historicalBigIndexBlocksCacheRequests, requests)
atomic.AddUint64(&historicalBigIndexBlocksCacheMisses, misses)
} else {
atomic.AddUint64(&historicalSmallIndexBlocksCacheRequests, requests)
atomic.AddUint64(&historicalSmallIndexBlocksCacheMisses, misses)
}
removedParts++
}
return dst, removedParts