lib/mergeset: consistently use atomic.* type for refCount and mustDrop fields in table struct in the same way as it is used in lib/storage

See ea9e2b19a5 and a204fd69f1
This commit is contained in:
Aliaksandr Valialkin 2024-02-23 22:59:20 +02:00
parent a204fd69f1
commit 5c89150fc9
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -295,17 +295,18 @@ func (ris *rawItemsShard) updateFlushDeadline() {
var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second) var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second)
type partWrapper struct { type partWrapper struct {
// refCount is the number of references to partWrapper
refCount atomic.Int32
// mustDrop marks partWrapper for deletion.
// This field should be updated only after partWrapper
// was removed from the list of active parts.
mustDrop atomic.Bool
p *part p *part
mp *inmemoryPart mp *inmemoryPart
refCount uint32
// mustBeDeleted marks partWrapper for deletion.
// This field should be updated only after partWrapper
// was removed from the list of active parts.
mustBeDeleted uint32
isInMerge bool isInMerge bool
// The deadline when the in-memory part must be flushed to disk. // The deadline when the in-memory part must be flushed to disk.
@ -313,20 +314,20 @@ type partWrapper struct {
} }
func (pw *partWrapper) incRef() { func (pw *partWrapper) incRef() {
atomic.AddUint32(&pw.refCount, 1) pw.refCount.Add(1)
} }
func (pw *partWrapper) decRef() { func (pw *partWrapper) decRef() {
n := atomic.AddUint32(&pw.refCount, ^uint32(0)) n := pw.refCount.Add(-1)
if int32(n) < 0 { if n < 0 {
logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n)) logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", n)
} }
if n > 0 { if n > 0 {
return return
} }
deletePath := "" deletePath := ""
if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 { if pw.mp == nil && pw.mustDrop.Load() {
deletePath = pw.p.path deletePath = pw.p.path
} }
if pw.mp != nil { if pw.mp != nil {
@ -611,7 +612,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.InmemoryBlocksCount += p.ph.blocksCount m.InmemoryBlocksCount += p.ph.blocksCount
m.InmemoryItemsCount += p.ph.itemsCount m.InmemoryItemsCount += p.ph.itemsCount
m.InmemorySizeBytes += p.size m.InmemorySizeBytes += p.size
m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) m.PartsRefCount += uint64(pw.refCount.Load())
} }
m.FilePartsCount += uint64(len(tb.fileParts)) m.FilePartsCount += uint64(len(tb.fileParts))
@ -620,7 +621,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.FileBlocksCount += p.ph.blocksCount m.FileBlocksCount += p.ph.blocksCount
m.FileItemsCount += p.ph.itemsCount m.FileItemsCount += p.ph.itemsCount
m.FileSizeBytes += p.size m.FileSizeBytes += p.size
m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) m.PartsRefCount += uint64(pw.refCount.Load())
} }
tb.partsLock.Unlock() tb.partsLock.Unlock()
@ -1035,12 +1036,13 @@ func (tb *Table) mustMergeIntoInmemoryPart(bsrs []*blockStreamReader, flushToDis
func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.Time) *partWrapper { func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.Time) *partWrapper {
p := mp.NewPart() p := mp.NewPart()
return &partWrapper{ pw := &partWrapper{
p: p, p: p,
mp: mp, mp: mp,
refCount: 1,
flushToDiskDeadline: flushToDiskDeadline, flushToDiskDeadline: flushToDiskDeadline,
} }
pw.incRef()
return pw
} }
func getMaxInmemoryPartSize() uint64 { func getMaxInmemoryPartSize() uint64 {
@ -1341,8 +1343,8 @@ func (tb *Table) openCreatedPart(pws []*partWrapper, mpNew *inmemoryPart, dstPar
pNew := mustOpenFilePart(dstPartPath) pNew := mustOpenFilePart(dstPartPath)
pwNew := &partWrapper{ pwNew := &partWrapper{
p: pNew, p: pNew,
refCount: 1,
} }
pwNew.incRef()
return pwNew return pwNew
} }
@ -1408,7 +1410,7 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst
// Mark old parts as must be deleted and decrement reference count, // Mark old parts as must be deleted and decrement reference count,
// so they are eventually closed and deleted. // so they are eventually closed and deleted.
for _, pw := range pws { for _, pw := range pws {
atomic.StoreUint32(&pw.mustBeDeleted, 1) pw.mustDrop.Store(true)
pw.decRef() pw.decRef()
} }
} }
@ -1514,8 +1516,8 @@ func mustOpenParts(path string) []*partWrapper {
p := mustOpenFilePart(partPath) p := mustOpenFilePart(partPath)
pw := &partWrapper{ pw := &partWrapper{
p: p, p: p,
refCount: 1,
} }
pw.incRef()
pws = append(pws, pw) pws = append(pws, pw)
} }
partNamesPath := filepath.Join(path, partsFilename) partNamesPath := filepath.Join(path, partsFilename)