diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index fdcbbc8f52..a308f64934 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -529,11 +529,8 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_entries{type="storage/next_day_metric_ids"}`, func() float64 { return float64(m().NextDayMetricIDCacheSize) }) - metrics.NewGauge(`vm_cache_entries{type="storage/bigIndexBlocks"}`, func() float64 { - return float64(tm().BigIndexBlocksCacheSize) - }) - metrics.NewGauge(`vm_cache_entries{type="storage/smallIndexBlocks"}`, func() float64 { - return float64(tm().SmallIndexBlocksCacheSize) + metrics.NewGauge(`vm_cache_entries{type="storage/indexBlocks"}`, func() float64 { + return float64(tm().IndexBlocksCacheSize) }) metrics.NewGauge(`vm_cache_entries{type="indexdb/dataBlocks"}`, func() float64 { return float64(idbm().DataBlocksCacheSize) @@ -560,11 +557,8 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheSizeBytes) }) - metrics.NewGauge(`vm_cache_size_bytes{type="storage/bigIndexBlocks"}`, func() float64 { - return float64(tm().BigIndexBlocksCacheSizeBytes) - }) - metrics.NewGauge(`vm_cache_size_bytes{type="storage/smallIndexBlocks"}`, func() float64 { - return float64(tm().SmallIndexBlocksCacheSizeBytes) + metrics.NewGauge(`vm_cache_size_bytes{type="storage/indexBlocks"}`, func() float64 { + return float64(tm().IndexBlocksCacheSizeBytes) }) metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/dataBlocks"}`, func() float64 { return float64(idbm().DataBlocksCacheSizeBytes) @@ -597,11 +591,8 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheSizeMaxBytes) }) - metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/bigIndexBlocks"}`, func() float64 { - return float64(tm().BigIndexBlocksCacheSizeMaxBytes) - }) - metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/smallIndexBlocks"}`, func() float64 { - return float64(tm().SmallIndexBlocksCacheSizeMaxBytes) + metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/indexBlocks"}`, func() float64 { + return float64(tm().IndexBlocksCacheSizeMaxBytes) }) metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/dataBlocks"}`, func() float64 { return float64(idbm().DataBlocksCacheSizeMaxBytes) @@ -622,11 +613,8 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_requests_total{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheRequests) }) - metrics.NewGauge(`vm_cache_requests_total{type="storage/bigIndexBlocks"}`, func() float64 { - return float64(tm().BigIndexBlocksCacheRequests) - }) - metrics.NewGauge(`vm_cache_requests_total{type="storage/smallIndexBlocks"}`, func() float64 { - return float64(tm().SmallIndexBlocksCacheRequests) + metrics.NewGauge(`vm_cache_requests_total{type="storage/indexBlocks"}`, func() float64 { + return float64(tm().IndexBlocksCacheRequests) }) metrics.NewGauge(`vm_cache_requests_total{type="indexdb/dataBlocks"}`, func() float64 { return float64(idbm().DataBlocksCacheRequests) @@ -650,11 +638,8 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_misses_total{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheMisses) }) - metrics.NewGauge(`vm_cache_misses_total{type="storage/bigIndexBlocks"}`, func() float64 { - return float64(tm().BigIndexBlocksCacheMisses) - }) - metrics.NewGauge(`vm_cache_misses_total{type="storage/smallIndexBlocks"}`, func() float64 { - return float64(tm().SmallIndexBlocksCacheMisses) + metrics.NewGauge(`vm_cache_misses_total{type="storage/indexBlocks"}`, func() float64 { + return float64(tm().IndexBlocksCacheMisses) }) metrics.NewGauge(`vm_cache_misses_total{type="indexdb/dataBlocks"}`, func() float64 { return float64(idbm().DataBlocksCacheMisses) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index a868d1ad4d..e77c965b8d 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -7,6 +7,7 @@ sort: 15 ## tip * BUGFIX: return proper results from `highestMax()` function at [Graphite render API](https://docs.victoriametrics.com/#graphite-render-api-usage). Previously it was incorrectly returning timeseries with min peaks instead of max peaks. +* BUGFIX: properly limit indexdb cache sizes. Previously they could exceed values set via `-memory.allowedPercent` and/or `-memory.allowedBytes` when `indexdb` contained many data parts. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007). ## [v1.72.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.72.0) diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index 79922ee3f3..356c6878e1 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -4,46 +4,39 @@ import ( "fmt" "path/filepath" "sync" - "sync/atomic" - "time" "unsafe" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) -func getMaxCachedIndexBlocksPerPart() int { - maxCachedIndexBlocksPerPartOnce.Do(func() { - n := memory.Allowed() / 1024 / 1024 / 4 - if n == 0 { - n = 10 - } - maxCachedIndexBlocksPerPart = n +var idxbCache = blockcache.NewCache(getMaxIndexBlocksCacheSize) +var ibCache = blockcache.NewCache(getMaxInmemoryBlocksCacheSize) + +func getMaxIndexBlocksCacheSize() int { + maxIndexBlockCacheSizeOnce.Do(func() { + maxIndexBlockCacheSize = memory.Allowed() / 10 }) - return maxCachedIndexBlocksPerPart + return maxIndexBlockCacheSize } var ( - maxCachedIndexBlocksPerPart int - maxCachedIndexBlocksPerPartOnce sync.Once + maxIndexBlockCacheSize int + maxIndexBlockCacheSizeOnce sync.Once ) -func getMaxCachedInmemoryBlocksPerPart() int { - maxCachedInmemoryBlocksPerPartOnce.Do(func() { - n := memory.Allowed() / 1024 / 1024 / 4 - if n == 0 { - n = 10 - } - maxCachedInmemoryBlocksPerPart = n +func getMaxInmemoryBlocksCacheSize() int { + maxInmemoryBlockCacheSizeOnce.Do(func() { + maxInmemoryBlockCacheSize = memory.Allowed() / 4 }) - return maxCachedInmemoryBlocksPerPart + return maxInmemoryBlockCacheSize } var ( - maxCachedInmemoryBlocksPerPart int - maxCachedInmemoryBlocksPerPartOnce sync.Once + maxInmemoryBlockCacheSize int + maxInmemoryBlockCacheSizeOnce sync.Once ) type part struct { @@ -58,9 +51,6 @@ type part struct { indexFile fs.MustReadAtCloser itemsFile fs.MustReadAtCloser lensFile fs.MustReadAtCloser - - idxbCache *indexBlockCache - ibCache *inmemoryBlockCache } func openFilePart(path string) (*part, error) { @@ -112,9 +102,6 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea p.lensFile = lensFile p.ph.CopyFrom(ph) - p.idxbCache = newIndexBlockCache() - p.ibCache = newInmemoryBlockCache() - if len(errors) > 0 { // Return only the first error, since it has no sense in returning all errors. err := fmt.Errorf("error opening part %s: %w", p.path, errors[0]) @@ -129,8 +116,8 @@ func (p *part) MustClose() { p.itemsFile.MustClose() p.lensFile.MustClose() - p.idxbCache.MustClose() - p.ibCache.MustClose() + idxbCache.RemoveBlocksForPart(p) + ibCache.RemoveBlocksForPart(p) } type indexBlock struct { @@ -145,346 +132,3 @@ func (idxb *indexBlock) SizeBytes() int { } return n } - -type indexBlockCache struct { - // Atomically updated counters must go first in the struct, so they are properly - // aligned to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - requests uint64 - misses uint64 - - m map[uint64]*indexBlockCacheEntry - mu sync.RWMutex - - perKeyMisses map[uint64]int - perKeyMissesLock sync.Mutex - - cleanerStopCh chan struct{} - cleanerWG sync.WaitGroup -} - -type indexBlockCacheEntry struct { - // Atomically updated counters must go first in the struct, so they are properly - // aligned to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - lastAccessTime uint64 - - idxb *indexBlock -} - -func newIndexBlockCache() *indexBlockCache { - var idxbc indexBlockCache - idxbc.m = make(map[uint64]*indexBlockCacheEntry) - idxbc.perKeyMisses = make(map[uint64]int) - idxbc.cleanerStopCh = make(chan struct{}) - idxbc.cleanerWG.Add(1) - go func() { - defer idxbc.cleanerWG.Done() - idxbc.cleaner() - }() - return &idxbc -} - -func (idxbc *indexBlockCache) MustClose() { - close(idxbc.cleanerStopCh) - idxbc.cleanerWG.Wait() - idxbc.m = nil - idxbc.perKeyMisses = nil -} - -// cleaner periodically cleans least recently used items. -func (idxbc *indexBlockCache) cleaner() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - perKeyMissesTicker := time.NewTicker(2 * time.Minute) - defer perKeyMissesTicker.Stop() - for { - select { - case <-ticker.C: - idxbc.cleanByTimeout() - case <-perKeyMissesTicker.C: - idxbc.perKeyMissesLock.Lock() - idxbc.perKeyMisses = make(map[uint64]int, len(idxbc.perKeyMisses)) - idxbc.perKeyMissesLock.Unlock() - case <-idxbc.cleanerStopCh: - return - } - } -} - -func (idxbc *indexBlockCache) cleanByTimeout() { - currentTime := fasttime.UnixTimestamp() - idxbc.mu.Lock() - for k, idxbe := range idxbc.m { - // Delete items accessed more than two minutes ago. - // This time should be enough for repeated queries. - if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 2*60 { - delete(idxbc.m, k) - } - } - idxbc.mu.Unlock() -} - -func (idxbc *indexBlockCache) Get(k uint64) *indexBlock { - atomic.AddUint64(&idxbc.requests, 1) - idxbc.mu.RLock() - idxbe := idxbc.m[k] - idxbc.mu.RUnlock() - - if idxbe != nil { - currentTime := fasttime.UnixTimestamp() - if atomic.LoadUint64(&idxbe.lastAccessTime) != currentTime { - atomic.StoreUint64(&idxbe.lastAccessTime, currentTime) - } - return idxbe.idxb - } - idxbc.perKeyMissesLock.Lock() - idxbc.perKeyMisses[k]++ - idxbc.perKeyMissesLock.Unlock() - atomic.AddUint64(&idxbc.misses, 1) - return nil -} - -// Put puts idxb under the key k into idxbc. -func (idxbc *indexBlockCache) Put(k uint64, idxb *indexBlock) { - idxbc.perKeyMissesLock.Lock() - doNotCache := idxbc.perKeyMisses[k] == 1 - idxbc.perKeyMissesLock.Unlock() - if doNotCache { - // Do not cache ib if it has been requested only once (aka one-time-wonders items). - // This should reduce memory usage for the ibc cache. - return - } - - idxbc.mu.Lock() - // Remove superfluous entries. - if overflow := len(idxbc.m) - getMaxCachedIndexBlocksPerPart(); overflow > 0 { - // Remove 10% of items from the cache. - overflow = int(float64(len(idxbc.m)) * 0.1) - for k := range idxbc.m { - delete(idxbc.m, k) - overflow-- - if overflow == 0 { - break - } - } - } - - // Store idxb in the cache. - idxbe := &indexBlockCacheEntry{ - lastAccessTime: fasttime.UnixTimestamp(), - idxb: idxb, - } - idxbc.m[k] = idxbe - idxbc.mu.Unlock() -} - -func (idxbc *indexBlockCache) Len() uint64 { - idxbc.mu.RLock() - n := len(idxbc.m) - idxbc.mu.RUnlock() - return uint64(n) -} - -func (idxbc *indexBlockCache) SizeBytes() uint64 { - n := 0 - idxbc.mu.RLock() - for _, e := range idxbc.m { - n += e.idxb.SizeBytes() - } - idxbc.mu.RUnlock() - return uint64(n) -} - -func (idxbc *indexBlockCache) SizeMaxBytes() uint64 { - avgBlockSize := float64(64 * 1024) - blocksCount := idxbc.Len() - if blocksCount > 0 { - avgBlockSize = float64(idxbc.SizeBytes()) / float64(blocksCount) - } - return uint64(avgBlockSize * float64(getMaxCachedIndexBlocksPerPart())) -} - -func (idxbc *indexBlockCache) Requests() uint64 { - return atomic.LoadUint64(&idxbc.requests) -} - -func (idxbc *indexBlockCache) Misses() uint64 { - return atomic.LoadUint64(&idxbc.misses) -} - -type inmemoryBlockCache struct { - // Atomically updated counters must go first in the struct, so they are properly - // aligned to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - requests uint64 - misses uint64 - - m map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry - mu sync.RWMutex - - perKeyMisses map[inmemoryBlockCacheKey]int - perKeyMissesLock sync.Mutex - - cleanerStopCh chan struct{} - cleanerWG sync.WaitGroup -} - -type inmemoryBlockCacheKey struct { - itemsBlockOffset uint64 -} - -func (ibck *inmemoryBlockCacheKey) Init(bh *blockHeader) { - ibck.itemsBlockOffset = bh.itemsBlockOffset -} - -type inmemoryBlockCacheEntry struct { - // Atomically updated counters must go first in the struct, so they are properly - // aligned to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - lastAccessTime uint64 - - ib *inmemoryBlock -} - -func newInmemoryBlockCache() *inmemoryBlockCache { - var ibc inmemoryBlockCache - ibc.m = make(map[inmemoryBlockCacheKey]*inmemoryBlockCacheEntry) - ibc.perKeyMisses = make(map[inmemoryBlockCacheKey]int) - ibc.cleanerStopCh = make(chan struct{}) - ibc.cleanerWG.Add(1) - go func() { - defer ibc.cleanerWG.Done() - ibc.cleaner() - }() - return &ibc -} - -func (ibc *inmemoryBlockCache) MustClose() { - close(ibc.cleanerStopCh) - ibc.cleanerWG.Wait() - ibc.m = nil - ibc.perKeyMisses = nil -} - -// cleaner periodically cleans least recently used items. -func (ibc *inmemoryBlockCache) cleaner() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - perKeyMissesTicker := time.NewTicker(2 * time.Minute) - defer perKeyMissesTicker.Stop() - for { - select { - case <-ticker.C: - ibc.cleanByTimeout() - case <-perKeyMissesTicker.C: - ibc.perKeyMissesLock.Lock() - ibc.perKeyMisses = make(map[inmemoryBlockCacheKey]int, len(ibc.perKeyMisses)) - ibc.perKeyMissesLock.Unlock() - case <-ibc.cleanerStopCh: - return - } - } -} - -func (ibc *inmemoryBlockCache) cleanByTimeout() { - currentTime := fasttime.UnixTimestamp() - ibc.mu.Lock() - for k, ibe := range ibc.m { - // Delete items accessed more than two minutes ago. - // This time should be enough for repeated queries. - if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 { - delete(ibc.m, k) - } - } - ibc.mu.Unlock() -} - -func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock { - atomic.AddUint64(&ibc.requests, 1) - - ibc.mu.RLock() - ibe := ibc.m[k] - ibc.mu.RUnlock() - - if ibe != nil { - currentTime := fasttime.UnixTimestamp() - if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime { - atomic.StoreUint64(&ibe.lastAccessTime, currentTime) - } - return ibe.ib - } - ibc.perKeyMissesLock.Lock() - ibc.perKeyMisses[k]++ - ibc.perKeyMissesLock.Unlock() - atomic.AddUint64(&ibc.misses, 1) - return nil -} - -// Put puts ib under key k into ibc. -func (ibc *inmemoryBlockCache) Put(k inmemoryBlockCacheKey, ib *inmemoryBlock) { - ibc.perKeyMissesLock.Lock() - doNotCache := ibc.perKeyMisses[k] == 1 - ibc.perKeyMissesLock.Unlock() - if doNotCache { - // Do not cache ib if it has been requested only once (aka one-time-wonders items). - // This should reduce memory usage for the ibc cache. - return - } - - ibc.mu.Lock() - // Clean superfluous entries in cache. - if overflow := len(ibc.m) - getMaxCachedInmemoryBlocksPerPart(); overflow > 0 { - // Remove 10% of items from the cache. - overflow = int(float64(len(ibc.m)) * 0.1) - for k := range ibc.m { - delete(ibc.m, k) - overflow-- - if overflow == 0 { - break - } - } - } - - // Store ib in the cache. - ibe := &inmemoryBlockCacheEntry{ - lastAccessTime: fasttime.UnixTimestamp(), - ib: ib, - } - ibc.m[k] = ibe - ibc.mu.Unlock() -} - -func (ibc *inmemoryBlockCache) Len() uint64 { - ibc.mu.RLock() - n := len(ibc.m) - ibc.mu.RUnlock() - return uint64(n) -} - -func (ibc *inmemoryBlockCache) SizeBytes() uint64 { - n := 0 - ibc.mu.RLock() - for _, e := range ibc.m { - n += e.ib.SizeBytes() - } - ibc.mu.RUnlock() - return uint64(n) -} - -func (ibc *inmemoryBlockCache) SizeMaxBytes() uint64 { - avgBlockSize := float64(128 * 1024) - blocksCount := ibc.Len() - if blocksCount > 0 { - avgBlockSize = float64(ibc.SizeBytes()) / float64(blocksCount) - } - return uint64(avgBlockSize * float64(getMaxCachedInmemoryBlocksPerPart())) -} - -func (ibc *inmemoryBlockCache) Requests() uint64 { - return atomic.LoadUint64(&ibc.requests) -} - -func (ibc *inmemoryBlockCache) Misses() uint64 { - return atomic.LoadUint64(&ibc.misses) -} diff --git a/lib/mergeset/part_search.go b/lib/mergeset/part_search.go index 8671561b63..479070a006 100644 --- a/lib/mergeset/part_search.go +++ b/lib/mergeset/part_search.go @@ -5,6 +5,7 @@ import ( "io" "sort" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -25,9 +26,6 @@ type partSearch struct { // The remaining block headers to scan in the current metaindexRow. bhs []blockHeader - idxbCache *indexBlockCache - ibCache *inmemoryBlockCache - // err contains the last error. err error @@ -45,8 +43,6 @@ func (ps *partSearch) reset() { ps.p = nil ps.mrs = nil ps.bhs = nil - ps.idxbCache = nil - ps.ibCache = nil ps.err = nil ps.indexBuf = ps.indexBuf[:0] @@ -65,8 +61,6 @@ func (ps *partSearch) Init(p *part) { ps.reset() ps.p = p - ps.idxbCache = p.idxbCache - ps.ibCache = p.ibCache } // Seek seeks for the first item greater or equal to k in ps. @@ -261,16 +255,20 @@ func (ps *partSearch) nextBHS() error { } mr := &ps.mrs[0] ps.mrs = ps.mrs[1:] - idxbKey := mr.indexBlockOffset - idxb := ps.idxbCache.Get(idxbKey) - if idxb == nil { - var err error - idxb, err = ps.readIndexBlock(mr) + idxbKey := blockcache.Key{ + Part: ps.p, + Offset: mr.indexBlockOffset, + } + b := idxbCache.GetBlock(idxbKey) + if b == nil { + idxb, err := ps.readIndexBlock(mr) if err != nil { return fmt.Errorf("cannot read index block: %w", err) } - ps.idxbCache.Put(idxbKey, idxb) + b = idxb + idxbCache.PutBlock(idxbKey, b) } + idxb := b.(*indexBlock) ps.bhs = idxb.bhs return nil } @@ -293,17 +291,20 @@ func (ps *partSearch) readIndexBlock(mr *metaindexRow) (*indexBlock, error) { } func (ps *partSearch) getInmemoryBlock(bh *blockHeader) (*inmemoryBlock, error) { - var ibKey inmemoryBlockCacheKey - ibKey.Init(bh) - ib := ps.ibCache.Get(ibKey) - if ib != nil { - return ib, nil + ibKey := blockcache.Key{ + Part: ps.p, + Offset: bh.itemsBlockOffset, } - ib, err := ps.readInmemoryBlock(bh) - if err != nil { - return nil, err + b := ibCache.GetBlock(ibKey) + if b == nil { + ib, err := ps.readInmemoryBlock(bh) + if err != nil { + return nil, err + } + b = ib + ibCache.PutBlock(ibKey, b) } - ps.ibCache.Put(ibKey, ib) + ib := b.(*inmemoryBlock) return ib, nil } diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 7573e27c23..24a69a856a 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -22,16 +22,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" ) -// These are global counters for cache requests and misses for parts -// which were already merged into another parts. -var ( - historicalDataBlockCacheRequests uint64 - historicalDataBlockCacheMisses uint64 - - historicalIndexBlockCacheRequests uint64 - historicalIndexBlockCacheMisses uint64 -) - // maxParts is the maximum number of parts in the table. // // This number may be reached when the insertion pace outreaches merger pace. @@ -443,27 +433,21 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.ItemsCount += p.ph.itemsCount m.SizeBytes += p.size - m.DataBlocksCacheSize += p.ibCache.Len() - m.DataBlocksCacheSizeBytes += p.ibCache.SizeBytes() - m.DataBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes() - m.DataBlocksCacheRequests += p.ibCache.Requests() - m.DataBlocksCacheMisses += p.ibCache.Misses() - - m.IndexBlocksCacheSize += p.idxbCache.Len() - m.IndexBlocksCacheSizeBytes += p.idxbCache.SizeBytes() - m.IndexBlocksCacheSizeMaxBytes += p.idxbCache.SizeMaxBytes() - m.IndexBlocksCacheRequests += p.idxbCache.Requests() - m.IndexBlocksCacheMisses += p.idxbCache.Misses() - m.PartsRefCount += atomic.LoadUint64(&pw.refCount) } tb.partsLock.Unlock() - m.DataBlocksCacheRequests = atomic.LoadUint64(&historicalDataBlockCacheRequests) - m.DataBlocksCacheMisses = atomic.LoadUint64(&historicalDataBlockCacheMisses) + m.DataBlocksCacheSize = uint64(ibCache.Len()) + m.DataBlocksCacheSizeBytes = uint64(ibCache.SizeBytes()) + m.DataBlocksCacheSizeMaxBytes = uint64(ibCache.SizeMaxBytes()) + m.DataBlocksCacheRequests = ibCache.Requests() + m.DataBlocksCacheMisses = ibCache.Misses() - m.IndexBlocksCacheRequests = atomic.LoadUint64(&historicalIndexBlockCacheRequests) - m.IndexBlocksCacheMisses = atomic.LoadUint64(&historicalIndexBlockCacheMisses) + m.IndexBlocksCacheSize = uint64(idxbCache.Len()) + m.IndexBlocksCacheSizeBytes = uint64(idxbCache.SizeBytes()) + m.IndexBlocksCacheSizeMaxBytes = uint64(idxbCache.SizeMaxBytes()) + m.IndexBlocksCacheRequests = idxbCache.Requests() + m.IndexBlocksCacheMisses = idxbCache.Misses() } // AddItems adds the given items to the tb. @@ -1466,10 +1450,6 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*pa dst = append(dst, pw) continue } - atomic.AddUint64(&historicalDataBlockCacheRequests, pw.p.ibCache.Requests()) - atomic.AddUint64(&historicalDataBlockCacheMisses, pw.p.ibCache.Misses()) - atomic.AddUint64(&historicalIndexBlockCacheRequests, pw.p.idxbCache.Requests()) - atomic.AddUint64(&historicalIndexBlockCacheMisses, pw.p.idxbCache.Misses()) removedParts++ } return dst, removedParts diff --git a/lib/storage/part.go b/lib/storage/part.go index 78146b9daf..fb1befd68a 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -4,30 +4,26 @@ import ( "fmt" "path/filepath" "sync" - "sync/atomic" - "time" "unsafe" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) -func getMaxCachedIndexBlocksPerPart() int { - maxCachedIndexBlocksPerPartOnce.Do(func() { - n := memory.Allowed() / 1024 / 1024 / 8 - if n < 16 { - n = 16 - } - maxCachedIndexBlocksPerPart = n +var ibCache = blockcache.NewCache(getMaxIndexBlocksCacheSize) + +func getMaxIndexBlocksCacheSize() int { + maxIndexBlockCacheSizeOnce.Do(func() { + maxIndexBlockCacheSize = memory.Allowed() / 10 }) - return maxCachedIndexBlocksPerPart + return maxIndexBlockCacheSize } var ( - maxCachedIndexBlocksPerPart int - maxCachedIndexBlocksPerPartOnce sync.Once + maxIndexBlockCacheSize int + maxIndexBlockCacheSizeOnce sync.Once ) // part represents a searchable part containing time series data. @@ -47,8 +43,6 @@ type part struct { indexFile fs.MustReadAtCloser metaindex []metaindexRow - - ibCache *indexBlockCache } // openFilePart opens file-based part from the given path. @@ -105,9 +99,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea p.timestampsFile = timestampsFile p.valuesFile = valuesFile p.indexFile = indexFile - p.metaindex = metaindex - p.ibCache = newIndexBlockCache() if len(errors) > 0 { // Return only the first error, since it has no sense in returning all errors. @@ -133,8 +125,7 @@ func (p *part) MustClose() { p.valuesFile.MustClose() p.indexFile.MustClose() - isBig := p.size > maxSmallPartSize() - p.ibCache.MustClose(isBig) + ibCache.RemoveBlocksForPart(p) } type indexBlock struct { @@ -144,148 +135,3 @@ type indexBlock struct { func (idxb *indexBlock) SizeBytes() int { return cap(idxb.bhs) * int(unsafe.Sizeof(blockHeader{})) } - -type indexBlockCache struct { - // Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - requests uint64 - misses uint64 - - m map[uint64]*indexBlockCacheEntry - mu sync.RWMutex - - cleanerStopCh chan struct{} - cleanerWG sync.WaitGroup -} - -type indexBlockCacheEntry struct { - // Atomically updated counters must go first in the struct, so they are properly - // aligned to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - lastAccessTime uint64 - - ib *indexBlock -} - -func newIndexBlockCache() *indexBlockCache { - var ibc indexBlockCache - ibc.m = make(map[uint64]*indexBlockCacheEntry) - - ibc.cleanerStopCh = make(chan struct{}) - ibc.cleanerWG.Add(1) - go func() { - defer ibc.cleanerWG.Done() - ibc.cleaner() - }() - return &ibc -} - -func (ibc *indexBlockCache) MustClose(isBig bool) { - close(ibc.cleanerStopCh) - ibc.cleanerWG.Wait() - ibc.m = nil -} - -// cleaner periodically cleans least recently used items. -func (ibc *indexBlockCache) cleaner() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - ibc.cleanByTimeout() - case <-ibc.cleanerStopCh: - return - } - } -} - -func (ibc *indexBlockCache) cleanByTimeout() { - currentTime := fasttime.UnixTimestamp() - ibc.mu.Lock() - for k, ibe := range ibc.m { - // Delete items accessed more than two minutes ago. - // This time should be enough for repeated queries. - if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 2*60 { - delete(ibc.m, k) - } - } - ibc.mu.Unlock() -} - -func (ibc *indexBlockCache) Get(k uint64) *indexBlock { - atomic.AddUint64(&ibc.requests, 1) - - ibc.mu.RLock() - ibe := ibc.m[k] - ibc.mu.RUnlock() - - if ibe != nil { - currentTime := fasttime.UnixTimestamp() - if atomic.LoadUint64(&ibe.lastAccessTime) != currentTime { - atomic.StoreUint64(&ibe.lastAccessTime, currentTime) - } - return ibe.ib - } - atomic.AddUint64(&ibc.misses, 1) - return nil -} - -func (ibc *indexBlockCache) Put(k uint64, ib *indexBlock) { - ibc.mu.Lock() - - // Clean superfluous cache entries. - if overflow := len(ibc.m) - getMaxCachedIndexBlocksPerPart(); overflow > 0 { - // Remove 10% of items from the cache. - overflow = int(float64(len(ibc.m)) * 0.1) - for k := range ibc.m { - delete(ibc.m, k) - overflow-- - if overflow == 0 { - break - } - } - } - - // Store frequently requested ib in the cache. - ibe := &indexBlockCacheEntry{ - lastAccessTime: fasttime.UnixTimestamp(), - ib: ib, - } - ibc.m[k] = ibe - ibc.mu.Unlock() -} - -func (ibc *indexBlockCache) Requests() uint64 { - return atomic.LoadUint64(&ibc.requests) -} - -func (ibc *indexBlockCache) Misses() uint64 { - return atomic.LoadUint64(&ibc.misses) -} - -func (ibc *indexBlockCache) Len() uint64 { - ibc.mu.Lock() - n := uint64(len(ibc.m)) - ibc.mu.Unlock() - return n -} - -func (ibc *indexBlockCache) SizeBytes() uint64 { - n := 0 - ibc.mu.Lock() - for _, e := range ibc.m { - n += e.ib.SizeBytes() - } - ibc.mu.Unlock() - return uint64(n) -} - -func (ibc *indexBlockCache) SizeMaxBytes() uint64 { - avgBlockSize := float64(64 * 1024) - blocksCount := ibc.Len() - if blocksCount > 0 { - avgBlockSize = float64(ibc.SizeBytes()) / float64(blocksCount) - } - return uint64(avgBlockSize * float64(getMaxCachedIndexBlocksPerPart())) -} diff --git a/lib/storage/part_search.go b/lib/storage/part_search.go index 5acc087a4c..6e7af19510 100644 --- a/lib/storage/part_search.go +++ b/lib/storage/part_search.go @@ -7,6 +7,7 @@ import ( "sort" "strings" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/blockcache" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -32,8 +33,6 @@ type partSearch struct { metaindex []metaindexRow - ibCache *indexBlockCache - bhs []blockHeader compressedIndexBuf []byte @@ -48,7 +47,6 @@ func (ps *partSearch) reset() { ps.tsids = nil ps.tsidIdx = 0 ps.metaindex = nil - ps.ibCache = nil ps.bhs = nil ps.compressedIndexBuf = ps.compressedIndexBuf[:0] ps.indexBuf = ps.indexBuf[:0] @@ -76,7 +74,6 @@ func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange) { } ps.tr = tr ps.metaindex = p.metaindex - ps.ibCache = p.ibCache // Advance to the first tsid. There is no need in checking // the returned result, since it will be checked in NextBlock. @@ -154,19 +151,23 @@ func (ps *partSearch) nextBHS() bool { // Found the index block which may contain the required data // for the ps.BlockRef.bh.TSID and the given timestamp range. - indexBlockKey := mr.IndexBlockOffset - ib := ps.ibCache.Get(indexBlockKey) - if ib == nil { + indexBlockKey := blockcache.Key{ + Part: ps.p, + Offset: mr.IndexBlockOffset, + } + b := ibCache.GetBlock(indexBlockKey) + if b == nil { // Slow path - actually read and unpack the index block. - var err error - ib, err = ps.readIndexBlock(mr) + ib, err := ps.readIndexBlock(mr) if err != nil { ps.err = fmt.Errorf("cannot read index block for part %q at offset %d with size %d: %w", &ps.p.ph, mr.IndexBlockOffset, mr.IndexBlockSize, err) return false } - ps.ibCache.Put(indexBlockKey, ib) + b = ib + ibCache.PutBlock(indexBlockKey, b) } + ib := b.(*indexBlock) ps.bhs = ib.bhs return true } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 52c1048474..567efc6b5a 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -25,16 +25,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) -// These are global counters for cache requests and misses for parts -// which were already merged into another parts. -var ( - historicalBigIndexBlocksCacheRequests uint64 - historicalBigIndexBlocksCacheMisses uint64 - - historicalSmallIndexBlocksCacheRequests uint64 - historicalSmallIndexBlocksCacheMisses uint64 -) - func maxSmallPartSize() uint64 { // Small parts are cached in the OS page cache, // so limit their size by the remaining free RAM. @@ -306,17 +296,11 @@ func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs type partitionMetrics struct { PendingRows uint64 - BigIndexBlocksCacheSize uint64 - BigIndexBlocksCacheSizeBytes uint64 - BigIndexBlocksCacheSizeMaxBytes uint64 - BigIndexBlocksCacheRequests uint64 - BigIndexBlocksCacheMisses uint64 - - SmallIndexBlocksCacheSize uint64 - SmallIndexBlocksCacheSizeBytes uint64 - SmallIndexBlocksCacheSizeMaxBytes uint64 - SmallIndexBlocksCacheRequests uint64 - SmallIndexBlocksCacheMisses uint64 + IndexBlocksCacheSize uint64 + IndexBlocksCacheSizeBytes uint64 + IndexBlocksCacheSizeMaxBytes uint64 + IndexBlocksCacheRequests uint64 + IndexBlocksCacheMisses uint64 BigSizeBytes uint64 SmallSizeBytes uint64 @@ -362,11 +346,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { for _, pw := range pt.bigParts { p := pw.p - m.BigIndexBlocksCacheSize += p.ibCache.Len() - m.BigIndexBlocksCacheSizeBytes += p.ibCache.SizeBytes() - m.BigIndexBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes() - m.BigIndexBlocksCacheRequests += p.ibCache.Requests() - m.BigIndexBlocksCacheMisses += p.ibCache.Misses() m.BigRowsCount += p.ph.RowsCount m.BigBlocksCount += p.ph.BlocksCount m.BigSizeBytes += p.size @@ -376,11 +355,6 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { for _, pw := range pt.smallParts { p := pw.p - m.SmallIndexBlocksCacheSize += p.ibCache.Len() - m.SmallIndexBlocksCacheSizeBytes += p.ibCache.SizeBytes() - m.SmallIndexBlocksCacheSizeMaxBytes += p.ibCache.SizeMaxBytes() - m.SmallIndexBlocksCacheRequests += p.ibCache.Requests() - m.SmallIndexBlocksCacheMisses += p.ibCache.Misses() m.SmallRowsCount += p.ph.RowsCount m.SmallBlocksCount += p.ph.BlocksCount m.SmallSizeBytes += p.size @@ -392,11 +366,11 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { pt.partsLock.Unlock() - m.BigIndexBlocksCacheRequests = atomic.LoadUint64(&historicalBigIndexBlocksCacheRequests) - m.BigIndexBlocksCacheMisses = atomic.LoadUint64(&historicalBigIndexBlocksCacheMisses) - - m.SmallIndexBlocksCacheRequests = atomic.LoadUint64(&historicalSmallIndexBlocksCacheRequests) - m.SmallIndexBlocksCacheMisses = atomic.LoadUint64(&historicalSmallIndexBlocksCacheMisses) + m.IndexBlocksCacheSize = uint64(ibCache.Len()) + m.IndexBlocksCacheSizeBytes = uint64(ibCache.SizeBytes()) + m.IndexBlocksCacheSizeMaxBytes = uint64(ibCache.SizeMaxBytes()) + m.IndexBlocksCacheRequests = ibCache.Requests() + m.IndexBlocksCacheMisses = ibCache.Misses() m.ActiveBigMerges += atomic.LoadUint64(&pt.activeBigMerges) m.ActiveSmallMerges += atomic.LoadUint64(&pt.activeSmallMerges) @@ -1311,15 +1285,6 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig dst = append(dst, pw) continue } - requests := pw.p.ibCache.Requests() - misses := pw.p.ibCache.Misses() - if isBig { - atomic.AddUint64(&historicalBigIndexBlocksCacheRequests, requests) - atomic.AddUint64(&historicalBigIndexBlocksCacheMisses, misses) - } else { - atomic.AddUint64(&historicalSmallIndexBlocksCacheRequests, requests) - atomic.AddUint64(&historicalSmallIndexBlocksCacheMisses, misses) - } removedParts++ } return dst, removedParts