lib/{storage,mergeset}: make sure that requests and misses cache counters never go down

commit 0b2f678d8e
parent 661cfb03e2
Author: Aliaksandr Valialkin
Date:   2020-04-10 14:44:38 +03:00

4 changed files with 44 additions and 40 deletions
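Why the counters could go down: each block cache keeps per-part requests/misses counters, and the exported totals are computed by summing over the currently open parts. Judging from the diff, the per-part counters were previously folded into package-level globals only in the cache's MustClose, which runs once the last reader releases the part, some time after the part has already left the parts list, so a metrics scrape in that window could observe smaller totals than before. The fix drains each removed part's counters into new historical* globals inside removeParts, at the moment the part leaves the list, so the sum of live counters plus historical counters can only grow. The hunks below come from four files; from the type and import names they appear to be lib/mergeset/part.go, lib/mergeset/table.go, lib/storage/part.go and lib/storage/partition.go.

A minimal sketch of the pattern in Go follows; partCache, liveParts, removePart and totalRequests are hypothetical names for illustration, not the VictoriaMetrics types.

package cachemetrics

import "sync/atomic"

// partCache is a hypothetical stand-in for a per-part block cache;
// each part counts its own cache requests.
type partCache struct {
	requests uint64
}

// historicalRequests accumulates the counters of parts that were already
// merged away. It only ever grows.
var historicalRequests uint64

// liveParts models the table's current parts list.
var liveParts []*partCache

// removePart drains the dying part's counter into the historical global
// before the part (and its counter) becomes unreachable, so no counts are
// lost in the gap between list removal and cache close.
func removePart(i int) {
	pc := liveParts[i]
	atomic.AddUint64(&historicalRequests, atomic.LoadUint64(&pc.requests))
	liveParts = append(liveParts[:i], liveParts[i+1:]...)
}

// totalRequests is what an UpdateMetrics-style snapshot exports:
// counters of live parts plus the historical remainder.
func totalRequests() uint64 {
	n := atomic.LoadUint64(&historicalRequests)
	for _, pc := range liveParts {
		n += atomic.LoadUint64(&pc.requests)
	}
	return n
}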

lib/mergeset/part.go

@@ -204,8 +204,6 @@ func (idxbc *indexBlockCache) MustClose() {
 	close(idxbc.cleanerStopCh)
 	idxbc.cleanerWG.Wait()
-	atomic.AddUint64(&indexBlockCacheRequests, idxbc.requests)
-	atomic.AddUint64(&indexBlockCacheMisses, idxbc.misses)
 	// It is safe returning idxbc.m to pool, since the Reset must be called
 	// when the idxbc entries are no longer accessed by concurrent goroutines.
 	for _, idxbe := range idxbc.m {
@@ -240,11 +238,6 @@ func (idxbc *indexBlockCache) cleanByTimeout() {
 	idxbc.mu.Unlock()
 }
 
-var (
-	indexBlockCacheRequests uint64
-	indexBlockCacheMisses   uint64
-)
-
 func (idxbc *indexBlockCache) Get(k uint64) *indexBlock {
 	atomic.AddUint64(&idxbc.requests, 1)
 	idxbc.mu.RLock()
@@ -361,8 +354,6 @@ func (ibc *inmemoryBlockCache) MustClose() {
 	close(ibc.cleanerStopCh)
 	ibc.cleanerWG.Wait()
-	atomic.AddUint64(&inmemoryBlockCacheRequests, ibc.requests)
-	atomic.AddUint64(&inmemoryBlockCacheMisses, ibc.misses)
 	// It is safe returning ibc.m entries to pool, since the Reset function may be called
 	// only if no other goroutines access ibc entries.
 	for _, ibe := range ibc.m {
@@ -397,11 +388,6 @@ func (ibc *inmemoryBlockCache) cleanByTimeout() {
 	ibc.mu.Unlock()
 }
 
-var (
-	inmemoryBlockCacheRequests uint64
-	inmemoryBlockCacheMisses   uint64
-)
-
 func (ibc *inmemoryBlockCache) Get(k inmemoryBlockCacheKey) *inmemoryBlock {
 	atomic.AddUint64(&ibc.requests, 1)
 	ibc.mu.RLock()

lib/mergeset/table.go

@@ -19,6 +19,16 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
 )
 
+// These are global counters for cache requests and misses for parts
+// which were already merged into another parts.
+var (
+	historicalDataBlockCacheRequests uint64
+	historicalDataBlockCacheMisses   uint64
+
+	historicalIndexBlockCacheRequests uint64
+	historicalIndexBlockCacheMisses   uint64
+)
+
 // maxParts is the maximum number of parts in the table.
 //
 // This number may be reached when the insertion pace outreaches merger pace.
@@ -326,11 +336,11 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
 	}
 	tb.partsLock.Unlock()
 
-	atomic.AddUint64(&m.DataBlocksCacheRequests, atomic.LoadUint64(&inmemoryBlockCacheRequests))
-	atomic.AddUint64(&m.DataBlocksCacheMisses, atomic.LoadUint64(&inmemoryBlockCacheMisses))
-	atomic.AddUint64(&m.IndexBlocksCacheRequests, atomic.LoadUint64(&indexBlockCacheRequests))
-	atomic.AddUint64(&m.IndexBlocksCacheMisses, atomic.LoadUint64(&indexBlockCacheMisses))
+	m.DataBlocksCacheRequests += atomic.LoadUint64(&historicalDataBlockCacheRequests)
+	m.DataBlocksCacheMisses += atomic.LoadUint64(&historicalDataBlockCacheMisses)
+	m.IndexBlocksCacheRequests += atomic.LoadUint64(&historicalIndexBlockCacheRequests)
+	m.IndexBlocksCacheMisses += atomic.LoadUint64(&historicalIndexBlockCacheMisses)
 }
 
 // AddItems adds the given items to the tb.
@@ -1300,6 +1310,10 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*partWrapper, int) {
 	dst := pws[:0]
 	for _, pw := range pws {
 		if partsToRemove[pw] {
+			atomic.AddUint64(&historicalDataBlockCacheRequests, pw.p.ibCache.Requests())
+			atomic.AddUint64(&historicalDataBlockCacheMisses, pw.p.ibCache.Misses())
+			atomic.AddUint64(&historicalIndexBlockCacheRequests, pw.p.idxbCache.Requests())
+			atomic.AddUint64(&historicalIndexBlockCacheMisses, pw.p.idxbCache.Misses())
 			removedParts++
 			continue
 		}

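Two details are worth noting in the Table changes above. First, the read side switched from atomic.AddUint64(&m.DataBlocksCacheRequests, ...) to a plain m.DataBlocksCacheRequests += ..., presumably because the metrics struct being filled is a caller-local snapshot with no concurrent writers; only the loads of the shared historical counters need to be atomic. Second, removeParts now captures the counters while it still holds the part in hand. A sketch of the resulting read path, again with hypothetical types rather than the real Table and TableMetrics:

package cachemetrics

import "sync/atomic"

// tableMetrics is a hypothetical snapshot struct. It is filled by a single
// goroutine, so plain += is enough on its fields.
type tableMetrics struct {
	DataBlocksCacheRequests uint64
}

// historicalDataBlockRequests plays the role of the historical* globals.
var historicalDataBlockRequests uint64

type part struct {
	cacheRequests uint64 // per-part counter, updated atomically elsewhere
}

type table struct {
	parts []*part
}

// updateMetrics mirrors the post-commit pattern in Table.UpdateMetrics:
// sum the live parts' counters, then add the historical counters of parts
// merged away earlier. Only the shared counters are read atomically.
func (tb *table) updateMetrics(m *tableMetrics) {
	for _, p := range tb.parts {
		m.DataBlocksCacheRequests += atomic.LoadUint64(&p.cacheRequests)
	}
	m.DataBlocksCacheRequests += atomic.LoadUint64(&historicalDataBlockRequests)
}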
lib/storage/part.go

@@ -206,13 +206,6 @@ func (ibc *indexBlockCache) MustClose(isBig bool) {
 	close(ibc.cleanerStopCh)
 	ibc.cleanerWG.Wait()
-	if isBig {
-		atomic.AddUint64(&bigIndexBlockCacheRequests, ibc.requests)
-		atomic.AddUint64(&bigIndexBlockCacheMisses, ibc.misses)
-	} else {
-		atomic.AddUint64(&smallIndexBlockCacheRequests, ibc.requests)
-		atomic.AddUint64(&smallIndexBlockCacheMisses, ibc.misses)
-	}
 	// It is safe returning ibc.m itemst to the pool, since Reset must
 	// be called only when no other goroutines access ibc entries.
 	for _, ibe := range ibc.m {
@@ -247,14 +240,6 @@ func (ibc *indexBlockCache) cleanByTimeout() {
 	ibc.mu.Unlock()
 }
 
-var (
-	bigIndexBlockCacheRequests uint64
-	bigIndexBlockCacheMisses   uint64
-
-	smallIndexBlockCacheRequests uint64
-	smallIndexBlockCacheMisses   uint64
-)
-
 func (ibc *indexBlockCache) Get(k uint64) *indexBlock {
 	atomic.AddUint64(&ibc.requests, 1)
 	ibc.mu.RLock()

lib/storage/partition.go

@@ -23,6 +23,16 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 )
 
+// These are global counters for cache requests and misses for parts
+// which were already merged into another parts.
+var (
+	historicalBigIndexBlocksCacheRequests uint64
+	historicalBigIndexBlocksCacheMisses   uint64
+
+	historicalSmallIndexBlocksCacheRequests uint64
+	historicalSmallIndexBlocksCacheMisses   uint64
+)
+
 func maxRowsPerSmallPart() uint64 {
 	// Small parts are cached in the OS page cache,
 	// so limit the number of rows for small part by the remaining free RAM.
@@ -356,11 +366,11 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 	pt.partsLock.Unlock()
 
-	atomic.AddUint64(&m.BigIndexBlocksCacheRequests, atomic.LoadUint64(&bigIndexBlockCacheRequests))
-	atomic.AddUint64(&m.BigIndexBlocksCacheMisses, atomic.LoadUint64(&bigIndexBlockCacheMisses))
-	atomic.AddUint64(&m.SmallIndexBlocksCacheRequests, atomic.LoadUint64(&smallIndexBlockCacheRequests))
-	atomic.AddUint64(&m.SmallIndexBlocksCacheMisses, atomic.LoadUint64(&smallIndexBlockCacheMisses))
+	m.BigIndexBlocksCacheRequests += atomic.LoadUint64(&historicalBigIndexBlocksCacheRequests)
+	m.BigIndexBlocksCacheMisses += atomic.LoadUint64(&historicalBigIndexBlocksCacheMisses)
+	m.SmallIndexBlocksCacheRequests += atomic.LoadUint64(&historicalSmallIndexBlocksCacheRequests)
+	m.SmallIndexBlocksCacheMisses += atomic.LoadUint64(&historicalSmallIndexBlocksCacheMisses)
 
 	m.ActiveBigMerges += atomic.LoadUint64(&pt.activeBigMerges)
 	m.ActiveSmallMerges += atomic.LoadUint64(&pt.activeSmallMerges)
@@ -1145,8 +1155,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) error {
 	removedSmallParts := 0
 	removedBigParts := 0
 	pt.partsLock.Lock()
-	pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m)
-	pt.bigParts, removedBigParts = removeParts(pt.bigParts, m)
+	pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m, false)
+	pt.bigParts, removedBigParts = removeParts(pt.bigParts, m, true)
 	if newPW != nil {
 		if isBigPart {
 			pt.bigParts = append(pt.bigParts, newPW)
@@ -1193,11 +1203,20 @@ func (pt *partition) nextMergeIdx() uint64 {
 	return atomic.AddUint64(&pt.mergeIdx, 1)
 }
 
-func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*partWrapper, int) {
+func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig bool) ([]*partWrapper, int) {
 	removedParts := 0
 	dst := pws[:0]
 	for _, pw := range pws {
 		if partsToRemove[pw] {
+			requests := pw.p.ibCache.Requests()
+			misses := pw.p.ibCache.Misses()
+			if isBig {
+				atomic.AddUint64(&historicalBigIndexBlocksCacheRequests, requests)
+				atomic.AddUint64(&historicalBigIndexBlocksCacheMisses, misses)
+			} else {
+				atomic.AddUint64(&historicalSmallIndexBlocksCacheRequests, requests)
+				atomic.AddUint64(&historicalSmallIndexBlocksCacheMisses, misses)
+			}
 			removedParts++
 			continue
 		}
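The property this commit enforces is easy to state as a test: across any sequence of merges and part removals, successive snapshots of the counters must be non-decreasing. A hypothetical check building on the first sketch's partCache, liveParts, removePart and totalRequests helpers (not a test from the repository):

package cachemetrics

import "testing"

// TestCountersNeverGoDown exercises the move-then-drop pattern: totals
// observed after a part is removed must include everything counted before.
func TestCountersNeverGoDown(t *testing.T) {
	liveParts = []*partCache{{requests: 100}, {requests: 50}}
	before := totalRequests() // 150

	removePart(0) // the part is merged away; its counter moves to historical
	after := totalRequests()

	if after < before {
		t.Fatalf("counter went down: before=%d after=%d", before, after)
	}
}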