lib/{storage,mergeset}: properly fix 'unaligned 64-bit atomic operation' panic on 32-bit architectures

The issue has been introduced in bace9a2501
The improper fix was in the d4c0615dcd ,
since it fixed the issue just by an accident, because Go comiler aligned the rawRowsShards field
by 4-byte boundary inside partition struct.

The proper fix is to use atomic.Int64 field - this guarantees that the access to this field
won't result in unaligned 64-bit atomic operation. See https://github.com/golang/go/issues/50860
and https://github.com/golang/go/issues/19057
This commit is contained in:
Aliaksandr Valialkin 2024-02-23 22:27:03 +02:00
parent cf94522389
commit ea9e2b19a5
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 16 additions and 21 deletions

View file

@ -148,10 +148,9 @@ type Table struct {
} }
type rawItemsShards struct { type rawItemsShards struct {
// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures flushDeadlineMs atomic.Int64
flushDeadlineMs int64
shardIdx uint32 shardIdx atomic.Uint32
// shards reduce lock contention when adding rows on multi-CPU systems. // shards reduce lock contention when adding rows on multi-CPU systems.
shards []rawItemsShard shards []rawItemsShard
@ -182,7 +181,7 @@ func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
shards := riss.shards shards := riss.shards
shardsLen := uint32(len(shards)) shardsLen := uint32(len(shards))
for len(items) > 0 { for len(items) > 0 {
n := atomic.AddUint32(&riss.shardIdx, 1) n := riss.shardIdx.Add(1)
idx := n % shardsLen idx := n % shardsLen
tailItems, ibsToFlush := shards[idx].addItems(items) tailItems, ibsToFlush := shards[idx].addItems(items)
riss.addIbsToFlush(tb, ibsToFlush) riss.addIbsToFlush(tb, ibsToFlush)
@ -220,12 +219,11 @@ func (riss *rawItemsShards) Len() int {
} }
func (riss *rawItemsShards) updateFlushDeadline() { func (riss *rawItemsShards) updateFlushDeadline() {
atomic.StoreInt64(&riss.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli()) riss.flushDeadlineMs.Store(time.Now().Add(pendingItemsFlushInterval).UnixMilli())
} }
type rawItemsShardNopad struct { type rawItemsShardNopad struct {
// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures flushDeadlineMs atomic.Int64
flushDeadlineMs int64
mu sync.Mutex mu sync.Mutex
ibs []*inmemoryBlock ibs []*inmemoryBlock
@ -291,7 +289,7 @@ func (ris *rawItemsShard) addItems(items [][]byte) ([][]byte, []*inmemoryBlock)
} }
func (ris *rawItemsShard) updateFlushDeadline() { func (ris *rawItemsShard) updateFlushDeadline() {
atomic.StoreInt64(&ris.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli()) ris.flushDeadlineMs.Store(time.Now().Add(pendingItemsFlushInterval).UnixMilli())
} }
var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second) var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second)
@ -792,7 +790,7 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
var dst []*inmemoryBlock var dst []*inmemoryBlock
currentTimeMs := time.Now().UnixMilli() currentTimeMs := time.Now().UnixMilli()
flushDeadlineMs := atomic.LoadInt64(&riss.flushDeadlineMs) flushDeadlineMs := riss.flushDeadlineMs.Load()
if isFinal || currentTimeMs >= flushDeadlineMs { if isFinal || currentTimeMs >= flushDeadlineMs {
riss.ibsToFlushLock.Lock() riss.ibsToFlushLock.Lock()
dst = riss.ibsToFlush dst = riss.ibsToFlush
@ -808,7 +806,7 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
} }
func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, currentTimeMs int64, isFinal bool) []*inmemoryBlock { func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, currentTimeMs int64, isFinal bool) []*inmemoryBlock {
flushDeadlineMs := atomic.LoadInt64(&ris.flushDeadlineMs) flushDeadlineMs := ris.flushDeadlineMs.Load()
if !isFinal && currentTimeMs < flushDeadlineMs { if !isFinal && currentTimeMs < flushDeadlineMs {
// Fast path - nothing to flush // Fast path - nothing to flush
return dst return dst

View file

@ -421,11 +421,9 @@ func (pt *partition) AddRows(rows []rawRow) {
var isDebug = false var isDebug = false
type rawRowsShards struct { type rawRowsShards struct {
// warn: order is important for aligning of 64-bit atomic operations on 32-bit arch flushDeadlineMs atomic.Int64
shardIdx uint32
// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures shardIdx atomic.Uint32
flushDeadlineMs int64
// Shards reduce lock contention when adding rows on multi-CPU systems. // Shards reduce lock contention when adding rows on multi-CPU systems.
shards []rawRowsShard shards []rawRowsShard
@ -442,7 +440,7 @@ func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
shards := rrss.shards shards := rrss.shards
shardsLen := uint32(len(shards)) shardsLen := uint32(len(shards))
for len(rows) > 0 { for len(rows) > 0 {
n := atomic.AddUint32(&rrss.shardIdx, 1) n := rrss.shardIdx.Add(1)
idx := n % shardsLen idx := n % shardsLen
tailRows, rowsToFlush := shards[idx].addRows(rows) tailRows, rowsToFlush := shards[idx].addRows(rows)
rrss.addRowsToFlush(pt, rowsToFlush) rrss.addRowsToFlush(pt, rowsToFlush)
@ -487,12 +485,11 @@ func (rrss *rawRowsShards) Len() int {
} }
func (rrss *rawRowsShards) updateFlushDeadline() { func (rrss *rawRowsShards) updateFlushDeadline() {
atomic.StoreInt64(&rrss.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli()) rrss.flushDeadlineMs.Store(time.Now().Add(pendingRowsFlushInterval).UnixMilli())
} }
type rawRowsShardNopad struct { type rawRowsShardNopad struct {
// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures flushDeadlineMs atomic.Int64
flushDeadlineMs int64
mu sync.Mutex mu sync.Mutex
rows []rawRow rows []rawRow
@ -1096,7 +1093,7 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
var dst [][]rawRow var dst [][]rawRow
currentTimeMs := time.Now().UnixMilli() currentTimeMs := time.Now().UnixMilli()
flushDeadlineMs := atomic.LoadInt64(&rrss.flushDeadlineMs) flushDeadlineMs := rrss.flushDeadlineMs.Load()
if isFinal || currentTimeMs >= flushDeadlineMs { if isFinal || currentTimeMs >= flushDeadlineMs {
rrss.rowssToFlushLock.Lock() rrss.rowssToFlushLock.Lock()
dst = rrss.rowssToFlush dst = rrss.rowssToFlush
@ -1112,7 +1109,7 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
} }
func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool) [][]rawRow { func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool) [][]rawRow {
flushDeadlineMs := atomic.LoadInt64(&rrs.flushDeadlineMs) flushDeadlineMs := rrs.flushDeadlineMs.Load()
if !isFinal && currentTimeMs < flushDeadlineMs { if !isFinal && currentTimeMs < flushDeadlineMs {
// Fast path - nothing to flush // Fast path - nothing to flush
return dst return dst
@ -1128,7 +1125,7 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int6
} }
func (rrs *rawRowsShard) updateFlushDeadline() { func (rrs *rawRowsShard) updateFlushDeadline() {
atomic.StoreInt64(&rrs.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli()) rrs.flushDeadlineMs.Store(time.Now().Add(pendingRowsFlushInterval).UnixMilli())
} }
func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow { func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow {