app/vminsert/influx: support passing AccountID and ProjectID via plain TCP and UDP

Now `vminsert` accepts AccountID and ProjectID via `VictoriaMetrics_AccountID` and `VictoriaMetrics_ProjectID` tags
when reading Influx line protocol data via plain TCP or UDP (i.e. when `-influxListenAddr` is set).
This commit is contained in:
Aliaksandr Valialkin 2020-05-12 13:13:00 +03:00
parent f7753b1469
commit 4e237b4670

View file

@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat"
)
var (
@ -33,7 +34,7 @@ var (
func InsertHandlerForReader(at *auth.Token, r io.Reader) error {
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(at, db, rows)
return insertRows(at, db, rows, true)
})
})
}
@ -49,24 +50,34 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(at, db, rows)
return insertRows(at, db, rows, false)
})
})
}
func insertRows(at *auth.Token, db string, rows []parser.Row) error {
func insertRows(at *auth.Token, db string, rows []parser.Row, mayOverrideAccountProjectID bool) error {
ctx := getPushCtx()
defer putPushCtx(ctx)
ic := &ctx.Common
ic.Reset() // This line is required for initializing ic internals.
rowsTotal := 0
atCopy := *at
for i := range rows {
r := &rows[i]
ic.Labels = ic.Labels[:0]
hasDBLabel := false
for j := range r.Tags {
tag := &r.Tags[j]
if mayOverrideAccountProjectID {
// Multi-tenancy support via custom tags.
if tag.Key == "VictoriaMetrics_AccountID" {
atCopy.AccountID = uint32(fastfloat.ParseUint64BestEffort(tag.Value))
}
if tag.Key == "VictoriaMetrics_ProjectID" {
atCopy.ProjectID = uint32(fastfloat.ParseUint64BestEffort(tag.Value))
}
}
if tag.Key == "db" {
hasDBLabel = true
}
@ -75,7 +86,7 @@ func insertRows(at *auth.Token, db string, rows []parser.Row) error {
if len(db) > 0 && !hasDBLabel {
ic.AddLabel("db", db)
}
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], at.AccountID, at.ProjectID, ic.Labels)
ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], atCopy.AccountID, atCopy.ProjectID, ic.Labels)
metricNameBufLen := len(ic.MetricNameBuf)
ctx.metricGroupBuf = append(ctx.metricGroupBuf[:0], r.Measurement...)
skipFieldKey := len(r.Fields) == 1 && *skipSingleField
@ -94,14 +105,14 @@ func insertRows(at *auth.Token, db string, rows []parser.Row) error {
ic.Labels = ic.Labels[:len(ic.Labels)-1]
ic.AddLabel("", metricGroup)
ic.MetricNameBuf = storage.MarshalMetricLabelRaw(ic.MetricNameBuf[:metricNameBufLen], placeholderLabel)
storageNodeIdx := ic.GetStorageNodeIdx(at, ic.Labels)
if err := ic.WriteDataPointExt(at, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
storageNodeIdx := ic.GetStorageNodeIdx(&atCopy, ic.Labels)
if err := ic.WriteDataPointExt(&atCopy, storageNodeIdx, ic.MetricNameBuf, r.Timestamp, f.Value); err != nil {
return err
}
}
rowsTotal += len(r.Fields)
}
rowsInserted.Get(at).Add(rowsTotal)
rowsInserted.Get(&atCopy).Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return ic.FlushBufs()
}