diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 2192951ca..2ae8fd0a8 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -47,6 +47,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * BUGFIX: [Graphite render API](https://docs.victoriametrics.com/#graphite-render-api-usage): properly calculate sample timestamps when `moving*()` functions such as [movingAverage()](https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.movingAverage) are applied over [summarize()](https://graphite.readthedocs.io/en/stable/functions.html#graphite.render.functions.summarize).
 * BUGFIX: limit the `end` query arg value to `+2 days` in the future at `/api/v1/*` endpoints, because VictoriaMetrics doesn't allow storing samples with timestamps bigger than +2 days in the future. This should help resolving [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2669).
 * BUGFIX: properly register time series in per-day inverted index during the first hour after `indexdb` rotation. Previously this could lead to missing time series during querying if these time series stopped receiving new samples during the first hour after `indexdb` rotation. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698).
+* BUGFIX: do not register new series when the `-storage.maxHourlySeries` or `-storage.maxDailySeries` limit is reached. Previously, when the [cardinality limit](https://docs.victoriametrics.com/#cardinality-limiter) was reached, samples for new series weren't added to the database, but the series were still registered in the inverted index (aka `indexdb`). This could lead to unbounded `indexdb` growth during periods of [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate).
 
 ## [v1.77.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.77.2)
 
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 555853385..f77e3f83b 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -602,6 +602,9 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
 	if err != nil {
 		return fmt.Errorf("cannot generate TSID: %w", err)
 	}
+	if !db.s.registerSeriesCardinality(dst.MetricID, mn) {
+		return errSeriesCardinalityExceeded
+	}
 	if err := db.createIndexes(dst, mn); err != nil {
 		return fmt.Errorf("cannot create indexes: %w", err)
 	}
@@ -619,6 +622,8 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
 	return nil
 }
 
+var errSeriesCardinalityExceeded = fmt.Errorf("cannot create series because series cardinality limit exceeded")
+
 // SetLogNewSeries updates new series logging.
 //
 // This function must be called before any calling any storage functions.
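
The new `errSeriesCardinalityExceeded` sentinel above is what lets callers in storage.go (next diff) tell "limit reached, skip silently" apart from genuine failures via `errors.Is`. The following is a self-contained sketch of that sentinel-error pattern, assuming a hypothetical `createSeries` helper in place of the real `GetOrCreateTSIDByName` call chain:

package main

import (
	"errors"
	"fmt"
)

// Sentinel error that callers match via errors.Is, even when wrapped.
var errSeriesCardinalityExceeded = errors.New("cannot create series because series cardinality limit exceeded")

// createSeries is a hypothetical stand-in for the real call chain.
func createSeries(overLimit bool) error {
	if overLimit {
		// Wrapping with %w adds context while preserving errors.Is matching.
		return fmt.Errorf("cannot create TSID: %w", errSeriesCardinalityExceeded)
	}
	return nil
}

func main() {
	if err := createSeries(true); err != nil {
		if errors.Is(err, errSeriesCardinalityExceeded) {
			fmt.Println("limit hit: skip the row silently") // mirrors the bare `continue`
			return
		}
		fmt.Println("unexpected error:", err) // mirrors the firstWarn path
	}
}
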
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index fe4b5aa20..112294166 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -2,6 +2,7 @@ package storage
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -1812,6 +1813,9 @@ func (s *Storage) RegisterMetricNames(mrs []MetricRow) error {
 		mn.sortTags()
 		metricName = mn.Marshal(metricName[:0])
 		if err := is.GetOrCreateTSIDByName(&genTSID.TSID, metricName); err != nil {
+			if errors.Is(err, errSeriesCardinalityExceeded) {
+				continue
+			}
 			return fmt.Errorf("cannot register the metric because cannot create TSID for metricName %q: %w", metricName, err)
 		}
 		s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
@@ -1907,11 +1911,6 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 		}
 		if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {
 			r.TSID = genTSID.TSID
-			if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
-				// Skip the row, since the limit on the number of unique series has been exceeded.
-				j--
-				continue
-			}
 			// Fast path - the TSID for the given MetricNameRaw has been found in cache and isn't deleted.
 			// There is no need in checking whether r.TSID.MetricID is deleted, since tsidCache doesn't
 			// contain MetricName->TSID entries for deleted time series.
@@ -1973,34 +1972,28 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 				// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
 				// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
 				r.TSID = prevTSID
-				if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
-					// Skip the row, since the limit on the number of unique series has been exceeded.
-					j--
-					continue
-				}
 				continue
 			}
 			slowInsertsCount++
 			if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil {
+				j--
+				if errors.Is(err, errSeriesCardinalityExceeded) {
+					continue
+				}
 				// Do not stop adding rows on error - just skip invalid row.
 				// This guarantees that invalid rows don't prevent
 				// from adding valid rows into the storage.
 				if firstWarn == nil {
 					firstWarn = fmt.Errorf("cannot obtain or create TSID for MetricName %q: %w", pmr.MetricName, err)
 				}
-				j--
 				continue
 			}
 			genTSID.generation = idb.generation
 			genTSID.TSID = r.TSID
 			s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
+
 			prevTSID = r.TSID
 			prevMetricNameRaw = mr.MetricNameRaw
-			if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {
-				// Skip the row, since the limit on the number of unique series has been exceeded.
-				j--
-				continue
-			}
 		}
 		idb.putIndexSearch(is)
 		putPendingMetricRows(pmrs)
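
A detail in the hunk above that is easy to miss: `j--` now runs before the `errors.Is` check, so the row slot claimed earlier with `j++` is reclaimed whether the series hit the cardinality limit or was simply invalid, and accepted rows stay contiguous in `rows[:j]`. Here is a minimal sketch of that slot-reclaim pattern, with plain ints standing in for rawRow values:

package main

import "fmt"

// Sketch of the slot-reclaim pattern in Storage.add: rows[j] is claimed
// optimistically with j++, and j-- releases the slot when the row is
// skipped, so accepted rows stay contiguous in rows[:j].
func main() {
	input := []int{3, -1, 4, -1, 5}
	rows := make([]int, len(input))
	j := 0
	for _, v := range input {
		rows[j] = v // claim the slot optimistically
		j++
		if v < 0 { // stand-in for "series over limit" or "invalid row"
			j-- // reclaim the slot: the row is dropped
			continue
		}
	}
	fmt.Println(rows[:j]) // [3 4 5]
}
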
@@ -2025,26 +2018,26 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
 	return nil
 }
 
-func (s *Storage) isSeriesCardinalityExceeded(metricID uint64, metricNameRaw []byte) bool {
+func (s *Storage) registerSeriesCardinality(metricID uint64, mn *MetricName) bool {
 	if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
 		atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
-		logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
-		return true
+		logSkippedSeries(mn, "-storage.maxHourlySeries", sl.MaxItems())
+		return false
 	}
 	if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
 		atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
-		logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
-		return true
+		logSkippedSeries(mn, "-storage.maxDailySeries", sl.MaxItems())
+		return false
 	}
-	return false
+	return true
 }
 
-func logSkippedSeries(metricNameRaw []byte, flagName string, flagValue int) {
+func logSkippedSeries(mn *MetricName, flagName string, flagValue int) {
 	select {
 	case <-logSkippedSeriesTicker.C:
 		// Do not use logger.WithThrottler() here, since this will result in increased CPU load
 		// because of getUserReadableMetricName() calls per each logSkippedSeries call.
-		logger.Warnf("skip series %s because %s=%d reached", getUserReadableMetricName(metricNameRaw), flagName, flagValue)
+		logger.Warnf("skip series %s because %s=%d reached", mn, flagName, flagValue)
 	default:
 	}
 }
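
Putting the renamed `registerSeriesCardinality` and the throttled `logSkippedSeries` together: the limiter's `Add` admits an ID only while capacity remains (repeat registrations of an already-admitted ID still pass), and the `select` on the ticker channel drops all but roughly one warning per tick. The runnable sketch below imitates both behaviors; the map-based limiter and the 100ms ticker interval are placeholders invented for the sketch, not the actual VictoriaMetrics implementation:

package main

import (
	"fmt"
	"sync"
	"time"
)

// seriesLimiter is a hypothetical, map-based stand-in for the real limiter:
// Add reports whether metricID fits under the limit, MaxItems returns the cap.
type seriesLimiter struct {
	mu       sync.Mutex
	seen     map[uint64]struct{}
	maxItems int
}

func newSeriesLimiter(maxItems int) *seriesLimiter {
	return &seriesLimiter{seen: make(map[uint64]struct{}), maxItems: maxItems}
}

func (sl *seriesLimiter) Add(metricID uint64) bool {
	sl.mu.Lock()
	defer sl.mu.Unlock()
	if _, ok := sl.seen[metricID]; ok {
		return true // already-registered series always fit
	}
	if len(sl.seen) >= sl.maxItems {
		return false // a new series would exceed the limit
	}
	sl.seen[metricID] = struct{}{}
	return true
}

func (sl *seriesLimiter) MaxItems() int { return sl.maxItems }

// logTicker mimics logSkippedSeriesTicker; the 100ms interval is a
// placeholder, the real interval lives in lib/storage/storage.go.
var logTicker = time.NewTicker(100 * time.Millisecond)

// logSkipped mirrors logSkippedSeries: a non-blocking receive lets at most
// one warning through per tick and drops the rest.
func logSkipped(series, flagName string, flagValue int) {
	select {
	case <-logTicker.C:
		fmt.Printf("warn: skip series %s because %s=%d reached\n", series, flagName, flagValue)
	default:
		// No tick pending: drop the message instead of blocking.
	}
}

func main() {
	hourly := newSeriesLimiter(2)
	time.Sleep(150 * time.Millisecond) // let one tick accumulate so the first warning is logged
	for id := uint64(1); id <= 4; id++ {
		if !hourly.Add(id) {
			// series_3 is logged; the warning for series_4 is throttled away.
			logSkipped(fmt.Sprintf("series_%d", id), "-storage.maxHourlySeries", hourly.MaxItems())
			continue
		}
		fmt.Printf("registered series_%d\n", id)
	}
}
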