From 4b01c9fb2e5bc81768c8eff51c64ba98a15e9997 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 24 May 2021 15:24:04 +0300 Subject: [PATCH] lib/storage: limit the number of rows per each block in Storage.AddRows() This should reduce memory usage when ingesting big blocks of rows. --- lib/storage/storage.go | 59 ++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index dd87dd593..44104bd7e 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1372,18 +1372,56 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { } } - // Add rows to the storage. + // Add rows to the storage in blocks with limited size in order to reduce memory usage. var err error - rr := getRawRowsWithSize(len(mrs)) - rr.rows, err = s.add(rr.rows[:0], mrs, precisionBits) - putRawRows(rr) + ic := getMetricRowsInsertCtx() + maxBlockLen := len(ic.rrs) + for len(mrs) > 0 && err == nil { + mrsBlock := mrs + if len(mrs) > maxBlockLen { + mrsBlock = mrs[:maxBlockLen] + mrs = mrs[maxBlockLen:] + } else { + mrs = nil + } + err = s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits) + atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock))) + } + putMetricRowsInsertCtx(ic) <-addRowsConcurrencyCh - atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs))) return err } +type metricRowsInsertCtx struct { + rrs []rawRow + tmpMrs []*MetricRow +} + +func getMetricRowsInsertCtx() *metricRowsInsertCtx { + v := metricRowsInsertCtxPool.Get() + if v == nil { + v = &metricRowsInsertCtx{ + rrs: make([]rawRow, maxMetricRowsPerBlock), + tmpMrs: make([]*MetricRow, maxMetricRowsPerBlock), + } + } + return v.(*metricRowsInsertCtx) +} + +func putMetricRowsInsertCtx(ic *metricRowsInsertCtx) { + tmpMrs := ic.tmpMrs + for i := range tmpMrs { + tmpMrs[i] = nil + } + metricRowsInsertCtxPool.Put(ic) +} + +var metricRowsInsertCtxPool sync.Pool + +const maxMetricRowsPerBlock = 8000 + var ( // 
Limit the concurrency for data ingestion to GOMAXPROCS, since this operation // is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent @@ -1451,13 +1489,8 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error { return nil } -func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) { +func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error { idb := s.idb() - dstMrs := make([]*MetricRow, len(mrs)) - if n := len(mrs) - cap(rows); n > 0 { - rows = append(rows[:cap(rows)], make([]rawRow, n)...) - } - rows = rows[:len(mrs)] j := 0 var ( // These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName. @@ -1601,9 +1634,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra firstError = fmt.Errorf("cannot update per-date data: %w", err) } if firstError != nil { - return rows, fmt.Errorf("error occurred during rows addition: %w", firstError) + return fmt.Errorf("error occurred during rows addition: %w", firstError) } - return rows, nil + return nil } func (s *Storage) isSeriesCardinalityExceeded(metricID uint64, metricNameRaw []byte) bool {