diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go index 3c5a70819..ac330972b 100644 --- a/app/vmagent/csvimport/request_handler.go +++ b/app/vmagent/csvimport/request_handler.go @@ -5,32 +5,36 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`) - rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`) + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="csvimport"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="csvimport"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="csvimport"}`) ) // InsertHandler processes csv data from req. -func InsertHandler(req *http.Request) error { +func InsertHandler(p *httpserver.Path, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(rows, extraLabels) + return insertRows(p, rows, extraLabels) }) }) } -func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -64,8 +68,14 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(&ctx.WriteRequest) + remotewrite.Push(p, &ctx.WriteRequest) rowsInserted.Add(len(rows)) + if p != nil { + at, err := auth.NewToken(p.AuthToken) + if err == nil { + rowsTenantInserted.Get(at).Add(len(rows)) + } + } rowsPerInsert.Update(float64(len(rows))) return nil } diff --git a/app/vmagent/graphite/request_handler.go b/app/vmagent/graphite/request_handler.go index fd0d96cd6..c3ef22d8d 100644 --- a/app/vmagent/graphite/request_handler.go +++ b/app/vmagent/graphite/request_handler.go @@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(&ctx.WriteRequest) + remotewrite.Push(nil, &ctx.WriteRequest) rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/influx/request_handler.go b/app/vmagent/influx/request_handler.go index 9da1be933..a443af701 100644 --- a/app/vmagent/influx/request_handler.go +++ b/app/vmagent/influx/request_handler.go @@ -8,12 +8,15 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -25,8 +28,9 @@ var ( ) var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="influx"}`) - rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="influx"}`) + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="influx"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="influx"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="influx"}`) ) // InsertHandlerForReader processes remote write for influx line protocol. @@ -35,7 +39,7 @@ var ( func InsertHandlerForReader(r io.Reader) error { return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { - return insertRows(db, rows, nil) + return insertRows(nil, db, rows, nil) }) }) } @@ -43,7 +47,7 @@ func InsertHandlerForReader(r io.Reader) error { // InsertHandlerForHTTP processes remote write for influx line protocol. // // See https://github.com/influxdata/influxdb/blob/4cbdc197b8117fee648d62e2e5be75c6575352f0/tsdb/README.md -func InsertHandlerForHTTP(req *http.Request) error { +func InsertHandlerForHTTP(p *httpserver.Path, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err @@ -55,12 +59,12 @@ func InsertHandlerForHTTP(req *http.Request) error { // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { - return insertRows(db, rows, extraLabels) + return insertRows(p, db, rows, extraLabels) }) }) } -func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(p *httpserver.Path, db string, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := getPushCtx() defer putPushCtx(ctx) @@ -130,8 +134,14 @@ func insertRows(db string, rows []parser.Row, extraLabels []prompbmarshal.Label) ctx.ctx.Labels = labels ctx.ctx.Samples = samples ctx.commonLabels = commonLabels - remotewrite.Push(&ctx.ctx.WriteRequest) + remotewrite.Push(p, &ctx.ctx.WriteRequest) rowsInserted.Add(rowsTotal) + if p != nil { + at, err := auth.NewToken(p.AuthToken) + if err == nil { + rowsTenantInserted.Get(at).Add(rowsTotal) + } + } rowsPerInsert.Update(float64(rowsTotal)) return nil diff --git a/app/vmagent/main.go b/app/vmagent/main.go index e1e28d5c1..2a3c7002c 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -19,6 +19,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -88,7 +89,7 @@ func main() { logger.Infof("starting vmagent at %q...", *httpListenAddr) startTime := time.Now() - remotewrite.Init() + remotewrite.Init(nil) common.StartUnmarshalWorkers() writeconcurrencylimiter.Init() if len(*influxListenAddr) > 0 { @@ -159,11 +160,82 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { }) return true } + + p, err := httpserver.ParsePath(r.URL.Path) + if err == nil && p.Prefix == "insert" { + + _, err := auth.NewToken(p.AuthToken) + if err != nil { + httpserver.Errorf(w, r, "auth error: %s", err) + return true + } + + switch p.Suffix { + case "prometheus/", "prometheus", "prometheus/api/v1/write": + prometheusWriteRequests.Inc() + if err := promremotewrite.InsertHandler(p, r); err != nil { + prometheusWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "prometheus/api/v1/import": + vmimportRequests.Inc() + if err := vmimport.InsertHandler(p, r); err != nil { + vmimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "prometheus/api/v1/import/csv": + csvimportRequests.Inc() + if err := csvimport.InsertHandler(p, r); err != nil { + csvimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "prometheus/api/v1/import/prometheus": + prometheusimportRequests.Inc() + if err := prometheusimport.InsertHandler(p, r); err != nil { + prometheusimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "prometheus/api/v1/import/native": + nativeimportRequests.Inc() + if err := native.InsertHandler(p, r); err != nil { + nativeimportErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + case "influx/write", "influx/api/v2/write": + influxWriteRequests.Inc() + if err := influx.InsertHandlerForHTTP(p, r); err != nil { + influxWriteErrors.Inc() + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true + default: + // This link is not multitenant + } + + } + path := strings.Replace(r.URL.Path, "//", "/", -1) switch path { case "/api/v1/write": prometheusWriteRequests.Inc() - if err := promremotewrite.InsertHandler(r); err != nil { + if err := promremotewrite.InsertHandler(nil, r); err != nil { prometheusWriteErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true @@ -172,7 +244,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/api/v1/import": vmimportRequests.Inc() - if err := vmimport.InsertHandler(r); err != nil { + if err := vmimport.InsertHandler(nil, r); err != nil { vmimportErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true @@ -181,7 +253,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/api/v1/import/csv": csvimportRequests.Inc() - if err := csvimport.InsertHandler(r); err != nil { + if err := csvimport.InsertHandler(nil, r); err != nil { csvimportErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true @@ -190,7 +262,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/api/v1/import/prometheus": prometheusimportRequests.Inc() - if err := prometheusimport.InsertHandler(r); err != nil { + if err := prometheusimport.InsertHandler(nil, r); err != nil { prometheusimportErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true @@ -199,7 +271,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/api/v1/import/native": nativeimportRequests.Inc() - if err := native.InsertHandler(r); err != nil { + if err := native.InsertHandler(nil, r); err != nil { nativeimportErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true @@ -208,7 +280,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { return true case "/write", "/api/v2/write": influxWriteRequests.Inc() - if err := influx.InsertHandlerForHTTP(r); err != nil { + if err := influx.InsertHandlerForHTTP(nil, r); err != nil { influxWriteErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true diff --git a/app/vmagent/native/request_handler.go b/app/vmagent/native/request_handler.go index e93f5bb2b..b65af88a9 100644 --- a/app/vmagent/native/request_handler.go +++ b/app/vmagent/native/request_handler.go @@ -5,36 +5,40 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`) - rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`) + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="native"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="native"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="native"}`) ) // InsertHandler processes `/api/v1/import` request. // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 -func InsertHandler(req *http.Request) error { +func InsertHandler(p *httpserver.Path, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(block *parser.Block) error { - return insertRows(block, extraLabels) + return insertRows(p, block, extraLabels) }) }) } -func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { +func insertRows(p *httpserver.Path, block *parser.Block, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -42,6 +46,12 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { // since relabeling can prevent from inserting the rows. rowsLen := len(block.Values) rowsInserted.Add(rowsLen) + if p != nil { + at, err := auth.NewToken(p.AuthToken) + if err == nil { + rowsTenantInserted.Get(at).Add(rowsLen) + } + } rowsPerInsert.Update(float64(rowsLen)) tssDst := ctx.WriteRequest.Timeseries[:0] @@ -80,6 +90,6 @@ func insertRows(block *parser.Block, extraLabels []prompbmarshal.Label) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(&ctx.WriteRequest) + remotewrite.Push(p, &ctx.WriteRequest) return nil } diff --git a/app/vmagent/opentsdb/request_handler.go b/app/vmagent/opentsdb/request_handler.go index 628676de9..2721912d6 100644 --- a/app/vmagent/opentsdb/request_handler.go +++ b/app/vmagent/opentsdb/request_handler.go @@ -58,7 +58,7 @@ func insertRows(rows []parser.Row) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(&ctx.WriteRequest) + remotewrite.Push(nil, &ctx.WriteRequest) rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/opentsdbhttp/request_handler.go b/app/vmagent/opentsdbhttp/request_handler.go index b3026ab86..7d2d409eb 100644 --- a/app/vmagent/opentsdbhttp/request_handler.go +++ b/app/vmagent/opentsdbhttp/request_handler.go @@ -65,7 +65,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(&ctx.WriteRequest) + remotewrite.Push(nil, &ctx.WriteRequest) rowsInserted.Add(len(rows)) rowsPerInsert.Update(float64(len(rows))) return nil diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index 45daa860b..c6cdf80a0 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -5,20 +5,24 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`) - rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`) + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="prometheus"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="prometheus"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="prometheus"}`) ) // InsertHandler processes `/api/v1/import/prometheus` request. -func InsertHandler(req *http.Request) error { +func InsertHandler(p *httpserver.Path, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err @@ -30,12 +34,12 @@ func InsertHandler(req *http.Request) error { return writeconcurrencylimiter.Do(func() error { isGzipped := req.Header.Get("Content-Encoding") == "gzip" return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { - return insertRows(rows, extraLabels) + return insertRows(p, rows, extraLabels) }, nil) }) } -func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -69,8 +73,14 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(&ctx.WriteRequest) + remotewrite.Push(p, &ctx.WriteRequest) rowsInserted.Add(len(rows)) + if p != nil { + at, err := auth.NewToken(p.AuthToken) + if err == nil { + rowsTenantInserted.Get(at).Add(len(rows)) + } + } rowsPerInsert.Update(float64(len(rows))) return nil } diff --git a/app/vmagent/promremotewrite/request_handler.go b/app/vmagent/promremotewrite/request_handler.go index 0039e4ef4..64b3a91a7 100644 --- a/app/vmagent/promremotewrite/request_handler.go +++ b/app/vmagent/promremotewrite/request_handler.go @@ -5,34 +5,38 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/promremotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) var ( rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="promremotewrite"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="promremotewrite"}`) rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="promremotewrite"}`) ) // InsertHandler processes remote write for prometheus. -func InsertHandler(req *http.Request) error { +func InsertHandler(p *httpserver.Path, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(tss []prompb.TimeSeries) error { - return insertRows(tss, extraLabels) + return insertRows(p, tss, extraLabels) }) }) } -func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { +func insertRows(p *httpserver.Path, timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -68,8 +72,14 @@ func insertRows(timeseries []prompb.TimeSeries, extraLabels []prompbmarshal.Labe ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(&ctx.WriteRequest) + remotewrite.Push(p, &ctx.WriteRequest) rowsInserted.Add(rowsTotal) + if p != nil { + at, err := auth.NewToken(p.AuthToken) + if err == nil { + rowsTenantInserted.Get(at).Add(rowsTotal) + } + } rowsPerInsert.Update(float64(rowsTotal)) return nil } diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index f2e87847b..5dcfc1805 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -42,11 +42,11 @@ func loadRelabelConfigs() (*relabelConfigs, error) { } rcs.global = global } - if len(*relabelConfigPaths) > len(*remoteWriteURLs) { + if len(*relabelConfigPaths) > (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) { return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", - len(*relabelConfigPaths), len(*remoteWriteURLs)) + len(*relabelConfigPaths), (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs))) } - rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs)) + rcs.perURL = make([]*promrelabel.ParsedConfigs, (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs))) for i, path := range *relabelConfigPaths { if len(path) == 0 { // Skip empty relabel config. @@ -58,6 +58,7 @@ func loadRelabelConfigs() (*relabelConfigs, error) { } rcs.perURL[i] = prc } + return &rcs, nil } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 29a5ef1ff..70a294cc8 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" @@ -26,6 +27,9 @@ var ( remoteWriteURLs = flagutil.NewArray("remoteWrite.url", "Remote storage URL to write data to. It must support Prometheus remote_write API. "+ "It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428/api/v1/write . "+ "Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems") + remoteWriteMultitenantURLs = flagutil.NewArray("remoteWrite.multitenantURL", "Base path for remote storage URL to write data to. It must support VictoriaMetrics remote_write tenants API (identified by accountID or accountID:projectID). "+ + "It is recommended using VictoriaMetrics as remote storage. Example url: http://:8428 . "+ + "Pass multiple -remoteWrite.multitenantURL flags in order to write data concurrently to multiple remote storage systems") tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+ "See also -remoteWrite.maxDiskUsagePerURL") queues = flag.Int("remoteWrite.queues", cgroup.AvailableCPUs()*2, "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues "+ @@ -53,7 +57,10 @@ var ( "Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -remoteWrite.maxHourlySeries") ) -var rwctxs []*remoteWriteCtx +var defaultWriteToken = "default" +var rwctxsMap = map[string][]*remoteWriteCtx{} + +var rwctxLock = sync.Mutex{} // Contains the current relabelConfigs. var allRelabelConfigs atomic.Value @@ -75,10 +82,27 @@ func InitSecretFlags() { // It must be called after flag.Parse(). // // Stop must be called for graceful shutdown. -func Init() { - if len(*remoteWriteURLs) == 0 { - logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set") +func Init(p *httpserver.Path) { + rwctxLock.Lock() + defer rwctxLock.Unlock() + if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) == 0 { + logger.Fatalf("at least one `-remoteWrite.url` or `-remoteWrite.multitenantURL` command-line flag must be set") } + // Do not Init MultitenantURLs they are dynamically initialized + if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) > 0 && p == nil { + return + } + + // Create one writecontext per tenant + writeContextIndex := defaultWriteToken + + if p != nil { + writeContextIndex = p.AuthToken + } + if _, ok := rwctxsMap[writeContextIndex]; ok { + return + } + if *maxHourlySeries > 0 { hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour) _ = metrics.NewGauge(`vmagent_hourly_series_limit_max_series`, func() float64 { @@ -116,7 +140,7 @@ func Init() { } allRelabelConfigs.Store(rcs) - maxInmemoryBlocks := memory.Allowed() / len(*remoteWriteURLs) / maxRowsPerBlock / 100 + maxInmemoryBlocks := memory.Allowed() / (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) / maxRowsPerBlock / 100 if maxInmemoryBlocks > 400 { // There is no much sense in keeping higher number of blocks in memory, // since this means that the producer outperforms consumer and the queue @@ -126,15 +150,34 @@ func Init() { if maxInmemoryBlocks < 2 { maxInmemoryBlocks = 2 } - for i, remoteWriteURL := range *remoteWriteURLs { - sanitizedURL := fmt.Sprintf("%d:secret-url", i+1) - if *showRemoteWriteURL { - sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL) + + rwctxs := []*remoteWriteCtx{} + + if len(*remoteWriteURLs) > 0 && p == nil { + for i, remoteWriteURL := range *remoteWriteURLs { + sanitizedURL := fmt.Sprintf("%d:secret-url", i+1) + if *showRemoteWriteURL { + sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteURL) + } + rwctx := newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL) + rwctxs = append(rwctxs, rwctx) } - rwctx := newRemoteWriteCtx(i, remoteWriteURL, maxInmemoryBlocks, sanitizedURL) - rwctxs = append(rwctxs, rwctx) } + if len(*remoteWriteMultitenantURLs) > 0 && p != nil { + for i, remoteWriteMultitenantURL := range *remoteWriteMultitenantURLs { + sanitizedURL := fmt.Sprintf("%d:secret-url", i+1) + if *showRemoteWriteURL { + sanitizedURL = fmt.Sprintf("%d:%s", i+1, remoteWriteMultitenantURL) + } + remoteWriteMultitenantURL := fmt.Sprintf("%s/%s/%s/%s", remoteWriteMultitenantURL, p.Prefix, p.AuthToken, p.Suffix) + rwctx := newRemoteWriteCtx(i, remoteWriteMultitenantURL, maxInmemoryBlocks, sanitizedURL) + rwctxs = append(rwctxs, rwctx) + } + } + + rwctxsMap[writeContextIndex] = rwctxs + // Start config reloader. configReloaderWG.Add(1) go func() { @@ -166,17 +209,32 @@ var configReloaderWG sync.WaitGroup func Stop() { close(stopCh) configReloaderWG.Wait() - - for _, rwctx := range rwctxs { - rwctx.MustStop() + for _, rwctxs := range rwctxsMap { + for _, rwctx := range rwctxs { + rwctx.MustStop() + } + rwctxs = nil } - rwctxs = nil + rwctxsMap = nil } // Push sends wr to remote storage systems set via `-remoteWrite.url`. // // Note that wr may be modified by Push due to relabeling and rounding. -func Push(wr *prompbmarshal.WriteRequest) { +func Push(p *httpserver.Path, wr *prompbmarshal.WriteRequest) { + // if a queue is not created for this tenant, create it dynamically using the auth.Token + var rwctxs []*remoteWriteCtx + writeContextIndex := defaultWriteToken + + // if no tenant speficied, p is nil + if p != nil { + writeContextIndex = p.AuthToken + } + if _, ok := rwctxsMap[writeContextIndex]; !ok { + Init(p) + } + rwctxs = rwctxsMap[writeContextIndex] + var rctx *relabelCtx rcs := allRelabelConfigs.Load().(*relabelConfigs) pcsGlobal := rcs.global diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index 5afb8a37c..b2fb10023 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -5,36 +5,40 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="vmimport"}`) - rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="vmimport"}`) + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="vmimport"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="vmimport"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="vmimport"}`) ) // InsertHandler processes `/api/v1/import` request. // // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 -func InsertHandler(req *http.Request) error { +func InsertHandler(p *httpserver.Path, req *http.Request) error { extraLabels, err := parserCommon.GetExtraLabels(req) if err != nil { return err } return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(req, func(rows []parser.Row) error { - return insertRows(rows, extraLabels) + return insertRows(p, rows, extraLabels) }) }) } -func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { +func insertRows(p *httpserver.Path, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := common.GetPushCtx() defer common.PutPushCtx(ctx) @@ -74,8 +78,14 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx.WriteRequest.Timeseries = tssDst ctx.Labels = labels ctx.Samples = samples - remotewrite.Push(&ctx.WriteRequest) + remotewrite.Push(p, &ctx.WriteRequest) rowsInserted.Add(rowsTotal) + if p != nil { + at, err := auth.NewToken(p.AuthToken) + if err == nil { + rowsTenantInserted.Get(at).Add(rowsTotal) + } + } rowsPerInsert.Update(float64(rowsTotal)) return nil } diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go index 8509636d6..6b661566d 100644 --- a/lib/promscrape/scraper.go +++ b/lib/promscrape/scraper.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -49,7 +50,7 @@ func CheckConfig() error { // Init initializes Prometheus scraper with config from the `-promscrape.config`. // // Scraped data is passed to pushData. -func Init(pushData func(wr *prompbmarshal.WriteRequest)) { +func Init(pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) { globalStopCh = make(chan struct{}) scraperWG.Add(1) go func() { @@ -72,7 +73,7 @@ var ( PendingScrapeConfigs int32 ) -func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) { +func runScraper(configFile string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) { if configFile == "" { // Nothing to scrape. return @@ -160,13 +161,13 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest) var configReloads = metrics.NewCounter(`vm_promscrape_config_reloads_total`) type scrapeConfigs struct { - pushData func(wr *prompbmarshal.WriteRequest) + pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) wg sync.WaitGroup stopCh chan struct{} scfgs []*scrapeConfig } -func newScrapeConfigs(pushData func(wr *prompbmarshal.WriteRequest)) *scrapeConfigs { +func newScrapeConfigs(pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scrapeConfigs { return &scrapeConfigs{ pushData: pushData, stopCh: make(chan struct{}), @@ -207,7 +208,7 @@ func (scs *scrapeConfigs) stop() { type scrapeConfig struct { name string - pushData func(wr *prompbmarshal.WriteRequest) + pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork checkInterval time.Duration cfgCh chan *Config @@ -256,7 +257,7 @@ type scraperGroup struct { wg sync.WaitGroup mLock sync.Mutex m map[string]*scraper - pushData func(wr *prompbmarshal.WriteRequest) + pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) changesCount *metrics.Counter activeScrapers *metrics.Counter @@ -264,7 +265,7 @@ type scraperGroup struct { scrapersStopped *metrics.Counter } -func newScraperGroup(name string, pushData func(wr *prompbmarshal.WriteRequest)) *scraperGroup { +func newScraperGroup(name string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scraperGroup { sg := &scraperGroup{ name: name, m: make(map[string]*scraper), @@ -358,7 +359,7 @@ type scraper struct { stopCh chan struct{} } -func newScraper(sw *ScrapeWork, group string, pushData func(wr *prompbmarshal.WriteRequest)) *scraper { +func newScraper(sw *ScrapeWork, group string, pushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest)) *scraper { sc := &scraper{ stopCh: make(chan struct{}), } diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 281a0e57e..90cd3b9d3 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" @@ -163,7 +164,7 @@ type scrapeWork struct { GetStreamReader func() (*streamReader, error) // PushData is called for pushing collected data. - PushData func(wr *prompbmarshal.WriteRequest) + PushData func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) // ScrapeGroup is name of ScrapeGroup that // scrapeWork belongs to @@ -316,7 +317,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) startTime := time.Now() - sw.PushData(&wc.writeRequest) + sw.PushData(nil, &wc.writeRequest) pushDataDuration.UpdateDuration(startTime) sw.prevLabelsLen = len(wc.labels) wc.reset() @@ -358,7 +359,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { } sw.updateSeriesAdded(wc) startTime := time.Now() - sw.PushData(&wc.writeRequest) + sw.PushData(nil, &wc.writeRequest) pushDataDuration.UpdateDuration(startTime) wc.resetNoRows() return nil @@ -386,7 +387,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) startTime := time.Now() - sw.PushData(&wc.writeRequest) + sw.PushData(nil, &wc.writeRequest) pushDataDuration.UpdateDuration(startTime) sw.prevLabelsLen = len(wc.labels) wc.reset() diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index 7d2eeced6..4083a4495 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" @@ -58,7 +59,7 @@ func TestScrapeWorkScrapeInternalFailure(t *testing.T) { pushDataCalls := 0 var pushDataErr error - sw.PushData = func(wr *prompbmarshal.WriteRequest) { + sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) { if err := expectEqualTimeseries(wr.Timeseries, timeseriesExpected); err != nil { pushDataErr = fmt.Errorf("unexpected data pushed: %w\ngot\n%#v\nwant\n%#v", err, wr.Timeseries, timeseriesExpected) } @@ -98,7 +99,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { pushDataCalls := 0 var pushDataErr error - sw.PushData = func(wr *prompbmarshal.WriteRequest) { + sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) { pushDataCalls++ if len(wr.Timeseries) > len(timeseriesExpected) { pushDataErr = fmt.Errorf("too many time series obtained; got %d; want %d\ngot\n%+v\nwant\n%+v", diff --git a/lib/promscrape/scrapework_timing_test.go b/lib/promscrape/scrapework_timing_test.go index 0720d698e..0aa9ec363 100644 --- a/lib/promscrape/scrapework_timing_test.go +++ b/lib/promscrape/scrapework_timing_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) @@ -39,7 +40,7 @@ vm_tcplistener_write_calls_total{name="https", addr=":443"} 132356 var sw scrapeWork sw.Config = &ScrapeWork{} sw.ReadData = readDataFunc - sw.PushData = func(wr *prompbmarshal.WriteRequest) {} + sw.PushData = func(p *httpserver.Path, wr *prompbmarshal.WriteRequest) {} timestamp := int64(0) for pb.Next() { if err := sw.scrapeInternal(timestamp, timestamp); err != nil {