From 1eb3346eccfcc573563b17a03b536add1fb9e3dc Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 23 Feb 2024 22:27:03 +0200
Subject: [PATCH] lib/{storage,mergeset}: properly fix 'unaligned 64-bit atomic
 operation' panic on 32-bit architectures

The issue was introduced in bace9a25011d9175f9fa6861e129f8ed8f9e0521.

The fix in d4c0615dcd7177224138fbb6d0023b3678ae6262 was improper, since it
resolved the issue only by accident: the Go compiler happened to lay out the
rawRowsShards field inside the partition struct so that the 64-bit
flushDeadlineMs field stayed 8-byte aligned.

The proper fix is to use an atomic.Int64 field - this guarantees that access
to the field never results in an unaligned 64-bit atomic operation.

See https://github.com/golang/go/issues/50860 and
https://github.com/golang/go/issues/19057
---
 lib/mergeset/table.go    | 18 ++++++++----------
 lib/storage/partition.go | 19 ++++++++-----------
 2 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 898c5175a..a5f3793b7 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -148,10 +148,9 @@ type Table struct {
 }
 
 type rawItemsShards struct {
-	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	flushDeadlineMs int64
+	flushDeadlineMs atomic.Int64
 
-	shardIdx uint32
+	shardIdx atomic.Uint32
 
 	// shards reduce lock contention when adding rows on multi-CPU systems.
 	shards []rawItemsShard
@@ -182,7 +181,7 @@ func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
 	shards := riss.shards
 	shardsLen := uint32(len(shards))
 	for len(items) > 0 {
-		n := atomic.AddUint32(&riss.shardIdx, 1)
+		n := riss.shardIdx.Add(1)
 		idx := n % shardsLen
 		tailItems, ibsToFlush := shards[idx].addItems(items)
 		riss.addIbsToFlush(tb, ibsToFlush)
@@ -220,12 +219,11 @@ func (riss *rawItemsShards) Len() int {
 }
 
 func (riss *rawItemsShards) updateFlushDeadline() {
-	atomic.StoreInt64(&riss.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli())
+	riss.flushDeadlineMs.Store(time.Now().Add(pendingItemsFlushInterval).UnixMilli())
 }
 
 type rawItemsShardNopad struct {
-	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	flushDeadlineMs int64
+	flushDeadlineMs atomic.Int64
 
 	mu  sync.Mutex
 	ibs []*inmemoryBlock
@@ -291,7 +289,7 @@ func (ris *rawItemsShard) addItems(items [][]byte) ([][]byte, []*inmemoryBlock)
 }
 
 func (ris *rawItemsShard) updateFlushDeadline() {
-	atomic.StoreInt64(&ris.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli())
+	ris.flushDeadlineMs.Store(time.Now().Add(pendingItemsFlushInterval).UnixMilli())
 }
 
 var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second)
@@ -792,7 +790,7 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
 	var dst []*inmemoryBlock
 
 	currentTimeMs := time.Now().UnixMilli()
-	flushDeadlineMs := atomic.LoadInt64(&riss.flushDeadlineMs)
+	flushDeadlineMs := riss.flushDeadlineMs.Load()
 	if isFinal || currentTimeMs >= flushDeadlineMs {
 		riss.ibsToFlushLock.Lock()
 		dst = riss.ibsToFlush
@@ -808,7 +806,7 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
 }
 
 func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, currentTimeMs int64, isFinal bool) []*inmemoryBlock {
-	flushDeadlineMs := atomic.LoadInt64(&ris.flushDeadlineMs)
+	flushDeadlineMs := ris.flushDeadlineMs.Load()
 	if !isFinal && currentTimeMs < flushDeadlineMs {
 		// Fast path - nothing to flush
 		return dst
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index cadf74258..c48a4f1f4 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -421,11 +421,9 @@ func (pt *partition) AddRows(rows []rawRow) {
 var isDebug = false
 
 type rawRowsShards struct {
-	// warn: order is important for aligning of 64-bit atomic operations on 32-bit arch
-	shardIdx uint32
+	flushDeadlineMs atomic.Int64
 
-	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	flushDeadlineMs int64
+	shardIdx atomic.Uint32
 
 	// Shards reduce lock contention when adding rows on multi-CPU systems.
 	shards []rawRowsShard
@@ -442,7 +440,7 @@ func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
 	shards := rrss.shards
 	shardsLen := uint32(len(shards))
 	for len(rows) > 0 {
-		n := atomic.AddUint32(&rrss.shardIdx, 1)
+		n := rrss.shardIdx.Add(1)
 		idx := n % shardsLen
 		tailRows, rowsToFlush := shards[idx].addRows(rows)
 		rrss.addRowsToFlush(pt, rowsToFlush)
@@ -487,12 +485,11 @@ func (rrss *rawRowsShards) Len() int {
 }
 
 func (rrss *rawRowsShards) updateFlushDeadline() {
-	atomic.StoreInt64(&rrss.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli())
+	rrss.flushDeadlineMs.Store(time.Now().Add(pendingRowsFlushInterval).UnixMilli())
 }
 
 type rawRowsShardNopad struct {
-	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	flushDeadlineMs int64
+	flushDeadlineMs atomic.Int64
 
 	mu   sync.Mutex
 	rows []rawRow
@@ -1096,7 +1093,7 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
 	var dst [][]rawRow
 
 	currentTimeMs := time.Now().UnixMilli()
-	flushDeadlineMs := atomic.LoadInt64(&rrss.flushDeadlineMs)
+	flushDeadlineMs := rrss.flushDeadlineMs.Load()
 	if isFinal || currentTimeMs >= flushDeadlineMs {
 		rrss.rowssToFlushLock.Lock()
 		dst = rrss.rowssToFlush
@@ -1112,7 +1109,7 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
 }
 
 func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool) [][]rawRow {
-	flushDeadlineMs := atomic.LoadInt64(&rrs.flushDeadlineMs)
+	flushDeadlineMs := rrs.flushDeadlineMs.Load()
 	if !isFinal && currentTimeMs < flushDeadlineMs {
 		// Fast path - nothing to flush
 		return dst
@@ -1128,7 +1125,7 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int6
 }
 
 func (rrs *rawRowsShard) updateFlushDeadline() {
-	atomic.StoreInt64(&rrs.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli())
+	rrs.flushDeadlineMs.Store(time.Now().Add(pendingRowsFlushInterval).UnixMilli())
 }
 
 func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow {
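
For reference, below is a minimal standalone sketch of why an atomic.Int64 field avoids the panic while a plain int64 accessed through the sync/atomic helpers may not. It assumes Go 1.19+ (where atomic.Int64/atomic.Uint32 exist); the fragileShards and safeShards type names are illustrative only and are not taken from the code touched by this patch.

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// fragileShards mirrors the old pattern: a plain int64 that is accessed with
// atomic.LoadInt64/StoreInt64. On 32-bit architectures (386, arm, 32-bit mips)
// such a field must be 8-byte aligned, but the compiler only guarantees that
// for the first word of an allocated struct, so whether the atomic calls work
// depends on where the field happens to land in memory.
type fragileShards struct {
	shardIdx        uint32 // a preceding 4-byte field can break the 8-byte alignment
	flushDeadlineMs int64  // accessed via atomic.LoadInt64/StoreInt64
}

// safeShards mirrors the new pattern: atomic.Int64 contains an align64 marker,
// so the compiler keeps the value 8-byte aligned no matter where the field
// sits inside the enclosing struct.
type safeShards struct {
	shardIdx        atomic.Uint32
	flushDeadlineMs atomic.Int64
}

func main() {
	f := &fragileShards{}
	s := &safeShards{}

	// On 64-bit platforms both calls always work; on 32-bit platforms the
	// first one may panic with "unaligned 64-bit atomic operation" if
	// flushDeadlineMs does not end up 8-byte aligned.
	atomic.StoreInt64(&f.flushDeadlineMs, 42)
	s.flushDeadlineMs.Store(42)

	fmt.Println("offset of plain int64: ", unsafe.Offsetof(f.flushDeadlineMs))
	fmt.Println("offset of atomic.Int64:", unsafe.Offsetof(s.flushDeadlineMs))
	fmt.Println(atomic.LoadInt64(&f.flushDeadlineMs), s.flushDeadlineMs.Load())
}

Because correctness no longer depends on the field's position, the patch can also drop the "Put flushDeadlineMs to the top ..." ordering comments from the affected structs.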