lib/storage: limit the number of rows per each block in Storage.AddRows()

This should reduce memory usage when ingesting big blocks or rows.
This commit is contained in:
Aliaksandr Valialkin 2021-05-24 15:24:04 +03:00
parent a4ff4b8e65
commit 4b01c9fb2e

View file

@ -1372,18 +1372,56 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
}
}
// Add rows to the storage.
// Add rows to the storage in blocks with limited size in order to reduce memory usage.
var err error
rr := getRawRowsWithSize(len(mrs))
rr.rows, err = s.add(rr.rows[:0], mrs, precisionBits)
putRawRows(rr)
ic := getMetricRowsInsertCtx()
maxBlockLen := len(ic.rrs)
for len(mrs) > 0 && err == nil {
mrsBlock := mrs
if len(mrs) > maxBlockLen {
mrsBlock = mrs[:maxBlockLen]
mrs = mrs[maxBlockLen:]
} else {
mrs = nil
}
err = s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
}
putMetricRowsInsertCtx(ic)
<-addRowsConcurrencyCh
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrs)))
return err
}
type metricRowsInsertCtx struct {
rrs []rawRow
tmpMrs []*MetricRow
}
func getMetricRowsInsertCtx() *metricRowsInsertCtx {
v := metricRowsInsertCtxPool.Get()
if v == nil {
v = &metricRowsInsertCtx{
rrs: make([]rawRow, maxMetricRowsPerBlock),
tmpMrs: make([]*MetricRow, maxMetricRowsPerBlock),
}
}
return v.(*metricRowsInsertCtx)
}
func putMetricRowsInsertCtx(ic *metricRowsInsertCtx) {
tmpMrs := ic.tmpMrs
for i := range tmpMrs {
tmpMrs[i] = nil
}
metricRowsInsertCtxPool.Put(ic)
}
var metricRowsInsertCtxPool sync.Pool
const maxMetricRowsPerBlock = 8000
var (
// Limit the concurrency for data ingestion to GOMAXPROCS, since this operation
// is CPU bound, so there is no sense in running more than GOMAXPROCS concurrent
@ -1451,13 +1489,8 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
return nil
}
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
idb := s.idb()
dstMrs := make([]*MetricRow, len(mrs))
if n := len(mrs) - cap(rows); n > 0 {
rows = append(rows[:cap(rows)], make([]rawRow, n)...)
}
rows = rows[:len(mrs)]
j := 0
var (
// These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName.
@ -1601,9 +1634,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
firstError = fmt.Errorf("cannot update per-date data: %w", err)
}
if firstError != nil {
return rows, fmt.Errorf("error occurred during rows addition: %w", firstError)
return fmt.Errorf("error occurred during rows addition: %w", firstError)
}
return rows, nil
return nil
}
func (s *Storage) isSeriesCardinalityExceeded(metricID uint64, metricNameRaw []byte) bool {