lib/storage: do not stop data ingestion on the first error in Storage.AddRows

Continue data ingestion for the rest of blocks.
This commit is contained in:
Aliaksandr Valialkin 2021-05-24 15:30:39 +03:00
parent 4b01c9fb2e
commit 39ef1e7a51

View file

@ -1373,10 +1373,10 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
}
// Add rows to the storage in blocks with limited size in order to reduce memory usage.
var err error
var firstErr error
ic := getMetricRowsInsertCtx()
maxBlockLen := len(ic.rrs)
for len(mrs) > 0 && err == nil {
for len(mrs) > 0 {
mrsBlock := mrs
if len(mrs) > maxBlockLen {
mrsBlock = mrs[:maxBlockLen]
@ -1384,14 +1384,19 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
} else {
mrs = nil
}
err = s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits)
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
}
putMetricRowsInsertCtx(ic)
<-addRowsConcurrencyCh
return err
return firstErr
}
type metricRowsInsertCtx struct {