lib/protoparser/csvimport: extract stream parsing code into a separate stream package

This is a follow-up to commit 057698f7fb
This commit is contained in:
Aliaksandr Valialkin 2023-02-13 10:25:37 -08:00
parent 7568658c19
commit a646841c07
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 13 additions and 10 deletions

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -25,7 +26,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req, func(rows []parser.Row) error { return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
} }

View file

@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -22,7 +23,7 @@ func InsertHandler(req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req, func(rows []parser.Row) error { return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(rows, extraLabels)
}) })
} }

View file

@ -1,4 +1,4 @@
package csvimport package stream
import ( import (
"bufio" "bufio"
@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/csvimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -21,19 +22,19 @@ var (
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses csv from req and calls callback for the parsed rows. // Parse parses csv from req and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from req.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error { func Parse(req *http.Request, callback func(rows []csvimport.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(req.Body) wcr := writeconcurrencylimiter.GetReader(req.Body)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r := io.Reader(wcr) r := io.Reader(wcr)
q := req.URL.Query() q := req.URL.Query()
format := q.Get("format") format := q.Get("format")
cds, err := ParseColumnDescriptors(format) cds, err := csvimport.ParseColumnDescriptors(format)
if err != nil { if err != nil {
return fmt.Errorf("cannot parse the provided csv format: %w", err) return fmt.Errorf("cannot parse the provided csv format: %w", err)
} }
@ -149,10 +150,10 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows csvimport.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []csvimport.Row) error
cds []ColumnDescriptor cds []csvimport.ColumnDescriptor
reqBuf []byte reqBuf []byte
} }
@ -164,7 +165,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []csvimport.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()