From b7dfe9894c0100ce016d12ec5fc3128b334eefaa Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 22 Feb 2024 17:22:23 +0200 Subject: [PATCH] lib/storage: do not keep rawRows buffer across flush() calls The buffer can be quite big under high ingestion rate (e.g. more than 100MB). This leads to increased memory usage between buffer flushes. So it is better to re-create the buffer on every flush in order to reduce memory usage between buffer flushes. --- lib/mergeset/table.go | 20 ++++++++++---------- lib/storage/partition.go | 13 ++++++------- lib/storage/partition_search_test.go | 3 +-- lib/storage/table.go | 3 +-- 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index a7bf1f78e..f55a4730e 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -255,7 +255,9 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { ris.ibs = ibs ris.mu.Unlock() - tb.flushBlocksToInmemoryParts(ibsToFlush, false) + if len(ibsToFlush) > 0 { + tb.flushBlocksToInmemoryParts(ibsToFlush, false) + } return tailItems } @@ -689,7 +691,7 @@ func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error { // This function is for debugging and testing purposes only, // since it may slow down data ingestion when used frequently. func (tb *Table) DebugFlush() { - tb.flushPendingItems(nil, true) + tb.flushPendingItems(true) // Wait for background flushers to finish. tb.flushPendingItemsWG.Wait() @@ -699,13 +701,12 @@ func (tb *Table) pendingItemsFlusher() { d := timeutil.AddJitterToDuration(pendingItemsFlushInterval) ticker := time.NewTicker(d) defer ticker.Stop() - var ibs []*inmemoryBlock for { select { case <-tb.stopCh: return case <-ticker.C: - ibs = tb.flushPendingItems(ibs[:0], false) + tb.flushPendingItems(false) } } } @@ -724,15 +725,14 @@ func (tb *Table) inmemoryPartsFlusher() { } } -func (tb *Table) flushPendingItems(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { +func (tb *Table) flushPendingItems(isFinal bool) { tb.flushPendingItemsWG.Add(1) - dst = tb.rawItems.flush(tb, dst, isFinal) + tb.rawItems.flush(tb, isFinal) tb.flushPendingItemsWG.Done() - return dst } func (tb *Table) flushInmemoryItemsToFiles() { - tb.flushPendingItems(nil, true) + tb.flushPendingItems(true) tb.flushInmemoryPartsToFiles(true) } @@ -754,12 +754,12 @@ func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) { } } -func (riss *rawItemsShards) flush(tb *Table, dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { +func (riss *rawItemsShards) flush(tb *Table, isFinal bool) { + var dst []*inmemoryBlock for i := range riss.shards { dst = riss.shards[i].appendBlocksToFlush(dst, isFinal) } tb.flushBlocksToInmemoryParts(dst, isFinal) - return dst } func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 108d89f63..ef5937d24 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1058,23 +1058,22 @@ func (pt *partition) pendingRowsFlusher() { d := timeutil.AddJitterToDuration(pendingRowsFlushInterval) ticker := time.NewTicker(d) defer ticker.Stop() - var rows []rawRow for { select { case <-pt.stopCh: return case <-ticker.C: - rows = pt.flushPendingRows(rows[:0], false) + pt.flushPendingRows(false) } } } -func (pt *partition) flushPendingRows(dst []rawRow, isFinal bool) []rawRow { - return pt.rawRows.flush(pt, dst, isFinal) +func (pt *partition) flushPendingRows(isFinal bool) { + pt.rawRows.flush(pt, isFinal) } func (pt *partition) flushInmemoryRowsToFiles() { - pt.flushPendingRows(nil, true) + pt.flushPendingRows(true) pt.flushInmemoryPartsToFiles(true) } @@ -1096,12 +1095,12 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) { } } -func (rrss *rawRowsShards) flush(pt *partition, dst []rawRow, isFinal bool) []rawRow { +func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) { + var dst []rawRow for i := range rrss.shards { dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal) } pt.flushRowsToInmemoryParts(dst) - return dst } func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow { diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 4e37ea6ae..c846ec43f 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -172,12 +172,11 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt := mustCreatePartition(ptt, "small-table", "big-table", strg) smallPartsPath := pt.smallPartsPath bigPartsPath := pt.bigPartsPath - var tmpRows []rawRow for _, rows := range rowss { pt.AddRows(rows) // Flush just added rows to a separate partitions. - tmpRows = pt.flushPendingRows(tmpRows[:0], true) + pt.flushPendingRows(true) } testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1) pt.MustClose() diff --git a/lib/storage/table.go b/lib/storage/table.go index 9a780d2ea..6ead99d61 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -199,9 +199,8 @@ func (tb *table) flushPendingRows() { ptws := tb.GetPartitions(nil) defer tb.PutPartitions(ptws) - var rows []rawRow for _, ptw := range ptws { - rows = ptw.pt.flushPendingRows(rows[:0], true) + ptw.pt.flushPendingRows(true) } }