lib/protoparser/influx: extract stream parsing code into a separate stream package

This is a follow-up for 057698f7fb
This commit is contained in:
Aliaksandr Valialkin 2023-02-13 09:58:48 -08:00
parent b691d02b92
commit 1add6c3fa0
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
5 changed files with 15 additions and 12 deletions

View file

@@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@@ -36,7 +37,7 @@ var (
// //
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(r io.Reader, isGzipped bool) error { func InsertHandlerForReader(r io.Reader, isGzipped bool) error {
return parser.ParseStream(r, isGzipped, "", "", func(db string, rows []parser.Row) error { return stream.Parse(r, isGzipped, "", "", func(db string, rows []parser.Row) error {
return insertRows(nil, db, rows, nil) return insertRows(nil, db, rows, nil)
}) })
} }
@@ -54,7 +55,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
precision := q.Get("precision") precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db") db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, extraLabels) return insertRows(at, db, rows, extraLabels)
}) })
} }

View file

@@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@@ -37,7 +38,7 @@ var (
// //
// See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/ // See https://github.com/influxdata/telegraf/tree/master/plugins/inputs/socket_listener/
func InsertHandlerForReader(at *auth.Token, r io.Reader) error { func InsertHandlerForReader(at *auth.Token, r io.Reader) error {
return parser.ParseStream(r, false, "", "", func(db string, rows []parser.Row) error { return stream.Parse(r, false, "", "", func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, nil) return insertRows(at, db, rows, nil)
}) })
} }
@@ -55,7 +56,7 @@ func InsertHandlerForHTTP(at *auth.Token, req *http.Request) error {
precision := q.Get("precision") precision := q.Get("precision")
// Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint // Read db tag from https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
db := q.Get("db") db := q.Get("db")
return parser.ParseStream(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error { return stream.Parse(req.Body, isGzipped, precision, db, func(db string, rows []parser.Row) error {
return insertRows(at, db, rows, extraLabels) return insertRows(at, db, rows, extraLabels)
}) })
} }

View file

@@ -31,7 +31,7 @@ var (
"https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics") "https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics")
) )
// ParseStream parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request. // Parse parses DataDog POST request for /api/v1/series from reader and calls callback for the parsed request.
// //
// callback shouldn't hold series after returning. // callback shouldn't hold series after returning.
func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error { func Parse(r io.Reader, contentEncoding string, callback func(series []datadog.Series) error) error {

View file

@@ -1,4 +1,4 @@
package influx package stream
import ( import (
"bufio" "bufio"
@@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@@ -22,12 +23,12 @@ var (
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses r with the given args and calls callback for the parsed rows. // Parse parses r with the given args and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []Row) error) error { func Parse(r io.Reader, isGzipped bool, precision, db string, callback func(db string, rows []influx.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@@ -162,9 +163,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows influx.Rows
ctx *streamContext ctx *streamContext
callback func(db string, rows []Row) error callback func(db string, rows []influx.Row) error
db string db string
tsMultiplier int64 tsMultiplier int64
reqBuf []byte reqBuf []byte
@@ -179,7 +180,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []influx.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(uw.db, rows); err != nil { if err := uw.callback(uw.db, rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()

View file

@@ -1,4 +1,4 @@
package influx package stream
import ( import (
"testing" "testing"