lib/storage: properly update vm_slow_row_inserts_total metric when importing multiple data points per time series at once

Previously the `vm_slow_row_inserts_total` metric may be incremented multiple times for different data points per a single time series,
while only a single increment is needed when inserting the first data point for this time series.
This commit is contained in:
Aliaksandr Valialkin 2020-07-30 16:14:51 +03:00
parent 68716488db
commit 922d9aadf2

View file

@ -1129,7 +1129,6 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
}
}
if pmrs != nil {
atomic.AddUint64(&s.slowRowInserts, uint64(len(pmrs.pmrs)))
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
sort.Slice(pendingMetricRows, func(i, j int) bool {
@ -1137,6 +1136,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
})
is := idb.getIndexSearch(noDeadline)
prevMetricNameRaw = nil
var slowInsertsCount uint64
for i := range pendingMetricRows {
pmr := &pendingMetricRows[i]
mr := &pmr.mr
@ -1160,6 +1160,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
prevMetricNameRaw = mr.MetricNameRaw
continue
}
slowInsertsCount++
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
@ -1174,6 +1175,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
}
idb.putIndexSearch(is)
putPendingMetricRows(pmrs)
atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
}
if firstWarn != nil {
logger.Errorf("warn occurred during rows addition: %s", firstWarn)