diff --git a/app/vmagent/vmimport/request_handler.go b/app/vmagent/vmimport/request_handler.go index f0eb010ba6..838f7a6ca9 100644 --- a/app/vmagent/vmimport/request_handler.go +++ b/app/vmagent/vmimport/request_handler.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -30,7 +31,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error { return err } isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return stream.Parse(req.Body, isGzipped, func(rows []parser.Row) error { return insertRows(at, rows, extraLabels) }) } diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go index 2ff7e3b99f..3d3e5fec40 100644 --- a/app/vminsert/vmimport/request_handler.go +++ b/app/vminsert/vmimport/request_handler.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -30,7 +31,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error { return err } isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return stream.Parse(req.Body, isGzipped, func(rows []parser.Row) error { return insertRows(at, rows, extraLabels) }) } diff --git a/lib/protoparser/vmimport/streamparser.go b/lib/protoparser/vmimport/stream/streamparser.go similarity index 91% rename from lib/protoparser/vmimport/streamparser.go rename to lib/protoparser/vmimport/stream/streamparser.go index 3d4ba19d62..25e259f223 100644 --- a/lib/protoparser/vmimport/streamparser.go +++ b/lib/protoparser/vmimport/stream/streamparser.go @@ -1,4 +1,4 @@ -package vmimport +package stream import ( "bufio" @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -17,12 +18,12 @@ import ( var maxLineLen = flagutil.NewBytes("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by /api/v1/import; "+ "the line length can be limited with 'max_rows_per_line' query arg passed to /api/v1/export") -// ParseStream parses /api/v1/import lines from req and calls callback for the parsed rows. +// Parse parses /api/v1/import lines from req and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from reader. // // callback shouldn't hold rows after returning. -func ParseStream(r io.Reader, isGzipped bool, callback func(rows []Row) error) error { +func Parse(r io.Reader, isGzipped bool, callback func(rows []vmimport.Row) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -138,9 +139,9 @@ var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { - rows Rows + rows vmimport.Rows ctx *streamContext - callback func(rows []Row) error + callback func(rows []vmimport.Row) error reqBuf []byte } @@ -151,7 +152,7 @@ func (uw *unmarshalWork) reset() { uw.reqBuf = uw.reqBuf[:0] } -func (uw *unmarshalWork) runCallback(rows []Row) { +func (uw *unmarshalWork) runCallback(rows []vmimport.Row) { ctx := uw.ctx if err := uw.callback(rows); err != nil { ctx.callbackErrLock.Lock()