package promscrape

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/metrics"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consulagent"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/eureka"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/hetzner"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kuma"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/marathon"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ovhcloud"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/vultr"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)

var (
	configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in -promscrape.config file. "+
		"By default, the checking is disabled. See how to reload -promscrape.config file at https://docs.victoriametrics.com/vmagent/#configuration-update")
	suppressDuplicateScrapeTargetErrors = flag.Bool("promscrape.suppressDuplicateScrapeTargetErrors", false, "Whether to suppress 'duplicate scrape target' errors; "+
		"see https://docs.victoriametrics.com/vmagent/#troubleshooting for details")
	promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
		"The path can point to a local file or to an http url. "+
		"See https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details")
	fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", time.Minute, "Interval for checking for changes in 'file_sd_config'. "+
		"See https://docs.victoriametrics.com/sd_configs/#file_sd_configs for details")
)

// CheckConfig checks -promscrape.config for errors and unsupported options.
func CheckConfig() error {
	if *promscrapeConfigFile == "" {
		return nil
	}
	_, err := loadConfig(*promscrapeConfigFile)
	return err
}

// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
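//
// A minimal usage sketch (the pushData callback below is a hypothetical example,
// not part of this package):
//
//	promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
//		// Forward the scraped samples in wr to remote storage here.
//	})
//	defer promscrape.Stop()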
func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) {
	mustInitClusterMemberID()
	globalStopChan = make(chan struct{})
	scraperWG.Add(1)
	go func() {
		defer scraperWG.Done()
		runScraper(*promscrapeConfigFile, pushData, globalStopChan)
	}()
}

// Stop stops Prometheus scraper.
func Stop() {
	close(globalStopChan)
	scraperWG.Wait()
}

var (
	globalStopChan chan struct{}
	scraperWG      sync.WaitGroup

	// PendingScrapeConfigs - zero value means that all scrapeConfigs are initialized and ready for work.
	PendingScrapeConfigs atomic.Int32

	// configData contains -promscrape.config data
	configData atomic.Pointer[[]byte]
)

// WriteConfigData writes -promscrape.config contents to w
func WriteConfigData(w io.Writer) {
	p := configData.Load()
	if p == nil {
		// Nothing to write to w
		return
	}
	_, _ = w.Write(*p)
}

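// runScraper runs the scrape loop for the given configFile until globalStopCh is closed.
// It re-reads the config on SIGHUP and, if -promscrape.configCheckInterval is positive,
// on every configCheckInterval tick.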
func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
	if configFile == "" {
		// Nothing to scrape.
		return
	}
	metrics.RegisterSet(configMetricsSet)

	// Register SIGHUP handler for config reload before loadConfig.
	// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
	sighupCh := procutil.NewSighupChan()

	logger.Infof("reading scrape configs from %q", configFile)
	cfg, err := loadConfig(configFile)
	if err != nil {
		logger.Fatalf("cannot read %q: %s", configFile, err)
	}
	marshaledData := cfg.marshal()
	configData.Store(&marshaledData)
	cfg.mustStart()

	configSuccess.Set(1)
	configTimestamp.Set(fasttime.UnixTimestamp())

	scs := newScrapeConfigs(pushData, globalStopCh)
	scs.add("azure_sd_configs", *azure.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getAzureSDScrapeWork(swsPrev) })
	scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
	scs.add("consulagent_sd_configs", *consulagent.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulAgentSDScrapeWork(swsPrev) })
	scs.add("digitalocean_sd_configs", *digitalocean.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDigitalOceanDScrapeWork(swsPrev) })
	scs.add("dns_sd_configs", *dns.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
	scs.add("docker_sd_configs", *docker.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSDScrapeWork(swsPrev) })
	scs.add("dockerswarm_sd_configs", *dockerswarm.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) })
	scs.add("ec2_sd_configs", *ec2.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
	scs.add("eureka_sd_configs", *eureka.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })
	scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
	scs.add("gce_sd_configs", *gce.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
	scs.add("hetzner_sd_configs", *hetzner.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getHetznerSDScrapeWork(swsPrev) })
	scs.add("http_sd_configs", *http.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getHTTPDScrapeWork(swsPrev) })
	scs.add("kubernetes_sd_configs", *kubernetes.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
	scs.add("kuma_sd_configs", *kuma.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKumaSDScrapeWork(swsPrev) })
	scs.add("marathon_sd_configs", *marathon.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getMarathonSDScrapeWork(swsPrev) })
	scs.add("nomad_sd_configs", *nomad.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getNomadSDScrapeWork(swsPrev) })
	scs.add("openstack_sd_configs", *openstack.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
	scs.add("ovhcloud_sd_configs", *ovhcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOVHCloudSDScrapeWork(swsPrev) })
	scs.add("vultr_sd_configs", *vultr.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getVultrSDScrapeWork(swsPrev) })
	scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
	scs.add("static_configs", 0, func(cfg *Config, _ []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })

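	// Watch for config updates: tickerCh fires only when -promscrape.configCheckInterval
	// is positive; SIGHUP-based reloading works in either case.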
	var tickerCh <-chan time.Time
	if *configCheckInterval > 0 {
		ticker := time.NewTicker(*configCheckInterval)
		tickerCh = ticker.C
		defer ticker.Stop()
	}
	for {
		scs.updateConfig(cfg)
	waitForChans:
		select {
		case <-sighupCh:
			logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
			cfgNew, err := loadConfig(configFile)
			if err != nil {
				configReloadErrors.Inc()
				configSuccess.Set(0)
				logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
				goto waitForChans
			}
			configSuccess.Set(1)
			if !cfgNew.mustRestart(cfg) {
				logger.Infof("nothing changed in %q", configFile)
				goto waitForChans
			}
			cfg = cfgNew
			marshaledData = cfg.marshal()
			configData.Store(&marshaledData)
			configReloads.Inc()
			configTimestamp.Set(fasttime.UnixTimestamp())
		case <-tickerCh:
			cfgNew, err := loadConfig(configFile)
			if err != nil {
				configReloadErrors.Inc()
				configSuccess.Set(0)
				logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
				goto waitForChans
			}
			configSuccess.Set(1)
			if !cfgNew.mustRestart(cfg) {
				goto waitForChans
			}
			cfg = cfgNew
			marshaledData = cfg.marshal()
			configData.Store(&marshaledData)
			configReloads.Inc()
			configTimestamp.Set(fasttime.UnixTimestamp())
		case <-globalStopCh:
			cfg.mustStop()
			logger.Infof("stopping Prometheus scrapers")
			startTime := time.Now()
			scs.stop()
			logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
			return
		}
	}
}

var (
	configMetricsSet   = metrics.NewSet()
	configReloads      = configMetricsSet.NewCounter(`vm_promscrape_config_reloads_total`)
	configReloadErrors = configMetricsSet.NewCounter(`vm_promscrape_config_reloads_errors_total`)
	configSuccess      = configMetricsSet.NewGauge(`vm_promscrape_config_last_reload_successful`, nil)
	configTimestamp    = configMetricsSet.NewCounter(`vm_promscrape_config_last_reload_success_timestamp_seconds`)
)

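// scrapeConfigs manages a set of scrapeConfig workers, one per service discovery type.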
type scrapeConfigs struct {
	pushData     func(at *auth.Token, wr *prompbmarshal.WriteRequest)
	wg           sync.WaitGroup
	stopCh       chan struct{}
	globalStopCh <-chan struct{}
	scfgs        []*scrapeConfig
}

func newScrapeConfigs(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
	return &scrapeConfigs{
		pushData:     pushData,
		stopCh:       make(chan struct{}),
		globalStopCh: globalStopCh,
	}
}

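// add registers the given service discovery type with its re-check interval
// and starts a goroutine, which converts discovered targets to ScrapeWork objects.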
func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork) {
	PendingScrapeConfigs.Add(1)
	scfg := &scrapeConfig{
		name:          name,
		pushData:      scs.pushData,
		getScrapeWork: getScrapeWork,
		checkInterval: checkInterval,
		cfgCh:         make(chan *Config, 1),
		stopCh:        scs.stopCh,

		discoveryDuration: metrics.GetOrCreateHistogram(fmt.Sprintf("vm_promscrape_service_discovery_duration_seconds{type=%q}", name)),
	}
	scs.wg.Add(1)
	go func() {
		defer scs.wg.Done()
		scfg.run(scs.globalStopCh)
	}()
	scs.scfgs = append(scs.scfgs, scfg)
}

func (scs *scrapeConfigs) updateConfig(cfg *Config) {
	for _, scfg := range scs.scfgs {
		scfg.cfgCh <- cfg
	}
}

func (scs *scrapeConfigs) stop() {
	close(scs.stopCh)
	scs.wg.Wait()
	scs.scfgs = nil
}

type scrapeConfig struct {
	name          string
	pushData      func(at *auth.Token, wr *prompbmarshal.WriteRequest)
	getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
	checkInterval time.Duration
	cfgCh         chan *Config
	stopCh        <-chan struct{}

	discoveryDuration *metrics.Histogram
}

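// run waits for config updates on cfgCh and for periodic re-check ticks,
// regenerates ScrapeWork objects via getScrapeWork and applies them
// to the underlying scraperGroup until stopCh is closed.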
func (scfg *scrapeConfig) run(globalStopCh <-chan struct{}) {
	sg := newScraperGroup(scfg.name, scfg.pushData, globalStopCh)
	defer sg.stop()

	var tickerCh <-chan time.Time
	if scfg.checkInterval > 0 {
		ticker := time.NewTicker(scfg.checkInterval)
		defer ticker.Stop()
		tickerCh = ticker.C
	}

	cfg := <-scfg.cfgCh
	var swsPrev []*ScrapeWork
	updateScrapeWork := func(cfg *Config) {
		startTime := time.Now()
		sws := scfg.getScrapeWork(cfg, swsPrev)
		sg.update(sws)
		swsPrev = sws
		if sg.scrapersStarted.Get() > 0 {
			// update duration only if at least one scraper has started
			// otherwise this SD is considered as inactive
			scfg.discoveryDuration.UpdateDuration(startTime)
		}
	}
	updateScrapeWork(cfg)
	PendingScrapeConfigs.Add(-1)

	for {
		select {
		case <-scfg.stopCh:
			return
		case cfg = <-scfg.cfgCh:
		case <-tickerCh:
		}
		updateScrapeWork(cfg)
	}
}

type scraperGroup struct {
	name     string
	wg       sync.WaitGroup
	mLock    sync.Mutex
	m        map[string]*scraper
	pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)

	changesCount    *metrics.Counter
	activeScrapers  *metrics.Counter
	scrapersStarted *metrics.Counter
	scrapersStopped *metrics.Counter

	globalStopCh <-chan struct{}
}

func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
	sg := &scraperGroup{
		name:     name,
		m:        make(map[string]*scraper),
		pushData: pushData,

		changesCount:    metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name)),
		activeScrapers:  metrics.NewCounter(fmt.Sprintf(`vm_promscrape_active_scrapers{type=%q}`, name)),
		scrapersStarted: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_started_total{type=%q}`, name)),
		scrapersStopped: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_stopped_total{type=%q}`, name)),

		globalStopCh: globalStopCh,
	}
	metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="up"}`, name), func() float64 {
		return float64(tsmGlobal.StatusByGroup(sg.name, true))
	})
	metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="down"}`, name), func() float64 {
		return float64(tsmGlobal.StatusByGroup(sg.name, false))
	})
	return sg
}

func (sg *scraperGroup) stop() {
	sg.mLock.Lock()
	for _, sc := range sg.m {
		sc.cancel()
	}
	sg.m = nil
	sg.mLock.Unlock()
	sg.wg.Wait()
}

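// update synchronizes the set of running scrapers with the given sws:
// duplicate targets are skipped, deleted scrapers are stopped first, and only then
// new scrapers are started, so series from old and new targets do not overlap.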
func (sg *scraperGroup) update(sws []*ScrapeWork) {
	sg.mLock.Lock()
	defer sg.mLock.Unlock()
	additionsCount := 0
	deletionsCount := 0
	swsMap := make(map[string]*promutils.Labels, len(sws))
	var swsToStart []*ScrapeWork
	for _, sw := range sws {
		key := sw.key()
		originalLabels, ok := swsMap[key]
		if ok {
			if !*suppressDuplicateScrapeTargetErrors {
				logger.Errorf("skipping duplicate scrape target with identical labels; endpoint=%s, labels=%s; "+
					"make sure service discovery and relabeling is set up properly; "+
					"see also https://docs.victoriametrics.com/vmagent/#troubleshooting; "+
					"original labels for target1: %s; original labels for target2: %s",
					sw.ScrapeURL, sw.Labels.String(), originalLabels.String(), sw.OriginalLabels.String())
			}
			droppedTargetsMap.Register(sw.OriginalLabels, sw.RelabelConfigs, targetDropReasonDuplicate, nil)
			continue
		}
		swsMap[key] = sw.OriginalLabels
		if sg.m[key] != nil {
			// The scraper for the given key already exists.
			continue
		}
		swsToStart = append(swsToStart, sw)
	}

	// Stop deleted scrapers before starting new scrapers in order to prevent
	// series overlap when old scrape target is substituted by new scrape target.
	var stoppedChs []<-chan struct{}
	for key, sc := range sg.m {
		if _, ok := swsMap[key]; !ok {
			sc.cancel()
			stoppedChs = append(stoppedChs, sc.stoppedCh)
			delete(sg.m, key)
			deletionsCount++
		}
	}
	// Wait until all the deleted scrapers are stopped before starting new scrapers.
	for _, ch := range stoppedChs {
		<-ch
	}

	// Start new scrapers only after the deleted scrapers are stopped.
	for _, sw := range swsToStart {
		sc, err := newScraper(sw, sg.name, sg.pushData)
		if err != nil {
			logger.Errorf("skipping scraper for url=%s, job=%s because of error: %s", sw.ScrapeURL, sg.name, err)
			continue
		}
		sg.activeScrapers.Inc()
		sg.scrapersStarted.Inc()
		sg.wg.Add(1)
		tsmGlobal.Register(&sc.sw)
		go func() {
			defer func() {
				sg.wg.Done()
				close(sc.stoppedCh)
			}()
			sc.sw.run(sc.ctx.Done(), sg.globalStopCh)
			tsmGlobal.Unregister(&sc.sw)
			sg.activeScrapers.Dec()
			sg.scrapersStopped.Inc()
		}()
		key := sw.key()
		sg.m[key] = sc
		additionsCount++
	}
	if additionsCount > 0 || deletionsCount > 0 {
		sg.changesCount.Add(additionsCount + deletionsCount)
		logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m))
	}
}

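// scraper runs a single scrapeWork until its context is canceled.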
type scraper struct {
	sw     scrapeWork
	ctx    context.Context
	cancel context.CancelFunc

	// stoppedCh is unblocked when the given scraper is stopped.
	stoppedCh chan struct{}
}

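// newScraper returns a scraper for the given sw within the given scrape group.
// It returns an error if the scrape client for sw cannot be created.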
func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) (*scraper, error) {
	ctx, cancel := context.WithCancel(context.Background())
	sc := &scraper{
		ctx:       ctx,
		cancel:    cancel,
		stoppedCh: make(chan struct{}),
	}
	c, err := newClient(ctx, sw)
	if err != nil {
		return nil, err
	}
	sc.sw.Config = sw
	sc.sw.ScrapeGroup = group
	sc.sw.ReadData = c.ReadData
	sc.sw.PushData = pushData
	return sc, nil
}