From 505d359b397d3594a996253d49635ec83fd949d4 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 30 Sep 2022 16:28:35 +0200 Subject: [PATCH] app/vminsert: allows parsing tenant id from labels (#3009) * app/vminsert: allows parsing tenant id from labels it should help mitigate issues with vmagent's multiTenant mode, which works incorrectly at heavy load and it cannot handle more then 100 different tenants. This functional hidden with flag and do not change vminsert default behaviour https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970 * Update docs/Cluster-VictoriaMetrics.md Co-authored-by: Roman Khavronenko * wip * app/vminsert/netstorage: clean remaining labels in order to free up GC * docs/Cluster-VictoriaMetrics.md: typo fix * wip * wip Co-authored-by: Roman Khavronenko Co-authored-by: Aliaksandr Valialkin --- README.md | 35 ++++++++++++- app/vmagent/README.md | 4 ++ app/vminsert/csvimport/request_handler.go | 7 ++- app/vminsert/datadog/request_handler.go | 11 ++-- app/vminsert/graphite/request_handler.go | 13 +++-- app/vminsert/influx/request_handler.go | 26 ++++++---- app/vminsert/main.go | 9 ++-- app/vminsert/native/request_handler.go | 19 +++++-- app/vminsert/netstorage/insert_ctx.go | 51 ++++++++++++++++++- app/vminsert/opentsdb/request_handler.go | 13 +++-- app/vminsert/opentsdbhttp/request_handler.go | 8 +-- .../prometheusimport/request_handler.go | 7 ++- .../promremotewrite/request_handler.go | 11 ++-- app/vminsert/vmimport/request_handler.go | 11 ++-- docs/CHANGELOG.md | 1 + docs/Cluster-VictoriaMetrics.md | 32 +++++++++++- docs/vmagent.md | 4 ++ lib/auth/auth.go | 44 ++++++++++++---- lib/auth/auth_test.go | 2 + lib/httpserver/path.go | 6 ++- lib/tenantmetrics/counter_map.go | 7 +++ 21 files changed, 255 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 91644283b3..d0efdb2d12 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,9 @@ It increases cluster availability, and simplifies cluster maintenance as well as VictoriaMetrics cluster supports multiple isolated tenants (aka namespaces). Tenants are identified by `accountID` or `accountID:projectID`, which are put inside request urls. -See [these docs](#url-format) for details. Some facts about tenants in VictoriaMetrics: +See [these docs](#url-format) for details. + +Some facts about tenants in VictoriaMetrics: - Each `accountID` and `projectID` is identified by an arbitrary 32-bit integer in the range `[0 .. 2^32)`. If `projectID` is missing, then it is automatically assigned to `0`. It is expected that other information about tenants @@ -55,6 +57,33 @@ when different tenants have different amounts of data and different query load. - VictoriaMetrics doesn't support querying multiple tenants in a single request. +See also [multitenancy via labels](#multitenancy-via-labels). + + +## Multitenancy via labels + +`vminsert` can accept data from multiple [tenants](#multitenancy) via a special `multitenant` endpoints `http://vminsert:8480/insert/multitenant/`, +where `` can be replaced with any supported suffix for data ingestion from [this list](#url-format). +In this case the account id and project id are obtained from optional `vm_account_id` and `vm_project_id` labels of the incoming samples. +If `vm_account_id` or `vm_project_id` labels are missing or invalid, then the corresponding `accountID` or `projectID` is set to 0. +These labels are automatically removed from samples before forwarding them to `vmstorage`. 
+For example, if the following samples are written into `http://vminsert:8480/insert/multitenant/prometheus/api/v1/write`: + +``` +http_requests_total{path="/foo",vm_account_id="42"} 12 +http_requests_total{path="/bar",vm_account_id="7",vm_project_id="9"} 34 +``` + +Then the `http_requests_total{path="/foo"} 12` would be stored in the tenant `accountID=42, projectID=0`, +while the `http_requests_total{path="/bar"} 34` would be stored in the tenant `accountID=7, projectID=9`. + +The `vm_account_id` and `vm_project_id` labels are extracted after applying the [relabeling](https://docs.victoriametrics.com/relabeling.html) +set via `-relabelConfig` command-line flag, so these labels can be set at this stage. + +**Security considerations:** it is recommended restricting access to `multitenant` endpoints only to trusted sources, +since untrusted source may break per-tenant data by writing unwanted samples to aribtrary tenants. + + ## Binaries Compiled binaries for the cluster version are available in the `assets` section of the [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). @@ -214,7 +243,9 @@ See [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html - URLs for data ingestion: `http://:8480/insert//`, where: - `` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`, - where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. + where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. See [multitenancy docs](#multitenancy) for more details. + The `` can be set to `multitenant` string, e.g. `http://:8480/insert/multitenant/`. Such urls accept data from multiple tenants + specified via `vm_account_id` and `vm_project_id` labels. See [multitenancy via labels](#multitenancy-via-labels) for more details. - `` may have the following values: - `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). - `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 51cc9e71d4..e7c25dde90 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -142,6 +142,10 @@ While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. +[VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) supports writing data to multiple tenants +specified via special labels - see [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). +This allows specifying tenant ids via [relabeling](#relabeling) and writing multitenant data to a single `-remoteWrite.url=http:///insert/multitenant/api/v1/write`. + [Multitenancy](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) support is enabled when `-remoteWrite.multitenantURL` command-line flag is set. 
In this case `vmagent` accepts multitenant data at `http://vmagent:8429/insert//...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert//prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants. If `-remoteWrite.multitenantURL` command-line flag is set and `vmagent` is configured to scrape Prometheus-compatible targets (e.g. if `-promscrape.config` command-line flag is set) diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index cff2a033d7..9b9115b5d5 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -38,6 +38,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L defer netstorage.PutInsertCtx(ctx) ctx.Reset() // This line is required for initializing ctx internals. + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -59,12 +60,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(at) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(at).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/datadog/request_handler.go b/app/vminsert/datadog/request_handler.go index 8a9a955cd0..a7c9bf1a44 100644 --- a/app/vminsert/datadog/request_handler.go +++ b/app/vminsert/datadog/request_handler.go @@ -48,6 +48,7 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars ctx.Reset() rowsTotal := 0 + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range series { ss := &series[i] @@ -74,18 +75,20 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars continue } ctx.SortLabelsIfNeeded() - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) - storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) + atLocal := ctx.GetLocalAuthToken(at) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) for _, pt := range ss.Points { timestamp := pt.Timestamp() value := pt.Value() - if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { return err } } + perTenantRows[*atLocal] += len(ss.Points) } rowsInserted.Add(rowsTotal) - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(rowsTotal)) return ctx.FlushBufs() } diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go 
index 4305ded91a..9cbf79afea 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -35,7 +35,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { defer netstorage.PutInsertCtx(ctx) ctx.Reset() // This line is required for initializing ctx internals. - atCopy := *at + var atCopy auth.Token + if at != nil { + atCopy = *at + } + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -63,13 +67,14 @@ func insertRows(at *auth.Token, rows []parser.Row) error { continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(&atCopy) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } - // Assume that all the rows for a single connection belong to the same (AccountID, ProjectID). rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(&atCopy).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index ba5edac1e2..a3cd05194c 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -73,7 +73,11 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom ic := &ctx.Common ic.Reset() // This line is required for initializing ic internals. rowsTotal := 0 - atCopy := *at + var atCopy auth.Token + if at != nil { + atCopy = *at + } + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -115,8 +119,6 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom metricGroupPrefixLen := len(ctx.metricGroupBuf) if hasRelabeling { ctx.originLabels = append(ctx.originLabels[:0], ic.Labels...) - ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, nil) - metricNameBufLen := len(ic.MetricNameBuf) for j := range r.Fields { f := &r.Fields[j] if !skipFieldKey { @@ -130,19 +132,22 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom // Skip metric without labels. 
continue } - ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen] ic.SortLabelsIfNeeded() + atLocal := ic.GetLocalAuthToken(&atCopy) + ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, nil) for i := range ic.Labels { ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[i]) } - storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels) - if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { + storageNodeIdx := ic.GetStorageNodeIdx(atLocal, ic.Labels) + if err := ic.WriteDataPointExt(storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { return err } + perTenantRows[*atLocal]++ } } else { ic.SortLabelsIfNeeded() - ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels) + atLocal := ic.GetLocalAuthToken(&atCopy) + ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ic.Labels) metricNameBufLen := len(ic.MetricNameBuf) labelsLen := len(ic.Labels) for j := range r.Fields { @@ -159,15 +164,16 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom } ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen] ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[len(ic.Labels)-1]) - storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels) - if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { + storageNodeIdx := ic.GetStorageNodeIdx(atLocal, ic.Labels) + if err := ic.WriteDataPointExt(storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { return err } + perTenantRows[*atLocal]++ } } } rowsInserted.Add(rowsTotal) - rowsTenantInserted.Get(&atCopy).Add(rowsTotal) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(rowsTotal)) return ic.FlushBufs() } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 03f304b7f1..bb997f3eee 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -108,20 +108,17 @@ func main() { } if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error { - var at auth.Token // TODO: properly initialize auth token - return graphite.InsertHandler(&at, r) + return graphite.InsertHandler(nil, r) }) } if len(*influxListenAddr) > 0 { influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error { - var at auth.Token // TODO: properly initialize auth token - return influx.InsertHandlerForReader(&at, r) + return influx.InsertHandlerForReader(nil, r) }) } if len(*opentsdbListenAddr) > 0 { opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error { - var at auth.Token // TODO: properly initialize auth token - return opentsdb.InsertHandler(&at, r) + return opentsdb.InsertHandler(nil, r) }, opentsdbhttp.InsertHandler) } if len(*opentsdbHTTPListenAddr) > 0 { diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go index cf9007473b..323df55ddd 100644 --- a/app/vminsert/native/request_handler.go +++ b/app/vminsert/native/request_handler.go @@ -44,7 +44,9 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal // since relabeling can prevent from inserting the rows. 
rowsLen := len(block.Values) rowsInserted.Add(rowsLen) - rowsTenantInserted.Get(at).Add(rowsLen) + if at != nil { + rowsTenantInserted.Get(at).Add(rowsLen) + } rowsPerInsert.Update(float64(rowsLen)) ctx.Reset() // This line is required for initializing ctx internals. @@ -68,8 +70,9 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal return nil } ctx.SortLabelsIfNeeded() - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) - storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) + atLocal := ctx.GetLocalAuthToken(at) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) values := block.Values timestamps := block.Timestamps if len(timestamps) != len(values) { @@ -77,9 +80,15 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal } for j, value := range values { timestamp := timestamps[j] - if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { return err } } - return ctx.FlushBufs() + if err := ctx.FlushBufs(); err != nil { + return err + } + if at == nil { + rowsTenantInserted.Get(atLocal).Add(rowsLen) + } + return nil } diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go index fc74a8601b..8fb36c15f4 100644 --- a/app/vminsert/netstorage/insert_ctx.go +++ b/app/vminsert/netstorage/insert_ctx.go @@ -3,6 +3,7 @@ package netstorage import ( "fmt" "net/http" + "strconv" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" @@ -25,6 +26,8 @@ type InsertCtx struct { labelsBuf []byte relabelCtx relabel.Ctx + + at auth.Token } type bufRows struct { @@ -68,6 +71,7 @@ func (ctx *InsertCtx) Reset() { } ctx.labelsBuf = ctx.labelsBuf[:0] ctx.relabelCtx.Reset() + ctx.at.Set(0, 0) } // AddLabelBytes adds (name, value) label to ctx.Labels. @@ -115,11 +119,11 @@ func (ctx *InsertCtx) ApplyRelabeling() { func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error { ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels) storageNodeIdx := ctx.GetStorageNodeIdx(at, labels) - return ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value) + return ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value) } // WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx. -func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error { +func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error { br := &ctx.bufRowss[storageNodeIdx] sn := storageNodes[storageNodeIdx] bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value) @@ -181,3 +185,46 @@ func marshalBytesFast(dst []byte, s []byte) []byte { dst = append(dst, s...) return dst } + +// GetLocalAuthToken obtains auth.Token from context labels vm_account_id and vm_project_id if at is nil. +// +// At is returned as is if it isn't nil. 
+// +// The vm_account_id and vm_project_id labels are automatically removed from the ctx. +func (ctx *InsertCtx) GetLocalAuthToken(at *auth.Token) *auth.Token { + if at != nil { + return at + } + accountID := uint32(0) + projectID := uint32(0) + tmpLabels := ctx.Labels[:0] + for _, label := range ctx.Labels { + if string(label.Name) == "vm_account_id" { + accountID = parseUint32(label.Value) + continue + } + if string(label.Name) == "vm_project_id" { + projectID = parseUint32(label.Value) + continue + } + tmpLabels = append(tmpLabels, label) + } + cleanLabels := ctx.Labels[len(tmpLabels):] + for i := range cleanLabels { + label := &cleanLabels[i] + label.Name = nil + label.Value = nil + } + ctx.Labels = tmpLabels + ctx.at.Set(accountID, projectID) + return &ctx.at +} + +func parseUint32(b []byte) uint32 { + s := bytesutil.ToUnsafeString(b) + n, err := strconv.ParseUint(s, 10, 32) + if err != nil { + return 0 + } + return uint32(n) +} diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 9d404b5101..76ff9e4ef6 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -35,7 +35,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { defer netstorage.PutInsertCtx(ctx) ctx.Reset() // This line is required for initializing ctx internals. - atCopy := *at + var atCopy auth.Token + if at != nil { + atCopy = *at + } + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -63,13 +67,14 @@ func insertRows(at *auth.Token, rows []parser.Row) error { continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(&atCopy) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } - // Assume that all the rows for a single connection belong to the same (AccountID, ProjectID). rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(&atCopy).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 824bae21ba..e9c505e436 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -57,8 +57,8 @@ func InsertHandler(req *http.Request) error { func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := netstorage.GetInsertCtx() defer netstorage.PutInsertCtx(ctx) - ctx.Reset() // This line is required for initializing ctx internals. 
+ perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -80,12 +80,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(at) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(at).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index 5fefff1a8f..ad59f2a5fe 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -43,6 +43,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L defer netstorage.PutInsertCtx(ctx) ctx.Reset() // This line is required for initializing ctx internals. + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -64,12 +65,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(at) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(at).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 2c8ff00d96..fd7a6cd04b 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -41,6 +41,7 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr ctx.Reset() // This line is required for initializing ctx internals. 
rowsTotal := 0 + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range timeseries { ts := ×eries[i] @@ -62,21 +63,23 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr continue } ctx.SortLabelsIfNeeded() - storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) + atLocal := ctx.GetLocalAuthToken(at) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) ctx.MetricNameBuf = ctx.MetricNameBuf[:0] samples := ts.Samples for i := range samples { r := &samples[i] if len(ctx.MetricNameBuf) == 0 { - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels) } - if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, r.Timestamp, r.Value); err != nil { + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, r.Timestamp, r.Value); err != nil { return err } } + perTenantRows[*atLocal] += len(ts.Samples) } rowsInserted.Add(rowsTotal) - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(rowsTotal)) return ctx.FlushBufs() } diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 9706b5e9ed..a3fd4ff960 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -44,6 +44,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.Reset() // This line is required for initializing ctx internals. rowsTotal := 0 + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -65,8 +66,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L continue } ctx.SortLabelsIfNeeded() - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) - storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) + atLocal := ctx.GetLocalAuthToken(at) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) values := r.Values timestamps := r.Timestamps if len(timestamps) != len(values) { @@ -74,13 +76,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L } for j, value := range values { timestamp := timestamps[j] - if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { return err } } + perTenantRows[*atLocal] += len(r.Values) } rowsInserted.Add(rowsTotal) - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(rowsTotal)) return ctx.FlushBufs() } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f9080cee11..5a7d17c170 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f **Update note 2:** [vmalert](https://docs.victoriametrics.com/vmalert.html) changes default value for command-line flag `-datasource.queryStep` from `0s` to `5m`. 
The change supposed to improve reliability of the rules evaluation when evaluation interval is lower than scraping interval. +* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): support specifying tenant ids via `vm_account_id` and `vm_project_id` labels. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970). * FEATURE: improve [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) performance by up to 3x if non-trivial `regex` values are used. * FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105). * FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113). diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index b73d0be776..ffe7aeabbd 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -43,7 +43,9 @@ It increases cluster availability, and simplifies cluster maintenance as well as VictoriaMetrics cluster supports multiple isolated tenants (aka namespaces). Tenants are identified by `accountID` or `accountID:projectID`, which are put inside request urls. -See [these docs](#url-format) for details. Some facts about tenants in VictoriaMetrics: +See [these docs](#url-format) for details. + +Some facts about tenants in VictoriaMetrics: - Each `accountID` and `projectID` is identified by an arbitrary 32-bit integer in the range `[0 .. 2^32)`. If `projectID` is missing, then it is automatically assigned to `0`. It is expected that other information about tenants @@ -59,6 +61,30 @@ when different tenants have different amounts of data and different query load. - VictoriaMetrics doesn't support querying multiple tenants in a single request. +See also [multitenancy via labels](#multitenancy-via-labels). + + +## Multitenancy via labels + +`vminsert` can accept data from multiple [tenants](#multitenancy) via a special `multitenant` endpoints `http://vminsert:8480/insert/multitenant/`, +where `` can be replaced with any supported suffix for data ingestion from [this list](#url-format). +In this case the account id and project id are obtained from optional `vm_account_id` and `vm_project_id` labels of the incoming samples. +If `vm_account_id` or `vm_project_id` labels are missing or invalid, then the corresponding `accountID` or `projectID` is set to 0. +These labels are automatically removed from samples before forwarding them to `vmstorage`. 
+For example, if the following samples are written into `http://vminsert:8480/insert/multitenant/prometheus/api/v1/write`: + +``` +http_requests_total{path="/foo",vm_account_id="42"} 12 +http_requests_total{path="/bar",vm_account_id="7",vm_project_id="9"} 34 +``` + +Then the `http_requests_total{path="/foo"} 12` would be stored in the tenant `accountID=42, projectID=0`, +while the `http_requests_total{path="/bar"} 34` would be stored in the tenant `accountID=7, projectID=9`. + +The `vm_account_id` and `vm_project_id` labels are extracted after applying the [relabeling](https://docs.victoriametrics.com/relabeling.html) +set via `-relabelConfig` command-line flag, so these labels can be set at this stage. + + ## Binaries Compiled binaries for the cluster version are available in the `assets` section of the [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). @@ -218,7 +244,9 @@ See [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html - URLs for data ingestion: `http://:8480/insert//`, where: - `` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`, - where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. + where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. See [multitenancy docs](#multitenancy) for more details. + The `` can be set to `multitenant` string, e.g. `http://:8480/insert/multitenant/`. Such urls accept data from multiple tenants + specified via `vm_account_id` and `vm_project_id` labels. See [multitenancy via labels](#multitenancy-via-labels) for more details. - `` may have the following values: - `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). - `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. diff --git a/docs/vmagent.md b/docs/vmagent.md index 7ef86bad42..315d704186 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -146,6 +146,10 @@ While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. +[VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) supports writing data to multiple tenants +specified via special labels - see [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). +This allows specifying tenant ids via [relabeling](#relabeling) and writing multitenant data to a single `-remoteWrite.url=http:///insert/multitenant/api/v1/write`. + [Multitenancy](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) support is enabled when `-remoteWrite.multitenantURL` command-line flag is set. In this case `vmagent` accepts multitenant data at `http://vmagent:8429/insert//...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert//prometheus/api/v1/write`. 
If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants. If `-remoteWrite.multitenantURL` command-line flag is set and `vmagent` is configured to scrape Prometheus-compatible targets (e.g. if `-promscrape.config` command-line flag is set) diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 6d27943e3b..38e2354482 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -14,30 +14,54 @@ type Token struct { // String returns string representation of t. func (t *Token) String() string { + if t == nil { + return "multitenant" + } if t.ProjectID == 0 { return fmt.Sprintf("%d", t.AccountID) } return fmt.Sprintf("%d:%d", t.AccountID, t.ProjectID) } -// NewToken returns new Token for the given authToken +// NewToken returns new Token for the given authToken. +// +// If authToken == "multitenant", then nil Token is returned. func NewToken(authToken string) (*Token, error) { + if authToken == "multitenant" { + return nil, nil + } + var t Token + if err := t.Init(authToken); err != nil { + return nil, err + } + return &t, nil +} + +// Init initializes t from authToken. +func (t *Token) Init(authToken string) error { tmp := strings.Split(authToken, ":") if len(tmp) > 2 { - return nil, fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp)) + return fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp)) } - var at Token - accountID, err := strconv.ParseUint(tmp[0], 10, 32) + n, err := strconv.ParseUint(tmp[0], 10, 32) if err != nil { - return nil, fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err) + return fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err) } - at.AccountID = uint32(accountID) + accountID := uint32(n) + projectID := uint32(0) if len(tmp) > 1 { - projectID, err := strconv.ParseUint(tmp[1], 10, 32) + n, err := strconv.ParseUint(tmp[1], 10, 32) if err != nil { - return nil, fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err) + return fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err) } - at.ProjectID = uint32(projectID) + projectID = uint32(n) } - return &at, nil + t.Set(accountID, projectID) + return nil +} + +// Set sets accountID and projectID for the t. +func (t *Token) Set(accountID, projectID uint32) { + t.AccountID = accountID + t.ProjectID = projectID } diff --git a/lib/auth/auth_test.go b/lib/auth/auth_test.go index aa2b4b4f92..eddf1c6fe0 100644 --- a/lib/auth/auth_test.go +++ b/lib/auth/auth_test.go @@ -26,6 +26,8 @@ func TestNewTokenSuccess(t *testing.T) { f("1:4294967295", "1:4294967295") // max uint32 accountID and projectID f("4294967295:4294967295", "4294967295:4294967295") + // multitenant + f("multitenant", "multitenant") } func TestNewTokenFailure(t *testing.T) { diff --git a/lib/httpserver/path.go b/lib/httpserver/path.go index 833c0a6b41..3743341e6e 100644 --- a/lib/httpserver/path.go +++ b/lib/httpserver/path.go @@ -22,6 +22,8 @@ func ParsePath(path string) (*Path, error) { // // - prefix must contain `select`, `insert` or `delete`. // - authToken contains `accountID[:projectID]`, where projectID is optional. + // authToken may also contain `multitenant` string. In this case the accountID and projectID + // are obtained from vm_account_id and vm_project_id labels of the ingested samples. 
// - suffix contains arbitrary suffix. // // prefix must be used for the routing to the appropriate service @@ -29,14 +31,14 @@ func ParsePath(path string) (*Path, error) { s := skipPrefixSlashes(path) n := strings.IndexByte(s, '/') if n < 0 { - return nil, fmt.Errorf("cannot find {prefix}") + return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{authToken}/{suffix} format", path) } prefix := s[:n] s = skipPrefixSlashes(s[n+1:]) n = strings.IndexByte(s, '/') if n < 0 { - return nil, fmt.Errorf("cannot find {authToken}") + return nil, fmt.Errorf("cannot find {authToken} in %q; expecting /{prefix}/{authToken}/{suffix} format", path) } authToken := s[:n] diff --git a/lib/tenantmetrics/counter_map.go b/lib/tenantmetrics/counter_map.go index 710986b80f..3ab106679c 100644 --- a/lib/tenantmetrics/counter_map.go +++ b/lib/tenantmetrics/counter_map.go @@ -39,6 +39,13 @@ func (cm *CounterMap) Get(at *auth.Token) *metrics.Counter { return cm.GetByTenant(key) } +// MultiAdd adds multiple values grouped by auth.Token +func (cm *CounterMap) MultiAdd(perTenantValues map[auth.Token]int) { + for token, value := range perTenantValues { + cm.Get(&token).Add(value) + } +} + // GetByTenant returns counter for the given key. func (cm *CounterMap) GetByTenant(key TenantID) *metrics.Counter { m := cm.m.Load().(map[TenantID]*metrics.Counter)
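
For a quick end-to-end check of the multitenant ingestion path added by this patch, the following sketch pushes the two example samples from the docs above, carrying the tenant in `vm_account_id` / `vm_project_id` labels. This is an illustration only, not part of the patch: it assumes `vminsert` is reachable at `http://vminsert:8480` and uses the Prometheus text import suffix (`prometheus/api/v1/import/prometheus`) from the URL-format list; any other supported ingestion suffix should behave the same way, since the tenant is taken from the labels rather than from the URL.

```go
// Hypothetical smoke test for the /insert/multitenant/ endpoint.
// vminsert is expected to route the first sample to tenant 42:0 and the
// second one to tenant 7:9, stripping the vm_account_id/vm_project_id
// labels before forwarding the samples to vmstorage.
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
)

func main() {
	// Assumed vminsert address and ingestion suffix; adjust for your setup.
	const url = "http://vminsert:8480/insert/multitenant/prometheus/api/v1/import/prometheus"

	body := strings.Join([]string{
		`http_requests_total{path="/foo",vm_account_id="42"} 12`,
		`http_requests_total{path="/bar",vm_account_id="7",vm_project_id="9"} 34`,
	}, "\n") + "\n"

	resp, err := http.Post(url, "text/plain", strings.NewReader(body))
	if err != nil {
		log.Fatalf("cannot send samples: %s", err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Printf("status=%d body=%q\n", resp.StatusCode, out)
}
```

Reading the data back still requires per-tenant select URLs (`/select/42/...` and `/select/7:9/...`), since the cluster does not support querying multiple tenants in a single request.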