From 4617dc8bbeb7c610fb5b1a37553e75a464eae20f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 23 Feb 2024 23:46:10 +0200 Subject: [PATCH] lib/logstorage: consistently use atomic.* types instead of atomic.* functions on regular types See ea9e2b19a5fa8aecc25c9da10fad8f4c1c58df38 --- lib/logstorage/datadb.go | 42 +++++------ lib/logstorage/hash128_timing_test.go | 4 +- lib/logstorage/indexdb.go | 18 ++--- lib/logstorage/partition_test.go | 8 +- lib/logstorage/storage.go | 12 +-- lib/logstorage/storage_search_test.go | 104 +++++++++++++------------- 6 files changed, 94 insertions(+), 94 deletions(-) diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go index 0cf47d962..9a96c41d9 100644 --- a/lib/logstorage/datadb.go +++ b/lib/logstorage/datadb.go @@ -39,12 +39,12 @@ const maxInmemoryPartsPerPartition = 20 // datadb represents a database with log data type datadb struct { // mergeIdx is used for generating unique directory names for parts - mergeIdx uint64 + mergeIdx atomic.Uint64 - inmemoryMergesTotal uint64 - inmemoryActiveMerges uint64 - fileMergesTotal uint64 - fileActiveMerges uint64 + inmemoryMergesTotal atomic.Uint64 + inmemoryActiveMerges atomic.Int64 + fileMergesTotal atomic.Uint64 + fileActiveMerges atomic.Int64 // pt is the partition the datadb belongs to pt *partition @@ -171,12 +171,12 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da ddb := &datadb{ pt: pt, - mergeIdx: uint64(time.Now().UnixNano()), flushInterval: flushInterval, path: path, fileParts: pws, stopCh: make(chan struct{}), } + ddb.mergeIdx.Store(uint64(time.Now().UnixNano())) // Start merge workers in the hope they'll merge the remaining parts ddb.partsLock.Lock() @@ -388,13 +388,13 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { } if dstPartType == partInmemory { - atomic.AddUint64(&ddb.inmemoryMergesTotal, 1) - atomic.AddUint64(&ddb.inmemoryActiveMerges, 1) - defer atomic.AddUint64(&ddb.inmemoryActiveMerges, ^uint64(0)) + ddb.inmemoryMergesTotal.Add(1) + ddb.inmemoryActiveMerges.Add(1) + defer ddb.inmemoryActiveMerges.Add(-1) } else { - atomic.AddUint64(&ddb.fileMergesTotal, 1) - atomic.AddUint64(&ddb.fileActiveMerges, 1) - defer atomic.AddUint64(&ddb.fileActiveMerges, ^uint64(0)) + ddb.fileMergesTotal.Add(1) + ddb.fileActiveMerges.Add(1) + defer ddb.fileActiveMerges.Add(-1) } // Initialize destination paths. @@ -489,7 +489,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { } func (ddb *datadb) nextMergeIdx() uint64 { - return atomic.AddUint64(&ddb.mergeIdx, 1) + return ddb.mergeIdx.Add(1) } type partType int @@ -657,10 +657,10 @@ func (s *DatadbStats) RowsCount() uint64 { // updateStats updates s with ddb stats func (ddb *datadb) updateStats(s *DatadbStats) { - s.InmemoryMergesTotal += atomic.LoadUint64(&ddb.inmemoryMergesTotal) - s.InmemoryActiveMerges += atomic.LoadUint64(&ddb.inmemoryActiveMerges) - s.FileMergesTotal += atomic.LoadUint64(&ddb.fileMergesTotal) - s.FileActiveMerges += atomic.LoadUint64(&ddb.fileActiveMerges) + s.InmemoryMergesTotal += ddb.inmemoryMergesTotal.Load() + s.InmemoryActiveMerges += uint64(ddb.inmemoryActiveMerges.Load()) + s.FileMergesTotal += ddb.fileMergesTotal.Load() + s.FileActiveMerges += uint64(ddb.fileActiveMerges.Load()) ddb.partsLock.Lock() @@ -855,7 +855,7 @@ func (ddb *datadb) releasePartsToMerge(pws []*partWrapper) { func availableDiskSpace(path string) uint64 { available := fs.MustGetFreeSpace(path) - reserved := atomic.LoadUint64(&reservedDiskSpace) + reserved := reservedDiskSpace.Load() if available < reserved { return 0 } @@ -873,18 +873,18 @@ func tryReserveDiskSpace(path string, n uint64) bool { } func reserveDiskSpace(n uint64) uint64 { - return atomic.AddUint64(&reservedDiskSpace, n) + return reservedDiskSpace.Add(n) } func releaseDiskSpace(n uint64) { - atomic.AddUint64(&reservedDiskSpace, ^(n - 1)) + reservedDiskSpace.Add(^(n - 1)) } // reservedDiskSpace tracks global reserved disk space for currently executed // background merges across all the partitions. // // It should allow avoiding background merges when there is no free disk space. -var reservedDiskSpace uint64 +var reservedDiskSpace atomic.Uint64 func needStop(stopCh <-chan struct{}) bool { select { diff --git a/lib/logstorage/hash128_timing_test.go b/lib/logstorage/hash128_timing_test.go index 7cdccb963..fa9346ec2 100644 --- a/lib/logstorage/hash128_timing_test.go +++ b/lib/logstorage/hash128_timing_test.go @@ -22,8 +22,8 @@ func BenchmarkHash128(b *testing.B) { n += h.lo } } - atomic.AddUint64(&GlobalSinkU64, n) + GlobalSinkU64.Add(n) }) } -var GlobalSinkU64 uint64 +var GlobalSinkU64 atomic.Uint64 diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go index 6591c6fbf..d71775258 100644 --- a/lib/logstorage/indexdb.go +++ b/lib/logstorage/indexdb.go @@ -49,7 +49,11 @@ type IndexdbStats struct { type indexdb struct { // streamsCreatedTotal is the number of log streams created since the indexdb intialization. - streamsCreatedTotal uint64 + streamsCreatedTotal atomic.Uint64 + + // the generation of the streamFilterCache. + // It is updated each time new item is added to tb. + streamFilterCacheGeneration atomic.Uint32 // path is the path to indexdb path string @@ -63,10 +67,6 @@ type indexdb struct { // indexSearchPool is a pool of indexSearch struct for the given indexdb indexSearchPool sync.Pool - // the generation of the streamFilterCache. - // It is updated each time new item is added to tb. - streamFilterCacheGeneration uint32 - // s is the storage where indexdb belongs to. s *Storage } @@ -99,7 +99,7 @@ func (idb *indexdb) debugFlush() { } func (idb *indexdb) updateStats(d *IndexdbStats) { - d.StreamsCreatedTotal += atomic.LoadUint64(&idb.streamsCreatedTotal) + d.StreamsCreatedTotal += idb.streamsCreatedTotal.Load() var tm mergeset.TableMetrics idb.tb.UpdateMetrics(&tm) @@ -476,17 +476,17 @@ func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical [ bi.items = items putBatchItems(bi) - atomic.AddUint64(&idb.streamsCreatedTotal, 1) + idb.streamsCreatedTotal.Add(1) } func (idb *indexdb) invalidateStreamFilterCache() { // This function must be fast, since it is called each // time new indexdb entry is added. - atomic.AddUint32(&idb.streamFilterCacheGeneration, 1) + idb.streamFilterCacheGeneration.Add(1) } func (idb *indexdb) marshalStreamFilterCacheKey(dst []byte, tenantIDs []TenantID, sf *StreamFilter) []byte { - dst = encoding.MarshalUint32(dst, idb.streamFilterCacheGeneration) + dst = encoding.MarshalUint32(dst, idb.streamFilterCacheGeneration.Load()) dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(idb.partitionName)) dst = encoding.MarshalVarUint64(dst, uint64(len(tenantIDs))) for i := range tenantIDs { diff --git a/lib/logstorage/partition_test.go b/lib/logstorage/partition_test.go index d11986d87..2bbee20ba 100644 --- a/lib/logstorage/partition_test.go +++ b/lib/logstorage/partition_test.go @@ -133,14 +133,14 @@ func TestPartitionMustAddRowsConcurrent(t *testing.T) { pt := mustOpenPartition(s, path) const workersCount = 3 - totalRowsCount := uint64(0) + var totalRowsCount atomic.Uint64 doneCh := make(chan struct{}, workersCount) for i := 0; i < cap(doneCh); i++ { go func() { for j := 0; j < 7; j++ { lr := newTestLogRows(5, 10, int64(j)) pt.mustAddRows(lr) - atomic.AddUint64(&totalRowsCount, uint64(len(lr.timestamps))) + totalRowsCount.Add(uint64(len(lr.timestamps))) } doneCh <- struct{}{} }() @@ -157,8 +157,8 @@ func TestPartitionMustAddRowsConcurrent(t *testing.T) { var ddbStats DatadbStats pt.ddb.updateStats(&ddbStats) - if n := ddbStats.RowsCount(); n != totalRowsCount { - t.Fatalf("unexpected number of entries; got %d; want %d", n, totalRowsCount) + if n := ddbStats.RowsCount(); n != totalRowsCount.Load() { + t.Fatalf("unexpected number of entries; got %d; want %d", n, totalRowsCount.Load()) } mustClosePartition(pt) diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index 4fd40df77..a5ce10739 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -70,8 +70,8 @@ type StorageConfig struct { // Storage is the storage for log entries. type Storage struct { - rowsDroppedTooBigTimestamp uint64 - rowsDroppedTooSmallTimestamp uint64 + rowsDroppedTooBigTimestamp atomic.Uint64 + rowsDroppedTooSmallTimestamp atomic.Uint64 // path is the path to the Storage directory path string @@ -442,7 +442,7 @@ func (s *Storage) MustAddRows(lr *LogRows) { tooSmallTimestampLogger.Warnf("skipping log entry with too small timestamp=%s; it must be bigger than %s according "+ "to the configured -retentionPeriod=%dd. See https://docs.victoriametrics.com/VictoriaLogs/#retention ; "+ "log entry: %s", &tsf, &minAllowedTsf, durationToDays(s.retention), &rf) - atomic.AddUint64(&s.rowsDroppedTooSmallTimestamp, 1) + s.rowsDroppedTooSmallTimestamp.Add(1) continue } if day > maxAllowedDay { @@ -452,7 +452,7 @@ func (s *Storage) MustAddRows(lr *LogRows) { tooBigTimestampLogger.Warnf("skipping log entry with too big timestamp=%s; it must be smaller than %s according "+ "to the configured -futureRetention=%dd; see https://docs.victoriametrics.com/VictoriaLogs/#retention ; "+ "log entry: %s", &tsf, &maxAllowedTsf, durationToDays(s.futureRetention), &rf) - atomic.AddUint64(&s.rowsDroppedTooBigTimestamp, 1) + s.rowsDroppedTooBigTimestamp.Add(1) continue } lrPart := m[day] @@ -527,8 +527,8 @@ func (s *Storage) getPartitionForDay(day int64) *partitionWrapper { // UpdateStats updates ss for the given s. func (s *Storage) UpdateStats(ss *StorageStats) { - ss.RowsDroppedTooBigTimestamp += atomic.LoadUint64(&s.rowsDroppedTooBigTimestamp) - ss.RowsDroppedTooSmallTimestamp += atomic.LoadUint64(&s.rowsDroppedTooSmallTimestamp) + ss.RowsDroppedTooBigTimestamp += s.rowsDroppedTooBigTimestamp.Load() + ss.RowsDroppedTooSmallTimestamp += s.rowsDroppedTooSmallTimestamp.Load() s.partitionsLock.Lock() ss.PartitionsCount += uint64(len(s.partitions)) diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 98b8f902a..adc29a4bd 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -110,7 +110,7 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: uint32(10*i + 1), } expectedTenantID := tenantID.String() - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(columns []BlockColumn) { hasTenantIDColumn := false var columnNames []string @@ -131,41 +131,41 @@ func TestStorageRunQuery(t *testing.T) { if !hasTenantIDColumn { panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames)) } - atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + rowsCount.Add(uint32(len(columns[0].Values))) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } } }) t.Run("matching-multiple-tenant-ids", func(t *testing.T) { q := mustParseQuery(`"log message"`) - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(columns []BlockColumn) { - atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + rowsCount.Add(uint32(len(columns[0].Values))) } s.RunQuery(allTenantIDs, q, nil, processBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("matching-in-filter", func(t *testing.T) { q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`) - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(columns []BlockColumn) { - atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + rowsCount.Add(uint32(len(columns[0].Values))) } s.RunQuery(allTenantIDs, q, nil, processBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("stream-filter-mismatch", func(t *testing.T) { @@ -183,7 +183,7 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: 11, } expectedStreamID := fmt.Sprintf("stream_id=%d", i) - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(columns []BlockColumn) { hasStreamIDColumn := false var columnNames []string @@ -204,14 +204,14 @@ func TestStorageRunQuery(t *testing.T) { if !hasStreamIDColumn { panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames)) } - atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + rowsCount.Add(uint32(len(columns[0].Values))) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := blocksPerStream * rowsPerBlock - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, n, expectedRowsCount) } } }) @@ -221,16 +221,16 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(columns []BlockColumn) { - atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + rowsCount.Add(uint32(len(columns[0].Values))) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * 2 - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("matching-time-range", func(t *testing.T) { @@ -241,16 +241,16 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(columns []BlockColumn) { - atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + rowsCount.Add(uint32(len(columns[0].Values))) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("matching-stream-id-with-time-range", func(t *testing.T) { @@ -261,16 +261,16 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(columns []BlockColumn) { - atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values))) + rowsCount.Add(uint32(len(columns[0].Values))) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := blocksPerStream - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("matching-stream-id-missing-time-range", func(t *testing.T) { @@ -460,18 +460,18 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(workerID uint, br *blockResult) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + rowsCount.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } } }) @@ -484,15 +484,15 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(workerID uint, br *blockResult) { - atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + rowsCount.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of matching rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("stream-filter-mismatch", func(t *testing.T) { @@ -525,18 +525,18 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(workerID uint, br *blockResult) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + rowsCount.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := blocksPerStream * rowsPerBlock - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } } }) @@ -554,18 +554,18 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(workerID uint, br *blockResult) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + rowsCount.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("matching-multiple-stream-ids-with-re-filter", func(t *testing.T) { @@ -591,18 +591,18 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(workerID uint, br *blockResult) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + rowsCount.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * 2 - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("matching-stream-id-smaller-time-range", func(t *testing.T) { @@ -619,15 +619,15 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - rowsCount := uint32(0) + var rowsCount atomic.Uint32 processBlock := func(workerID uint, br *blockResult) { - atomic.AddUint32(&rowsCount, uint32(br.RowsCount())) + rowsCount.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := blocksPerStream - if rowsCount != uint32(expectedRowsCount) { - t.Fatalf("unexpected number of rows; got %d; want %d", rowsCount, expectedRowsCount) + if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {