lib/storage: do not register new series if -storage.maxHourlySeries or -storage.maxDailySeries limits are exceeded

Previously, when these limits were exceeded, samples for new series weren't added as expected,
but the new series were still registered in indexdb. This allowed indexdb to grow without bound
under high churn rate.
Author: Aliaksandr Valialkin, 2022-06-19 21:47:35 +03:00
Commit: 5fb45173ae (parent 62e2371a67)
3 changed files with 23 additions and 24 deletions
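
The essence of the fix is an ordering change: the cardinality limiter is now consulted before anything is written to indexdb, and a sentinel error tells callers to silently skip the row. A self-contained toy sketch of that flow (the limiter and index types below are simplified stand-ins, not the real VictoriaMetrics internals):

package main

import (
    "errors"
    "fmt"
)

var errSeriesCardinalityExceeded = errors.New("series cardinality limit exceeded")

// limiter caps the number of distinct metricIDs; a toy stand-in for the
// hourly/daily series limiters.
type limiter struct {
    seen     map[uint64]struct{}
    maxItems int
}

func (l *limiter) add(metricID uint64) bool {
    if _, ok := l.seen[metricID]; ok {
        return true // already registered series are always allowed
    }
    if len(l.seen) >= l.maxItems {
        return false
    }
    l.seen[metricID] = struct{}{}
    return true
}

// indexDB is a toy stand-in for the inverted index (indexdb).
type indexDB struct {
    limiter *limiter
    indexed map[uint64]string
}

// createSeries mirrors the fixed order of operations: consult the limiter
// first and register the series in the index only on success. Before this
// commit the index entry was created even though the sample was dropped.
func (db *indexDB) createSeries(metricID uint64, name string) error {
    if !db.limiter.add(metricID) {
        return errSeriesCardinalityExceeded // indexdb stays untouched
    }
    db.indexed[metricID] = name
    return nil
}

func main() {
    db := &indexDB{
        limiter: &limiter{seen: map[uint64]struct{}{}, maxItems: 2},
        indexed: map[uint64]string{},
    }
    for id := uint64(1); id <= 3; id++ {
        name := fmt.Sprintf("series_%d", id)
        if err := db.createSeries(id, name); errors.Is(err, errSeriesCardinalityExceeded) {
            fmt.Println("skipped:", name)
        }
    }
    fmt.Println("indexed series:", len(db.indexed)) // 2: the third series never reached the index
}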

docs/CHANGELOG.md

@@ -47,6 +47,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * BUGFIX: [Graphite render API](https://docs.victoriametrics.com/#graphite-render-api-usage): properly calculate sample timestamps when `moving*()` functions such as [movingAverage()](https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.movingAverage) are applied over [summarize()](https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.summarize).
 * BUGFIX: limit the `end` query arg value to `+2 days` in the future at `/api/v1/*` endpoints, because VictoriaMetrics doesn't allow storing samples with timestamps bigger than +2 days in the future. This should help resolve [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2669).
 * BUGFIX: properly register time series in per-day inverted index during the first hour after `indexdb` rotation. Previously this could lead to missing time series during querying if these time series stopped receiving new samples during the first hour after `indexdb` rotation. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698).
+* BUGFIX: do not register new series when `-storage.maxHourlySeries` or `-storage.maxDailySeries` limits were reached. Previously samples for new series weren't added to the database when the [cardinality limit](https://docs.victoriametrics.com/#cardinality-limiter) was reached, but series were still registered in the inverted index (aka `indexdb`). This could lead to unbounded `indexdb` growth during [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate).
 
 ## [v1.77.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.2)

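For context on the flags named in the new changelog entry: -storage.maxHourlySeries and -storage.maxDailySeries each cap the number of distinct series seen within a rolling window. A toy illustration of the hourly variant, using a plain map where VictoriaMetrics itself uses a more memory-efficient bloomfilter-based limiter:

package main

import (
    "fmt"
    "sync"
    "time"
)

// hourlyLimiter is a toy stand-in for the limiter behind
// -storage.maxHourlySeries: it admits at most maxItems distinct metricIDs
// per hour and forgets everything when the hour rolls over.
type hourlyLimiter struct {
    mu       sync.Mutex
    seen     map[uint64]struct{}
    maxItems int
}

func newHourlyLimiter(maxItems int) *hourlyLimiter {
    l := &hourlyLimiter{seen: make(map[uint64]struct{}), maxItems: maxItems}
    go func() {
        for range time.Tick(time.Hour) {
            l.mu.Lock()
            l.seen = make(map[uint64]struct{}) // new hour, new budget
            l.mu.Unlock()
        }
    }()
    return l
}

// Add reports whether the series identified by metricID fits into the
// current hour's budget. Re-registering an already-seen series is free.
func (l *hourlyLimiter) Add(metricID uint64) bool {
    l.mu.Lock()
    defer l.mu.Unlock()
    if _, ok := l.seen[metricID]; ok {
        return true
    }
    if len(l.seen) >= l.maxItems {
        return false
    }
    l.seen[metricID] = struct{}{}
    return true
}

func main() {
    l := newHourlyLimiter(2)
    fmt.Println(l.Add(1), l.Add(2), l.Add(3), l.Add(1)) // true true false true
}
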
lib/storage/index_db.go

@@ -582,6 +582,9 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
     if err != nil {
         return fmt.Errorf("cannot generate TSID: %w", err)
     }
+    if !db.s.registerSeriesCardinality(dst.MetricID, mn) {
+        return errSeriesCardinalityExceeded
+    }
     if err := db.createIndexes(dst, mn); err != nil {
         return fmt.Errorf("cannot create indexes: %w", err)
     }
@@ -599,6 +602,8 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
     return nil
 }
 
+var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
+
 // SetLogNewSeries updates new series logging.
 //
 // This function must be called before calling any storage functions.
lib/storage/storage.go

@@ -2,6 +2,7 @@ package storage
 
 import (
     "bytes"
+    "errors"
     "fmt"
     "io"
     "io/ioutil"
@@ -1703,6 +1704,9 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
         mn.sortTags()
         metricName = mn.Marshal(metricName[:0])
         if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName); err != nil {
+            if errors.Is(err, errSeriesCardinalityExceeded) {
+                continue
+            }
             return fmt.Errorf("cannot register the metric because cannot create TSID for metricName %q: %w", metricName, err)
         }
         s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
@@ -1793,11 +1797,6 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
         }
         if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
             r.TSID = genTSID.TSID
-            if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
-                // Skip the row, since the limit on the number of unique series has been exceeded.
-                j--
-                continue
-            }
             // Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
             // There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
             // contain MetricName->TSID entries for deleted time series.
@@ -1859,34 +1858,28 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
             // Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
             // This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
             r.TSID = prevTSID
-            if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
-                // Skip the row, since the limit on the number of unique series has been exceeded.
-                j--
-                continue
-            }
             continue
         }
         slowInsertsCount++
         if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil {
+            j--
+            if errors.Is(err, errSeriesCardinalityExceeded) {
+                continue
+            }
             // Do not stop adding rows on error - just skip invalid row.
             // This guarantees that invalid rows don't prevent
             // from adding valid rows into the storage.
             if firstWarn == nil {
                 firstWarn = fmt.Errorf("cannot obtain or create TSID for MetricName %q: %w", pmr.MetricName, err)
             }
-            j--
             continue
         }
         genTSID.generation = idb.generation
         genTSID.TSID = r.TSID
         s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
         prevTSID = r.TSID
         prevMetricNameRaw = mr.MetricNameRaw
-        if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
-            // Skip the row, since the limit on the number of unique series has been exceeded.
-            j--
-            continue
-        }
     }
     idb.putIndexSearch(is)
     putPendingMetricRows(pmrs)
@@ -1911,26 +1904,26 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
     return nil
 }
 
-func (s *Storage) isSeriesCardinalityExceeded(metricID uint64, metricNameRaw []byte) bool {
+func (s *Storage) registerSeriesCardinality(metricID uint64, mn *MetricName) bool {
     if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
         atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
-        logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
-        return true
+        logSkippedSeries(mn, "-storage.maxHourlySeries", sl.MaxItems())
+        return false
     }
     if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
         atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
-        logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
-        return true
+        logSkippedSeries(mn, "-storage.maxDailySeries", sl.MaxItems())
+        return false
     }
-    return false
+    return true
 }
 
-func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) {
+func logSkippedSeries(mn *MetricName, flagName string, flagValue int) {
     select {
     case <-logSkippedSeriesTicker.C:
         // Do not use logger.WithThrottler() here, since this will result in increased CPU load
         // because of getUserReadableMetricName() calls per each logSkippedSeries call.
-        logger.Warnf("skip series %s because %s=%d reached", getUserReadableMetricName(metricNameRaw), flagName, flagValue)
+        logger.Warnf("skip series %s because %s=%d reached", mn, flagName, flagValue)
     default:
     }
 }
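
The select on logSkippedSeriesTicker.C with an empty default is a non-blocking throttle: at most one warning per ticker interval, and suppressed calls skip the costly metric-name formatting entirely. A runnable sketch of the same pattern, with a shortened interval for demonstration:

package main

import (
    "log"
    "time"
)

// skippedSeriesTicker plays the same role as logSkippedSeriesTicker in the
// diff: it fires once per interval and thereby caps the log rate.
var skippedSeriesTicker = time.NewTicker(100 * time.Millisecond)

// logSkipped emits at most one warning per ticker interval. The default
// branch makes the select non-blocking, so the hot ingestion path pays
// almost nothing when the ticker hasn't fired; in particular, no string
// formatting happens for suppressed calls.
func logSkipped(name string) {
    select {
    case <-skippedSeriesTicker.C:
        log.Printf("skip series %s because cardinality limit reached", name)
    default:
        // Suppressed: the ticker hasn't fired since the last logged line.
    }
}

func main() {
    start := time.Now()
    for time.Since(start) < 500*time.Millisecond {
        logSkipped("http_requests_total") // called many thousands of times...
    }
    // ...but only ~5 lines are logged over the 500ms run.
}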