diff --git a/lib/mergeset/part.go b/lib/mergeset/part.go index a5231e58e..14bdedd78 100644 --- a/lib/mergeset/part.go +++ b/lib/mergeset/part.go @@ -204,8 +204,6 @@ func (idxbc *indexBlockCache) MustClose() { close(idxbc.cleanerStopCh) idxbc.cleanerWG.Wait() - atomic.AddUint64(&indexBlockCacheRequests, idxbc.requests) - atomic.AddUint64(&indexBlockCacheMisses, idxbc.misses) // It is safe returning idxbc.m to pool, since the Reset must be called // when the idxbc entries are no longer accessed by concurrent goroutines. for _, idxbe := range idxbc.m { @@ -240,11 +238,6 @@ func (idxbc *indexBlockCache) cleanByTimeout() { idxbc.mu.Unlock() } -var ( - indexBlockCacheRequests uint64 - indexBlockCacheMisses uint64 -) - func (idxbc *indexBlockCache) Get(k uint64) *indexBlock { atomic.AddUint64(&idxbc.requests, 1) idxbc.mu.RLock() @@ -361,8 +354,6 @@ func (ibc *inmemoryBlockCache) MustClose() { close(ibc.cleanerStopCh) ibc.cleanerWG.Wait() - atomic.AddUint64(&inmemoryBlockCacheRequests, ibc.requests) - atomic.AddUint64(&inmemoryBlockCacheMisses, ibc.misses) // It is safe returning ibc.m entries to pool, since the Reset function may be called // only if no other goroutines access ibc entries. for _, ibe := range ibc.m { @@ -397,11 +388,6 @@ func (ibc *inmemoryBlockCache) cleanByTimeout() { ibc.mu.Unlock() } -var ( - inmemoryBlockCacheRequests uint64 - inmemoryBlockCacheMisses uint64 -) - func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock { atomic.AddUint64(&ibc.requests, 1) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 40e66f8be..7467943c3 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -19,6 +19,16 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" ) +// These are global counters for cache requests and misses for parts +// which were already merged into another parts. +var ( + historicalDataBlockCacheRequests uint64 + historicalDataBlockCacheMisses uint64 + + historicalIndexBlockCacheRequests uint64 + historicalIndexBlockCacheMisses uint64 +) + // maxParts is the maximum number of parts in the table. // // This number may be reached when the insertion pace outreaches merger pace. @@ -326,11 +336,11 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { } tb.partsLock.Unlock() - atomic.AddUint64(&m.DataBlocksCacheRequests, atomic.LoadUint64(&inmemoryBlockCacheRequests)) - atomic.AddUint64(&m.DataBlocksCacheMisses, atomic.LoadUint64(&inmemoryBlockCacheMisses)) + m.DataBlocksCacheRequests += atomic.LoadUint64(&historicalDataBlockCacheRequests) + m.DataBlocksCacheMisses += atomic.LoadUint64(&historicalDataBlockCacheMisses) - atomic.AddUint64(&m.IndexBlocksCacheRequests, atomic.LoadUint64(&indexBlockCacheRequests)) - atomic.AddUint64(&m.IndexBlocksCacheMisses, atomic.LoadUint64(&indexBlockCacheMisses)) + m.IndexBlocksCacheRequests += atomic.LoadUint64(&historicalIndexBlockCacheRequests) + m.IndexBlocksCacheMisses += atomic.LoadUint64(&historicalIndexBlockCacheMisses) } // AddItems adds the given items to the tb. @@ -1300,6 +1310,10 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*pa dst := pws[:0] for _, pw := range pws { if partsToRemove[pw] { + atomic.AddUint64(&historicalDataBlockCacheRequests, pw.p.ibCache.Requests()) + atomic.AddUint64(&historicalDataBlockCacheMisses, pw.p.ibCache.Misses()) + atomic.AddUint64(&historicalIndexBlockCacheRequests, pw.p.idxbCache.Requests()) + atomic.AddUint64(&historicalIndexBlockCacheMisses, pw.p.idxbCache.Misses()) removedParts++ continue } diff --git a/lib/storage/part.go b/lib/storage/part.go index 21071888a..7334f6b2d 100644 --- a/lib/storage/part.go +++ b/lib/storage/part.go @@ -206,13 +206,6 @@ func (ibc *indexBlockCache) MustClose(isBig bool) { close(ibc.cleanerStopCh) ibc.cleanerWG.Wait() - if isBig { - atomic.AddUint64(&bigIndexBlockCacheRequests, ibc.requests) - atomic.AddUint64(&bigIndexBlockCacheMisses, ibc.misses) - } else { - atomic.AddUint64(&smallIndexBlockCacheRequests, ibc.requests) - atomic.AddUint64(&smallIndexBlockCacheMisses, ibc.misses) - } // It is safe returning ibc.m itemst to the pool, since Reset must // be called only when no other goroutines access ibc entries. for _, ibe := range ibc.m { @@ -247,14 +240,6 @@ func (ibc *indexBlockCache) cleanByTimeout() { ibc.mu.Unlock() } -var ( - bigIndexBlockCacheRequests uint64 - bigIndexBlockCacheMisses uint64 - - smallIndexBlockCacheRequests uint64 - smallIndexBlockCacheMisses uint64 -) - func (ibc *indexBlockCache) Get(k uint64) *indexBlock { atomic.AddUint64(&ibc.requests, 1) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index b9cf1b2c5..7555fe4e7 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -23,6 +23,16 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" ) +// These are global counters for cache requests and misses for parts +// which were already merged into another parts. +var ( + historicalBigIndexBlocksCacheRequests uint64 + historicalBigIndexBlocksCacheMisses uint64 + + historicalSmallIndexBlocksCacheRequests uint64 + historicalSmallIndexBlocksCacheMisses uint64 +) + func maxRowsPerSmallPart() uint64 { // Small parts are cached in the OS page cache, // so limit the number of rows for small part by the remaining free RAM. @@ -356,11 +366,11 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { pt.partsLock.Unlock() - atomic.AddUint64(&m.BigIndexBlocksCacheRequests, atomic.LoadUint64(&bigIndexBlockCacheRequests)) - atomic.AddUint64(&m.BigIndexBlocksCacheMisses, atomic.LoadUint64(&bigIndexBlockCacheMisses)) + m.BigIndexBlocksCacheRequests += atomic.LoadUint64(&historicalBigIndexBlocksCacheRequests) + m.BigIndexBlocksCacheMisses += atomic.LoadUint64(&historicalBigIndexBlocksCacheMisses) - atomic.AddUint64(&m.SmallIndexBlocksCacheRequests, atomic.LoadUint64(&smallIndexBlockCacheRequests)) - atomic.AddUint64(&m.SmallIndexBlocksCacheMisses, atomic.LoadUint64(&smallIndexBlockCacheMisses)) + m.SmallIndexBlocksCacheRequests += atomic.LoadUint64(&historicalSmallIndexBlocksCacheRequests) + m.SmallIndexBlocksCacheMisses += atomic.LoadUint64(&historicalSmallIndexBlocksCacheMisses) m.ActiveBigMerges += atomic.LoadUint64(&pt.activeBigMerges) m.ActiveSmallMerges += atomic.LoadUint64(&pt.activeSmallMerges) @@ -1145,8 +1155,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro removedSmallParts := 0 removedBigParts := 0 pt.partsLock.Lock() - pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m) - pt.bigParts, removedBigParts = removeParts(pt.bigParts, m) + pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m, false) + pt.bigParts, removedBigParts = removeParts(pt.bigParts, m, true) if newPW != nil { if isBigPart { pt.bigParts = append(pt.bigParts, newPW) @@ -1193,11 +1203,20 @@ func (pt *partition) nextMergeIdx() uint64 { return atomic.AddUint64(&pt.mergeIdx, 1) } -func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*partWrapper, int) { +func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig bool) ([]*partWrapper, int) { removedParts := 0 dst := pws[:0] for _, pw := range pws { if partsToRemove[pw] { + requests := pw.p.ibCache.Requests() + misses := pw.p.ibCache.Misses() + if isBig { + atomic.AddUint64(&historicalBigIndexBlocksCacheRequests, requests) + atomic.AddUint64(&historicalBigIndexBlocksCacheMisses, misses) + } else { + atomic.AddUint64(&historicalSmallIndexBlocksCacheRequests, requests) + atomic.AddUint64(&historicalSmallIndexBlocksCacheMisses, misses) + } removedParts++ continue }