mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmagent: follow-up after fe445f753b
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1491
This commit is contained in:
parent
46e27d60a6
commit
d826352688
23 changed files with 529 additions and 245 deletions
|
@ -135,7 +135,11 @@ Also, Basic Auth can be enabled for the incoming `remote_write` requests with `-
|
|||
|
||||
### remote_write for clustered version
|
||||
|
||||
While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx, Prometheus, Graphite) and scrape data from various targets, writes are always peformed in Promethes remote_write protocol. Therefore for the [clustered version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), `-remoteWrite.url` the command-line flag should be configured as `<schema>://<vminsert-host>:8480/insert/<customer-id>/prometheus/api/v1/write`
|
||||
While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx, Prometheus, Graphite) and scrape data from various targets, writes are always peformed in Promethes remote_write protocol. Therefore for the [clustered version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), `-remoteWrite.url` the command-line flag should be configured as `<schema>://<vminsert-host>:8480/insert/<accountID>/prometheus/api/v1/write` according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). There is also support for multitenant writes. See [these docs](#multitenancy).
|
||||
|
||||
## Multitenancy
|
||||
|
||||
By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. But it can accept multitenant data if `-remoteWrite.multitenantURL` is set. In this case it accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
|
||||
|
||||
|
||||
## How to collect metrics in Prometheus format
|
||||
|
@ -634,7 +638,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-promscrape.consulSDCheckInterval duration
|
||||
Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details (default 30s)
|
||||
-promscrape.digitaloceanSDCheckInterval duration
|
||||
Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config for details (default 1m0s)
|
||||
Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config for details (default 1m0s)
|
||||
-promscrape.disableCompression
|
||||
Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine grained control
|
||||
-promscrape.disableKeepAlive
|
||||
|
@ -645,6 +649,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
The maximum duration for waiting to perform API requests if more than -promscrape.discovery.concurrency requests are simultaneously performed (default 1m0s)
|
||||
-promscrape.dnsSDCheckInterval duration
|
||||
Interval for checking for changes in dns. This works only if dns_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config for details (default 30s)
|
||||
-promscrape.dockerSDCheckInterval duration
|
||||
Interval for checking for changes in docker. This works only if docker_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#docker_sd_config for details (default 30s)
|
||||
-promscrape.dockerswarmSDCheckInterval duration
|
||||
Interval for checking for changes in dockerswarm. This works only if dockerswarm_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config for details (default 30s)
|
||||
-promscrape.dropOriginalLabels
|
||||
|
@ -658,7 +664,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-promscrape.gceSDCheckInterval duration
|
||||
Interval for checking for changes in gce. This works only if gce_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details (default 1m0s)
|
||||
-promscrape.httpSDCheckInterval duration
|
||||
Interval for checking for changes in http service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config for details (default 1m0s)
|
||||
Interval for checking for changes in http endpoint service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config for details (default 1m0s)
|
||||
-promscrape.kubernetes.apiServerTimeout duration
|
||||
How frequently to reload the full state from Kuberntes API server (default 30m0s)
|
||||
-promscrape.kubernetesSDCheckInterval duration
|
||||
|
@ -706,6 +712,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
|
||||
-remoteWrite.maxHourlySeries int
|
||||
The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxDailySeries
|
||||
-remoteWrite.multitenantURL array
|
||||
Base path for multitenant remote storage URL to write data to. See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://<vminsert>:8480 . Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
-remoteWrite.oauth2.clientID array
|
||||
Optional OAuth2 clientID to use for -remoteWrite.url. If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
|
@ -725,7 +734,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
-remoteWrite.queues int
|
||||
The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage (default 2 * numberOfAvailableCPUs)
|
||||
The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage. Default value if 2 * numberOfAvailableCPUs (default 8)
|
||||
-remoteWrite.rateLimit array
|
||||
Optional rate limit in bytes per second for data sent to -remoteWrite.url. By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data is sent after temporary unavailability of the remote storage
|
||||
Supports array of values separated by comma or specified via multiple flags.
|
||||
|
@ -762,7 +771,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-remoteWrite.tmpDataPath string
|
||||
Path to directory where temporary data for remote write component is stored. See also -remoteWrite.maxDiskUsagePerURL (default "vmagent-remotewrite-data")
|
||||
-remoteWrite.url array
|
||||
Remote storage URL to write data to. It must support Prometheus remote_write API. It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems
|
||||
Remote storage URL to write data to. It must support Prometheus remote_write API. It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . Pass multiple -remoteWrite.url flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.multitenantURL
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
-remoteWrite.urlRelabelConfig array
|
||||
Optional path to relabel config for the corresponding -remoteWrite.url
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
|
||||
|
@ -17,24 +16,24 @@ import (
|
|||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="csvimport"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="csvimport"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`)
|
||||
)
|
||||
|
||||
// InsertHandler processes csv data from req.
|
||||
func InsertHandler(p *httpserver.Path, req *http.Request) error {
|
||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(req, func(rows []parser.Row) error {
|
||||
return insertRows(p, rows, extraLabels)
|
||||
return insertRows(at, rows, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
|
@ -68,13 +67,10 @@ func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarsh
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(p, &ctx.WriteRequest)
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
if p != nil {
|
||||
at, err := auth.NewToken(p.AuthToken)
|
||||
if err == nil {
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
}
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
}
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return nil
|
||||
|
|
|
@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(nil, &ctx.WriteRequest)
|
||||
remotewrite.Push(&ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return nil
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
|
@ -29,7 +28,7 @@ var (
|
|||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="influx"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="influx"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="influx"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="influx"}`)
|
||||
)
|
||||
|
||||
|
@ -47,7 +46,7 @@ func InsertHandlerForReader(r io.Reader) error {
|
|||
// InsertHandlerForHTTP processes remote write for influx line protocol.
|
||||
//
|
||||
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
|
||||
func InsertHandlerForHTTP(p *httpserver.Path, req *http.Request) error {
|
||||
func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -59,12 +58,12 @@ func InsertHandlerForHTTP(p *httpserver.Path, req *http.Request) error {
|
|||
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
|
||||
db := q.Get("db")
|
||||
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
|
||||
return insertRows(p, db, rows, extraLabels)
|
||||
return insertRows(at, db, rows, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(p *httpserver.Path, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := getPushCtx()
|
||||
defer putPushCtx(ctx)
|
||||
|
||||
|
@ -134,13 +133,10 @@ func insertRows(p *httpserver.Path, db string, rows []parser.Row, extraLabels []
|
|||
ctx.ctx.Labels = labels
|
||||
ctx.ctx.Samples = samples
|
||||
ctx.commonLabels = commonLabels
|
||||
remotewrite.Push(p, &ctx.ctx.WriteRequest)
|
||||
remotewrite.PushWithAuthToken(at, &ctx.ctx.WriteRequest)
|
||||
rowsInserted.Add(rowsTotal)
|
||||
if p != nil {
|
||||
at, err := auth.NewToken(p.AuthToken)
|
||||
if err == nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
}
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
}
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
|
||||
|
|
|
@ -89,7 +89,7 @@ func main() {
|
|||
|
||||
logger.Infof("starting vmagent at %q...", *httpListenAddr)
|
||||
startTime := time.Now()
|
||||
remotewrite.Init(nil)
|
||||
remotewrite.Init()
|
||||
common.StartUnmarshalWorkers()
|
||||
writeconcurrencylimiter.Init()
|
||||
if len(*influxListenAddr) > 0 {
|
||||
|
@ -161,76 +161,6 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
p, err := httpserver.ParsePath(r.URL.Path)
|
||||
if err == nil && p.Prefix == "insert" {
|
||||
|
||||
_, err := auth.NewToken(p.AuthToken)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "auth error: %s", err)
|
||||
return true
|
||||
}
|
||||
|
||||
switch p.Suffix {
|
||||
case "prometheus/", "prometheus", "prometheus/api/v1/write":
|
||||
prometheusWriteRequests.Inc()
|
||||
if err := promremotewrite.InsertHandler(p, r); err != nil {
|
||||
prometheusWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import":
|
||||
vmimportRequests.Inc()
|
||||
if err := vmimport.InsertHandler(p, r); err != nil {
|
||||
vmimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import/csv":
|
||||
csvimportRequests.Inc()
|
||||
if err := csvimport.InsertHandler(p, r); err != nil {
|
||||
csvimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import/prometheus":
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(p, r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import/native":
|
||||
nativeimportRequests.Inc()
|
||||
if err := native.InsertHandler(p, r); err != nil {
|
||||
nativeimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "influx/write", "influx/api/v2/write":
|
||||
influxWriteRequests.Inc()
|
||||
if err := influx.InsertHandlerForHTTP(p, r); err != nil {
|
||||
influxWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
default:
|
||||
// This link is not multitenant
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
path := strings.Replace(r.URL.Path, "//", "/", -1)
|
||||
switch path {
|
||||
case "/api/v1/write":
|
||||
|
@ -317,9 +247,92 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
}
|
||||
return true
|
||||
}
|
||||
if remotewrite.MultitenancyEnabled() {
|
||||
return processMultitenantRequest(w, r, path)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func processMultitenantRequest(w http.ResponseWriter, r *http.Request, path string) bool {
|
||||
p, err := httpserver.ParsePath(path)
|
||||
if err != nil {
|
||||
// Cannot parse multitenant path. Skip it - probably it will be parsed later.
|
||||
return false
|
||||
}
|
||||
if p.Prefix != "insert" {
|
||||
httpserver.Errorf(w, r, `unsupported multitenant prefix: %q; expected "insert"`, p.Prefix)
|
||||
return true
|
||||
}
|
||||
at, err := auth.NewToken(p.AuthToken)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "cannot obtain auth token: %s", err)
|
||||
return true
|
||||
}
|
||||
switch p.Suffix {
|
||||
case "prometheus/", "prometheus", "prometheus/api/v1/write":
|
||||
prometheusWriteRequests.Inc()
|
||||
if err := promremotewrite.InsertHandler(at, r); err != nil {
|
||||
prometheusWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import":
|
||||
vmimportRequests.Inc()
|
||||
if err := vmimport.InsertHandler(at, r); err != nil {
|
||||
vmimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import/csv":
|
||||
csvimportRequests.Inc()
|
||||
if err := csvimport.InsertHandler(at, r); err != nil {
|
||||
csvimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import/prometheus":
|
||||
prometheusimportRequests.Inc()
|
||||
if err := prometheusimport.InsertHandler(at, r); err != nil {
|
||||
prometheusimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "prometheus/api/v1/import/native":
|
||||
nativeimportRequests.Inc()
|
||||
if err := native.InsertHandler(at, r); err != nil {
|
||||
nativeimportErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "influx/write", "influx/api/v2/write":
|
||||
influxWriteRequests.Inc()
|
||||
if err := influx.InsertHandlerForHTTP(at, r); err != nil {
|
||||
influxWriteErrors.Inc()
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return true
|
||||
case "influx/query":
|
||||
influxQueryRequests.Inc()
|
||||
influxutils.WriteDatabaseNames(w)
|
||||
return true
|
||||
default:
|
||||
httpserver.Errorf(w, r, "unsupported multitenant path suffix: %q", p.Suffix)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
prometheusWriteRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/write", protocol="promremotewrite"}`)
|
||||
prometheusWriteErrors = metrics.NewCounter(`vmagent_http_request_errors_total{path="/api/v1/write", protocol="promremotewrite"}`)
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
|
@ -19,26 +18,26 @@ import (
|
|||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="native"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="native"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`)
|
||||
)
|
||||
|
||||
// InsertHandler processes `/api/v1/import` request.
|
||||
//
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
|
||||
func InsertHandler(p *httpserver.Path, req *http.Request) error {
|
||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(req, func(block *parser.Block) error {
|
||||
return insertRows(p, block, extraLabels)
|
||||
return insertRows(at, block, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(p *httpserver.Path, block *parser.Block, extraLabels []prompbmarshal.Label) error {
|
||||
func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
|
@ -46,11 +45,8 @@ func insertRows(p *httpserver.Path, block *parser.Block, extraLabels []prompbmar
|
|||
// since relabeling can prevent from inserting the rows.
|
||||
rowsLen := len(block.Values)
|
||||
rowsInserted.Add(rowsLen)
|
||||
if p != nil {
|
||||
at, err := auth.NewToken(p.AuthToken)
|
||||
if err == nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsLen)
|
||||
}
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsLen)
|
||||
}
|
||||
rowsPerInsert.Update(float64(rowsLen))
|
||||
|
||||
|
@ -90,6 +86,6 @@ func insertRows(p *httpserver.Path, block *parser.Block, extraLabels []prompbmar
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(p, &ctx.WriteRequest)
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(nil, &ctx.WriteRequest)
|
||||
remotewrite.Push(&ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return nil
|
||||
|
|
|
@ -65,7 +65,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(nil, &ctx.WriteRequest)
|
||||
remotewrite.Push(&ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return nil
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
|
@ -17,12 +16,12 @@ import (
|
|||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="prometheus"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="prometheus"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`)
|
||||
)
|
||||
|
||||
// InsertHandler processes `/api/v1/import/prometheus` request.
|
||||
func InsertHandler(p *httpserver.Path, req *http.Request) error {
|
||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -34,12 +33,12 @@ func InsertHandler(p *httpserver.Path, req *http.Request) error {
|
|||
return writeconcurrencylimiter.Do(func() error {
|
||||
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
|
||||
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
|
||||
return insertRows(p, rows, extraLabels)
|
||||
return insertRows(at, rows, extraLabels)
|
||||
}, nil)
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
|
@ -73,13 +72,10 @@ func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarsh
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(p, &ctx.WriteRequest)
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
if p != nil {
|
||||
at, err := auth.NewToken(p.AuthToken)
|
||||
if err == nil {
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
}
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
}
|
||||
rowsPerInsert.Update(float64(len(rows)))
|
||||
return nil
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
|
@ -18,25 +17,25 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="promremotewrite"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="promremotewrite"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="promremotewrite"}`)
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="promremotewrite"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="promremotewrite"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="promremotewrite"}`)
|
||||
)
|
||||
|
||||
// InsertHandler processes remote write for prometheus.
|
||||
func InsertHandler(p *httpserver.Path, req *http.Request) error {
|
||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
|
||||
return insertRows(p, tss, extraLabels)
|
||||
return insertRows(at, tss, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(p *httpserver.Path, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
|
||||
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
|
@ -72,13 +71,10 @@ func insertRows(p *httpserver.Path, timeseries []prompb.TimeSeries, extraLabels
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(p, &ctx.WriteRequest)
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(rowsTotal)
|
||||
if p != nil {
|
||||
at, err := auth.NewToken(p.AuthToken)
|
||||
if err == nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
}
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
}
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
|
|
|
@ -43,7 +43,7 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
|
|||
rcs.global = global
|
||||
}
|
||||
if len(*relabelConfigPaths) > (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) {
|
||||
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
|
||||
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url or -remoteWrite.multitenantURL args: %d",
|
||||
len(*relabelConfigPaths), (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
|
||||
}
|
||||
rcs.perURL = make([]*promrelabel.ParsedConfigs, (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
|
||||
|
@ -58,7 +58,6 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
|
|||
}
|
||||
rcs.perURL[i] = prc
|
||||
}
|
||||
|
||||
return &rcs, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -8,17 +8,18 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
xxhash "github.com/cespare/xxhash/v2"
|
||||
)
|
||||
|
@ -26,10 +27,10 @@ import (
|
|||
var (
|
||||
remoteWriteURLs = flagutil.NewArray("remoteWrite.url", "Remote storage URL to write data to. It must support Prometheus remote_write API. "+
|
||||
"It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
|
||||
"Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems")
|
||||
remoteWriteMultitenantURLs = flagutil.NewArray("remoteWrite.multitenantURL", "Base path for remote storage URL to write data to. It must support VictoriaMetrics remote_write tenants API (identified by accountID or accountID:projectID). "+
|
||||
"It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428 . "+
|
||||
"Pass multiple -remoteWrite.multitenantURL flags in order to write data concurrently to multiple remote storage systems")
|
||||
"Pass multiple -remoteWrite.url flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.multitenantURL")
|
||||
remoteWriteMultitenantURLs = flagutil.NewArray("remoteWrite.multitenantURL", "Base path for multitenant remote storage URL to write data to. "+
|
||||
"See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://<vminsert>:8480 . "+
|
||||
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url")
|
||||
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
|
||||
"See also -remoteWrite.maxDiskUsagePerURL")
|
||||
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
|
||||
|
@ -57,10 +58,22 @@ var (
|
|||
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -remoteWrite.maxHourlySeries")
|
||||
)
|
||||
|
||||
var defaultWriteToken = "default"
|
||||
var rwctxsMap = map[string][]*remoteWriteCtx{}
|
||||
var (
|
||||
// rwctxsDefault contains statically populated entries when -remoteWrite.url is specified.
|
||||
rwctxsDefault []*remoteWriteCtx
|
||||
|
||||
var rwctxLock = sync.Mutex{}
|
||||
// rwctxsMap contains dynamically populated entries when -remoteWrite.multitenantURL is specified.
|
||||
rwctxsMap = make(map[tenantmetrics.TenantID][]*remoteWriteCtx)
|
||||
rwctxsMapLock sync.Mutex
|
||||
|
||||
// Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
|
||||
defaultAuthToken = &auth.Token{}
|
||||
)
|
||||
|
||||
// MultitenancyEnabled returns true if -remoteWrite.multitenantURL is specified.
|
||||
func MultitenancyEnabled() bool {
|
||||
return len(*remoteWriteMultitenantURLs) > 0
|
||||
}
|
||||
|
||||
// Contains the current relabelConfigs.
|
||||
var allRelabelConfigs atomic.Value
|
||||
|
@ -82,27 +95,13 @@ func InitSecretFlags() {
|
|||
// It must be called after flag.Parse().
|
||||
//
|
||||
// Stop must be called for graceful shutdown.
|
||||
func Init(p *httpserver.Path) {
|
||||
rwctxLock.Lock()
|
||||
defer rwctxLock.Unlock()
|
||||
func Init() {
|
||||
if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) == 0 {
|
||||
logger.Fatalf("at least one `-remoteWrite.url` or `-remoteWrite.multitenantURL` command-line flag must be set")
|
||||
}
|
||||
// Do not Init MultitenantURLs they are dynamically initialized
|
||||
if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) > 0 && p == nil {
|
||||
return
|
||||
if len(*remoteWriteURLs) > 0 && len(*remoteWriteMultitenantURLs) > 0 {
|
||||
logger.Fatalf("cannot set both `-remoteWrite.url` and `-remoteWrite.multitenantURL` command-line flags")
|
||||
}
|
||||
|
||||
// Create one writecontext per tenant
|
||||
writeContextIndex := defaultWriteToken
|
||||
|
||||
if p != nil {
|
||||
writeContextIndex = p.AuthToken
|
||||
}
|
||||
if _, ok := rwctxsMap[writeContextIndex]; ok {
|
||||
return
|
||||
}
|
||||
|
||||
if *maxHourlySeries > 0 {
|
||||
hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
|
||||
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
|
||||
|
@ -140,43 +139,9 @@ func Init(p *httpserver.Path) {
|
|||
}
|
||||
allRelabelConfigs.Store(rcs)
|
||||
|
||||
maxInmemoryBlocks := memory.Allowed() / (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) / maxRowsPerBlock / 100
|
||||
if maxInmemoryBlocks > 400 {
|
||||
// There is no much sense in keeping higher number of blocks in memory,
|
||||
// since this means that the producer outperforms consumer and the queue
|
||||
// will continue growing. It is better storing the queue to file.
|
||||
maxInmemoryBlocks = 400
|
||||
if len(*remoteWriteURLs) > 0 {
|
||||
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
||||
}
|
||||
if maxInmemoryBlocks < 2 {
|
||||
maxInmemoryBlocks = 2
|
||||
}
|
||||
|
||||
rwctxs := []*remoteWriteCtx{}
|
||||
|
||||
if len(*remoteWriteURLs) > 0 && p == nil {
|
||||
for i, remoteWriteURL := range *remoteWriteURLs {
|
||||
sanitizedURL := fmt.Sprintf("%d:secret-url", i+1)
|
||||
if *showRemoteWriteURL {
|
||||
sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL)
|
||||
}
|
||||
rwctx := newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
|
||||
rwctxs = append(rwctxs, rwctx)
|
||||
}
|
||||
}
|
||||
|
||||
if len(*remoteWriteMultitenantURLs) > 0 && p != nil {
|
||||
for i, remoteWriteMultitenantURL := range *remoteWriteMultitenantURLs {
|
||||
sanitizedURL := fmt.Sprintf("%d:secret-url", i+1)
|
||||
if *showRemoteWriteURL {
|
||||
sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteMultitenantURL)
|
||||
}
|
||||
remoteWriteMultitenantURL := fmt.Sprintf("%s/%s/%s/%s", remoteWriteMultitenantURL, p.Prefix, p.AuthToken, p.Suffix)
|
||||
rwctx := newRemoteWriteCtx(i, remoteWriteMultitenantURL, maxInmemoryBlocks, sanitizedURL)
|
||||
rwctxs = append(rwctxs, rwctx)
|
||||
}
|
||||
}
|
||||
|
||||
rwctxsMap[writeContextIndex] = rwctxs
|
||||
|
||||
// Start config reloader.
|
||||
configReloaderWG.Add(1)
|
||||
|
@ -200,6 +165,37 @@ func Init(p *httpserver.Path) {
|
|||
}()
|
||||
}
|
||||
|
||||
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
||||
if len(urls) == 0 {
|
||||
logger.Panicf("BUG: urls must be non-empty")
|
||||
}
|
||||
|
||||
maxInmemoryBlocks := memory.Allowed() / len(urls) / maxRowsPerBlock / 100
|
||||
if maxInmemoryBlocks > 400 {
|
||||
// There is no much sense in keeping higher number of blocks in memory,
|
||||
// since this means that the producer outperforms consumer and the queue
|
||||
// will continue growing. It is better storing the queue to file.
|
||||
maxInmemoryBlocks = 400
|
||||
}
|
||||
if maxInmemoryBlocks < 2 {
|
||||
maxInmemoryBlocks = 2
|
||||
}
|
||||
rwctxs := make([]*remoteWriteCtx, len(urls))
|
||||
for i, remoteWriteURL := range urls {
|
||||
sanitizedURL := fmt.Sprintf("%d:secret-url", i+1)
|
||||
if at != nil {
|
||||
// Construct full remote_write url for the given tenant according to https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format
|
||||
remoteWriteURL = fmt.Sprintf("%s/insert/%d:%d/prometheus/api/v1/write", remoteWriteURL, at.AccountID, at.ProjectID)
|
||||
sanitizedURL = fmt.Sprintf("%s:%d:%d", sanitizedURL, at.AccountID, at.ProjectID)
|
||||
}
|
||||
if *showRemoteWriteURL {
|
||||
sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL)
|
||||
}
|
||||
rwctxs[i] = newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
|
||||
}
|
||||
return rwctxs
|
||||
}
|
||||
|
||||
var stopCh = make(chan struct{})
|
||||
var configReloaderWG sync.WaitGroup
|
||||
|
||||
|
@ -209,11 +205,17 @@ var configReloaderWG sync.WaitGroup
|
|||
func Stop() {
|
||||
close(stopCh)
|
||||
configReloaderWG.Wait()
|
||||
|
||||
for _, rwctx := range rwctxsDefault {
|
||||
rwctx.MustStop()
|
||||
}
|
||||
rwctxsDefault = nil
|
||||
|
||||
// There is no need in locking rwctxsMapLock here, since nobody should call Push during the Stop call.
|
||||
for _, rwctxs := range rwctxsMap {
|
||||
for _, rwctx := range rwctxs {
|
||||
rwctx.MustStop()
|
||||
}
|
||||
rwctxs = nil
|
||||
}
|
||||
rwctxsMap = nil
|
||||
}
|
||||
|
@ -221,19 +223,37 @@ func Stop() {
|
|||
// Push sends wr to remote storage systems set via `-remoteWrite.url`.
|
||||
//
|
||||
// Note that wr may be modified by Push due to relabeling and rounding.
|
||||
func Push(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {
|
||||
// if a queue is not created for this tenant, create it dynamically using the auth.Token
|
||||
var rwctxs []*remoteWriteCtx
|
||||
writeContextIndex := defaultWriteToken
|
||||
func Push(wr *prompbmarshal.WriteRequest) {
|
||||
PushWithAuthToken(nil, wr)
|
||||
}
|
||||
|
||||
// if no tenant speficied, p is nil
|
||||
if p != nil {
|
||||
writeContextIndex = p.AuthToken
|
||||
// PushWithAuthToken sends wr to remote storage systems set via `-remoteWrite.multitenantURL`.
|
||||
//
|
||||
// Note that wr may be modified by Push due to relabeling and rounding.
|
||||
func PushWithAuthToken(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
if at == nil && len(*remoteWriteMultitenantURLs) > 0 {
|
||||
// Write data to default tenant if at isn't set while -remoteWrite.multitenantURL is set.
|
||||
at = defaultAuthToken
|
||||
}
|
||||
if _, ok := rwctxsMap[writeContextIndex]; !ok {
|
||||
Init(p)
|
||||
var rwctxs []*remoteWriteCtx
|
||||
if at == nil {
|
||||
rwctxs = rwctxsDefault
|
||||
} else {
|
||||
if len(*remoteWriteMultitenantURLs) == 0 {
|
||||
logger.Panicf("BUG: remoteWriteMultitenantURLs must be non-empty for non-nil at")
|
||||
}
|
||||
rwctxsMapLock.Lock()
|
||||
tenantID := tenantmetrics.TenantID{
|
||||
AccountID: at.AccountID,
|
||||
ProjectID: at.ProjectID,
|
||||
}
|
||||
rwctxs = rwctxsMap[tenantID]
|
||||
if rwctxs == nil {
|
||||
rwctxs = newRemoteWriteCtxs(at, *remoteWriteMultitenantURLs)
|
||||
rwctxsMap[tenantID] = rwctxs
|
||||
}
|
||||
rwctxsMapLock.Unlock()
|
||||
}
|
||||
rwctxs = rwctxsMap[writeContextIndex]
|
||||
|
||||
var rctx *relabelCtx
|
||||
rcs := allRelabelConfigs.Load().(*relabelConfigs)
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
|
@ -19,26 +18,26 @@ import (
|
|||
|
||||
var (
|
||||
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="vmimport"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="vmimport"}`)
|
||||
rowsTenantInserted = tenantmetrics.NewCounterMap(`vmagent_tenant_inserted_rows_total{type="vmimport"}`)
|
||||
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="vmimport"}`)
|
||||
)
|
||||
|
||||
// InsertHandler processes `/api/v1/import` request.
|
||||
//
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
|
||||
func InsertHandler(p *httpserver.Path, req *http.Request) error {
|
||||
func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
extraLabels, err := parserCommon.GetExtraLabels(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeconcurrencylimiter.Do(func() error {
|
||||
return parser.ParseStream(req, func(rows []parser.Row) error {
|
||||
return insertRows(p, rows, extraLabels)
|
||||
return insertRows(at, rows, extraLabels)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
|
@ -78,13 +77,10 @@ func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarsh
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.Push(p, &ctx.WriteRequest)
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(rowsTotal)
|
||||
if p != nil {
|
||||
at, err := auth.NewToken(p.AuthToken)
|
||||
if err == nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
}
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
}
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
|
|
|
@ -7,6 +7,7 @@ sort: 15
|
|||
## tip
|
||||
|
||||
* FEATURE: add `present_over_time(m[d])` function, which returns 1 if `m` has a least a single sample over the previous duration `d`. This function has been added also to [Prometheus 2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
|
||||
* FEATURE: vmagent: support multitenant writes according to [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy). This allows using a single `vmagent` instance in front of VictoriaMetrics cluster for all the tenants. Thanks to @omarghader for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1505). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1491).
|
||||
* FEATURE: vmagent: add `__meta_ec2_availability_zone_id` label to discovered Amazon EC2 targets. This label is available in Prometheus [starting from v2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
|
||||
* FAETURE: vmagent: add `__meta_gce_interface_ipv4_<name>` labels to discovered GCE targets. These labels are available in Prometheus [starting from v2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
|
||||
* FEATURE: add `-search.maxSamplesPerSeries` command-line flag for limiting the number of raw samples a single query can process per each time series. This option can protect from out of memory errors when a query processes tens of millions of raw samples per series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1067).
|
||||
|
|
|
@ -139,7 +139,11 @@ Also, Basic Auth can be enabled for the incoming `remote_write` requests with `-
|
|||
|
||||
### remote_write for clustered version
|
||||
|
||||
While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx, Prometheus, Graphite) and scrape data from various targets, writes are always peformed in Promethes remote_write protocol. Therefore for the [clustered version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), `-remoteWrite.url` the command-line flag should be configured as `<schema>://<vminsert-host>:8480/insert/<customer-id>/prometheus/api/v1/write`
|
||||
While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx, Prometheus, Graphite) and scrape data from various targets, writes are always peformed in Promethes remote_write protocol. Therefore for the [clustered version](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), `-remoteWrite.url` the command-line flag should be configured as `<schema>://<vminsert-host>:8480/insert/<accountID>/prometheus/api/v1/write` according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format). There is also support for multitenant writes. See [these docs](#multitenancy).
|
||||
|
||||
## Multitenancy
|
||||
|
||||
By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. But it can accept multitenant data if `-remoteWrite.multitenantURL` is set. In this case it accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
|
||||
|
||||
|
||||
## How to collect metrics in Prometheus format
|
||||
|
@ -638,7 +642,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-promscrape.consulSDCheckInterval duration
|
||||
Interval for checking for changes in Consul. This works only if consul_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details (default 30s)
|
||||
-promscrape.digitaloceanSDCheckInterval duration
|
||||
Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config for details (default 1m0s)
|
||||
Interval for checking for changes in digital ocean. This works only if digitalocean_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#digitalocean_sd_config for details (default 1m0s)
|
||||
-promscrape.disableCompression
|
||||
Whether to disable sending 'Accept-Encoding: gzip' request headers to all the scrape targets. This may reduce CPU usage on scrape targets at the cost of higher network bandwidth utilization. It is possible to set 'disable_compression: true' individually per each 'scrape_config' section in '-promscrape.config' for fine grained control
|
||||
-promscrape.disableKeepAlive
|
||||
|
@ -649,6 +653,8 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
The maximum duration for waiting to perform API requests if more than -promscrape.discovery.concurrency requests are simultaneously performed (default 1m0s)
|
||||
-promscrape.dnsSDCheckInterval duration
|
||||
Interval for checking for changes in dns. This works only if dns_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config for details (default 30s)
|
||||
-promscrape.dockerSDCheckInterval duration
|
||||
Interval for checking for changes in docker. This works only if docker_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#docker_sd_config for details (default 30s)
|
||||
-promscrape.dockerswarmSDCheckInterval duration
|
||||
Interval for checking for changes in dockerswarm. This works only if dockerswarm_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dockerswarm_sd_config for details (default 30s)
|
||||
-promscrape.dropOriginalLabels
|
||||
|
@ -662,7 +668,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-promscrape.gceSDCheckInterval duration
|
||||
Interval for checking for changes in gce. This works only if gce_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config for details (default 1m0s)
|
||||
-promscrape.httpSDCheckInterval duration
|
||||
Interval for checking for changes in http service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config for details (default 1m0s)
|
||||
Interval for checking for changes in http endpoint service discovery. This works only if http_sd_configs is configured in '-promscrape.config' file. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#http_sd_config for details (default 1m0s)
|
||||
-promscrape.kubernetes.apiServerTimeout duration
|
||||
How frequently to reload the full state from Kuberntes API server (default 30m0s)
|
||||
-promscrape.kubernetesSDCheckInterval duration
|
||||
|
@ -710,6 +716,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
|
||||
-remoteWrite.maxHourlySeries int
|
||||
The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -remoteWrite.maxDailySeries
|
||||
-remoteWrite.multitenantURL array
|
||||
Base path for multitenant remote storage URL to write data to. See https://docs.victoriametrics.com/vmagent.html#multitenancy for details. Example url: http://<vminsert>:8480 . Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
-remoteWrite.oauth2.clientID array
|
||||
Optional OAuth2 clientID to use for -remoteWrite.url. If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
|
@ -729,7 +738,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
-remoteWrite.queues int
|
||||
The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage (default 2 * numberOfAvailableCPUs)
|
||||
The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage. Default value if 2 * numberOfAvailableCPUs (default 8)
|
||||
-remoteWrite.rateLimit array
|
||||
Optional rate limit in bytes per second for data sent to -remoteWrite.url. By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data is sent after temporary unavailability of the remote storage
|
||||
Supports array of values separated by comma or specified via multiple flags.
|
||||
|
@ -766,7 +775,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-remoteWrite.tmpDataPath string
|
||||
Path to directory where temporary data for remote write component is stored. See also -remoteWrite.maxDiskUsagePerURL (default "vmagent-remotewrite-data")
|
||||
-remoteWrite.url array
|
||||
Remote storage URL to write data to. It must support Prometheus remote_write API. It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems
|
||||
Remote storage URL to write data to. It must support Prometheus remote_write API. It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . Pass multiple -remoteWrite.url flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.multitenantURL
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
-remoteWrite.urlRelabelConfig array
|
||||
Optional path to relabel config for the corresponding -remoteWrite.url
|
||||
|
|
35
lib/auth/auth.go
Normal file
35
lib/auth/auth.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package auth
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Token contains settings for request processing
|
||||
type Token struct {
|
||||
ProjectID uint32
|
||||
AccountID uint32
|
||||
}
|
||||
|
||||
// NewToken returns new Token for the given authToken
|
||||
func NewToken(authToken string) (*Token, error) {
|
||||
tmp := strings.Split(authToken, ":")
|
||||
if len(tmp) > 2 {
|
||||
return nil, fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp))
|
||||
}
|
||||
var at Token
|
||||
accountID, err := strconv.Atoi(tmp[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err)
|
||||
}
|
||||
at.AccountID = uint32(accountID)
|
||||
if len(tmp) > 1 {
|
||||
projectID, err := strconv.Atoi(tmp[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err)
|
||||
}
|
||||
at.ProjectID = uint32(projectID)
|
||||
}
|
||||
return &at, nil
|
||||
}
|
64
lib/httpserver/path.go
Normal file
64
lib/httpserver/path.go
Normal file
|
@ -0,0 +1,64 @@
|
|||
package httpserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Path contains the following path structure:
|
||||
// /{prefix}/{authToken}/{suffix}
|
||||
//
|
||||
// It is compatible with SaaS version.
|
||||
type Path struct {
|
||||
Prefix string
|
||||
AuthToken string
|
||||
Suffix string
|
||||
}
|
||||
|
||||
// ParsePath parses the given path.
|
||||
func ParsePath(path string) (*Path, error) {
|
||||
// The path must have the following form:
|
||||
// /{prefix}/{authToken}/{suffix}
|
||||
//
|
||||
// - prefix must contain `select`, `insert` or `delete`.
|
||||
// - authToken contains `accountID[:projectID]`, where projectID is optional.
|
||||
// - suffix contains arbitrary suffix.
|
||||
//
|
||||
// prefix must be used for the routing to the appropriate service
|
||||
// in the cluster - either vminsert or vmselect.
|
||||
s := skipPrefixSlashes(path)
|
||||
n := strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("cannot find {prefix}")
|
||||
}
|
||||
prefix := s[:n]
|
||||
|
||||
s = skipPrefixSlashes(s[n+1:])
|
||||
n = strings.IndexByte(s, '/')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("cannot find {authToken}")
|
||||
}
|
||||
authToken := s[:n]
|
||||
|
||||
s = skipPrefixSlashes(s[n+1:])
|
||||
|
||||
// Substitute double slashes with single slashes in the path, since such slashes
|
||||
// may appear due improper copy-pasting of the url.
|
||||
suffix := strings.Replace(s, "//", "/", -1)
|
||||
|
||||
p := &Path{
|
||||
Prefix: prefix,
|
||||
AuthToken: authToken,
|
||||
Suffix: suffix,
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// skipPrefixSlashes remove double slashes which may appear due
|
||||
// improper copy-pasting of the url
|
||||
func skipPrefixSlashes(s string) string {
|
||||
for len(s) > 0 && s[0] == '/' {
|
||||
s = s[1:]
|
||||
}
|
||||
return s
|
||||
}
|
|
@ -8,7 +8,6 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
|
@ -50,7 +49,7 @@ func CheckConfig() error {
|
|||
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
|
||||
//
|
||||
// Scraped data is passed to pushData.
|
||||
func Init(pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) {
|
||||
func Init(pushData func(wr *prompbmarshal.WriteRequest)) {
|
||||
globalStopCh = make(chan struct{})
|
||||
scraperWG.Add(1)
|
||||
go func() {
|
||||
|
@ -73,7 +72,7 @@ var (
|
|||
PendingScrapeConfigs int32
|
||||
)
|
||||
|
||||
func runScraper(configFile string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
|
||||
func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
|
||||
if configFile == "" {
|
||||
// Nothing to scrape.
|
||||
return
|
||||
|
@ -161,13 +160,13 @@ func runScraper(configFile string, pushData func(p *httpserver.Path, wr *prompbm
|
|||
var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`)
|
||||
|
||||
type scrapeConfigs struct {
|
||||
pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)
|
||||
pushData func(wr *prompbmarshal.WriteRequest)
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
scfgs []*scrapeConfig
|
||||
}
|
||||
|
||||
func newScrapeConfigs(pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scrapeConfigs {
|
||||
func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConfigs {
|
||||
return &scrapeConfigs{
|
||||
pushData: pushData,
|
||||
stopCh: make(chan struct{}),
|
||||
|
@ -208,7 +207,7 @@ func (scs *scrapeConfigs) stop() {
|
|||
|
||||
type scrapeConfig struct {
|
||||
name string
|
||||
pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)
|
||||
pushData func(wr *prompbmarshal.WriteRequest)
|
||||
getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
|
||||
checkInterval time.Duration
|
||||
cfgCh chan *Config
|
||||
|
@ -257,7 +256,7 @@ type scraperGroup struct {
|
|||
wg sync.WaitGroup
|
||||
mLock sync.Mutex
|
||||
m map[string]*scraper
|
||||
pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)
|
||||
pushData func(wr *prompbmarshal.WriteRequest)
|
||||
|
||||
changesCount *metrics.Counter
|
||||
activeScrapers *metrics.Counter
|
||||
|
@ -265,7 +264,7 @@ type scraperGroup struct {
|
|||
scrapersStopped *metrics.Counter
|
||||
}
|
||||
|
||||
func newScraperGroup(name string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scraperGroup {
|
||||
func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup {
|
||||
sg := &scraperGroup{
|
||||
name: name,
|
||||
m: make(map[string]*scraper),
|
||||
|
@ -359,7 +358,7 @@ type scraper struct {
|
|||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func newScraper(sw *ScrapeWork, group string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scraper {
|
||||
func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper {
|
||||
sc := &scraper{
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
|
@ -164,7 +163,7 @@ type scrapeWork struct {
|
|||
GetStreamReader func() (*streamReader, error)
|
||||
|
||||
// PushData is called for pushing collected data.
|
||||
PushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)
|
||||
PushData func(wr *prompbmarshal.WriteRequest)
|
||||
|
||||
// ScrapeGroup is name of ScrapeGroup that
|
||||
// scrapeWork belongs to
|
||||
|
@ -317,7 +316,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
|||
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
|
||||
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
|
||||
startTime := time.Now()
|
||||
sw.PushData(nil, &wc.writeRequest)
|
||||
sw.PushData(&wc.writeRequest)
|
||||
pushDataDuration.UpdateDuration(startTime)
|
||||
sw.prevLabelsLen = len(wc.labels)
|
||||
wc.reset()
|
||||
|
@ -359,7 +358,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
|||
}
|
||||
sw.updateSeriesAdded(wc)
|
||||
startTime := time.Now()
|
||||
sw.PushData(nil, &wc.writeRequest)
|
||||
sw.PushData(&wc.writeRequest)
|
||||
pushDataDuration.UpdateDuration(startTime)
|
||||
wc.resetNoRows()
|
||||
return nil
|
||||
|
@ -387,7 +386,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
|||
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
|
||||
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
|
||||
startTime := time.Now()
|
||||
sw.PushData(nil, &wc.writeRequest)
|
||||
sw.PushData(&wc.writeRequest)
|
||||
pushDataDuration.UpdateDuration(startTime)
|
||||
sw.prevLabelsLen = len(wc.labels)
|
||||
wc.reset()
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
|
@ -59,7 +58,7 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) {
|
|||
|
||||
pushDataCalls := 0
|
||||
var pushDataErr error
|
||||
sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {
|
||||
sw.PushData = func(wr *prompbmarshal.WriteRequest) {
|
||||
if err := expectEqualTimeseries(wr.Timeseries, timeseriesExpected); err != nil {
|
||||
pushDataErr = fmt.Errorf("unexpected data pushed: %w\ngot\n%#v\nwant\n%#v", err, wr.Timeseries, timeseriesExpected)
|
||||
}
|
||||
|
@ -99,7 +98,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
|
|||
|
||||
pushDataCalls := 0
|
||||
var pushDataErr error
|
||||
sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {
|
||||
sw.PushData = func(wr *prompbmarshal.WriteRequest) {
|
||||
pushDataCalls++
|
||||
if len(wr.Timeseries) > len(timeseriesExpected) {
|
||||
pushDataErr = fmt.Errorf("too many time series obtained; got %d; want %d\ngot\n%+v\nwant\n%+v",
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
|
@ -40,7 +39,7 @@ vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
|
|||
var sw scrapeWork
|
||||
sw.Config = &ScrapeWork{}
|
||||
sw.ReadData = readDataFunc
|
||||
sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {}
|
||||
sw.PushData = func(wr *prompbmarshal.WriteRequest) {}
|
||||
timestamp := int64(0)
|
||||
for pb.Next() {
|
||||
if err := sw.scrapeInternal(timestamp, timestamp); err != nil {
|
||||
|
|
72
lib/tenantmetrics/counter_map.go
Normal file
72
lib/tenantmetrics/counter_map.go
Normal file
|
@ -0,0 +1,72 @@
|
|||
package tenantmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// TenantID defines metric tenant.
|
||||
type TenantID struct {
|
||||
AccountID uint32
|
||||
ProjectID uint32
|
||||
}
|
||||
|
||||
// CounterMap is a map of counters keyed by tenant.
|
||||
type CounterMap struct {
|
||||
metric string
|
||||
m atomic.Value
|
||||
}
|
||||
|
||||
// NewCounterMap creates new CounterMap for the given metric.
|
||||
func NewCounterMap(metric string) *CounterMap {
|
||||
cm := &CounterMap{
|
||||
metric: metric,
|
||||
}
|
||||
cm.m.Store(make(map[TenantID]*metrics.Counter))
|
||||
return cm
|
||||
}
|
||||
|
||||
// Get returns counter for the given at
|
||||
func (cm *CounterMap) Get(at *auth.Token) *metrics.Counter {
|
||||
key := TenantID{
|
||||
AccountID: at.AccountID,
|
||||
ProjectID: at.ProjectID,
|
||||
}
|
||||
return cm.GetByTenant(key)
|
||||
}
|
||||
|
||||
// GetByTenant returns counter for the given key.
|
||||
func (cm *CounterMap) GetByTenant(key TenantID) *metrics.Counter {
|
||||
m := cm.m.Load().(map[TenantID]*metrics.Counter)
|
||||
if c := m[key]; c != nil {
|
||||
// Fast path - the counter for k already exists.
|
||||
return c
|
||||
}
|
||||
|
||||
// Slow path - create missing counter for k and re-create m.
|
||||
newM := make(map[TenantID]*metrics.Counter, len(m)+1)
|
||||
for k, c := range m {
|
||||
newM[k] = c
|
||||
}
|
||||
metricName := createMetricName(cm.metric, key)
|
||||
c := metrics.GetOrCreateCounter(metricName)
|
||||
newM[key] = c
|
||||
cm.m.Store(newM)
|
||||
return c
|
||||
}
|
||||
|
||||
func createMetricName(metric string, key TenantID) string {
|
||||
if len(metric) == 0 {
|
||||
logger.Panicf("BUG: metric cannot be empty")
|
||||
}
|
||||
if metric[len(metric)-1] != '}' {
|
||||
// Metric without labels.
|
||||
return fmt.Sprintf(`%s{accountID="%d",projectID="%d"}`, metric, key.AccountID, key.ProjectID)
|
||||
}
|
||||
// Metric with labels.
|
||||
return fmt.Sprintf(`%s,accountID="%d",projectID="%d"}`, metric[:len(metric)-1], key.AccountID, key.ProjectID)
|
||||
}
|
90
lib/tenantmetrics/counter_map_test.go
Normal file
90
lib/tenantmetrics/counter_map_test.go
Normal file
|
@ -0,0 +1,90 @@
|
|||
package tenantmetrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
)
|
||||
|
||||
func TestCreateMetricNameError(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Fatal("expecting non-nil panic")
|
||||
}
|
||||
}()
|
||||
_ = createMetricName("", TenantID{})
|
||||
}
|
||||
|
||||
func TestCreateMetricNameSuccess(t *testing.T) {
|
||||
f := func(s string, at *auth.Token, metricExpected string) {
|
||||
t.Helper()
|
||||
metric := createMetricName(s, TenantID{
|
||||
AccountID: at.AccountID,
|
||||
ProjectID: at.ProjectID,
|
||||
})
|
||||
if metric != metricExpected {
|
||||
t.Fatalf("unexpected result for createMetricName(%q, %v); got %q; want %q", s, at, metric, metricExpected)
|
||||
}
|
||||
}
|
||||
f(`a`, &auth.Token{AccountID: 1, ProjectID: 2}, `a{accountID="1",projectID="2"}`)
|
||||
f(`foo{bar="baz"}`, &auth.Token{AccountID: 33, ProjectID: 41}, `foo{bar="baz",accountID="33",projectID="41"}`)
|
||||
f(`foo{bar="baz",a="aa"}`, &auth.Token{AccountID: 33, ProjectID: 41}, `foo{bar="baz",a="aa",accountID="33",projectID="41"}`)
|
||||
}
|
||||
|
||||
func TestCounterMap(t *testing.T) {
|
||||
cm := NewCounterMap("foobar")
|
||||
cm.Get(&auth.Token{AccountID: 1, ProjectID: 2}).Inc()
|
||||
cm.Get(&auth.Token{AccountID: 4, ProjectID: 0}).Add(12)
|
||||
|
||||
if n := cm.Get(&auth.Token{AccountID: 1, ProjectID: 2}).Get(); n != 1 {
|
||||
t.Fatalf("unexpected counter value; got %d; want %d", n, 1)
|
||||
}
|
||||
if n := cm.Get(&auth.Token{AccountID: 4, ProjectID: 0}).Get(); n != 12 {
|
||||
t.Fatalf("unexpected counter value; got %d; want %d", n, 12)
|
||||
}
|
||||
if n := cm.Get(&auth.Token{}).Get(); n != 0 {
|
||||
t.Fatalf("unexpected counter value; got %d; want %d", n, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCounterMapConcurrent(t *testing.T) {
|
||||
cm := NewCounterMap(`aaa{bb="cc"}`)
|
||||
f := func() error {
|
||||
for i := 0; i < 10; i++ {
|
||||
cm.Get(&auth.Token{AccountID: 1, ProjectID: 2}).Inc()
|
||||
if n := cm.Get(&auth.Token{AccountID: 3, ProjectID: 4}).Get(); n != 0 {
|
||||
return fmt.Errorf("unexpected counter value; got %d; want %d", n, 0)
|
||||
}
|
||||
cm.Get(&auth.Token{AccountID: 1, ProjectID: 3}).Add(5)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const concurrency = 5
|
||||
ch := make(chan error, concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
ch <- f()
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < concurrency; i++ {
|
||||
select {
|
||||
case err := <-ch:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timeout")
|
||||
}
|
||||
}
|
||||
|
||||
if n := cm.Get(&auth.Token{AccountID: 1, ProjectID: 2}).Get(); n != concurrency*10 {
|
||||
t.Fatalf("unexpected counter value; got %d; want %d", n, concurrency*10)
|
||||
}
|
||||
if n := cm.Get(&auth.Token{AccountID: 1, ProjectID: 3}).Get(); n != concurrency*10*5 {
|
||||
t.Fatalf("unexpected counter value; got %d; want %d", n, concurrency*10*5)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue