diff --git a/app/vmagent/README.md b/app/vmagent/README.md
index 0d95d837e..f9e66dc52 100644
--- a/app/vmagent/README.md
+++ b/app/vmagent/README.md
@@ -36,7 +36,7 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did
 * Uses lower amounts of RAM, CPU, disk IO and network bandwidth compared with Prometheus.
 * Scrape targets can be spread among multiple `vmagent` instances when big number of targets must be scraped. See [these docs](#scraping-big-number-of-targets).
 * Can efficiently scrape targets that expose millions of time series such as [/federate endpoint in Prometheus](https://prometheus.io/docs/prometheus/latest/federation/). See [these docs](#stream-parsing-mode).
-* Can deal with [high cardinality](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues by limiting the number of unique time series sent to remote storage systems. See [these docs](#cardinality-limiter).
+* Can deal with [high cardinality](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues by limiting the number of unique time series at scrape time and before sending them to remote storage systems. See [these docs](#cardinality-limiter).
 * Can load scrape configs from multiple files. See [these docs](#loading-scrape-configs-from-multiple-files).
 
 ## Quick Start
@@ -196,7 +196,12 @@ Please file feature requests to [our issue tracker](https://github.com/VictoriaM
   to save network bandwidth.
 * `disable_keepalive: true` - to disable [HTTP keep-alive connections](https://en.wikipedia.org/wiki/HTTP_persistent_connection) on a per-job basis.
   By default, `vmagent` uses keep-alive connections to scrape targets to reduce overhead on connection re-establishing.
+* `series_limit: N` - for limiting the number of unique time series a single scrape target can expose. See [these docs](#cardinality-limiter).
 * `stream_parse: true` - for scraping targets in a streaming manner. This may be useful for targets exporting big number of metrics. See [these docs](#stream-parsing-mode).
+* `scrape_align_interval: duration` - for aligning scrapes to the given interval instead of using random offset in the range `[0 ... scrape_interval]` for scraping each target. The random offset helps to spread scrapes evenly in time.
+* `scrape_offset: duration` - for specifying the exact offset for scraping instead of using random offset in the range `[0 ... scrape_interval]`.
+* `relabel_debug: true` - for enabling debug logging during relabeling of the discovered targets. See [these docs](#relabeling).
+* `metric_relabel_debug: true` - for enabling debug logging during relabeling of the scraped metrics. See [these docs](#relabeling).
 
 Note that `vmagent` doesn't support `refresh_interval` option for these scrape configs. Use the corresponding `-promscrape.*CheckInterval`
 command-line flag instead. For example, `-promscrape.consulSDCheckInterval=60s` sets `refresh_interval` for all the `consul_sd_configs`
@@ -359,6 +364,10 @@ scrape_configs:
 
 ## Cardinality limiter
 
+By default `vmagent` doesn't limit the number of time series each scrape target can expose. The limit can be enforced across all the scrape targets by specifying `-promscrape.seriesLimitPerTarget` command-line option. The limit can also be specified via `series_limit` option at `scrape_config` section. Samples for time series exceeding the given limit are dropped. The number of dropped samples can be [monitored](#monitoring) via `promscrape_series_limit_rows_dropped_total` metric.
+
+See also `sample_limit` option at [scrape_config section](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config).
+
 By default `vmagent` doesn't limit the number of time series written to remote storage systems specified at `-remoteWrite.url`. The limit can be enforced by setting the following command-line flags:
 
 * `-remoteWrite.maxHourlySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last hour. Useful for limiting the number of active time series.
diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index 2bbcc2204..00a8d34f4 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -218,6 +218,13 @@ func Stop() {
 		}
 	}
 	rwctxsMap = nil
+
+	if sl := hourlySeriesLimiter; sl != nil {
+		sl.MustStop()
+	}
+	if sl := dailySeriesLimiter; sl != nil {
+		sl.MustStop()
+	}
 }
 
 // Push sends wr to remote storage systems set via `-remoteWrite.url`.
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 4db267fa2..060b4c94e 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -8,6 +8,7 @@ sort: 15
 
 * FEATURE: vmagent: add ability to read scrape configs from multiple files specified in `scrape_config_files` section. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1559).
 * FEATURE: vmagent: reduce memory usage and CPU usage when [Prometheus staleness tracking](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) is enabled for metrics exported from the deleted or disappeared scrape targets.
+* FEATURE: vmagent: add the ability to limit the number of unique time series scraped per each target. This can be done either globally via `-promscrape.seriesLimitPerTarget` command-line option or on per-target basis via `series_limit` option at `scrape_config` section. See [the updated docs on cardinality limiter](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1561).
 * FEATURE: vmagent: discover `role: ingress` and `role: endpointslice` in [kubernetes_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config) via v1 API instead of v1beta1 API if Kubernetes supports it. This fixes service discovery in Kubernetes v1.22 and newer versions. See [these docs](https://kubernetes.io/docs/reference/using-api/deprecation-guide/#ingress-v122).
 * FEATURE: take into account failed queries in `vm_request_duration_seconds` summary at `/metrics`. Previously only successful queries were taken into account. This could result in skewed summary. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1537).
 * FEATURE: vmalert: add an official dashboard for vmalert. See [these docs](https://docs.victoriametrics.com/vmalert.html#monitoring).
diff --git a/docs/vmagent.md b/docs/vmagent.md
index d6ae0550c..9137e8978 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -40,7 +40,7 @@ to `vmagent` such as the ability to push metrics instead of pulling them. We did
 * Uses lower amounts of RAM, CPU, disk IO and network bandwidth compared with Prometheus.
 * Scrape targets can be spread among multiple `vmagent` instances when big number of targets must be scraped. See [these docs](#scraping-big-number-of-targets).
 * Can efficiently scrape targets that expose millions of time series such as [/federate endpoint in Prometheus](https://prometheus.io/docs/prometheus/latest/federation/). See [these docs](#stream-parsing-mode).
-* Can deal with [high cardinality](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues by limiting the number of unique time series sent to remote storage systems. See [these docs](#cardinality-limiter).
+* Can deal with [high cardinality](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues by limiting the number of unique time series at scrape time and before sending them to remote storage systems. See [these docs](#cardinality-limiter).
 * Can load scrape configs from multiple files. See [these docs](#loading-scrape-configs-from-multiple-files).
 
 ## Quick Start
@@ -200,7 +200,12 @@ Please file feature requests to [our issue tracker](https://github.com/VictoriaM
   to save network bandwidth.
 * `disable_keepalive: true` - to disable [HTTP keep-alive connections](https://en.wikipedia.org/wiki/HTTP_persistent_connection) on a per-job basis.
   By default, `vmagent` uses keep-alive connections to scrape targets to reduce overhead on connection re-establishing.
+* `series_limit: N` - for limiting the number of unique time series a single scrape target can expose. See [these docs](#cardinality-limiter).
 * `stream_parse: true` - for scraping targets in a streaming manner. This may be useful for targets exporting big number of metrics. See [these docs](#stream-parsing-mode).
+* `scrape_align_interval: duration` - for aligning scrapes to the given interval instead of using random offset in the range `[0 ... scrape_interval]` for scraping each target. The random offset helps to spread scrapes evenly in time.
+* `scrape_offset: duration` - for specifying the exact offset for scraping instead of using random offset in the range `[0 ... scrape_interval]`.
+* `relabel_debug: true` - for enabling debug logging during relabeling of the discovered targets. See [these docs](#relabeling).
+* `metric_relabel_debug: true` - for enabling debug logging during relabeling of the scraped metrics. See [these docs](#relabeling).
 
 Note that `vmagent` doesn't support `refresh_interval` option for these scrape configs. Use the corresponding `-promscrape.*CheckInterval`
 command-line flag instead. For example, `-promscrape.consulSDCheckInterval=60s` sets `refresh_interval` for all the `consul_sd_configs`
@@ -363,6 +368,10 @@ scrape_configs:
 
 ## Cardinality limiter
 
+By default `vmagent` doesn't limit the number of time series each scrape target can expose. The limit can be enforced across all the scrape targets by specifying `-promscrape.seriesLimitPerTarget` command-line option. The limit can also be specified via `series_limit` option at `scrape_config` section. Samples for time series exceeding the given limit are dropped. The number of dropped samples can be [monitored](#monitoring) via `promscrape_series_limit_rows_dropped_total` metric.
+
+See also `sample_limit` option at [scrape_config section](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config).
+
 By default `vmagent` doesn't limit the number of time series written to remote storage systems specified at `-remoteWrite.url`. The limit can be enforced by setting the following command-line flags:
 
 * `-remoteWrite.maxHourlySeries` - limits the number of unique time series `vmagent` can write to remote storage systems during the last hour. Useful for limiting the number of active time series.
diff --git a/lib/bloomfilter/limiter.go b/lib/bloomfilter/limiter.go
index 152d999b0..1d047414b 100644
--- a/lib/bloomfilter/limiter.go
+++ b/lib/bloomfilter/limiter.go
@@ -1,6 +1,7 @@
 package bloomfilter
 
 import (
+	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -11,23 +12,42 @@ import (
 type Limiter struct {
 	maxItems int
 	v        atomic.Value
+
+	wg     sync.WaitGroup
+	stopCh chan struct{}
 }
 
 // NewLimiter creates new Limiter, which can hold up to maxItems unique items during the given refreshInterval.
 func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {
 	l := &Limiter{
 		maxItems: maxItems,
+		stopCh:   make(chan struct{}),
 	}
 	l.v.Store(newLimiter(maxItems))
+	l.wg.Add(1)
 	go func() {
+		defer l.wg.Done()
+		t := time.NewTicker(refreshInterval)
+		defer t.Stop()
 		for {
-			time.Sleep(refreshInterval)
-			l.v.Store(newLimiter(maxItems))
+			select {
+			case <-t.C:
+				l.v.Store(newLimiter(maxItems))
+			case <-l.stopCh:
+				return
+			}
 		}
 	}()
 	return l
 }
 
+// MustStop stops the given limiter.
+// It is expected that nobody accesses the limiter when MustStop is called.
+func (l *Limiter) MustStop() {
+	close(l.stopCh)
+	l.wg.Wait()
+}
+
 // MaxItems returns the maxItems passed to NewLimiter.
 func (l *Limiter) MaxItems() int {
 	return l.maxItems
diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go
index 80c59ad40..1210485d1 100644
--- a/lib/promscrape/config.go
+++ b/lib/promscrape/config.go
@@ -151,6 +151,7 @@ type ScrapeConfig struct {
 	StreamParse         bool                       `yaml:"stream_parse,omitempty"`
 	ScrapeAlignInterval time.Duration              `yaml:"scrape_align_interval,omitempty"`
 	ScrapeOffset        time.Duration              `yaml:"scrape_offset,omitempty"`
+	SeriesLimit         int                        `yaml:"series_limit,omitempty"`
 	ProxyClientConfig   promauth.ProxyClientConfig `yaml:",inline"`
 
 	// This is set in loadConfig
@@ -773,6 +774,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf
 		streamParse:         sc.StreamParse,
 		scrapeAlignInterval: sc.ScrapeAlignInterval,
 		scrapeOffset:        sc.ScrapeOffset,
+		seriesLimit:         sc.SeriesLimit,
 	}
 	return swc, nil
 }
@@ -799,6 +801,7 @@ type scrapeWorkConfig struct {
 	streamParse         bool
 	scrapeAlignInterval time.Duration
 	scrapeOffset        time.Duration
+	seriesLimit         int
 }
 
 type targetLabelsGetter interface {
@@ -1066,6 +1069,7 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
 		StreamParse:         swc.streamParse,
 		ScrapeAlignInterval: swc.scrapeAlignInterval,
 		ScrapeOffset:        swc.scrapeOffset,
+		SeriesLimit:         swc.seriesLimit,
 
 		jobNameOriginal: swc.jobName,
 	}
diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go
index 5ce96b4fd..00a732e4a 100644
--- a/lib/promscrape/config_test.go
+++ b/lib/promscrape/config_test.go
@@ -1344,6 +1344,7 @@ scrape_configs:
   stream_parse: true
   scrape_align_interval: 1s
   scrape_offset: 0.5s
+  series_limit: 123
   static_configs:
   - targets:
     - 192.168.1.2 # SNMP device.
@@ -1400,6 +1401,7 @@ scrape_configs:
 			StreamParse:         true,
 			ScrapeAlignInterval: time.Second,
 			ScrapeOffset:        500 * time.Millisecond,
+			SeriesLimit:         123,
 			jobNameOriginal:     "snmp",
 		},
 	})
diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index a62459c5f..8e0a1fe8a 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
@@ -26,7 +27,8 @@ import (
 var (
 	suppressScrapeErrors = flag.Bool("promscrape.suppressScrapeErrors", false, "Whether to suppress scrape errors logging. "+
 		"The last error for each target is always available at '/targets' page even if scrape errors logging is suppressed")
-	noStaleMarkers = flag.Bool("promscrape.noStaleMarkers", false, "Whether to disable seding Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. See also https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode")
+	noStaleMarkers       = flag.Bool("promscrape.noStaleMarkers", false, "Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. See also https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode")
+	seriesLimitPerTarget = flag.Int("promscrape.seriesLimitPerTarget", 0, "Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter for more info")
 )
 
 // ScrapeWork represents a unit of work for scraping Prometheus metrics.
@@ -103,6 +105,9 @@ type ScrapeWork struct {
 	// The offset for the first scrape.
 	ScrapeOffset time.Duration
 
+	// Optional limit on the number of unique series the scrape target can expose.
+	SeriesLimit int
+
 	// The original 'job_name'
 	jobNameOriginal string
 }
@@ -114,11 +119,11 @@ func (sw *ScrapeWork) key() string {
 	// Do not take into account OriginalLabels.
 	key := fmt.Sprintf("ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, DenyRedirects=%v, Labels=%s, "+
 		"ProxyURL=%s, ProxyAuthConfig=%s, AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v, "+
-		"ScrapeAlignInterval=%s, ScrapeOffset=%s",
+		"ScrapeAlignInterval=%s, ScrapeOffset=%s, SeriesLimit=%d",
 		sw.ScrapeURL, sw.ScrapeInterval, sw.ScrapeTimeout, sw.HonorLabels, sw.HonorTimestamps, sw.DenyRedirects, sw.LabelsString(),
 		sw.ProxyURL.String(), sw.ProxyAuthConfig.String(), sw.AuthConfig.String(), sw.MetricRelabelConfigs.String(), sw.SampleLimit,
 		sw.DisableCompression, sw.DisableKeepAlive, sw.StreamParse,
-		sw.ScrapeAlignInterval, sw.ScrapeOffset)
+		sw.ScrapeAlignInterval, sw.ScrapeOffset, sw.SeriesLimit)
 	return key
 }
 
@@ -178,6 +183,9 @@ type scrapeWork struct {
 	seriesAdded   int
 	labelsHashBuf []byte
 
+	// Optional limiter on the number of unique series per scrape target.
+	seriesLimiter *bloomfilter.Limiter
+
 	// prevBodyLen contains the previous response body length for the given scrape work.
 	// It is used as a hint in order to reduce memory usage for body buffers.
 	prevBodyLen int
@@ -241,6 +249,9 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
 		case <-stopCh:
 			t := time.Now().UnixNano() / 1e6
 			sw.sendStaleMarkersForLastScrape(t, true)
+			if sw.seriesLimiter != nil {
+				sw.seriesLimiter.MustStop()
+			}
 			return
 		case tt := <-ticker.C:
 			t := tt.UnixNano() / 1e6
@@ -481,13 +492,31 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) {
 		sw.seriesMap = make(map[uint64]struct{}, len(wc.writeRequest.Timeseries))
 	}
 	m := sw.seriesMap
+	seriesLimit := *seriesLimitPerTarget
+	if sw.Config.SeriesLimit > 0 {
+		seriesLimit = sw.Config.SeriesLimit
+	}
+	if sw.seriesLimiter == nil && seriesLimit > 0 {
+		sw.seriesLimiter = bloomfilter.NewLimiter(seriesLimit, 24*time.Hour)
+	}
+	hsl := sw.seriesLimiter
+	dstSeries := wc.writeRequest.Timeseries[:0]
 	for _, ts := range wc.writeRequest.Timeseries {
 		h := sw.getLabelsHash(ts.Labels)
+		if hsl != nil && !hsl.Add(h) {
+			// The limit on the number of unique series per scrape target has been exceeded.
+			// Drop the metric.
+			metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_series_limit_rows_dropped_total{job=%q,target=%q}`,
+				sw.Config.jobNameOriginal, sw.Config.ScrapeURL)).Inc()
+			continue
+		}
+		dstSeries = append(dstSeries, ts)
 		if _, ok := m[h]; !ok {
 			m[h] = struct{}{}
 			sw.seriesAdded++
 		}
 	}
+	wc.writeRequest.Timeseries = dstSeries
 }
 
 func (sw *scrapeWork) updateLastScrape(response string) {
diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go
index c98bb0fb9..b19093c7d 100644
--- a/lib/promscrape/scrapework_test.go
+++ b/lib/promscrape/scrapework_test.go
@@ -333,6 +333,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
 	`, &ScrapeWork{
 		HonorLabels: true,
 		SampleLimit: 1,
+		SeriesLimit: 123,
 	}, `
 		up 0 123
 		scrape_samples_scraped 2 123
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index f6829cee3..dd4d70ed2 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -699,6 +699,14 @@ func (s *Storage) MustClose() {
 	if err := s.flockF.Close(); err != nil {
 		logger.Panicf("FATAL: cannot close lock file %q: %s", s.flockF.Name(), err)
 	}
+
+	// Stop series limiters.
+	if sl := s.hourlySeriesLimiter; sl != nil {
+		sl.MustStop()
+	}
+	if sl := s.dailySeriesLimiter; sl != nil {
+		sl.MustStop()
+	}
 }
 
 func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
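For reference, a minimal scrape config sketch showing where the new `series_limit` option sits among the per-job options documented above. The job name, target address and the limit value are made-up placeholders; only the option and flag names come from this patch:

```yaml
scrape_configs:
- job_name: node-exporter        # hypothetical job name
  scrape_interval: 30s
  # Drop samples from this target once it exposes more than 5000 unique time series.
  # The same cap can be applied to every target at once via the
  # -promscrape.seriesLimitPerTarget command-line flag introduced in this patch.
  series_limit: 5000
  static_configs:
  - targets:
    - "localhost:9100"           # hypothetical target address
```

As implemented in `updateSeriesAdded`, a per-job `series_limit` takes precedence over the global `-promscrape.seriesLimitPerTarget` value, the underlying limiter is refreshed every 24 hours, and every dropped sample increments the `promscrape_series_limit_rows_dropped_total{job, target}` counter.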
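The per-target limiter reuses `lib/bloomfilter.Limiter`, which this patch extends with a `MustStop` method so the background refresher goroutine can be shut down when a scrape target disappears. A minimal sketch of the resulting lifecycle, assuming the snippet is compiled inside the VictoriaMetrics module (the `lib/` packages are not intended for external import):

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
)

func main() {
	// Track at most 2 unique items; the underlying bloom filter is re-created
	// every refreshInterval (the scrape code in this patch passes 24*time.Hour).
	l := bloomfilter.NewLimiter(2, time.Hour)
	defer l.MustStop() // stops the refresher goroutine introduced by this patch

	for _, h := range []uint64{100, 200, 300} {
		// Add reports whether the hash fits under the limit. Once maxItems distinct
		// hashes have been seen, further new hashes are rejected; the filter is
		// probabilistic, so the cutoff is approximate.
		fmt.Printf("h=%d accepted=%v\n", h, l.Add(h))
	}
}
```

This mirrors how `scrapework.go` uses the limiter: the hash of each sample's label set is passed to `Add`, and samples whose hash is rejected are dropped and counted in `promscrape_series_limit_rows_dropped_total`.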