mirror of
synced 2025-03-11 15:34:56 +00:00
app/vmagent: add -remoteWrite.urlRelabelConfig
for applying individual relabeling for each -remoteWrite.url
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/320 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/308
This commit is contained in:
4 changed files with 134 additions and 78 deletions
@ -69,11 +69,7 @@ sections from [Prometheus config file](https://prometheus.io/docs/prometheus/lat
* `scrape_configs`
All the other sections are ignored, including [remote_write](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) section.
Use `-remoteWrite.*` command-line flags instead for configuring remote write settings:
* `-remoteWrite.url` for pointing to remote storage. Data to remote storage can be sent either via HTTP or HTTPS. See `-remoteWrite.tls*` flags for details.
* `-remoteWrite.label` for adding labels to metrics before sending them to remote storage.
* `-remoteWrite.relabelConfig` for applying relabeling to metrics before sending them to remote storage.
Use `-remoteWrite.*` command-line flags instead for configuring remote write settings.
The following scrape types in [scrape_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) section are supported:
@ -114,7 +110,8 @@ The relabeling can be defined in the following places:
* At `scrape_config -> relabel_configs` section in `-promscrape.config` file. This relabeling is applied to targets when parsing the file during `vmagent` startup
or during config reload after sending `SIGHUP` signal to `vmagent` via `kill -HUP`.
* At `scrape_config -> metric_relabel_configs` section in `-promscrape.config` file. This relabeling is applied to metrics after each scrape for the configured targets.
* At `-remoteWrite.relabelConfig` file. This relabeling is aplied to all the collected metrics before sending them to `-remoteWrite.url`.
* At `-remoteWrite.relabelConfig` file. This relabeling is aplied to all the collected metrics before sending them to remote storage.
* At `-remoteWrite.urlRelabelConfig` files. This relabeling is applied to metrics before sending them to the corresponding `-remoteWrite.url`.
Read more about relabeling in the following articles:
@ -50,7 +50,7 @@ type client struct {
stopCh chan struct{}
func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQueue) *client {
func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQueue, concurrency int) *client {
authHeader := ""
if len(*basicAuthUsername) > 0 || len(*basicAuthPassword) > 0 {
// See https://en.wikipedia.org/wiki/Basic_access_authentication
@ -98,7 +98,7 @@ func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQue
host += ":80"
maxConns := 2 * *queues
maxConns := 2 * concurrency
hc := &fasthttp.HostClient{
Addr: host,
Name: "vmagent",
@ -126,7 +126,7 @@ func newClient(remoteWriteURL, urlLabelValue string, fq *persistentqueue.FastQue
c.requestsOKCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="2XX"}`, c.urlLabelValue))
c.errorsCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_errors_total{url=%q}`, c.urlLabelValue))
c.retriesCount = metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_retries_count_total{url=%q}`, c.urlLabelValue))
for i := 0; i < *queues; i++ {
for i := 0; i < concurrency; i++ {
go func() {
defer c.wg.Done()
@ -12,45 +12,42 @@ import (
var (
extraLabelsUnparsed = flagutil.NewArray("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
unparsedLabelsGlobal = flagutil.NewArray("remoteWrite.label", "Optional label in the form 'name=value' to add to all the metrics before sending them to -remoteWrite.url. "+
"Pass multiple -remoteWrite.label flags in order to add multiple flags to metrics before sending them to remote storage")
relabelConfigPath = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+
relabelConfigPathGlobal = flag.String("remoteWrite.relabelConfig", "", "Optional path to file with relabel_config entries. These entries are applied to all the metrics "+
"before sending them to -remoteWrite.url. See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config for details")
var extraLabels []prompbmarshal.Label
var prcs []promrelabel.ParsedRelabelConfig
var labelsGlobal []prompbmarshal.Label
var prcsGlobal []promrelabel.ParsedRelabelConfig
// initRelabel must be called after parsing command-line flags.
func initRelabel() {
// Init extraLabels
for _, s := range *extraLabelsUnparsed {
// initRelabelGlobal must be called after parsing command-line flags.
func initRelabelGlobal() {
// Init labelsGlobal
labelsGlobal = nil
for _, s := range *unparsedLabelsGlobal {
n := strings.IndexByte(s, '=')
if n < 0 {
logger.Panicf("FATAL: missing '=' in `-remoteWrite.label`. It must contain label in the form `name=value`; got %q", s)
extraLabels = append(extraLabels, prompbmarshal.Label{
labelsGlobal = append(labelsGlobal, prompbmarshal.Label{
Name: s[:n],
Value: s[n+1:],
// Init prcs
if len(*relabelConfigPath) > 0 {
// Init prcsGlobal
prcsGlobal = nil
if len(*relabelConfigPathGlobal) > 0 {
var err error
prcs, err = promrelabel.LoadRelabelConfigs(*relabelConfigPath)
prcsGlobal, err = promrelabel.LoadRelabelConfigs(*relabelConfigPathGlobal)
if err != nil {
logger.Panicf("FATAL: cannot load relabel configs from -remoteWrite.relabelConfig=%q: %s", *relabelConfigPath, err)
logger.Panicf("FATAL: cannot load relabel configs from -remoteWrite.relabelConfig=%q: %s", *relabelConfigPathGlobal, err)
func resetRelabel() {
extraLabels = nil
prcs = nil
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
func (rctx *relabelCtx) applyRelabeling(tss []prompbmarshal.TimeSeries, extraLabels []prompbmarshal.Label, prcs []promrelabel.ParsedRelabelConfig) []prompbmarshal.TimeSeries {
if len(extraLabels) == 0 && len(prcs) == 0 {
// Nothing to change.
return tss
@ -105,3 +102,12 @@ var relabelCtxPool = &sync.Pool{
return &relabelCtx{}
func getRelabelCtx() *relabelCtx {
return relabelCtxPool.Get().(*relabelCtx)
func putRelabelCtx(rctx *relabelCtx) {
rctx.labels = rctx.labels[:0]
@ -11,6 +11,7 @@ import (
xxhash "github.com/cespare/xxhash/v2"
@ -19,13 +20,16 @@ var (
remoteWriteURLs = flagutil.NewArray("remoteWrite.url", "Remote storage URL to write data to. It must support Prometheus remote_write API. "+
"It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
"Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored")
queues = flag.Int("remoteWrite.queues", 1, "The number of concurrent queues to each -remoteWrite.url. Set more queues if a single queue "+
relabelConfigPaths = flagutil.NewArray("remoteWrite.urlRelabelConfig", "Optional path to relabel config for the corresponding -remoteWrite.url")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored")
queues = flag.Int("remoteWrite.queues", 1, "The number of concurrent queues to each -remoteWrite.url. Set more queues if a single queue "+
"isn't enough for sending high volume of collected data to remote storage")
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
"It is hidden by default, since it can contain sensistive auth info")
var rwctxs []*remoteWriteCtx
// Init initializes remotewrite.
// It must be called after flag.Parse().
@ -40,7 +44,7 @@ func Init() {
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
maxInmemoryBlocks := memory.Allowed() / len(*remoteWriteURLs) / maxRowsPerBlock / 100
if maxInmemoryBlocks > 200 {
@ -53,27 +57,16 @@ func Init() {
maxInmemoryBlocks = 2
for i, remoteWriteURL := range *remoteWriteURLs {
h := xxhash.Sum64([]byte(remoteWriteURL))
path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h)
fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks)
relabelConfigPath := ""
if i < len(*relabelConfigPaths) {
relabelConfigPath = (*relabelConfigPaths)[i]
urlLabelValue := fmt.Sprintf("secret-url-%d", i+1)
if *showRemoteWriteURL {
urlLabelValue = remoteWriteURL
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{url=%q, hash="%016X"}`, urlLabelValue, h), func() float64 {
return float64(fq.GetPendingBytes())
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{url=%q}`, urlLabelValue), func() float64 {
return float64(fq.GetInmemoryQueueLen())
c := newClient(remoteWriteURL, urlLabelValue, fq)
fqs = append(fqs, fq)
cs = append(cs, c)
pss = make([]*pendingSeries, *queues)
for i := range pss {
pss[i] = newPendingSeries(pushBlockToPersistentQueues)
rwctx := newRemoteWriteCtx(remoteWriteURL, relabelConfigPath, maxInmemoryBlocks, urlLabelValue)
rwctxs = append(rwctxs, rwctx)
@ -81,30 +74,20 @@ func Init() {
// It is expected that nobody calls Push during and after the call to this func.
func Stop() {
for _, ps := range pss {
for _, rwctx := range rwctxs {
// Close all the persistent queues. This should unblock clients waiting in MustReadBlock.
for _, fq := range fqs {
fqs = nil
// Stop all the clients
for _, c := range cs {
cs = nil
rwctxs = nil
// Push sends wr to remote storage systems set via `-remoteWrite.url`.
// Each timeseries in wr.Timeseries must contain one sample.
func Push(wr *prompbmarshal.WriteRequest) {
rctx := relabelCtxPool.Get().(*relabelCtx)
var rctx *relabelCtx
if len(prcsGlobal) > 0 {
rctx = getRelabelCtx()
tss := wr.Timeseries
for len(tss) > 0 {
// Process big tss in smaller blocks in order to reduce maxmimum memory usage
@ -115,22 +98,92 @@ func Push(wr *prompbmarshal.WriteRequest) {
} else {
tss = nil
tssBlock = rctx.applyRelabeling(tssBlock)
idx := atomic.AddUint64(&pssNextIdx, 1) % uint64(len(pss))
if rctx != nil {
tssBlockLen := len(tssBlock)
tssBlock = rctx.applyRelabeling(tssBlock, labelsGlobal, prcsGlobal)
globalRelabelMetricsDropped.Add(tssBlockLen - len(tssBlock))
for _, rwctx := range rwctxs {
func pushBlockToPersistentQueues(block []byte) {
for _, fq := range fqs {
if rctx != nil {
var fqs []*persistentqueue.FastQueue
var cs []*client
var globalRelabelMetricsDropped = metrics.NewCounter("vmagent_remotewrite_global_relabel_metrics_dropped_total")
var pssNextIdx uint64
var pss []*pendingSeries
type remoteWriteCtx struct {
fq *persistentqueue.FastQueue
c *client
prcs []promrelabel.ParsedRelabelConfig
pss []*pendingSeries
pssNextIdx uint64
relabelMetricsDropped *metrics.Counter
func newRemoteWriteCtx(remoteWriteURL, relabelConfigPath string, maxInmemoryBlocks int, urlLabelValue string) *remoteWriteCtx {
h := xxhash.Sum64([]byte(remoteWriteURL))
path := fmt.Sprintf("%s/persistent-queue/%016X", *tmpDataPath, h)
fq := persistentqueue.MustOpenFastQueue(path, remoteWriteURL, maxInmemoryBlocks)
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_data_bytes{url=%q, hash="%016X"}`, urlLabelValue, h), func() float64 {
return float64(fq.GetPendingBytes())
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_pending_inmemory_blocks{url=%q}`, urlLabelValue), func() float64 {
return float64(fq.GetInmemoryQueueLen())
c := newClient(remoteWriteURL, urlLabelValue, fq, *queues)
var prcs []promrelabel.ParsedRelabelConfig
if len(relabelConfigPath) > 0 {
var err error
prcs, err = promrelabel.LoadRelabelConfigs(relabelConfigPath)
if err != nil {
logger.Panicf("FATAL: cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %s", relabelConfigPath, err)
pss := make([]*pendingSeries, *queues)
for i := range pss {
pss[i] = newPendingSeries(fq.MustWriteBlock)
return &remoteWriteCtx{
fq: fq,
c: c,
prcs: prcs,
pss: pss,
relabelMetricsDropped: metrics.NewCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{url=%q}`, urlLabelValue)),
func (rwctx *remoteWriteCtx) MustStop() {
for _, ps := range rwctx.pss {
rwctx.pss = nil
rwctx.fq = nil
rwctx.prcs = nil
rwctx.c = nil
rwctx.relabelMetricsDropped = nil
func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
var rctx *relabelCtx
if len(rwctx.prcs) > 0 {
rctx = getRelabelCtx()
tssLen := len(tss)
tss = rctx.applyRelabeling(tss, nil, rwctx.prcs)
rwctx.relabelMetricsDropped.Add(tssLen - len(tss))
pss := rwctx.pss
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
if rctx != nil {
Reference in a new issue