mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/{mergeset,storage}: code prettifying
This commit is contained in:
parent
bf826dd828
commit
ddb3519e17
2 changed files with 31 additions and 22 deletions
|
@ -727,6 +727,11 @@ func (tb *Table) partMerger() error {
|
|||
|
||||
var errNothingToMerge = fmt.Errorf("nothing to merge")
|
||||
|
||||
// mergeParts merges pws.
|
||||
//
|
||||
// Merging is immediately stopped if stopCh is closed.
|
||||
//
|
||||
// All the parts inside pws must have isInMerge field set to true.
|
||||
func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterParts bool) error {
|
||||
if len(pws) == 0 {
|
||||
// Nothing to merge.
|
||||
|
@ -1140,7 +1145,7 @@ func runTransactions(txnLock *sync.RWMutex, path string) error {
|
|||
}
|
||||
|
||||
func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
|
||||
// The transaction must be run under read lock in order to provide
|
||||
// The transaction must run under read lock in order to provide
|
||||
// consistent snapshots with Table.CreateSnapshot().
|
||||
txnLock.RLock()
|
||||
defer txnLock.RUnlock()
|
||||
|
@ -1335,15 +1340,15 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*pa
|
|||
removedParts := 0
|
||||
dst := pws[:0]
|
||||
for _, pw := range pws {
|
||||
if partsToRemove[pw] {
|
||||
atomic.AddUint64(&historicalDataBlockCacheRequests, pw.p.ibCache.Requests())
|
||||
atomic.AddUint64(&historicalDataBlockCacheMisses, pw.p.ibCache.Misses())
|
||||
atomic.AddUint64(&historicalIndexBlockCacheRequests, pw.p.idxbCache.Requests())
|
||||
atomic.AddUint64(&historicalIndexBlockCacheMisses, pw.p.idxbCache.Misses())
|
||||
removedParts++
|
||||
if !partsToRemove[pw] {
|
||||
dst = append(dst, pw)
|
||||
continue
|
||||
}
|
||||
dst = append(dst, pw)
|
||||
atomic.AddUint64(&historicalDataBlockCacheRequests, pw.p.ibCache.Requests())
|
||||
atomic.AddUint64(&historicalDataBlockCacheMisses, pw.p.ibCache.Misses())
|
||||
atomic.AddUint64(&historicalIndexBlockCacheRequests, pw.p.idxbCache.Requests())
|
||||
atomic.AddUint64(&historicalIndexBlockCacheMisses, pw.p.idxbCache.Misses())
|
||||
removedParts++
|
||||
}
|
||||
return dst, removedParts
|
||||
}
|
||||
|
|
|
@ -963,6 +963,11 @@ func (pt *partition) mergeSmallParts(isFinal bool) error {
|
|||
|
||||
var errNothingToMerge = fmt.Errorf("nothing to merge")
|
||||
|
||||
// mergeParts merges pws.
|
||||
//
|
||||
// Merging is immediately stopped if stopCh is closed.
|
||||
//
|
||||
// All the parts inside pws must have isInMerge field set to true.
|
||||
func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) error {
|
||||
if len(pws) == 0 {
|
||||
// Nothing to merge.
|
||||
|
@ -1038,7 +1043,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||
} else {
|
||||
atomic.AddUint64(&pt.smallMergesCount, 1)
|
||||
atomic.AddUint64(&pt.activeSmallMerges, 1)
|
||||
// Prioritize small merges over big merges.
|
||||
}
|
||||
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted)
|
||||
if isBigPart {
|
||||
|
@ -1167,20 +1171,20 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
|
|||
removedParts := 0
|
||||
dst := pws[:0]
|
||||
for _, pw := range pws {
|
||||
if partsToRemove[pw] {
|
||||
requests := pw.p.ibCache.Requests()
|
||||
misses := pw.p.ibCache.Misses()
|
||||
if isBig {
|
||||
atomic.AddUint64(&historicalBigIndexBlocksCacheRequests, requests)
|
||||
atomic.AddUint64(&historicalBigIndexBlocksCacheMisses, misses)
|
||||
} else {
|
||||
atomic.AddUint64(&historicalSmallIndexBlocksCacheRequests, requests)
|
||||
atomic.AddUint64(&historicalSmallIndexBlocksCacheMisses, misses)
|
||||
}
|
||||
removedParts++
|
||||
if !partsToRemove[pw] {
|
||||
dst = append(dst, pw)
|
||||
continue
|
||||
}
|
||||
dst = append(dst, pw)
|
||||
requests := pw.p.ibCache.Requests()
|
||||
misses := pw.p.ibCache.Misses()
|
||||
if isBig {
|
||||
atomic.AddUint64(&historicalBigIndexBlocksCacheRequests, requests)
|
||||
atomic.AddUint64(&historicalBigIndexBlocksCacheMisses, misses)
|
||||
} else {
|
||||
atomic.AddUint64(&historicalSmallIndexBlocksCacheRequests, requests)
|
||||
atomic.AddUint64(&historicalSmallIndexBlocksCacheMisses, misses)
|
||||
}
|
||||
removedParts++
|
||||
}
|
||||
return dst, removedParts
|
||||
}
|
||||
|
@ -1470,7 +1474,7 @@ func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path strin
|
|||
}
|
||||
|
||||
func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath string) error {
|
||||
// The transaction must be run under read lock in order to provide
|
||||
// The transaction must run under read lock in order to provide
|
||||
// consistent snapshots with partition.CreateSnapshot().
|
||||
txnLock.RLock()
|
||||
defer txnLock.RUnlock()
|
||||
|
|
Loading…
Reference in a new issue