lib/protoparser/opentsdb: extract stream parsing code into a separate stream package

This is a follow-up for 057698f7fb
This commit is contained in:
Aliaksandr Valialkin 2023-02-13 10:03:06 -08:00
parent 1add6c3fa0
commit 67c0281535
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 11 additions and 8 deletions

View file

@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -19,7 +20,7 @@ var (
// //
// See http://opentsdb.net/docs/build/html/api_telnet/put.html // See http://opentsdb.net/docs/build/html/api_telnet/put.html
func InsertHandler(r io.Reader) error { func InsertHandler(r io.Reader) error {
return parser.ParseStream(r, insertRows) return stream.Parse(r, insertRows)
} }
func insertRows(rows []parser.Row) error { func insertRows(rows []parser.Row) error {

View file

@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -21,7 +22,7 @@ var (
// //
// See http://opentsdb.net/docs/build/html/api_telnet/put.html // See http://opentsdb.net/docs/build/html/api_telnet/put.html
func InsertHandler(at *auth.Token, r io.Reader) error { func InsertHandler(at *auth.Token, r io.Reader) error {
return parser.ParseStream(r, func(rows []parser.Row) error { return stream.Parse(r, func(rows []parser.Row) error {
return insertRows(at, rows) return insertRows(at, rows)
}) })
} }

View file

@ -1,4 +1,4 @@
package opentsdb package stream
import ( import (
"bufio" "bufio"
@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -21,12 +22,12 @@ var (
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // Parse parses OpenTSDB lines from r and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, callback func(rows []Row) error) error { func Parse(r io.Reader, callback func(rows []opentsdb.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -134,9 +135,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows opentsdb.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []opentsdb.Row) error
reqBuf []byte reqBuf []byte
} }
@ -147,7 +148,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []opentsdb.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()