diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go
index 0cf47d962c..9a96c41d96 100644
--- a/lib/logstorage/datadb.go
+++ b/lib/logstorage/datadb.go
@@ -39,12 +39,12 @@ const maxInmemoryPartsPerPartition = 20
 // datadb represents a database with log data
 type datadb struct {
 	// mergeIdx is used for generating unique directory names for parts
-	mergeIdx uint64
+	mergeIdx atomic.Uint64
 
-	inmemoryMergesTotal  uint64
-	inmemoryActiveMerges uint64
-	fileMergesTotal      uint64
-	fileActiveMerges     uint64
+	inmemoryMergesTotal  atomic.Uint64
+	inmemoryActiveMerges atomic.Int64
+	fileMergesTotal      atomic.Uint64
+	fileActiveMerges     atomic.Int64
 
 	// pt is the partition the datadb belongs to
 	pt *partition
@@ -171,12 +171,12 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
 
 	ddb := &datadb{
 		pt:            pt,
-		mergeIdx:      uint64(time.Now().UnixNano()),
 		flushInterval: flushInterval,
 		path:          path,
 		fileParts:     pws,
 		stopCh:        make(chan struct{}),
 	}
+	ddb.mergeIdx.Store(uint64(time.Now().UnixNano()))
 
 	// Start merge workers in the hope they'll merge the remaining parts
 	ddb.partsLock.Lock()
@@ -388,13 +388,13 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 	}
 
 	if dstPartType == partInmemory {
-		atomic.AddUint64(&ddb.inmemoryMergesTotal, 1)
-		atomic.AddUint64(&ddb.inmemoryActiveMerges, 1)
-		defer atomic.AddUint64(&ddb.inmemoryActiveMerges, ^uint64(0))
+		ddb.inmemoryMergesTotal.Add(1)
+		ddb.inmemoryActiveMerges.Add(1)
+		defer ddb.inmemoryActiveMerges.Add(-1)
 	} else {
-		atomic.AddUint64(&ddb.fileMergesTotal, 1)
-		atomic.AddUint64(&ddb.fileActiveMerges, 1)
-		defer atomic.AddUint64(&ddb.fileActiveMerges, ^uint64(0))
+		ddb.fileMergesTotal.Add(1)
+		ddb.fileActiveMerges.Add(1)
+		defer ddb.fileActiveMerges.Add(-1)
 	}
 
 	// Initialize destination paths.
@@ -489,7 +489,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
 }
 
 func (ddb *datadb) nextMergeIdx() uint64 {
-	return atomic.AddUint64(&ddb.mergeIdx, 1)
+	return ddb.mergeIdx.Add(1)
 }
 
 type partType int
@@ -657,10 +657,10 @@ func (s *DatadbStats) RowsCount() uint64 {
 
 // updateStats updates s with ddb stats
 func (ddb *datadb) updateStats(s *DatadbStats) {
-	s.InmemoryMergesTotal += atomic.LoadUint64(&ddb.inmemoryMergesTotal)
-	s.InmemoryActiveMerges += atomic.LoadUint64(&ddb.inmemoryActiveMerges)
-	s.FileMergesTotal += atomic.LoadUint64(&ddb.fileMergesTotal)
-	s.FileActiveMerges += atomic.LoadUint64(&ddb.fileActiveMerges)
+	s.InmemoryMergesTotal += ddb.inmemoryMergesTotal.Load()
+	s.InmemoryActiveMerges += uint64(ddb.inmemoryActiveMerges.Load())
+	s.FileMergesTotal += ddb.fileMergesTotal.Load()
+	s.FileActiveMerges += uint64(ddb.fileActiveMerges.Load())
 
 	ddb.partsLock.Lock()
 
@@ -855,7 +855,7 @@ func (ddb *datadb) releasePartsToMerge(pws []*partWrapper) {
 
 func availableDiskSpace(path string) uint64 {
 	available := fs.MustGetFreeSpace(path)
-	reserved := atomic.LoadUint64(&reservedDiskSpace)
+	reserved := reservedDiskSpace.Load()
 	if available < reserved {
 		return 0
 	}
@@ -873,18 +873,18 @@ func tryReserveDiskSpace(path string, n uint64) bool {
 }
 
 func reserveDiskSpace(n uint64) uint64 {
-	return atomic.AddUint64(&reservedDiskSpace, n)
+	return reservedDiskSpace.Add(n)
 }
 
 func releaseDiskSpace(n uint64) {
-	atomic.AddUint64(&reservedDiskSpace, ^(n - 1))
+	reservedDiskSpace.Add(^(n - 1))
 }
 
 // reservedDiskSpace tracks global reserved disk space for currently executed
 // background merges across all the partitions.
 //
 // It should allow avoiding background merges when there is no free disk space.
-var reservedDiskSpace uint64
+var reservedDiskSpace atomic.Uint64
 
 func needStop(stopCh <-chan struct{}) bool {
 	select {
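
Two details in datadb.go are worth spelling out. The active-merge gauges switch from uint64 decremented with the opaque atomic.AddUint64(..., ^uint64(0)) trick to atomic.Int64, where Add(-1) says what it means (updateStats then converts back to uint64, which is safe as long as the deferred decrements keep the gauge non-negative). reservedDiskSpace stays an atomic.Uint64, so releaseDiskSpace still subtracts via two's complement: Add(^(n - 1)) adds -n mod 2^64. A minimal standalone sketch of both idioms, with illustrative names:

	// Sketch only: the package and identifiers below are illustrative,
	// not part of this patch.
	package main

	import (
		"fmt"
		"sync/atomic"
	)

	var (
		activeMerges  atomic.Int64  // gauge: signed, so Add(-1) is direct
		reservedBytes atomic.Uint64 // reserved-space counter: stays unsigned
	)

	func releaseBytes(n uint64) {
		// Uint64.Add only accepts a uint64 delta; ^(n-1) is the
		// two's-complement encoding of -n, so this subtracts n.
		reservedBytes.Add(^(n - 1))
	}

	func main() {
		activeMerges.Add(1)
		defer activeMerges.Add(-1)

		reservedBytes.Add(1 << 20)
		releaseBytes(1 << 20)
		fmt.Println(activeMerges.Load(), reservedBytes.Load()) // 1 0
	}
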
diff --git a/lib/logstorage/hash128_timing_test.go b/lib/logstorage/hash128_timing_test.go
index 7cdccb9631..fa9346ec27 100644
--- a/lib/logstorage/hash128_timing_test.go
+++ b/lib/logstorage/hash128_timing_test.go
@@ -22,8 +22,8 @@ func BenchmarkHash128(b *testing.B) {
 				n += h.lo
 			}
 		}
-		atomic.AddUint64(&GlobalSinkU64, n)
+		GlobalSinkU64.Add(n)
 	})
 }
 
-var GlobalSinkU64 uint64
+var GlobalSinkU64 atomic.Uint64
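
GlobalSinkU64 is a benchmark sink: folding each iteration's result into a package-level atomic stops the compiler from discarding the benchmarked work as dead code, and the atomic add keeps the sink race-free under b.RunParallel. A minimal sketch of the same pattern, with a hypothetical benchmark that is not from this repo:

	// sink_bench_test.go — illustrative only.
	package sketch

	import (
		"sync/atomic"
		"testing"
	)

	var benchSink atomic.Uint64

	func BenchmarkSum(b *testing.B) {
		data := make([]uint64, 1024)
		b.RunParallel(func(pb *testing.PB) {
			var n uint64
			for pb.Next() {
				for _, v := range data {
					n += v
				}
			}
			// Publishing n keeps the loop from being optimized away;
			// Add is safe across the parallel workers.
			benchSink.Add(n)
		})
	}
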
diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go
index 6591c6fbf6..d717752580 100644
--- a/lib/logstorage/indexdb.go
+++ b/lib/logstorage/indexdb.go
@@ -49,7 +49,11 @@ type IndexdbStats struct {
 
 type indexdb struct {
 	// streamsCreatedTotal is the number of log streams created since the indexdb initialization.
-	streamsCreatedTotal uint64
+	streamsCreatedTotal atomic.Uint64
+
+	// streamFilterCacheGeneration is the generation of the streamFilterCache.
+	// It is updated each time a new item is added to tb.
+	streamFilterCacheGeneration atomic.Uint32
 
 	// path is the path to indexdb
 	path string
@@ -63,10 +67,6 @@ type indexdb struct {
 	// indexSearchPool is a pool of indexSearch struct for the given indexdb
 	indexSearchPool sync.Pool
 
-	// the generation of the streamFilterCache.
-	// It is updated each time new item is added to tb.
-	streamFilterCacheGeneration uint32
-
 	// s is the storage the indexdb belongs to.
 	s *Storage
 }
@@ -99,7 +99,7 @@ func (idb *indexdb) debugFlush() {
 }
 
 func (idb *indexdb) updateStats(d *IndexdbStats) {
-	d.StreamsCreatedTotal += atomic.LoadUint64(&idb.streamsCreatedTotal)
+	d.StreamsCreatedTotal += idb.streamsCreatedTotal.Load()
 
 	var tm mergeset.TableMetrics
 	idb.tb.UpdateMetrics(&tm)
@@ -476,17 +476,17 @@ func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical [
 	bi.items = items
 	putBatchItems(bi)
 
-	atomic.AddUint64(&idb.streamsCreatedTotal, 1)
+	idb.streamsCreatedTotal.Add(1)
 }
 
 func (idb *indexdb) invalidateStreamFilterCache() {
 	// This function must be fast, since it is called each
 	// time a new indexdb entry is added.
-	atomic.AddUint32(&idb.streamFilterCacheGeneration, 1)
+	idb.streamFilterCacheGeneration.Add(1)
 }
 
 func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID, sf *StreamFilter) []byte {
-	dst = encoding.MarshalUint32(dst, idb.streamFilterCacheGeneration)
+	dst = encoding.MarshalUint32(dst, idb.streamFilterCacheGeneration.Load())
 	dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(idb.partitionName))
 	dst = encoding.MarshalVarUint64(dst, uint64(len(tenantIDs)))
 	for i := range tenantIDs {
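
The streamFilterCacheGeneration move keeps the O(1) invalidation scheme intact: every cache key is prefixed with the current generation, so bumping the counter instantly orphans all existing entries instead of deleting them one by one. A minimal sketch of generation-prefixed keys, assuming a plain map-backed cache (stdlib encoding/binary stands in for the repo's encoding.MarshalUint32):

	// Sketch only: illustrative names, not the repo's API.
	package main

	import (
		"encoding/binary"
		"fmt"
		"sync/atomic"
	)

	type genCache struct {
		generation atomic.Uint32
		m          map[string]string
	}

	func (c *genCache) key(userKey string) string {
		var b [4]byte
		// Prefix every key with the current generation.
		binary.BigEndian.PutUint32(b[:], c.generation.Load())
		return string(b[:]) + userKey
	}

	// invalidateAll is O(1): old entries stay in the map but become
	// unreachable under the new generation and can age out later.
	func (c *genCache) invalidateAll() {
		c.generation.Add(1)
	}

	func main() {
		c := &genCache{m: map[string]string{}}
		c.m[c.key("foo")] = "bar"
		c.invalidateAll()
		_, ok := c.m[c.key("foo")]
		fmt.Println(ok) // false
	}
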
diff --git a/lib/logstorage/partition_test.go b/lib/logstorage/partition_test.go
index d11986d873..2bbee20ba0 100644
--- a/lib/logstorage/partition_test.go
+++ b/lib/logstorage/partition_test.go
@@ -133,14 +133,14 @@ func TestPartitionMustAddRowsConcurrent(t *testing.T) {
 	pt := mustOpenPartition(s, path)
 
 	const workersCount = 3
-	totalRowsCount := uint64(0)
+	var totalRowsCount atomic.Uint64
 	doneCh := make(chan struct{}, workersCount)
 	for i := 0; i < cap(doneCh); i++ {
 		go func() {
 			for j := 0; j < 7; j++ {
 				lr := newTestLogRows(5, 10, int64(j))
 				pt.mustAddRows(lr)
-				atomic.AddUint64(&totalRowsCount, uint64(len(lr.timestamps)))
+				totalRowsCount.Add(uint64(len(lr.timestamps)))
 			}
 			doneCh <- struct{}{}
 		}()
@@ -157,8 +157,8 @@ func TestPartitionMustAddRowsConcurrent(t *testing.T) {
 
 	var ddbStats DatadbStats
 	pt.ddb.updateStats(&ddbStats)
-	if n := ddbStats.RowsCount(); n != totalRowsCount {
-		t.Fatalf("unexpected number of entries; got %d; want %d", n, totalRowsCount)
+	if n := ddbStats.RowsCount(); n != totalRowsCount.Load() {
+		t.Fatalf("unexpected number of entries; got %d; want %d", n, totalRowsCount.Load())
 	}
 
 	mustClosePartition(pt)
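
The test counter follows the same recipe: a zero-valued atomic.Uint64 local is captured by the worker goroutines, and the channel (here doneCh) provides the happens-before edge before the final Load. A condensed sketch of the idiom, with a WaitGroup standing in for the done channel and illustrative numbers:

	// Sketch only.
	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	func main() {
		var total atomic.Uint64 // the zero value is ready to use
		var wg sync.WaitGroup
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for j := 0; j < 7; j++ {
					total.Add(50) // e.g. rows added per batch
				}
			}()
		}
		wg.Wait() // happens-before edge for the Load below
		fmt.Println(total.Load()) // 3 * 7 * 50 = 1050
	}
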
diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go
index 4fd40df774..a5ce10739b 100644
--- a/lib/logstorage/storage.go
+++ b/lib/logstorage/storage.go
@@ -70,8 +70,8 @@ type StorageConfig struct {
 
 // Storage is the storage for log entries.
 type Storage struct {
-	rowsDroppedTooBigTimestamp   uint64
-	rowsDroppedTooSmallTimestamp uint64
+	rowsDroppedTooBigTimestamp   atomic.Uint64
+	rowsDroppedTooSmallTimestamp atomic.Uint64
 
 	// path is the path to the Storage directory
 	path string
@@ -442,7 +442,7 @@ func (s *Storage) MustAddRows(lr *LogRows) {
 			tooSmallTimestampLogger.Warnf("skipping log entry with too small timestamp=%s; it must be bigger than %s according "+
 				"to the configured -retentionPeriod=%dd. See https://docs.victoriametrics.com/VictoriaLogs/#retention ; "+
 				"log entry: %s", &tsf, &minAllowedTsf, durationToDays(s.retention), &rf)
-			atomic.AddUint64(&s.rowsDroppedTooSmallTimestamp, 1)
+			s.rowsDroppedTooSmallTimestamp.Add(1)
 			continue
 		}
 		if day > maxAllowedDay {
@@ -452,7 +452,7 @@ func (s *Storage) MustAddRows(lr *LogRows) {
 			tooBigTimestampLogger.Warnf("skipping log entry with too big timestamp=%s; it must be smaller than %s according "+
 				"to the configured -futureRetention=%dd; see https://docs.victoriametrics.com/VictoriaLogs/#retention ; "+
 				"log entry: %s", &tsf, &maxAllowedTsf, durationToDays(s.futureRetention), &rf)
-			atomic.AddUint64(&s.rowsDroppedTooBigTimestamp, 1)
+			s.rowsDroppedTooBigTimestamp.Add(1)
 			continue
 		}
 		lrPart := m[day]
@@ -527,8 +527,8 @@ func (s *Storage) getPartitionForDay(day int64) *partitionWrapper {
 
 // UpdateStats updates ss for the given s.
 func (s *Storage) UpdateStats(ss *StorageStats) {
-	ss.RowsDroppedTooBigTimestamp += atomic.LoadUint64(&s.rowsDroppedTooBigTimestamp)
-	ss.RowsDroppedTooSmallTimestamp += atomic.LoadUint64(&s.rowsDroppedTooSmallTimestamp)
+	ss.RowsDroppedTooBigTimestamp += s.rowsDroppedTooBigTimestamp.Load()
+	ss.RowsDroppedTooSmallTimestamp += s.rowsDroppedTooSmallTimestamp.Load()
 
 	s.partitionsLock.Lock()
 	ss.PartitionsCount += uint64(len(s.partitions))
diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go
index 98b8f902a1..adc29a4bd2 100644
--- a/lib/logstorage/storage_search_test.go
+++ b/lib/logstorage/storage_search_test.go
@@ -110,7 +110,7 @@ func TestStorageRunQuery(t *testing.T) {
 				ProjectID: uint32(10*i + 1),
 			}
 			expectedTenantID := tenantID.String()
-			rowsCount := uint32(0)
+			var rowsCount atomic.Uint32
 			processBlock := func(columns []BlockColumn) {
 				hasTenantIDColumn := false
 				var columnNames []string
@@ -131,41 +131,41 @@ func TestStorageRunQuery(t *testing.T) {
 				if !hasTenantIDColumn {
 					panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames))
 				}
-				atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
+				rowsCount.Add(uint32(len(columns[0].Values)))
 			}
 			tenantIDs := []TenantID{tenantID}
 			s.RunQuery(tenantIDs, q, nil, processBlock)
 
 			expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
-			if rowsCount != uint32(expectedRowsCount) {
-				t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount)
+			if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+				t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
 			}
 		}
 	})
 	t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
 		q := mustParseQuery(`"log message"`)
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(columns []BlockColumn) {
-			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
+			rowsCount.Add(uint32(len(columns[0].Values)))
 		}
 		s.RunQuery(allTenantIDs, q, nil, processBlock)
 
 		expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("matching-in-filter", func(t *testing.T) {
 		q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`)
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(columns []BlockColumn) {
-			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
+			rowsCount.Add(uint32(len(columns[0].Values)))
 		}
 		s.RunQuery(allTenantIDs, q, nil, processBlock)
 
 		expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("stream-filter-mismatch", func(t *testing.T) {
@@ -183,7 +183,7 @@ func TestStorageRunQuery(t *testing.T) {
 				ProjectID: 11,
 			}
 			expectedStreamID := fmt.Sprintf("stream_id=%d", i)
-			rowsCount := uint32(0)
+			var rowsCount atomic.Uint32
 			processBlock := func(columns []BlockColumn) {
 				hasStreamIDColumn := false
 				var columnNames []string
@@ -204,14 +204,14 @@ func TestStorageRunQuery(t *testing.T) {
 				if !hasStreamIDColumn {
 					panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames))
 				}
-				atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
+				rowsCount.Add(uint32(len(columns[0].Values)))
 			}
 			tenantIDs := []TenantID{tenantID}
 			s.RunQuery(tenantIDs, q, nil, processBlock)
 
 			expectedRowsCount := blocksPerStream * rowsPerBlock
-			if rowsCount != uint32(expectedRowsCount) {
-				t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, rowsCount, expectedRowsCount)
+			if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+				t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, n, expectedRowsCount)
 			}
 		}
 	})
@@ -221,16 +221,16 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(columns []BlockColumn) {
-			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
+			rowsCount.Add(uint32(len(columns[0].Values)))
 		}
 		tenantIDs := []TenantID{tenantID}
 		s.RunQuery(tenantIDs, q, nil, processBlock)
 
 		expectedRowsCount := streamsPerTenant * blocksPerStream * 2
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("matching-time-range", func(t *testing.T) {
@@ -241,16 +241,16 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(columns []BlockColumn) {
-			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
+			rowsCount.Add(uint32(len(columns[0].Values)))
 		}
 		tenantIDs := []TenantID{tenantID}
 		s.RunQuery(tenantIDs, q, nil, processBlock)
 
 		expectedRowsCount := streamsPerTenant * blocksPerStream
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("matching-stream-id-with-time-range", func(t *testing.T) {
@@ -261,16 +261,16 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(columns []BlockColumn) {
-			atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
+			rowsCount.Add(uint32(len(columns[0].Values)))
 		}
 		tenantIDs := []TenantID{tenantID}
 		s.RunQuery(tenantIDs, q, nil, processBlock)
 
 		expectedRowsCount := blocksPerStream
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {
@@ -460,18 +460,18 @@ func TestStorageSearch(t *testing.T) {
 				filter:            f,
 				resultColumnNames: []string{"_msg"},
 			}
-			rowsCount := uint32(0)
+			var rowsCount atomic.Uint32
 			processBlock := func(workerID uint, br *blockResult) {
 				if !br.streamID.tenantID.equal(&tenantID) {
 					panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
 				}
-				atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
+				rowsCount.Add(uint32(br.RowsCount()))
 			}
 			s.search(workersCount, so, nil, processBlock)
 
 			expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
-			if rowsCount != uint32(expectedRowsCount) {
-				t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount)
+			if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+				t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
 			}
 		}
 	})
@@ -484,15 +484,15 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(workerID uint, br *blockResult) {
-			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
+			rowsCount.Add(uint32(br.RowsCount()))
 		}
 		s.search(workersCount, so, nil, processBlock)
 
 		expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("stream-filter-mismatch", func(t *testing.T) {
@@ -525,18 +525,18 @@ func TestStorageSearch(t *testing.T) {
 				filter:            f,
 				resultColumnNames: []string{"_msg"},
 			}
-			rowsCount := uint32(0)
+			var rowsCount atomic.Uint32
 			processBlock := func(workerID uint, br *blockResult) {
 				if !br.streamID.tenantID.equal(&tenantID) {
 					panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
 				}
-				atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
+				rowsCount.Add(uint32(br.RowsCount()))
 			}
 			s.search(workersCount, so, nil, processBlock)
 
 			expectedRowsCount := blocksPerStream * rowsPerBlock
-			if rowsCount != uint32(expectedRowsCount) {
-				t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount)
+			if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+				t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
 			}
 		}
 	})
@@ -554,18 +554,18 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(workerID uint, br *blockResult) {
 			if !br.streamID.tenantID.equal(&tenantID) {
 				panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
 			}
-			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
+			rowsCount.Add(uint32(br.RowsCount()))
 		}
 		s.search(workersCount, so, nil, processBlock)
 
 		expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("matching-multiple-stream-ids-with-re-filter", func(t *testing.T) {
@@ -591,18 +591,18 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(workerID uint, br *blockResult) {
 			if !br.streamID.tenantID.equal(&tenantID) {
 				panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
 			}
-			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
+			rowsCount.Add(uint32(br.RowsCount()))
 		}
 		s.search(workersCount, so, nil, processBlock)
 
 		expectedRowsCount := streamsPerTenant * blocksPerStream * 2
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("matching-stream-id-smaller-time-range", func(t *testing.T) {
@@ -619,15 +619,15 @@ func TestStorageSearch(t *testing.T) {
 			filter:            f,
 			resultColumnNames: []string{"_msg"},
 		}
-		rowsCount := uint32(0)
+		var rowsCount atomic.Uint32
 		processBlock := func(workerID uint, br *blockResult) {
-			atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
+			rowsCount.Add(uint32(br.RowsCount()))
 		}
 		s.search(workersCount, so, nil, processBlock)
 
 		expectedRowsCount := blocksPerStream
-		if rowsCount != uint32(expectedRowsCount) {
-			t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount)
+		if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
+			t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
 		}
 	})
 	t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {