2020-02-23 11:35:47 +00:00
package promscrape
import (
2020-04-13 10:15:30 +00:00
"flag"
2020-02-23 11:35:47 +00:00
"fmt"
"io/ioutil"
"net/url"
"path/filepath"
2021-02-26 19:41:54 +00:00
"sort"
2021-09-09 15:49:37 +00:00
"strconv"
2020-02-23 11:35:47 +00:00
"strings"
2020-12-08 10:22:57 +00:00
"sync"
2020-02-23 11:35:47 +00:00
"time"
2021-02-28 16:39:57 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2021-02-26 10:46:28 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-08-13 13:43:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-04-13 09:59:05 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
2020-05-04 17:48:02 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
2021-06-14 10:15:04 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
2020-05-05 21:01:49 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
2021-06-25 08:42:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
2021-06-25 10:20:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
2020-04-27 16:25:45 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
2020-11-20 11:38:12 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/eureka"
2020-04-24 14:50:21 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
2021-06-22 10:33:37 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
2020-04-13 18:02:27 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
2020-10-05 13:45:33 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
2020-12-24 08:56:10 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
2021-02-21 21:21:17 +00:00
"github.com/VictoriaMetrics/metrics"
2021-02-28 16:39:57 +00:00
xxhash "github.com/cespare/xxhash/v2"
2020-02-23 11:35:47 +00:00
"gopkg.in/yaml.v2"
)
2020-04-13 10:15:30 +00:00
var (
	strictParse = flag.Bool("promscrape.config.strictParse", false, "Whether to allow only supported fields in -promscrape.config . "+
		"By default unsupported fields are silently skipped")
	dryRun = flag.Bool("promscrape.config.dryRun", false, "Checks -promscrape.config file for errors and unsupported fields and then exits. "+
		"Returns non-zero exit code on parsing errors and emits these errors to stderr. "+
		"See also -promscrape.config.strictParse command-line flag. "+
		"Pass -loggerLevel=ERROR if you don't need to see info messages in the output.")
	dropOriginalLabels = flag.Bool("promscrape.dropOriginalLabels", false, "Whether to drop original labels for scrape targets at /targets and /api/v1/targets pages. "+
		"This may be needed for reducing memory usage when original labels for big number of scrape targets occupy big amounts of memory. "+
		"Note that this reduces debuggability for improper per-target relabeling configs")
	clusterMembersCount = flag.Int("promscrape.cluster.membersCount", 0, "The number of members in a cluster of scrapers. "+
		"Each member must have an unique -promscrape.cluster.memberNum in the range 0 ... promscrape.cluster.membersCount-1 . "+
		"Each member then scrapes roughly 1/N of all the targets. By default cluster scraping is disabled, i.e. a single scraper scrapes all the targets")
	// NOTE(review): the description used to read "The number of number in the cluster of scrapers" - fixed the obvious typo.
	clusterMemberNum = flag.Int("promscrape.cluster.memberNum", 0, "The number of the member in the cluster of scrapers. "+
		"It must be an unique value in the range 0 ... promscrape.cluster.membersCount-1 across scrapers in the cluster")
	clusterReplicationFactor = flag.Int("promscrape.cluster.replicationFactor", 1, "The number of members in the cluster, which scrape the same targets. "+
		"If the replication factor is greater than 2, then the deduplication must be enabled at remote storage side. See https://docs.victoriametrics.com/#deduplication")
)
2020-02-23 11:35:47 +00:00
// Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/
type Config struct {
2021-08-26 05:51:14 +00:00
Global GlobalConfig ` yaml:"global,omitempty" `
2021-10-12 13:23:42 +00:00
ScrapeConfigs [ ] ScrapeConfig ` yaml:"scrape_configs,omitempty" `
ScrapeConfigFiles [ ] string ` yaml:"scrape_config_files,omitempty" `
2020-02-23 11:35:47 +00:00
// This is set to the directory from where the config has been loaded.
baseDir string
}
2021-08-26 05:51:14 +00:00
// marshal returns the YAML representation of cfg.
//
// It panics on marshaling errors, since Config must always be marshalable.
func (cfg *Config) marshal() []byte {
	b, err := yaml.Marshal(cfg)
	if err != nil {
		logger.Panicf("BUG: cannot marshal Config: %s", err)
	}
	return b
}
2021-04-05 19:02:09 +00:00
func ( cfg * Config ) mustStart ( ) {
startTime := time . Now ( )
logger . Infof ( "starting service discovery routines..." )
for i := range cfg . ScrapeConfigs {
cfg . ScrapeConfigs [ i ] . mustStart ( cfg . baseDir )
}
2021-06-18 07:53:10 +00:00
jobNames := cfg . getJobNames ( )
tsmGlobal . registerJobNames ( jobNames )
2021-04-05 19:02:09 +00:00
logger . Infof ( "started service discovery routines in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
}
2021-03-01 12:13:56 +00:00
// mustStop stops service discovery routines for all the scrape configs in cfg.
func (cfg *Config) mustStop() {
	start := time.Now()
	logger.Infof("stopping service discovery routines...")
	for i := range cfg.ScrapeConfigs {
		cfg.ScrapeConfigs[i].mustStop()
	}
	logger.Infof("stopped service discovery routines in %.3f seconds", time.Since(start).Seconds())
}
2021-06-18 07:53:10 +00:00
// getJobNames returns all the scrape job names from the cfg.
func (cfg *Config) getJobNames() []string {
	names := make([]string, 0, len(cfg.ScrapeConfigs))
	for i := range cfg.ScrapeConfigs {
		names = append(names, cfg.ScrapeConfigs[i].JobName)
	}
	return names
}
2020-02-23 11:35:47 +00:00
// GlobalConfig represents essential parts for `global` section of Prometheus config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/
type GlobalConfig struct {
	// ScrapeInterval is the default interval between scrapes for jobs that don't override it.
	ScrapeInterval time.Duration `yaml:"scrape_interval,omitempty"`
	// ScrapeTimeout is the default per-scrape timeout for jobs that don't override it.
	ScrapeTimeout time.Duration `yaml:"scrape_timeout,omitempty"`
	// ExternalLabels are added to every scraped time series.
	ExternalLabels map[string]string `yaml:"external_labels,omitempty"`
}
// ScrapeConfig represents essential parts for `scrape_config` section of Prometheus config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
type ScrapeConfig struct {
JobName string ` yaml:"job_name" `
2020-11-13 14:17:03 +00:00
ScrapeInterval time . Duration ` yaml:"scrape_interval,omitempty" `
ScrapeTimeout time . Duration ` yaml:"scrape_timeout,omitempty" `
MetricsPath string ` yaml:"metrics_path,omitempty" `
HonorLabels bool ` yaml:"honor_labels,omitempty" `
HonorTimestamps bool ` yaml:"honor_timestamps,omitempty" `
2021-10-12 13:23:42 +00:00
FollowRedirects * bool ` yaml:"follow_redirects,omitempty" `
2020-11-13 14:17:03 +00:00
Scheme string ` yaml:"scheme,omitempty" `
Params map [ string ] [ ] string ` yaml:"params,omitempty" `
2021-04-02 18:17:43 +00:00
HTTPClientConfig promauth . HTTPClientConfig ` yaml:",inline" `
2020-12-24 08:56:10 +00:00
ProxyURL proxy . URL ` yaml:"proxy_url,omitempty" `
2020-11-13 14:17:03 +00:00
RelabelConfigs [ ] promrelabel . RelabelConfig ` yaml:"relabel_configs,omitempty" `
MetricRelabelConfigs [ ] promrelabel . RelabelConfig ` yaml:"metric_relabel_configs,omitempty" `
SampleLimit int ` yaml:"sample_limit,omitempty" `
2020-02-23 11:35:47 +00:00
2021-06-25 10:20:18 +00:00
ConsulSDConfigs [ ] consul . SDConfig ` yaml:"consul_sd_configs,omitempty" `
DigitaloceanSDConfigs [ ] digitalocean . SDConfig ` yaml:"digitalocean_sd_configs,omitempty" `
DNSSDConfigs [ ] dns . SDConfig ` yaml:"dns_sd_configs,omitempty" `
DockerSDConfigs [ ] docker . SDConfig ` yaml:"docker_sd_configs,omitempty" `
DockerSwarmSDConfigs [ ] dockerswarm . SDConfig ` yaml:"dockerswarm_sd_configs,omitempty" `
EC2SDConfigs [ ] ec2 . SDConfig ` yaml:"ec2_sd_configs,omitempty" `
EurekaSDConfigs [ ] eureka . SDConfig ` yaml:"eureka_sd_configs,omitempty" `
FileSDConfigs [ ] FileSDConfig ` yaml:"file_sd_configs,omitempty" `
GCESDConfigs [ ] gce . SDConfig ` yaml:"gce_sd_configs,omitempty" `
HTTPSDConfigs [ ] http . SDConfig ` yaml:"http_sd_configs,omitempty" `
KubernetesSDConfigs [ ] kubernetes . SDConfig ` yaml:"kubernetes_sd_configs,omitempty" `
OpenStackSDConfigs [ ] openstack . SDConfig ` yaml:"openstack_sd_configs,omitempty" `
StaticConfigs [ ] StaticConfig ` yaml:"static_configs,omitempty" `
2021-04-02 18:17:43 +00:00
2020-07-02 11:19:11 +00:00
// These options are supported only by lib/promscrape.
2021-06-04 17:27:55 +00:00
RelabelDebug bool ` yaml:"relabel_debug,omitempty" `
MetricRelabelDebug bool ` yaml:"metric_relabel_debug,omitempty" `
2021-04-03 21:40:08 +00:00
DisableCompression bool ` yaml:"disable_compression,omitempty" `
DisableKeepAlive bool ` yaml:"disable_keepalive,omitempty" `
StreamParse bool ` yaml:"stream_parse,omitempty" `
ScrapeAlignInterval time . Duration ` yaml:"scrape_align_interval,omitempty" `
ScrapeOffset time . Duration ` yaml:"scrape_offset,omitempty" `
2021-09-01 11:14:37 +00:00
SeriesLimit int ` yaml:"series_limit,omitempty" `
2021-04-03 21:40:08 +00:00
ProxyClientConfig promauth . ProxyClientConfig ` yaml:",inline" `
2020-07-02 11:19:11 +00:00
2020-02-23 11:35:47 +00:00
// This is set in loadConfig
swc * scrapeWorkConfig
}
2021-04-05 19:02:09 +00:00
func ( sc * ScrapeConfig ) mustStart ( baseDir string ) {
2021-04-08 06:31:05 +00:00
swosFunc := func ( metaLabels map [ string ] string ) interface { } {
target := metaLabels [ "__address__" ]
sw , err := sc . swc . getScrapeWork ( target , nil , metaLabels )
if err != nil {
logger . Errorf ( "cannot create kubernetes_sd_config target %q for job_name %q: %s" , target , sc . swc . jobName , err )
return nil
2021-04-05 19:02:09 +00:00
}
2021-04-08 06:31:05 +00:00
return sw
}
for i := range sc . KubernetesSDConfigs {
2021-04-05 19:02:09 +00:00
sc . KubernetesSDConfigs [ i ] . MustStart ( baseDir , swosFunc )
}
}
2021-03-01 12:13:56 +00:00
func ( sc * ScrapeConfig ) mustStop ( ) {
for i := range sc . ConsulSDConfigs {
sc . ConsulSDConfigs [ i ] . MustStop ( )
}
2021-06-25 09:10:20 +00:00
for i := range sc . DigitaloceanSDConfigs {
sc . DigitaloceanSDConfigs [ i ] . MustStop ( )
}
for i := range sc . DNSSDConfigs {
sc . DNSSDConfigs [ i ] . MustStop ( )
2021-03-01 12:13:56 +00:00
}
2021-06-25 08:42:47 +00:00
for i := range sc . DockerSDConfigs {
sc . DockerSDConfigs [ i ] . MustStop ( )
}
2021-03-01 12:13:56 +00:00
for i := range sc . DockerSwarmSDConfigs {
sc . DockerSwarmSDConfigs [ i ] . MustStop ( )
}
for i := range sc . EC2SDConfigs {
sc . EC2SDConfigs [ i ] . MustStop ( )
}
2021-06-25 09:10:20 +00:00
for i := range sc . EurekaSDConfigs {
sc . EurekaSDConfigs [ i ] . MustStop ( )
}
2021-03-01 12:13:56 +00:00
for i := range sc . GCESDConfigs {
sc . GCESDConfigs [ i ] . MustStop ( )
}
2021-06-25 08:39:18 +00:00
for i := range sc . HTTPSDConfigs {
sc . HTTPSDConfigs [ i ] . MustStop ( )
}
2021-06-25 09:10:20 +00:00
for i := range sc . KubernetesSDConfigs {
sc . KubernetesSDConfigs [ i ] . MustStop ( )
}
for i := range sc . OpenStackSDConfigs {
sc . OpenStackSDConfigs [ i ] . MustStop ( )
}
2021-03-01 12:13:56 +00:00
}
2020-02-23 11:35:47 +00:00
// FileSDConfig represents file-based service discovery config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#file_sd_config
type FileSDConfig struct {
	// Files lists paths (possibly glob patterns) with target definitions.
	Files []string `yaml:"files"`
	// `refresh_interval` is ignored. See `-prometheus.fileSDCheckInterval`
}
// StaticConfig represents essential parts for `static_config` section of Prometheus config.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#static_config
type StaticConfig struct {
	// Targets lists `host:port` addresses to scrape.
	Targets []string `yaml:"targets"`
	// Labels are attached to every target in Targets.
	Labels map[string]string `yaml:"labels,omitempty"`
}
func loadStaticConfigs ( path string ) ( [ ] StaticConfig , error ) {
data , err := ioutil . ReadFile ( path )
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot read `static_configs` from %q: %w" , path , err )
2020-02-23 11:35:47 +00:00
}
2020-08-13 13:43:55 +00:00
data = envtemplate . Replace ( data )
2020-02-23 11:35:47 +00:00
var stcs [ ] StaticConfig
2020-03-06 18:18:28 +00:00
if err := yaml . UnmarshalStrict ( data , & stcs ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot unmarshal `static_configs` from %q: %w" , path , err )
2020-02-23 11:35:47 +00:00
}
return stcs , nil
}
// loadConfig loads Prometheus config from the given path.
2021-08-26 05:51:14 +00:00
func loadConfig ( path string ) ( * Config , error ) {
data , err := ioutil . ReadFile ( path )
2020-02-23 11:35:47 +00:00
if err != nil {
2021-08-26 05:51:14 +00:00
return nil , fmt . Errorf ( "cannot read Prometheus config from %q: %w" , path , err )
2020-02-23 11:35:47 +00:00
}
2021-08-26 05:51:14 +00:00
var c Config
if err := c . parseData ( data , path ) ; err != nil {
return nil , fmt . Errorf ( "cannot parse Prometheus config from %q: %w" , path , err )
}
return & c , nil
}
func loadScrapeConfigFiles ( baseDir string , scrapeConfigFiles [ ] string ) ( [ ] ScrapeConfig , error ) {
var scrapeConfigs [ ] ScrapeConfig
for _ , filePath := range scrapeConfigFiles {
filePath := getFilepath ( baseDir , filePath )
paths := [ ] string { filePath }
if strings . Contains ( filePath , "*" ) {
ps , err := filepath . Glob ( filePath )
if err != nil {
return nil , fmt . Errorf ( "invalid pattern %q in `scrape_config_files`: %w" , filePath , err )
}
sort . Strings ( ps )
paths = ps
}
for _ , path := range paths {
data , err := ioutil . ReadFile ( path )
if err != nil {
return nil , fmt . Errorf ( "cannot load %q from `scrape_config_files`: %w" , filePath , err )
}
data = envtemplate . Replace ( data )
var scs [ ] ScrapeConfig
if err = yaml . UnmarshalStrict ( data , & scs ) ; err != nil {
return nil , fmt . Errorf ( "cannot parse %q from `scrape_config_files`: %w" , filePath , err )
}
scrapeConfigs = append ( scrapeConfigs , scs ... )
}
2020-02-23 11:35:47 +00:00
}
2021-08-26 05:51:14 +00:00
return scrapeConfigs , nil
2020-02-23 11:35:47 +00:00
}
2020-11-25 20:59:13 +00:00
// IsDryRun returns true if -promscrape.config.dryRun command-line flag is set
func IsDryRun() bool {
	return *dryRun
}
2021-08-26 05:51:14 +00:00
func ( cfg * Config ) parseData ( data [ ] byte , path string ) error {
2020-04-13 10:15:30 +00:00
if err := unmarshalMaybeStrict ( data , cfg ) ; err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot unmarshal data: %w" , err )
2020-02-23 11:35:47 +00:00
}
absPath , err := filepath . Abs ( path )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot obtain abs path for %q: %w" , path , err )
2020-02-23 11:35:47 +00:00
}
cfg . baseDir = filepath . Dir ( absPath )
2021-08-26 05:51:14 +00:00
// Load cfg.ScrapeConfigFiles into c.ScrapeConfigs
scs , err := loadScrapeConfigFiles ( cfg . baseDir , cfg . ScrapeConfigFiles )
if err != nil {
return fmt . Errorf ( "cannot load `scrape_config_files` from %q: %w" , path , err )
}
cfg . ScrapeConfigFiles = nil
cfg . ScrapeConfigs = append ( cfg . ScrapeConfigs , scs ... )
// Check that all the scrape configs have unique JobName
m := make ( map [ string ] struct { } , len ( cfg . ScrapeConfigs ) )
for i := range cfg . ScrapeConfigs {
jobName := cfg . ScrapeConfigs [ i ] . JobName
if _ , ok := m [ jobName ] ; ok {
return fmt . Errorf ( "duplicate `job_name` in `scrape_configs` loaded from %q: %q" , path , jobName )
}
m [ jobName ] = struct { } { }
}
// Initialize cfg.ScrapeConfigs
2020-02-23 11:35:47 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
swc , err := getScrapeWorkConfig ( sc , cfg . baseDir , & cfg . Global )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot parse `scrape_config` #%d: %w" , i + 1 , err )
2020-02-23 11:35:47 +00:00
}
sc . swc = swc
}
return nil
}
2020-04-13 10:15:30 +00:00
func unmarshalMaybeStrict ( data [ ] byte , dst interface { } ) error {
2020-08-13 13:43:55 +00:00
data = envtemplate . Replace ( data )
2020-04-13 10:15:30 +00:00
var err error
2020-12-07 11:15:42 +00:00
if * strictParse {
2020-04-13 10:15:30 +00:00
err = yaml . UnmarshalStrict ( data , dst )
} else {
err = yaml . Unmarshal ( data , dst )
}
return err
}
2020-12-08 15:50:03 +00:00
func getSWSByJob ( sws [ ] * ScrapeWork ) map [ string ] [ ] * ScrapeWork {
m := make ( map [ string ] [ ] * ScrapeWork )
2020-06-23 12:35:19 +00:00
for _ , sw := range sws {
m [ sw . jobNameOriginal ] = append ( m [ sw . jobNameOriginal ] , sw )
}
return m
}
2021-06-25 09:10:20 +00:00
// getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getConsulSDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2021-02-26 14:54:03 +00:00
swsPrevByJob := getSWSByJob ( prev )
2020-12-08 15:50:03 +00:00
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
2020-04-23 11:38:12 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
2021-02-26 14:54:03 +00:00
dstLen := len ( dst )
ok := true
2021-06-25 09:10:20 +00:00
for j := range sc . ConsulSDConfigs {
sdc := & sc . ConsulSDConfigs [ j ]
var okLocal bool
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "consul_sd_config" )
if ok {
ok = okLocal
2021-03-02 14:42:48 +00:00
}
2021-06-25 09:10:20 +00:00
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
logger . Errorf ( "there were errors when discovering consul targets for job %q, so preserving the previous targets" , sc . swc . jobName )
dst = append ( dst [ : dstLen ] , swsPrev ... )
}
}
return dst
}
// getDigitalOceanDScrapeWork returns `digitalocean_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getDigitalOceanDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
swsPrevByJob := getSWSByJob ( prev )
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
dstLen := len ( dst )
ok := true
for j := range sc . DigitaloceanSDConfigs {
sdc := & sc . DigitaloceanSDConfigs [ j ]
var okLocal bool
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "digitalocean_sd_config" )
if ok {
ok = okLocal
2020-06-23 12:35:19 +00:00
}
2021-02-26 14:54:03 +00:00
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
2021-06-25 09:10:20 +00:00
logger . Errorf ( "there were errors when discovering digitalocean targets for job %q, so preserving the previous targets" , sc . swc . jobName )
2021-02-26 14:54:03 +00:00
dst = append ( dst [ : dstLen ] , swsPrev ... )
2020-06-23 12:35:19 +00:00
}
2020-04-13 18:02:27 +00:00
}
return dst
}
2021-06-25 09:10:20 +00:00
// getDNSSDScrapeWork returns `dns_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getDNSSDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2020-10-05 13:45:33 +00:00
swsPrevByJob := getSWSByJob ( prev )
2020-12-08 15:50:03 +00:00
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
2020-10-05 13:45:33 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
dstLen := len ( dst )
ok := true
2021-06-25 09:10:20 +00:00
for j := range sc . DNSSDConfigs {
sdc := & sc . DNSSDConfigs [ j ]
2020-10-05 13:45:33 +00:00
var okLocal bool
2021-06-25 09:10:20 +00:00
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "dns_sd_config" )
2020-10-05 13:45:33 +00:00
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
2021-06-25 09:10:20 +00:00
logger . Errorf ( "there were errors when discovering dns targets for job %q, so preserving the previous targets" , sc . swc . jobName )
2020-10-05 13:45:33 +00:00
dst = append ( dst [ : dstLen ] , swsPrev ... )
}
}
return dst
}
2021-06-25 08:42:47 +00:00
// getDockerSDScrapeWork returns `docker_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getDockerSDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
swsPrevByJob := getSWSByJob ( prev )
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
dstLen := len ( dst )
ok := true
for j := range sc . DockerSDConfigs {
sdc := & sc . DockerSDConfigs [ j ]
var okLocal bool
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "docker_sd_config" )
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
logger . Errorf ( "there were errors when discovering docker targets for job %q, so preserving the previous targets" , sc . swc . jobName )
dst = append ( dst [ : dstLen ] , swsPrev ... )
}
}
return dst
}
2020-10-12 10:38:21 +00:00
// getDockerSwarmSDScrapeWork returns `dockerswarm_sd_configs` ScrapeWork from cfg.
2020-12-08 15:50:03 +00:00
func ( cfg * Config ) getDockerSwarmSDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2020-10-12 10:38:21 +00:00
swsPrevByJob := getSWSByJob ( prev )
2020-12-08 15:50:03 +00:00
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
2020-10-12 10:38:21 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
dstLen := len ( dst )
ok := true
2021-03-01 12:13:56 +00:00
for j := range sc . DockerSwarmSDConfigs {
sdc := & sc . DockerSwarmSDConfigs [ j ]
2020-10-12 10:38:21 +00:00
var okLocal bool
2021-02-26 13:53:42 +00:00
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "dockerswarm_sd_config" )
2020-10-12 10:38:21 +00:00
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
logger . Errorf ( "there were errors when discovering dockerswarm targets for job %q, so preserving the previous targets" , sc . swc . jobName )
dst = append ( dst [ : dstLen ] , swsPrev ... )
}
}
return dst
}
2021-06-25 09:10:20 +00:00
// getEC2SDScrapeWork returns `ec2_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getEC2SDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2020-06-23 12:35:19 +00:00
swsPrevByJob := getSWSByJob ( prev )
2020-12-08 15:50:03 +00:00
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
2020-05-04 17:48:02 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
2020-06-23 12:35:19 +00:00
dstLen := len ( dst )
ok := true
2021-06-25 09:10:20 +00:00
for j := range sc . EC2SDConfigs {
sdc := & sc . EC2SDConfigs [ j ]
2020-06-23 12:35:19 +00:00
var okLocal bool
2021-06-25 09:10:20 +00:00
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "ec2_sd_config" )
2020-06-23 12:35:19 +00:00
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
2021-06-25 09:10:20 +00:00
logger . Errorf ( "there were errors when discovering ec2 targets for job %q, so preserving the previous targets" , sc . swc . jobName )
2020-06-23 12:35:19 +00:00
dst = append ( dst [ : dstLen ] , swsPrev ... )
2020-05-04 17:48:02 +00:00
}
}
return dst
}
2020-11-20 11:38:12 +00:00
// getEurekaSDScrapeWork returns `eureka_sd_configs` ScrapeWork from cfg.
2020-12-08 15:50:03 +00:00
func ( cfg * Config ) getEurekaSDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2020-11-20 11:38:12 +00:00
swsPrevByJob := getSWSByJob ( prev )
2020-12-08 15:50:03 +00:00
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
2020-11-20 11:38:12 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
dstLen := len ( dst )
ok := true
for j := range sc . EurekaSDConfigs {
sdc := & sc . EurekaSDConfigs [ j ]
var okLocal bool
2021-02-26 13:53:42 +00:00
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "eureka_sd_config" )
2020-11-20 11:38:12 +00:00
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
logger . Errorf ( "there were errors when discovering eureka targets for job %q, so preserving the previous targets" , sc . swc . jobName )
dst = append ( dst [ : dstLen ] , swsPrev ... )
}
}
return dst
}
2021-06-25 09:10:20 +00:00
// getFileSDScrapeWork returns `file_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getFileSDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
// Create a map for the previous scrape work.
swsMapPrev := make ( map [ string ] [ ] * ScrapeWork )
for _ , sw := range prev {
filepath := promrelabel . GetLabelValueByName ( sw . Labels , "__vm_filepath" )
if len ( filepath ) == 0 {
logger . Panicf ( "BUG: missing `__vm_filepath` label" )
} else {
swsMapPrev [ filepath ] = append ( swsMapPrev [ filepath ] , sw )
}
}
2020-12-08 15:50:03 +00:00
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
2020-05-05 21:01:49 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
2021-06-25 09:10:20 +00:00
for j := range sc . FileSDConfigs {
sdc := & sc . FileSDConfigs [ j ]
dst = sdc . appendScrapeWork ( dst , swsMapPrev , cfg . baseDir , sc . swc )
2020-05-05 21:01:49 +00:00
}
}
return dst
}
2021-06-25 09:10:20 +00:00
// getGCESDScrapeWork returns `gce_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getGCESDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2020-06-23 12:35:19 +00:00
swsPrevByJob := getSWSByJob ( prev )
2020-12-08 15:50:03 +00:00
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
2020-04-27 16:25:45 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
2020-06-23 12:35:19 +00:00
dstLen := len ( dst )
ok := true
2021-06-25 09:10:20 +00:00
for j := range sc . GCESDConfigs {
sdc := & sc . GCESDConfigs [ j ]
2020-06-23 12:35:19 +00:00
var okLocal bool
2021-06-25 09:10:20 +00:00
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "gce_sd_config" )
2020-06-23 12:35:19 +00:00
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
2021-06-25 09:10:20 +00:00
logger . Errorf ( "there were errors when discovering gce targets for job %q, so preserving the previous targets" , sc . swc . jobName )
2020-06-23 12:35:19 +00:00
dst = append ( dst [ : dstLen ] , swsPrev ... )
2020-04-27 16:25:45 +00:00
}
}
return dst
}
2021-06-25 09:10:20 +00:00
// getHTTPDScrapeWork returns `http_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getHTTPDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2020-06-23 12:35:19 +00:00
swsPrevByJob := getSWSByJob ( prev )
2020-12-08 15:50:03 +00:00
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
2020-04-24 14:50:21 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
2020-06-23 12:35:19 +00:00
dstLen := len ( dst )
ok := true
2021-06-25 09:10:20 +00:00
for j := range sc . HTTPSDConfigs {
sdc := & sc . HTTPSDConfigs [ j ]
2020-06-23 12:35:19 +00:00
var okLocal bool
2021-06-25 09:10:20 +00:00
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "http_sd_config" )
2020-06-23 12:35:19 +00:00
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
2021-06-25 09:10:20 +00:00
logger . Errorf ( "there were errors when discovering http targets for job %q, so preserving the previous targets" , sc . swc . jobName )
2020-06-23 12:35:19 +00:00
dst = append ( dst [ : dstLen ] , swsPrev ... )
2020-04-24 14:50:21 +00:00
}
}
return dst
}
2021-06-25 09:10:20 +00:00
// getKubernetesSDScrapeWork returns `kubernetes_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getKubernetesSDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2021-06-14 10:15:04 +00:00
swsPrevByJob := getSWSByJob ( prev )
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
dstLen := len ( dst )
ok := true
2021-06-25 09:10:20 +00:00
for j := range sc . KubernetesSDConfigs {
sdc := & sc . KubernetesSDConfigs [ j ]
swos , err := sdc . GetScrapeWorkObjects ( )
if err != nil {
logger . Errorf ( "skipping kubernetes_sd_config targets for job_name %q because of error: %s" , sc . swc . jobName , err )
ok = false
break
}
for _ , swo := range swos {
sw := swo . ( * ScrapeWork )
dst = append ( dst , sw )
2021-06-14 10:15:04 +00:00
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
2021-06-25 09:10:20 +00:00
logger . Errorf ( "there were errors when discovering kubernetes_sd_config targets for job %q, so preserving the previous targets" , sc . swc . jobName )
2021-06-14 10:15:04 +00:00
dst = append ( dst [ : dstLen ] , swsPrev ... )
}
}
return dst
}
2021-06-25 09:10:20 +00:00
// getOpenStackSDScrapeWork returns `openstack_sd_configs` ScrapeWork from cfg.
func ( cfg * Config ) getOpenStackSDScrapeWork ( prev [ ] * ScrapeWork ) [ ] * ScrapeWork {
2021-06-22 10:33:37 +00:00
swsPrevByJob := getSWSByJob ( prev )
dst := make ( [ ] * ScrapeWork , 0 , len ( prev ) )
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
dstLen := len ( dst )
ok := true
2021-06-25 09:10:20 +00:00
for j := range sc . OpenStackSDConfigs {
sdc := & sc . OpenStackSDConfigs [ j ]
2021-06-22 10:33:37 +00:00
var okLocal bool
2021-06-25 09:10:20 +00:00
dst , okLocal = appendSDScrapeWork ( dst , sdc , cfg . baseDir , sc . swc , "openstack_sd_config" )
2021-06-22 10:33:37 +00:00
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob [ sc . swc . jobName ]
if len ( swsPrev ) > 0 {
2021-06-25 09:10:20 +00:00
logger . Errorf ( "there were errors when discovering openstack targets for job %q, so preserving the previous targets" , sc . swc . jobName )
2021-06-22 10:33:37 +00:00
dst = append ( dst [ : dstLen ] , swsPrev ... )
}
}
return dst
}
2020-04-13 09:59:05 +00:00
// getStaticScrapeWork returns `static_configs` ScrapeWork from from cfg.
2020-12-08 15:50:03 +00:00
func ( cfg * Config ) getStaticScrapeWork ( ) [ ] * ScrapeWork {
var dst [ ] * ScrapeWork
2020-04-23 11:38:12 +00:00
for i := range cfg . ScrapeConfigs {
sc := & cfg . ScrapeConfigs [ i ]
for j := range sc . StaticConfigs {
stc := & sc . StaticConfigs [ j ]
2020-04-13 09:59:05 +00:00
dst = stc . appendScrapeWork ( dst , sc . swc , nil )
2020-02-23 11:35:47 +00:00
}
}
2020-04-13 09:59:05 +00:00
return dst
2020-02-23 11:35:47 +00:00
}
func getScrapeWorkConfig ( sc * ScrapeConfig , baseDir string , globalCfg * GlobalConfig ) ( * scrapeWorkConfig , error ) {
jobName := sc . JobName
if jobName == "" {
return nil , fmt . Errorf ( "missing `job_name` field in `scrape_config`" )
}
scrapeInterval := sc . ScrapeInterval
if scrapeInterval <= 0 {
scrapeInterval = globalCfg . ScrapeInterval
if scrapeInterval <= 0 {
scrapeInterval = defaultScrapeInterval
}
}
scrapeTimeout := sc . ScrapeTimeout
if scrapeTimeout <= 0 {
scrapeTimeout = globalCfg . ScrapeTimeout
if scrapeTimeout <= 0 {
scrapeTimeout = defaultScrapeTimeout
}
}
2021-05-13 13:09:45 +00:00
if scrapeTimeout > scrapeInterval {
// Limit the `scrape_timeout` with `scrape_interval` like Prometheus does.
// This guarantees that the scraper can miss only a single scrape if the target sometimes responds slowly.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1281#issuecomment-840538907
scrapeTimeout = scrapeInterval
}
2020-02-23 11:35:47 +00:00
honorLabels := sc . HonorLabels
honorTimestamps := sc . HonorTimestamps
2021-04-02 16:56:38 +00:00
denyRedirects := false
if sc . FollowRedirects != nil {
denyRedirects = ! * sc . FollowRedirects
}
2020-02-23 11:35:47 +00:00
metricsPath := sc . MetricsPath
if metricsPath == "" {
metricsPath = "/metrics"
}
scheme := sc . Scheme
if scheme == "" {
scheme = "http"
}
if scheme != "http" && scheme != "https" {
return nil , fmt . Errorf ( "unexpected `scheme` for `job_name` %q: %q; supported values: http or https" , jobName , scheme )
}
params := sc . Params
2021-04-02 18:17:43 +00:00
ac , err := sc . HTTPClientConfig . NewConfig ( baseDir )
2020-04-13 09:59:05 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot parse auth config for `job_name` %q: %w" , jobName , err )
2020-02-23 11:35:47 +00:00
}
2021-04-03 21:40:08 +00:00
proxyAC , err := sc . ProxyClientConfig . NewConfig ( baseDir )
2021-03-12 01:35:49 +00:00
if err != nil {
return nil , fmt . Errorf ( "cannot parse proxy auth config for `job_name` %q: %w" , jobName , err )
}
2021-06-04 17:27:55 +00:00
relabelConfigs , err := promrelabel . ParseRelabelConfigs ( sc . RelabelConfigs , sc . RelabelDebug )
2020-02-23 11:35:47 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot parse `relabel_configs` for `job_name` %q: %w" , jobName , err )
2020-02-23 11:35:47 +00:00
}
2021-06-04 17:27:55 +00:00
metricRelabelConfigs , err := promrelabel . ParseRelabelConfigs ( sc . MetricRelabelConfigs , sc . MetricRelabelDebug )
2020-02-23 11:35:47 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot parse `metric_relabel_configs` for `job_name` %q: %w" , jobName , err )
2020-02-23 11:35:47 +00:00
}
2021-10-14 09:10:13 +00:00
if ( * streamParse || sc . StreamParse ) && sc . SampleLimit > 0 {
return nil , fmt . Errorf ( "cannot use stream parsing mode when `sample_limit` is set for `job_name` %q" , jobName )
}
if ( * streamParse || sc . StreamParse ) && sc . SeriesLimit > 0 {
return nil , fmt . Errorf ( "cannot use stream parsing mode when `series_limit` is set for `job_name` %q" , jobName )
}
2020-02-23 11:35:47 +00:00
swc := & scrapeWorkConfig {
2020-04-13 09:59:05 +00:00
scrapeInterval : scrapeInterval ,
scrapeTimeout : scrapeTimeout ,
jobName : jobName ,
metricsPath : metricsPath ,
scheme : scheme ,
params : params ,
2020-12-24 08:56:10 +00:00
proxyURL : sc . ProxyURL ,
2021-03-12 01:35:49 +00:00
proxyAuthConfig : proxyAC ,
2020-04-13 09:59:05 +00:00
authConfig : ac ,
honorLabels : honorLabels ,
honorTimestamps : honorTimestamps ,
2021-04-02 16:56:38 +00:00
denyRedirects : denyRedirects ,
2020-04-13 09:59:05 +00:00
externalLabels : globalCfg . ExternalLabels ,
relabelConfigs : relabelConfigs ,
metricRelabelConfigs : metricRelabelConfigs ,
2020-04-14 08:58:15 +00:00
sampleLimit : sc . SampleLimit ,
2020-07-02 11:19:11 +00:00
disableCompression : sc . DisableCompression ,
disableKeepAlive : sc . DisableKeepAlive ,
2020-11-01 21:12:13 +00:00
streamParse : sc . StreamParse ,
2021-02-18 21:51:29 +00:00
scrapeAlignInterval : sc . ScrapeAlignInterval ,
2021-03-08 09:58:25 +00:00
scrapeOffset : sc . ScrapeOffset ,
2021-09-01 11:14:37 +00:00
seriesLimit : sc . SeriesLimit ,
2020-02-23 11:35:47 +00:00
}
return swc , nil
}
// scrapeWorkConfig holds the per-job settings parsed from a single `scrape_config`
// section. It is shared by all ScrapeWork objects generated for that job.
type scrapeWorkConfig struct {
	scrapeInterval       time.Duration              // effective `scrape_interval` after defaulting
	scrapeTimeout        time.Duration              // effective `scrape_timeout`, capped by scrapeInterval
	jobName              string                     // `job_name`; becomes the `job` label in mergeLabels
	metricsPath          string                     // `metrics_path`; defaults to "/metrics"
	scheme               string                     // `scheme`; either "http" or "https"
	params               map[string][]string        // `params`; exposed as `__param_*` labels
	proxyURL             proxy.URL                  // `proxy_url` for scrape requests
	proxyAuthConfig      *promauth.Config           // parsed auth options for the proxy
	authConfig           *promauth.Config           // parsed auth options for the scrape target
	honorLabels          bool                       // `honor_labels`
	honorTimestamps      bool                       // `honor_timestamps`
	denyRedirects        bool                       // inverse of `follow_redirects`
	externalLabels       map[string]string          // global `external_labels`
	relabelConfigs       *promrelabel.ParsedConfigs // parsed `relabel_configs`
	metricRelabelConfigs *promrelabel.ParsedConfigs // parsed `metric_relabel_configs`
	sampleLimit          int                        // `sample_limit`
	disableCompression   bool                       // `disable_compression`
	disableKeepAlive     bool                       // `disable_keepalive`
	streamParse          bool                       // `stream_parse`; may be overridden per-target via `__stream_parse__` label
	scrapeAlignInterval  time.Duration              // `scrape_align_interval`
	scrapeOffset         time.Duration              // `scrape_offset`
	seriesLimit          int                        // `series_limit`; may be overridden per-target via `__series_limit__` label
}
// targetLabelsGetter is implemented by service discovery configs that can
// return per-target meta-label sets (kubernetes_sd_config, ec2_sd_config, etc.).
type targetLabelsGetter interface {
	// GetLabels returns one map of meta-labels per discovered target.
	// baseDir is used for resolving relative file paths in the SD config.
	GetLabels(baseDir string) ([]map[string]string, error)
}
2021-02-26 13:53:42 +00:00
func appendSDScrapeWork ( dst [ ] * ScrapeWork , sdc targetLabelsGetter , baseDir string , swc * scrapeWorkConfig , discoveryType string ) ( [ ] * ScrapeWork , bool ) {
targetLabels , err := sdc . GetLabels ( baseDir )
2020-04-24 14:50:21 +00:00
if err != nil {
2021-02-26 13:53:42 +00:00
logger . Errorf ( "skipping %s targets for job_name %q because of error: %s" , discoveryType , swc . jobName , err )
2020-06-23 12:35:19 +00:00
return dst , false
2020-04-24 14:50:21 +00:00
}
2021-02-26 13:53:42 +00:00
return appendScrapeWorkForTargetLabels ( dst , swc , targetLabels , discoveryType ) , true
2020-04-24 14:50:21 +00:00
}
2021-02-26 13:53:42 +00:00
func appendScrapeWorkForTargetLabels ( dst [ ] * ScrapeWork , swc * scrapeWorkConfig , targetLabels [ ] map [ string ] string , discoveryType string ) [ ] * ScrapeWork {
2021-02-21 21:21:17 +00:00
startTime := time . Now ( )
2021-02-26 10:46:28 +00:00
// Process targetLabels in parallel in order to reduce processing time for big number of targetLabels.
type result struct {
sw * ScrapeWork
err error
}
goroutines := cgroup . AvailableCPUs ( )
2021-02-28 14:05:13 +00:00
resultCh := make ( chan result , len ( targetLabels ) )
workCh := make ( chan map [ string ] string , goroutines )
2021-02-26 10:46:28 +00:00
for i := 0 ; i < goroutines ; i ++ {
go func ( ) {
for metaLabels := range workCh {
target := metaLabels [ "__address__" ]
2021-02-26 19:41:54 +00:00
sw , err := swc . getScrapeWork ( target , nil , metaLabels )
2021-02-26 10:46:28 +00:00
if err != nil {
2021-02-26 13:53:42 +00:00
err = fmt . Errorf ( "skipping %s target %q for job_name %q because of error: %w" , discoveryType , target , swc . jobName , err )
2021-02-26 10:46:28 +00:00
}
resultCh <- result {
sw : sw ,
err : err ,
}
}
} ( )
}
2020-04-13 18:02:27 +00:00
for _ , metaLabels := range targetLabels {
2021-02-26 10:46:28 +00:00
workCh <- metaLabels
}
close ( workCh )
for range targetLabels {
r := <- resultCh
if r . err != nil {
logger . Errorf ( "%s" , r . err )
2020-04-13 18:02:27 +00:00
continue
}
2021-02-26 10:46:28 +00:00
if r . sw != nil {
dst = append ( dst , r . sw )
}
2020-04-13 18:02:27 +00:00
}
2021-02-26 13:53:42 +00:00
metrics . GetOrCreateHistogram ( fmt . Sprintf ( "vm_promscrape_target_relabel_duration_seconds{type=%q}" , discoveryType ) ) . UpdateDuration ( startTime )
2020-04-13 18:02:27 +00:00
return dst
}
2020-12-08 15:50:03 +00:00
func ( sdc * FileSDConfig ) appendScrapeWork ( dst [ ] * ScrapeWork , swsMapPrev map [ string ] [ ] * ScrapeWork , baseDir string , swc * scrapeWorkConfig ) [ ] * ScrapeWork {
2020-02-23 11:35:47 +00:00
for _ , file := range sdc . Files {
pathPattern := getFilepath ( baseDir , file )
paths := [ ] string { pathPattern }
if strings . Contains ( pathPattern , "*" ) {
var err error
paths , err = filepath . Glob ( pathPattern )
if err != nil {
2020-04-13 09:59:05 +00:00
// Do not return this error, since other files may contain valid scrape configs.
logger . Errorf ( "invalid pattern %q in `files` section: %s; skipping it" , file , err )
continue
2020-02-23 11:35:47 +00:00
}
}
for _ , path := range paths {
stcs , err := loadStaticConfigs ( path )
if err != nil {
// Do not return this error, since other paths may contain valid scrape configs.
2020-05-03 09:41:13 +00:00
if sws := swsMapPrev [ path ] ; sws != nil {
2020-02-23 11:35:47 +00:00
// Re-use the previous valid scrape work for this path.
logger . Errorf ( "keeping the previously loaded `static_configs` from %q because of error when re-loading the file: %s" , path , err )
dst = append ( dst , sws ... )
} else {
logger . Errorf ( "skipping loading `static_configs` from %q because of error: %s" , path , err )
}
continue
}
pathShort := path
if strings . HasPrefix ( pathShort , baseDir ) {
pathShort = path [ len ( baseDir ) : ]
if len ( pathShort ) > 0 && pathShort [ 0 ] == filepath . Separator {
pathShort = pathShort [ 1 : ]
}
}
2020-04-13 09:59:05 +00:00
metaLabels := map [ string ] string {
2020-02-23 11:35:47 +00:00
"__meta_filepath" : pathShort ,
2020-05-03 09:41:13 +00:00
"__vm_filepath" : path , // This label is needed for internal promscrape logic
2020-02-23 11:35:47 +00:00
}
for i := range stcs {
2020-04-13 09:59:05 +00:00
dst = stcs [ i ] . appendScrapeWork ( dst , swc , metaLabels )
2020-02-23 11:35:47 +00:00
}
}
}
2020-04-13 09:59:05 +00:00
return dst
2020-02-23 11:35:47 +00:00
}
2020-12-08 15:50:03 +00:00
func ( stc * StaticConfig ) appendScrapeWork ( dst [ ] * ScrapeWork , swc * scrapeWorkConfig , metaLabels map [ string ] string ) [ ] * ScrapeWork {
2020-02-23 11:35:47 +00:00
for _ , target := range stc . Targets {
if target == "" {
2020-04-13 09:59:05 +00:00
// Do not return this error, since other targets may be valid
logger . Errorf ( "`static_configs` target for `job_name` %q cannot be empty; skipping it" , swc . jobName )
2020-02-23 11:35:47 +00:00
continue
}
2021-02-26 19:41:54 +00:00
sw , err := swc . getScrapeWork ( target , stc . Labels , metaLabels )
2020-04-13 09:59:05 +00:00
if err != nil {
// Do not return this error, since other targets may be valid
logger . Errorf ( "error when parsing `static_configs` target %q for `job_name` %q: %s; skipping it" , target , swc . jobName , err )
2020-02-23 11:35:47 +00:00
continue
}
2021-02-26 10:46:28 +00:00
if sw != nil {
dst = append ( dst , sw )
}
2020-02-23 11:35:47 +00:00
}
2020-04-13 09:59:05 +00:00
return dst
}
2021-10-12 14:03:09 +00:00
func appendScrapeWorkKey ( dst [ ] byte , labels [ ] prompbmarshal . Label ) [ ] byte {
for _ , label := range labels {
// Do not use strconv.AppendQuote, since it is slow according to CPU profile.
dst = append ( dst , label . Name ... )
dst = append ( dst , '=' )
dst = append ( dst , label . Value ... )
dst = append ( dst , ',' )
}
2021-02-28 20:29:34 +00:00
return dst
2021-02-26 19:41:54 +00:00
}
2021-03-04 08:20:15 +00:00
func needSkipScrapeWork ( key string , membersCount , replicasCount , memberNum int ) bool {
if membersCount <= 1 {
2021-02-28 16:39:57 +00:00
return false
}
2021-03-05 07:05:52 +00:00
h := xxhash . Sum64 ( bytesutil . ToUnsafeBytes ( key ) )
idx := int ( h % uint64 ( membersCount ) )
2021-03-04 08:20:15 +00:00
if replicasCount < 1 {
replicasCount = 1
}
for i := 0 ; i < replicasCount ; i ++ {
if idx == memberNum {
return false
}
idx ++
2021-05-13 08:14:51 +00:00
if idx >= membersCount {
2021-03-04 08:20:15 +00:00
idx = 0
}
}
return true
2021-02-28 16:39:57 +00:00
}
// scrapeWorkKeyBufPool reduces allocations of temporary buffers used for building
// per-target keys in getScrapeWork.
var scrapeWorkKeyBufPool bytesutil.ByteBufferPool

// getScrapeWork builds a ScrapeWork for the given target with the given extraLabels
// and discovery metaLabels, applying the job's relabeling rules.
//
// It returns (nil, nil) when the target must be dropped: relabeling removed all the labels
// or the `__address__` label, the address contains "/", or the target is owned by another
// member according to the `-promscrape.cluster.*` flags.
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels map[string]string) (*ScrapeWork, error) {
	labels := mergeLabels(swc, target, extraLabels, metaLabels)
	var originalLabels []prompbmarshal.Label
	if !*dropOriginalLabels {
		// Keep a sorted copy of the pre-relabeling labels; it is registered in
		// droppedTargetsMap when the target gets dropped below.
		originalLabels = append([]prompbmarshal.Label{}, labels...)
		promrelabel.SortLabels(originalLabels)
		// Reduce memory usage by interning all the strings in originalLabels.
		internLabelStrings(originalLabels)
	}
	labels = swc.relabelConfigs.Apply(labels, 0, false)
	// Drop `__meta_*` labels, reusing the backing array of labels.
	labels = promrelabel.RemoveMetaLabels(labels[:0], labels)
	// Remove references to already deleted labels, so GC could clean strings for label name and label value past len(labels).
	// This should reduce memory usage when relabeling creates big number of temporary labels with long names and/or values.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825 for details.
	labels = append([]prompbmarshal.Label{}, labels...)

	// Verify whether the scrape work must be skipped because of `-promscrape.cluster.*` configs.
	// Perform the verification on labels after the relabeling in order to guarantee that targets with the same set of labels
	// go to the same vmagent shard.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687#issuecomment-940629495
	if *clusterMembersCount > 1 {
		bb := scrapeWorkKeyBufPool.Get()
		bb.B = appendScrapeWorkKey(bb.B[:0], labels)
		needSkip := needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor, *clusterMemberNum)
		scrapeWorkKeyBufPool.Put(bb)
		if needSkip {
			return nil, nil
		}
	}
	if len(labels) == 0 {
		// Drop target without labels.
		droppedTargetsMap.Register(originalLabels)
		return nil, nil
	}
	// See https://www.robustperception.io/life-of-a-label
	scheme := promrelabel.GetLabelValueByName(labels, "__scheme__")
	if len(scheme) == 0 {
		scheme = "http"
	}
	// NOTE: the following lines keep the original local names; only comments were added.
	schemeRelabeled := scheme
	addressRelabeled := promrelabel.GetLabelValueByName(labels, "__address__")
	if len(addressRelabeled) == 0 {
		// Drop target without scrape address.
		droppedTargetsMap.Register(originalLabels)
		return nil, nil
	}
	if strings.Contains(addressRelabeled, "/") {
		// Drop target with '/'
		droppedTargetsMap.Register(originalLabels)
		return nil, nil
	}
	addressRelabeled = addMissingPort(schemeRelabeled, addressRelabeled)
	metricsPathRelabeled := promrelabel.GetLabelValueByName(labels, "__metrics_path__")
	if metricsPathRelabeled == "" {
		metricsPathRelabeled = "/metrics"
	}
	if !strings.HasPrefix(metricsPathRelabeled, "/") {
		metricsPathRelabeled = "/" + metricsPathRelabeled
	}
	paramsRelabeled := getParamsFromLabels(labels, swc.params)
	// Append "?" only when there are query args and the path doesn't already contain one.
	optionalQuestion := "?"
	if len(paramsRelabeled) == 0 || strings.Contains(metricsPathRelabeled, "?") {
		optionalQuestion = ""
	}
	paramsStr := url.Values(paramsRelabeled).Encode()
	scrapeURL := fmt.Sprintf("%s://%s%s%s%s", schemeRelabeled, addressRelabeled, metricsPathRelabeled, optionalQuestion, paramsStr)
	if _, err := url.Parse(scrapeURL); err != nil {
		return nil, fmt.Errorf("invalid url %q for scheme=%q (%q), target=%q (%q), metrics_path=%q (%q) for `job_name` %q: %w",
			scrapeURL, swc.scheme, schemeRelabeled, target, addressRelabeled, swc.metricsPath, metricsPathRelabeled, swc.jobName, err)
	}
	// Set missing "instance" label according to https://www.robustperception.io/life-of-a-label
	if promrelabel.GetLabelByName(labels, "instance") == nil {
		labels = append(labels, prompbmarshal.Label{
			Name:  "instance",
			Value: addressRelabeled,
		})
		promrelabel.SortLabels(labels)
	}
	// Read __scrape_interval__ and __scrape_timeout__ from labels.
	scrapeInterval := swc.scrapeInterval
	if s := promrelabel.GetLabelValueByName(labels, "__scrape_interval__"); len(s) > 0 {
		d, err := time.ParseDuration(s)
		if err != nil {
			return nil, fmt.Errorf("cannot parse __scrape_interval__=%q: %w", s, err)
		}
		scrapeInterval = d
	}
	scrapeTimeout := swc.scrapeTimeout
	if s := promrelabel.GetLabelValueByName(labels, "__scrape_timeout__"); len(s) > 0 {
		d, err := time.ParseDuration(s)
		if err != nil {
			return nil, fmt.Errorf("cannot parse __scrape_timeout__=%q: %w", s, err)
		}
		scrapeTimeout = d
	}
	// Read series_limit option from __series_limit__ label.
	// See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter
	seriesLimit := swc.seriesLimit
	if s := promrelabel.GetLabelValueByName(labels, "__series_limit__"); len(s) > 0 {
		n, err := strconv.Atoi(s)
		if err != nil {
			return nil, fmt.Errorf("cannot parse __series_limit__=%q: %w", s, err)
		}
		seriesLimit = n
	}
	// Read stream_parse option from __stream_parse__ label.
	// See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
	streamParse := swc.streamParse
	if s := promrelabel.GetLabelValueByName(labels, "__stream_parse__"); len(s) > 0 {
		b, err := strconv.ParseBool(s)
		if err != nil {
			return nil, fmt.Errorf("cannot parse __stream_parse__=%q: %w", s, err)
		}
		streamParse = b
	}
	// Reduce memory usage by interning all the strings in labels.
	internLabelStrings(labels)
	sw := &ScrapeWork{
		ScrapeURL:            scrapeURL,
		ScrapeInterval:       scrapeInterval,
		ScrapeTimeout:        scrapeTimeout,
		HonorLabels:          swc.honorLabels,
		HonorTimestamps:      swc.honorTimestamps,
		DenyRedirects:        swc.denyRedirects,
		OriginalLabels:       originalLabels,
		Labels:               labels,
		ProxyURL:             swc.proxyURL,
		ProxyAuthConfig:      swc.proxyAuthConfig,
		AuthConfig:           swc.authConfig,
		MetricRelabelConfigs: swc.metricRelabelConfigs,
		SampleLimit:          swc.sampleLimit,
		DisableCompression:   swc.disableCompression,
		DisableKeepAlive:     swc.disableKeepAlive,
		StreamParse:          streamParse,
		ScrapeAlignInterval:  swc.scrapeAlignInterval,
		ScrapeOffset:         swc.scrapeOffset,
		SeriesLimit:          seriesLimit,

		jobNameOriginal: swc.jobName,
	}
	return sw, nil
}
2020-12-08 10:22:57 +00:00
func internLabelStrings ( labels [ ] prompbmarshal . Label ) {
for i := range labels {
label := & labels [ i ]
label . Name = internString ( label . Name )
label . Value = internString ( label . Value )
}
}
// internString returns an interned copy of s, so identical strings share a single allocation.
// The cache is dropped once it grows past 100k entries in order to bound memory usage.
func internString(s string) string {
	internStringsMapLock.Lock()
	defer internStringsMapLock.Unlock()
	sInterned, ok := internStringsMap[s]
	if ok {
		return sInterned
	}
	// Copy s, so the interned string doesn't keep alive a possible bigger string s points into.
	buf := make([]byte, len(s))
	copy(buf, s)
	sCopy := string(buf)
	internStringsMap[sCopy] = sCopy
	if len(internStringsMap) > 100e3 {
		internStringsMap = make(map[string]string, 100e3)
	}
	return sCopy
}

