diff --git a/README.md b/README.md index 7aba95a44..3ace5755d 100644 --- a/README.md +++ b/README.md @@ -646,7 +646,7 @@ to your needs or when testing bugfixes. ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make victoria-metrics` from the root folder of the repository. It builds `victoria-metrics` binary and puts it into the `bin` folder. @@ -662,7 +662,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b ### Development ARM build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make victoria-metrics-arm` or `make victoria-metrics-arm64` from the root folder of the repository. It builds `victoria-metrics-arm` or `victoria-metrics-arm64` binary respectively and puts it into the `bin` folder. @@ -678,7 +678,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b This is an experimental mode, which may result in a lower compression ratio and slower decompression performance. Use it with caution! -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make victoria-metrics-pure` from the root folder of the repository. It builds `victoria-metrics-pure` binary and puts it into the `bin` folder. @@ -1165,8 +1165,8 @@ on the same time series if they fall within the same discrete 60s bucket. The e The recommended value for `-dedup.minScrapeInterval` must equal to `scrape_interval` config from Prometheus configs. -The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair -write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical +The de-duplication reduces disk space usage if multiple identically configured [vmagent](https://victoriametrics.github.io/vmagent.html) or Prometheus instances in HA pair +write data to the same VictoriaMetrics instance. These vmagent or Prometheus instances must have identical `external_labels` section in their configs, so they write data to the same time series. @@ -1317,6 +1317,17 @@ See the example of alerting rules for VM components [here](https://github.com/Vi * It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases), since the encountered issue could be already fixed there. +* It is recommended to have at least 50% of spare resources for CPU, disk IO and RAM, so VictoriaMetrics could handle short spikes in the workload without performance issues. + +* VictoriaMetrics requires free disk space for [merging data files to bigger ones](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). + It may slow down when there is no enough free space left. So make sure `-storageDataPath` directory + has at least 20% of free space. The remaining amount of free space + can be [monitored](#monitoring) via `vm_free_disk_space_bytes` metric. The total size of data + stored on the disk can be monitored via sum of `vm_data_size_bytes` metrics. 
+ See also `vm_merge_need_free_disk_space` metrics, which are set to values higher than 0 + if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions, + which would start background merge if they had more free disk space. + * It is recommended inspecting logs during troubleshooting, since they may contain useful information. * VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage. @@ -1335,15 +1346,6 @@ See the example of alerting rules for VM components [here](https://github.com/Vi * VictoriaMetrics prioritizes data ingestion over data querying. So if it has no enough resources for data ingestion, then data querying may slow down significantly. -* VictoriaMetrics requires free disk space for [merging data files to bigger ones](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). - It may slow down when there is no enough free space left. So make sure `-storageDataPath` directory - has at least 20% of free space comparing to disk size. The remaining amount of free space - can be [monitored](#monitoring) via `vm_free_disk_space_bytes` metric. The total size of data - stored on the disk can be monitored via sum of `vm_data_size_bytes` metrics. - See also `vm_merge_need_free_disk_space` metrics, which are set to values higher than 0 - if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions, - which would start background merge if they had more free disk space. - * If VictoriaMetrics doesn't work because of certain parts are corrupted due to disk errors, then just remove directories with broken parts. It is safe removing subdirectories under `<-storageDataPath>/data/{big,small}/YYYY_MM` directories when VictoriaMetrics isn't running. This recovers VictoriaMetrics at the cost of data loss stored in the deleted broken parts. diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 848da1d69..22aaa2360 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -230,7 +230,7 @@ You can read more about relabeling in the following articles: ## Scraping big number of targets A single `vmagent` instance can scrape tens of thousands of scrape targets. Sometimes this isn't enough due to limitations on CPU, network, RAM, etc. -In this case scrape targets can be split among multiple `vmagent` instances (aka `vmagent` clustering). +In this case scrape targets can be split among multiple `vmagent` instances (aka `vmagent` horizontal scaling and clustering). Each `vmagent` instance in the cluster must use identical `-promscrape.config` files with distinct `-promscrape.cluster.memberNum` values. The flag value must be in the range `0 ... N-1`, where `N` is the number of `vmagent` instances in the cluster. The number of `vmagent` instances in the cluster must be passed to `-promscrape.cluster.membersCount` command-line flag. For example, the following commands @@ -241,6 +241,19 @@ spread scrape targets among a cluster of two `vmagent` instances: /path/to/vmagent -promscrape.cluster.membersCount=2 -promscrape.cluster.memberNum=1 -promscrape.config=/path/to/config.yml ... ``` +By default each scrape target is scraped only by a single `vmagent` instance in the cluster. 
If there is a need for replicating scrape targets among multiple `vmagent` instances, +then `-promscrape.cluster.replicationFactor` command-line flag must be set to the desired number of replicas. For example, the following commands +start a cluster of three `vmagent` instances, where each target is scraped by two `vmagent` instances: + +``` +/path/to/vmagent -promscrape.cluster.membersCount=3 -promscrape.cluster.replicationFactor=2 -promscrape.cluster.memberNum=0 -promscrape.config=/path/to/config.yml ... +/path/to/vmagent -promscrape.cluster.membersCount=3 -promscrape.cluster.replicationFactor=2 -promscrape.cluster.memberNum=1 -promscrape.config=/path/to/config.yml ... +/path/to/vmagent -promscrape.cluster.membersCount=3 -promscrape.cluster.replicationFactor=2 -promscrape.cluster.memberNum=2 -promscrape.config=/path/to/config.yml ... +``` + +If each target is scraped by multiple `vmagent` instances, then data deduplication must be enabled at remote storage pointed by `-remoteWrite.url`. +See [these docs](https://victoriametrics.github.io/#deduplication) for details. + ## Monitoring @@ -313,26 +326,30 @@ It may be useful to perform `vmagent` rolling update without any scrape loss. the url may contain sensitive information such as auth tokens or passwords. Pass `-remoteWrite.showURL` command-line flag when starting `vmagent` in order to see all the valid urls. -* If scrapes must be aligned in time (for instance, if they must be performed at the beginning of every hour), then set the `scrape_align_interval` option - in the corresponding scrape config. For example, the following config aligns hourly scrapes to the nearest 10 minutes: +* By default `vmagent` evenly spreads scrape load in time. If a particular scrape target must be scraped at the beginning of some interval, + then `scrape_align_interval` option must be used. For example, the following config aligns hourly scrapes to the beginning of hour: ```yml scrape_configs: - job_name: foo scrape_interval: 1h - scrape_align_interval: 10m + scrape_align_interval: 1h + ``` + +* By default `vmagent` evenly spreads scrape load in time. If a particular scrape target must be scraped at specific offset, then `scrape_offset` option must be used. + For example, the following config instructs `vmagent` to scrape the target at 10 seconds of every minute: + + ```yml + scrape_configs: + - job_name: foo + scrape_interval: 1m + scrape_offset: 10s ``` * If you see `skipping duplicate scrape target with identical labels` errors when scraping Kubernetes pods, then it is likely these pods listen to multiple ports or they use an init container. These errors can either be fixed or suppressed with the `-promscrape.suppressDuplicateScrapeTargetErrors` command-line flag. See the available options below if you prefer fixing the root cause of the error: - The following `relabel_configs` section may help determining `__meta_*` labels resulting in duplicate targets: - ```yml - - action: labelmap - regex: __meta_(.*) - ``` - The following relabeling rule may be added to `relabel_configs` section in order to filter out pods with unneeded ports: ```yml - action: keep_if_equal @@ -354,7 +371,7 @@ We recommend using [binary releases](https://github.com/VictoriaMetrics/Victoria ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmagent` from the root folder of the repository. 
It builds the `vmagent` binary and puts it into the `bin` folder. @@ -383,7 +400,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b ### Development ARM build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmagent-arm` or `make vmagent-arm64` from the root folder of the repository. It builds `vmagent-arm` or `vmagent-arm64` binary respectively and puts it into the `bin` folder. diff --git a/app/vmalert/README.md b/app/vmalert/README.md index cc3ef4fbd..adfed9bf2 100644 --- a/app/vmalert/README.md +++ b/app/vmalert/README.md @@ -391,7 +391,7 @@ It is recommended using #### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmalert` from the root folder of the repository. It builds `vmalert` binary and puts it into the `bin` folder. @@ -408,7 +408,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b #### Development ARM build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmalert-arm` or `make vmalert-arm64` from the root folder of the repository. It builds `vmalert-arm` or `vmalert-arm64` binary respectively and puts it into the `bin` folder. diff --git a/app/vmauth/README.md b/app/vmauth/README.md index 41492efd1..133718787 100644 --- a/app/vmauth/README.md +++ b/app/vmauth/README.md @@ -65,13 +65,13 @@ users: # A single user for querying and inserting data: - # - Requests to http://vmauth:8427/api/v1/query or http://vmauth:8427/api/v1/query_range - # are routed to http://vmselect:8481/select/42/prometheus. + # - Requests to http://vmauth:8427/api/v1/query, http://vmauth:8427/api/v1/query_range + # and http://vmauth:8427/api/v1/label//values are routed to http://vmselect:8481/select/42/prometheus. # For example, http://vmauth:8427/api/v1/query is routed to http://vmselect:8480/select/42/prometheus/api/v1/query # - Requests to http://vmauth:8427/api/v1/write are routed to http://vminsert:8480/insert/42/prometheus/api/v1/write - username: "foobar" url_map: - - src_paths: ["/api/v1/query", "/api/v1/query_range"] + - src_paths: ["/api/v1/query", "/api/v1/query_range", "/api/v1/label/[^/]+/values"] url_prefix: "http://vmselect:8481/select/42/prometheus" - src_paths: ["/api/v1/write"] url_prefix: "http://vminsert:8480/insert/42/prometheus" @@ -110,7 +110,7 @@ It is recommended using [binary releases](https://github.com/VictoriaMetrics/Vic ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmauth` from the root folder of the repository. It builds `vmauth` binary and puts it into the `bin` folder. diff --git a/app/vmauth/auth_config.go b/app/vmauth/auth_config.go index fca24d25f..dc8c1da2f 100644 --- a/app/vmauth/auth_config.go +++ b/app/vmauth/auth_config.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "net/url" + "regexp" "strings" "sync" "sync/atomic" @@ -38,8 +39,47 @@ type UserInfo struct { // URLMap is a mapping from source paths to target urls. 
type URLMap struct { - SrcPaths []string `yaml:"src_paths"` - URLPrefix string `yaml:"url_prefix"` + SrcPaths []*SrcPath `yaml:"src_paths"` + URLPrefix string `yaml:"url_prefix"` +} + +// SrcPath represents an src path +type SrcPath struct { + sOriginal string + re *regexp.Regexp +} + +func (sp *SrcPath) match(s string) bool { + prefix, ok := sp.re.LiteralPrefix() + if ok { + // Fast path - literal match + return s == prefix + } + if !strings.HasPrefix(s, prefix) { + return false + } + return sp.re.MatchString(s) +} + +// UnmarshalYAML implements yaml.Unmarshaler +func (sp *SrcPath) UnmarshalYAML(f func(interface{}) error) error { + var s string + if err := f(&s); err != nil { + return err + } + sAnchored := "^(?:" + s + ")$" + re, err := regexp.Compile(sAnchored) + if err != nil { + return fmt.Errorf("cannot build regexp from %q: %w", s, err) + } + sp.sOriginal = s + sp.re = re + return nil +} + +// MarshalYAML implements yaml.Marshaler. +func (sp *SrcPath) MarshalYAML() (interface{}, error) { + return sp.sOriginal, nil } func initAuthConfig() { @@ -127,11 +167,6 @@ func parseAuthConfig(data []byte) (map[string]*UserInfo, error) { if len(e.SrcPaths) == 0 { return nil, fmt.Errorf("missing `src_paths`") } - for _, path := range e.SrcPaths { - if !strings.HasPrefix(path, "/") { - return nil, fmt.Errorf("`src_path`=%q must start with `/`", path) - } - } urlPrefix, err := sanitizeURLPrefix(e.URLPrefix) if err != nil { return nil, err diff --git a/app/vmauth/auth_config_test.go b/app/vmauth/auth_config_test.go index dbd8299c8..0033b2b42 100644 --- a/app/vmauth/auth_config_test.go +++ b/app/vmauth/auth_config_test.go @@ -1,8 +1,12 @@ package main import ( - "reflect" + "bytes" + "fmt" + "regexp" "testing" + + "gopkg.in/yaml.v2" ) func TestParseAuthConfigFailure(t *testing.T) { @@ -79,12 +83,12 @@ users: - url_prefix: http://foobar `) - // src_path not starting with `/` + // Invalid regexp in src_path. 
f(` users: - username: a url_map: - - src_paths: [foobar] + - src_paths: ['fo[obar'] url_prefix: http://foobar `) } @@ -97,8 +101,8 @@ func TestParseAuthConfigSuccess(t *testing.T) { t.Fatalf("unexpected error: %s", err) } removeMetrics(m) - if !reflect.DeepEqual(m, expectedAuthConfig) { - t.Fatalf("unexpected auth config\ngot\n%v\nwant\n%v", m, expectedAuthConfig) + if err := areEqualConfigs(m, expectedAuthConfig); err != nil { + t.Fatal(err) } } @@ -139,7 +143,7 @@ users: users: - username: foo url_map: - - src_paths: ["/api/v1/query","/api/v1/query_range"] + - src_paths: ["/api/v1/query","/api/v1/query_range","/api/v1/label/[^./]+/.+"] url_prefix: http://vmselect/select/0/prometheus - src_paths: ["/api/v1/write"] url_prefix: http://vminsert/insert/0/prometheus @@ -148,11 +152,11 @@ users: Username: "foo", URLMap: []URLMap{ { - SrcPaths: []string{"/api/v1/query", "/api/v1/query_range"}, + SrcPaths: getSrcPaths([]string{"/api/v1/query", "/api/v1/query_range", "/api/v1/label/[^./]+/.+"}), URLPrefix: "http://vmselect/select/0/prometheus", }, { - SrcPaths: []string{"/api/v1/write"}, + SrcPaths: getSrcPaths([]string{"/api/v1/write"}), URLPrefix: "http://vminsert/insert/0/prometheus", }, }, @@ -160,8 +164,34 @@ users: }) } +func getSrcPaths(paths []string) []*SrcPath { + var sps []*SrcPath + for _, path := range paths { + sps = append(sps, &SrcPath{ + sOriginal: path, + re: regexp.MustCompile("^(?:" + path + ")$"), + }) + } + return sps +} + func removeMetrics(m map[string]*UserInfo) { for _, info := range m { info.requests = nil } } + +func areEqualConfigs(a, b map[string]*UserInfo) error { + aData, err := yaml.Marshal(a) + if err != nil { + return fmt.Errorf("cannot marshal a: %w", err) + } + bData, err := yaml.Marshal(b) + if err != nil { + return fmt.Errorf("cannot marshal b: %w", err) + } + if !bytes.Equal(aData, bData) { + return fmt.Errorf("unexpected configs;\ngot\n%s\nwant\n%s", aData, bData) + } + return nil +} diff --git a/app/vmauth/target_url.go b/app/vmauth/target_url.go index f086227f0..9f4db4dcf 100644 --- a/app/vmauth/target_url.go +++ b/app/vmauth/target_url.go @@ -18,8 +18,8 @@ func createTargetURL(ui *UserInfo, uOrig *url.URL) (string, error) { u.Path = "/" + u.Path } for _, e := range ui.URLMap { - for _, path := range e.SrcPaths { - if u.Path == path { + for _, sp := range e.SrcPaths { + if sp.match(u.Path) { return e.URLPrefix + u.RequestURI(), nil } } diff --git a/app/vmauth/target_url_test.go b/app/vmauth/target_url_test.go index 5dcfd07cf..34d2e6c33 100644 --- a/app/vmauth/target_url_test.go +++ b/app/vmauth/target_url_test.go @@ -44,11 +44,11 @@ func TestCreateTargetURLSuccess(t *testing.T) { ui := &UserInfo{ URLMap: []URLMap{ { - SrcPaths: []string{"/api/v1/query"}, + SrcPaths: getSrcPaths([]string{"/api/v1/query"}), URLPrefix: "http://vmselect/0/prometheus", }, { - SrcPaths: []string{"/api/v1/write"}, + SrcPaths: getSrcPaths([]string{"/api/v1/write"}), URLPrefix: "http://vminsert/0/prometheus", }, }, @@ -57,6 +57,26 @@ func TestCreateTargetURLSuccess(t *testing.T) { f(ui, "/api/v1/query?query=up", "http://vmselect/0/prometheus/api/v1/query?query=up") f(ui, "/api/v1/write", "http://vminsert/0/prometheus/api/v1/write") f(ui, "/api/v1/query_range", "http://default-server/api/v1/query_range") + + // Complex routing regexp paths in `url_map` + ui = &UserInfo{ + URLMap: []URLMap{ + { + SrcPaths: getSrcPaths([]string{"/api/v1/query(_range)?", "/api/v1/label/[^/]+/values"}), + URLPrefix: "http://vmselect/0/prometheus", + }, + { + SrcPaths: 
getSrcPaths([]string{"/api/v1/write"}), + URLPrefix: "http://vminsert/0/prometheus", + }, + }, + URLPrefix: "http://default-server", + } + f(ui, "/api/v1/query?query=up", "http://vmselect/0/prometheus/api/v1/query?query=up") + f(ui, "/api/v1/query_range?query=up", "http://vmselect/0/prometheus/api/v1/query_range?query=up") + f(ui, "/api/v1/label/foo/values", "http://vmselect/0/prometheus/api/v1/label/foo/values") + f(ui, "/api/v1/write", "http://vminsert/0/prometheus/api/v1/write") + f(ui, "/api/v1/foo/bar", "http://default-server/api/v1/foo/bar") } func TestCreateTargetURLFailure(t *testing.T) { @@ -78,7 +98,7 @@ func TestCreateTargetURLFailure(t *testing.T) { f(&UserInfo{ URLMap: []URLMap{ { - SrcPaths: []string{"/api/v1/query"}, + SrcPaths: getSrcPaths([]string{"/api/v1/query"}), URLPrefix: "http://foobar/baz", }, }, diff --git a/app/vmbackup/README.md b/app/vmbackup/README.md index 37bc102f4..a26357b89 100644 --- a/app/vmbackup/README.md +++ b/app/vmbackup/README.md @@ -235,7 +235,7 @@ It is recommended using [binary releases](https://github.com/VictoriaMetrics/Vic ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmbackup` from the root folder of the repository. It builds `vmbackup` binary and puts it into the `bin` folder. diff --git a/app/vmctl/README.md b/app/vmctl/README.md index 549196c53..8a53d590a 100644 --- a/app/vmctl/README.md +++ b/app/vmctl/README.md @@ -48,7 +48,7 @@ It is recommended using [binary releases](https://github.com/VictoriaMetrics/Vic ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmctl` from the root folder of the repository. It builds `vmctl` binary and puts it into the `bin` folder. @@ -77,7 +77,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b #### Development ARM build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmctl-arm` or `make vmctl-arm64` from the root folder of the repository. It builds `vmctl-arm` or `vmctl-arm64` binary respectively and puts it into the `bin` folder. diff --git a/app/vmrestore/README.md b/app/vmrestore/README.md index c12629549..67127d5be 100644 --- a/app/vmrestore/README.md +++ b/app/vmrestore/README.md @@ -131,7 +131,7 @@ It is recommended using [binary releases](https://github.com/VictoriaMetrics/Vic ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmrestore` from the root folder of the repository. It builds `vmrestore` binary and puts it into the `bin` folder. 
diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 9a3664064..c50c64eea 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -2810,6 +2810,72 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`stdvar_over_time()`, func(t *testing.T) { + t.Parallel() + q := `round(stdvar_over_time(rand(0)[200s:5s]), 0.001)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.082, 0.088, 0.092, 0.075, 0.101, 0.08}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`histogram_stdvar()`, func(t *testing.T) { + t.Parallel() + q := `round(histogram_stdvar(histogram_over_time(rand(0)[200s:5s])), 0.001)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.079, 0.089, 0.089, 0.071, 0.1, 0.082}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`stddev_over_time()`, func(t *testing.T) { + t.Parallel() + q := `round(stddev_over_time(rand(0)[200s:5s]), 0.001)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.286, 0.297, 0.303, 0.274, 0.318, 0.283}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`histogram_stddev()`, func(t *testing.T) { + t.Parallel() + q := `round(histogram_stddev(histogram_over_time(rand(0)[200s:5s])), 0.001)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.281, 0.299, 0.298, 0.267, 0.316, 0.286}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`avg_over_time()`, func(t *testing.T) { + t.Parallel() + q := `round(avg_over_time(rand(0)[200s:5s]), 0.001)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.521, 0.518, 0.509, 0.544, 0.511, 0.504}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`histogram_avg()`, func(t *testing.T) { + t.Parallel() + q := `round(histogram_avg(histogram_over_time(rand(0)[200s:5s])), 0.001)` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.519, 0.521, 0.503, 0.543, 0.511, 0.506}, + Timestamps: timestampsExpected, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) t.Run(`histogram_share(single-value-valid-le)`, func(t *testing.T) { t.Parallel() q := `histogram_share(80, label_set(100, "le", "200"))` diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index 714f3cf80..a0e0d23c0 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -118,6 +118,9 @@ var transformFuncs = map[string]transformFunc{ "prometheus_buckets": transformPrometheusBuckets, "buckets_limit": transformBucketsLimit, "histogram_share": transformHistogramShare, + "histogram_avg": transformHistogramAvg, + "histogram_stdvar": transformHistogramStdvar, + "histogram_stddev": transformHistogramStddev, "sort_by_label": newTransformFuncSortByLabel(false), "sort_by_label_desc": newTransformFuncSortByLabel(true), } @@ -657,6 +660,127 @@ func transformHistogramShare(tfa *transformFuncArg) ([]*timeseries, error) { return rvs, nil } +func transformHistogramAvg(tfa *transformFuncArg) ([]*timeseries, error) { + args := tfa.args + if err := expectTransformArgsNum(args, 1); err != nil { 
+ return nil, err + } + tss := vmrangeBucketsToLE(args[0]) + m := groupLeTimeseries(tss) + rvs := make([]*timeseries, 0, len(m)) + for _, xss := range m { + sort.Slice(xss, func(i, j int) bool { + return xss[i].le < xss[j].le + }) + dst := xss[0].ts + for i := range dst.Values { + dst.Values[i] = avgForLeTimeseries(i, xss) + } + rvs = append(rvs, dst) + } + return rvs, nil +} + +func transformHistogramStddev(tfa *transformFuncArg) ([]*timeseries, error) { + args := tfa.args + if err := expectTransformArgsNum(args, 1); err != nil { + return nil, err + } + tss := vmrangeBucketsToLE(args[0]) + m := groupLeTimeseries(tss) + rvs := make([]*timeseries, 0, len(m)) + for _, xss := range m { + sort.Slice(xss, func(i, j int) bool { + return xss[i].le < xss[j].le + }) + dst := xss[0].ts + for i := range dst.Values { + v := stdvarForLeTimeseries(i, xss) + dst.Values[i] = math.Sqrt(v) + } + rvs = append(rvs, dst) + } + return rvs, nil +} + +func transformHistogramStdvar(tfa *transformFuncArg) ([]*timeseries, error) { + args := tfa.args + if err := expectTransformArgsNum(args, 1); err != nil { + return nil, err + } + tss := vmrangeBucketsToLE(args[0]) + m := groupLeTimeseries(tss) + rvs := make([]*timeseries, 0, len(m)) + for _, xss := range m { + sort.Slice(xss, func(i, j int) bool { + return xss[i].le < xss[j].le + }) + dst := xss[0].ts + for i := range dst.Values { + dst.Values[i] = stdvarForLeTimeseries(i, xss) + } + rvs = append(rvs, dst) + } + return rvs, nil +} + +func avgForLeTimeseries(i int, xss []leTimeseries) float64 { + lePrev := float64(0) + vPrev := float64(0) + sum := float64(0) + weightTotal := float64(0) + for _, xs := range xss { + if math.IsInf(xs.le, 0) { + continue + } + le := xs.le + n := (le + lePrev) / 2 + v := xs.ts.Values[i] + weight := v - vPrev + sum += n * weight + weightTotal += weight + lePrev = le + vPrev = v + } + if weightTotal == 0 { + return nan + } + return sum / weightTotal +} + +func stdvarForLeTimeseries(i int, xss []leTimeseries) float64 { + lePrev := float64(0) + vPrev := float64(0) + sum := float64(0) + sum2 := float64(0) + weightTotal := float64(0) + for _, xs := range xss { + if math.IsInf(xs.le, 0) { + continue + } + le := xs.le + n := (le + lePrev) / 2 + v := xs.ts.Values[i] + weight := v - vPrev + sum += n * weight + sum2 += n * n * weight + weightTotal += weight + lePrev = le + vPrev = v + } + if weightTotal == 0 { + return nan + } + avg := sum / weightTotal + avg2 := sum2 / weightTotal + stdvar := avg2 - avg*avg + if stdvar < 0 { + // Correct possible calculation error. 
+ stdvar = 0 + } + return stdvar +} + func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 2 || len(args) > 3 { diff --git a/deployment/docker/Makefile b/deployment/docker/Makefile index 4b392de95..ccdd75c03 100644 --- a/deployment/docker/Makefile +++ b/deployment/docker/Makefile @@ -2,8 +2,8 @@ DOCKER_NAMESPACE := victoriametrics -ROOT_IMAGE ?= alpine:3.13.1 -CERTS_IMAGE := alpine:3.13.1 +ROOT_IMAGE ?= alpine:3.13.2 +CERTS_IMAGE := alpine:3.13.2 GO_BUILDER_IMAGE := golang:1.16.0 BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr : _) BASE_IMAGE := local/base:1.1.3-$(shell echo $(ROOT_IMAGE) | tr : _)-$(shell echo $(CERTS_IMAGE) | tr : _) diff --git a/docs/Articles.md b/docs/Articles.md index 4a4248610..c6c8c645e 100644 --- a/docs/Articles.md +++ b/docs/Articles.md @@ -26,6 +26,7 @@ * [Cloud Native Model Driven Telemetry Stack on OpenShift](https://cer6erus.medium.com/cloud-native-model-driven-telemetry-stack-on-openshift-80712621f5bc) * [Observability, Availability & DORA’s Research Program](https://medium.com/alteos-tech-blog/observability-availability-and-doras-research-program-85deb6680e78) * [Tame Kubernetes Costs with Percona Monitoring and Management and Prometheus Operator](https://www.percona.com/blog/2021/02/12/tame-kubernetes-costs-with-percona-monitoring-and-management-and-prometheus-operator/) +* [Prometheus Victoria Metrics On AWS ECS](https://dalefro.medium.com/prometheus-victoria-metrics-on-aws-ecs-62448e266090) ## Our articles diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index bbe6d002d..d13dfbe76 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,18 @@ # tip +* FEATURE: add the following functions to [MetricsQL](https://victoriametrics.github.io/MetricsQL.html): + - `histogram_avg(buckets)` - returns the average value for the given buckets. + - `histogram_stdvar(buckets)` - returns standard variance for the given buckets. + - `histogram_stddev(buckets)` - returns standard deviation for the given buckets. +* FEATURE: vmagent: add ability to replicate scrape targets among `vmagent` instances in the cluster with `-promscrape.cluster.replicationFactor` command-line flag. See [these docs](https://victoriametrics.github.io/vmagent.html#scraping-big-number-of-targets). +* FATURE: vmagent: accept `scrape_offset` option at `scrape_config`. This option may be useful when scrapes must start at the specified offset of every scrape interval. See [these docs](https://victoriametrics.github.io/vmagent.html#troubleshooting) for details. +* FEATURE: vmauth: allow using regexp paths in `url_map`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1112) for details. + +* BUGFIX: vmagent: prevent from high CPU usage bug during failing scrapes with small `scrape_timeout` (less than a few seconds). +* BUGFIX: vmagent: reduce memory usage when Kubernetes service discovery is used in big number of distinct jobs by sharing the cache. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1113 +* BUGFIX: prevent exponent overflow when processing extremely small values close to zero such as `2.964393875E-314`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1114 + # [v1.55.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.55.1) @@ -374,7 +386,7 @@ in front of VictoriaMetrics. 
[Contact us](mailto:sales@victoriametrics.com) if y See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/807 * BUGFIX: properly handle `inf` values during [background merge of LSM parts](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). - Previously `Inf` values could result in `NaN` values for adjancent samples in time series. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/805 . + Previously `Inf` values could result in `NaN` values for adjacent samples in time series. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/805 . * BUGFIX: fill gaps on graphs for `range_*` and `running_*` functions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/806 . * BUGFIX: make a copy of label with new name during relabeling with `action: labelmap` in the same way as Prometheus does. Previously the original label name has been replaced. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/812 . diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 7e678e1c1..1fe691492 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -96,7 +96,7 @@ vmstorage-prod ### Development Builds -1. [Install go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make` from the repository root. It should build `vmstorage`, `vmselect` and `vminsert` binaries and put them into the `bin` folder. diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index 15bf2638c..b67cb4d3c 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -104,9 +104,12 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `histogram(q)` - calculates aggregate histogram over `q` time series for each point on the graph. See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details. - `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`. For example, the following query calculates median temperature by country over the last 24 hours: - `histogram_quantile(0.5, sum(histogram_over_time(temperature[24h])) by (vmbucket, country))`. + `histogram_quantile(0.5, sum(histogram_over_time(temperature[24h])) by (vmrange,country))`. - `histogram_share(le, buckets)` - returns share (in the range 0..1) for `buckets` that fall below `le`. Useful for calculating SLI and SLO. For instance, the following query returns the share of requests which are performed under 1.5 seconds during the last 5 minutes: `histogram_share(1.5, sum(rate(request_duration_seconds_bucket[5m])) by (le))`. +- `histogram_avg(buckets)` - returns the average value for the given buckets. It can be used for calculating the average over the given time range across multiple time series. For exmple, `histogram_avg(sum(histogram_over_time(response_time_duration_seconds[5m])) by (vmrange,job))` would return the average response time per each `job` over the last 5 minutes. +- `histogram_stdvar(buckets)` - returns standard variance for the given buckets. It can be used for calculating standard deviation over the given time range across multiple time series. 
For example, `histogram_stdvar(sum(histogram_over_time(temperature[24])) by (vmrange,country))` would return standard deviation for the temperature per each country over the last 24 hours. +- `histogram_stddev(buckets)` - returns standard deviation for the given buckets. - `topk_*` and `bottomk_*` aggregate functions, which return up to K time series. Note that the standard `topk` function may return more than K time series - see [this article](https://www.robustperception.io/graph-top-n-time-series-in-grafana) for details. - `topk_min(k, q)` - returns top K time series with the max minimums on the given time range @@ -140,8 +143,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g - `first_over_time(m[d])` - returns the first value for `m` on the time range `d`. - `outliersk(N, q) by (group)` - returns up to `N` outlier time series for `q` in every `group`. Outlier time series have the highest deviation from the `median(q)`. This aggregate function is useful to detect anomalies across groups of similar time series. -- `ascent_over_time(m[d])` - returns the sum of positive deltas between adjancent data points in `m` over `d`. Useful for tracking height gains in GPS track. -- `descent_over_time(m[d])` - returns the absolute sum of negative deltas between adjancent data points in `m` over `d`. Useful for tracking height loss in GPS track. +- `ascent_over_time(m[d])` - returns the sum of positive deltas between adjacent data points in `m` over `d`. Useful for tracking height gains in GPS track. +- `descent_over_time(m[d])` - returns the absolute sum of negative deltas between adjacent data points in `m` over `d`. Useful for tracking height loss in GPS track. - `mode_over_time(m[d])` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for `m` values over `d`. It is expected that `m` values are discrete. - `mode(q) by (x)` - returns [mode](https://en.wikipedia.org/wiki/Mode_(statistics)) for each point in `q` grouped by `x`. It is expected that `q` points are discrete. - `rate_over_sum(m[d])` - returns rate over the sum of `m` values over `d` duration. diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 7aba95a44..3ace5755d 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -646,7 +646,7 @@ to your needs or when testing bugfixes. ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make victoria-metrics` from the root folder of the repository. It builds `victoria-metrics` binary and puts it into the `bin` folder. @@ -662,7 +662,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b ### Development ARM build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make victoria-metrics-arm` or `make victoria-metrics-arm64` from the root folder of the repository. It builds `victoria-metrics-arm` or `victoria-metrics-arm64` binary respectively and puts it into the `bin` folder. @@ -678,7 +678,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b This is an experimental mode, which may result in a lower compression ratio and slower decompression performance. Use it with caution! -1. 
[Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make victoria-metrics-pure` from the root folder of the repository. It builds `victoria-metrics-pure` binary and puts it into the `bin` folder. @@ -1165,8 +1165,8 @@ on the same time series if they fall within the same discrete 60s bucket. The e The recommended value for `-dedup.minScrapeInterval` must equal to `scrape_interval` config from Prometheus configs. -The de-duplication reduces disk space usage if multiple identically configured Prometheus instances in HA pair -write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical +The de-duplication reduces disk space usage if multiple identically configured [vmagent](https://victoriametrics.github.io/vmagent.html) or Prometheus instances in HA pair +write data to the same VictoriaMetrics instance. These vmagent or Prometheus instances must have identical `external_labels` section in their configs, so they write data to the same time series. @@ -1317,6 +1317,17 @@ See the example of alerting rules for VM components [here](https://github.com/Vi * It is recommended upgrading to the latest available release from [this page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases), since the encountered issue could be already fixed there. +* It is recommended to have at least 50% of spare resources for CPU, disk IO and RAM, so VictoriaMetrics could handle short spikes in the workload without performance issues. + +* VictoriaMetrics requires free disk space for [merging data files to bigger ones](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). + It may slow down when there is no enough free space left. So make sure `-storageDataPath` directory + has at least 20% of free space. The remaining amount of free space + can be [monitored](#monitoring) via `vm_free_disk_space_bytes` metric. The total size of data + stored on the disk can be monitored via sum of `vm_data_size_bytes` metrics. + See also `vm_merge_need_free_disk_space` metrics, which are set to values higher than 0 + if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions, + which would start background merge if they had more free disk space. + * It is recommended inspecting logs during troubleshooting, since they may contain useful information. * VictoriaMetrics buffers incoming data in memory for up to a few seconds before flushing it to persistent storage. @@ -1335,15 +1346,6 @@ See the example of alerting rules for VM components [here](https://github.com/Vi * VictoriaMetrics prioritizes data ingestion over data querying. So if it has no enough resources for data ingestion, then data querying may slow down significantly. -* VictoriaMetrics requires free disk space for [merging data files to bigger ones](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). - It may slow down when there is no enough free space left. So make sure `-storageDataPath` directory - has at least 20% of free space comparing to disk size. The remaining amount of free space - can be [monitored](#monitoring) via `vm_free_disk_space_bytes` metric. The total size of data - stored on the disk can be monitored via sum of `vm_data_size_bytes` metrics. 
- See also `vm_merge_need_free_disk_space` metrics, which are set to values higher than 0 - if background merge cannot be initiated due to free disk space shortage. The value shows the number of per-month partitions, - which would start background merge if they had more free disk space. - * If VictoriaMetrics doesn't work because of certain parts are corrupted due to disk errors, then just remove directories with broken parts. It is safe removing subdirectories under `<-storageDataPath>/data/{big,small}/YYYY_MM` directories when VictoriaMetrics isn't running. This recovers VictoriaMetrics at the cost of data loss stored in the deleted broken parts. diff --git a/docs/vmagent.md b/docs/vmagent.md index 848da1d69..22aaa2360 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -230,7 +230,7 @@ You can read more about relabeling in the following articles: ## Scraping big number of targets A single `vmagent` instance can scrape tens of thousands of scrape targets. Sometimes this isn't enough due to limitations on CPU, network, RAM, etc. -In this case scrape targets can be split among multiple `vmagent` instances (aka `vmagent` clustering). +In this case scrape targets can be split among multiple `vmagent` instances (aka `vmagent` horizontal scaling and clustering). Each `vmagent` instance in the cluster must use identical `-promscrape.config` files with distinct `-promscrape.cluster.memberNum` values. The flag value must be in the range `0 ... N-1`, where `N` is the number of `vmagent` instances in the cluster. The number of `vmagent` instances in the cluster must be passed to `-promscrape.cluster.membersCount` command-line flag. For example, the following commands @@ -241,6 +241,19 @@ spread scrape targets among a cluster of two `vmagent` instances: /path/to/vmagent -promscrape.cluster.membersCount=2 -promscrape.cluster.memberNum=1 -promscrape.config=/path/to/config.yml ... ``` +By default each scrape target is scraped only by a single `vmagent` instance in the cluster. If there is a need for replicating scrape targets among multiple `vmagent` instances, +then `-promscrape.cluster.replicationFactor` command-line flag must be set to the desired number of replicas. For example, the following commands +start a cluster of three `vmagent` instances, where each target is scraped by two `vmagent` instances: + +``` +/path/to/vmagent -promscrape.cluster.membersCount=3 -promscrape.cluster.replicationFactor=2 -promscrape.cluster.memberNum=0 -promscrape.config=/path/to/config.yml ... +/path/to/vmagent -promscrape.cluster.membersCount=3 -promscrape.cluster.replicationFactor=2 -promscrape.cluster.memberNum=1 -promscrape.config=/path/to/config.yml ... +/path/to/vmagent -promscrape.cluster.membersCount=3 -promscrape.cluster.replicationFactor=2 -promscrape.cluster.memberNum=2 -promscrape.config=/path/to/config.yml ... +``` + +If each target is scraped by multiple `vmagent` instances, then data deduplication must be enabled at remote storage pointed by `-remoteWrite.url`. +See [these docs](https://victoriametrics.github.io/#deduplication) for details. + ## Monitoring @@ -313,26 +326,30 @@ It may be useful to perform `vmagent` rolling update without any scrape loss. the url may contain sensitive information such as auth tokens or passwords. Pass `-remoteWrite.showURL` command-line flag when starting `vmagent` in order to see all the valid urls. 
-* If scrapes must be aligned in time (for instance, if they must be performed at the beginning of every hour), then set the `scrape_align_interval` option - in the corresponding scrape config. For example, the following config aligns hourly scrapes to the nearest 10 minutes: +* By default `vmagent` evenly spreads scrape load in time. If a particular scrape target must be scraped at the beginning of some interval, + then `scrape_align_interval` option must be used. For example, the following config aligns hourly scrapes to the beginning of hour: ```yml scrape_configs: - job_name: foo scrape_interval: 1h - scrape_align_interval: 10m + scrape_align_interval: 1h + ``` + +* By default `vmagent` evenly spreads scrape load in time. If a particular scrape target must be scraped at specific offset, then `scrape_offset` option must be used. + For example, the following config instructs `vmagent` to scrape the target at 10 seconds of every minute: + + ```yml + scrape_configs: + - job_name: foo + scrape_interval: 1m + scrape_offset: 10s ``` * If you see `skipping duplicate scrape target with identical labels` errors when scraping Kubernetes pods, then it is likely these pods listen to multiple ports or they use an init container. These errors can either be fixed or suppressed with the `-promscrape.suppressDuplicateScrapeTargetErrors` command-line flag. See the available options below if you prefer fixing the root cause of the error: - The following `relabel_configs` section may help determining `__meta_*` labels resulting in duplicate targets: - ```yml - - action: labelmap - regex: __meta_(.*) - ``` - The following relabeling rule may be added to `relabel_configs` section in order to filter out pods with unneeded ports: ```yml - action: keep_if_equal @@ -354,7 +371,7 @@ We recommend using [binary releases](https://github.com/VictoriaMetrics/Victoria ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmagent` from the root folder of the repository. It builds the `vmagent` binary and puts it into the `bin` folder. @@ -383,7 +400,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b ### Development ARM build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmagent-arm` or `make vmagent-arm64` from the root folder of the repository. It builds `vmagent-arm` or `vmagent-arm64` binary respectively and puts it into the `bin` folder. diff --git a/docs/vmalert.md b/docs/vmalert.md index cc3ef4fbd..adfed9bf2 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -391,7 +391,7 @@ It is recommended using #### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmalert` from the root folder of the repository. It builds `vmalert` binary and puts it into the `bin` folder. @@ -408,7 +408,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b #### Development ARM build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. 
Run `make vmalert-arm` or `make vmalert-arm64` from the root folder of the repository. It builds `vmalert-arm` or `vmalert-arm64` binary respectively and puts it into the `bin` folder. diff --git a/docs/vmauth.md b/docs/vmauth.md index 41492efd1..133718787 100644 --- a/docs/vmauth.md +++ b/docs/vmauth.md @@ -65,13 +65,13 @@ users: # A single user for querying and inserting data: - # - Requests to http://vmauth:8427/api/v1/query or http://vmauth:8427/api/v1/query_range - # are routed to http://vmselect:8481/select/42/prometheus. + # - Requests to http://vmauth:8427/api/v1/query, http://vmauth:8427/api/v1/query_range + # and http://vmauth:8427/api/v1/label//values are routed to http://vmselect:8481/select/42/prometheus. # For example, http://vmauth:8427/api/v1/query is routed to http://vmselect:8480/select/42/prometheus/api/v1/query # - Requests to http://vmauth:8427/api/v1/write are routed to http://vminsert:8480/insert/42/prometheus/api/v1/write - username: "foobar" url_map: - - src_paths: ["/api/v1/query", "/api/v1/query_range"] + - src_paths: ["/api/v1/query", "/api/v1/query_range", "/api/v1/label/[^/]+/values"] url_prefix: "http://vmselect:8481/select/42/prometheus" - src_paths: ["/api/v1/write"] url_prefix: "http://vminsert:8480/insert/42/prometheus" @@ -110,7 +110,7 @@ It is recommended using [binary releases](https://github.com/VictoriaMetrics/Vic ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmauth` from the root folder of the repository. It builds `vmauth` binary and puts it into the `bin` folder. diff --git a/docs/vmbackup.md b/docs/vmbackup.md index 37bc102f4..a26357b89 100644 --- a/docs/vmbackup.md +++ b/docs/vmbackup.md @@ -235,7 +235,7 @@ It is recommended using [binary releases](https://github.com/VictoriaMetrics/Vic ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmbackup` from the root folder of the repository. It builds `vmbackup` binary and puts it into the `bin` folder. diff --git a/docs/vmctl.md b/docs/vmctl.md index 549196c53..8a53d590a 100644 --- a/docs/vmctl.md +++ b/docs/vmctl.md @@ -48,7 +48,7 @@ It is recommended using [binary releases](https://github.com/VictoriaMetrics/Vic ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmctl` from the root folder of the repository. It builds `vmctl` binary and puts it into the `bin` folder. @@ -77,7 +77,7 @@ ARM build may run on Raspberry Pi or on [energy-efficient ARM servers](https://b #### Development ARM build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmctl-arm` or `make vmctl-arm64` from the root folder of the repository. It builds `vmctl-arm` or `vmctl-arm64` binary respectively and puts it into the `bin` folder. 
diff --git a/docs/vmrestore.md b/docs/vmrestore.md index c12629549..67127d5be 100644 --- a/docs/vmrestore.md +++ b/docs/vmrestore.md @@ -131,7 +131,7 @@ It is recommended using [binary releases](https://github.com/VictoriaMetrics/Vic ### Development build -1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.13. +1. [Install Go](https://golang.org/doc/install). The minimum supported version is Go 1.14. 2. Run `make vmrestore` from the root folder of the repository. It builds `vmrestore` binary and puts it into the `bin` folder. diff --git a/go.mod b/go.mod index e96d6156a..a9d740c66 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,13 @@ module github.com/VictoriaMetrics/VictoriaMetrics require ( cloud.google.com/go v0.78.0 // indirect cloud.google.com/go/storage v1.14.0 - github.com/VictoriaMetrics/fastcache v1.5.7 + github.com/VictoriaMetrics/fastcache v1.5.8 // Do not use the original github.com/valyala/fasthttp because of issues // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b - github.com/VictoriaMetrics/fasthttp v1.0.12 + github.com/VictoriaMetrics/fasthttp v1.0.13 github.com/VictoriaMetrics/metrics v1.15.2 - github.com/VictoriaMetrics/metricsql v0.12.0 + github.com/VictoriaMetrics/metricsql v0.14.0 github.com/aws/aws-sdk-go v1.37.22 github.com/cespare/xxhash/v2 v2.1.1 github.com/cheggaaa/pb/v3 v3.0.6 @@ -45,4 +45,4 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) -go 1.13 +go 1.14 diff --git a/go.sum b/go.sum index ff85f6a34..78d9a7aa1 100644 --- a/go.sum +++ b/go.sum @@ -80,15 +80,15 @@ github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdko github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQuuDNdCbyAgw= -github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8= -github.com/VictoriaMetrics/fasthttp v1.0.12 h1:Ag0E119yrH4BTxVyjKD9TeiSImtG9bUcg/stItLJhSE= -github.com/VictoriaMetrics/fasthttp v1.0.12/go.mod h1:3SeUL4zwB/p/a9aEeRc6gdlbrtNHXBJR6N376EgiSHU= +github.com/VictoriaMetrics/fastcache v1.5.8 h1:XW+YVx9lEXITBVv35ugK9OyotdNJVcbza69o3jmqWuI= +github.com/VictoriaMetrics/fastcache v1.5.8/go.mod h1:SiMZNgwEPJ9qWLshu9tyuE6bKc9ZWYhcNV/L7jurprQ= +github.com/VictoriaMetrics/fasthttp v1.0.13 h1:5JNS4vSPdN4QyfcpAg3Y1Wznf0uXEuSOFpeIlFw3MgM= +github.com/VictoriaMetrics/fasthttp v1.0.13/go.mod h1:3SeUL4zwB/p/a9aEeRc6gdlbrtNHXBJR6N376EgiSHU= github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= github.com/VictoriaMetrics/metrics v1.15.2 h1:w/GD8L9tm+gvx1oZvAofRRXwammiicdI0jgLghA2Gdo= github.com/VictoriaMetrics/metrics v1.15.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= -github.com/VictoriaMetrics/metricsql v0.12.0 h1:NMIu0MPBmGP34g4RUjI1U0xW5XYp7IVNXe9KtZI3PFQ= -github.com/VictoriaMetrics/metricsql v0.12.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= +github.com/VictoriaMetrics/metricsql v0.14.0 h1:XGbpZJVskUPJFo2C7vG6ATxXBwkBFPe7EWZXB2HZt2U= +github.com/VictoriaMetrics/metricsql v0.14.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= github.com/VividCortex/ewma v1.1.1/go.mod 
h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= diff --git a/lib/decimal/decimal.go b/lib/decimal/decimal.go index 84304fefe..9b75814cd 100644 --- a/lib/decimal/decimal.go +++ b/lib/decimal/decimal.go @@ -456,6 +456,13 @@ func positiveFloatToDecimalSlow(f float64) (int64, int16) { prec = 1e15 } _, exp := math.Frexp(f) + // Bound the exponent according to https://en.wikipedia.org/wiki/Double-precision_floating-point_format + // This fixes the issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1114 + if exp < -1022 { + exp = -1022 + } else if exp > 1023 { + exp = 1023 + } scale = int16(float64(exp) * (math.Ln2 / math.Ln10)) f *= math.Pow10(-int(scale)) } diff --git a/lib/decimal/decimal_test.go b/lib/decimal/decimal_test.go index f6a94ea97..5bc177d64 100644 --- a/lib/decimal/decimal_test.go +++ b/lib/decimal/decimal_test.go @@ -95,6 +95,20 @@ func TestPositiveFloatToDecimal(t *testing.T) { f(0.001130435, 1130435, -9) f(vInfPos, 9223372036854775, 3) f(vMax, 9223372036854775, 3) + + // Extreme cases. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1114 + f(2.964393875e-100, 2964393875, -109) + f(2.964393875e-309, 2964393875, -318) + f(2.964393875e-314, 296439387505, -325) + f(2.964393875e-315, 2964393875047, -327) + f(2.964393875e-320, 296439387505, -331) + f(2.964393875e-324, 494065645841, -335) + f(2.964393875e-325, 0, 1) + + f(2.964393875e+307, 2964393875, 298) + f(9.964393875e+307, 9964393875, 298) + f(1.064393875e+308, 1064393875, 299) + f(1.797393875e+308, 1797393875, 299) } func TestAppendDecimalToFloat(t *testing.T) { diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 77029f7be..8d558862f 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -47,6 +47,8 @@ var ( "Each member then scrapes roughly 1/N of all the targets. By default cluster scraping is disabled, i.e. a single scraper scrapes all the targets") clusterMemberNum = flag.Int("promscrape.cluster.memberNum", 0, "The number of number in the cluster of scrapers. "+ "It must be an unique value in the range 0 ... promscrape.cluster.membersCount-1 across scrapers in the cluster") + clusterReplicationFactor = flag.Int("promscrape.cluster.replicationFactor", 1, "The number of members in the cluster, which scrape the same targets. "+ + "If the replication factor is greater than 2, then the deduplication must be enabled at remote storage side. 
See https://victoriametrics.github.io/#deduplication") ) // Config represents essential parts from Prometheus config defined at https://prometheus.io/docs/prometheus/latest/configuration/configuration/ @@ -112,6 +114,7 @@ type ScrapeConfig struct { DisableKeepAlive bool `yaml:"disable_keepalive,omitempty"` StreamParse bool `yaml:"stream_parse,omitempty"` ScrapeAlignInterval time.Duration `yaml:"scrape_align_interval,omitempty"` + ScrapeOffset time.Duration `yaml:"scrape_offset,omitempty"` // This is set in loadConfig swc *scrapeWorkConfig @@ -567,6 +570,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf disableKeepAlive: sc.DisableKeepAlive, streamParse: sc.StreamParse, scrapeAlignInterval: sc.ScrapeAlignInterval, + scrapeOffset: sc.ScrapeOffset, } return swc, nil } @@ -590,6 +594,7 @@ type scrapeWorkConfig struct { disableKeepAlive bool streamParse bool scrapeAlignInterval time.Duration + scrapeOffset time.Duration } type targetLabelsGetter interface { @@ -721,12 +726,25 @@ func appendScrapeWorkKey(dst []byte, target string, extraLabels, metaLabels map[ return dst } -func needSkipScrapeWork(key string) bool { - if *clusterMembersCount <= 0 { +func needSkipScrapeWork(key string, membersCount, replicasCount, memberNum int) bool { + if membersCount <= 1 { return false } - h := int(xxhash.Sum64(bytesutil.ToUnsafeBytes(key))) - return (h % *clusterMembersCount) != *clusterMemberNum + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key)) + idx := int(h % uint64(membersCount)) + if replicasCount < 1 { + replicasCount = 1 + } + for i := 0; i < replicasCount; i++ { + if idx == memberNum { + return false + } + idx++ + if idx >= replicasCount { + idx = 0 + } + } + return true } func appendSortedKeyValuePairs(dst []byte, m map[string]string) []byte { @@ -753,7 +771,7 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel bb := scrapeWorkKeyBufPool.Get() defer scrapeWorkKeyBufPool.Put(bb) bb.B = appendScrapeWorkKey(bb.B[:0], target, extraLabels, metaLabels) - if needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B)) { + if needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor, *clusterMemberNum) { return nil, nil } @@ -838,6 +856,7 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel DisableKeepAlive: swc.disableKeepAlive, StreamParse: swc.streamParse, ScrapeAlignInterval: swc.scrapeAlignInterval, + ScrapeOffset: swc.scrapeOffset, jobNameOriginal: swc.jobName, } diff --git a/lib/promscrape/config_test.go b/lib/promscrape/config_test.go index 6ca1cf993..f75dae834 100644 --- a/lib/promscrape/config_test.go +++ b/lib/promscrape/config_test.go @@ -12,6 +12,32 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) +func TestNeedSkipScrapeWork(t *testing.T) { + f := func(key string, membersCount, replicationFactor, memberNum int, needSkipExpected bool) { + t.Helper() + needSkip := needSkipScrapeWork(key, membersCount, replicationFactor, memberNum) + if needSkip != needSkipExpected { + t.Fatalf("unexpected needSkipScrapeWork(key=%q, membersCount=%d, replicationFactor=%d, memberNum=%d); got %v; want %v", + key, membersCount, replicationFactor, memberNum, needSkip, needSkipExpected) + } + } + // Disabled clustering + f("foo", 0, 0, 0, false) + + // A cluster with 2 nodes with disabled replication + f("foo", 2, 0, 0, true) + f("foo", 2, 0, 1, false) + + // A cluster with 2 nodes with replicationFactor=2 + f("foo", 2, 2, 0, false) + f("foo", 2, 2, 1, false) + + 
// A cluster with 3 nodes with replicationFactor=2 + f("foo", 3, 2, 0, false) + f("foo", 3, 2, 1, true) + f("foo", 3, 2, 2, false) +} + func TestLoadStaticConfigs(t *testing.T) { scs, err := loadStaticConfigs("testdata/file_sd.json") if err != nil { @@ -1276,6 +1302,7 @@ scrape_configs: disable_compression: true stream_parse: true scrape_align_interval: 1s + scrape_offset: 0.5s static_configs: - targets: - 192.168.1.2 # SNMP device. @@ -1330,6 +1357,7 @@ scrape_configs: DisableCompression: true, StreamParse: true, ScrapeAlignInterval: time.Second, + ScrapeOffset: 500 * time.Millisecond, jobNameOriginal: "snmp", }, }) diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index e79ef6325..cd20d89ee 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -1,27 +1,17 @@ package kubernetes import ( - "context" - "encoding/json" - "errors" "flag" "fmt" - "io" - "io/ioutil" "net" "net/http" "net/url" "os" - "reflect" - "strconv" "strings" - "sync" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" - "github.com/VictoriaMetrics/metrics" ) var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kuberntes API server") @@ -104,498 +94,3 @@ func newAPIConfig(sdc *SDConfig, baseDir string, swcFunc ScrapeWorkConstructorFu } return cfg, nil } - -// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set. -// -// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes -type WatchEvent struct { - Type string - Object json.RawMessage -} - -// object is any Kubernetes object. -type object interface { - key() string - getTargetLabels(aw *apiWatcher) []map[string]string -} - -// parseObjectFunc must parse object from the given data. -type parseObjectFunc func(data []byte) (object, error) - -// parseObjectListFunc must parse objectList from the given data. -type parseObjectListFunc func(data []byte) (map[string]object, ListMeta, error) - -// apiWatcher is used for watching for Kuberntes object changes and caching their latest states. -type apiWatcher struct { - // The client used for watching for object changes - client *http.Client - - // Kubenetes API server address in the form http://api-server - apiServer string - - // The contents for `Authorization` HTTP request header - authorization string - - // Namespaces to watch - namespaces []string - - // Selectors to apply during watch - selectors []Selector - - // Constructor for creating ScrapeWork objects from labels. 
- swcFunc ScrapeWorkConstructorFunc - - // mu protects watchersByURL - mu sync.Mutex - - // a map of watchers keyed by request urls - watchersByURL map[string]*urlWatcher - - stopFunc func() - stopCtx context.Context - wg sync.WaitGroup -} - -func (aw *apiWatcher) mustStop() { - aw.stopFunc() - aw.wg.Wait() -} - -func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector, swcFunc ScrapeWorkConstructorFunc) *apiWatcher { - stopCtx, stopFunc := context.WithCancel(context.Background()) - return &apiWatcher{ - apiServer: apiServer, - authorization: authorization, - client: client, - namespaces: namespaces, - selectors: selectors, - swcFunc: swcFunc, - - watchersByURL: make(map[string]*urlWatcher), - - stopFunc: stopFunc, - stopCtx: stopCtx, - } -} - -// getScrapeWorkObjectsForRole returns all the ScrapeWork objects for the given role. -func (aw *apiWatcher) getScrapeWorkObjectsForRole(role string) []interface{} { - aw.startWatchersForRole(role) - var swos []interface{} - aw.mu.Lock() - for _, uw := range aw.watchersByURL { - if uw.role != role { - continue - } - uw.mu.Lock() - for _, swosLocal := range uw.swosByKey { - swos = append(swos, swosLocal...) - } - uw.mu.Unlock() - } - aw.mu.Unlock() - return swos -} - -// getObjectByRole returns an object with the given (namespace, name) key and the given role. -func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { - if aw == nil { - return nil - } - key := namespace + "/" + name - aw.startWatchersForRole(role) - var o object - aw.mu.Lock() - for _, uw := range aw.watchersByURL { - if uw.role != role { - continue - } - uw.mu.Lock() - o = uw.objectsByKey[key] - uw.mu.Unlock() - if o != nil { - break - } - } - aw.mu.Unlock() - return o -} - -func (aw *apiWatcher) startWatchersForRole(role string) { - parseObject, parseObjectList := getObjectParsersForRole(role) - paths := getAPIPaths(role, aw.namespaces, aw.selectors) - for _, path := range paths { - apiURL := aw.apiServer + path - aw.startWatcherForURL(role, apiURL, parseObject, parseObjectList) - } -} - -func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) { - aw.mu.Lock() - if aw.watchersByURL[apiURL] != nil { - // Watcher for the given path already exists. - aw.mu.Unlock() - return - } - uw := aw.newURLWatcher(role, apiURL, parseObject, parseObjectList) - aw.watchersByURL[apiURL] = uw - aw.mu.Unlock() - - uw.watchersCount.Inc() - uw.watchersCreated.Inc() - resourceVersion := uw.reloadObjects() - aw.wg.Add(1) - go func() { - defer aw.wg.Done() - logger.Infof("started watcher for %q", apiURL) - uw.watchForUpdates(resourceVersion) - logger.Infof("stopped watcher for %q", apiURL) - uw.mu.Lock() - uw.objectsCount.Add(-len(uw.objectsByKey)) - uw.objectsRemoved.Add(len(uw.objectsByKey)) - uw.mu.Unlock() - - aw.mu.Lock() - delete(aw.watchersByURL, apiURL) - aw.mu.Unlock() - uw.watchersCount.Dec() - uw.watchersStopped.Inc() - }() -} - -// needStop returns true if aw must be stopped. -func (aw *apiWatcher) needStop() bool { - select { - case <-aw.stopCtx.Done(): - return true - default: - return false - } -} - -// doRequest performs http request to the given requestURL. 
-func (aw *apiWatcher) doRequest(requestURL string) (*http.Response, error) { - req, err := http.NewRequestWithContext(aw.stopCtx, "GET", requestURL, nil) - if err != nil { - logger.Fatalf("cannot create a request for %q: %s", requestURL, err) - } - if aw.authorization != "" { - req.Header.Set("Authorization", aw.authorization) - } - return aw.client.Do(req) -} - -// urlWatcher watches for an apiURL and updates object states in objectsByKey. -type urlWatcher struct { - role string - apiURL string - - parseObject parseObjectFunc - parseObjectList parseObjectListFunc - - // mu protects objectsByKey and swosByKey - mu sync.Mutex - - // objectsByKey contains the latest state for objects obtained from apiURL - objectsByKey map[string]object - swosByKey map[string][]interface{} - - // the parent apiWatcher - aw *apiWatcher - - watchersCount *metrics.Counter - watchersCreated *metrics.Counter - watchersStopped *metrics.Counter - - objectsCount *metrics.Counter - objectsAdded *metrics.Counter - objectsRemoved *metrics.Counter -} - -func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher { - return &urlWatcher{ - role: role, - apiURL: apiURL, - - parseObject: parseObject, - parseObjectList: parseObjectList, - - objectsByKey: make(map[string]object), - swosByKey: make(map[string][]interface{}), - - aw: aw, - - watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)), - watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)), - watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)), - - objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), - objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), - objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), - } -} - -// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. 
-func (uw *urlWatcher) reloadObjects() string { - aw := uw.aw - requestURL := uw.apiURL - resp, err := aw.doRequest(requestURL) - if err != nil { - if !aw.needStop() { - logger.Errorf("error when performing a request to %q: %s", requestURL, err) - } - return "" - } - body, _ := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - if resp.StatusCode != http.StatusOK { - logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) - return "" - } - objectsByKey, metadata, err := uw.parseObjectList(body) - if err != nil { - if !aw.needStop() { - logger.Errorf("cannot parse response from %q: %s", requestURL, err) - } - return "" - } - swosByKey := make(map[string][]interface{}, len(objectsByKey)) - for k, o := range objectsByKey { - labels := o.getTargetLabels(aw) - swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) - if len(swos) > 0 { - swosByKey[k] = swos - } - } - uw.mu.Lock() - uw.objectsAdded.Add(len(objectsByKey)) - uw.objectsRemoved.Add(len(uw.objectsByKey)) - uw.objectsCount.Add(len(objectsByKey) - len(uw.objectsByKey)) - uw.objectsByKey = objectsByKey - uw.swosByKey = swosByKey - uw.mu.Unlock() - return metadata.ResourceVersion -} - -func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} { - swos := make([]interface{}, 0, len(labelss)) - for _, labels := range labelss { - swo := swcFunc(labels) - // The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1 - if swo != nil && !reflect.ValueOf(swo).IsNil() { - swos = append(swos, swo) - } - } - return swos -} - -// watchForUpdates watches for object updates starting from resourceVersion and updates the corresponding objects to the latest state. -// -// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes -func (uw *urlWatcher) watchForUpdates(resourceVersion string) { - aw := uw.aw - backoffDelay := time.Second - maxBackoffDelay := 30 * time.Second - backoffSleep := func() { - time.Sleep(backoffDelay) - backoffDelay *= 2 - if backoffDelay > maxBackoffDelay { - backoffDelay = maxBackoffDelay - } - } - apiURL := uw.apiURL - delimiter := "?" - if strings.Contains(apiURL, "?") { - delimiter = "&" - } - timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds() - apiURL += delimiter + "watch=1&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) - for { - if aw.needStop() { - return - } - requestURL := apiURL - if resourceVersion != "" { - requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) - } - resp, err := aw.doRequest(requestURL) - if err != nil { - if aw.needStop() { - return - } - logger.Errorf("error when performing a request to %q: %s", requestURL, err) - backoffSleep() - resourceVersion = uw.reloadObjects() - continue - } - if resp.StatusCode != http.StatusOK { - body, _ := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) - if resp.StatusCode == 410 { - // There is no need for sleep on 410 error. 
See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses - backoffDelay = time.Second - } else { - backoffSleep() - } - resourceVersion = uw.reloadObjects() - continue - } - backoffDelay = time.Second - err = uw.readObjectUpdateStream(resp.Body) - _ = resp.Body.Close() - if err != nil { - if aw.needStop() { - return - } - if !errors.Is(err, io.EOF) { - logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) - } - backoffSleep() - resourceVersion = uw.reloadObjects() - continue - } - } -} - -// readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events. -func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { - aw := uw.aw - d := json.NewDecoder(r) - var we WatchEvent - for { - if err := d.Decode(&we); err != nil { - return err - } - o, err := uw.parseObject(we.Object) - if err != nil { - return err - } - key := o.key() - switch we.Type { - case "ADDED", "MODIFIED": - uw.mu.Lock() - if uw.objectsByKey[key] == nil { - uw.objectsAdded.Inc() - uw.objectsCount.Inc() - } - uw.objectsByKey[key] = o - uw.mu.Unlock() - labels := o.getTargetLabels(aw) - swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) - uw.mu.Lock() - if len(swos) > 0 { - uw.swosByKey[key] = swos - } else { - delete(uw.swosByKey, key) - } - uw.mu.Unlock() - case "DELETED": - uw.mu.Lock() - if uw.objectsByKey[key] != nil { - uw.objectsRemoved.Inc() - uw.objectsCount.Dec() - } - delete(uw.objectsByKey, key) - delete(uw.swosByKey, key) - uw.mu.Unlock() - default: - return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role) - } - } -} - -func getAPIPaths(role string, namespaces []string, selectors []Selector) []string { - objectName := getObjectNameByRole(role) - if objectName == "nodes" || len(namespaces) == 0 { - query := joinSelectors(role, selectors) - path := getAPIPath(objectName, "", query) - return []string{path} - } - query := joinSelectors(role, selectors) - paths := make([]string, len(namespaces)) - for i, namespace := range namespaces { - paths[i] = getAPIPath(objectName, namespace, query) - } - return paths -} - -func getAPIPath(objectName, namespace, query string) string { - suffix := objectName - if namespace != "" { - suffix = "namespaces/" + namespace + "/" + objectName - } - if len(query) > 0 { - suffix += "?" 
+ query - } - if objectName == "ingresses" { - return "/apis/networking.k8s.io/v1beta1/" + suffix - } - if objectName == "endpointslices" { - return "/apis/discovery.k8s.io/v1beta1/" + suffix - } - return "/api/v1/" + suffix -} - -func joinSelectors(role string, selectors []Selector) string { - var labelSelectors, fieldSelectors []string - for _, s := range selectors { - if s.Role != role { - continue - } - if s.Label != "" { - labelSelectors = append(labelSelectors, s.Label) - } - if s.Field != "" { - fieldSelectors = append(fieldSelectors, s.Field) - } - } - var args []string - if len(labelSelectors) > 0 { - args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ","))) - } - if len(fieldSelectors) > 0 { - args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ","))) - } - return strings.Join(args, "&") -} - -func getObjectNameByRole(role string) string { - switch role { - case "node": - return "nodes" - case "pod": - return "pods" - case "service": - return "services" - case "endpoints": - return "endpoints" - case "endpointslices": - return "endpointslices" - case "ingress": - return "ingresses" - default: - logger.Panicf("BUG: unknonw role=%q", role) - return "" - } -} - -func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) { - switch role { - case "node": - return parseNode, parseNodeList - case "pod": - return parsePod, parsePodList - case "service": - return parseService, parseServiceList - case "endpoints": - return parseEndpoints, parseEndpointsList - case "endpointslices": - return parseEndpointSlice, parseEndpointSliceList - case "ingress": - return parseIngress, parseIngressList - default: - logger.Panicf("BUG: unsupported role=%q", role) - return nil, nil - } -} diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go new file mode 100644 index 000000000..10066dea6 --- /dev/null +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -0,0 +1,625 @@ +package kubernetes + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" +) + +// WatchEvent is a watch event returned from API server endpoints if `watch=1` query arg is set. +// +// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes +type WatchEvent struct { + Type string + Object json.RawMessage +} + +// object is any Kubernetes object. +type object interface { + key() string + getTargetLabels(aw *apiWatcher) []map[string]string +} + +// parseObjectFunc must parse object from the given data. +type parseObjectFunc func(data []byte) (object, error) + +// parseObjectListFunc must parse objectList from the given data. +type parseObjectListFunc func(data []byte) (map[string]object, ListMeta, error) + +// apiWatcher is used for watching for Kuberntes object changes and caching their latest states. +type apiWatcher struct { + // The client used for watching for object changes + client *http.Client + + // Kubenetes API server address in the form http://api-server + apiServer string + + // The contents for `Authorization` HTTP request header + authorization string + + // Namespaces to watch + namespaces []string + + // Selectors to apply during watch + selectors []Selector + + // Constructor for creating ScrapeWork objects from labels. 
+ swcFunc ScrapeWorkConstructorFunc + + // mu protects watchersByURL + mu sync.Mutex + + // a map of watchers keyed by request urls + watchersByURL map[string]*urlWatcher + + stopFunc func() + stopCtx context.Context + wg sync.WaitGroup +} + +func (aw *apiWatcher) mustStop() { + aw.stopFunc() + aw.wg.Wait() +} + +func newAPIWatcher(client *http.Client, apiServer, authorization string, namespaces []string, selectors []Selector, swcFunc ScrapeWorkConstructorFunc) *apiWatcher { + stopCtx, stopFunc := context.WithCancel(context.Background()) + return &apiWatcher{ + apiServer: apiServer, + authorization: authorization, + client: client, + namespaces: namespaces, + selectors: selectors, + swcFunc: swcFunc, + + watchersByURL: make(map[string]*urlWatcher), + + stopFunc: stopFunc, + stopCtx: stopCtx, + } +} + +// getScrapeWorkObjectsForRole returns all the ScrapeWork objects for the given role. +func (aw *apiWatcher) getScrapeWorkObjectsForRole(role string) []interface{} { + aw.startWatchersForRole(role) + var swos []interface{} + aw.mu.Lock() + for _, uw := range aw.watchersByURL { + if uw.role != role { + continue + } + uw.mu.Lock() + for _, swosLocal := range uw.swosByKey { + swos = append(swos, swosLocal...) + } + uw.mu.Unlock() + } + aw.mu.Unlock() + return swos +} + +// getObjectByRole returns an object with the given (namespace, name) key and the given role. +func (aw *apiWatcher) getObjectByRole(role, namespace, name string) object { + if aw == nil { + return nil + } + key := namespace + "/" + name + aw.startWatchersForRole(role) + var o object + aw.mu.Lock() + for _, uw := range aw.watchersByURL { + if uw.role != role { + continue + } + o = uw.objectsByKey.get(key) + if o != nil { + break + } + } + aw.mu.Unlock() + return o +} + +func (aw *apiWatcher) startWatchersForRole(role string) { + parseObject, parseObjectList := getObjectParsersForRole(role) + paths := getAPIPaths(role, aw.namespaces, aw.selectors) + for _, path := range paths { + apiURL := aw.apiServer + path + aw.startWatcherForURL(role, apiURL, parseObject, parseObjectList) + } +} + +func (aw *apiWatcher) startWatcherForURL(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) { + aw.mu.Lock() + if aw.watchersByURL[apiURL] != nil { + // Watcher for the given path already exists. + aw.mu.Unlock() + return + } + uw := aw.newURLWatcher(role, apiURL, parseObject, parseObjectList) + aw.watchersByURL[apiURL] = uw + aw.mu.Unlock() + + uw.watchersCount.Inc() + uw.watchersCreated.Inc() + uw.reloadObjects() + aw.wg.Add(1) + go func() { + defer aw.wg.Done() + logger.Infof("started watcher for %q", apiURL) + uw.watchForUpdates() + logger.Infof("stopped watcher for %q", apiURL) + uw.objectsByKey.decRef() + + aw.mu.Lock() + delete(aw.watchersByURL, apiURL) + aw.mu.Unlock() + uw.watchersCount.Dec() + uw.watchersStopped.Inc() + }() +} + +// needStop returns true if aw must be stopped. +func (aw *apiWatcher) needStop() bool { + select { + case <-aw.stopCtx.Done(): + return true + default: + return false + } +} + +// doRequest performs http request to the given requestURL. +func (aw *apiWatcher) doRequest(requestURL string) (*http.Response, error) { + req, err := http.NewRequestWithContext(aw.stopCtx, "GET", requestURL, nil) + if err != nil { + logger.Fatalf("cannot create a request for %q: %s", requestURL, err) + } + if aw.authorization != "" { + req.Header.Set("Authorization", aw.authorization) + } + return aw.client.Do(req) +} + +// urlWatcher watches for an apiURL and updates object states in objectsByKey. 
+type urlWatcher struct { + role string + apiURL string + + parseObject parseObjectFunc + parseObjectList parseObjectListFunc + + // objectsByKey contains the latest state for objects obtained from apiURL + objectsByKey *objectsMap + + // mu protects swosByKey and resourceVersion + mu sync.Mutex + swosByKey map[string][]interface{} + resourceVersion string + + // the parent apiWatcher + aw *apiWatcher + + watchersCount *metrics.Counter + watchersCreated *metrics.Counter + watchersStopped *metrics.Counter +} + +func (aw *apiWatcher) newURLWatcher(role, apiURL string, parseObject parseObjectFunc, parseObjectList parseObjectListFunc) *urlWatcher { + return &urlWatcher{ + role: role, + apiURL: apiURL, + + parseObject: parseObject, + parseObjectList: parseObjectList, + + objectsByKey: sharedObjectsGlobal.getByAPIURL(role, apiURL), + swosByKey: make(map[string][]interface{}), + + aw: aw, + + watchersCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)), + watchersCreated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_created_total{role=%q}`, role)), + watchersStopped: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers_stopped_total{role=%q}`, role)), + } +} + +// Limit the concurrency for per-role objects reloading to 1. +// +// This should reduce memory usage when big number of watchers simultaneously receive an update for objects of the same role. +var reloadObjectsLocksByRole = map[string]*sync.Mutex{ + "node": {}, + "pod": {}, + "service": {}, + "endpoints": {}, + "endpointslices": {}, + "ingress": {}, +} + +func (uw *urlWatcher) resetResourceVersion() { + uw.mu.Lock() + uw.resourceVersion = "" + uw.mu.Unlock() +} + +// reloadObjects reloads objects to the latest state and returns resourceVersion for the latest state. +func (uw *urlWatcher) reloadObjects() string { + lock := reloadObjectsLocksByRole[uw.role] + lock.Lock() + defer lock.Unlock() + + uw.mu.Lock() + resourceVersion := uw.resourceVersion + uw.mu.Unlock() + if resourceVersion != "" { + // Fast path - objects have been already reloaded by concurrent goroutines. 
+ return resourceVersion + } + + aw := uw.aw + requestURL := uw.apiURL + resp, err := aw.doRequest(requestURL) + if err != nil { + if !aw.needStop() { + logger.Errorf("error when performing a request to %q: %s", requestURL, err) + } + return "" + } + body, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if resp.StatusCode != http.StatusOK { + logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) + return "" + } + objectsByKey, metadata, err := uw.parseObjectList(body) + if err != nil { + if !aw.needStop() { + logger.Errorf("cannot parse response from %q: %s", requestURL, err) + } + return "" + } + uw.objectsByKey.reload(objectsByKey) + swosByKey := make(map[string][]interface{}) + for k, o := range objectsByKey { + labels := o.getTargetLabels(aw) + swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) + if len(swos) > 0 { + swosByKey[k] = swos + } + } + uw.mu.Lock() + uw.swosByKey = swosByKey + uw.resourceVersion = metadata.ResourceVersion + uw.mu.Unlock() + + return metadata.ResourceVersion +} + +func getScrapeWorkObjectsForLabels(swcFunc ScrapeWorkConstructorFunc, labelss []map[string]string) []interface{} { + swos := make([]interface{}, 0, len(labelss)) + for _, labels := range labelss { + swo := swcFunc(labels) + // The reflect check is needed because of https://mangatmodi.medium.com/go-check-nil-interface-the-right-way-d142776edef1 + if swo != nil && !reflect.ValueOf(swo).IsNil() { + swos = append(swos, swo) + } + } + return swos +} + +// watchForUpdates watches for object updates starting from uw.resourceVersion and updates the corresponding objects to the latest state. +// +// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes +func (uw *urlWatcher) watchForUpdates() { + aw := uw.aw + backoffDelay := time.Second + maxBackoffDelay := 30 * time.Second + backoffSleep := func() { + time.Sleep(backoffDelay) + backoffDelay *= 2 + if backoffDelay > maxBackoffDelay { + backoffDelay = maxBackoffDelay + } + } + apiURL := uw.apiURL + delimiter := "?" + if strings.Contains(apiURL, "?") { + delimiter = "&" + } + timeoutSeconds := time.Duration(0.9 * float64(aw.client.Timeout)).Seconds() + apiURL += delimiter + "watch=1&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds)) + for { + if aw.needStop() { + return + } + resourceVersion := uw.reloadObjects() + requestURL := apiURL + if resourceVersion != "" { + requestURL += "&resourceVersion=" + url.QueryEscape(resourceVersion) + } + resp, err := aw.doRequest(requestURL) + if err != nil { + if aw.needStop() { + return + } + logger.Errorf("error when performing a request to %q: %s", requestURL, err) + backoffSleep() + uw.resetResourceVersion() + continue + } + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) + if resp.StatusCode == 410 { + // There is no need for sleep on 410 error. 
See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses + backoffDelay = time.Second + } else { + backoffSleep() + } + uw.resetResourceVersion() + continue + } + backoffDelay = time.Second + err = uw.readObjectUpdateStream(resp.Body) + _ = resp.Body.Close() + if err != nil { + if aw.needStop() { + return + } + if !errors.Is(err, io.EOF) { + logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err) + } + backoffSleep() + uw.resetResourceVersion() + continue + } + } +} + +// readObjectUpdateStream reads Kuberntes watch events from r and updates locally cached objects according to the received events. +func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { + aw := uw.aw + d := json.NewDecoder(r) + var we WatchEvent + for { + if err := d.Decode(&we); err != nil { + return err + } + o, err := uw.parseObject(we.Object) + if err != nil { + return err + } + key := o.key() + switch we.Type { + case "ADDED", "MODIFIED": + uw.objectsByKey.update(key, o) + labels := o.getTargetLabels(aw) + swos := getScrapeWorkObjectsForLabels(aw.swcFunc, labels) + uw.mu.Lock() + if len(swos) > 0 { + uw.swosByKey[key] = swos + } else { + delete(uw.swosByKey, key) + } + uw.mu.Unlock() + case "DELETED": + uw.objectsByKey.remove(key) + uw.mu.Lock() + delete(uw.swosByKey, key) + uw.mu.Unlock() + default: + return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role) + } + } +} + +func getAPIPaths(role string, namespaces []string, selectors []Selector) []string { + objectName := getObjectNameByRole(role) + if objectName == "nodes" || len(namespaces) == 0 { + query := joinSelectors(role, selectors) + path := getAPIPath(objectName, "", query) + return []string{path} + } + query := joinSelectors(role, selectors) + paths := make([]string, len(namespaces)) + for i, namespace := range namespaces { + paths[i] = getAPIPath(objectName, namespace, query) + } + return paths +} + +func getAPIPath(objectName, namespace, query string) string { + suffix := objectName + if namespace != "" { + suffix = "namespaces/" + namespace + "/" + objectName + } + if len(query) > 0 { + suffix += "?" 
+ query + } + if objectName == "ingresses" { + return "/apis/networking.k8s.io/v1beta1/" + suffix + } + if objectName == "endpointslices" { + return "/apis/discovery.k8s.io/v1beta1/" + suffix + } + return "/api/v1/" + suffix +} + +func joinSelectors(role string, selectors []Selector) string { + var labelSelectors, fieldSelectors []string + for _, s := range selectors { + if s.Role != role { + continue + } + if s.Label != "" { + labelSelectors = append(labelSelectors, s.Label) + } + if s.Field != "" { + fieldSelectors = append(fieldSelectors, s.Field) + } + } + var args []string + if len(labelSelectors) > 0 { + args = append(args, "labelSelector="+url.QueryEscape(strings.Join(labelSelectors, ","))) + } + if len(fieldSelectors) > 0 { + args = append(args, "fieldSelector="+url.QueryEscape(strings.Join(fieldSelectors, ","))) + } + return strings.Join(args, "&") +} + +func getObjectNameByRole(role string) string { + switch role { + case "node": + return "nodes" + case "pod": + return "pods" + case "service": + return "services" + case "endpoints": + return "endpoints" + case "endpointslices": + return "endpointslices" + case "ingress": + return "ingresses" + default: + logger.Panicf("BUG: unknonw role=%q", role) + return "" + } +} + +func getObjectParsersForRole(role string) (parseObjectFunc, parseObjectListFunc) { + switch role { + case "node": + return parseNode, parseNodeList + case "pod": + return parsePod, parsePodList + case "service": + return parseService, parseServiceList + case "endpoints": + return parseEndpoints, parseEndpointsList + case "endpointslices": + return parseEndpointSlice, parseEndpointSliceList + case "ingress": + return parseIngress, parseIngressList + default: + logger.Panicf("BUG: unsupported role=%q", role) + return nil, nil + } +} + +type objectsMap struct { + mu sync.Mutex + refCount int + m map[string]object + + objectsAdded *metrics.Counter + objectsRemoved *metrics.Counter + objectsCount *metrics.Counter +} + +func (om *objectsMap) incRef() { + om.mu.Lock() + om.refCount++ + om.mu.Unlock() +} + +func (om *objectsMap) decRef() { + om.mu.Lock() + om.refCount-- + if om.refCount < 0 { + logger.Panicf("BUG: refCount cannot be smaller than 0; got %d", om.refCount) + } + if om.refCount == 0 { + // Free up memory occupied by om.m + om.objectsRemoved.Add(len(om.m)) + om.objectsCount.Add(-len(om.m)) + om.m = make(map[string]object) + } + om.mu.Unlock() +} + +func (om *objectsMap) reload(m map[string]object) { + om.mu.Lock() + om.objectsAdded.Add(len(m)) + om.objectsRemoved.Add(len(om.m)) + om.objectsCount.Add(len(m) - len(om.m)) + for k := range om.m { + delete(om.m, k) + } + for k, o := range m { + om.m[k] = o + } + om.mu.Unlock() +} + +func (om *objectsMap) update(key string, o object) { + om.mu.Lock() + if om.m[key] == nil { + om.objectsAdded.Inc() + om.objectsCount.Inc() + } + om.m[key] = o + om.mu.Unlock() +} + +func (om *objectsMap) remove(key string) { + om.mu.Lock() + if om.m[key] != nil { + om.objectsRemoved.Inc() + om.objectsCount.Dec() + delete(om.m, key) + } + om.mu.Unlock() +} + +func (om *objectsMap) get(key string) object { + om.mu.Lock() + o, ok := om.m[key] + om.mu.Unlock() + if !ok { + return nil + } + return o +} + +type sharedObjects struct { + mu sync.Mutex + oms map[string]*objectsMap +} + +func (so *sharedObjects) getByAPIURL(role, apiURL string) *objectsMap { + so.mu.Lock() + om := so.oms[apiURL] + if om == nil { + om = &objectsMap{ + m: make(map[string]object), + + objectsCount: 
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), + objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), + objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), + } + so.oms[apiURL] = om + } + so.mu.Unlock() + om.incRef() + return om +} + +var sharedObjectsGlobal = &sharedObjects{ + oms: make(map[string]*objectsMap), +} diff --git a/lib/promscrape/discovery/kubernetes/api_test.go b/lib/promscrape/discovery/kubernetes/api_watcher_test.go similarity index 92% rename from lib/promscrape/discovery/kubernetes/api_test.go rename to lib/promscrape/discovery/kubernetes/api_watcher_test.go index 1a21eb70d..3ce42961b 100644 --- a/lib/promscrape/discovery/kubernetes/api_test.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher_test.go @@ -137,7 +137,7 @@ func TestGetAPIPaths(t *testing.T) { }) // role=ingress - f("ingress", nil, nil, []string{"/api/v1/ingresses"}) + f("ingress", nil, nil, []string{"/apis/networking.k8s.io/v1beta1/ingresses"}) f("ingress", []string{"x", "y"}, []Selector{ { Role: "node", @@ -156,7 +156,7 @@ func TestGetAPIPaths(t *testing.T) { Label: "baaa", }, }, []string{ - "/api/v1/namespaces/x/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc", - "/api/v1/namespaces/y/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc", + "/apis/networking.k8s.io/v1beta1/namespaces/x/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc", + "/apis/networking.k8s.io/v1beta1/namespaces/y/ingresses?labelSelector=cde%2Cbaaa&fieldSelector=abc", }) } diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 558f3e8b5..e5e28ff42 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -92,6 +92,9 @@ type ScrapeWork struct { // The interval for aligning the first scrape. ScrapeAlignInterval time.Duration + // The offset for the first scrape. + ScrapeOffset time.Duration + // The original 'job_name' jobNameOriginal string } @@ -102,9 +105,11 @@ type ScrapeWork struct { func (sw *ScrapeWork) key() string { // Do not take into account OriginalLabels. 
key := fmt.Sprintf("ScrapeURL=%s, ScrapeInterval=%s, ScrapeTimeout=%s, HonorLabels=%v, HonorTimestamps=%v, Labels=%s, "+ - "AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v, ScrapeAlignInterval=%s", + "AuthConfig=%s, MetricRelabelConfigs=%s, SampleLimit=%d, DisableCompression=%v, DisableKeepAlive=%v, StreamParse=%v, "+ + "ScrapeAlignInterval=%s, ScrapeOffset=%s", sw.ScrapeURL, sw.ScrapeInterval, sw.ScrapeTimeout, sw.HonorLabels, sw.HonorTimestamps, sw.LabelsString(), - sw.AuthConfig.String(), sw.MetricRelabelConfigs.String(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive, sw.StreamParse, sw.ScrapeAlignInterval) + sw.AuthConfig.String(), sw.MetricRelabelConfigs.String(), sw.SampleLimit, sw.DisableCompression, sw.DisableKeepAlive, sw.StreamParse, + sw.ScrapeAlignInterval, sw.ScrapeOffset) return key } @@ -174,9 +179,14 @@ type scrapeWork struct { } func (sw *scrapeWork) run(stopCh <-chan struct{}) { - scrapeInterval := sw.Config.ScrapeInterval var randSleep uint64 - if sw.Config.ScrapeAlignInterval <= 0 { + scrapeInterval := sw.Config.ScrapeInterval + scrapeAlignInterval := sw.Config.ScrapeAlignInterval + scrapeOffset := sw.Config.ScrapeOffset + if scrapeOffset > 0 { + scrapeAlignInterval = scrapeInterval + } + if scrapeAlignInterval <= 0 { // Calculate start time for the first scrape from ScrapeURL and labels. // This should spread load when scraping many targets with different // scrape urls and labels. @@ -191,8 +201,11 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) { } randSleep -= sleepOffset } else { - d := uint64(sw.Config.ScrapeAlignInterval) + d := uint64(scrapeAlignInterval) randSleep = d - uint64(time.Now().UnixNano())%d + if scrapeOffset > 0 { + randSleep += uint64(scrapeOffset) + } randSleep %= uint64(scrapeInterval) } timer := timerpool.Get(time.Duration(randSleep)) diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index f80157b6b..5421a6656 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -42,7 +42,7 @@ type blockStreamWriter struct { // prevTimestamps* is used as an optimization for reducing disk space usage // when serially written blocks have identical timestamps. - // This is usually the case when adjancent blocks contain metrics scraped from the same target, + // This is usually the case when adjacent blocks contain metrics scraped from the same target, // since such metrics have identical timestamps. prevTimestampsData []byte prevTimestampsBlockOffset uint64 diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 2aea6a621..99711f00e 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -154,8 +154,8 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working metricIDCache: metricIDCache, metricNameCache: metricNameCache, tsidCache: tsidCache, - uselessTagFiltersCache: workingsetcache.New(mem/128, 3*time.Hour), - loopsPerDateTagFilterCache: workingsetcache.New(mem/128, 3*time.Hour), + uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour), + loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour), minTimestampForCompositeIndex: minTimestampForCompositeIndex, } @@ -2458,7 +2458,8 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 } // Slow path: need tf.matchSuffix call. 
ok, err := tf.matchSuffix(suffix) - loopsCount += tf.matchCost + // Assume that tf.matchSuffix call needs 10x more time than a single metric scan iteration. + loopsCount += 10 * tf.matchCost if err != nil { return loopsCount, fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err) } @@ -2479,8 +2480,8 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 } kb.B[len(kb.B)-1]++ ts.Seek(kb.B) - // Assume that a seek cost is equivalent to 100 ordinary loops. - loopsCount += 100 + // Assume that a seek cost is equivalent to 1000 ordinary loops. + loopsCount += 1000 continue } prevMatch = true @@ -3496,7 +3497,7 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefi if len(item) == 0 || item[0] != nsPrefix || i == 0 || i == len(items)-1 { // Write rows not starting with nsPrefix as-is. // Additionally write the first and the last row as-is in order to preserve - // sort order for adjancent blocks. + // sort order for adjacent blocks. dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev) dstData = append(dstData, item...) dstItems = append(dstItems, mergeset.Item{ diff --git a/lib/storage/storage.go b/lib/storage/storage.go index d1f8dd57c..6c0586357 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1412,7 +1412,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra rows = rows[:rowsLen+len(mrs)] j := 0 var ( - // These vars are used for speeding up bulk imports of multiple adjancent rows for the same metricName. + // These vars are used for speeding up bulk imports of multiple adjacent rows for the same metricName. prevTSID TSID prevMetricNameRaw []byte ) @@ -1626,7 +1626,7 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { var hour uint64 var prevTimestamp int64 var ( - // These vars are used for speeding up bulk imports when multiple adjancent rows + // These vars are used for speeding up bulk imports when multiple adjacent rows // contain the same (metricID, date) pairs. prevDate uint64 prevMetricID uint64 diff --git a/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go index 7ca6f483a..ea234b40d 100644 --- a/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go +++ b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go @@ -72,10 +72,18 @@ func (c *Cache) SetBig(k, v []byte) { // with values stored via other methods. // // k contents may be modified after returning from GetBig. -func (c *Cache) GetBig(dst, k []byte) []byte { +func (c *Cache) GetBig(dst, k []byte) (r []byte) { atomic.AddUint64(&c.bigStats.GetBigCalls, 1) subkey := getSubkeyBuf() - defer putSubkeyBuf(subkey) + dstWasNil := dst == nil + defer func() { + putSubkeyBuf(subkey) + if len(r) == 0 && dstWasNil { + // Guarantee that if the caller provided nil and this is a cache miss that + // the caller can accurately test for a cache miss with `if r == nil`. 
+ r = nil + } + }() // Read and parse metavalue subkey.B = c.Get(subkey.B[:0], k) diff --git a/vendor/github.com/VictoriaMetrics/fastcache/go.mod b/vendor/github.com/VictoriaMetrics/fastcache/go.mod index 1b5309215..f575823ae 100644 --- a/vendor/github.com/VictoriaMetrics/fastcache/go.mod +++ b/vendor/github.com/VictoriaMetrics/fastcache/go.mod @@ -1,9 +1,11 @@ module github.com/VictoriaMetrics/fastcache +go 1.13 + require ( github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 github.com/cespare/xxhash/v2 v2.1.1 github.com/davecgh/go-spew v1.1.1 // indirect - github.com/golang/snappy v0.0.1 + github.com/golang/snappy v0.0.3 github.com/stretchr/testify v1.3.0 // indirect ) diff --git a/vendor/github.com/VictoriaMetrics/fastcache/go.sum b/vendor/github.com/VictoriaMetrics/fastcache/go.sum index 4afe0e792..066369ee9 100644 --- a/vendor/github.com/VictoriaMetrics/fastcache/go.sum +++ b/vendor/github.com/VictoriaMetrics/fastcache/go.sum @@ -5,8 +5,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/vendor/github.com/VictoriaMetrics/fasthttp/client.go b/vendor/github.com/VictoriaMetrics/fasthttp/client.go index 5f1f433d6..60d1f19b7 100644 --- a/vendor/github.com/VictoriaMetrics/fasthttp/client.go +++ b/vendor/github.com/VictoriaMetrics/fasthttp/client.go @@ -1023,7 +1023,7 @@ func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) panic("BUG: resp cannot be nil") } - atomic.StoreUint32(&c.lastUseTime, uint32(CoarseTimeNow().Unix()-startTimeUnix)) + atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix)) // Free up resources occupied by response before sending the request, // so the GC may reclaim these resources (e.g. response body). @@ -1039,7 +1039,7 @@ func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) // Optimization: update write deadline only if more than 25% // of the last write deadline exceeded. // See https://github.com/golang/go/issues/15133 for details. - currentTime := CoarseTimeNow() + currentTime := time.Now() if currentTime.Sub(cc.lastWriteDeadlineTime) > (c.WriteTimeout >> 2) { if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil { c.closeConn(cc) @@ -1083,7 +1083,7 @@ func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) // Optimization: update read deadline only if more than 25% // of the last read deadline exceeded. // See https://github.com/golang/go/issues/15133 for details. 
- currentTime := CoarseTimeNow() + currentTime := time.Now() if currentTime.Sub(cc.lastReadDeadlineTime) > (c.ReadTimeout >> 2) { if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil { c.closeConn(cc) @@ -1255,7 +1255,7 @@ func acquireClientConn(conn net.Conn) *clientConn { } cc := v.(*clientConn) cc.c = conn - cc.createdTime = CoarseTimeNow() + cc.createdTime = time.Now() return cc } @@ -1267,7 +1267,7 @@ func releaseClientConn(cc *clientConn) { var clientConnPool sync.Pool func (c *HostClient) releaseConn(cc *clientConn) { - cc.lastUseTime = CoarseTimeNow() + cc.lastUseTime = time.Now() c.connsLock.Lock() c.conns = append(c.conns, cc) c.connsLock.Unlock() @@ -1970,7 +1970,7 @@ func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error // Optimization: update write deadline only if more than 25% // of the last write deadline exceeded. // See https://github.com/golang/go/issues/15133 for details. - currentTime := CoarseTimeNow() + currentTime := time.Now() if currentTime.Sub(lastWriteDeadlineTime) > (writeTimeout >> 2) { if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil { w.err = err @@ -2051,7 +2051,7 @@ func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error // Optimization: update read deadline only if more than 25% // of the last read deadline exceeded. // See https://github.com/golang/go/issues/15133 for details. - currentTime := CoarseTimeNow() + currentTime := time.Now() if currentTime.Sub(lastReadDeadlineTime) > (readTimeout >> 2) { if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil { w.err = err diff --git a/vendor/github.com/VictoriaMetrics/fasthttp/coarseTime.go b/vendor/github.com/VictoriaMetrics/fasthttp/coarseTime.go deleted file mode 100644 index 03c14f39a..000000000 --- a/vendor/github.com/VictoriaMetrics/fasthttp/coarseTime.go +++ /dev/null @@ -1,28 +0,0 @@ -package fasthttp - -import ( - "sync/atomic" - "time" -) - -// CoarseTimeNow returns the current time truncated to the nearest second. -// -// This is a faster alternative to time.Now(). -func CoarseTimeNow() time.Time { - tp := coarseTime.Load().(*time.Time) - return *tp -} - -func init() { - t := time.Now().Truncate(time.Second) - coarseTime.Store(&t) - go func() { - for { - time.Sleep(time.Second) - t := time.Now().Truncate(time.Second) - coarseTime.Store(&t) - } - }() -} - -var coarseTime atomic.Value diff --git a/vendor/github.com/VictoriaMetrics/fasthttp/server.go b/vendor/github.com/VictoriaMetrics/fasthttp/server.go index 3e74369e7..c2432c840 100644 --- a/vendor/github.com/VictoriaMetrics/fasthttp/server.go +++ b/vendor/github.com/VictoriaMetrics/fasthttp/server.go @@ -1281,7 +1281,7 @@ func (s *Server) Serve(ln net.Listener) error { if time.Since(lastOverflowErrorTime) > time.Minute { s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+ "Try increasing Server.Concurrency", maxWorkersCount) - lastOverflowErrorTime = CoarseTimeNow() + lastOverflowErrorTime = time.Now() } // The current server reached concurrency limit, @@ -1323,7 +1323,7 @@ func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net. 
if time.Since(*lastPerIPErrorTime) > time.Minute { s.logger().Printf("The number of connections from %s exceeds MaxConnsPerIP=%d", getConnIP4(c), s.MaxConnsPerIP) - *lastPerIPErrorTime = CoarseTimeNow() + *lastPerIPErrorTime = time.Now() } continue } @@ -1438,7 +1438,7 @@ func (s *Server) serveConn(c net.Conn) error { serverName := s.getServerName() connRequestNum := uint64(0) connID := nextConnID() - currentTime := CoarseTimeNow() + currentTime := time.Now() connTime := currentTime maxRequestBodySize := s.MaxRequestBodySize if maxRequestBodySize <= 0 { @@ -1491,7 +1491,7 @@ func (s *Server) serveConn(c net.Conn) error { } } - currentTime = CoarseTimeNow() + currentTime = time.Now() ctx.lastReadDuration = currentTime.Sub(ctx.time) if err != nil { @@ -1632,7 +1632,7 @@ func (s *Server) serveConn(c net.Conn) error { break } - currentTime = CoarseTimeNow() + currentTime = time.Now() } if br != nil { @@ -1688,7 +1688,7 @@ func (s *Server) updateWriteDeadline(c net.Conn, ctx *RequestCtx, lastDeadlineTi // Optimization: update write deadline only if more than 25% // of the last write deadline exceeded. // See https://github.com/golang/go/issues/15133 for details. - currentTime := CoarseTimeNow() + currentTime := time.Now() if currentTime.Sub(lastDeadlineTime) > (writeTimeout >> 2) { if err := c.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil { panic(fmt.Sprintf("BUG: error in SetWriteDeadline(%s): %s", writeTimeout, err)) @@ -1871,7 +1871,7 @@ func (ctx *RequestCtx) Init2(conn net.Conn, logger Logger, reduceMemoryUsage boo ctx.connID = nextConnID() ctx.s = fakeServer ctx.connRequestNum = 0 - ctx.connTime = CoarseTimeNow() + ctx.connTime = time.Now() ctx.time = ctx.connTime keepBodyBuffer := !reduceMemoryUsage diff --git a/vendor/github.com/VictoriaMetrics/fasthttp/workerpool.go b/vendor/github.com/VictoriaMetrics/fasthttp/workerpool.go index cf602e050..081ac65c3 100644 --- a/vendor/github.com/VictoriaMetrics/fasthttp/workerpool.go +++ b/vendor/github.com/VictoriaMetrics/fasthttp/workerpool.go @@ -187,7 +187,7 @@ func (wp *workerPool) getCh() *workerChan { } func (wp *workerPool) release(ch *workerChan) bool { - ch.lastUseTime = CoarseTimeNow() + ch.lastUseTime = time.Now() wp.lock.Lock() if wp.mustStop { wp.lock.Unlock() diff --git a/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go b/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go index 4f559f066..09afe6747 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go @@ -112,7 +112,7 @@ func getMetricExprForOptimization(e Expr) *MetricExpr { case "absent", "histogram_quantile", "label_join", "label_replace", "scalar", "vector", "label_set", "label_map", "label_uppercase", "label_lowercase", "label_del", "label_keep", "label_copy", "label_move", "label_transform", "label_value", "label_match", "label_mismatch", - "prometheus_buckets", "buckets_limit", "histogram_share", "union", "": + "prometheus_buckets", "buckets_limit", "histogram_share", "histogram_avg", "histogram_stdvar", "histogram_stddev", "union", "": // metric expressions for these functions cannot be optimized. 
return nil } diff --git a/vendor/github.com/VictoriaMetrics/metricsql/transform.go b/vendor/github.com/VictoriaMetrics/metricsql/transform.go index f8dd7cc84..df4f9b2b5 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/transform.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/transform.go @@ -83,6 +83,9 @@ var transformFuncs = map[string]bool{ "prometheus_buckets": true, "buckets_limit": true, "histogram_share": true, + "histogram_avg": true, + "histogram_stdvar": true, + "histogram_stddev": true, "sort_by_label": true, "sort_by_label_desc": true, } diff --git a/vendor/modules.txt b/vendor/modules.txt index 5949e56e0..f1e50bf9d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,5 @@ # cloud.google.com/go v0.78.0 +## explicit cloud.google.com/go cloud.google.com/go/compute/metadata cloud.google.com/go/iam @@ -7,21 +8,27 @@ cloud.google.com/go/internal/optional cloud.google.com/go/internal/trace cloud.google.com/go/internal/version # cloud.google.com/go/storage v1.14.0 +## explicit cloud.google.com/go/storage -# github.com/VictoriaMetrics/fastcache v1.5.7 +# github.com/VictoriaMetrics/fastcache v1.5.8 +## explicit github.com/VictoriaMetrics/fastcache -# github.com/VictoriaMetrics/fasthttp v1.0.12 +# github.com/VictoriaMetrics/fasthttp v1.0.13 +## explicit github.com/VictoriaMetrics/fasthttp github.com/VictoriaMetrics/fasthttp/fasthttputil github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.15.2 +## explicit github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.12.0 +# github.com/VictoriaMetrics/metricsql v0.14.0 +## explicit github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop # github.com/VividCortex/ewma v1.1.1 github.com/VividCortex/ewma # github.com/aws/aws-sdk-go v1.37.22 +## explicit github.com/aws/aws-sdk-go/aws github.com/aws/aws-sdk-go/aws/arn github.com/aws/aws-sdk-go/aws/awserr @@ -76,13 +83,17 @@ github.com/aws/aws-sdk-go/service/sts/stsiface # github.com/beorn7/perks v1.0.1 github.com/beorn7/perks/quantile # github.com/cespare/xxhash/v2 v2.1.1 +## explicit github.com/cespare/xxhash/v2 # github.com/cheggaaa/pb/v3 v3.0.6 +## explicit github.com/cheggaaa/pb/v3 github.com/cheggaaa/pb/v3/termutil # github.com/cpuguy83/go-md2man/v2 v2.0.0 +## explicit github.com/cpuguy83/go-md2man/v2/md2man # github.com/fatih/color v1.10.0 +## explicit github.com/fatih/color # github.com/go-kit/kit v0.10.0 github.com/go-kit/kit/log @@ -100,10 +111,12 @@ github.com/golang/protobuf/ptypes/any github.com/golang/protobuf/ptypes/duration github.com/golang/protobuf/ptypes/timestamp # github.com/golang/snappy v0.0.3 +## explicit github.com/golang/snappy # github.com/googleapis/gax-go/v2 v2.0.5 github.com/googleapis/gax-go/v2 # github.com/influxdata/influxdb v1.8.4 +## explicit github.com/influxdata/influxdb/client/v2 github.com/influxdata/influxdb/models github.com/influxdata/influxdb/pkg/escape @@ -114,6 +127,7 @@ github.com/jstemmer/go-junit-report github.com/jstemmer/go-junit-report/formatter github.com/jstemmer/go-junit-report/parser # github.com/klauspost/compress v1.11.9 +## explicit github.com/klauspost/compress/flate github.com/klauspost/compress/fse github.com/klauspost/compress/gzip @@ -127,6 +141,7 @@ github.com/mattn/go-colorable # github.com/mattn/go-isatty v0.0.12 github.com/mattn/go-isatty # github.com/mattn/go-runewidth v0.0.10 +## explicit github.com/mattn/go-runewidth # github.com/matttproud/golang_protobuf_extensions v1.0.1 
 github.com/matttproud/golang_protobuf_extensions/pbutil
@@ -135,19 +150,23 @@ github.com/oklog/ulid
 # github.com/pkg/errors v0.9.1
 github.com/pkg/errors
 # github.com/prometheus/client_golang v1.9.0
+## explicit
 github.com/prometheus/client_golang/prometheus
 github.com/prometheus/client_golang/prometheus/internal
 # github.com/prometheus/client_model v0.2.0
 github.com/prometheus/client_model/go
 # github.com/prometheus/common v0.18.0
+## explicit
 github.com/prometheus/common/expfmt
 github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg
 github.com/prometheus/common/model
 # github.com/prometheus/procfs v0.6.0
+## explicit
 github.com/prometheus/procfs
 github.com/prometheus/procfs/internal/fs
 github.com/prometheus/procfs/internal/util
 # github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9
+## explicit
 github.com/prometheus/prometheus/pkg/labels
 github.com/prometheus/prometheus/pkg/timestamp
 github.com/prometheus/prometheus/storage
@@ -164,27 +183,37 @@ github.com/prometheus/prometheus/tsdb/tombstones
 github.com/prometheus/prometheus/tsdb/tsdbutil
 github.com/prometheus/prometheus/tsdb/wal
 # github.com/rivo/uniseg v0.2.0
+## explicit
 github.com/rivo/uniseg
 # github.com/russross/blackfriday/v2 v2.1.0
+## explicit
 github.com/russross/blackfriday/v2
 # github.com/urfave/cli/v2 v2.3.0
+## explicit
 github.com/urfave/cli/v2
 # github.com/valyala/bytebufferpool v1.0.0
 github.com/valyala/bytebufferpool
 # github.com/valyala/fastjson v1.6.3
+## explicit
 github.com/valyala/fastjson
 github.com/valyala/fastjson/fastfloat
 # github.com/valyala/fastrand v1.0.0
+## explicit
 github.com/valyala/fastrand
 # github.com/valyala/fasttemplate v1.2.1
+## explicit
 github.com/valyala/fasttemplate
 # github.com/valyala/gozstd v1.9.0
+## explicit
 github.com/valyala/gozstd
 # github.com/valyala/histogram v1.1.2
+## explicit
 github.com/valyala/histogram
 # github.com/valyala/quicktemplate v1.6.3
+## explicit
 github.com/valyala/quicktemplate
 # go.opencensus.io v0.23.0
+## explicit
 go.opencensus.io
 go.opencensus.io/internal
 go.opencensus.io/internal/tagencoding
@@ -210,6 +239,7 @@ golang.org/x/lint/golint
 golang.org/x/mod/module
 golang.org/x/mod/semver
 # golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
+## explicit
 golang.org/x/net/context
 golang.org/x/net/context/ctxhttp
 golang.org/x/net/http/httpguts
@@ -219,6 +249,7 @@ golang.org/x/net/idna
 golang.org/x/net/internal/timeseries
 golang.org/x/net/trace
 # golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93
+## explicit
 golang.org/x/oauth2
 golang.org/x/oauth2/google
 golang.org/x/oauth2/google/internal/externalaccount
@@ -226,8 +257,10 @@ golang.org/x/oauth2/internal
 golang.org/x/oauth2/jws
 golang.org/x/oauth2/jwt
 # golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+## explicit
 golang.org/x/sync/errgroup
 # golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b
+## explicit
 golang.org/x/sys/execabs
 golang.org/x/sys/internal/unsafeheader
 golang.org/x/sys/unix
@@ -254,6 +287,7 @@ golang.org/x/tools/internal/imports
 golang.org/x/xerrors
 golang.org/x/xerrors/internal
 # google.golang.org/api v0.40.0
+## explicit
 google.golang.org/api/googleapi
 google.golang.org/api/googleapi/transport
 google.golang.org/api/internal
@@ -280,12 +314,14 @@ google.golang.org/appengine/internal/remote_api
 google.golang.org/appengine/internal/urlfetch
 google.golang.org/appengine/urlfetch
 # google.golang.org/genproto v0.0.0-20210302174412-5ede27ff9881
+## explicit
 google.golang.org/genproto/googleapis/api/annotations
 google.golang.org/genproto/googleapis/iam/v1
 google.golang.org/genproto/googleapis/rpc/code
 google.golang.org/genproto/googleapis/rpc/status
 google.golang.org/genproto/googleapis/type/expr
 # google.golang.org/grpc v1.36.0
+## explicit
 google.golang.org/grpc
 google.golang.org/grpc/attributes
 google.golang.org/grpc/backoff
@@ -366,4 +402,5 @@ google.golang.org/protobuf/types/known/durationpb
 google.golang.org/protobuf/types/known/timestamppb
 google.golang.org/protobuf/types/pluginpb
 # gopkg.in/yaml.v2 v2.4.0
+## explicit
 gopkg.in/yaml.v2
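
The fasthttp hunks above vendor fasthttp v1.0.13, which drops the `CoarseTimeNow()` helper in favor of plain `time.Now()`. `CoarseTimeNow()` returned a cached timestamp that a background goroutine refreshed periodically, so hot connection-handling paths could read the time without calling `time.Now()` on every request. The following is a minimal sketch of that caching pattern for readers unfamiliar with it; the `coarseClock` type, its constructor, and the 100ms resolution are illustrative assumptions, not fasthttp's actual implementation:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// coarseClock caches the wall-clock time and refreshes it on a fixed
// interval, trading up to one resolution step of staleness for fewer
// time.Now() calls on hot paths. Illustrative sketch only.
type coarseClock struct {
	v atomic.Value // holds a time.Time
}

func newCoarseClock(resolution time.Duration) *coarseClock {
	c := &coarseClock{}
	c.v.Store(time.Now())
	ticker := time.NewTicker(resolution)
	go func() {
		for t := range ticker.C {
			c.v.Store(t)
		}
	}()
	return c
}

// Now returns the cached time; it may lag real time by up to one resolution step.
func (c *coarseClock) Now() time.Time {
	return c.v.Load().(time.Time)
}

func main() {
	clock := newCoarseClock(100 * time.Millisecond)
	time.Sleep(250 * time.Millisecond)
	fmt.Println("coarse:", clock.Now().UnixNano(), "exact:", time.Now().UnixNano())
}
```

Switching to exact `time.Now()` simplifies the vendored code and makes connection timestamps and deadlines precise; on modern platforms `time.Now()` is cheap enough that the cache buys little.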
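
Independent of that swap, the `updateWriteDeadline` hunk keeps fasthttp's existing throttle: the write deadline is refreshed only once more than 25% of `writeTimeout` has elapsed, because `SetWriteDeadline` is comparatively expensive (see https://github.com/golang/go/issues/15133, cited in the code). A minimal sketch of that check follows; the `maybeExtendWriteDeadline` helper name and its error handling are hypothetical stand-ins for the method:

```go
package main

import (
	"net"
	"time"
)

// maybeExtendWriteDeadline refreshes c's write deadline only when more
// than a quarter of writeTimeout has passed since the last refresh.
// writeTimeout >> 2 is an integer shift on time.Duration, i.e. writeTimeout/4.
func maybeExtendWriteDeadline(c net.Conn, lastDeadlineTime time.Time, writeTimeout time.Duration) (time.Time, error) {
	currentTime := time.Now()
	if currentTime.Sub(lastDeadlineTime) > (writeTimeout >> 2) {
		if err := c.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
			return lastDeadlineTime, err
		}
		return currentTime, nil
	}
	return lastDeadlineTime, nil
}

func main() {
	c1, c2 := net.Pipe()
	defer c1.Close()
	defer c2.Close()
	// Force a refresh by pretending the last refresh happened long ago.
	if _, err := maybeExtendWriteDeadline(c1, time.Now().Add(-time.Hour), time.Second); err != nil {
		panic(err)
	}
}
```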
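
The metricsql hunks bring in v0.14.0, which registers three new transform functions — `histogram_avg`, `histogram_stdvar` and `histogram_stddev` — for estimating the average, variance and standard deviation over `le`-bucketed histogram series, and excludes them from the label-filter optimization in `getMetricExprForOptimization` for the same reason as `histogram_quantile` and the other listed functions: their result labels do not match their argument labels, so pushing common label filters into the arguments would be unsafe. A small sketch showing the new names parsing with the vendored library; the concrete queries over `http_request_duration_seconds_bucket` are only examples:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/metricsql"
)

func main() {
	// metricsql v0.14.0 (vendored above) recognizes these transform
	// functions; earlier versions reject them as unsupported.
	queries := []string{
		`histogram_avg(rate(http_request_duration_seconds_bucket[5m]))`,
		`histogram_stdvar(rate(http_request_duration_seconds_bucket[5m]))`,
		`histogram_stddev(rate(http_request_duration_seconds_bucket[5m]))`,
	}
	for _, q := range queries {
		expr, err := metricsql.Parse(q)
		if err != nil {
			fmt.Printf("%s: parse error: %s\n", q, err)
			continue
		}
		fmt.Printf("parsed %s into %T\n", q, expr)
	}
}
```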
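
Finally, the `## explicit` markers added throughout `vendor/modules.txt` are generated rather than hand-written: starting with Go 1.14, `go mod vendor` annotates every module that is a direct requirement in `go.mod`, and the `go` command verifies these annotations during vendored builds. Regenerating the vendor directory with `go mod vendor` under Go 1.14 or newer reproduces the markers along with the updated fastcache v1.5.8, fasthttp v1.0.13 and metricsql v0.14.0 entries.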