diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index a7bf1f78e..f55a4730e 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -255,7 +255,9 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte { ris.ibs = ibs ris.mu.Unlock() - tb.flushBlocksToInmemoryParts(ibsToFlush, false) + if len(ibsToFlush) > 0 { + tb.flushBlocksToInmemoryParts(ibsToFlush, false) + } return tailItems } @@ -689,7 +691,7 @@ func (tb *Table) mergeInmemoryPartsToFiles(pws []*partWrapper) error { // This function is for debugging and testing purposes only, // since it may slow down data ingestion when used frequently. func (tb *Table) DebugFlush() { - tb.flushPendingItems(nil, true) + tb.flushPendingItems(true) // Wait for background flushers to finish. tb.flushPendingItemsWG.Wait() @@ -699,13 +701,12 @@ func (tb *Table) pendingItemsFlusher() { d := timeutil.AddJitterToDuration(pendingItemsFlushInterval) ticker := time.NewTicker(d) defer ticker.Stop() - var ibs []*inmemoryBlock for { select { case <-tb.stopCh: return case <-ticker.C: - ibs = tb.flushPendingItems(ibs[:0], false) + tb.flushPendingItems(false) } } } @@ -724,15 +725,14 @@ func (tb *Table) inmemoryPartsFlusher() { } } -func (tb *Table) flushPendingItems(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { +func (tb *Table) flushPendingItems(isFinal bool) { tb.flushPendingItemsWG.Add(1) - dst = tb.rawItems.flush(tb, dst, isFinal) + tb.rawItems.flush(tb, isFinal) tb.flushPendingItemsWG.Done() - return dst } func (tb *Table) flushInmemoryItemsToFiles() { - tb.flushPendingItems(nil, true) + tb.flushPendingItems(true) tb.flushInmemoryPartsToFiles(true) } @@ -754,12 +754,12 @@ func (tb *Table) flushInmemoryPartsToFiles(isFinal bool) { } } -func (riss *rawItemsShards) flush(tb *Table, dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { +func (riss *rawItemsShards) flush(tb *Table, isFinal bool) { + var dst []*inmemoryBlock for i := range riss.shards { dst = riss.shards[i].appendBlocksToFlush(dst, isFinal) } tb.flushBlocksToInmemoryParts(dst, isFinal) - return dst } func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 108d89f63..ef5937d24 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1058,23 +1058,22 @@ func (pt *partition) pendingRowsFlusher() { d := timeutil.AddJitterToDuration(pendingRowsFlushInterval) ticker := time.NewTicker(d) defer ticker.Stop() - var rows []rawRow for { select { case <-pt.stopCh: return case <-ticker.C: - rows = pt.flushPendingRows(rows[:0], false) + pt.flushPendingRows(false) } } } -func (pt *partition) flushPendingRows(dst []rawRow, isFinal bool) []rawRow { - return pt.rawRows.flush(pt, dst, isFinal) +func (pt *partition) flushPendingRows(isFinal bool) { + pt.rawRows.flush(pt, isFinal) } func (pt *partition) flushInmemoryRowsToFiles() { - pt.flushPendingRows(nil, true) + pt.flushPendingRows(true) pt.flushInmemoryPartsToFiles(true) } @@ -1096,12 +1095,12 @@ func (pt *partition) flushInmemoryPartsToFiles(isFinal bool) { } } -func (rrss *rawRowsShards) flush(pt *partition, dst []rawRow, isFinal bool) []rawRow { +func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) { + var dst []rawRow for i := range rrss.shards { dst = rrss.shards[i].appendRawRowsToFlush(dst, isFinal) } pt.flushRowsToInmemoryParts(dst) - return dst } func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow { diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 4e37ea6ae..c846ec43f 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -172,12 +172,11 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt := mustCreatePartition(ptt, "small-table", "big-table", strg) smallPartsPath := pt.smallPartsPath bigPartsPath := pt.bigPartsPath - var tmpRows []rawRow for _, rows := range rowss { pt.AddRows(rows) // Flush just added rows to a separate partitions. - tmpRows = pt.flushPendingRows(tmpRows[:0], true) + pt.flushPendingRows(true) } testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1) pt.MustClose() diff --git a/lib/storage/table.go b/lib/storage/table.go index 9a780d2ea..6ead99d61 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -199,9 +199,8 @@ func (tb *table) flushPendingRows() { ptws := tb.GetPartitions(nil) defer tb.PutPartitions(ptws) - var rows []rawRow for _, ptw := range ptws { - rows = ptw.pt.flushPendingRows(rows[:0], true) + ptw.pt.flushPendingRows(true) } }