lib/storage: consistently use atomic.* type for refCount and mustDrop fields in indexDB, table and partition structs

See ea9e2b19a5
This commit is contained in:
Aliaksandr Valialkin 2024-02-23 22:54:55 +02:00
parent 0f1ea36dc8
commit a204fd69f1
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 43 additions and 46 deletions

View file

@ -67,10 +67,11 @@ const (
// indexDB represents an index db. // indexDB represents an index db.
type indexDB struct { type indexDB struct {
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs. // The number of references to indexDB struct.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 . refCount atomic.Int32
refCount uint64 // if the mustDrop is set to true, then the indexDB must be dropped after refCount reaches zero.
mustDrop atomic.Bool
// The number of missing MetricID -> TSID entries. // The number of missing MetricID -> TSID entries.
// High rate for this value means corrupted indexDB. // High rate for this value means corrupted indexDB.
@ -90,8 +91,6 @@ type indexDB struct {
// The db must be automatically recovered after that. // The db must be automatically recovered after that.
missingMetricNamesForMetricID uint64 missingMetricNamesForMetricID uint64
mustDrop uint64
// generation identifies the index generation ID // generation identifies the index generation ID
// and is used for syncing items from different indexDBs // and is used for syncing items from different indexDBs
generation uint64 generation uint64
@ -151,7 +150,6 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
tagFiltersCacheSize := getTagFiltersCacheSize() tagFiltersCacheSize := getTagFiltersCacheSize()
db := &indexDB{ db := &indexDB{
refCount: 1,
generation: gen, generation: gen,
tb: tb, tb: tb,
name: name, name: name,
@ -160,6 +158,7 @@ func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
s: s, s: s,
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128), loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
} }
db.incRef()
return db return db
} }
@ -199,7 +198,7 @@ type IndexDBMetrics struct {
} }
func (db *indexDB) scheduleToDrop() { func (db *indexDB) scheduleToDrop() {
atomic.AddUint64(&db.mustDrop, 1) db.mustDrop.Store(true)
} }
// UpdateMetrics updates m with metrics from the db. // UpdateMetrics updates m with metrics from the db.
@ -216,7 +215,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) m.IndexDBRefCount += uint64(db.refCount.Load())
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID) m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
@ -235,7 +234,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
db.tb.UpdateMetrics(&m.TableMetrics) db.tb.UpdateMetrics(&m.TableMetrics)
db.doExtDB(func(extDB *indexDB) { db.doExtDB(func(extDB *indexDB) {
extDB.tb.UpdateMetrics(&m.TableMetrics) extDB.tb.UpdateMetrics(&m.TableMetrics)
m.IndexDBRefCount += atomic.LoadUint64(&extDB.refCount) m.IndexDBRefCount += uint64(extDB.refCount.Load())
}) })
} }
@ -274,12 +273,12 @@ func (db *indexDB) MustClose() {
} }
func (db *indexDB) incRef() { func (db *indexDB) incRef() {
atomic.AddUint64(&db.refCount, 1) db.refCount.Add(1)
} }
func (db *indexDB) decRef() { func (db *indexDB) decRef() {
n := atomic.AddUint64(&db.refCount, ^uint64(0)) n := db.refCount.Add(-1)
if int64(n) < 0 { if n < 0 {
logger.Panicf("BUG: negative refCount: %d", n) logger.Panicf("BUG: negative refCount: %d", n)
} }
if n > 0 { if n > 0 {
@ -298,7 +297,7 @@ func (db *indexDB) decRef() {
db.s = nil db.s = nil
db.loopsPerDateTagFilterCache = nil db.loopsPerDateTagFilterCache = nil
if atomic.LoadUint64(&db.mustDrop) == 0 { if !db.mustDrop.Load() {
return return
} }

View file

@ -146,12 +146,12 @@ type partition struct {
// partWrapper is a wrapper for the part. // partWrapper is a wrapper for the part.
type partWrapper struct { type partWrapper struct {
// The number of references to the part. // The number of references to the part.
refCount uint32 refCount atomic.Int32
// The flag, which is set when the part must be deleted after refCount reaches zero. // The flag, which is set when the part must be deleted after refCount reaches zero.
// This field should be updated only after partWrapper // This field should be updated only after partWrapper
// was removed from the list of active parts. // was removed from the list of active parts.
mustBeDeleted uint32 mustDrop atomic.Bool
// The part itself. // The part itself.
p *part p *part
@ -167,20 +167,20 @@ type partWrapper struct {
} }
func (pw *partWrapper) incRef() { func (pw *partWrapper) incRef() {
atomic.AddUint32(&pw.refCount, 1) pw.refCount.Add(1)
} }
func (pw *partWrapper) decRef() { func (pw *partWrapper) decRef() {
n := atomic.AddUint32(&pw.refCount, ^uint32(0)) n := pw.refCount.Add(-1)
if int32(n) < 0 { if n < 0 {
logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n)) logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", n)
} }
if n > 0 { if n > 0 {
return return
} }
deletePath := "" deletePath := ""
if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 { if pw.mp == nil && pw.mustDrop.Load() {
deletePath = pw.p.path deletePath = pw.p.path
} }
if pw.mp != nil { if pw.mp != nil {
@ -347,21 +347,21 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.InmemoryRowsCount += p.ph.RowsCount m.InmemoryRowsCount += p.ph.RowsCount
m.InmemoryBlocksCount += p.ph.BlocksCount m.InmemoryBlocksCount += p.ph.BlocksCount
m.InmemorySizeBytes += p.size m.InmemorySizeBytes += p.size
m.InmemoryPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) m.InmemoryPartsRefCount += uint64(pw.refCount.Load())
} }
for _, pw := range pt.smallParts { for _, pw := range pt.smallParts {
p := pw.p p := pw.p
m.SmallRowsCount += p.ph.RowsCount m.SmallRowsCount += p.ph.RowsCount
m.SmallBlocksCount += p.ph.BlocksCount m.SmallBlocksCount += p.ph.BlocksCount
m.SmallSizeBytes += p.size m.SmallSizeBytes += p.size
m.SmallPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) m.SmallPartsRefCount += uint64(pw.refCount.Load())
} }
for _, pw := range pt.bigParts { for _, pw := range pt.bigParts {
p := pw.p p := pw.p
m.BigRowsCount += p.ph.RowsCount m.BigRowsCount += p.ph.RowsCount
m.BigBlocksCount += p.ph.BlocksCount m.BigBlocksCount += p.ph.BlocksCount
m.BigSizeBytes += p.size m.BigSizeBytes += p.size
m.BigPartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) m.BigPartsRefCount += uint64(pw.refCount.Load())
} }
m.InmemoryPartsCount += uint64(len(pt.inmemoryParts)) m.InmemoryPartsCount += uint64(len(pt.inmemoryParts))
@ -823,9 +823,9 @@ func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.T
pw := &partWrapper{ pw := &partWrapper{
p: p, p: p,
mp: mp, mp: mp,
refCount: 1,
flushToDiskDeadline: flushToDiskDeadline, flushToDiskDeadline: flushToDiskDeadline,
} }
pw.incRef()
return pw return pw
} }
@ -1587,9 +1587,9 @@ func (pt *partition) openCreatedPart(ph *partHeader, pws []*partWrapper, mpNew *
// Open the created part from disk. // Open the created part from disk.
pNew := mustOpenFilePart(dstPartPath) pNew := mustOpenFilePart(dstPartPath)
pwNew := &partWrapper{ pwNew := &partWrapper{
p: pNew, p: pNew,
refCount: 1,
} }
pwNew.incRef()
return pwNew return pwNew
} }
@ -1648,7 +1648,7 @@ func (pt *partition) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper,
// Mark old parts as must be deleted and decrement reference count, // Mark old parts as must be deleted and decrement reference count,
// so they are eventually closed and deleted. // so they are eventually closed and deleted.
for _, pw := range pws { for _, pw := range pws {
atomic.StoreUint32(&pw.mustBeDeleted, 1) pw.mustDrop.Store(true)
pw.decRef() pw.decRef()
} }
} }
@ -1942,9 +1942,9 @@ func mustOpenParts(path string, partNames []string) []*partWrapper {
partPath := filepath.Join(path, partName) partPath := filepath.Join(path, partName)
p := mustOpenFilePart(partPath) p := mustOpenFilePart(partPath)
pw := &partWrapper{ pw := &partWrapper{
p: p, p: p,
refCount: 1,
} }
pw.incRef()
pws = append(pws, pw) pws = append(pws, pw)
} }

