mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/{storage,mergeset}: do not hold per-shard lock in fast path when adding per-shard items to the flush list
This commit is contained in:
parent
c4a3d8b169
commit
68f3a02589
2 changed files with 12 additions and 13 deletions
|
@ -204,7 +204,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
|
|||
ibs[i] = nil
|
||||
}
|
||||
ris.ibs = ibs[:0]
|
||||
ris.lastFlushTime = fasttime.UnixTimestamp()
|
||||
atomic.StoreUint64(&ris.lastFlushTime, fasttime.UnixTimestamp())
|
||||
}
|
||||
ris.mu.Unlock()
|
||||
|
||||
|
@ -632,19 +632,18 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, i
|
|||
if flushSeconds <= 0 {
|
||||
flushSeconds = 1
|
||||
}
|
||||
|
||||
ris.mu.Lock()
|
||||
if isFinal || currentTime-ris.lastFlushTime > uint64(flushSeconds) {
|
||||
lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime)
|
||||
if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) {
|
||||
ris.mu.Lock()
|
||||
ibs := ris.ibs
|
||||
dst = append(dst, ibs...)
|
||||
for i := range ibs {
|
||||
ibs[i] = nil
|
||||
}
|
||||
ris.ibs = ibs[:0]
|
||||
ris.lastFlushTime = currentTime
|
||||
atomic.StoreUint64(&ris.lastFlushTime, currentTime)
|
||||
ris.mu.Unlock()
|
||||
}
|
||||
ris.mu.Unlock()
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
|
|
|
@ -477,7 +477,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
|
|||
rowsToFlush = append(rowsToFlush, rrs.rows...)
|
||||
rowsToFlush = append(rowsToFlush, rows...)
|
||||
rrs.rows = rrs.rows[:0]
|
||||
rrs.lastFlushTime = fasttime.UnixTimestamp()
|
||||
atomic.StoreUint64(&rrs.lastFlushTime, fasttime.UnixTimestamp())
|
||||
}
|
||||
rrs.mu.Unlock()
|
||||
|
||||
|
@ -726,14 +726,14 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFin
|
|||
if flushSeconds <= 0 {
|
||||
flushSeconds = 1
|
||||
}
|
||||
|
||||
rrs.mu.Lock()
|
||||
if isFinal || currentTime-rrs.lastFlushTime > uint64(flushSeconds) {
|
||||
lastFlushTime := atomic.LoadUint64(&rrs.lastFlushTime)
|
||||
if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) {
|
||||
rrs.mu.Lock()
|
||||
dst = append(dst, rrs.rows...)
|
||||
rrs.rows = rrs.rows[:0]
|
||||
atomic.StoreUint64(&rrs.lastFlushTime, currentTime)
|
||||
rrs.mu.Unlock()
|
||||
}
|
||||
rrs.mu.Unlock()
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue