diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index d6ae61bec..b046f1b73 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fastjson/fastfloat" ) var ( @@ -33,7 +34,7 @@ var ( func InsertHandlerForReader(at *auth.Token, r io.Reader) error { return writeconcurrencylimiter.Do(func() error { return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { - return insertRows(at, db, rows) + return insertRows(at, db, rows, true) }) }) } @@ -49,24 +50,34 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error { // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint db := q.Get("db") return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { - return insertRows(at, db, rows) + return insertRows(at, db, rows, false) }) }) } -func insertRows(at *auth.Token, db string, rows []parser.Row) error { +func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccountProjectID bool) error { ctx := getPushCtx() defer putPushCtx(ctx) ic := &ctx.Common ic.Reset() // This line is required for initializing ic internals. rowsTotal := 0 + atCopy := *at for i := range rows { r := &rows[i] ic.Labels = ic.Labels[:0] hasDBLabel := false for j := range r.Tags { tag := &r.Tags[j] + if mayOverrideAccountProjectID { + // Multi-tenancy support via custom tags. + if tag.Key == "VictoriaMetrics_AccountID" { + atCopy.AccountID = uint32(fastfloat.ParseUint64BestEffort(tag.Value)) + } + if tag.Key == "VictoriaMetrics_ProjectID" { + atCopy.ProjectID = uint32(fastfloat.ParseUint64BestEffort(tag.Value)) + } + } if tag.Key == "db" { hasDBLabel = true } @@ -75,7 +86,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row) error { if len(db) > 0 && !hasDBLabel { ic.AddLabel("db", db) } - ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], at.AccountID, at.ProjectID, ic.Labels) + ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels) metricNameBufLen := len(ic.MetricNameBuf) ctx.metricGroupBuf = append(ctx.metricGroupBuf[:0], r.Measurement...) skipFieldKey := len(r.Fields) == 1 && *skipSingleField @@ -94,14 +105,14 @@ func insertRows(at *auth.Token, db string, rows []parser.Row) error { ic.Labels = ic.Labels[:len(ic.Labels)-1] ic.AddLabel("", metricGroup) ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf[:metricNameBufLen], placeholderLabel) - storageNodeIdx := ic.GetStorageNodeIdx(at, ic.Labels) - if err := ic.WriteDataPointExt(at, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { + storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels) + if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil { return err } } rowsTotal += len(r.Fields) } - rowsInserted.Get(at).Add(rowsTotal) + rowsInserted.Get(&atCopy).Add(rowsTotal) rowsPerInsert.Update(float64(rowsTotal)) return ic.FlushBufs() }