From 2601844de38be19c9af4c178eb14cc719c3d81d3 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Mon, 24 May 2021 15:24:04 +0300
Subject: [PATCH] lib/storage: limit the number of rows per block in
 Storage.AddRows()

This should reduce memory usage when ingesting big blocks of rows.
---
 lib/storage/storage.go | 59 ++++++++++++++++++++++++++++++++----------
 1 file changed, 46 insertions(+), 13 deletions(-)

diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 7c61d789f3..5f5467ff9f 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -1480,18 +1480,56 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
 		}
 	}
 
-	// Add rows to the storage.
+	// Add rows to the storage in blocks with limited size in order to reduce memory usage.
 	var err error
-	rr := getRawRowsWithSize(len(mrs))
-	rr.rows, err = s.add(rr.rows[:0], mrs, precisionBits)
-	putRawRows(rr)
+	ic := getMetricRowsInsertCtx()
+	maxBlockLen := len(ic.rrs)
+	for len(mrs) > 0 && err == nil {
+		mrsBlock := mrs
+		if len(mrs) > maxBlockLen {
+			mrsBlock = mrs[:maxBlockLen]
+			mrs = mrs[maxBlockLen:]
+		} else {
+			mrs = nil
+		}
+		err = s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
+		atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
+	}
+	putMetricRowsInsertCtx(ic)
 
 	<-addRowsConcurrencyCh
 
-	atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs)))
 	return err
 }
 
+type metricRowsInsertCtx struct {
+	rrs    []rawRow
+	tmpMrs []*MetricRow
+}
+
+func getMetricRowsInsertCtx() *metricRowsInsertCtx {
+	v := metricRowsInsertCtxPool.Get()
+	if v == nil {
+		v = &metricRowsInsertCtx{
+			rrs:    make([]rawRow, maxMetricRowsPerBlock),
+			tmpMrs: make([]*MetricRow, maxMetricRowsPerBlock),
+		}
+	}
+	return v.(*metricRowsInsertCtx)
+}
+
+func putMetricRowsInsertCtx(ic *metricRowsInsertCtx) {
+	tmpMrs := ic.tmpMrs
+	for i := range tmpMrs {
+		tmpMrs[i] = nil
+	}
+	metricRowsInsertCtxPool.Put(ic)
+}
+
+var metricRowsInsertCtxPool sync.Pool
+
+const maxMetricRowsPerBlock = 8000
+
 var (
 	// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
 	// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
@@ -1564,13 +1602,8 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
 	return nil
 }
 
-func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
+func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
 	idb := s.idb()
-	dstMrs := make([]*MetricRow, len(mrs))
-	if n := len(mrs) - cap(rows); n > 0 {
-		rows = append(rows[:cap(rows)], make([]rawRow, n)...)
-	}
-	rows = rows[:len(mrs)]
 	j := 0
 	var (
 		// These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName.
@@ -1714,9 +1747,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
 		firstError = fmt.Errorf("cannot update per-date data: %w", err)
 	}
 	if firstError != nil {
-		return rows, fmt.Errorf("error occurred during rows addition: %w", firstError)
+		return fmt.Errorf("error occurred during rows addition: %w", firstError)
 	}
-	return rows, nil
+	return nil
 }
 
 func (s *Storage) isSeriesCardinalityExceeded(metricID uint64, metricNameRaw []byte) bool {