lib/logstorage: consistently use atomic.* type for refCount and mustDrop fields in datadb and storage structs in the same way as it is used in lib/storage

See ea9e2b19a5 and a204fd69f1
This commit is contained in:
Aliaksandr Valialkin 2024-02-23 23:04:38 +02:00
parent 5c89150fc9
commit 275335c181
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 17 additions and 17 deletions

View file

@ -92,10 +92,10 @@ type partWrapper struct {
// refCount is the number of references to p. // refCount is the number of references to p.
// //
// When the number of references reaches zero, then p is closed. // When the number of references reaches zero, then p is closed.
refCount int32 refCount atomic.Int32
// The flag, which is set when the part must be deleted after refCount reaches zero. // The flag, which is set when the part must be deleted after refCount reaches zero.
mustBeDeleted uint32 mustDrop atomic.Bool
// p is an opened part // p is an opened part
p *part p *part
@ -111,18 +111,18 @@ type partWrapper struct {
} }
func (pw *partWrapper) incRef() { func (pw *partWrapper) incRef() {
atomic.AddInt32(&pw.refCount, 1) pw.refCount.Add(1)
} }
func (pw *partWrapper) decRef() { func (pw *partWrapper) decRef() {
n := atomic.AddInt32(&pw.refCount, -1) n := pw.refCount.Add(-1)
if n > 0 { if n > 0 {
return return
} }
deletePath := "" deletePath := ""
if pw.mp == nil { if pw.mp == nil {
if atomic.LoadUint32(&pw.mustBeDeleted) != 0 { if pw.mustDrop.Load() {
deletePath = pw.p.path deletePath = pw.p.path
} }
} else { } else {
@ -767,7 +767,7 @@ func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, d
// Mark old parts as must be deleted and decrement reference count, // Mark old parts as must be deleted and decrement reference count,
// so they are eventually closed and deleted. // so they are eventually closed and deleted.
for _, pw := range pws { for _, pw := range pws {
atomic.StoreUint32(&pw.mustBeDeleted, 1) pw.mustDrop.Store(true)
pw.decRef() pw.decRef()
} }
} }
@ -917,8 +917,8 @@ func mustCloseDatadb(ddb *datadb) {
// close file parts // close file parts
for _, pw := range ddb.fileParts { for _, pw := range ddb.fileParts {
pw.decRef() pw.decRef()
if pw.refCount != 0 { if n := pw.refCount.Load(); n != 0 {
logger.Panicf("BUG: ther are %d references to filePart", pw.refCount) logger.Panicf("BUG: ther are %d references to filePart", n)
} }
} }
ddb.fileParts = nil ddb.fileParts = nil

View file

@ -142,10 +142,10 @@ type Storage struct {
type partitionWrapper struct { type partitionWrapper struct {
// refCount is the number of active references to p. // refCount is the number of active references to p.
// When it reaches zero, then the p is closed. // When it reaches zero, then the p is closed.
refCount int32 refCount atomic.Int32
// The flag, which is set when the partition must be deleted after refCount reaches zero. // The flag, which is set when the partition must be deleted after refCount reaches zero.
mustBeDeleted uint32 mustDrop atomic.Bool
// day is the day for the partition in the unix timestamp divided by the number of seconds in the day. // day is the day for the partition in the unix timestamp divided by the number of seconds in the day.
day int64 day int64
@ -164,17 +164,17 @@ func newPartitionWrapper(pt *partition, day int64) *partitionWrapper {
} }
func (ptw *partitionWrapper) incRef() { func (ptw *partitionWrapper) incRef() {
atomic.AddInt32(&ptw.refCount, 1) ptw.refCount.Add(1)
} }
func (ptw *partitionWrapper) decRef() { func (ptw *partitionWrapper) decRef() {
n := atomic.AddInt32(&ptw.refCount, -1) n := ptw.refCount.Add(-1)
if n > 0 { if n > 0 {
return return
} }
deletePath := "" deletePath := ""
if atomic.LoadUint32(&ptw.mustBeDeleted) != 0 { if ptw.mustDrop.Load() {
deletePath = ptw.pt.path deletePath = ptw.pt.path
} }
@ -293,7 +293,7 @@ func MustOpenStorage(path string, cfg *StorageConfig) *Storage {
break break
} }
logger.Infof("the partition %s is scheduled to be deleted because it is outside the -futureRetention=%dd", ptw.pt.path, durationToDays(s.futureRetention)) logger.Infof("the partition %s is scheduled to be deleted because it is outside the -futureRetention=%dd", ptw.pt.path, durationToDays(s.futureRetention))
atomic.StoreUint32(&ptw.mustBeDeleted, 1) ptw.mustDrop.Store(true)
ptw.decRef() ptw.decRef()
j-- j--
} }
@ -348,7 +348,7 @@ func (s *Storage) watchRetention() {
for _, ptw := range ptwsToDelete { for _, ptw := range ptwsToDelete {
logger.Infof("the partition %s is scheduled to be deleted because it is outside the -retentionPeriod=%dd", ptw.pt.path, durationToDays(s.retention)) logger.Infof("the partition %s is scheduled to be deleted because it is outside the -retentionPeriod=%dd", ptw.pt.path, durationToDays(s.retention))
atomic.StoreUint32(&ptw.mustBeDeleted, 1) ptw.mustDrop.Store(true)
ptw.decRef() ptw.decRef()
} }
@ -379,8 +379,8 @@ func (s *Storage) MustClose() {
// Close partitions // Close partitions
for _, pw := range s.partitions { for _, pw := range s.partitions {
pw.decRef() pw.decRef()
if pw.refCount != 0 { if n := pw.refCount.Load(); n != 0 {
logger.Panicf("BUG: there are %d users of partition", pw.refCount) logger.Panicf("BUG: there are %d users of partition", n)
} }
} }
s.partitions = nil s.partitions = nil