diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 0fc16d434..433b2fc4e 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "sync" + "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -53,12 +54,12 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc } if dmis.Has(bsm.Block.bh.TSID.MetricID) { // Skip blocks for deleted metrics. - *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) continue } if bsm.Block.bh.MaxTimestamp < retentionDeadline { // Skip blocks out of the given retention. - *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + atomic.AddUint64(rowsDeleted, uint64(bsm.Block.bh.RowsCount)) continue } if pendingBlockIsEmpty { @@ -181,7 +182,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline { nextIdx++ } - *rowsDeleted += uint64(nextIdx - b.nextIdx) + atomic.AddUint64(rowsDeleted, uint64(nextIdx - b.nextIdx)) b.nextIdx = nextIdx }