From e7dfcdfff657f928bd187677b74a5856a818252c Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Fri, 23 Feb 2024 22:54:55 +0200
Subject: [PATCH] lib/storage: consistently use atomic.* types for refCount and
 mustDrop fields in indexDB, table and partition structs

See commit ea9e2b19a5fa8aecc25c9da10fad8f4c1c58df38
---
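Notes for reviewers (git am ignores everything between the --- marker and
the diff): the refCount conversion is the same in all three files. The old
code emulated a signed decrement on an unsigned counter by adding
^uint64(0), then cast the result to a signed type to detect underflow.
atomic.Int32 makes both steps direct. A minimal sketch of the pattern; the
wrapper type is illustrative, not a type from this patch:

  package sketch

  import "sync/atomic"

  // wrapper stands in for indexDB, partWrapper and partitionWrapper.
  type wrapper struct {
  	refCount atomic.Int32
  	mustDrop atomic.Bool // see the scheduleToDrop sketch below
  }

  func (w *wrapper) incRef() {
  	w.refCount.Add(1)
  }

  func (w *wrapper) decRef() {
  	// Add(-1) returns the new value, so a negative result flags an
  	// unbalanced decRef without the ^uint64(0) trick or an int64 cast.
  	if n := w.refCount.Add(-1); n < 0 {
  		panic("BUG: negative refCount")
  	}
  }
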
 lib/storage/index_db.go  | 25 ++++++++++++-------------
 lib/storage/partition.go | 32 ++++++++++++++++----------------
 lib/storage/table.go     | 32 +++++++++++++++-----------------
 3 files changed, 43 insertions(+), 46 deletions(-)
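
The mustDrop/mustBeDeleted flags get the same treatment: they were
unsigned counters that were only ever incremented and compared against
zero, i.e. one-way booleans, so atomic.Bool states the intent directly.
Continuing the illustrative wrapper sketch above:

  // scheduleToDrop marks the wrapper for deletion once refCount reaches
  // zero. Store(true) replaces the old atomic.AddUint64(&mustDrop, 1);
  // the flag is never lowered again.
  func (w *wrapper) scheduleToDrop() {
  	w.mustDrop.Store(true)
  }

  // And in decRef the "== 0" comparison becomes a plain load:
  //
  //	if w.mustDrop.Load() { // was: atomic.LoadUint64(&w.mustDrop) != 0
  //		// drop the underlying index/partition data here
  //	}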

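Two smaller consequences of the typed atomics also show up in the diff.
The deleted "must go at the top of the structure" comments are safe to
drop because the Go 1.19 atomic.* types guarantee their own alignment
(atomic.Int64 and atomic.Uint64 are 8-byte aligned even inside structs on
32-bit platforms), so field order no longer matters for issue #212-style
bugs. And the refCount: 1 composite-literal entries become incRef() calls
because atomic.Int32 keeps its value in an unexported field that a
literal cannot seed. A hypothetical example in the same sketch package:

  // counters is hypothetical, not a struct from this patch.
  type counters struct {
  	name string
  	hits atomic.Int64 // self-aligned; safe anywhere in the struct
  }

  func newCounters(name string) *counters {
  	c := &counters{name: name} // "hits: 1" would not compile
  	c.hits.Add(1)
  	return c
  }
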
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index faa39ffce2..db4d7a3bd2 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -67,10 +67,11 @@ const (
 
 // indexDB represents an index db.
 type indexDB struct {
-	// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
-	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
+	// The number of references to the indexDB struct.
+	refCount atomic.Int32
 
-	refCount uint64
+	// If mustDrop is set to true, then the indexDB must be dropped after refCount reaches zero.
+	mustDrop atomic.Bool
 
 	// The number of missing MetricID -> TSID entries.
 	// High rate for this value means corrupted indexDB.
@@ -90,8 +91,6 @@ type indexDB struct {
 	// The db must be automatically recovered after that.
 	missingMetricNamesForMetricID uint64
 
-	mustDrop uint64
-
 	// generation identifies the index generation ID
 	// and is used for syncing items from different indexDBs
 	generation uint64
@@ -151,7 +150,6 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
 	tagFiltersCacheSize := getTagFiltersCacheSize()
 
 	db := &indexDB{
-		refCount:   1,
 		generation: gen,
 		tb:         tb,
 		name:       name,
@@ -160,6 +158,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
 		s:                          s,
 		loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
 	}
+	db.incRef()
 	return db
 }
 
@@ -199,7 +198,7 @@ type IndexDBMetrics struct {
 }
 
 func (db *indexDB) scheduleToDrop() {
-	atomic.AddUint64(&db.mustDrop, 1)
+	db.mustDrop.Store(true)
 }
 
 // UpdateMetrics updates m with metrics from the db.
@@ -216,7 +215,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
 
 	m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
 
-	m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
+	m.IndexDBRefCount += uint64(db.refCount.Load())
 	m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
 
 	m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
@@ -235,7 +234,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
 	db.tb.UpdateMetrics(&m.TableMetrics)
 	db.doExtDB(func(extDB *indexDB) {
 		extDB.tb.UpdateMetrics(&m.TableMetrics)
-		m.IndexDBRefCount += atomic.LoadUint64(&extDB.refCount)
+		m.IndexDBRefCount += uint64(extDB.refCount.Load())
 	})
 }
 
@@ -274,12 +273,12 @@ func (db *indexDB) MustClose() {
 }
 
 func (db *indexDB) incRef() {
-	atomic.AddUint64(&db.refCount, 1)
+	db.refCount.Add(1)
 }
 
 func (db *indexDB) decRef() {
-	n := atomic.AddUint64(&db.refCount, ^uint64(0))
-	if int64(n) < 0 {
+	n := db.refCount.Add(-1)
+	if n < 0 {
 		logger.Panicf("BUG: negative refCount: %d", n)
 	}
 	if n > 0 {
@@ -298,7 +297,7 @@ func (db *indexDB) decRef() {
 	db.s = nil
 	db.loopsPerDateTagFilterCache = nil
 
-	if atomic.LoadUint64(&db.mustDrop) == 0 {
+	if !db.mustDrop.Load() {
 		return
 	}
 
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index c48a4f1f45..a81b65f57b 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -146,12 +146,12 @@ type partition struct {
 // partWrapper is a wrapper for the part.
 type partWrapper struct {
 	// The number of references to the part.
-	refCount uint32
+	refCount atomic.Int32
 
 	// The flag, which is set when the part must be deleted after refCount reaches zero.
 	// This field should be updated only after partWrapper
 	// was removed from the list of active parts.
-	mustBeDeleted uint32
+	mustDrop atomic.Bool
 
 	// The part itself.
 	p *part
@@ -167,20 +167,20 @@ type partWrapper struct {
 }
 
 func (pw *partWrapper) incRef() {
-	atomic.AddUint32(&pw.refCount, 1)
+	pw.refCount.Add(1)
 }
 
 func (pw *partWrapper) decRef() {
-	n := atomic.AddUint32(&pw.refCount, ^uint32(0))
-	if int32(n) < 0 {
-		logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n))
+	n := pw.refCount.Add(-1)
+	if n < 0 {
+		logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", n)
 	}
 	if n > 0 {
 		return
 	}
 
 	deletePath := ""
-	if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 {
+	if pw.mp == nil && pw.mustDrop.Load() {
 		deletePath = pw.p.path
 	}
 	if pw.mp != nil {
@@ -347,21 +347,21 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
 		m.InmemoryRowsCount += p.ph.RowsCount
 		m.InmemoryBlocksCount += p.ph.BlocksCount
 		m.InmemorySizeBytes += p.size
-		m.InmemoryPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
+		m.InmemoryPartsRefCount += uint64(pw.refCount.Load())
 	}
 	for _, pw := range pt.smallParts {
 		p := pw.p
 		m.SmallRowsCount += p.ph.RowsCount
 		m.SmallBlocksCount += p.ph.BlocksCount
 		m.SmallSizeBytes += p.size
-		m.SmallPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
+		m.SmallPartsRefCount += uint64(pw.refCount.Load())
 	}
 	for _, pw := range pt.bigParts {
 		p := pw.p
 		m.BigRowsCount += p.ph.RowsCount
 		m.BigBlocksCount += p.ph.BlocksCount
 		m.BigSizeBytes += p.size
-		m.BigPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
+		m.BigPartsRefCount += uint64(pw.refCount.Load())
 	}
 
 	m.InmemoryPartsCount += uint64(len(pt.inmemoryParts))
@@ -823,9 +823,9 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T
 	pw := &partWrapper{
 		p:                   p,
 		mp:                  mp,
-		refCount:            1,
 		flushToDiskDeadline: flushToDiskDeadline,
 	}
+	pw.incRef()
 	return pw
 }
 
@@ -1587,9 +1587,9 @@ func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *
 	// Open the created part from disk.
 	pNew := mustOpenFilePart(dstPartPath)
 	pwNew := &partWrapper{
-		p:        pNew,
-		refCount: 1,
+		p: pNew,
 	}
+	pwNew.incRef()
 	return pwNew
 }
 
@@ -1648,7 +1648,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
 	// Mark old parts as must be deleted and decrement reference count,
 	// so they are eventually closed and deleted.
 	for _, pw := range pws {
-		atomic.StoreUint32(&pw.mustBeDeleted, 1)
+		pw.mustDrop.Store(true)
 		pw.decRef()
 	}
 }
@@ -1942,9 +1942,9 @@ func mustOpenParts(path string, partNames []string) []*partWrapper {
 		partPath := filepath.Join(path, partName)
 		p := mustOpenFilePart(partPath)
 		pw := &partWrapper{
-			p:        p,
-			refCount: 1,
+			p: p,
 		}
+		pw.incRef()
 		pws = append(pws, pw)
 	}
 
diff --git a/lib/storage/table.go b/lib/storage/table.go
index e76d65f38c..2fa5979b27 100644
--- a/lib/storage/table.go
+++ b/lib/storage/table.go
@@ -33,25 +33,23 @@ type table struct {
 
 // partitionWrapper provides refcounting mechanism for the partition.
 type partitionWrapper struct {
-	// Atomic counters must be at the top of struct for proper 8-byte alignment on 32-bit archs.
-	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
+	// refCount is the number of open references to the partitionWrapper.
+	refCount atomic.Int32
 
-	refCount uint64
-
-	// The partition must be dropped if mustDrop > 0
-	mustDrop uint64
+	// If mustDrop is true, then the partition must be dropped after refCount reaches zero.
+	mustDrop atomic.Bool
 
 	pt *partition
 }
 
 func (ptw *partitionWrapper) incRef() {
-	atomic.AddUint64(&ptw.refCount, 1)
+	ptw.refCount.Add(1)
 }
 
 func (ptw *partitionWrapper) decRef() {
-	n := atomic.AddUint64(&ptw.refCount, ^uint64(0))
-	if int64(n) < 0 {
-		logger.Panicf("BUG: pts.refCount must be positive; got %d", int64(n))
+	n := ptw.refCount.Add(-1)
+	if n < 0 {
+		logger.Panicf("BUG: pts.refCount must be positive; got %d", n)
 	}
 	if n > 0 {
 		return
@@ -60,18 +58,18 @@ func (ptw *partitionWrapper) decRef() {
 	// refCount is zero. Close the partition.
 	ptw.pt.MustClose()
 
-	if atomic.LoadUint64(&ptw.mustDrop) == 0 {
+	if !ptw.mustDrop.Load() {
 		ptw.pt = nil
 		return
 	}
 
-	// ptw.mustDrop > 0. Drop the partition.
+	// Drop the partition.
 	ptw.pt.Drop()
 	ptw.pt = nil
 }
 
 func (ptw *partitionWrapper) scheduleToDrop() {
-	atomic.AddUint64(&ptw.mustDrop, 1)
+	ptw.mustDrop.Store(true)
 }
 
 // mustOpenTable opens a table on the given path.
@@ -158,9 +156,9 @@ func (tb *table) MustDeleteSnapshot(snapshotName string) {
 
 func (tb *table) addPartitionNolock(pt *partition) {
 	ptw := &partitionWrapper{
-		pt:       pt,
-		refCount: 1,
+		pt: pt,
 	}
+	ptw.incRef()
 	tb.ptws = append(tb.ptws, ptw)
 }
 
@@ -177,7 +175,7 @@ func (tb *table) MustClose() {
 	tb.ptwsLock.Unlock()
 
 	for _, ptw := range ptws {
-		if n := atomic.LoadUint64(&ptw.refCount); n != 1 {
+		if n := ptw.refCount.Load(); n != 1 {
 			logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n)
 		}
 		ptw.decRef()
@@ -223,7 +221,7 @@ func (tb *table) UpdateMetrics(m *TableMetrics) {
 
 	for _, ptw := range ptws {
 		ptw.pt.UpdateMetrics(&m.partitionMetrics)
-		m.PartitionsRefCount += atomic.LoadUint64(&ptw.refCount)
+		m.PartitionsRefCount += uint64(ptw.refCount.Load())
 	}
 
 	// Collect separate metrics for the last partition.