feature: Add multitenant for vmagent (#1505)

* feature: Add multitenant for vmagent

* Minor fix

* Fix rcs index out of range

* Minor fix

* Fix multi Init

* Fix multi Init

* Fix multi Init

* Add default multi

* Adjust naming

* Add TenantInserted metrics

* Add TenantInserted metrics

* fix: remove unused metrics for vmagent

* fix: remove unused metrics for vmagent

Co-authored-by: mghader <marc.ghader@ubisoft.com>
Co-authored-by: Sebastian YEPES <syepes@gmail.com>
This commit is contained in:
Omar Ghader 2021-08-05 08:44:29 +02:00 committed by GitHub
parent 095bb90879
commit fe445f753b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 274 additions and 79 deletions

View file

@ -5,32 +5,36 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="csvimport"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`)
)
// InsertHandler processes csv data from req.
func InsertHandler(req *http.Request) error {
func InsertHandler(p *httpserver.Path, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
return insertRows(p, rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -64,8 +68,14 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(p, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
if p != nil {
at, err := auth.NewToken(p.AuthToken)
if err == nil {
rowsTenantInserted.Get(at).Add(len(rows))
}
}
rowsPerInsert.Update(float64(len(rows)))
return nil
}

View file

@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(nil, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View file

@ -8,12 +8,15 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
@ -26,6 +29,7 @@ var (
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="influx"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="influx"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="influx"}`)
)
@ -35,7 +39,7 @@ var (
func InsertHandlerForReader(r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(db, rows, nil)
return insertRows(nil, db, rows, nil)
})
})
}
@ -43,7 +47,7 @@ func InsertHandlerForReader(r io.Reader) error {
// InsertHandlerForHTTP processes remote write for influx line protocol.
//
// See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md
func InsertHandlerForHTTP(req *http.Request) error {
func InsertHandlerForHTTP(p *httpserver.Path, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
@ -55,12 +59,12 @@ func InsertHandlerForHTTP(req *http.Request) error {
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(db, rows, extraLabels)
return insertRows(p, db, rows, extraLabels)
})
})
}
func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(p *httpserver.Path, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := getPushCtx()
defer putPushCtx(ctx)
@ -130,8 +134,14 @@ func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label)
ctx.ctx.Labels = labels
ctx.ctx.Samples = samples
ctx.commonLabels = commonLabels
remotewrite.Push(&ctx.ctx.WriteRequest)
remotewrite.Push(p, &ctx.ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if p != nil {
at, err := auth.NewToken(p.AuthToken)
if err == nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
}
rowsPerInsert.Update(float64(rowsTotal))
return nil

View file

@ -19,6 +19,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -88,7 +89,7 @@ func main() {
logger.Infof("starting vmagent at %q...", *httpListenAddr)
startTime := time.Now()
remotewrite.Init()
remotewrite.Init(nil)
common.StartUnmarshalWorkers()
writeconcurrencylimiter.Init()
if len(*influxListenAddr) > 0 {
@ -159,11 +160,82 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
})
return true
}
p, err := httpserver.ParsePath(r.URL.Path)
if err == nil && p.Prefix == "insert" {
_, err := auth.NewToken(p.AuthToken)
if err != nil {
httpserver.Errorf(w, r, "auth error: %s", err)
return true
}
switch p.Suffix {
case "prometheus/", "prometheus", "prometheus/api/v1/write":
prometheusWriteRequests.Inc()
if err := promremotewrite.InsertHandler(p, r); err != nil {
prometheusWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import":
vmimportRequests.Inc()
if err := vmimport.InsertHandler(p, r); err != nil {
vmimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import/csv":
csvimportRequests.Inc()
if err := csvimport.InsertHandler(p, r); err != nil {
csvimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import/prometheus":
prometheusimportRequests.Inc()
if err := prometheusimport.InsertHandler(p, r); err != nil {
prometheusimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "prometheus/api/v1/import/native":
nativeimportRequests.Inc()
if err := native.InsertHandler(p, r); err != nil {
nativeimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
case "influx/write", "influx/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(p, r); err != nil {
influxWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
}
w.WriteHeader(http.StatusNoContent)
return true
default:
// This link is not multitenant
}
}
path := strings.Replace(r.URL.Path, "//", "/", -1)
switch path {
case "/api/v1/write":
prometheusWriteRequests.Inc()
if err := promremotewrite.InsertHandler(r); err != nil {
if err := promremotewrite.InsertHandler(nil, r); err != nil {
prometheusWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@ -172,7 +244,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/api/v1/import":
vmimportRequests.Inc()
if err := vmimport.InsertHandler(r); err != nil {
if err := vmimport.InsertHandler(nil, r); err != nil {
vmimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@ -181,7 +253,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/api/v1/import/csv":
csvimportRequests.Inc()
if err := csvimport.InsertHandler(r); err != nil {
if err := csvimport.InsertHandler(nil, r); err != nil {
csvimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@ -190,7 +262,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/api/v1/import/prometheus":
prometheusimportRequests.Inc()
if err := prometheusimport.InsertHandler(r); err != nil {
if err := prometheusimport.InsertHandler(nil, r); err != nil {
prometheusimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@ -199,7 +271,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/api/v1/import/native":
nativeimportRequests.Inc()
if err := native.InsertHandler(r); err != nil {
if err := native.InsertHandler(nil, r); err != nil {
nativeimportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@ -208,7 +280,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
case "/write", "/api/v2/write":
influxWriteRequests.Inc()
if err := influx.InsertHandlerForHTTP(r); err != nil {
if err := influx.InsertHandlerForHTTP(nil, r); err != nil {
influxWriteErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true

View file

@ -5,36 +5,40 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="native"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`)
)
// InsertHandler processes `/api/v1/import` request.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(req *http.Request) error {
func InsertHandler(p *httpserver.Path, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(block *parser.Block) error {
return insertRows(block, extraLabels)
return insertRows(p, block, extraLabels)
})
})
}
func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
func insertRows(p *httpserver.Path, block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -42,6 +46,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
// since relabeling can prevent from inserting the rows.
rowsLen := len(block.Values)
rowsInserted.Add(rowsLen)
if p != nil {
at, err := auth.NewToken(p.AuthToken)
if err == nil {
rowsTenantInserted.Get(at).Add(rowsLen)
}
}
rowsPerInsert.Update(float64(rowsLen))
tssDst := ctx.WriteRequest.Timeseries[:0]
@ -80,6 +90,6 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(p, &ctx.WriteRequest)
return nil
}

View file

@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(nil, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View file

@ -65,7 +65,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(nil, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil

View file

@ -5,20 +5,24 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="prometheus"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`)
)
// InsertHandler processes `/api/v1/import/prometheus` request.
func InsertHandler(req *http.Request) error {
func InsertHandler(p *httpserver.Path, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
@ -30,12 +34,12 @@ func InsertHandler(req *http.Request) error {
return writeconcurrencylimiter.Do(func() error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
return insertRows(p, rows, extraLabels)
}, nil)
})
}
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -69,8 +73,14 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(p, &ctx.WriteRequest)
rowsInserted.Add(len(rows))
if p != nil {
at, err := auth.NewToken(p.AuthToken)
if err == nil {
rowsTenantInserted.Get(at).Add(len(rows))
}
}
rowsPerInsert.Update(float64(len(rows)))
return nil
}

View file

@ -5,34 +5,38 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="promremotewrite"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="promremotewrite"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="promremotewrite"}`)
)
// InsertHandler processes remote write for prometheus.
func InsertHandler(req *http.Request) error {
func InsertHandler(p *httpserver.Path, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(tss []prompb.TimeSeries) error {
return insertRows(tss, extraLabels)
return insertRows(p, tss, extraLabels)
})
})
}
func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
func insertRows(p *httpserver.Path, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -68,8 +72,14 @@ func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Labe
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(p, &ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if p != nil {
at, err := auth.NewToken(p.AuthToken)
if err == nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View file

@ -42,11 +42,11 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
}
rcs.global = global
}
if len(*relabelConfigPaths) > len(*remoteWriteURLs) {
if len(*relabelConfigPaths) > (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) {
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*relabelConfigPaths), len(*remoteWriteURLs))
len(*relabelConfigPaths), (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
}
rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs))
rcs.perURL = make([]*promrelabel.ParsedConfigs, (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
for i, path := range *relabelConfigPaths {
if len(path) == 0 {
// Skip empty relabel config.
@ -58,6 +58,7 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
}
rcs.perURL[i] = prc
}
return &rcs, nil
}

View file

@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
@ -26,6 +27,9 @@ var (
remoteWriteURLs = flagutil.NewArray("remoteWrite.url", "Remote storage URL to write data to. It must support Prometheus remote_write API. "+
"It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
"Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems")
remoteWriteMultitenantURLs = flagutil.NewArray("remoteWrite.multitenantURL", "Base path for remote storage URL to write data to. It must support VictoriaMetrics remote_write tenants API (identified by accountID or accountID:projectID). "+
"It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428 . "+
"Pass multiple -remoteWrite.multitenantURL flags in order to write data concurrently to multiple remote storage systems")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
"See also -remoteWrite.maxDiskUsagePerURL")
queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+
@ -53,7 +57,10 @@ var (
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -remoteWrite.maxHourlySeries")
)
var rwctxs []*remoteWriteCtx
var defaultWriteToken = "default"
var rwctxsMap = map[string][]*remoteWriteCtx{}
var rwctxLock = sync.Mutex{}
// Contains the current relabelConfigs.
var allRelabelConfigs atomic.Value
@ -75,10 +82,27 @@ func InitSecretFlags() {
// It must be called after flag.Parse().
//
// Stop must be called for graceful shutdown.
func Init() {
if len(*remoteWriteURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
func Init(p *httpserver.Path) {
rwctxLock.Lock()
defer rwctxLock.Unlock()
if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` or `-remoteWrite.multitenantURL` command-line flag must be set")
}
// Do not Init MultitenantURLs they are dynamically initialized
if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) > 0 && p == nil {
return
}
// Create one writecontext per tenant
writeContextIndex := defaultWriteToken
if p != nil {
writeContextIndex = p.AuthToken
}
if _, ok := rwctxsMap[writeContextIndex]; ok {
return
}
if *maxHourlySeries > 0 {
hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
_ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 {
@ -116,7 +140,7 @@ func Init() {
}
allRelabelConfigs.Store(rcs)
maxInmemoryBlocks := memory.Allowed() / len(*remoteWriteURLs) / maxRowsPerBlock / 100
maxInmemoryBlocks := memory.Allowed() / (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) / maxRowsPerBlock / 100
if maxInmemoryBlocks > 400 {
// There is no much sense in keeping higher number of blocks in memory,
// since this means that the producer outperforms consumer and the queue
@ -126,6 +150,10 @@ func Init() {
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
rwctxs := []*remoteWriteCtx{}
if len(*remoteWriteURLs) > 0 && p == nil {
for i, remoteWriteURL := range *remoteWriteURLs {
sanitizedURL := fmt.Sprintf("%d:secret-url", i+1)
if *showRemoteWriteURL {
@ -134,6 +162,21 @@ func Init() {
rwctx := newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL)
rwctxs = append(rwctxs, rwctx)
}
}
if len(*remoteWriteMultitenantURLs) > 0 && p != nil {
for i, remoteWriteMultitenantURL := range *remoteWriteMultitenantURLs {
sanitizedURL := fmt.Sprintf("%d:secret-url", i+1)
if *showRemoteWriteURL {
sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteMultitenantURL)
}
remoteWriteMultitenantURL := fmt.Sprintf("%s/%s/%s/%s", remoteWriteMultitenantURL, p.Prefix, p.AuthToken, p.Suffix)
rwctx := newRemoteWriteCtx(i, remoteWriteMultitenantURL, maxInmemoryBlocks, sanitizedURL)
rwctxs = append(rwctxs, rwctx)
}
}
rwctxsMap[writeContextIndex] = rwctxs
// Start config reloader.
configReloaderWG.Add(1)
@ -166,17 +209,32 @@ var configReloaderWG sync.WaitGroup
func Stop() {
close(stopCh)
configReloaderWG.Wait()
for _, rwctxs := range rwctxsMap {
for _, rwctx := range rwctxs {
rwctx.MustStop()
}
rwctxs = nil
}
rwctxsMap = nil
}
// Push sends wr to remote storage systems set via `-remoteWrite.url`.
//
// Note that wr may be modified by Push due to relabeling and rounding.
func Push(wr *prompbmarshal.WriteRequest) {
func Push(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {
// if a queue is not created for this tenant, create it dynamically using the auth.Token
var rwctxs []*remoteWriteCtx
writeContextIndex := defaultWriteToken
// if no tenant speficied, p is nil
if p != nil {
writeContextIndex = p.AuthToken
}
if _, ok := rwctxsMap[writeContextIndex]; !ok {
Init(p)
}
rwctxs = rwctxsMap[writeContextIndex]
var rctx *relabelCtx
rcs := allRelabelConfigs.Load().(*relabelConfigs)
pcsGlobal := rcs.global

View file

@ -5,36 +5,40 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="vmimport"}`)
rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="vmimport"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="vmimport"}`)
)
// InsertHandler processes `/api/v1/import` request.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6
func InsertHandler(req *http.Request) error {
func InsertHandler(p *httpserver.Path, req *http.Request) error {
extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil {
return err
}
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
return insertRows(p, rows, extraLabels)
})
})
}
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@ -74,8 +78,14 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
remotewrite.Push(&ctx.WriteRequest)
remotewrite.Push(p, &ctx.WriteRequest)
rowsInserted.Add(rowsTotal)
if p != nil {
at, err := auth.NewToken(p.AuthToken)
if err == nil {
rowsTenantInserted.Get(at).Add(rowsTotal)
}
}
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View file

@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -49,7 +50,7 @@ func CheckConfig() error {
// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
func Init(pushData func(wr *prompbmarshal.WriteRequest)) {
func Init(pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) {
globalStopCh = make(chan struct{})
scraperWG.Add(1)
go func() {
@ -72,7 +73,7 @@ var (
PendingScrapeConfigs int32
)
func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
func runScraper(configFile string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
if configFile == "" {
// Nothing to scrape.
return
@ -160,13 +161,13 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`)
type scrapeConfigs struct {
pushData func(wr *prompbmarshal.WriteRequest)
pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)
wg sync.WaitGroup
stopCh chan struct{}
scfgs []*scrapeConfig
}
func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConfigs {
func newScrapeConfigs(pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scrapeConfigs {
return &scrapeConfigs{
pushData: pushData,
stopCh: make(chan struct{}),
@ -207,7 +208,7 @@ func (scs *scrapeConfigs) stop() {
type scrapeConfig struct {
name string
pushData func(wr *prompbmarshal.WriteRequest)
pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)
getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
checkInterval time.Duration
cfgCh chan *Config
@ -256,7 +257,7 @@ type scraperGroup struct {
wg sync.WaitGroup
mLock sync.Mutex
m map[string]*scraper
pushData func(wr *prompbmarshal.WriteRequest)
pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)
changesCount *metrics.Counter
activeScrapers *metrics.Counter
@ -264,7 +265,7 @@ type scraperGroup struct {
scrapersStopped *metrics.Counter
}
func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup {
func newScraperGroup(name string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scraperGroup {
sg := &scraperGroup{
name: name,
m: make(map[string]*scraper),
@ -358,7 +359,7 @@ type scraper struct {
stopCh chan struct{}
}
func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper {
func newScraper(sw *ScrapeWork, group string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scraper {
sc := &scraper{
stopCh: make(chan struct{}),
}

View file

@ -10,6 +10,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
@ -163,7 +164,7 @@ type scrapeWork struct {
GetStreamReader func() (*streamReader, error)
// PushData is called for pushing collected data.
PushData func(wr *prompbmarshal.WriteRequest)
PushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)
// ScrapeGroup is name of ScrapeGroup that
// scrapeWork belongs to
@ -316,7 +317,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
sw.PushData(nil, &wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
sw.prevLabelsLen = len(wc.labels)
wc.reset()
@ -358,7 +359,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
}
sw.updateSeriesAdded(wc)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
sw.PushData(nil, &wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
return nil
@ -386,7 +387,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
sw.PushData(nil, &wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
sw.prevLabelsLen = len(wc.labels)
wc.reset()

View file

@ -5,6 +5,7 @@ import (
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
@ -58,7 +59,7 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) {
pushDataCalls := 0
var pushDataErr error
sw.PushData = func(wr *prompbmarshal.WriteRequest) {
sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {
if err := expectEqualTimeseries(wr.Timeseries, timeseriesExpected); err != nil {
pushDataErr = fmt.Errorf("unexpected data pushed: %w\ngot\n%#v\nwant\n%#v", err, wr.Timeseries, timeseriesExpected)
}
@ -98,7 +99,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
pushDataCalls := 0
var pushDataErr error
sw.PushData = func(wr *prompbmarshal.WriteRequest) {
sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {
pushDataCalls++
if len(wr.Timeseries) > len(timeseriesExpected) {
pushDataErr = fmt.Errorf("too many time series obtained; got %d; want %d\ngot\n%+v\nwant\n%+v",

View file

@ -4,6 +4,7 @@ import (
"fmt"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
@ -39,7 +40,7 @@ vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356
var sw scrapeWork
sw.Config = &ScrapeWork{}
sw.ReadData = readDataFunc
sw.PushData = func(wr *prompbmarshal.WriteRequest) {}
sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {}
timestamp := int64(0)
for pb.Next() {
if err := sw.scrapeInternal(timestamp, timestamp); err != nil {