diff --git a/app/vmagent/csvimport/request_handler.go b/app/vmagent/csvimport/request_handler.go index e17207c0d8..00d9846d75 100644 --- a/app/vmagent/csvimport/request_handler.go +++ b/app/vmagent/csvimport/request_handler.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -25,7 +26,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error { if err != nil { return err } - return parser.ParseStream(req, func(rows []parser.Row) error { + return stream.Parse(req, func(rows []parser.Row) error { return insertRows(at, rows, extraLabels) }) } diff --git a/app/vminsert/csvimport/request_handler.go b/app/vminsert/csvimport/request_handler.go index 971f3abbd9..d7dc794c24 100644 --- a/app/vminsert/csvimport/request_handler.go +++ b/app/vminsert/csvimport/request_handler.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream" "github.com/VictoriaMetrics/metrics" ) @@ -22,7 +23,7 @@ func InsertHandler(req *http.Request) error { if err != nil { return err } - return parser.ParseStream(req, func(rows []parser.Row) error { + return stream.Parse(req, func(rows []parser.Row) error { return insertRows(rows, extraLabels) }) } diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/stream/streamparser.go similarity index 91% rename from lib/protoparser/csvimport/streamparser.go rename to lib/protoparser/csvimport/stream/streamparser.go index 50db7840a1..02e0e8933c 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/stream/streamparser.go @@ -1,4 +1,4 @@ -package csvimport +package stream import ( "bufio" @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -21,19 +22,19 @@ var ( "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") ) -// ParseStream parses csv from req and calls callback for the parsed rows. +// Parse parses csv from req and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from req. // // callback shouldn't hold rows after returning. -func ParseStream(req *http.Request, callback func(rows []Row) error) error { +func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error { wcr := writeconcurrencylimiter.GetReader(req.Body) defer writeconcurrencylimiter.PutReader(wcr) r := io.Reader(wcr) q := req.URL.Query() format := q.Get("format") - cds, err := ParseColumnDescriptors(format) + cds, err := csvimport.ParseColumnDescriptors(format) if err != nil { return fmt.Errorf("cannot parse the provided csv format: %w", err) } @@ -149,10 +150,10 @@ var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { - rows Rows + rows csvimport.Rows ctx *streamContext - callback func(rows []Row) error - cds []ColumnDescriptor + callback func(rows []csvimport.Row) error + cds []csvimport.ColumnDescriptor reqBuf []byte } @@ -164,7 +165,7 @@ func (uw *unmarshalWork) reset() { uw.reqBuf = uw.reqBuf[:0] } -func (uw *unmarshalWork) runCallback(rows []Row) { +func (uw *unmarshalWork) runCallback(rows []csvimport.Row) { ctx := uw.ctx if err := uw.callback(rows); err != nil { ctx.callbackErrLock.Lock()