lib/protoparser/datadog: extract stream parsing code into a separate stream package

This is a follow-up for 057698f7fb
This commit is contained in:
Aliaksandr Valialkin 2023-02-13 09:51:35 -08:00
parent 867b7e5688
commit b691d02b92
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
4 changed files with 12 additions and 9 deletions

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -28,7 +29,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
return err return err
} }
ce := req.Header.Get("Content-Encoding") ce := req.Header.Get("Content-Encoding")
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error { return stream.Parse(req.Body, ce, func(series []parser.Series) error {
return insertRows(at, series, extraLabels) return insertRows(at, series, extraLabels)
}) })
} }

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -29,7 +30,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
return err return err
} }
ce := req.Header.Get("Content-Encoding") ce := req.Header.Get("Content-Encoding")
return parser.ParseStream(req.Body, ce, func(series []parser.Series) error { return stream.Parse(req.Body, ce, func(series []parser.Series) error {
return insertRows(at, series, extraLabels) return insertRows(at, series, extraLabels)
}) })
} }

View file

@ -1,4 +1,4 @@
package datadog package stream
import ( import (
"bufio" "bufio"
@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/datadog"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -33,7 +34,7 @@ var (
// ParseStream parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request. // ParseStream parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
// //
// callback shouldn't hold series after returning. // callback shouldn't hold series after returning.
func ParseStream(r io.Reader, contentEncoding string, callback func(series []Series) error) error { func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -143,15 +144,15 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool var pushCtxPool sync.Pool
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
func getRequest() *Request { func getRequest() *datadog.Request {
v := requestPool.Get() v := requestPool.Get()
if v == nil { if v == nil {
return &Request{} return &datadog.Request{}
} }
return v.(*Request) return v.(*datadog.Request)
} }
func putRequest(req *Request) { func putRequest(req *datadog.Request) {
requestPool.Put(req) requestPool.Put(req)
} }

View file

@ -1,4 +1,4 @@
package datadog package stream
import ( import (
"testing" "testing"