From ddb3519e17f0decba8fc24ab834402eb61e13956 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 17 Sep 2020 02:05:54 +0300 Subject: [PATCH] lib/{mergeset,storage}: code prettifying --- lib/mergeset/table.go | 21 +++++++++++++-------- lib/storage/partition.go | 32 ++++++++++++++++++-------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index f309c039d6..e85b5ecd2c 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -727,6 +727,11 @@ func (tb *Table) partMerger() error { var errNothingToMerge = fmt.Errorf("nothing to merge") +// mergeParts merges pws. +// +// Merging is immediately stopped if stopCh is closed. +// +// All the parts inside pws must have isInMerge field set to true. func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterParts bool) error { if len(pws) == 0 { // Nothing to merge. @@ -1140,7 +1145,7 @@ func runTransactions(txnLock *sync.RWMutex, path string) error { } func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error { - // The transaction must be run under read lock in order to provide + // The transaction must run under read lock in order to provide // consistent snapshots with Table.CreateSnapshot(). txnLock.RLock() defer txnLock.RUnlock() @@ -1335,15 +1340,15 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool) ([]*pa removedParts := 0 dst := pws[:0] for _, pw := range pws { - if partsToRemove[pw] { - atomic.AddUint64(&historicalDataBlockCacheRequests, pw.p.ibCache.Requests()) - atomic.AddUint64(&historicalDataBlockCacheMisses, pw.p.ibCache.Misses()) - atomic.AddUint64(&historicalIndexBlockCacheRequests, pw.p.idxbCache.Requests()) - atomic.AddUint64(&historicalIndexBlockCacheMisses, pw.p.idxbCache.Misses()) - removedParts++ + if !partsToRemove[pw] { + dst = append(dst, pw) continue } - dst = append(dst, pw) + atomic.AddUint64(&historicalDataBlockCacheRequests, pw.p.ibCache.Requests()) + atomic.AddUint64(&historicalDataBlockCacheMisses, pw.p.ibCache.Misses()) + atomic.AddUint64(&historicalIndexBlockCacheRequests, pw.p.idxbCache.Requests()) + atomic.AddUint64(&historicalIndexBlockCacheMisses, pw.p.idxbCache.Misses()) + removedParts++ } return dst, removedParts } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 3e512ba3c6..2df66fa558 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -963,6 +963,11 @@ func (pt *partition) mergeSmallParts(isFinal bool) error { var errNothingToMerge = fmt.Errorf("nothing to merge") +// mergeParts merges pws. +// +// Merging is immediately stopped if stopCh is closed. +// +// All the parts inside pws must have isInMerge field set to true. func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) error { if len(pws) == 0 { // Nothing to merge. @@ -1038,7 +1043,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro } else { atomic.AddUint64(&pt.smallMergesCount, 1) atomic.AddUint64(&pt.activeSmallMerges, 1) - // Prioritize small merges over big merges. } err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted) if isBigPart { @@ -1167,20 +1171,20 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig removedParts := 0 dst := pws[:0] for _, pw := range pws { - if partsToRemove[pw] { - requests := pw.p.ibCache.Requests() - misses := pw.p.ibCache.Misses() - if isBig { - atomic.AddUint64(&historicalBigIndexBlocksCacheRequests, requests) - atomic.AddUint64(&historicalBigIndexBlocksCacheMisses, misses) - } else { - atomic.AddUint64(&historicalSmallIndexBlocksCacheRequests, requests) - atomic.AddUint64(&historicalSmallIndexBlocksCacheMisses, misses) - } - removedParts++ + if !partsToRemove[pw] { + dst = append(dst, pw) continue } - dst = append(dst, pw) + requests := pw.p.ibCache.Requests() + misses := pw.p.ibCache.Misses() + if isBig { + atomic.AddUint64(&historicalBigIndexBlocksCacheRequests, requests) + atomic.AddUint64(&historicalBigIndexBlocksCacheMisses, misses) + } else { + atomic.AddUint64(&historicalSmallIndexBlocksCacheRequests, requests) + atomic.AddUint64(&historicalSmallIndexBlocksCacheMisses, misses) + } + removedParts++ } return dst, removedParts } @@ -1470,7 +1474,7 @@ func runTransactions(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, path strin } func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath string) error { - // The transaction must be run under read lock in order to provide + // The transaction must run under read lock in order to provide // consistent snapshots with partition.CreateSnapshot(). txnLock.RLock() defer txnLock.RUnlock()