From c33728befbba377d681aae0bd54db82780cd9a6b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 17 Jan 2023 10:14:46 -0800 Subject: [PATCH] lib/promscrape: properly apply series limit Fix the following issues: - Series limit wasn't applied when staleness tracking was disabled. - Series limit didn't prevent staleness markers from being sent for new series exceeding the limit. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 Thanks to @hagen1778 for the initial attempt to fix the issue at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3665 --- docs/CHANGELOG.md | 2 ++ lib/promscrape/config.go | 13 +++++++---- lib/promscrape/scrapework.go | 44 +++++++++++++++++++++--------------- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 79050a901..ed16a1be8 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,6 +24,8 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set. * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently put the scrape url with scrape target labels to all error logs for failed scrapes. Previously some failed scrapes were logged without this information. 
+* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send stale markers to remote storage for series exceeding the configured [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660). +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) when [staleness tracking](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) is disabled. * BUGFIX: [Pushgateway import](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format): properly return `200 OK` HTTP response code. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636). ## [v1.86.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.1) diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index b4d5317f7..dd64733cc 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -41,9 +41,10 @@ import ( ) var ( - noStaleMarkers = flag.Bool("promscrape.noStaleMarkers", false, "Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series") - strictParse = flag.Bool("promscrape.config.strictParse", true, "Whether to deny unsupported fields in -promscrape.config . Set to false in order to silently skip unsupported fields") - dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. 
"+ + noStaleMarkers = flag.Bool("promscrape.noStaleMarkers", false, "Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series") + seriesLimitPerTarget = flag.Int("promscrape.seriesLimitPerTarget", 0, "Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter for more info") + strictParse = flag.Bool("promscrape.config.strictParse", true, "Whether to deny unsupported fields in -promscrape.config . Set to false in order to silently skip unsupported fields") + dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+ "Returns non-zero exit code on parsing errors and emits these errors to stderr. "+ "See also -promscrape.config.strictParse command-line flag. 
"+ "Pass -loggerLevel=ERROR if you don't need to see info messages in the output.") @@ -971,6 +972,10 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf if sc.NoStaleMarkers != nil { noStaleTracking = *sc.NoStaleMarkers } + seriesLimit := *seriesLimitPerTarget + if sc.SeriesLimit > 0 { + seriesLimit = sc.SeriesLimit + } swc := &scrapeWorkConfig{ scrapeInterval: scrapeInterval, scrapeIntervalString: scrapeInterval.String(), @@ -995,7 +1000,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf streamParse: sc.StreamParse, scrapeAlignInterval: sc.ScrapeAlignInterval.Duration(), scrapeOffset: sc.ScrapeOffset.Duration(), - seriesLimit: sc.SeriesLimit, + seriesLimit: seriesLimit, noStaleMarkers: noStaleTracking, } return swc, nil diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index ed4c77018..2370869db 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -37,7 +37,6 @@ var ( "See also -promscrape.suppressScrapeErrorsDelay") suppressScrapeErrorsDelay = flag.Duration("promscrape.suppressScrapeErrorsDelay", 0, "The delay for suppressing repeated scrape errors logging per each scrape targets. "+ "This may be used for reducing the number of log lines related to scrape errors. See also -promscrape.suppressScrapeErrors") - seriesLimitPerTarget = flag.Int("promscrape.seriesLimitPerTarget", 0, "Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter for more info") minResponseSizeForStreamParse = flagutil.NewBytes("promscrape.minResponseSizeForStreamParse", 1e6, "The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. 
See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode") ) @@ -451,7 +450,7 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b wc := writeRequestCtxPool.Get(sw.prevLabelsLen) lastScrape := sw.loadLastScrape() bodyString := bytesutil.ToUnsafeString(body.B) - areIdenticalSeries := sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString) + areIdenticalSeries := sw.areIdenticalSeries(lastScrape, bodyString) if err != nil { up = 0 scrapesFailed.Inc() @@ -485,9 +484,6 @@ func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, b samplesDropped := 0 if sw.seriesLimitExceeded || !areIdenticalSeries { samplesDropped = sw.applySeriesLimit(wc) - if samplesDropped > 0 { - sw.seriesLimitExceeded = true - } } am := &autoMetrics{ up: up, @@ -577,7 +573,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { err = sbr.Init(sr) if err == nil { bodyString = bytesutil.ToUnsafeString(sbr.body) - areIdenticalSeries = sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString) + areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString) err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { mu.Lock() defer mu.Unlock() @@ -594,9 +590,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { } if sw.seriesLimitExceeded || !areIdenticalSeries { samplesDropped += sw.applySeriesLimit(wc) - if samplesDropped > 0 && !sw.seriesLimitExceeded { - sw.seriesLimitExceeded = true - } } // Push the collected rows to sw before returning from the callback, since they cannot be held // after returning from the callback - this will result in data race. 
@@ -655,6 +648,15 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { return err } +func (sw *scrapeWork) areIdenticalSeries(prevData, currData string) bool { + if sw.Config.NoStaleMarkers && sw.Config.SeriesLimit <= 0 { + // Do not spend CPU time on tracking the changes in series if stale markers are disabled. + // The check for series_limit is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 + return true + } + return parser.AreIdenticalSeriesFast(prevData, currData) +} + // leveledWriteRequestCtxPool allows reducing memory usage when writeRequesCtx // structs contain mixed number of labels. // @@ -738,17 +740,13 @@ func (sw *scrapeWork) getSeriesAdded(lastScrape, currScrape string) int { } func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int { - seriesLimit := *seriesLimitPerTarget - if sw.Config.SeriesLimit > 0 { - seriesLimit = sw.Config.SeriesLimit - } - if sw.seriesLimiter == nil && seriesLimit > 0 { - sw.seriesLimiter = bloomfilter.NewLimiter(seriesLimit, 24*time.Hour) - } - sl := sw.seriesLimiter - if sl == nil { + if sw.Config.SeriesLimit <= 0 { return 0 } + if sw.seriesLimiter == nil { + sw.seriesLimiter = bloomfilter.NewLimiter(sw.Config.SeriesLimit, 24*time.Hour) + } + sl := sw.seriesLimiter dstSeries := wc.writeRequest.Timeseries[:0] samplesDropped := 0 for _, ts := range wc.writeRequest.Timeseries { @@ -761,6 +759,9 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int { } prompbmarshal.ResetTimeSeries(wc.writeRequest.Timeseries[len(dstSeries):]) wc.writeRequest.Timeseries = dstSeries + if samplesDropped > 0 && !sw.seriesLimitExceeded { + sw.seriesLimitExceeded = true + } return samplesDropped } @@ -784,6 +785,13 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i am := &autoMetrics{} sw.addAutoMetrics(am, wc, timestamp) } + + // Apply series limit to stale markers in order to prevent sending stale markers for newly created series. 
+ // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660 + if sw.seriesLimitExceeded { + sw.applySeriesLimit(wc) + } + series := wc.writeRequest.Timeseries if len(series) == 0 { return