diff --git a/app/vmagent/opentsdb/request_handler.go b/app/vmagent/opentsdb/request_handler.go index e1e42ba4bc..8388a5238a 100644 --- a/app/vmagent/opentsdb/request_handler.go +++ b/app/vmagent/opentsdb/request_handler.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream" "github.com/VictoriaMetrics/metrics" ) @@ -19,7 +20,7 @@ var ( // // See http://opentsdb.net/docs/build/html/api_telnet/put.html func InsertHandler(r io.Reader) error { - return parser.ParseStream(r, insertRows) + return stream.Parse(r, insertRows) } func insertRows(rows []parser.Row) error { diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 3ed06533e4..de700c2e8a 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -21,7 +22,7 @@ var ( // // See http://opentsdb.net/docs/build/html/api_telnet/put.html func InsertHandler(at *auth.Token, r io.Reader) error { - return parser.ParseStream(r, func(rows []parser.Row) error { + return stream.Parse(r, func(rows []parser.Row) error { return insertRows(at, rows) }) } diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/stream/streamparser.go similarity index 92% rename from lib/protoparser/opentsdb/streamparser.go rename to lib/protoparser/opentsdb/stream/streamparser.go index e094d4d8b6..d7bdbea0e4 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/stream/streamparser.go @@ -1,4 +1,4 @@ -package opentsdb +package stream import ( "bufio" @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -21,12 +22,12 @@ var ( "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") ) -// ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. +// Parse parses OpenTSDB lines from r and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. // // callback shouldn't hold rows after returning. -func ParseStream(r io.Reader, callback func(rows []Row) error) error { +func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -134,9 +135,9 @@ var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { - rows Rows + rows opentsdb.Rows ctx *streamContext - callback func(rows []Row) error + callback func(rows []opentsdb.Row) error reqBuf []byte } @@ -147,7 +148,7 @@ func (uw *unmarshalWork) reset() { uw.reqBuf = uw.reqBuf[:0] } -func (uw *unmarshalWork) runCallback(rows []Row) { +func (uw *unmarshalWork) runCallback(rows []opentsdb.Row) { ctx := uw.ctx if err := uw.callback(rows); err != nil { ctx.callbackErrLock.Lock()