mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/promscrape: follow-up after 2c553d5a2f
- fix broken tests - cosmetic code cleanup - document the change at https://docs.victoriametrics.com/vmagent.html#multitenancy - document the change at https://docs.victoriametrics.com/CHANGELOG.html
This commit is contained in:
parent
2c553d5a2f
commit
46d7792b72
18 changed files with 76 additions and 30 deletions
|
@ -140,7 +140,25 @@ While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx
|
|||
|
||||
## Multitenancy
|
||||
|
||||
By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. But it can accept multitenant data if `-remoteWrite.multitenantURL` is set. In this case it accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
|
||||
By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`.
|
||||
|
||||
[Multitenancy](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) support is enabled when `-remoteWrite.multitenantURL` command-line flag is set. In this case `vmagent` accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
|
||||
|
||||
If `-remoteWrite.multitenantURL` command-line flag is set and `vmagent` is configured to scrape Prometheus-compatible targets (e.g. if `-promscrape.config` command-line flag is set)
|
||||
then `vmagent` reads tenantID from `__tenant_id__` label for the discovered targets and routes all the metrics from this target to the given `__tenant_id__`, e.g. to the url `<-remoteWrite.multitnenatURL>/insert/<__tenant_id__>/prometheus/api/v1/write`.
|
||||
|
||||
For example, the following relabeling rule instructs sending metrics to tenantID defined in the `prometheus.io/tenant` annotation of Kubernetes pod deployment:
|
||||
|
||||
```yaml
|
||||
scrape_configs:
|
||||
- kubernetes_sd_configs:
|
||||
- role: pod
|
||||
relabel_configs:
|
||||
- source_labels: [__meta_kubernetes_annotation_prometheus_io_tenant]
|
||||
target_label: __tenant_id__
|
||||
```
|
||||
|
||||
If the target has no associated `__tenant_id__` label, then its' metrics are routed to zero tenantID, e.g. to `<-remoteWrite.multitenantURL>/insert/0/prometheus/api/v1/write`.
|
||||
|
||||
## How to collect metrics in Prometheus format
|
||||
|
||||
|
|
|
@ -67,7 +67,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
remotewrite.Push(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
|
|
|
@ -82,7 +82,7 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
remotewrite.Push(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(rowsTotal)
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
|
|
|
@ -134,7 +134,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom
|
|||
ctx.ctx.Labels = labels
|
||||
ctx.ctx.Samples = samples
|
||||
ctx.commonLabels = commonLabels
|
||||
remotewrite.PushWithAuthToken(at, &ctx.ctx.WriteRequest)
|
||||
remotewrite.Push(at, &ctx.ctx.WriteRequest)
|
||||
rowsInserted.Add(rowsTotal)
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
|
|
|
@ -87,6 +87,6 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
remotewrite.Push(at, &ctx.WriteRequest)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
remotewrite.Push(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(len(rows))
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(len(rows))
|
||||
|
|
|
@ -81,7 +81,7 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
remotewrite.Push(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(rowsTotal)
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
|
|
|
@ -234,15 +234,11 @@ func Stop() {
|
|||
|
||||
// Push sends wr to remote storage systems set via `-remoteWrite.url`.
|
||||
//
|
||||
// Note that wr may be modified by Push due to relabeling and rounding.
|
||||
func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
PushWithAuthToken(at, wr)
|
||||
}
|
||||
|
||||
// PushWithAuthToken sends wr to remote storage systems set via `-remoteWrite.multitenantURL`.
|
||||
// If at is nil, then the data is pushed to the configured `-remoteWrite.url`.
|
||||
// If at isn't nil, the the data is pushed to the configured `-remoteWrite.multitenantURL`.
|
||||
//
|
||||
// Note that wr may be modified by Push due to relabeling and rounding.
|
||||
func PushWithAuthToken(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
if at == nil && len(*remoteWriteMultitenantURLs) > 0 {
|
||||
// Write data to default tenant if at isn't set while -remoteWrite.multitenantURL is set.
|
||||
at = defaultAuthToken
|
||||
|
@ -252,7 +248,7 @@ func PushWithAuthToken(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
|||
rwctxs = rwctxsDefault
|
||||
} else {
|
||||
if len(*remoteWriteMultitenantURLs) == 0 {
|
||||
logger.Panicf("BUG: remoteWriteMultitenantURLs must be non-empty for non-nil at")
|
||||
logger.Panicf("BUG: -remoteWrite.multitenantURL command-line flag must be set when __tenant_id__=%q label is set", at)
|
||||
}
|
||||
rwctxsMapLock.Lock()
|
||||
tenantID := tenantmetrics.TenantID{
|
||||
|
|
|
@ -88,7 +88,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L
|
|||
ctx.WriteRequest.Timeseries = tssDst
|
||||
ctx.Labels = labels
|
||||
ctx.Samples = samples
|
||||
remotewrite.PushWithAuthToken(at, &ctx.WriteRequest)
|
||||
remotewrite.Push(at, &ctx.WriteRequest)
|
||||
rowsInserted.Add(rowsTotal)
|
||||
if at != nil {
|
||||
rowsTenantInserted.Get(at).Add(rowsTotal)
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/influxutils"
|
||||
|
@ -29,6 +30,7 @@ import (
|
|||
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
|
||||
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
|
@ -80,7 +82,9 @@ func Init() {
|
|||
if len(*opentsdbHTTPListenAddr) > 0 {
|
||||
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler)
|
||||
}
|
||||
promscrape.Init(prompush.Push)
|
||||
promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
prompush.Push(wr)
|
||||
})
|
||||
}
|
||||
|
||||
// Stop stops vminsert.
|
||||
|
|
|
@ -14,7 +14,7 @@ var (
|
|||
|
||||
const maxRowsPerBlock = 10000
|
||||
|
||||
// Push pushes wr to storage.
|
||||
// Push pushes wr for the given at to storage.
|
||||
func Push(wr *prompbmarshal.WriteRequest) {
|
||||
ctx := common.GetInsertCtx()
|
||||
defer common.PutInsertCtx(ctx)
|
||||
|
|
|
@ -21,6 +21,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* FEATURE: add ability to push internal metrics (e.g. metrics exposed at `/metrics` page) to the configured remote storage from all the VictoriaMetrics components. See [these docs](https://docs.victoriametrics.com/#push-metrics).
|
||||
* FEATURE: improve performance for heavy queries over big number of time series on systems with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896). Thanks to @zqyzyq for [the idea](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/b596ac3745314fcc170a14e3ded062971cf7ced2).
|
||||
* FEATURE: improve performance for registering new time series in `indexdb` by up to 50%. Thanks to @ahfuzhang for [the issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2249).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ability to specify tenantID in target labels. In this case metrics from the given target are routed to the given `__tenant_id__`. See [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2943).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery for [Yandex Cloud](https://cloud.yandex.com/en/). See [these docs](https://docs.victoriametrics.com/sd_configs.html#yandexcloud_sd_configs) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1386).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui). Zoom in the graph by selecting the needed time range in the same way Grafana does. Hold `ctrl` (or `cmd` on MacOS) in order to move the graph to the left/right. Hold `ctrl` (or `cmd` on MacOS) and scroll up/down in order to zoom in/out the area under the cursor. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2812).
|
||||
|
||||
|
|
|
@ -144,7 +144,25 @@ While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx
|
|||
|
||||
## Multitenancy
|
||||
|
||||
By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. But it can accept multitenant data if `-remoteWrite.multitenantURL` is set. In this case it accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
|
||||
By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`.
|
||||
|
||||
[Multitenancy](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) support is enabled when `-remoteWrite.multitenantURL` command-line flag is set. In this case `vmagent` accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
|
||||
|
||||
If `-remoteWrite.multitenantURL` command-line flag is set and `vmagent` is configured to scrape Prometheus-compatible targets (e.g. if `-promscrape.config` command-line flag is set)
|
||||
then `vmagent` reads tenantID from `__tenant_id__` label for the discovered targets and routes all the metrics from this target to the given `__tenant_id__`, e.g. to the url `<-remoteWrite.multitnenatURL>/insert/<__tenant_id__>/prometheus/api/v1/write`.
|
||||
|
||||
For example, the following relabeling rule instructs sending metrics to tenantID defined in the `prometheus.io/tenant` annotation of Kubernetes pod deployment:
|
||||
|
||||
```yaml
|
||||
scrape_configs:
|
||||
- kubernetes_sd_configs:
|
||||
- role: pod
|
||||
relabel_configs:
|
||||
- source_labels: [__meta_kubernetes_annotation_prometheus_io_tenant]
|
||||
target_label: __tenant_id__
|
||||
```
|
||||
|
||||
If the target has no associated `__tenant_id__` label, then its' metrics are routed to zero tenantID, e.g. to `<-remoteWrite.multitenantURL>/insert/0/prometheus/api/v1/write`.
|
||||
|
||||
## How to collect metrics in Prometheus format
|
||||
|
||||
|
|
|
@ -8,8 +8,16 @@ import (
|
|||
|
||||
// Token contains settings for request processing
|
||||
type Token struct {
|
||||
ProjectID uint32
|
||||
AccountID uint32
|
||||
ProjectID uint32
|
||||
}
|
||||
|
||||
// String returns string representation of t.
|
||||
func (t *Token) String() string {
|
||||
if t.ProjectID == 0 {
|
||||
return fmt.Sprintf("%d", t.AccountID)
|
||||
}
|
||||
return fmt.Sprintf("%d:%d", t.AccountID, t.ProjectID)
|
||||
}
|
||||
|
||||
// NewToken returns new Token for the given authToken
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package auth
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
|
@ -12,13 +11,13 @@ func TestNewTokenSuccess(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
got := fmt.Sprintf("%d:%d", newToken.AccountID, newToken.ProjectID)
|
||||
got := newToken.String()
|
||||
if got != want {
|
||||
t.Fatalf("unexpected NewToken() result;got\n%s\nwant\n%s", got, want)
|
||||
}
|
||||
}
|
||||
// token with accountID only
|
||||
f("1", "1:0")
|
||||
f("1", "1")
|
||||
// token with accountID and projecTID
|
||||
f("1:2", "1:2")
|
||||
// max uint32 accountID
|
||||
|
|
|
@ -1230,11 +1230,11 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
|
|||
}
|
||||
|
||||
var at *auth.Token
|
||||
tenantIdRelabeled := promrelabel.GetLabelValueByName(labels, "__tenant_id__")
|
||||
if tenantIdRelabeled != "" {
|
||||
newToken, err := auth.NewToken(tenantIdRelabeled)
|
||||
tenantID := promrelabel.GetLabelValueByName(labels, "__tenant_id__")
|
||||
if tenantID != "" {
|
||||
newToken, err := auth.NewToken(tenantID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid tenant id: %s for job=%s, err: %w", tenantIdRelabeled, swc.jobName, err)
|
||||
return nil, fmt.Errorf("cannot parse __tenant_id__=%q for job=%s, err: %w", tenantID, swc.jobName, err)
|
||||
}
|
||||
at = newToken
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
|
||||
|
@ -62,7 +63,7 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) {
|
|||
|
||||
pushDataCalls := 0
|
||||
var pushDataErr error
|
||||
sw.PushData = func(wr *prompbmarshal.WriteRequest) {
|
||||
sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
if err := expectEqualTimeseries(wr.Timeseries, timeseriesExpected); err != nil {
|
||||
pushDataErr = fmt.Errorf("unexpected data pushed: %w\ngot\n%#v\nwant\n%#v", err, wr.Timeseries, timeseriesExpected)
|
||||
}
|
||||
|
@ -102,7 +103,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
|
|||
|
||||
pushDataCalls := 0
|
||||
var pushDataErr error
|
||||
sw.PushData = func(wr *prompbmarshal.WriteRequest) {
|
||||
sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
|
||||
pushDataCalls++
|
||||
if len(wr.Timeseries) > len(timeseriesExpected) {
|
||||
pushDataErr = fmt.Errorf("too many time series obtained; got %d; want %d\ngot\n%+v\nwant\n%+v",
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
|
@ -39,7 +40,7 @@ vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
|
|||
var sw scrapeWork
|
||||
sw.Config = &ScrapeWork{}
|
||||
sw.ReadData = readDataFunc
|
||||
sw.PushData = func(wr *prompbmarshal.WriteRequest) {}
|
||||
sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) {}
|
||||
timestamp := int64(0)
|
||||
for pb.Next() {
|
||||
if err := sw.scrapeInternal(timestamp, timestamp); err != nil {
|
||||
|
|
Loading…
Reference in a new issue