diff --git a/README.md b/README.md index 91644283b3..d0efdb2d12 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,9 @@ It increases cluster availability, and simplifies cluster maintenance as well as VictoriaMetrics cluster supports multiple isolated tenants (aka namespaces). Tenants are identified by `accountID` or `accountID:projectID`, which are put inside request urls. -See [these docs](#url-format) for details. Some facts about tenants in VictoriaMetrics: +See [these docs](#url-format) for details. + +Some facts about tenants in VictoriaMetrics: - Each `accountID` and `projectID` is identified by an arbitrary 32-bit integer in the range `[0 .. 2^32)`. If `projectID` is missing, then it is automatically assigned to `0`. It is expected that other information about tenants @@ -55,6 +57,33 @@ when different tenants have different amounts of data and different query load. - VictoriaMetrics doesn't support querying multiple tenants in a single request. +See also [multitenancy via labels](#multitenancy-via-labels). + + +## Multitenancy via labels + +`vminsert` can accept data from multiple [tenants](#multitenancy) via special `multitenant` endpoints at `http://vminsert:8480/insert/multitenant/<suffix>`, +where `<suffix>` can be replaced with any supported suffix for data ingestion from [this list](#url-format). +In this case the account id and project id are obtained from the optional `vm_account_id` and `vm_project_id` labels of the incoming samples. +If the `vm_account_id` or `vm_project_id` label is missing or invalid, then the corresponding `accountID` or `projectID` is set to 0. +These labels are automatically removed from samples before forwarding them to `vmstorage`. 
+For example, if the following samples are written into `http://vminsert:8480/insert/multitenant/prometheus/api/v1/write`: + +``` +http_requests_total{path="/foo",vm_account_id="42"} 12 +http_requests_total{path="/bar",vm_account_id="7",vm_project_id="9"} 34 +``` + +Then the `http_requests_total{path="/foo"} 12` would be stored in the tenant `accountID=42, projectID=0`, +while the `http_requests_total{path="/bar"} 34` would be stored in the tenant `accountID=7, projectID=9`. + +The `vm_account_id` and `vm_project_id` labels are extracted after applying the [relabeling](https://docs.victoriametrics.com/relabeling.html) +set via the `-relabelConfig` command-line flag, so these labels can be set at that stage. + +**Security considerations:** it is recommended to restrict access to `multitenant` endpoints to trusted sources only, +since an untrusted source may corrupt per-tenant data by writing unwanted samples to arbitrary tenants. + + ## Binaries Compiled binaries for the cluster version are available in the `assets` section of the [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). @@ -214,7 +243,9 @@ See [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html - URLs for data ingestion: `http://<vminsert>:8480/insert/<accountID>/<suffix>`, where: - `<accountID>` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`, - where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. + where `projectID` is also an arbitrary 32-bit integer. If `projectID` isn't set, then it equals `0`. See [multitenancy docs](#multitenancy) for more details. + The `<accountID>` can be set to the `multitenant` string, e.g. `http://<vminsert>:8480/insert/multitenant/<suffix>`. Such URLs accept data from multiple tenants + specified via `vm_account_id` and `vm_project_id` labels. See [multitenancy via labels](#multitenancy-via-labels) for more details. 
- `<suffix>` may have the following values: - `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). - `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 51cc9e71d4..e7c25dde90 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -142,6 +142,10 @@ While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`. +[VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) supports writing data to multiple tenants +specified via special labels - see [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). +This allows specifying tenant ids via [relabeling](#relabeling) and writing multitenant data to a single `-remoteWrite.url=http://<vminsert>/insert/multitenant/api/v1/write`. + [Multitenancy](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) support is enabled when `-remoteWrite.multitenantURL` command-line flag is set. In this case `vmagent` accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as the cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured URLs. 
This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants. If `-remoteWrite.multitenantURL` command-line flag is set and `vmagent` is configured to scrape Prometheus-compatible targets (e.g. if `-promscrape.config` command-line flag is set) diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index cff2a033d7..9b9115b5d5 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -38,6 +38,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L defer netstorage.PutInsertCtx(ctx) ctx.Reset() // This line is required for initializing ctx internals. + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -59,12 +60,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(at) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(at).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/datadog/request_handler.go b/app/vminsert/datadog/request_handler.go index 8a9a955cd0..a7c9bf1a44 100644 --- a/app/vminsert/datadog/request_handler.go +++ b/app/vminsert/datadog/request_handler.go @@ -48,6 +48,7 @@ func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmars ctx.Reset() rowsTotal := 0 + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range series { ss := &series[i] @@ -74,18 +75,20 @@ func insertRows(at *auth.Token, series []parser.Series, 
extraLabels []prompbmars continue } ctx.SortLabelsIfNeeded() - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) - storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) + atLocal := ctx.GetLocalAuthToken(at) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) for _, pt := range ss.Points { timestamp := pt.Timestamp() value := pt.Value() - if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { return err } } + perTenantRows[*atLocal] += len(ss.Points) } rowsInserted.Add(rowsTotal) - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(rowsTotal)) return ctx.FlushBufs() } diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index 4305ded91a..9cbf79afea 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -35,7 +35,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { defer netstorage.PutInsertCtx(ctx) ctx.Reset() // This line is required for initializing ctx internals. 
- atCopy := *at + var atCopy auth.Token + if at != nil { + atCopy = *at + } + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -63,13 +67,14 @@ func insertRows(at *auth.Token, rows []parser.Row) error { continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(&atCopy) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } - // Assume that all the rows for a single connection belong to the same (AccountID, ProjectID). rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(&atCopy).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index ba5edac1e2..a3cd05194c 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -73,7 +73,11 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom ic := &ctx.Common ic.Reset() // This line is required for initializing ic internals. rowsTotal := 0 - atCopy := *at + var atCopy auth.Token + if at != nil { + atCopy = *at + } + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -115,8 +119,6 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom metricGroupPrefixLen := len(ctx.metricGroupBuf) if hasRelabeling { ctx.originLabels = append(ctx.originLabels[:0], ic.Labels...) 
- ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, nil) - metricNameBufLen := len(ic.MetricNameBuf) for j := range r.Fields { f := &r.Fields[j] if !skipFieldKey { @@ -130,19 +132,22 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom // Skip metric without labels. continue } - ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen] ic.SortLabelsIfNeeded() + atLocal := ic.GetLocalAuthToken(&atCopy) + ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, nil) for i := range ic.Labels { ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[i]) } - storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels) - if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { + storageNodeIdx := ic.GetStorageNodeIdx(atLocal, ic.Labels) + if err := ic.WriteDataPointExt(storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { return err } + perTenantRows[*atLocal]++ } } else { ic.SortLabelsIfNeeded() - ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels) + atLocal := ic.GetLocalAuthToken(&atCopy) + ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ic.Labels) metricNameBufLen := len(ic.MetricNameBuf) labelsLen := len(ic.Labels) for j := range r.Fields { @@ -159,15 +164,16 @@ func insertRows(at *auth.Token, db string, rows []parser.Row, extraLabels []prom } ic.MetricNameBuf = ic.MetricNameBuf[:metricNameBufLen] ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf, &ic.Labels[len(ic.Labels)-1]) - storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels) - if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { + storageNodeIdx := ic.GetStorageNodeIdx(atLocal, ic.Labels) + 
if err := ic.WriteDataPointExt(storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { return err } + perTenantRows[*atLocal]++ } } } rowsInserted.Add(rowsTotal) - rowsTenantInserted.Get(&atCopy).Add(rowsTotal) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(rowsTotal)) return ic.FlushBufs() } diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 03f304b7f1..bb997f3eee 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -108,20 +108,17 @@ func main() { } if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, func(r io.Reader) error { - var at auth.Token // TODO: properly initialize auth token - return graphite.InsertHandler(&at, r) + return graphite.InsertHandler(nil, r) }) } if len(*influxListenAddr) > 0 { influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error { - var at auth.Token // TODO: properly initialize auth token - return influx.InsertHandlerForReader(&at, r) + return influx.InsertHandlerForReader(nil, r) }) } if len(*opentsdbListenAddr) > 0 { opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, func(r io.Reader) error { - var at auth.Token // TODO: properly initialize auth token - return opentsdb.InsertHandler(&at, r) + return opentsdb.InsertHandler(nil, r) }, opentsdbhttp.InsertHandler) } if len(*opentsdbHTTPListenAddr) > 0 { diff --git a/app/vminsert/native/request_handler.go b/app/vminsert/native/request_handler.go index cf9007473b..323df55ddd 100644 --- a/app/vminsert/native/request_handler.go +++ b/app/vminsert/native/request_handler.go @@ -44,7 +44,9 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal // since relabeling can prevent from inserting the rows. 
rowsLen := len(block.Values) rowsInserted.Add(rowsLen) - rowsTenantInserted.Get(at).Add(rowsLen) + if at != nil { + rowsTenantInserted.Get(at).Add(rowsLen) + } rowsPerInsert.Update(float64(rowsLen)) ctx.Reset() // This line is required for initializing ctx internals. @@ -68,8 +70,9 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal return nil } ctx.SortLabelsIfNeeded() - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) - storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) + atLocal := ctx.GetLocalAuthToken(at) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) values := block.Values timestamps := block.Timestamps if len(timestamps) != len(values) { @@ -77,9 +80,15 @@ func insertRows(at *auth.Token, block *parser.Block, extraLabels []prompbmarshal } for j, value := range values { timestamp := timestamps[j] - if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { return err } } - return ctx.FlushBufs() + if err := ctx.FlushBufs(); err != nil { + return err + } + if at == nil { + rowsTenantInserted.Get(atLocal).Add(rowsLen) + } + return nil } diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go index fc74a8601b..8fb36c15f4 100644 --- a/app/vminsert/netstorage/insert_ctx.go +++ b/app/vminsert/netstorage/insert_ctx.go @@ -3,6 +3,7 @@ package netstorage import ( "fmt" "net/http" + "strconv" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" @@ -25,6 +26,8 @@ type InsertCtx struct { labelsBuf []byte relabelCtx relabel.Ctx + + at auth.Token } type bufRows struct { @@ -68,6 +71,7 @@ func 
(ctx *InsertCtx) Reset() { } ctx.labelsBuf = ctx.labelsBuf[:0] ctx.relabelCtx.Reset() + ctx.at.Set(0, 0) } // AddLabelBytes adds (name, value) label to ctx.Labels. @@ -115,11 +119,11 @@ func (ctx *InsertCtx) ApplyRelabeling() { func (ctx *InsertCtx) WriteDataPoint(at *auth.Token, labels []prompb.Label, timestamp int64, value float64) error { ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, labels) storageNodeIdx := ctx.GetStorageNodeIdx(at, labels) - return ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value) + return ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value) } // WriteDataPointExt writes the given metricNameRaw with (timestmap, value) to ctx buffer with the given storageNodeIdx. -func (ctx *InsertCtx) WriteDataPointExt(at *auth.Token, storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error { +func (ctx *InsertCtx) WriteDataPointExt(storageNodeIdx int, metricNameRaw []byte, timestamp int64, value float64) error { br := &ctx.bufRowss[storageNodeIdx] sn := storageNodes[storageNodeIdx] bufNew := storage.MarshalMetricRow(br.buf, metricNameRaw, timestamp, value) @@ -181,3 +185,46 @@ func marshalBytesFast(dst []byte, s []byte) []byte { dst = append(dst, s...) return dst } + +// GetLocalAuthToken obtains auth.Token from context labels vm_account_id and vm_project_id if at is nil. +// +// At is returned as is if it isn't nil. +// +// The vm_account_id and vm_project_id labels are automatically removed from the ctx. 
+func (ctx *InsertCtx) GetLocalAuthToken(at *auth.Token) *auth.Token { + if at != nil { + return at + } + accountID := uint32(0) + projectID := uint32(0) + tmpLabels := ctx.Labels[:0] + for _, label := range ctx.Labels { + if string(label.Name) == "vm_account_id" { + accountID = parseUint32(label.Value) + continue + } + if string(label.Name) == "vm_project_id" { + projectID = parseUint32(label.Value) + continue + } + tmpLabels = append(tmpLabels, label) + } + cleanLabels := ctx.Labels[len(tmpLabels):] + for i := range cleanLabels { + label := &cleanLabels[i] + label.Name = nil + label.Value = nil + } + ctx.Labels = tmpLabels + ctx.at.Set(accountID, projectID) + return &ctx.at +} + +func parseUint32(b []byte) uint32 { + s := bytesutil.ToUnsafeString(b) + n, err := strconv.ParseUint(s, 10, 32) + if err != nil { + return 0 + } + return uint32(n) +} diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 9d404b5101..76ff9e4ef6 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -35,7 +35,11 @@ func insertRows(at *auth.Token, rows []parser.Row) error { defer netstorage.PutInsertCtx(ctx) ctx.Reset() // This line is required for initializing ctx internals. - atCopy := *at + var atCopy auth.Token + if at != nil { + atCopy = *at + } + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -63,13 +67,14 @@ func insertRows(at *auth.Token, rows []parser.Row) error { continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(&atCopy, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(&atCopy) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } - // Assume that all the rows for a single connection belong to the same (AccountID, ProjectID). 
rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(&atCopy).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 824bae21ba..e9c505e436 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -57,8 +57,8 @@ func InsertHandler(req *http.Request) error { func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error { ctx := netstorage.GetInsertCtx() defer netstorage.PutInsertCtx(ctx) - ctx.Reset() // This line is required for initializing ctx internals. + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -80,12 +80,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(at) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(at).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index 5fefff1a8f..ad59f2a5fe 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -43,6 +43,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L defer netstorage.PutInsertCtx(ctx) ctx.Reset() // This line is required for initializing ctx internals. 
+ perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -64,12 +65,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L continue } ctx.SortLabelsIfNeeded() - if err := ctx.WriteDataPoint(at, ctx.Labels, r.Timestamp, r.Value); err != nil { + atLocal := ctx.GetLocalAuthToken(at) + if err := ctx.WriteDataPoint(atLocal, ctx.Labels, r.Timestamp, r.Value); err != nil { return err } + perTenantRows[*atLocal]++ } rowsInserted.Add(len(rows)) - rowsTenantInserted.Get(at).Add(len(rows)) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(len(rows))) return ctx.FlushBufs() } diff --git a/app/vminsert/promremotewrite/request_handler.go b/app/vminsert/promremotewrite/request_handler.go index 2c8ff00d96..fd7a6cd04b 100644 --- a/app/vminsert/promremotewrite/request_handler.go +++ b/app/vminsert/promremotewrite/request_handler.go @@ -41,6 +41,7 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr ctx.Reset() // This line is required for initializing ctx internals. 
rowsTotal := 0 + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range timeseries { ts := &timeseries[i] @@ -62,21 +63,23 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, extraLabels []pr continue } ctx.SortLabelsIfNeeded() - storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) + atLocal := ctx.GetLocalAuthToken(at) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) ctx.MetricNameBuf = ctx.MetricNameBuf[:0] samples := ts.Samples for i := range samples { r := &samples[i] if len(ctx.MetricNameBuf) == 0 { - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels) } - if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, r.Timestamp, r.Value); err != nil { + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, r.Timestamp, r.Value); err != nil { return err } } + perTenantRows[*atLocal] += len(ts.Samples) } rowsInserted.Add(rowsTotal) - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(rowsTotal)) return ctx.FlushBufs() } diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 9706b5e9ed..a3fd4ff960 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -44,6 +44,7 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L ctx.Reset() // This line is required for initializing ctx internals. 
rowsTotal := 0 + perTenantRows := make(map[auth.Token]int) hasRelabeling := relabel.HasRelabeling() for i := range rows { r := &rows[i] @@ -65,8 +66,9 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L continue } ctx.SortLabelsIfNeeded() - ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) - storageNodeIdx := ctx.GetStorageNodeIdx(at, ctx.Labels) + atLocal := ctx.GetLocalAuthToken(at) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) values := r.Values timestamps := r.Timestamps if len(timestamps) != len(values) { @@ -74,13 +76,14 @@ func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.L } for j, value := range values { timestamp := timestamps[j] - if err := ctx.WriteDataPointExt(at, storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { return err } } + perTenantRows[*atLocal] += len(r.Values) } rowsInserted.Add(rowsTotal) - rowsTenantInserted.Get(at).Add(rowsTotal) + rowsTenantInserted.MultiAdd(perTenantRows) rowsPerInsert.Update(float64(rowsTotal)) return ctx.FlushBufs() } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f9080cee11..5a7d17c170 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f **Update note 2:** [vmalert](https://docs.victoriametrics.com/vmalert.html) changes default value for command-line flag `-datasource.queryStep` from `0s` to `5m`. The change is supposed to improve reliability of the rules evaluation when evaluation interval is lower than scraping interval. 
+* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): support specifying tenant ids via `vm_account_id` and `vm_project_id` labels. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970). * FEATURE: improve [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) performance by up to 3x if non-trivial `regex` values are used. * FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105). * FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113). diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index b73d0be776..ffe7aeabbd 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -43,7 +43,9 @@ It increases cluster availability, and simplifies cluster maintenance as well as VictoriaMetrics cluster supports multiple isolated tenants (aka namespaces). 
Tenants are identified by `accountID` or `accountID:projectID`, which are put inside request urls. -See [these docs](#url-format) for details. Some facts about tenants in VictoriaMetrics: +See [these docs](#url-format) for details. + +Some facts about tenants in VictoriaMetrics: - Each `accountID` and `projectID` is identified by an arbitrary 32-bit integer in the range `[0 .. 2^32)`. If `projectID` is missing, then it is automatically assigned to `0`. It is expected that other information about tenants @@ -59,6 +61,30 @@ when different tenants have different amounts of data and different query load. - VictoriaMetrics doesn't support querying multiple tenants in a single request. +See also [multitenancy via labels](#multitenancy-via-labels). + + +## Multitenancy via labels + +`vminsert` can accept data from multiple [tenants](#multitenancy) via special `multitenant` endpoints at `http://vminsert:8480/insert/multitenant/<suffix>`, +where `<suffix>` can be replaced with any supported suffix for data ingestion from [this list](#url-format). +In this case the account id and project id are obtained from the optional `vm_account_id` and `vm_project_id` labels of the incoming samples. +If the `vm_account_id` or `vm_project_id` label is missing or invalid, then the corresponding `accountID` or `projectID` is set to 0. +These labels are automatically removed from samples before forwarding them to `vmstorage`. +For example, if the following samples are written into `http://vminsert:8480/insert/multitenant/prometheus/api/v1/write`: + +``` +http_requests_total{path="/foo",vm_account_id="42"} 12 +http_requests_total{path="/bar",vm_account_id="7",vm_project_id="9"} 34 +``` + +Then the `http_requests_total{path="/foo"} 12` would be stored in the tenant `accountID=42, projectID=0`, +while the `http_requests_total{path="/bar"} 34` would be stored in the tenant `accountID=7, projectID=9`. 
+ +The `vm_account_id` and `vm_project_id` labels are extracted after applying the [relabeling](https://docs.victoriametrics.com/relabeling.html) +set via the `-relabelConfig` command-line flag, so these labels can be set at that stage. + + ## Binaries Compiled binaries for the cluster version are available in the `assets` section of the [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). @@ -218,7 +244,9 @@ See [troubleshooting docs](https://docs.victoriametrics.com/Troubleshooting.html - URLs for data ingestion: `http://<vminsert>:8480/insert/<accountID>/<suffix>`, where: - `<accountID>` is an arbitrary 32-bit integer identifying namespace for data ingestion (aka tenant). It is possible to set it as `accountID:projectID`, - where `projectID` is also arbitrary 32-bit integer. If `projectID` isn't set, then it equals to `0`. + where `projectID` is also an arbitrary 32-bit integer. If `projectID` isn't set, then it equals `0`. See [multitenancy docs](#multitenancy) for more details. + The `<accountID>` can be set to the `multitenant` string, e.g. `http://<vminsert>:8480/insert/multitenant/<suffix>`. Such URLs accept data from multiple tenants + specified via `vm_account_id` and `vm_project_id` labels. See [multitenancy via labels](#multitenancy-via-labels) for more details. - `<suffix>` may have the following values: - `prometheus` and `prometheus/api/v1/write` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). - `datadog/api/v1/series` - for inserting data with [DataDog submit metrics API](https://docs.datadoghq.com/api/latest/metrics/#submit-metrics). See [these docs](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-send-data-from-datadog-agent) for details. 
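The label-based tenant routing described in the docs above comes down to a small piece of logic: pull `vm_account_id`/`vm_project_id` out of the sample's labels, fall back to `0` on missing or unparseable values, and drop the two labels before the sample is forwarded. A minimal standalone sketch of that behavior (the `Label` type and `tenantOf` helper here are illustrative stand-ins, not the actual vminsert types):

```go
package main

import (
	"fmt"
	"strconv"
)

// Label is a minimal name/value pair, standing in for a sample's label.
type Label struct {
	Name, Value string
}

// tenantOf extracts (accountID, projectID) from the special labels and
// returns the remaining labels. Missing or invalid values fall back to 0,
// matching the documented multitenant-endpoint behavior.
func tenantOf(labels []Label) (accountID, projectID uint32, rest []Label) {
	for _, l := range labels {
		switch l.Name {
		case "vm_account_id":
			accountID = parseUint32(l.Value)
		case "vm_project_id":
			projectID = parseUint32(l.Value)
		default:
			rest = append(rest, l)
		}
	}
	return accountID, projectID, rest
}

func parseUint32(s string) uint32 {
	n, err := strconv.ParseUint(s, 10, 32)
	if err != nil {
		return 0 // invalid values are treated as tenant 0
	}
	return uint32(n)
}

func main() {
	a, p, rest := tenantOf([]Label{
		{"path", "/foo"},
		{"vm_account_id", "42"},
	})
	fmt.Println(a, p, len(rest)) // 42 0 1
}
```

Note that the fallback mirrors the documented behavior: an invalid value such as `vm_account_id="oops"` silently maps to tenant 0 rather than causing the sample to be rejected.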
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 7ef86bad42..315d704186 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -146,6 +146,10 @@ While `vmagent` can accept data in several supported protocols (OpenTSDB, Influx
 
 By default `vmagent` collects the data without tenant identifiers and routes it to the configured `-remoteWrite.url`.
 
+[VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html) supports writing data to multiple tenants
+specified via special labels - see [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels).
+This allows specifying tenant IDs via [relabeling](#relabeling) and writing multitenant data to a single `-remoteWrite.url=http://<vminsert-addr>/insert/multitenant/api/v1/write`.
+
 [Multitenancy](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy) support is enabled when `-remoteWrite.multitenantURL` command-line flag is set. In this case `vmagent` accepts multitenant data at `http://vmagent:8429/insert/<accountID>/...` in the same way as cluster version of VictoriaMetrics does according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) and routes it to `<-remoteWrite.multitenantURL>/insert/<accountID>/prometheus/api/v1/write`. If multiple `-remoteWrite.multitenantURL` command-line options are set, then `vmagent` replicates the collected data across all the configured urls. This allows using a single `vmagent` instance in front of VictoriaMetrics clusters for processing the data from all the tenants.
 
 If `-remoteWrite.multitenantURL` command-line flag is set and `vmagent` is configured to scrape Prometheus-compatible targets (e.g. if `-promscrape.config` command-line flag is set)
diff --git a/lib/auth/auth.go b/lib/auth/auth.go
index 6d27943e3b..38e2354482 100644
--- a/lib/auth/auth.go
+++ b/lib/auth/auth.go
@@ -14,30 +14,54 @@ type Token struct {
 // String returns string representation of t.
 func (t *Token) String() string {
+	if t == nil {
+		return "multitenant"
+	}
 	if t.ProjectID == 0 {
 		return fmt.Sprintf("%d", t.AccountID)
 	}
 	return fmt.Sprintf("%d:%d", t.AccountID, t.ProjectID)
 }
 
-// NewToken returns new Token for the given authToken
+// NewToken returns new Token for the given authToken.
+//
+// If authToken == "multitenant", then nil Token is returned.
 func NewToken(authToken string) (*Token, error) {
+	if authToken == "multitenant" {
+		return nil, nil
+	}
+	var t Token
+	if err := t.Init(authToken); err != nil {
+		return nil, err
+	}
+	return &t, nil
+}
+
+// Init initializes t from authToken.
+func (t *Token) Init(authToken string) error {
 	tmp := strings.Split(authToken, ":")
 	if len(tmp) > 2 {
-		return nil, fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp))
+		return fmt.Errorf("unexpected number of items in authToken %q; got %d; want 1 or 2", authToken, len(tmp))
 	}
-	var at Token
-	accountID, err := strconv.ParseUint(tmp[0], 10, 32)
+	n, err := strconv.ParseUint(tmp[0], 10, 32)
 	if err != nil {
-		return nil, fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err)
+		return fmt.Errorf("cannot parse accountID from %q: %w", tmp[0], err)
 	}
-	at.AccountID = uint32(accountID)
+	accountID := uint32(n)
+	projectID := uint32(0)
 	if len(tmp) > 1 {
-		projectID, err := strconv.ParseUint(tmp[1], 10, 32)
+		n, err := strconv.ParseUint(tmp[1], 10, 32)
 		if err != nil {
-			return nil, fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err)
+			return fmt.Errorf("cannot parse projectID from %q: %w", tmp[1], err)
 		}
-		at.ProjectID = uint32(projectID)
+		projectID = uint32(n)
 	}
-	return &at, nil
+	t.Set(accountID, projectID)
+	return nil
+}
+
+// Set sets accountID and projectID for the t.
+func (t *Token) Set(accountID, projectID uint32) {
+	t.AccountID = accountID
+	t.ProjectID = projectID
 }
diff --git a/lib/auth/auth_test.go b/lib/auth/auth_test.go
index aa2b4b4f92..eddf1c6fe0 100644
--- a/lib/auth/auth_test.go
+++ b/lib/auth/auth_test.go
@@ -26,6 +26,8 @@ func TestNewTokenSuccess(t *testing.T) {
 	f("1:4294967295", "1:4294967295") // max uint32 accountID and projectID
 	f("4294967295:4294967295", "4294967295:4294967295")
+	// multitenant
+	f("multitenant", "multitenant")
 }
 
 func TestNewTokenFailure(t *testing.T) {
diff --git a/lib/httpserver/path.go b/lib/httpserver/path.go
index 833c0a6b41..3743341e6e 100644
--- a/lib/httpserver/path.go
+++ b/lib/httpserver/path.go
@@ -22,6 +22,8 @@ func ParsePath(path string) (*Path, error) {
 	//
 	// - prefix must contain `select`, `insert` or `delete`.
 	// - authToken contains `accountID[:projectID]`, where projectID is optional.
+	//   authToken may also contain `multitenant` string. In this case the accountID and projectID
+	//   are obtained from vm_account_id and vm_project_id labels of the ingested samples.
 	// - suffix contains arbitrary suffix.
 	//
 	// prefix must be used for the routing to the appropriate service
@@ -29,14 +31,14 @@
 	s := skipPrefixSlashes(path)
 	n := strings.IndexByte(s, '/')
 	if n < 0 {
-		return nil, fmt.Errorf("cannot find {prefix}")
+		return nil, fmt.Errorf("cannot find {prefix} in %q; expecting /{prefix}/{authToken}/{suffix} format", path)
 	}
 	prefix := s[:n]
 
 	s = skipPrefixSlashes(s[n+1:])
 	n = strings.IndexByte(s, '/')
 	if n < 0 {
-		return nil, fmt.Errorf("cannot find {authToken}")
+		return nil, fmt.Errorf("cannot find {authToken} in %q; expecting /{prefix}/{authToken}/{suffix} format", path)
 	}
 	authToken := s[:n]
diff --git a/lib/tenantmetrics/counter_map.go b/lib/tenantmetrics/counter_map.go
index 710986b80f..3ab106679c 100644
--- a/lib/tenantmetrics/counter_map.go
+++ b/lib/tenantmetrics/counter_map.go
@@ -39,6 +39,13 @@ func (cm *CounterMap) Get(at *auth.Token) *metrics.Counter {
 	return cm.GetByTenant(key)
 }
 
+// MultiAdd adds multiple values grouped by auth.Token
+func (cm *CounterMap) MultiAdd(perTenantValues map[auth.Token]int) {
+	for token, value := range perTenantValues {
+		cm.Get(&token).Add(value)
+	}
+}
+
 // GetByTenant returns counter for the given key.
 func (cm *CounterMap) GetByTenant(key TenantID) *metrics.Counter {
 	m := cm.m.Load().(map[TenantID]*metrics.Counter)
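The new `CounterMap.MultiAdd` above batches per-tenant increments so a single multitenant insert request can bump counters for several tenants in one call. The pattern can be sketched with a toy map-backed counter; this is an assumption-laden stand-in, not the real `lib/tenantmetrics` implementation (which hands out `metrics.Counter` values cached behind an `atomic.Value`):

```go
package main

import "fmt"

// Token identifies a tenant; it mirrors lib/auth.Token in shape only.
type Token struct {
	AccountID uint32
	ProjectID uint32
}

// CounterMap is a toy stand-in for lib/tenantmetrics.CounterMap.
type CounterMap struct {
	m map[Token]int
}

func NewCounterMap() *CounterMap {
	return &CounterMap{m: make(map[Token]int)}
}

// MultiAdd applies a batch of per-tenant increments - the same grouping
// the patch uses when one request carries samples for several tenants.
func (cm *CounterMap) MultiAdd(perTenantValues map[Token]int) {
	for token, value := range perTenantValues {
		cm.m[token] += value
	}
}

func main() {
	cm := NewCounterMap()
	cm.MultiAdd(map[Token]int{
		{AccountID: 42}:              1,
		{AccountID: 7, ProjectID: 9}: 2,
	})
	cm.MultiAdd(map[Token]int{{AccountID: 42}: 3})
	fmt.Println(cm.m[Token{AccountID: 42}]) // 4
}
```

Because `Token` is a comparable struct, it can serve directly as a map key, which is what makes the `map[auth.Token]int` signature of `MultiAdd` convenient for callers that aggregate before flushing.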