lib/storage: properly limit cardinality when ingesting multiple samples for the same time series in a single request

This commit is contained in:
Aliaksandr Valialkin 2022-01-21 12:37:57 +02:00
parent 3ea054a52c
commit 5f84b17ed6
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@@ -1787,6 +1787,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r.TSID = prevTSID
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since the limit on the number of unique series has been exceeded.
j--
continue
}
continue
}
slowInsertsCount++
@@ -1800,14 +1805,14 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
j--
continue
}
s.putTSIDToCache(&r.TSID, mr.MetricNameRaw)
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
// Skip the row, since the limit on the number of unique series has been exceeded.
j--
continue
}
s.putTSIDToCache(&r.TSID, mr.MetricNameRaw)
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
}
idb.putIndexSearch(is)
putPendingMetricRows(pmrs)