lib/promscrape: do not send staleness markers on graceful shutdown

This follows Prometheus behavior.

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2013#issuecomment-1006994079
This commit is contained in:
Aliaksandr Valialkin 2022-01-07 01:17:55 +02:00
parent bc03ab6688
commit e4e36383e2
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
5 changed files with 33 additions and 22 deletions

View file

@ -301,7 +301,6 @@ You can read more about relabeling in the following articles:
* If the metric disappears from the list of scraped metrics, then stale marker is sent to this particular metric.
* If the scrape target becomes temporarily unavailable, then stale markers are sent for all the metrics scraped from this target.
* If the scrape target is removed from the list of targets, then stale markers are sent for all the metrics scraped from this target.
* Stale markers are sent for all the scraped metrics on graceful shutdown of `vmagent`.
Prometheus staleness markers' tracking needs additional memory, since it must store the previous response body per each scrape target in order to compare it to the current response body. The memory usage may be reduced by passing `-promscrape.noStaleMarkers` command-line flag to `vmagent`. This disables staleness tracking. This also disables tracking the number of new time series per each scrape with the auto-generated `scrape_series_added` metric. See [these docs](https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series) for details.

View file

@ -14,6 +14,7 @@ sort: 15
* BUGFIX: return the proper response stub from `/api/v1/query_exemplars` handler, which is needed for Grafana v8+. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1999).
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix a few edge cases and improve migration speed for OpenTSDB importer. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2019).
* BUGFIX: fix possible data race when searching for time series matching `{key=~"value|"}` filter over time range covering multipe days. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2032). Thanks to @waldoweng for the provided fix.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send staleness markers on graceful shutdown. This follows Prometheus behavior. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2013#issuecomment-1006994079).
## [v1.71.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.71.0)

View file

@ -305,7 +305,6 @@ You can read more about relabeling in the following articles:
* If the metric disappears from the list of scraped metrics, then stale marker is sent to this particular metric.
* If the scrape target becomes temporarily unavailable, then stale markers are sent for all the metrics scraped from this target.
* If the scrape target is removed from the list of targets, then stale markers are sent for all the metrics scraped from this target.
* Stale markers are sent for all the scraped metrics on graceful shutdown of `vmagent`.
Prometheus staleness markers' tracking needs additional memory, since it must store the previous response body per each scrape target in order to compare it to the current response body. The memory usage may be reduced by passing `-promscrape.noStaleMarkers` command-line flag to `vmagent`. This disables staleness tracking. This also disables tracking the number of new time series per each scrape with the auto-generated `scrape_series_added` metric. See [these docs](https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series) for details.

View file

@ -52,22 +52,22 @@ func CheckConfig() error {
//
// Scraped data is passed to pushData.
func Init(pushData func(wr *prompbmarshal.WriteRequest)) {
globalStopCh = make(chan struct{})
globalStopChan = make(chan struct{})
scraperWG.Add(1)
go func() {
defer scraperWG.Done()
runScraper(*promscrapeConfigFile, pushData, globalStopCh)
runScraper(*promscrapeConfigFile, pushData, globalStopChan)
}()
}
// Stop stops Prometheus scraper.
func Stop() {
close(globalStopCh)
close(globalStopChan)
scraperWG.Wait()
}
var (
globalStopCh chan struct{}
globalStopChan chan struct{}
scraperWG sync.WaitGroup
// PendingScrapeConfigs - zero value means, that
// all scrapeConfigs are inited and ready for work.
@ -108,7 +108,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
configData.Store(&marshaledData)
cfg.mustStart()
scs := newScrapeConfigs(pushData)
scs := newScrapeConfigs(pushData, globalStopCh)
scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
scs.add("digitalocean_sd_configs", *digitalocean.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDigitalOceanDScrapeWork(swsPrev) })
scs.add("dns_sd_configs", *dns.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
@ -184,13 +184,15 @@ type scrapeConfigs struct {
pushData func(wr *prompbmarshal.WriteRequest)
wg sync.WaitGroup
stopCh chan struct{}
globalStopCh <-chan struct{}
scfgs []*scrapeConfig
}
func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConfigs {
func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
return &scrapeConfigs{
pushData: pushData,
stopCh: make(chan struct{}),
globalStopCh: globalStopCh,
}
}
@ -209,7 +211,7 @@ func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrap
scs.wg.Add(1)
go func() {
defer scs.wg.Done()
scfg.run()
scfg.run(scs.globalStopCh)
}()
scs.scfgs = append(scs.scfgs, scfg)
}
@ -237,8 +239,8 @@ type scrapeConfig struct {
discoveryDuration *metrics.Histogram
}
func (scfg *scrapeConfig) run() {
sg := newScraperGroup(scfg.name, scfg.pushData)
func (scfg *scrapeConfig) run(globalStopCh <-chan struct{}) {
sg := newScraperGroup(scfg.name, scfg.pushData, globalStopCh)
defer sg.stop()
var tickerCh <-chan time.Time
@ -283,9 +285,11 @@ type scraperGroup struct {
activeScrapers *metrics.Counter
scrapersStarted *metrics.Counter
scrapersStopped *metrics.Counter
globalStopCh <-chan struct{}
}
func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup {
func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
sg := &scraperGroup{
name: name,
m: make(map[string]*scraper),
@ -295,6 +299,8 @@ func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest))
activeScrapers: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_active_scrapers{type=%q}`, name)),
scrapersStarted: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_started_total{type=%q}`, name)),
scrapersStopped: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_stopped_total{type=%q}`, name)),
globalStopCh: globalStopCh,
}
metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="up"}`, name), func() float64 {
return float64(tsmGlobal.StatusByGroup(sg.name, true))
@ -373,7 +379,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
sg.wg.Done()
close(sc.stoppedCh)
}()
sc.sw.run(sc.stopCh)
sc.sw.run(sc.stopCh, sg.globalStopCh)
tsmGlobal.Unregister(sw)
sg.activeScrapers.Dec()
sg.scrapersStopped.Inc()

View file

@ -256,7 +256,7 @@ func (sw *scrapeWork) finalizeLastScrape() {
}
}
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
func (sw *scrapeWork) run(stopCh <-chan struct{}, globalStopCh <-chan struct{}) {
var randSleep uint64
scrapeInterval := sw.Config.ScrapeInterval
scrapeAlignInterval := sw.Config.ScrapeAlignInterval
@ -311,7 +311,13 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
case <-stopCh:
t := time.Now().UnixNano() / 1e6
lastScrape := sw.loadLastScrape()
select {
case <-globalStopCh:
// Do not send staleness markers on graceful shutdown as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2013#issuecomment-1006994079
default:
sw.sendStaleSeries(lastScrape, "", t, true)
}
if sw.seriesLimiter != nil {
job := sw.Config.Job()
metrics.UnregisterMetric(fmt.Sprintf(`promscrape_series_limit_rows_dropped_total{scrape_job_original=%q,scrape_job=%q,scrape_target=%q}`,