From a204fd69f173f6e76d9f684d176d20f2fdca010c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 23 Feb 2024 22:54:55 +0200 Subject: [PATCH] lib/storage: consistently use atomic.* type for refCount and mustDrop fields in indexDB, table and partition structs See ea9e2b19a5fa8aecc25c9da10fad8f4c1c58df38 --- lib/storage/index_db.go | 25 ++++++++++++------------- lib/storage/partition.go | 32 ++++++++++++++++---------------- lib/storage/table.go | 32 +++++++++++++++----------------- 3 files changed, 43 insertions(+), 46 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 7ee94b157..ec0461d70 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -67,10 +67,11 @@ const ( // indexDB represents an index db. type indexDB struct { - // Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 . + // The number of references to indexDB struct. + refCount atomic.Int32 - refCount uint64 + // if the mustDrop is set to true, then the indexDB must be dropped after refCount reaches zero. + mustDrop atomic.Bool // The number of missing MetricID -> TSID entries. // High rate for this value means corrupted indexDB. @@ -90,8 +91,6 @@ type indexDB struct { // The db must be automatically recovered after that. missingMetricNamesForMetricID uint64 - mustDrop uint64 - // generation identifies the index generation ID // and is used for syncing items from different indexDBs generation uint64 @@ -151,7 +150,6 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB { tagFiltersCacheSize := getTagFiltersCacheSize() db := &indexDB{ - refCount: 1, generation: gen, tb: tb, name: name, @@ -160,6 +158,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB { s: s, loopsPerDateTagFilterCache: workingsetcache.New(mem / 128), } + db.incRef() return db } @@ -199,7 +198,7 @@ type IndexDBMetrics struct { } func (db *indexDB) scheduleToDrop() { - atomic.AddUint64(&db.mustDrop, 1) + db.mustDrop.Store(true) } // UpdateMetrics updates m with metrics from the db. @@ -216,7 +215,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) - m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) + m.IndexDBRefCount += uint64(db.refCount.Load()) m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID) m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) @@ -235,7 +234,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { db.tb.UpdateMetrics(&m.TableMetrics) db.doExtDB(func(extDB *indexDB) { extDB.tb.UpdateMetrics(&m.TableMetrics) - m.IndexDBRefCount += atomic.LoadUint64(&extDB.refCount) + m.IndexDBRefCount += uint64(extDB.refCount.Load()) }) } @@ -274,12 +273,12 @@ func (db *indexDB) MustClose() { } func (db *indexDB) incRef() { - atomic.AddUint64(&db.refCount, 1) + db.refCount.Add(1) } func (db *indexDB) decRef() { - n := atomic.AddUint64(&db.refCount, ^uint64(0)) - if int64(n) < 0 { + n := db.refCount.Add(-1) + if n < 0 { logger.Panicf("BUG: negative refCount: %d", n) } if n > 0 { @@ -298,7 +297,7 @@ func (db *indexDB) decRef() { db.s = nil db.loopsPerDateTagFilterCache = nil - if atomic.LoadUint64(&db.mustDrop) == 0 { + if !db.mustDrop.Load() { return } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index c48a4f1f4..a81b65f57 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -146,12 +146,12 @@ type partition struct { // partWrapper is a wrapper for the part. type partWrapper struct { // The number of references to the part. - refCount uint32 + refCount atomic.Int32 // The flag, which is set when the part must be deleted after refCount reaches zero. // This field should be updated only after partWrapper // was removed from the list of active parts. - mustBeDeleted uint32 + mustDrop atomic.Bool // The part itself. p *part @@ -167,20 +167,20 @@ type partWrapper struct { } func (pw *partWrapper) incRef() { - atomic.AddUint32(&pw.refCount, 1) + pw.refCount.Add(1) } func (pw *partWrapper) decRef() { - n := atomic.AddUint32(&pw.refCount, ^uint32(0)) - if int32(n) < 0 { - logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n)) + n := pw.refCount.Add(-1) + if n < 0 { + logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", n) } if n > 0 { return } deletePath := "" - if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 { + if pw.mp == nil && pw.mustDrop.Load() { deletePath = pw.p.path } if pw.mp != nil { @@ -347,21 +347,21 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.InmemoryRowsCount += p.ph.RowsCount m.InmemoryBlocksCount += p.ph.BlocksCount m.InmemorySizeBytes += p.size - m.InmemoryPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) + m.InmemoryPartsRefCount += uint64(pw.refCount.Load()) } for _, pw := range pt.smallParts { p := pw.p m.SmallRowsCount += p.ph.RowsCount m.SmallBlocksCount += p.ph.BlocksCount m.SmallSizeBytes += p.size - m.SmallPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) + m.SmallPartsRefCount += uint64(pw.refCount.Load()) } for _, pw := range pt.bigParts { p := pw.p m.BigRowsCount += p.ph.RowsCount m.BigBlocksCount += p.ph.BlocksCount m.BigSizeBytes += p.size - m.BigPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) + m.BigPartsRefCount += uint64(pw.refCount.Load()) } m.InmemoryPartsCount += uint64(len(pt.inmemoryParts)) @@ -823,9 +823,9 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T pw := &partWrapper{ p: p, mp: mp, - refCount: 1, flushToDiskDeadline: flushToDiskDeadline, } + pw.incRef() return pw } @@ -1587,9 +1587,9 @@ func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew * // Open the created part from disk. pNew := mustOpenFilePart(dstPartPath) pwNew := &partWrapper{ - p: pNew, - refCount: 1, + p: pNew, } + pwNew.incRef() return pwNew } @@ -1648,7 +1648,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, // Mark old parts as must be deleted and decrement reference count, // so they are eventually closed and deleted. for _, pw := range pws { - atomic.StoreUint32(&pw.mustBeDeleted, 1) + pw.mustDrop.Store(true) pw.decRef() } } @@ -1942,9 +1942,9 @@ func mustOpenParts(path string, partNames []string) []*partWrapper { partPath := filepath.Join(path, partName) p := mustOpenFilePart(partPath) pw := &partWrapper{ - p: p, - refCount: 1, + p: p, } + pw.incRef() pws = append(pws, pw) } diff --git a/lib/storage/table.go b/lib/storage/table.go index e76d65f38..2fa5979b2 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -33,25 +33,23 @@ type table struct { // partitionWrapper provides refcounting mechanism for the partition. type partitionWrapper struct { - // Atomic counters must be at the top of struct for proper 8-byte alignment on 32-bit archs. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 + // refCount is the number of open references to partitionWrapper. + refCount atomic.Int32 - refCount uint64 - - // The partition must be dropped if mustDrop > 0 - mustDrop uint64 + // if mustDrop is true, then the partition must be dropped after refCount reaches zero. + mustDrop atomic.Bool pt *partition } func (ptw *partitionWrapper) incRef() { - atomic.AddUint64(&ptw.refCount, 1) + ptw.refCount.Add(1) } func (ptw *partitionWrapper) decRef() { - n := atomic.AddUint64(&ptw.refCount, ^uint64(0)) - if int64(n) < 0 { - logger.Panicf("BUG: pts.refCount must be positive; got %d", int64(n)) + n := ptw.refCount.Add(-1) + if n < 0 { + logger.Panicf("BUG: pts.refCount must be positive; got %d", n) } if n > 0 { return @@ -60,18 +58,18 @@ func (ptw *partitionWrapper) decRef() { // refCount is zero. Close the partition. ptw.pt.MustClose() - if atomic.LoadUint64(&ptw.mustDrop) == 0 { + if !ptw.mustDrop.Load() { ptw.pt = nil return } - // ptw.mustDrop > 0. Drop the partition. + // Drop the partition. ptw.pt.Drop() ptw.pt = nil } func (ptw *partitionWrapper) scheduleToDrop() { - atomic.AddUint64(&ptw.mustDrop, 1) + ptw.mustDrop.Store(true) } // mustOpenTable opens a table on the given path. @@ -158,9 +156,9 @@ func (tb *table) MustDeleteSnapshot(snapshotName string) { func (tb *table) addPartitionNolock(pt *partition) { ptw := &partitionWrapper{ - pt: pt, - refCount: 1, + pt: pt, } + ptw.incRef() tb.ptws = append(tb.ptws, ptw) } @@ -177,7 +175,7 @@ func (tb *table) MustClose() { tb.ptwsLock.Unlock() for _, ptw := range ptws { - if n := atomic.LoadUint64(&ptw.refCount); n != 1 { + if n := ptw.refCount.Load(); n != 1 { logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n) } ptw.decRef() @@ -223,7 +221,7 @@ func (tb *table) UpdateMetrics(m *TableMetrics) { for _, ptw := range ptws { ptw.pt.UpdateMetrics(&m.partitionMetrics) - m.PartitionsRefCount += atomic.LoadUint64(&ptw.refCount) + m.PartitionsRefCount += uint64(ptw.refCount.Load()) } // Collect separate metrics for the last partition.