diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index a5f3793b7..6f21b81f3 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -295,17 +295,18 @@ func (ris *rawItemsShard) updateFlushDeadline() { var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second) type partWrapper struct { + // refCount is the number of references to partWrapper + refCount atomic.Int32 + + // mustDrop marks partWrapper for deletion. + // This field should be updated only after partWrapper + // was removed from the list of active parts. + mustDrop atomic.Bool + p *part mp *inmemoryPart - refCount uint32 - - // mustBeDeleted marks partWrapper for deletion. - // This field should be updated only after partWrapper - // was removed from the list of active parts. - mustBeDeleted uint32 - isInMerge bool // The deadline when the in-memory part must be flushed to disk. @@ -313,20 +314,20 @@ type partWrapper struct { } func (pw *partWrapper) incRef() { - atomic.AddUint32(&pw.refCount, 1) + pw.refCount.Add(1) } func (pw *partWrapper) decRef() { - n := atomic.AddUint32(&pw.refCount, ^uint32(0)) - if int32(n) < 0 { - logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n)) + n := pw.refCount.Add(-1) + if n < 0 { + logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", n) } if n > 0 { return } deletePath := "" - if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 { + if pw.mp == nil && pw.mustDrop.Load() { deletePath = pw.p.path } if pw.mp != nil { @@ -611,7 +612,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.InmemoryBlocksCount += p.ph.blocksCount m.InmemoryItemsCount += p.ph.itemsCount m.InmemorySizeBytes += p.size - m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) + m.PartsRefCount += uint64(pw.refCount.Load()) } m.FilePartsCount += uint64(len(tb.fileParts)) @@ -620,7 +621,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { m.FileBlocksCount += p.ph.blocksCount m.FileItemsCount += p.ph.itemsCount m.FileSizeBytes += p.size - m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount)) + m.PartsRefCount += uint64(pw.refCount.Load()) } tb.partsLock.Unlock() @@ -1035,12 +1036,13 @@ func (tb *Table) mustMergeIntoInmemoryPart(bsrs []*blockStreamReader, flushToDis func newPartWrapperFromInmemoryPart(mp *inmemoryPart, flushToDiskDeadline time.Time) *partWrapper { p := mp.NewPart() - return &partWrapper{ + pw := &partWrapper{ p: p, mp: mp, - refCount: 1, flushToDiskDeadline: flushToDiskDeadline, } + pw.incRef() + return pw } func getMaxInmemoryPartSize() uint64 { @@ -1340,9 +1342,9 @@ func (tb *Table) openCreatedPart(pws []*partWrapper, mpNew *inmemoryPart, dstPar // Open the created part from disk. pNew := mustOpenFilePart(dstPartPath) pwNew := &partWrapper{ - p: pNew, - refCount: 1, + p: pNew, } + pwNew.incRef() return pwNew } @@ -1408,7 +1410,7 @@ func (tb *Table) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dst // Mark old parts as must be deleted and decrement reference count, // so they are eventually closed and deleted. for _, pw := range pws { - atomic.StoreUint32(&pw.mustBeDeleted, 1) + pw.mustDrop.Store(true) pw.decRef() } } @@ -1513,9 +1515,9 @@ func mustOpenParts(path string) []*partWrapper { partPath := filepath.Join(path, partName) p := mustOpenFilePart(partPath) pw := &partWrapper{ - p: p, - refCount: 1, + p: p, } + pw.incRef() pws = append(pws, pw) } partNamesPath := filepath.Join(path, partsFilename)