lib/promscrape: allow using sample_limit and series_limit options in stream parsing mode

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3458
This commit is contained in:
Aliaksandr Valialkin 2022-12-08 16:33:33 -08:00
parent 353aad9da9
commit a13f16e48a
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
4 changed files with 28 additions and 21 deletions

View file

@ -654,8 +654,9 @@ scrape_configs:
'match[]': ['{__name__!=""}'] 'match[]': ['{__name__!=""}']
``` ```
Note that `sample_limit` and `series_limit` [scrape_config options](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) Note that `vmagent` in stream parsing mode stores up to `sample_limit` samples to the configured `-remoteStorage.url`
cannot be used in stream parsing mode because the parsed data is pushed to remote storage as soon as it is parsed. instead of droping all the samples read from the target, because the parsed data is sent to the remote storage
as soon as it is parsed in stream parsing mode.
## Scraping big number of targets ## Scraping big number of targets
@ -744,8 +745,8 @@ By default `vmagent` doesn't limit the number of time series each scrape target
* Via `-promscrape.seriesLimitPerTarget` command-line option. This limit is applied individually * Via `-promscrape.seriesLimitPerTarget` command-line option. This limit is applied individually
to all the scrape targets defined in the file pointed by `-promscrape.config`. to all the scrape targets defined in the file pointed by `-promscrape.config`.
* Via `series_limit` config option at `scrape_config` section. This limit is applied individually * Via `series_limit` config option at [scrape_config](https://docs.victoriametrics.com/sd_configs.html#scrape_configs) section.
to all the scrape targets defined in the given `scrape_config`. This limit is applied individually to all the scrape targets defined in the given `scrape_config`.
* Via `__series_limit__` label, which can be set with [relabeling](#relabeling) at `relabel_configs` section. * Via `__series_limit__` label, which can be set with [relabeling](#relabeling) at `relabel_configs` section.
This limit is applied to the corresponding scrape targets. Typical use case: to set the limit This limit is applied to the corresponding scrape targets. Typical use case: to set the limit
via [Kubernetes annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) for targets, via [Kubernetes annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) for targets,

View file

@ -47,6 +47,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* `vm_rows{type="indexdb/file"}` - the total number of file-based `indexdb` rows * `vm_rows{type="indexdb/file"}` - the total number of file-based `indexdb` rows
* FEATURE: [DataDog parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): add `device` tag when it is passed in the `device` field is present in the `series` object of the input request. Thanks to @PerGon for the provided [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3431). * FEATURE: [DataDog parser](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent): add `device` tag when it is passed in the `device` field is present in the `series` object of the input request. Thanks to @PerGon for the provided [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3431).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [service discovery](https://docs.victoriametrics.com/sd_configs.html) performance when discovering big number of targets (10K and more). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [service discovery](https://docs.victoriametrics.com/sd_configs.html) performance when discovering big number of targets (10K and more).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow using `series_limit` option for [limiting the number of series a single scrape target generates](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3458).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow using `sample_limit` option for limiting the number of metrics a single scrape target can expose in every response sent over [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `exported_` prefix to metric names exported by scrape targets if these metric names clash with [automatically generated metrics](https://docs.victoriametrics.com/vmagent.html#automatically-generated-metrics) such as `up`, `scrape_samples_scraped`, etc. This prevents from corruption of automatically generated metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `exported_` prefix to metric names exported by scrape targets if these metric names clash with [automatically generated metrics](https://docs.victoriametrics.com/vmagent.html#automatically-generated-metrics) such as `up`, `scrape_samples_scraped`, etc. This prevents from corruption of automatically generated metrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3406).
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve error message when the requested path cannot be properly parsed, so users could identify the issue and properly fix the path. Now the error message links to [url format docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3402).
* FEATURE: [VictoriaMetrics enterprise cluster](https://docs.victoriametrics.com/enterprise.html): add `-storageNode.discoveryInterval` command-line flag to `vmselect` and `vminsert` to control load on DNS servers when [automatic discovery of vmstorage nodes](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) is enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3417). * FEATURE: [VictoriaMetrics enterprise cluster](https://docs.victoriametrics.com/enterprise.html): add `-storageNode.discoveryInterval` command-line flag to `vmselect` and `vminsert` to control load on DNS servers when [automatic discovery of vmstorage nodes](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) is enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3417).

View file

@ -936,12 +936,6 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse `metric_relabel_configs` for `job_name` %q: %w", jobName, err) return nil, fmt.Errorf("cannot parse `metric_relabel_configs` for `job_name` %q: %w", jobName, err)
} }
if (*streamParse || sc.StreamParse) && sc.SampleLimit > 0 {
return nil, fmt.Errorf("cannot use stream parsing mode when `sample_limit` is set for `job_name` %q", jobName)
}
if (*streamParse || sc.StreamParse) && sc.SeriesLimit > 0 {
return nil, fmt.Errorf("cannot use stream parsing mode when `series_limit` is set for `job_name` %q", jobName)
}
externalLabels := globalCfg.ExternalLabels externalLabels := globalCfg.ExternalLabels
noStaleTracking := *noStaleMarkers noStaleTracking := *noStaleMarkers
if sc.NoStaleMarkers != nil { if sc.NoStaleMarkers != nil {

View file

@ -543,6 +543,10 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
// Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses. // Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses.
var sbr streamBodyReader var sbr streamBodyReader
lastScrape := sw.loadLastScrape()
bodyString := ""
areIdenticalSeries := true
samplesDropped := 0
sr, err := sw.GetStreamReader() sr, err := sw.GetStreamReader()
if err != nil { if err != nil {
err = fmt.Errorf("cannot read data: %s", err) err = fmt.Errorf("cannot read data: %s", err)
@ -550,6 +554,8 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
var mu sync.Mutex var mu sync.Mutex
err = sbr.Init(sr) err = sbr.Init(sr)
if err == nil { if err == nil {
bodyString = bytesutil.ToUnsafeString(sbr.body)
areIdenticalSeries = sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString)
err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
@ -557,9 +563,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
for i := range rows { for i := range rows {
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true) sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
} }
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
samplesPostRelabeling += len(wc.writeRequest.Timeseries) samplesPostRelabeling += len(wc.writeRequest.Timeseries)
if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit { if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit {
wc.resetNoRows() wc.resetNoRows()
@ -567,6 +570,15 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
} }
if sw.seriesLimitExceeded || !areIdenticalSeries {
samplesDropped += sw.applySeriesLimit(wc)
if samplesDropped > 0 && !sw.seriesLimitExceeded {
sw.seriesLimitExceeded = true
}
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
sw.pushData(sw.Config.AuthToken, &wc.writeRequest) sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
wc.resetNoRows() wc.resetNoRows()
return nil return nil
@ -574,9 +586,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
} }
sr.MustClose() sr.MustClose()
} }
lastScrape := sw.loadLastScrape()
bodyString := bytesutil.ToUnsafeString(sbr.body)
areIdenticalSeries := sw.Config.NoStaleMarkers || parser.AreIdenticalSeriesFast(lastScrape, bodyString)
scrapedSamples.Update(float64(samplesScraped)) scrapedSamples.Update(float64(samplesScraped))
endTimestamp := time.Now().UnixNano() / 1e6 endTimestamp := time.Now().UnixNano() / 1e6
@ -603,6 +612,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
samplesScraped: samplesScraped, samplesScraped: samplesScraped,
samplesPostRelabeling: samplesPostRelabeling, samplesPostRelabeling: samplesPostRelabeling,
seriesAdded: seriesAdded, seriesAdded: seriesAdded,
seriesLimitSamplesDropped: samplesDropped,
} }
sw.addAutoMetrics(am, wc, scrapeTimestamp) sw.addAutoMetrics(am, wc, scrapeTimestamp)
sw.pushData(sw.Config.AuthToken, &wc.writeRequest) sw.pushData(sw.Config.AuthToken, &wc.writeRequest)