lib/storage: do not keep rawRows buffer across flush() calls

The buffer can become quite big under a high ingestion rate (e.g. more than 100MB).
When it is kept across flush() calls, this memory stays occupied between buffer flushes.
So it is better to re-create the buffer on every flush in order to reduce memory usage
during the intervals between flushes.
This commit is contained in:
Aliaksandr Valialkin 2024-02-22 17:22:23 +02:00
parent 843f3ec94e
commit bf07e2ac87
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 18 additions and 21 deletions

View file

@ -255,7 +255,9 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
ris.ibs = ibs
ris.mu.Unlock()
tb.flushBlocksToInmemoryParts(ibsToFlush, false)
if len(ibsToFlush) > 0 {
tb.flushBlocksToInmemoryParts(ibsToFlush, false)
}
return tailItems
}
@ -689,7 +691,7 @@ func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error {
// This function is for debugging and testing purposes only,
// since it may slow down data ingestion when used frequently.
func (tb *Table) DebugFlush() {
tb.flushPendingItems(nil, true)
tb.flushPendingItems(true)
// Wait for background flushers to finish.
tb.flushPendingItemsWG.Wait()
@ -699,13 +701,12 @@ func (tb *Table) pendingItemsFlusher() {
d := timeutil.AddJitterToDuration(pendingItemsFlushInterval)
ticker := time.NewTicker(d)
defer ticker.Stop()
var ibs []*inmemoryBlock
for {
select {
case <-tb.stopCh:
return
case <-ticker.C:
ibs = tb.flushPendingItems(ibs[:0], false)
tb.flushPendingItems(false)
}
}
}
@ -724,15 +725,14 @@ func (tb *Table) inmemoryPartsFlusher() {
}
}
func (tb *Table) flushPendingItems(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
func (tb *Table) flushPendingItems(isFinal bool) {
tb.flushPendingItemsWG.Add(1)
dst = tb.rawItems.flush(tb, dst, isFinal)
tb.rawItems.flush(tb, isFinal)
tb.flushPendingItemsWG.Done()
return dst
}
func (tb *Table) flushInmemoryItemsToFiles() {
tb.flushPendingItems(nil, true)
tb.flushPendingItems(true)
tb.flushInmemoryPartsToFiles(true)
}
@ -754,12 +754,12 @@ func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) {
}
}
func (riss *rawItemsShards) flush(tb *Table, dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
var dst []*inmemoryBlock
for i := range riss.shards {
dst = riss.shards[i].appendBlocksToFlush(dst, isFinal)
}
tb.flushBlocksToInmemoryParts(dst, isFinal)
return dst
}
func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {

View file

@ -1058,23 +1058,22 @@ func (pt *partition) pendingRowsFlusher() {
d := timeutil.AddJitterToDuration(pendingRowsFlushInterval)
ticker := time.NewTicker(d)
defer ticker.Stop()
var rows []rawRow
for {
select {
case <-pt.stopCh:
return
case <-ticker.C:
rows = pt.flushPendingRows(rows[:0], false)
pt.flushPendingRows(false)
}
}
}
func (pt *partition) flushPendingRows(dst []rawRow, isFinal bool) []rawRow {
return pt.rawRows.flush(pt, dst, isFinal)
func (pt *partition) flushPendingRows(isFinal bool) {
pt.rawRows.flush(pt, isFinal)
}
func (pt *partition) flushInmemoryRowsToFiles() {
pt.flushPendingRows(nil, true)
pt.flushPendingRows(true)
pt.flushInmemoryPartsToFiles(true)
}
@ -1096,12 +1095,12 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) {
}
}
func (rrss *rawRowsShards) flush(pt *partition, dst []rawRow, isFinal bool) []rawRow {
func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
var dst []rawRow
for i := range rrss.shards {
dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal)
}
pt.flushRowsToInmemoryParts(dst)
return dst
}
func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow {

View file

@ -172,12 +172,11 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt := mustCreatePartition(ptt, "small-table", "big-table", strg)
smallPartsPath := pt.smallPartsPath
bigPartsPath := pt.bigPartsPath
var tmpRows []rawRow
for _, rows := range rowss {
pt.AddRows(rows)
// Flush the just-added rows into separate parts.
tmpRows = pt.flushPendingRows(tmpRows[:0], true)
pt.flushPendingRows(true)
}
testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1)
pt.MustClose()

View file

@ -199,9 +199,8 @@ func (tb *table) flushPendingRows() {
ptws := tb.GetPartitions(nil)
defer tb.PutPartitions(ptws)
var rows []rawRow
for _, ptw := range ptws {
rows = ptw.pt.flushPendingRows(rows[:0], true)
ptw.pt.flushPendingRows(true)
}
}