diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 898c5175a..a5f3793b7 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -148,10 +148,9 @@ type Table struct {
 }
 
 type rawItemsShards struct {
-	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	flushDeadlineMs int64
+	flushDeadlineMs atomic.Int64
 
-	shardIdx uint32
+	shardIdx atomic.Uint32
 
 	// shards reduce lock contention when adding rows on multi-CPU systems.
 	shards []rawItemsShard
@@ -182,7 +181,7 @@ func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
 	shards := riss.shards
 	shardsLen := uint32(len(shards))
 	for len(items) > 0 {
-		n := atomic.AddUint32(&riss.shardIdx, 1)
+		n := riss.shardIdx.Add(1)
 		idx := n % shardsLen
 		tailItems, ibsToFlush := shards[idx].addItems(items)
 		riss.addIbsToFlush(tb, ibsToFlush)
@@ -220,12 +219,11 @@ func (riss *rawItemsShards) Len() int {
 }
 
 func (riss *rawItemsShards) updateFlushDeadline() {
-	atomic.StoreInt64(&riss.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli())
+	riss.flushDeadlineMs.Store(time.Now().Add(pendingItemsFlushInterval).UnixMilli())
 }
 
 type rawItemsShardNopad struct {
-	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	flushDeadlineMs int64
+	flushDeadlineMs atomic.Int64
 
 	mu  sync.Mutex
 	ibs []*inmemoryBlock
@@ -291,7 +289,7 @@ func (ris *rawItemsShard) addItems(items [][]byte) ([][]byte, []*inmemoryBlock)
 }
 
 func (ris *rawItemsShard) updateFlushDeadline() {
-	atomic.StoreInt64(&ris.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli())
+	ris.flushDeadlineMs.Store(time.Now().Add(pendingItemsFlushInterval).UnixMilli())
 }
 
 var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second)
@@ -792,7 +790,7 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
 	var dst []*inmemoryBlock
 
 	currentTimeMs := time.Now().UnixMilli()
-	flushDeadlineMs := atomic.LoadInt64(&riss.flushDeadlineMs)
+	flushDeadlineMs := riss.flushDeadlineMs.Load()
 	if isFinal || currentTimeMs >= flushDeadlineMs {
 		riss.ibsToFlushLock.Lock()
 		dst = riss.ibsToFlush
@@ -808,7 +806,7 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
 }
 
 func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, currentTimeMs int64, isFinal bool) []*inmemoryBlock {
-	flushDeadlineMs := atomic.LoadInt64(&ris.flushDeadlineMs)
+	flushDeadlineMs := ris.flushDeadlineMs.Load()
 	if !isFinal && currentTimeMs < flushDeadlineMs {
 		// Fast path - nothing to flush
 		return dst
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index cadf74258..c48a4f1f4 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -421,11 +421,9 @@ func (pt *partition) AddRows(rows []rawRow) {
 var isDebug = false
 
 type rawRowsShards struct {
-	// warn: order is important for aligning of 64-bit atomic operations on 32-bit arch
-	shardIdx uint32
+	flushDeadlineMs atomic.Int64
 
-	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	flushDeadlineMs int64
+	shardIdx atomic.Uint32
 
 	// Shards reduce lock contention when adding rows on multi-CPU systems.
 	shards []rawRowsShard
@@ -442,7 +440,7 @@ func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
 	shards := rrss.shards
 	shardsLen := uint32(len(shards))
 	for len(rows) > 0 {
-		n := atomic.AddUint32(&rrss.shardIdx, 1)
+		n := rrss.shardIdx.Add(1)
 		idx := n % shardsLen
 		tailRows, rowsToFlush := shards[idx].addRows(rows)
 		rrss.addRowsToFlush(pt, rowsToFlush)
@@ -487,12 +485,11 @@ func (rrss *rawRowsShards) Len() int {
 }
 
 func (rrss *rawRowsShards) updateFlushDeadline() {
-	atomic.StoreInt64(&rrss.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli())
+	rrss.flushDeadlineMs.Store(time.Now().Add(pendingRowsFlushInterval).UnixMilli())
 }
 
 type rawRowsShardNopad struct {
-	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	flushDeadlineMs int64
+	flushDeadlineMs atomic.Int64
 
 	mu   sync.Mutex
 	rows []rawRow
@@ -1096,7 +1093,7 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
 	var dst [][]rawRow
 
 	currentTimeMs := time.Now().UnixMilli()
-	flushDeadlineMs := atomic.LoadInt64(&rrss.flushDeadlineMs)
+	flushDeadlineMs := rrss.flushDeadlineMs.Load()
 	if isFinal || currentTimeMs >= flushDeadlineMs {
 		rrss.rowssToFlushLock.Lock()
 		dst = rrss.rowssToFlush
@@ -1112,7 +1109,7 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
 }
 
 func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool) [][]rawRow {
-	flushDeadlineMs := atomic.LoadInt64(&rrs.flushDeadlineMs)
+	flushDeadlineMs := rrs.flushDeadlineMs.Load()
 	if !isFinal && currentTimeMs < flushDeadlineMs {
 		// Fast path - nothing to flush
 		return dst
@@ -1128,7 +1125,7 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int6
 }
 
 func (rrs *rawRowsShard) updateFlushDeadline() {
-	atomic.StoreInt64(&rrs.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli())
+	rrs.flushDeadlineMs.Store(time.Now().Add(pendingRowsFlushInterval).UnixMilli())
 }
 
 func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow {
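
For context, here is a minimal standalone sketch of the pattern this diff applies: hand-rolled `atomic.LoadInt64`/`atomic.StoreInt64`/`atomic.AddUint32` calls on carefully ordered plain fields are replaced by the `sync/atomic` wrapper types `atomic.Int64` and `atomic.Uint32` (Go 1.19+), which are 64-bit-alignment-safe on 32-bit architectures without any field-ordering tricks. The `demoShard` type and its methods below are purely illustrative and are not code from the repository.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type demoShard struct {
	// atomic.Int64 guarantees 64-bit alignment, so this field no longer
	// has to sit at the top of the struct for 32-bit platforms.
	flushDeadlineMs atomic.Int64

	// atomic.Uint32 replaces a plain uint32 driven by atomic.AddUint32.
	shardIdx atomic.Uint32
}

// updateFlushDeadline pushes the deadline forward by the given interval,
// mirroring the Store-based deadline updates in the diff.
func (s *demoShard) updateFlushDeadline(interval time.Duration) {
	s.flushDeadlineMs.Store(time.Now().Add(interval).UnixMilli())
}

// needsFlush mirrors the Load-based deadline check in the flush paths.
func (s *demoShard) needsFlush(nowMs int64, isFinal bool) bool {
	return isFinal || nowMs >= s.flushDeadlineMs.Load()
}

func main() {
	var s demoShard
	s.updateFlushDeadline(time.Second)
	fmt.Println(s.needsFlush(time.Now().UnixMilli(), false)) // false until the deadline passes
	fmt.Println(s.shardIdx.Add(1))                           // round-robin counter: 1
}
```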