add support to scrape multi tenant metrics (#2950)

* add support to scrape multi tenant metrics

* add support to scrape multi tenant metrics

Co-authored-by: 赵福玉 <zhaofuyu@zhaofuyudeMac-mini.local>
This commit is contained in:
Fury 2022-08-08 19:10:18 +08:00 committed by GitHub
parent 7a87251ff5
commit 2c553d5a2f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 38 additions and 20 deletions

View file

@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(nil, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View file

@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(nil, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View file

@ -65,7 +65,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(nil, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View file

@ -235,8 +235,8 @@ func Stop() {
// Push sends wr to remote storage systems set via `-remoteWrite.url`.
//
// Note that wr may be modified by Push due to relabeling and rounding.
func Push(wr *prompbmarshal.WriteRequest) {
PushWithAuthToken(nil, wr)
func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
PushWithAuthToken(at, wr)
}
// PushWithAuthToken sends wr to remote storage systems set via `-remoteWrite.multitenantURL`.

View file

@ -4,6 +4,7 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"net/url"
"path/filepath"
"sort"
@ -1227,6 +1228,17 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
if metricsPathRelabeled == "" {
metricsPathRelabeled = "/metrics"
}
var at *auth.Token
tenantIdRelabeled := promrelabel.GetLabelValueByName(labels, "__tenant_id__")
if tenantIdRelabeled != "" {
newToken, err := auth.NewToken(tenantIdRelabeled)
if err != nil {
return nil, fmt.Errorf("invalid tenant id: %s for job=%s, err: %w", tenantIdRelabeled, swc.jobName, err)
}
at = newToken
}
if !strings.HasPrefix(metricsPathRelabeled, "/") {
metricsPathRelabeled = "/" + metricsPathRelabeled
}
@ -1308,6 +1320,7 @@ func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabel
ScrapeAlignInterval: swc.scrapeAlignInterval,
ScrapeOffset: swc.scrapeOffset,
SeriesLimit: seriesLimit,
AuthToken: at,
jobNameOriginal: swc.jobName,
}

View file

@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"io"
"sync"
"sync/atomic"
@ -53,7 +54,7 @@ func CheckConfig() error {
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
func Init(pushData func(wr *prompbmarshal.WriteRequest)) {
func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) {
mustInitClusterMemberID()
globalStopChan = make(chan struct{})
scraperWG.Add(1)
@ -91,7 +92,7 @@ func WriteConfigData(w io.Writer) {
_, _ = w.Write(*b)
}
func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
if configFile == "" {
// Nothing to scrape.
return
@ -185,14 +186,14 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`)
type scrapeConfigs struct {
pushData func(wr *prompbmarshal.WriteRequest)
pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)
wg sync.WaitGroup
stopCh chan struct{}
globalStopCh <-chan struct{}
scfgs []*scrapeConfig
}
func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
func newScrapeConfigs(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
return &scrapeConfigs{
pushData: pushData,
stopCh: make(chan struct{}),
@ -234,7 +235,7 @@ func (scs *scrapeConfigs) stop() {
type scrapeConfig struct {
name string
pushData func(wr *prompbmarshal.WriteRequest)
pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)
getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
checkInterval time.Duration
cfgCh chan *Config
@ -287,7 +288,7 @@ type scraperGroup struct {
wg sync.WaitGroup
mLock sync.Mutex
m map[string]*scraper
pushData func(wr *prompbmarshal.WriteRequest)
pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)
changesCount *metrics.Counter
activeScrapers *metrics.Counter
@ -297,7 +298,7 @@ type scraperGroup struct {
globalStopCh <-chan struct{}
}
func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
sg := &scraperGroup{
name: name,
m: make(map[string]*scraper),
@ -413,7 +414,7 @@ type scraper struct {
stoppedCh chan struct{}
}
func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper {
func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper {
sc := &scraper{
stopCh: make(chan struct{}),
stoppedCh: make(chan struct{}),

View file

@ -3,6 +3,7 @@ package promscrape
import (
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"io/ioutil"
"math"
"math/bits"
@ -119,6 +120,9 @@ type ScrapeWork struct {
// Optional limit on the number of unique series the scrape target can expose.
SeriesLimit int
//The Tenant Info
AuthToken *auth.Token
// The original 'job_name'
jobNameOriginal string
}
@ -188,7 +192,7 @@ type scrapeWork struct {
GetStreamReader func() (*streamReader, error)
// PushData is called for pushing collected data.
PushData func(wr *prompbmarshal.WriteRequest)
PushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)
// ScrapeGroup is name of ScrapeGroup that
// scrapeWork belongs to
@ -487,7 +491,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
// See https://github.com/VictoriaMetrics/operator/issues/497
sw.addAutoTimeseries(wc, "scrape_samples_limit", float64(sw.Config.SampleLimit), scrapeTimestamp)
}
sw.pushData(&wc.writeRequest)
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
sw.prevLabelsLen = len(wc.labels)
sw.prevBodyLen = len(bodyString)
wc.reset()
@ -514,9 +518,9 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
return err
}
func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {
startTime := time.Now()
sw.PushData(wr)
sw.PushData(at, wr)
pushDataDuration.UpdateDuration(startTime)
}
@ -568,7 +572,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
}
sw.pushData(&wc.writeRequest)
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
wc.resetNoRows()
return nil
}, sw.logError)
@ -603,7 +607,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
sw.pushData(&wc.writeRequest)
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
sw.prevLabelsLen = len(wc.labels)
sw.prevBodyLen = sbr.bodyLen
wc.reset()
@ -770,7 +774,7 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
}
staleSamplesCreated.Add(len(samples))
}
sw.pushData(&wc.writeRequest)
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
}
var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)