[vmagent] make opentsdb insert url support multitenant (#3015)

This commit is contained in:
Jianyun Cheng 2022-08-24 21:17:44 +08:00 committed by Aliaksandr Valialkin
parent d32a6359b0
commit eccae22522
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -1,10 +1,15 @@
package opentsdbhttp package opentsdbhttp
import ( import (
"errors"
"fmt"
"net/http" "net/http"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
@ -20,18 +25,28 @@ var (
// InsertHandler processes HTTP OpenTSDB put requests. // InsertHandler processes HTTP OpenTSDB put requests.
// See http://opentsdb.net/docs/build/html/api_http/put.html // See http://opentsdb.net/docs/build/html/api_http/put.html
func InsertHandler(req *http.Request) error { func InsertHandler(req *http.Request) error {
path := strings.Replace(req.URL.Path, "//", "/", -1)
p, err := httpserver.ParsePath(path)
if err != nil {
// Cannot parse multitenant path. Skip it - probably it will be parsed later.
return err
}
if p.Prefix != "insert" {
return errors.New(fmt.Sprintf(`unsupported multitenant prefix: %q; expected "insert"`, p.Prefix))
}
at, err := auth.NewToken(p.AuthToken)
extraLabels, err := parserCommon.GetExtraLabels(req) extraLabels, err := parserCommon.GetExtraLabels(req)
if err != nil { if err != nil {
return err return err
} }
return writeconcurrencylimiter.Do(func() error { return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(rows []parser.Row) error { return parser.ParseStream(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
}) })
} }
func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error { func insertRows(at *auth.Token, rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx := common.GetPushCtx() ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx) defer common.PutPushCtx(ctx)
@ -65,7 +80,7 @@ func insertRows(rows []parser.Row, extraLabels []prompbmarshal.Label) error {
ctx.WriteRequest.Timeseries = tssDst ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels ctx.Labels = labels
ctx.Samples = samples ctx.Samples = samples
remotewrite.Push(nil, &ctx.WriteRequest) remotewrite.Push(at, &ctx.WriteRequest)
rowsInserted.Add(len(rows)) rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows))) rowsPerInsert.Update(float64(len(rows)))
return nil return nil