From fd7c86ae255d63d500e3bb9d95d7dbed7e4e0360 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 21 Oct 2022 14:33:03 +0300 Subject: [PATCH 1/3] lib/{mergeset,storage}: simplify the code a bit after ae55ad8749c3ed9db1cb26438eb92640d15bc694 --- lib/mergeset/table.go | 23 +++++++++++++---------- lib/storage/partition.go | 15 +++++++++------ 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 28e632dd0..25d535d9c 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -644,17 +644,20 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, i flushSeconds = 1 } lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime) - if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) { - ris.mu.Lock() - ibs := ris.ibs - dst = append(dst, ibs...) - for i := range ibs { - ibs[i] = nil - } - ris.ibs = ibs[:0] - atomic.StoreUint64(&ris.lastFlushTime, currentTime) - ris.mu.Unlock() + if !isFinal && currentTime <= lastFlushTime+uint64(flushSeconds) { + // Fast path - nothing to flush + return dst } + // Slow path - move ris.ibs to dst + ris.mu.Lock() + ibs := ris.ibs + dst = append(dst, ibs...) + for i := range ibs { + ibs[i] = nil + } + ris.ibs = ibs[:0] + atomic.StoreUint64(&ris.lastFlushTime, currentTime) + ris.mu.Unlock() return dst } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 9e1ec3ed4..6bda9ddc4 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -747,13 +747,16 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFin flushSeconds = 1 } lastFlushTime := atomic.LoadUint64(&rrs.lastFlushTime) - if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) { - rrs.mu.Lock() - dst = append(dst, rrs.rows...) - rrs.rows = rrs.rows[:0] - atomic.StoreUint64(&rrs.lastFlushTime, currentTime) - rrs.mu.Unlock() + if !isFinal && currentTime <= lastFlushTime+uint64(flushSeconds) { + // Fast path - nothing to flush + return dst } + // Slow path - move rrs.rows to dst. + rrs.mu.Lock() + dst = append(dst, rrs.rows...) + rrs.rows = rrs.rows[:0] + atomic.StoreUint64(&rrs.lastFlushTime, currentTime) + rrs.mu.Unlock() return dst } From b5674164c6ef0fec847106f4ef0840cef6897f6f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 21 Oct 2022 14:39:27 +0300 Subject: [PATCH 2/3] lib/storage: simplify code a bit after 3f5959c053e61b6a36242525ae3796d837c2358a --- lib/storage/partition.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 6bda9ddc4..30d601311 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -476,17 +476,13 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { n := getMaxRawRowsPerShard() rrs.rows = make([]rawRow, 0, n) } - maxRowsCount := cap(rrs.rows) - capacity := maxRowsCount - len(rrs.rows) - if capacity >= len(rows) { - // Fast path - rows fit rrs.rows capacity. - rrs.rows = append(rrs.rows, rows...) - } else { - // Slow path - rows don't fit rrs.rows capacity. - // Fill rrs.rows with rows until capacity, - // then put rrs.rows to rowsToFlush and convert it to a part. - n := copy(rrs.rows[:cap(rrs.rows)], rows) - rows = rows[n:] + n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows) + rrs.rows = rrs.rows[:len(rrs.rows)+n] + rows = rows[n:] + if len(rows) > 0 { + // Slow path - rows did't fit rrs.rows capacity. + // Convert rrs.rows to rowsToFlush and convert it to a part, + // then try moving the remaining rows to rrs.rows. rowsToFlush = rrs.rows n = getMaxRawRowsPerShard() rrs.rows = make([]rawRow, 0, n) From 4128ad71e24b3ac1391bcbc3f2f04b0674ffefc6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 21 Oct 2022 14:46:06 +0300 Subject: [PATCH 3/3] lib/storage: move common code to newRawRowsBlock() function --- lib/storage/partition.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 30d601311..62f7982cd 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -473,8 +473,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { rrs.mu.Lock() if cap(rrs.rows) == 0 { - n := getMaxRawRowsPerShard() - rrs.rows = make([]rawRow, 0, n) + rrs.rows = newRawRowsBlock() } n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows) rrs.rows = rrs.rows[:len(rrs.rows)+n] @@ -484,8 +483,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { // Convert rrs.rows to rowsToFlush and convert it to a part, // then try moving the remaining rows to rrs.rows. rowsToFlush = rrs.rows - n = getMaxRawRowsPerShard() - rrs.rows = make([]rawRow, 0, n) + rrs.rows = newRawRowsBlock() if len(rows) <= n { rrs.rows = append(rrs.rows[:0], rows...) } else { @@ -500,6 +498,11 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { pt.flushRowsToParts(rowsToFlush) } +func newRawRowsBlock() []rawRow { + n := getMaxRawRowsPerShard() + return make([]rawRow, 0, n) +} + func (pt *partition) flushRowsToParts(rows []rawRow) { maxRows := getMaxRawRowsPerShard() wg := getWaitGroup()