var (
	// internStringsMapLock protects internStringsMap.
	internStringsMapLock sync.Mutex
	// internStringsMap is the interning cache; see internString.
	internStringsMap = make(map[string]string, 100e3)
)
2020-02-23 11:35:47 +00:00
func getParamsFromLabels ( labels [ ] prompbmarshal . Label , paramsOrig map [ string ] [ ] string ) map [ string ] [ ] string {
// See https://www.robustperception.io/life-of-a-label
m := make ( map [ string ] [ ] string )
for i := range labels {
label := & labels [ i ]
if ! strings . HasPrefix ( label . Name , "__param_" ) {
continue
}
name := label . Name [ len ( "__param_" ) : ]
values := [ ] string { label . Value }
if p := paramsOrig [ name ] ; len ( p ) > 1 {
values = append ( values , p [ 1 : ] ... )
}
m [ name ] = values
}
return m
}
2021-09-12 10:33:39 +00:00
func mergeLabels ( swc * scrapeWorkConfig , target string , extraLabels , metaLabels map [ string ] string ) [ ] prompbmarshal . Label {
2020-02-23 11:35:47 +00:00
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
2021-09-30 13:56:12 +00:00
m := make ( map [ string ] string , 6 + len ( swc . externalLabels ) + len ( swc . params ) + len ( extraLabels ) + len ( metaLabels ) )
2021-09-12 10:33:39 +00:00
for k , v := range swc . externalLabels {
2020-02-23 11:35:47 +00:00
m [ k ] = v
}
2021-09-12 10:33:39 +00:00
m [ "job" ] = swc . jobName
2020-05-03 13:41:33 +00:00
m [ "__address__" ] = target
2021-09-12 10:33:39 +00:00
m [ "__scheme__" ] = swc . scheme
m [ "__metrics_path__" ] = swc . metricsPath
m [ "__scrape_interval__" ] = swc . scrapeInterval . String ( )
m [ "__scrape_timeout__" ] = swc . scrapeTimeout . String ( )
for k , args := range swc . params {
2020-02-23 11:35:47 +00:00
if len ( args ) == 0 {
continue
}
k = "__param_" + k
v := args [ 0 ]
2020-03-12 18:17:13 +00:00
m [ k ] = v
}
2020-04-13 09:59:05 +00:00
for k , v := range extraLabels {
2020-03-12 18:17:13 +00:00
m [ k ] = v
}
for k , v := range metaLabels {
2020-02-23 11:35:47 +00:00
m [ k ] = v
}
result := make ( [ ] prompbmarshal . Label , 0 , len ( m ) )
for k , v := range m {
result = append ( result , prompbmarshal . Label {
Name : k ,
Value : v ,
} )
}
2020-04-13 09:59:05 +00:00
return result
2020-02-23 11:35:47 +00:00
}
func getFilepath ( baseDir , path string ) string {
if filepath . IsAbs ( path ) {
return path
}
return filepath . Join ( baseDir , path )
}
// addMissingPort adds the default port for the given scheme (":443" for https, ":80" otherwise)
// to target when target doesn't already contain a port.
//
// Bracketed IPv6 addresses are handled explicitly: "[::1]" contains ":" but has no port,
// so the port presence is detected via the "]:" suffix form (e.g. "[::1]:80") instead of
// a plain ":" search, which would leave such addresses without a port.
func addMissingPort(scheme, target string) string {
	hasPort := strings.Contains(target, ":")
	if strings.HasPrefix(target, "[") {
		// Bracketed IPv6 literal; a port may appear only after the closing bracket.
		hasPort = strings.Contains(target, "]:")
	}
	if hasPort {
		return target
	}
	if scheme == "https" {
		return target + ":443"
	}
	return target + ":80"
}
const (
	// defaultScrapeInterval is used when `scrape_interval` is set neither in the
	// `scrape_config` section nor in the global config.
	defaultScrapeInterval = time.Minute
	// defaultScrapeTimeout is used when `scrape_timeout` is set neither in the
	// `scrape_config` section nor in the global config.
	defaultScrapeTimeout = 10 * time.Second
)