diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go index 3504b563b..6591c6fbf 100644 --- a/lib/logstorage/indexdb.go +++ b/lib/logstorage/indexdb.go @@ -81,7 +81,7 @@ func mustOpenIndexdb(path, partitionName string, s *Storage) *indexdb { partitionName: partitionName, s: s, } - isReadOnly := uint32(0) + var isReadOnly atomic.Bool idb.tb = mergeset.MustOpenTable(path, idb.invalidateStreamFilterCache, mergeTagToStreamIDsRows, &isReadOnly) return idb } diff --git a/lib/mergeset/merge.go b/lib/mergeset/merge.go index df8a7fcd6..39c42c2d7 100644 --- a/lib/mergeset/merge.go +++ b/lib/mergeset/merge.go @@ -28,7 +28,7 @@ type PrepareBlockCallback func(data []byte, items []Item) ([]byte, []Item) // // It also atomically adds the number of items merged to itemsMerged. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{}, - itemsMerged *uint64) error { + itemsMerged *atomic.Uint64) error { bsm := bsmPool.Get().(*blockStreamMerger) if err := bsm.Init(bsrs, prepareBlock); err != nil { return fmt.Errorf("cannot initialize blockStreamMerger: %w", err) @@ -100,7 +100,7 @@ func (bsm *blockStreamMerger) Init(bsrs []*blockStreamReader, prepareBlock Prepa var errForciblyStopped = fmt.Errorf("forcibly stopped") -func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error { +func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *atomic.Uint64) error { again: if len(bsm.bsrHeap) == 0 { // Write the last (maybe incomplete) inmemoryBlock to bsw. @@ -163,14 +163,14 @@ again: goto again } -func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *uint64) { +func (bsm *blockStreamMerger) flushIB(bsw *blockStreamWriter, ph *partHeader, itemsMerged *atomic.Uint64) { items := bsm.ib.items data := bsm.ib.data if len(items) == 0 { // Nothing to flush. return } - atomic.AddUint64(itemsMerged, uint64(len(items))) + itemsMerged.Add(uint64(len(items))) if bsm.prepareBlock != nil { bsm.firstItem = append(bsm.firstItem[:0], items[0].String(data)...) bsm.lastItem = append(bsm.lastItem[:0], items[len(items)-1].String(data)...) diff --git a/lib/mergeset/merge_test.go b/lib/mergeset/merge_test.go index 0b628e1d9..4f3e4d33f 100644 --- a/lib/mergeset/merge_test.go +++ b/lib/mergeset/merge_test.go @@ -6,6 +6,7 @@ import ( "math/rand" "reflect" "sort" + "sync/atomic" "testing" "time" ) @@ -27,7 +28,7 @@ func TestMultilevelMerge(t *testing.T) { // Prepare blocks to merge. bsrs, items := newTestInmemoryBlockStreamReaders(r, 10, 4000) - var itemsMerged uint64 + var itemsMerged atomic.Uint64 // First level merge var dstIP1 inmemoryPart @@ -44,12 +45,12 @@ func TestMultilevelMerge(t *testing.T) { t.Fatalf("cannot merge first level part 2: %s", err) } - if itemsMerged != uint64(len(items)) { - t.Fatalf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items)) + if n := itemsMerged.Load(); n != uint64(len(items)) { + t.Fatalf("unexpected itemsMerged; got %d; want %d", n, len(items)) } // Second level merge (aka final merge) - itemsMerged = 0 + itemsMerged.Store(0) var dstIP inmemoryPart var bsw blockStreamWriter bsrsTop := []*blockStreamReader{ @@ -60,8 +61,8 @@ func TestMultilevelMerge(t *testing.T) { if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrsTop, nil, nil, &itemsMerged); err != nil { t.Fatalf("cannot merge second level: %s", err) } - if itemsMerged != uint64(len(items)) { - t.Fatalf("unexpected itemsMerged after final merge; got %d; want %d", itemsMerged, len(items)) + if n := itemsMerged.Load(); n != uint64(len(items)) { + t.Fatalf("unexpected itemsMerged after final merge; got %d; want %d", n, len(items)) } // Verify the resulting part (dstIP) contains all the items @@ -78,13 +79,13 @@ func TestMergeForciblyStop(t *testing.T) { var bsw blockStreamWriter bsw.MustInitFromInmemoryPart(&dstIP, 1) ch := make(chan struct{}) - var itemsMerged uint64 + var itemsMerged atomic.Uint64 close(ch) if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); !errors.Is(err, errForciblyStopped) { t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped) } - if itemsMerged != 0 { - t.Fatalf("unexpected itemsMerged; got %d; want %d", itemsMerged, 0) + if n := itemsMerged.Load(); n != 0 { + t.Fatalf("unexpected itemsMerged; got %d; want %d", n, 0) } } @@ -122,15 +123,15 @@ func testMergeBlockStreamsSerial(r *rand.Rand, blocksToMerge, maxItemsPerBlock i bsrs, items := newTestInmemoryBlockStreamReaders(r, blocksToMerge, maxItemsPerBlock) // Merge blocks. - var itemsMerged uint64 + var itemsMerged atomic.Uint64 var dstIP inmemoryPart var bsw blockStreamWriter bsw.MustInitFromInmemoryPart(&dstIP, -4) if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return fmt.Errorf("cannot merge block streams: %w", err) } - if itemsMerged != uint64(len(items)) { - return fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items)) + if n := itemsMerged.Load(); n != uint64(len(items)) { + return fmt.Errorf("unexpected itemsMerged; got %d; want %d", n, len(items)) } // Verify the resulting part (dstIP) contains all the items diff --git a/lib/mergeset/part_search_test.go b/lib/mergeset/part_search_test.go index b61e9b552..fb141b3f7 100644 --- a/lib/mergeset/part_search_test.go +++ b/lib/mergeset/part_search_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/rand" "sort" + "sync/atomic" "testing" "time" ) @@ -148,15 +149,15 @@ func testPartSearchSerial(r *rand.Rand, p *part, items []string) error { func newTestPart(r *rand.Rand, blocksCount, maxItemsPerBlock int) (*part, []string, error) { bsrs, items := newTestInmemoryBlockStreamReaders(r, blocksCount, maxItemsPerBlock) - var itemsMerged uint64 + var itemsMerged atomic.Uint64 var ip inmemoryPart var bsw blockStreamWriter bsw.MustInitFromInmemoryPart(&ip, -3) if err := mergeBlockStreams(&ip.ph, &bsw, bsrs, nil, nil, &itemsMerged); err != nil { return nil, nil, fmt.Errorf("cannot merge blocks: %w", err) } - if itemsMerged != uint64(len(items)) { - return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", itemsMerged, len(items)) + if n := itemsMerged.Load(); n != uint64(len(items)) { + return nil, nil, fmt.Errorf("unexpected itemsMerged; got %d; want %d", n, len(items)) } size := ip.size() p := newPart(&ip.ph, "partName", size, ip.metaindexData.NewReader(), &ip.indexData, &ip.itemsData, &ip.lensData) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 6f21b81f3..f22f2406f 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -83,33 +83,29 @@ func maxItemsPerCachedPart() uint64 { // Table represents mergeset table. type Table struct { - // Atomically updated counters must go first in the struct, so they are properly - // aligned to 8 bytes on 32-bit architectures. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 + activeInmemoryMerges atomic.Int64 + activeFileMerges atomic.Int64 - activeInmemoryMerges uint64 - activeFileMerges uint64 + inmemoryMergesCount atomic.Uint64 + fileMergesCount atomic.Uint64 - inmemoryMergesCount uint64 - fileMergesCount uint64 + inmemoryItemsMerged atomic.Uint64 + fileItemsMerged atomic.Uint64 - inmemoryItemsMerged uint64 - fileItemsMerged uint64 + itemsAdded atomic.Uint64 + itemsAddedSizeBytes atomic.Uint64 - itemsAdded uint64 - itemsAddedSizeBytes uint64 + inmemoryPartsLimitReachedCount atomic.Uint64 - inmemoryPartsLimitReachedCount uint64 - - mergeIdx uint64 + mergeIdx atomic.Uint64 path string flushCallback func() - needFlushCallbackCall uint32 + needFlushCallbackCall atomic.Bool prepareBlock PrepareBlockCallback - isReadOnly *uint32 + isReadOnly *atomic.Bool // rawItems contains recently added items that haven't been converted to parts yet. // @@ -353,7 +349,7 @@ func (pw *partWrapper) decRef() { // to persistent storage. // // The table is created if it doesn't exist yet. -func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) *Table { +func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *atomic.Bool) *Table { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. @@ -363,7 +359,6 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC pws := mustOpenParts(path) tb := &Table{ - mergeIdx: uint64(time.Now().UnixNano()), path: path, flushCallback: flushCallback, prepareBlock: prepareBlock, @@ -372,6 +367,7 @@ func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockC inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts), stopCh: make(chan struct{}), } + tb.mergeIdx.Store(uint64(time.Now().UnixNano())) tb.rawItems.init() tb.startBackgroundWorkers() @@ -464,7 +460,7 @@ func (tb *Table) startFlushCallbackWorker() { tb.wg.Done() return case <-tc.C: - if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) { + if tb.needFlushCallbackCall.CompareAndSwap(true, false) { tb.flushCallback() } } @@ -588,19 +584,19 @@ func (tm *TableMetrics) TotalItemsCount() uint64 { // UpdateMetrics updates m with metrics from tb. func (tb *Table) UpdateMetrics(m *TableMetrics) { - m.ActiveInmemoryMerges += atomic.LoadUint64(&tb.activeInmemoryMerges) - m.ActiveFileMerges += atomic.LoadUint64(&tb.activeFileMerges) + m.ActiveInmemoryMerges += uint64(tb.activeInmemoryMerges.Load()) + m.ActiveFileMerges += uint64(tb.activeFileMerges.Load()) - m.InmemoryMergesCount += atomic.LoadUint64(&tb.inmemoryMergesCount) - m.FileMergesCount += atomic.LoadUint64(&tb.fileMergesCount) + m.InmemoryMergesCount += tb.inmemoryMergesCount.Load() + m.FileMergesCount += tb.fileMergesCount.Load() - m.InmemoryItemsMerged += atomic.LoadUint64(&tb.inmemoryItemsMerged) - m.FileItemsMerged += atomic.LoadUint64(&tb.fileItemsMerged) + m.InmemoryItemsMerged += tb.inmemoryItemsMerged.Load() + m.FileItemsMerged += tb.fileItemsMerged.Load() - m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded) - m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes) + m.ItemsAdded += tb.itemsAdded.Load() + m.ItemsAddedSizeBytes += tb.itemsAddedSizeBytes.Load() - m.InmemoryPartsLimitReachedCount += atomic.LoadUint64(&tb.inmemoryPartsLimitReachedCount) + m.InmemoryPartsLimitReachedCount += tb.inmemoryPartsLimitReachedCount.Load() m.PendingItems += uint64(tb.rawItems.Len()) @@ -644,12 +640,12 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) { // It logs the ignored items, so users could notice and fix the issue. func (tb *Table) AddItems(items [][]byte) { tb.rawItems.addItems(tb, items) - atomic.AddUint64(&tb.itemsAdded, uint64(len(items))) + tb.itemsAdded.Add(uint64(len(items))) n := 0 for _, item := range items { n += len(item) } - atomic.AddUint64(&tb.itemsAddedSizeBytes, uint64(n)) + tb.itemsAddedSizeBytes.Add(uint64(n)) } // getParts appends parts snapshot to dst and returns it. @@ -890,7 +886,7 @@ func (tb *Table) addToInmemoryParts(pw *partWrapper, isFinal bool) { select { case tb.inmemoryPartsLimitCh <- struct{}{}: default: - atomic.AddUint64(&tb.inmemoryPartsLimitReachedCount, 1) + tb.inmemoryPartsLimitReachedCount.Add(1) select { case tb.inmemoryPartsLimitCh <- struct{}{}: case <-tb.stopCh: @@ -906,10 +902,10 @@ func (tb *Table) addToInmemoryParts(pw *partWrapper, isFinal bool) { if isFinal { tb.flushCallback() } else { - // Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization - // at fast path when needFlushCallbackCall is already set to 1. - if atomic.LoadUint32(&tb.needFlushCallbackCall) == 0 { - atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1) + // Use Load in front of CompareAndSwap in order to avoid slow inter-CPU synchronization + // at fast path when needFlushCallbackCall is already set to true. + if !tb.needFlushCallbackCall.Load() { + tb.needFlushCallbackCall.CompareAndSwap(false, true) } } } @@ -1072,7 +1068,7 @@ func (tb *Table) NotifyReadWriteMode() { func (tb *Table) inmemoryPartsMerger() { for { - if atomic.LoadUint32(tb.isReadOnly) != 0 { + if tb.isReadOnly.Load() { return } maxOutBytes := tb.getMaxFilePartSize() @@ -1105,7 +1101,7 @@ func (tb *Table) inmemoryPartsMerger() { func (tb *Table) filePartsMerger() { for { - if atomic.LoadUint32(tb.isReadOnly) != 0 { + if tb.isReadOnly.Load() { return } maxOutBytes := tb.getMaxFilePartSize() @@ -1303,9 +1299,9 @@ func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader { func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) { var ph partHeader - var itemsMerged *uint64 - var mergesCount *uint64 - var activeMerges *uint64 + var itemsMerged *atomic.Uint64 + var mergesCount *atomic.Uint64 + var activeMerges *atomic.Int64 switch dstPartType { case partInmemory: itemsMerged = &tb.inmemoryItemsMerged @@ -1318,10 +1314,10 @@ func (tb *Table) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, default: logger.Panicf("BUG: unknown partType=%d", dstPartType) } - atomic.AddUint64(activeMerges, 1) + activeMerges.Add(1) err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, itemsMerged) - atomic.AddUint64(activeMerges, ^uint64(0)) - atomic.AddUint64(mergesCount, 1) + activeMerges.Add(-1) + mergesCount.Add(1) if err != nil { return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err) } @@ -1462,7 +1458,7 @@ func getCompressLevel(itemsCount uint64) int { } func (tb *Table) nextMergeIdx() uint64 { - return atomic.AddUint64(&tb.mergeIdx, 1) + return tb.mergeIdx.Add(1) } func mustOpenParts(path string) []*partWrapper { diff --git a/lib/mergeset/table_search_test.go b/lib/mergeset/table_search_test.go index c12fdaf73..79a652ff7 100644 --- a/lib/mergeset/table_search_test.go +++ b/lib/mergeset/table_search_test.go @@ -41,7 +41,7 @@ func TestTableSearchSerial(t *testing.T) { func() { // Re-open the table and verify the search works. - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, nil, nil, &isReadOnly) defer tb.MustClose() if err := testTableSearchSerial(tb, items); err != nil { @@ -75,7 +75,7 @@ func TestTableSearchConcurrent(t *testing.T) { // Re-open the table and verify the search works. func() { - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, nil, nil, &isReadOnly) defer tb.MustClose() if err := testTableSearchConcurrent(tb, items); err != nil { @@ -145,11 +145,11 @@ func testTableSearchSerial(tb *Table, items []string) error { } func newTestTable(r *rand.Rand, path string, itemsCount int) (*Table, []string, error) { - var flushes uint64 + var flushes atomic.Uint64 flushCallback := func() { - atomic.AddUint64(&flushes, 1) + flushes.Add(1) } - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, flushCallback, nil, &isReadOnly) items := make([]string, itemsCount) for i := 0; i < itemsCount; i++ { @@ -158,7 +158,7 @@ func newTestTable(r *rand.Rand, path string, itemsCount int) (*Table, []string, items[i] = item } tb.DebugFlush() - if itemsCount > 0 && atomic.LoadUint64(&flushes) == 0 { + if itemsCount > 0 && flushes.Load() == 0 { return nil, nil, fmt.Errorf("unexpeted zero flushes for itemsCount=%d", itemsCount) } diff --git a/lib/mergeset/table_search_timing_test.go b/lib/mergeset/table_search_timing_test.go index cb1e24cc8..f8deb4d17 100644 --- a/lib/mergeset/table_search_timing_test.go +++ b/lib/mergeset/table_search_timing_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "os" + "sync/atomic" "testing" ) @@ -34,7 +35,7 @@ func benchmarkTableSearch(b *testing.B, itemsCount int) { // Force finishing pending merges tb.MustClose() - var isReadOnly uint32 + var isReadOnly atomic.Bool tb = MustOpenTable(path, nil, nil, &isReadOnly) defer tb.MustClose() diff --git a/lib/mergeset/table_test.go b/lib/mergeset/table_test.go index 7b7376681..e044ab34a 100644 --- a/lib/mergeset/table_test.go +++ b/lib/mergeset/table_test.go @@ -20,7 +20,7 @@ func TestTableOpenClose(t *testing.T) { }() // Create a new table - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, nil, nil, &isReadOnly) // Close it @@ -39,7 +39,7 @@ func TestTableAddItemsTooLongItem(t *testing.T) { t.Fatalf("cannot remove %q: %s", path, err) } - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, nil, nil, &isReadOnly) tb.AddItems([][]byte{make([]byte, maxInmemoryBlockSize+1)}) tb.MustClose() @@ -56,11 +56,11 @@ func TestTableAddItemsSerial(t *testing.T) { _ = os.RemoveAll(path) }() - var flushes uint64 + var flushes atomic.Uint64 flushCallback := func() { - atomic.AddUint64(&flushes, 1) + flushes.Add(1) } - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, flushCallback, nil, &isReadOnly) const itemsCount = 10e3 @@ -68,7 +68,7 @@ func TestTableAddItemsSerial(t *testing.T) { // Verify items count after pending items flush. tb.DebugFlush() - if atomic.LoadUint64(&flushes) == 0 { + if flushes.Load() == 0 { t.Fatalf("unexpected zero flushes") } @@ -109,7 +109,7 @@ func TestTableCreateSnapshotAt(t *testing.T) { t.Fatalf("cannot remove %q: %s", path, err) } - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, nil, nil, &isReadOnly) // Write a lot of items into the table, so background merges would start. @@ -185,14 +185,14 @@ func TestTableAddItemsConcurrent(t *testing.T) { _ = os.RemoveAll(path) }() - var flushes uint64 + var flushes atomic.Uint64 flushCallback := func() { - atomic.AddUint64(&flushes, 1) + flushes.Add(1) } prepareBlock := func(data []byte, items []Item) ([]byte, []Item) { return data, items } - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, flushCallback, prepareBlock, &isReadOnly) const itemsCount = 10e3 @@ -200,7 +200,7 @@ func TestTableAddItemsConcurrent(t *testing.T) { // Verify items count after pending items flush. tb.DebugFlush() - if atomic.LoadUint64(&flushes) == 0 { + if flushes.Load() == 0 { t.Fatalf("unexpected zero flushes") } @@ -254,7 +254,7 @@ func testReopenTable(t *testing.T, path string, itemsCount int) { t.Helper() for i := 0; i < 10; i++ { - var isReadOnly uint32 + var isReadOnly atomic.Bool tb := MustOpenTable(path, nil, nil, &isReadOnly) var m TableMetrics tb.UpdateMetrics(&m) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index ec0461d70..884d0c26d 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -132,7 +132,7 @@ func getTagFiltersCacheSize() int { // // The last segment of the path should contain unique hex value which // will be then used as indexDB.generation -func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB { +func mustOpenIndexDB(path string, s *Storage, isReadOnly *atomic.Bool) *indexDB { if s == nil { logger.Panicf("BUG: Storage must be nin-nil") } diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 2e939ab1b..788afa4ab 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -519,7 +519,7 @@ func TestIndexDBOpenClose(t *testing.T) { var s Storage tableName := nextIndexDBTableName() for i := 0; i < 5; i++ { - var isReadOnly uint32 + var isReadOnly atomic.Bool db := mustOpenIndexDB(tableName, &s, &isReadOnly) db.MustClose() } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index a81b65f57..32d6fa988 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -604,7 +604,7 @@ func (pt *partition) NotifyReadWriteMode() { func (pt *partition) inmemoryPartsMerger() { for { - if atomic.LoadUint32(&pt.s.isReadOnly) != 0 { + if pt.s.isReadOnly.Load() { return } maxOutBytes := pt.getMaxBigPartSize() @@ -637,7 +637,7 @@ func (pt *partition) inmemoryPartsMerger() { func (pt *partition) smallPartsMerger() { for { - if atomic.LoadUint32(&pt.s.isReadOnly) != 0 { + if pt.s.isReadOnly.Load() { return } maxOutBytes := pt.getMaxBigPartSize() @@ -670,7 +670,7 @@ func (pt *partition) smallPartsMerger() { func (pt *partition) bigPartsMerger() { for { - if atomic.LoadUint32(&pt.s.isReadOnly) != 0 { + if pt.s.isReadOnly.Load() { return } maxOutBytes := pt.getMaxBigPartSize() diff --git a/lib/storage/storage.go b/lib/storage/storage.go index cb9039c83..1d571d80a 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -149,7 +149,8 @@ type Storage struct { deletedMetricIDs atomic.Pointer[uint64set.Set] deletedMetricIDsUpdateLock sync.Mutex - isReadOnly uint32 + // isReadOnly is set to true when the storage is in read-only mode. + isReadOnly atomic.Bool } // MustOpenStorage opens storage on the given path with the given retentionMsecs. @@ -650,7 +651,7 @@ var freeDiskSpaceLimitBytes uint64 // IsReadOnly returns information is storage in read only mode func (s *Storage) IsReadOnly() bool { - return atomic.LoadUint32(&s.isReadOnly) == 1 + return s.isReadOnly.Load() } func (s *Storage) startFreeDiskSpaceWatcher() { @@ -659,18 +660,18 @@ func (s *Storage) startFreeDiskSpaceWatcher() { if freeSpaceBytes < freeDiskSpaceLimitBytes { // Switch the storage to readonly mode if there is no enough free space left at s.path // - // Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization + // Use Load in front of CompareAndSwap in order to avoid slow inter-CPU synchronization // when the storage is already in read-only mode. - if atomic.LoadUint32(&s.isReadOnly) == 0 && atomic.CompareAndSwapUint32(&s.isReadOnly, 0, 1) { + if !s.isReadOnly.Load() && s.isReadOnly.CompareAndSwap(false, true) { // log notification only on state change logger.Warnf("switching the storage at %s to read-only mode, since it has less than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left", s.path, freeDiskSpaceLimitBytes, freeSpaceBytes) } return } - // Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization + // Use Load in front of CompareAndSwap in order to avoid slow inter-CPU synchronization // when the storage isn't in read-only mode. - if atomic.LoadUint32(&s.isReadOnly) == 1 && atomic.CompareAndSwapUint32(&s.isReadOnly, 1, 0) { + if s.isReadOnly.Load() && s.isReadOnly.CompareAndSwap(true, false) { s.notifyReadWriteMode() logger.Warnf("switching the storage at %s to read-write mode, since it has more than -storage.minFreeDiskSpaceBytes=%d of free space: %d bytes left", s.path, freeDiskSpaceLimitBytes, freeSpaceBytes)