From a13f16e48ae3fe41994bdaaac28fc3f483563334 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 8 Dec 2022 16:33:33 -0800 Subject: [PATCH] lib/promscrape: allow using `sample_limit` and `series_limit` options in stream parsing mode Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3458 --- app/vmagent/README.md | 9 +++++---- docs/CHANGELOG.md | 2 ++ lib/promscrape/config.go | 6 ------ lib/promscrape/scrapework.go | 32 +++++++++++++++++++++----------- 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/app/vmagent/README.md b/app/vmagent/README.md index e4da010bb..1f5f4f681 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -654,8 +654,9 @@ scrape_configs: 'match[]': ['{__name__!=""}'] ``` -Note that `sample_limit` and `series_limit` [scrape_config options](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) -cannot be used in stream parsing mode because the parsed data is pushed to remote storage as soon as it is parsed. +Note that `vmagent` in stream parsing mode stores up to `sample_limit` samples to the configured `-remoteStorage.url` +instead of dropping all the samples read from the target, because the parsed data is sent to the remote storage +as soon as it is parsed in stream parsing mode. ## Scraping big number of targets @@ -744,8 +745,8 @@ By default `vmagent` doesn't limit the number of time series each scrape target * Via `-promscrape.seriesLimitPerTarget` command-line option. This limit is applied individually to all the scrape targets defined in the file pointed by `-promscrape.config`. -* Via `series_limit` config option at `scrape_config` section. This limit is applied individually - to all the scrape targets defined in the given `scrape_config`. +* Via `series_limit` config option at [scrape_config](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) section. + This limit is applied individually to all the scrape targets defined in the given `scrape_config`. 
* Via `__series_limit__` label, which can be set with [relabeling](#relabeling) at `relabel_configs` section. This limit is applied to the corresponding scrape targets. Typical use case: to set the limit via [Kubernetes annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) for targets, diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 1c9758aa4..08981a6b5 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -47,6 +47,8 @@ The following tip changes can be tested by building VictoriaMetrics components f * `vm_rows{type="indexdb/file"}` - the total number of file-based `indexdb` rows * FEATURE: [DataDog parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): add `device` tag when it is passed in the `device` field is present in the `series` object of the input request. Thanks to @PerGon for the provided [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3431). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [service discovery](https://docs.victoriametrics.com/sd_configs.html) performance when discovering big number of targets (10K and more). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow using `series_limit` option for [limiting the number of series a single scrape target generates](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3458). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow using `sample_limit` option for limiting the number of metrics a single scrape target can expose in every response sent over [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). 
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `exported_` prefix to metric names exported by scrape targets if these metric names clash with [automatically generated metrics](https://docs.victoriametrics.com/vmagent.html#automatically-generated-metrics) such as `up`, `scrape_samples_scraped`, etc. This prevents from corruption of automatically generated metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402). * FEATURE: [VictoriaMetrics enterprise cluster](https://docs.victoriametrics.com/enterprise.html): add `-storageNode.discoveryInterval` command-line flag to `vmselect` and `vminsert` to control load on DNS servers when [automatic discovery of vmstorage nodes](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) is enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3417). 
diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 136e2d145..4591c9fc8 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -936,12 +936,6 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf if err != nil { return nil, fmt.Errorf("cannot parse `metric_relabel_configs` for `job_name` %q: %w", jobName, err) } - if (*streamParse || sc.StreamParse) && sc.SampleLimit > 0 { - return nil, fmt.Errorf("cannot use stream parsing mode when `sample_limit` is set for `job_name` %q", jobName) - } - if (*streamParse || sc.StreamParse) && sc.SeriesLimit > 0 { - return nil, fmt.Errorf("cannot use stream parsing mode when `series_limit` is set for `job_name` %q", jobName) - } externalLabels := globalCfg.ExternalLabels noStaleTracking := *noStaleMarkers if sc.NoStaleMarkers != nil { diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 31332cd9a..52c39ddd9 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -543,6 +543,10 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { // Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses. 
var sbr streamBodyReader + lastScrape := sw.loadLastScrape() + bodyString := "" + areIdenticalSeries := true + samplesDropped := 0 sr, err := sw.GetStreamReader() if err != nil { err = fmt.Errorf("cannot read data: %s", err) @@ -550,6 +554,8 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { var mu sync.Mutex err = sbr.Init(sr) if err == nil { + bodyString = bytesutil.ToUnsafeString(sbr.body) + areIdenticalSeries = sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString) err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { mu.Lock() defer mu.Unlock() @@ -557,9 +563,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { for i := range rows { sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) } - // Push the collected rows to sw before returning from the callback, since they cannot be held - // after returning from the callback - this will result in data race. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 samplesPostRelabeling += len(wc.writeRequest.Timeseries) if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit { wc.resetNoRows() @@ -567,6 +570,15 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) } + if sw.seriesLimitExceeded || !areIdenticalSeries { + samplesDropped += sw.applySeriesLimit(wc) + if samplesDropped > 0 && !sw.seriesLimitExceeded { + sw.seriesLimitExceeded = true + } + } + // Push the collected rows to sw before returning from the callback, since they cannot be held + // after returning from the callback - this will result in data race. 
+ // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247 sw.pushData(sw.Config.AuthToken, &wc.writeRequest) wc.resetNoRows() return nil @@ -574,9 +586,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { } sr.MustClose() } - lastScrape := sw.loadLastScrape() - bodyString := bytesutil.ToUnsafeString(sbr.body) - areIdenticalSeries := sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString) scrapedSamples.Update(float64(samplesScraped)) endTimestamp := time.Now().UnixNano() / 1e6 @@ -598,11 +607,12 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { seriesAdded = sw.getSeriesAdded(lastScrape, bodyString) } am := &autoMetrics{ - up: up, - scrapeDurationSeconds: duration, - samplesScraped: samplesScraped, - samplesPostRelabeling: samplesPostRelabeling, - seriesAdded: seriesAdded, + up: up, + scrapeDurationSeconds: duration, + samplesScraped: samplesScraped, + samplesPostRelabeling: samplesPostRelabeling, + seriesAdded: seriesAdded, + seriesLimitSamplesDropped: samplesDropped, } sw.addAutoMetrics(am, wc, scrapeTimestamp) sw.pushData(sw.Config.AuthToken, &wc.writeRequest)