lib/{mergeset,storage}: convert buffered items to searchable parts more efficiently

Do not convert shard items into a searchable part as soon as a shard becomes full. Instead, collect
multiple full shards and convert them into a single searchable part at once. This reduces
the number of searchable parts, which, in turn, should improve query performance,
since queries need to scan a smaller number of parts.
Aliaksandr Valialkin 2024-02-23 00:02:22 +02:00
parent 225fd781a5
commit 19032f9913
2 changed files with 161 additions and 53 deletions
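
The idea behind the change, as a minimal self-contained Go sketch: full per-shard buffers are collected into a batch and converted into a single searchable part only once enough of them accumulate. The batcher type and flushThreshold field below are hypothetical stand-ins for the inmemoryBlock plumbing in the diff, where the mergeset threshold is maxBlocksPerShard*cgroup.AvailableCPUs() and the storage threshold is defaultPartsToMerge.

package main

import (
	"fmt"
	"sync"
)

// block stands in for a full per-shard buffer (an inmemoryBlock in the real code).
type block struct{ items []string }

// batcher collects full blocks and converts them into a single searchable part
// only once enough blocks have accumulated, instead of producing one part per block.
type batcher struct {
	mu             sync.Mutex
	pending        []*block
	flushThreshold int // hypothetical analogue of maxBlocksPerShard*cgroup.AvailableCPUs()
}

// add buffers a full block and returns a batch to convert once the threshold is reached.
func (b *batcher) add(blk *block) []*block {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, blk)
	if len(b.pending) < b.flushThreshold {
		return nil // keep buffering: fewer, bigger parts mean fewer parts to scan per query
	}
	batch := b.pending
	b.pending = nil
	return batch
}

func main() {
	b := &batcher{flushThreshold: 3}
	for i := 0; i < 7; i++ {
		if batch := b.add(&block{items: []string{fmt.Sprintf("item-%d", i)}}); batch != nil {
			fmt.Printf("converting %d full blocks into a single searchable part\n", len(batch))
		}
	}
	// Blocks still pending here are left for a deadline-based background flush,
	// as the diff below does in rawItemsShards.flush.
}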

lib/mergeset/table.go

@@ -149,10 +149,16 @@ type Table struct {
 }
 
 type rawItemsShards struct {
+	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
+	flushDeadlineMs int64
+
 	shardIdx uint32
 
 	// shards reduce lock contention when adding rows on multi-CPU systems.
 	shards []rawItemsShard
+
+	ibsToFlushLock sync.Mutex
+	ibsToFlush     []*inmemoryBlock
 }
 
 // The number of shards for rawItems per table.
@@ -179,10 +185,33 @@ func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
 	for len(items) > 0 {
 		n := atomic.AddUint32(&riss.shardIdx, 1)
 		idx := n % shardsLen
-		items = shards[idx].addItems(tb, items)
+		tailItems, ibsToFlush := shards[idx].addItems(items)
+		riss.addIbsToFlush(tb, ibsToFlush)
+		items = tailItems
 	}
 }
 
+func (riss *rawItemsShards) addIbsToFlush(tb *Table, ibsToFlush []*inmemoryBlock) {
+	if len(ibsToFlush) == 0 {
+		return
+	}
+
+	var ibsToMerge []*inmemoryBlock
+
+	riss.ibsToFlushLock.Lock()
+	if len(riss.ibsToFlush) == 0 {
+		riss.updateFlushDeadline()
+	}
+	riss.ibsToFlush = append(riss.ibsToFlush, ibsToFlush...)
+	if len(riss.ibsToFlush) >= maxBlocksPerShard*cgroup.AvailableCPUs() {
+		ibsToMerge = ibsToFlush
+		riss.ibsToFlush = nil
+	}
+	riss.ibsToFlushLock.Unlock()
+
+	tb.flushBlocksToInmemoryParts(ibsToMerge, false)
+}
+
 func (riss *rawItemsShards) Len() int {
 	n := 0
 	for i := range riss.shards {
@@ -191,9 +220,13 @@ func (riss *rawItemsShards) Len() int {
 	return n
 }
 
+func (riss *rawItemsShards) updateFlushDeadline() {
+	atomic.StoreInt64(&riss.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli())
+}
+
 type rawItemsShardNopad struct {
-	// Put lastFlushTimeMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	lastFlushTimeMs int64
+	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
+	flushDeadlineMs int64
 
 	mu sync.Mutex
 	ibs []*inmemoryBlock
@@ -217,7 +250,7 @@ func (ris *rawItemsShard) Len() int {
 	return n
 }
 
-func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
+func (ris *rawItemsShard) addItems(items [][]byte) ([][]byte, []*inmemoryBlock) {
 	var ibsToFlush []*inmemoryBlock
 	var tailItems [][]byte
@@ -225,6 +258,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 	ibs := ris.ibs
 	if len(ibs) == 0 {
 		ibs = append(ibs, &inmemoryBlock{})
+		ris.updateFlushDeadline()
 		ris.ibs = ibs
 	}
 	ib := ibs[len(ibs)-1]
@@ -236,7 +270,6 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 			ibsToFlush = append(ibsToFlush, ibs...)
 			ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
 			tailItems = items[i:]
-			atomic.StoreInt64(&ris.lastFlushTimeMs, time.Now().UnixMilli())
 			break
 		}
 		ib = &inmemoryBlock{}
@@ -255,9 +288,11 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 	ris.ibs = ibs
 	ris.mu.Unlock()
 
-	tb.flushBlocksToInmemoryParts(ibsToFlush, false)
-
-	return tailItems
+	return tailItems, ibsToFlush
+}
+
+func (ris *rawItemsShard) updateFlushDeadline() {
+	atomic.StoreInt64(&ris.flushDeadlineMs, time.Now().Add(pendingItemsFlushInterval).UnixMilli())
 }
 
 var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second)
@@ -756,19 +791,30 @@ func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) {
 func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
 	var dst []*inmemoryBlock
-	for i := range riss.shards {
-		dst = riss.shards[i].appendBlocksToFlush(dst, isFinal)
+
+	currentTimeMs := time.Now().UnixMilli()
+	flushDeadlineMs := atomic.LoadInt64(&riss.flushDeadlineMs)
+	if isFinal || currentTimeMs >= flushDeadlineMs {
+		riss.ibsToFlushLock.Lock()
+		dst = riss.ibsToFlush
+		riss.ibsToFlush = nil
+		riss.ibsToFlushLock.Unlock()
 	}
+
+	for i := range riss.shards {
+		dst = riss.shards[i].appendBlocksToFlush(dst, currentTimeMs, isFinal)
+	}
+
 	tb.flushBlocksToInmemoryParts(dst, isFinal)
 }
 
-func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
-	currentTime := time.Now().UnixMilli()
-	lastFlushTime := atomic.LoadInt64(&ris.lastFlushTimeMs)
-	if !isFinal && currentTime < lastFlushTime+pendingItemsFlushInterval.Milliseconds() {
+func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, currentTimeMs int64, isFinal bool) []*inmemoryBlock {
+	flushDeadlineMs := atomic.LoadInt64(&ris.flushDeadlineMs)
+	if !isFinal && currentTimeMs < flushDeadlineMs {
 		// Fast path - nothing to flush
 		return dst
 	}
 
 	// Slow path - move ris.ibs to dst
 	ris.mu.Lock()
 	ibs := ris.ibs
@@ -777,8 +823,8 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
 		ibs[i] = nil
 	}
 	ris.ibs = ibs[:0]
-	atomic.StoreInt64(&ris.lastFlushTimeMs, currentTime)
 	ris.mu.Unlock()
+
 	return dst
 }
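
The hunks above also replace the per-shard lastFlushTimeMs bookkeeping with an absolute flush deadline that is armed when the first pending data appears and checked by the periodic flusher. A rough sketch of that pattern follows, with a hypothetical shard type, the shard mutex omitted for brevity, and atomic.Int64 used in place of the int64 field plus atomic.StoreInt64 calls from the diff.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// pendingFlushInterval plays the role of pendingItemsFlushInterval / pendingRowsFlushInterval.
const pendingFlushInterval = time.Second

// shard is a hypothetical stand-in for rawItemsShard: an absolute flush deadline
// is armed when the first pending entry appears, instead of recording the last flush time.
type shard struct {
	flushDeadlineMs atomic.Int64
	pending         []string
}

func (s *shard) add(item string) {
	if len(s.pending) == 0 {
		// First pending entry: arm the deadline once.
		s.flushDeadlineMs.Store(time.Now().Add(pendingFlushInterval).UnixMilli())
	}
	s.pending = append(s.pending, item)
}

// appendToFlush mirrors the fast-path check: do nothing until the deadline passes,
// unless this is a final flush.
func (s *shard) appendToFlush(dst []string, currentTimeMs int64, isFinal bool) []string {
	if !isFinal && currentTimeMs < s.flushDeadlineMs.Load() {
		return dst // fast path - nothing to flush yet
	}
	dst = append(dst, s.pending...)
	s.pending = s.pending[:0]
	return dst
}

func main() {
	var s shard
	s.add("foo")
	now := time.Now()
	fmt.Println(s.appendToFlush(nil, now.UnixMilli(), false))                             // [] - deadline not reached
	fmt.Println(s.appendToFlush(nil, now.Add(2*pendingFlushInterval).UnixMilli(), false)) // [foo]
}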

lib/storage/partition.go

@@ -421,10 +421,16 @@ func (pt *partition) AddRows(rows []rawRow) {
 var isDebug = false
 
 type rawRowsShards struct {
+	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
+	flushDeadlineMs int64
+
 	shardIdx uint32
 
 	// Shards reduce lock contention when adding rows on multi-CPU systems.
 	shards []rawRowsShard
+
+	rowssToFlushLock sync.Mutex
+	rowssToFlush     [][]rawRow
 }
 
 func (rrss *rawRowsShards) init() {
@@ -437,21 +443,55 @@ func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
 	for len(rows) > 0 {
 		n := atomic.AddUint32(&rrss.shardIdx, 1)
 		idx := n % shardsLen
-		rows = shards[idx].addRows(pt, rows)
+		tailRows, rowsToFlush := shards[idx].addRows(rows)
+		rrss.addRowsToFlush(pt, rowsToFlush)
+		rows = tailRows
 	}
 }
 
+func (rrss *rawRowsShards) addRowsToFlush(pt *partition, rowsToFlush []rawRow) {
+	if len(rowsToFlush) == 0 {
+		return
+	}
+
+	var rowssToMerge [][]rawRow
+
+	rrss.rowssToFlushLock.Lock()
+	if len(rrss.rowssToFlush) == 0 {
+		rrss.updateFlushDeadline()
+	}
+	rrss.rowssToFlush = append(rrss.rowssToFlush, rowsToFlush)
+	if len(rrss.rowssToFlush) >= defaultPartsToMerge {
+		rowssToMerge = rrss.rowssToFlush
+		rrss.rowssToFlush = nil
+	}
+	rrss.rowssToFlushLock.Unlock()
+
+	pt.flushRowssToInmemoryParts(rowssToMerge)
+}
+
 func (rrss *rawRowsShards) Len() int {
 	n := 0
 	for i := range rrss.shards[:] {
 		n += rrss.shards[i].Len()
 	}
+
+	rrss.rowssToFlushLock.Lock()
+	for _, rows := range rrss.rowssToFlush {
+		n += len(rows)
+	}
+	rrss.rowssToFlushLock.Unlock()
+
 	return n
 }
 
+func (rrss *rawRowsShards) updateFlushDeadline() {
+	atomic.StoreInt64(&rrss.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli())
+}
+
 type rawRowsShardNopad struct {
-	// Put lastFlushTimeMs to the top in order to avoid unaligned memory access on 32-bit architectures
-	lastFlushTimeMs int64
+	// Put flushDeadlineMs to the top in order to avoid unaligned memory access on 32-bit architectures
+	flushDeadlineMs int64
 
 	mu sync.Mutex
 	rows []rawRow
@@ -472,59 +512,46 @@ func (rrs *rawRowsShard) Len() int {
 	return n
 }
 
-func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
+func (rrs *rawRowsShard) addRows(rows []rawRow) ([]rawRow, []rawRow) {
 	var rowsToFlush []rawRow
 
 	rrs.mu.Lock()
 	if cap(rrs.rows) == 0 {
 		rrs.rows = newRawRows()
 	}
+	if len(rrs.rows) == 0 {
+		rrs.updateFlushDeadline()
+	}
 	n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows)
 	rrs.rows = rrs.rows[:len(rrs.rows)+n]
 	rows = rows[n:]
 	if len(rows) > 0 {
 		rowsToFlush = rrs.rows
 		rrs.rows = newRawRows()
+		rrs.updateFlushDeadline()
 		n = copy(rrs.rows[:cap(rrs.rows)], rows)
 		rrs.rows = rrs.rows[:n]
 		rows = rows[n:]
-		atomic.StoreInt64(&rrs.lastFlushTimeMs, time.Now().UnixMilli())
 	}
 	rrs.mu.Unlock()
 
-	pt.flushRowsToInmemoryParts(rowsToFlush)
-
-	return rows
+	return rows, rowsToFlush
 }
 
 func newRawRows() []rawRow {
 	return make([]rawRow, 0, maxRawRowsPerShard)
 }
 
-func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) {
-	if len(rows) == 0 {
+func (pt *partition) flushRowssToInmemoryParts(rowss [][]rawRow) {
+	if len(rowss) == 0 {
 		return
 	}
 
-	maxRows := maxRawRowsPerShard
-	if len(rows) <= maxRows {
-		// Common case - convert rows to a single in-memory part
-		pw := pt.createInmemoryPart(rows)
-		if pw != nil {
-			pt.addToInmemoryParts(pw)
-		}
-		return
-	}
-
-	// Merge rows into in-memory parts.
+	// Convert rowss into in-memory parts.
 	var pwsLock sync.Mutex
-	pws := make([]*partWrapper, 0, (len(rows)+maxRows-1)/maxRows)
+	pws := make([]*partWrapper, 0, len(rowss))
 	wg := getWaitGroup()
-	for len(rows) > 0 {
-		n := maxRows
-		if n > len(rows) {
-			n = len(rows)
-		}
+	for _, rows := range rowss {
 		wg.Add(1)
 		inmemoryPartsConcurrencyCh <- struct{}{}
 		go func(rowsChunk []rawRow) {
@@ -539,8 +566,7 @@ func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) {
 				pws = append(pws, pw)
 				pwsLock.Unlock()
 			}
-		}(rows[:n])
-		rows = rows[n:]
+		}(rows)
 	}
 	wg.Wait()
 	putWaitGroup(wg)
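
flushRowssToInmemoryParts converts each pre-sized chunk of rows in its own goroutine while the inmemoryPartsConcurrencyCh channel caps how many conversions run at once. A stripped-down sketch of that bounded-concurrency pattern, with integer summation as a hypothetical stand-in for createInmemoryPart:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// concurrencyCh bounds how many chunks are converted at once, in the spirit of
// inmemoryPartsConcurrencyCh in the hunk above.
var concurrencyCh = make(chan struct{}, runtime.NumCPU())

// convertChunks processes each pre-sized chunk in its own goroutine while keeping
// at most cap(concurrencyCh) conversions in flight.
func convertChunks(chunks [][]int) []int {
	var (
		mu      sync.Mutex
		results []int
		wg      sync.WaitGroup
	)
	for _, chunk := range chunks {
		wg.Add(1)
		concurrencyCh <- struct{}{} // acquire a concurrency slot
		go func(chunk []int) {
			defer func() {
				<-concurrencyCh // release the slot
				wg.Done()
			}()
			sum := 0 // hypothetical stand-in for createInmemoryPart
			for _, v := range chunk {
				sum += v
			}
			mu.Lock()
			results = append(results, sum)
			mu.Unlock()
		}(chunk)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(convertChunks([][]int{{1, 2}, {3, 4}, {5}})) // e.g. [3 7 5], order not guaranteed
}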
@@ -1066,26 +1092,62 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) {
 }
 
 func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
-	var dst []rawRow
-	for i := range rrss.shards {
-		dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal)
+	var dst [][]rawRow
+
+	currentTimeMs := time.Now().UnixMilli()
+	flushDeadlineMs := atomic.LoadInt64(&rrss.flushDeadlineMs)
+	if isFinal || currentTimeMs >= flushDeadlineMs {
+		rrss.rowssToFlushLock.Lock()
+		dst = rrss.rowssToFlush
+		rrss.rowssToFlush = nil
+		rrss.rowssToFlushLock.Unlock()
 	}
-	pt.flushRowsToInmemoryParts(dst)
+
+	for i := range rrss.shards {
+		dst = rrss.shards[i].appendRawRowsToFlush(dst, currentTimeMs, isFinal)
+	}
+
+	pt.flushRowssToInmemoryParts(dst)
 }
 
-func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow {
-	currentTime := time.Now().UnixMilli()
-	lastFlushTime := atomic.LoadInt64(&rrs.lastFlushTimeMs)
-	if !isFinal && currentTime < lastFlushTime+pendingRowsFlushInterval.Milliseconds() {
+func (rrs *rawRowsShard) appendRawRowsToFlush(dst [][]rawRow, currentTimeMs int64, isFinal bool) [][]rawRow {
+	flushDeadlineMs := atomic.LoadInt64(&rrs.flushDeadlineMs)
+	if !isFinal && currentTimeMs < flushDeadlineMs {
 		// Fast path - nothing to flush
 		return dst
 	}
 
 	// Slow path - move rrs.rows to dst.
 	rrs.mu.Lock()
-	dst = append(dst, rrs.rows...)
+	dst = appendRawRowss(dst, rrs.rows)
 	rrs.rows = rrs.rows[:0]
-	atomic.StoreInt64(&rrs.lastFlushTimeMs, currentTime)
 	rrs.mu.Unlock()
+
+	return dst
+}
+
+func (rrs *rawRowsShard) updateFlushDeadline() {
+	atomic.StoreInt64(&rrs.flushDeadlineMs, time.Now().Add(pendingRowsFlushInterval).UnixMilli())
+}
+
+func appendRawRowss(dst [][]rawRow, src []rawRow) [][]rawRow {
+	if len(src) == 0 {
+		return dst
+	}
+	if len(dst) == 0 {
+		dst = append(dst, newRawRows())
+	}
+	prows := &dst[len(dst)-1]
+	n := copy((*prows)[len(*prows):cap(*prows)], src)
+	*prows = (*prows)[:len(*prows)+n]
+	src = src[n:]
+
+	for len(src) > 0 {
+		rows := newRawRows()
+		n := copy(rows[:cap(rows)], src)
+		rows = rows[:len(rows)+n]
+		src = src[n:]
+		dst = append(dst, rows)
+	}
+
 	return dst
 }
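
The new appendRawRowss helper tops up the last chunk in dst and then starts additional fixed-capacity chunks for whatever remains of src, so each chunk can later become one in-memory part. A simplified, runnable sketch of the same chunking logic over ints, with chunkCapacity as a hypothetical stand-in for maxRawRowsPerShard:

package main

import "fmt"

const chunkCapacity = 4 // stands in for maxRawRowsPerShard

// appendChunked mirrors the appendRawRowss logic: top up the last chunk in dst,
// then start new fixed-capacity chunks for whatever is left of src.
func appendChunked(dst [][]int, src []int) [][]int {
	if len(src) == 0 {
		return dst
	}
	if len(dst) == 0 {
		dst = append(dst, make([]int, 0, chunkCapacity))
	}
	last := &dst[len(dst)-1]
	n := copy((*last)[len(*last):cap(*last)], src)
	*last = (*last)[:len(*last)+n]
	src = src[n:]

	for len(src) > 0 {
		chunk := make([]int, 0, chunkCapacity)
		n := copy(chunk[:cap(chunk)], src)
		chunk = chunk[:n]
		src = src[n:]
		dst = append(dst, chunk)
	}
	return dst
}

func main() {
	var dst [][]int
	dst = appendChunked(dst, []int{1, 2, 3, 4, 5, 6})
	dst = appendChunked(dst, []int{7, 8, 9})
	fmt.Println(dst) // [[1 2 3 4] [5 6 7 8] [9]]
}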