lib/storage: reduce memory usage when ingesting samples for the same time series with distinct order of labels

This commit is contained in:
Aliaksandr Valialkin 2021-03-31 21:22:40 +03:00
parent db963205cc
commit e1f699bb6c
3 changed files with 43 additions and 27 deletions

View file

@ -557,6 +557,9 @@ func registerStorageMetrics() {
return float64(m().SearchDelays)
})
metrics.NewGauge(`vm_sorted_row_labels_inserts_total`, func() float64 {
return float64(m().SortedRowLabelsInserts)
})
metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
return float64(m().SlowRowInserts)
})

View file

@ -2,6 +2,7 @@
# tip
* FEATURE: reduce the size of `MetricName -> internal_series_id` cache (aka `vm_cache_size_bytes{type="storage/tsid"}`) when ingesting samples for the same time series with distinct order of labels. For example, `foo{k1="v1",k2="v2"}` and `foo{k2="v2",k1="v1"}` represent a single time series. Previously VictoriaMetrics could require additional memory when ingesting such samples. The number of ingested samples with distinct order of labels for the same time series can be monitored with the `vm_sorted_row_labels_inserts_total` metric.
* FEATURE: vmagent: reduce memory usage when `-remoteWrite.queues` is set to a big value. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1167).

View file

@ -48,6 +48,7 @@ type Storage struct {
searchTSIDsConcurrencyLimitReached uint64
searchTSIDsConcurrencyLimitTimeout uint64
sortedRowLabelsInserts uint64
slowRowInserts uint64
slowPerDayIndexInserts uint64
slowMetricNameLoads uint64
@ -358,6 +359,7 @@ type Metrics struct {
SearchDelays uint64
SortedRowLabelsInserts uint64
SlowRowInserts uint64
SlowPerDayIndexInserts uint64
SlowMetricNameLoads uint64
@ -427,6 +429,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
m.SortedRowLabelsInserts += atomic.LoadUint64(&s.sortedRowLabelsInserts)
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
@ -1318,6 +1321,8 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error {
var rowsAddedTotal uint64
// AddRows adds the given mrs to s.
//
// AddRows can modify mrs contents.
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
if len(mrs) == 0 {
return nil
@ -1442,6 +1447,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
prevMetricNameRaw []byte
)
var pmrs *pendingMetricRows
var mn MetricName
var metricNameRawSorted []byte
var sortedRowLabelsInserts uint64
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
// Return only the first error, since it has no sense in returning all errors.
var firstWarn error
@ -1485,7 +1493,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
continue
}
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
// Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteMetrics code for details.
@ -1494,22 +1502,40 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
continue
}
// Slower path - sort labels in MetricNameRaw and check the cache again.
// This should limit the number of cache entries for metrics with distinct order of labels to 1.
if err := mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
if firstWarn == nil {
firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
j--
continue
}
mn.sortTags()
metricNameRawSorted = mn.marshalRaw(metricNameRawSorted[:0])
if s.getTSIDFromCache(&r.TSID, metricNameRawSorted) {
// The TSID for the given metricNameRawSorted has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteMetrics code for details.
sortedRowLabelsInserts++
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
continue
}
// Slow path - the TSID is missing in the cache.
// Postpone its search in the loop below.
j--
if pmrs == nil {
pmrs = getPendingMetricRows()
}
if err := pmrs.addRow(mr); err != nil {
// Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent
// from adding valid rows into the storage.
if firstWarn == nil {
firstWarn = err
}
continue
if string(mr.MetricNameRaw) != string(metricNameRawSorted) {
mr.MetricNameRaw = append(mr.MetricNameRaw[:0], metricNameRawSorted...)
}
pmrs.addRow(mr, &mn)
}
atomic.AddUint64(&s.sortedRowLabelsInserts, sortedRowLabelsInserts)
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
@ -1533,15 +1559,6 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
r.TSID = prevTSID
continue
}
if s.getTSIDFromCache(&r.TSID, mr.MetricNameRaw) {
// Fast path - the TSID for the given MetricName has been found in cache and isn't deleted.
// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
// contain MetricName->TSID entries for deleted time series.
// See Storage.DeleteMetrics code for details.
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
continue
}
slowInsertsCount++
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil {
// Do not stop adding rows on error - just skip invalid row.
@ -1554,6 +1571,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
continue
}
s.putTSIDToCache(&r.TSID, mr.MetricNameRaw)
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
}
idb.putIndexSearch(is)
putPendingMetricRows(pmrs)
@ -1596,7 +1615,6 @@ type pendingMetricRows struct {
lastMetricNameRaw []byte
lastMetricName []byte
mn MetricName
}
func (pmrs *pendingMetricRows) reset() {
@ -1608,19 +1626,14 @@ func (pmrs *pendingMetricRows) reset() {
pmrs.metricNamesBuf = pmrs.metricNamesBuf[:0]
pmrs.lastMetricNameRaw = nil
pmrs.lastMetricName = nil
pmrs.mn.Reset()
}
func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
func (pmrs *pendingMetricRows) addRow(mr *MetricRow, mn *MetricName) {
// Do not spend CPU time on re-calculating canonical metricName during bulk import
// of many rows for the same metric.
if string(mr.MetricNameRaw) != string(pmrs.lastMetricNameRaw) {
if err := pmrs.mn.unmarshalRaw(mr.MetricNameRaw); err != nil {
return fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", mr.MetricNameRaw, err)
}
pmrs.mn.sortTags()
metricNamesBufLen := len(pmrs.metricNamesBuf)
pmrs.metricNamesBuf = pmrs.mn.Marshal(pmrs.metricNamesBuf)
pmrs.metricNamesBuf = mn.Marshal(pmrs.metricNamesBuf)
pmrs.lastMetricName = pmrs.metricNamesBuf[metricNamesBufLen:]
pmrs.lastMetricNameRaw = mr.MetricNameRaw
}
@ -1628,7 +1641,6 @@ func (pmrs *pendingMetricRows) addRow(mr *MetricRow) error {
MetricName: pmrs.lastMetricName,
mr: *mr,
})
return nil
}
func getPendingMetricRows() *pendingMetricRows {