lib/promscrape: adds job restart method (#2455)

* lib/promscrape: adds job restart method
it must restart only ScrapeConfig with changed content
this change greatly reduce time, that needed for job restart
and it should decrease possible data loss when config frequently changed at kubernetes based deployments

Apply suggestions from code review

Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>

* wip

Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2022-04-16 20:28:46 +03:00 committed by GitHub
parent 60ad8c74bc
commit 26b78ad707
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 89 additions and 46 deletions

View file

@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce `-promscrape.config` reload duration when the config contains big number of jobs (aka [scrape_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) sections) and only a few of them are changed. Previously all the jobs were restarted. Now only the jobs with changed configs are restarted. This should reduce the probability of data miss because of slow config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2270).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add support for DNS-based discovery for notifiers in the same way as Prometheus does. See [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2460).
* FEATURE: allow specifying TLS cipher suites for incoming https requests via `-tlsCipherSuites` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2404).
* FEATURE: allow specifying TLS cipher suites for mTLS connections between cluster components via `-cluster.tlsCipherSuites` command-line flag. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection).

View file

@ -75,9 +75,9 @@ func mustInitClusterMemberID() {
// Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
type Config struct {
Global GlobalConfig `yaml:"global,omitempty"`
ScrapeConfigs []ScrapeConfig `yaml:"scrape_configs,omitempty"`
ScrapeConfigFiles []string `yaml:"scrape_config_files,omitempty"`
Global GlobalConfig `yaml:"global,omitempty"`
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
ScrapeConfigFiles []string `yaml:"scrape_config_files,omitempty"`
// This is set to the directory from where the config has been loaded.
baseDir string
@ -107,19 +107,77 @@ func (cfg *Config) marshal() []byte {
func (cfg *Config) mustStart() {
startTime := time.Now()
logger.Infof("starting service discovery routines...")
for i := range cfg.ScrapeConfigs {
cfg.ScrapeConfigs[i].mustStart(cfg.baseDir)
for _, sc := range cfg.ScrapeConfigs {
sc.mustStart(cfg.baseDir)
}
jobNames := cfg.getJobNames()
tsmGlobal.registerJobNames(jobNames)
logger.Infof("started service discovery routines in %.3f seconds", time.Since(startTime).Seconds())
}
func (cfg *Config) mustRestart(prevCfg *Config) {
startTime := time.Now()
logger.Infof("restarting service discovery routines...")
prevScrapeCfgByName := make(map[string]*ScrapeConfig, len(prevCfg.ScrapeConfigs))
for _, scPrev := range prevCfg.ScrapeConfigs {
prevScrapeCfgByName[scPrev.JobName] = scPrev
}
// Loop over the the new jobs, start new ones and restart updated ones.
var started, stopped, restarted int
currentJobNames := make(map[string]struct{}, len(cfg.ScrapeConfigs))
for i, sc := range cfg.ScrapeConfigs {
currentJobNames[sc.JobName] = struct{}{}
scPrev := prevScrapeCfgByName[sc.JobName]
if scPrev == nil {
// New scrape config has been appeared. Start it.
sc.mustStart(cfg.baseDir)
started++
continue
}
if areEqualScrapeConfigs(scPrev, sc) {
// The scrape config didn't change, so no need to restart it.
// Use the reference to the previous job, so it could be stopped properly later.
cfg.ScrapeConfigs[i] = scPrev
} else {
// The scrape config has been changed. Stop the previous scrape config and start new one.
scPrev.mustStop()
sc.mustStart(cfg.baseDir)
restarted++
}
}
// Stop preious jobs which weren't found in the current configuration.
for _, scPrev := range prevCfg.ScrapeConfigs {
if _, ok := currentJobNames[scPrev.JobName]; !ok {
scPrev.mustStop()
stopped++
}
}
jobNames := cfg.getJobNames()
tsmGlobal.registerJobNames(jobNames)
logger.Infof("restarted service discovery routines in %.3f seconds, stopped=%d, started=%d, restarted=%d", time.Since(startTime).Seconds(), stopped, started, restarted)
}
func areEqualScrapeConfigs(a, b *ScrapeConfig) bool {
sa := a.marshal()
sb := b.marshal()
return string(sa) == string(sb)
}
func (sc *ScrapeConfig) marshal() []byte {
data, err := yaml.Marshal(sc)
if err != nil {
logger.Panicf("BUG: cannot marshal ScrapeConfig: %s", err)
}
return data
}
func (cfg *Config) mustStop() {
startTime := time.Now()
logger.Infof("stopping service discovery routines...")
for i := range cfg.ScrapeConfigs {
cfg.ScrapeConfigs[i].mustStop()
for _, sc := range cfg.ScrapeConfigs {
sc.mustStop()
}
logger.Infof("stopped service discovery routines in %.3f seconds", time.Since(startTime).Seconds())
}
@ -127,8 +185,8 @@ func (cfg *Config) mustStop() {
// getJobNames returns all the scrape job names from the cfg.
func (cfg *Config) getJobNames() []string {
a := make([]string, 0, len(cfg.ScrapeConfigs))
for i := range cfg.ScrapeConfigs {
a = append(a, cfg.ScrapeConfigs[i].JobName)
for _, sc := range cfg.ScrapeConfigs {
a = append(a, sc.JobName)
}
return a
}
@ -284,8 +342,8 @@ func loadConfig(path string) (*Config, []byte, error) {
return &c, dataNew, nil
}
func loadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) ([]ScrapeConfig, []byte, error) {
var scrapeConfigs []ScrapeConfig
func loadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) ([]*ScrapeConfig, []byte, error) {
var scrapeConfigs []*ScrapeConfig
var scsData []byte
for _, filePath := range scrapeConfigFiles {
filePath := fs.GetFilepath(baseDir, filePath)
@ -304,7 +362,7 @@ func loadScrapeConfigFiles(baseDir string, scrapeConfigFiles []string) ([]Scrape
return nil, nil, fmt.Errorf("cannot load %q: %w", path, err)
}
data = envtemplate.Replace(data)
var scs []ScrapeConfig
var scs []*ScrapeConfig
if err = yaml.UnmarshalStrict(data, &scs); err != nil {
return nil, nil, fmt.Errorf("cannot parse %q: %w", path, err)
}
@ -342,8 +400,8 @@ func (cfg *Config) parseData(data []byte, path string) ([]byte, error) {
// Check that all the scrape configs have unique JobName
m := make(map[string]struct{}, len(cfg.ScrapeConfigs))
for i := range cfg.ScrapeConfigs {
jobName := cfg.ScrapeConfigs[i].JobName
for _, sc := range cfg.ScrapeConfigs {
jobName := sc.JobName
if _, ok := m[jobName]; ok {
return nil, fmt.Errorf("duplicate `job_name` in `scrape_configs` loaded from %q: %q", path, jobName)
}
@ -351,8 +409,7 @@ func (cfg *Config) parseData(data []byte, path string) ([]byte, error) {
}
// Initialize cfg.ScrapeConfigs
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for i, sc := range cfg.ScrapeConfigs {
swc, err := getScrapeWorkConfig(sc, cfg.baseDir, &cfg.Global)
if err != nil {
return nil, fmt.Errorf("cannot parse `scrape_config` #%d: %w", i+1, err)
@ -374,8 +431,7 @@ func getSWSByJob(sws []*ScrapeWork) map[string][]*ScrapeWork {
func (cfg *Config) getConsulSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.ConsulSDConfigs {
@ -402,8 +458,7 @@ func (cfg *Config) getConsulSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
func (cfg *Config) getDigitalOceanDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.DigitaloceanSDConfigs {
@ -430,8 +485,7 @@ func (cfg *Config) getDigitalOceanDScrapeWork(prev []*ScrapeWork) []*ScrapeWork
func (cfg *Config) getDNSSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.DNSSDConfigs {
@ -458,8 +512,7 @@ func (cfg *Config) getDNSSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
func (cfg *Config) getDockerSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.DockerSDConfigs {
@ -486,8 +539,7 @@ func (cfg *Config) getDockerSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
func (cfg *Config) getDockerSwarmSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.DockerSwarmSDConfigs {
@ -514,8 +566,7 @@ func (cfg *Config) getDockerSwarmSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork
func (cfg *Config) getEC2SDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.EC2SDConfigs {
@ -542,8 +593,7 @@ func (cfg *Config) getEC2SDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
func (cfg *Config) getEurekaSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.EurekaSDConfigs {
@ -579,8 +629,7 @@ func (cfg *Config) getFileSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
}
}
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
for j := range sc.FileSDConfigs {
sdc := &sc.FileSDConfigs[j]
dst = sdc.appendScrapeWork(dst, swsMapPrev, cfg.baseDir, sc.swc)
@ -593,8 +642,7 @@ func (cfg *Config) getFileSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
func (cfg *Config) getGCESDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.GCESDConfigs {
@ -621,8 +669,7 @@ func (cfg *Config) getGCESDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
func (cfg *Config) getHTTPDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.HTTPSDConfigs {
@ -649,8 +696,7 @@ func (cfg *Config) getHTTPDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.KubernetesSDConfigs {
@ -682,8 +728,7 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
func (cfg *Config) getOpenStackSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
dst := make([]*ScrapeWork, 0, len(prev))
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
dstLen := len(dst)
ok := true
for j := range sc.OpenStackSDConfigs {
@ -709,8 +754,7 @@ func (cfg *Config) getOpenStackSDScrapeWork(prev []*ScrapeWork) []*ScrapeWork {
// getStaticScrapeWork returns `static_configs` ScrapeWork from from cfg.
func (cfg *Config) getStaticScrapeWork() []*ScrapeWork {
var dst []*ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
for _, sc := range cfg.ScrapeConfigs {
for j := range sc.StaticConfigs {
stc := &sc.StaticConfigs[j]
dst = stc.appendScrapeWork(dst, sc.swc, nil)

View file

@ -145,8 +145,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
logger.Infof("nothing changed in %q", configFile)
goto waitForChans
}
cfg.mustStop()
cfgNew.mustStart()
cfgNew.mustRestart(cfg)
cfg = cfgNew
data = dataNew
marshaledData = cfgNew.marshal()
@ -161,8 +160,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
// Nothing changed since the previous loadConfig
goto waitForChans
}
cfg.mustStop()
cfgNew.mustStart()
cfgNew.mustRestart(cfg)
cfg = cfgNew
data = dataNew
configData.Store(&marshaledData)