lib/storage: try generating initial parts from inmemory rows with identical sizes under high ingestion rate

This should improve the background merge rate under high load a bit.
Aliaksandr Valialkin 2022-10-20 23:26:49 +03:00
parent 4d71023eb9
commit edf3b7be47


@@ -479,14 +479,24 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
 	maxRowsCount := cap(rrs.rows)
 	capacity := maxRowsCount - len(rrs.rows)
 	if capacity >= len(rows) {
-		// Fast path - rows fit capacity.
+		// Fast path - rows fit rrs.rows capacity.
 		rrs.rows = append(rrs.rows, rows...)
 	} else {
-		// Slow path - rows don't fit capacity.
-		// Put rrs.rows and rows to rowsToFlush and convert it to a part.
-		rowsToFlush = append(rowsToFlush, rrs.rows...)
-		rowsToFlush = append(rowsToFlush, rows...)
-		rrs.rows = rrs.rows[:0]
+		// Slow path - rows don't fit rrs.rows capacity.
+		// Fill rrs.rows with rows until capacity,
+		// then put rrs.rows to rowsToFlush and convert it to a part.
+		n := copy(rrs.rows[:cap(rrs.rows)], rows)
+		rows = rows[n:]
+		rowsToFlush = rrs.rows
+		n = getMaxRawRowsPerShard()
+		rrs.rows = make([]rawRow, 0, n)
+		if len(rows) <= n {
+			rrs.rows = append(rrs.rows[:0], rows...)
+		} else {
+			// The slowest path - rows do not fit rrs.rows capacity.
+			// So append them directly to rowsToFlush.
+			rowsToFlush = append(rowsToFlush, rows...)
+		}
 		atomic.StoreUint64(&rrs.lastFlushTime, fasttime.UnixTimestamp())
 	}
 	rrs.mu.Unlock()
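
For readers without the full file at hand, below is a minimal, self-contained sketch of the buffering scheme this commit moves to. The shard type, the maxRawRowsPerShard constant and the flush callback are simplified stand-ins for rawRowsShard, getMaxRawRowsPerShard() and the part-flushing machinery in lib/storage; locking, the rowsToFlush plumbing and the pt *partition argument are omitted, so this is an illustration of the idea rather than the actual implementation.

package main

import "fmt"

// rawRow is a simplified stand-in for lib/storage's rawRow.
type rawRow struct {
	TimestampMs int64
	Value       float64
}

// maxRawRowsPerShard stands in for getMaxRawRowsPerShard()
// (the real value is computed at runtime).
const maxRawRowsPerShard = 4

// shard is a simplified stand-in for rawRowsShard: a fixed-capacity buffer of
// raw rows plus a callback that converts a flushed batch into an initial part.
type shard struct {
	rows  []rawRow
	flush func(rows []rawRow)
}

// addRows buffers rows; when the buffer overflows, it is first topped up to
// its exact capacity and flushed, so flushed batches all have identical sizes.
func (s *shard) addRows(rows []rawRow) {
	if cap(s.rows) == 0 {
		s.rows = make([]rawRow, 0, maxRawRowsPerShard)
	}
	if cap(s.rows)-len(s.rows) >= len(rows) {
		// Fast path - rows fit the remaining capacity.
		s.rows = append(s.rows, rows...)
		return
	}
	// Slow path - fill the buffer up to its capacity, flush it as one
	// full-sized batch and keep the remainder for the fresh buffer.
	n := copy(s.rows[len(s.rows):cap(s.rows)], rows)
	s.rows = s.rows[:cap(s.rows)]
	rows = rows[n:]
	s.flush(s.rows)
	s.rows = make([]rawRow, 0, maxRawRowsPerShard)
	if len(rows) <= cap(s.rows) {
		s.rows = append(s.rows, rows...)
		return
	}
	// Slowest path - the remainder still doesn't fit; flush it directly.
	s.flush(rows)
}

func main() {
	s := &shard{flush: func(rows []rawRow) {
		fmt.Printf("flushing %d rows\n", len(rows))
	}}
	// Three batches of 3 rows overflow the 4-row buffer twice; every
	// flushed batch contains exactly maxRawRowsPerShard rows.
	for i := 0; i < 3; i++ {
		s.addRows(make([]rawRow, 3))
	}
}

Because the buffer is filled to its exact capacity before being flushed, every batch produced on the slow path holds the same number of raw rows, so the initial in-memory parts come out with identical sizes; per the commit message, this should slightly improve the background merge rate under high ingestion rates.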