diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index ed44357f7..043260961 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -204,7 +204,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error { ibs[i] = nil } ris.ibs = ibs[:0] - ris.lastFlushTime = fasttime.UnixTimestamp() + atomic.StoreUint64(&ris.lastFlushTime, fasttime.UnixTimestamp()) } ris.mu.Unlock() @@ -632,19 +632,18 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, i if flushSeconds <= 0 { flushSeconds = 1 } - - ris.mu.Lock() - if isFinal || currentTime-ris.lastFlushTime > uint64(flushSeconds) { + lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime) + if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) { + ris.mu.Lock() ibs := ris.ibs dst = append(dst, ibs...) for i := range ibs { ibs[i] = nil } ris.ibs = ibs[:0] - ris.lastFlushTime = currentTime + atomic.StoreUint64(&ris.lastFlushTime, currentTime) + ris.mu.Unlock() } - ris.mu.Unlock() - return dst } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index d7af8a17a..5687f23ba 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -477,7 +477,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { rowsToFlush = append(rowsToFlush, rrs.rows...) rowsToFlush = append(rowsToFlush, rows...) rrs.rows = rrs.rows[:0] - rrs.lastFlushTime = fasttime.UnixTimestamp() + atomic.StoreUint64(&rrs.lastFlushTime, fasttime.UnixTimestamp()) } rrs.mu.Unlock() @@ -726,14 +726,14 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFin if flushSeconds <= 0 { flushSeconds = 1 } - - rrs.mu.Lock() - if isFinal || currentTime-rrs.lastFlushTime > uint64(flushSeconds) { + lastFlushTime := atomic.LoadUint64(&rrs.lastFlushTime) + if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) { + rrs.mu.Lock() dst = append(dst, rrs.rows...) rrs.rows = rrs.rows[:0] + atomic.StoreUint64(&rrs.lastFlushTime, currentTime) + rrs.mu.Unlock() } - rrs.mu.Unlock() - return dst }