View file

@ -33,25 +33,23 @@ type table struct {
// partitionWrapper provides refcounting mechanism for the partition. // partitionWrapper provides refcounting mechanism for the partition.
type partitionWrapper struct { type partitionWrapper struct {
// Atomic counters must be at the top of struct for proper 8-byte alignment on 32-bit archs. // refCount is the number of open references to partitionWrapper.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 refCount atomic.Int32
refCount uint64 // if mustDrop is true, then the partition must be dropped after refCount reaches zero.
mustDrop atomic.Bool
// The partition must be dropped if mustDrop > 0
mustDrop uint64
pt *partition pt *partition
} }
func (ptw *partitionWrapper) incRef() { func (ptw *partitionWrapper) incRef() {
atomic.AddUint64(&ptw.refCount, 1) ptw.refCount.Add(1)
} }
func (ptw *partitionWrapper) decRef() { func (ptw *partitionWrapper) decRef() {
n := atomic.AddUint64(&ptw.refCount, ^uint64(0)) n := ptw.refCount.Add(-1)
if int64(n) < 0 { if n < 0 {
logger.Panicf("BUG: pts.refCount must be positive; got %d", int64(n)) logger.Panicf("BUG: pts.refCount must be positive; got %d", n)
} }
if n > 0 { if n > 0 {
return return
@ -60,18 +58,18 @@ func (ptw *partitionWrapper) decRef() {
// refCount is zero. Close the partition. // refCount is zero. Close the partition.
ptw.pt.MustClose() ptw.pt.MustClose()
if atomic.LoadUint64(&ptw.mustDrop) == 0 { if !ptw.mustDrop.Load() {
ptw.pt = nil ptw.pt = nil
return return
} }
// ptw.mustDrop > 0. Drop the partition. // Drop the partition.
ptw.pt.Drop() ptw.pt.Drop()
ptw.pt = nil ptw.pt = nil
} }
func (ptw *partitionWrapper) scheduleToDrop() { func (ptw *partitionWrapper) scheduleToDrop() {
atomic.AddUint64(&ptw.mustDrop, 1) ptw.mustDrop.Store(true)
} }
// mustOpenTable opens a table on the given path. // mustOpenTable opens a table on the given path.
@ -158,9 +156,9 @@ func (tb *table) MustDeleteSnapshot(snapshotName string) {
func (tb *table) addPartitionNolock(pt *partition) { func (tb *table) addPartitionNolock(pt *partition) {
ptw := &partitionWrapper{ ptw := &partitionWrapper{
pt: pt, pt: pt,
refCount: 1,
} }
ptw.incRef()
tb.ptws = append(tb.ptws, ptw) tb.ptws = append(tb.ptws, ptw)
} }
@ -177,7 +175,7 @@ func (tb *table) MustClose() {
tb.ptwsLock.Unlock() tb.ptwsLock.Unlock()
for _, ptw := range ptws { for _, ptw := range ptws {
if n := atomic.LoadUint64(&ptw.refCount); n != 1 { if n := ptw.refCount.Load(); n != 1 {
logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n) logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n)
} }
ptw.decRef() ptw.decRef()
@ -223,7 +221,7 @@ func (tb *table) UpdateMetrics(m *TableMetrics) {
for _, ptw := range ptws { for _, ptw := range ptws {
ptw.pt.UpdateMetrics(&m.partitionMetrics) ptw.pt.UpdateMetrics(&m.partitionMetrics)
m.PartitionsRefCount += atomic.LoadUint64(&ptw.refCount) m.PartitionsRefCount += uint64(ptw.refCount.Load())
} }
// Collect separate metrics for the last partition. // Collect separate metrics for the last partition.