lib/protoparser/opentsdbhttp: extract stream parsing code into a separate stream package

This is a follow-up for 057698f7fb
This commit is contained in:
Aliaksandr Valialkin 2023-02-13 10:14:17 -08:00
parent 7720d403c0
commit af37717108
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
5 changed files with 23 additions and 20 deletions

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -24,7 +25,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req, func(rows []parser.Row) error { return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}) })
} }

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -27,7 +28,7 @@ func InsertHandler(req *http.Request) error {
if err != nil { if err != nil {
return err return err
} }
return parser.ParseStream(req, func(rows []parser.Row) error { return stream.Parse(req, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(rows, extraLabels)
}) })
default: default:

View file

@ -4,17 +4,17 @@ import (
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
) )
// getJSONParser returns JSON parser. // GetJSONParser returns JSON parser.
// //
// The parser must be returned to the pool via putJSONParser when no longer needed. // The parser must be returned to the pool via PutJSONParser when no longer needed.
func getJSONParser() *fastjson.Parser { func GetJSONParser() *fastjson.Parser {
return parserPool.Get() return parserPool.Get()
} }
// putJSONParser returns p to the pool. // PutJSONParser returns p to the pool.
// //
// p cannot be used after returning to the pool. // p cannot be used after returning to the pool.
func putJSONParser(p *fastjson.Parser) { func PutJSONParser(p *fastjson.Parser) {
parserPool.Put(p) parserPool.Put(p)
} }

View file

@ -9,8 +9,8 @@ func TestRowsUnmarshalFailure(t *testing.T) {
f := func(s string) { f := func(s string) {
t.Helper() t.Helper()
var rows Rows var rows Rows
p := getJSONParser() p := GetJSONParser()
defer putJSONParser(p) defer PutJSONParser(p)
v, err := p.Parse(s) v, err := p.Parse(s)
if err != nil { if err != nil {
// Expected JSON parser error // Expected JSON parser error
@ -84,8 +84,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
t.Helper() t.Helper()
var rows Rows var rows Rows
p := getJSONParser() p := GetJSONParser()
defer putJSONParser(p) defer PutJSONParser(p)
v, err := p.Parse(s) v, err := p.Parse(s)
if err != nil { if err != nil {
t.Fatalf("cannot parse json %s: %s", s, err) t.Fatalf("cannot parse json %s: %s", s, err)

View file

@ -1,4 +1,4 @@
package opentsdbhttp package stream
import ( import (
"bufio" "bufio"
@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -24,12 +25,12 @@ var (
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. // Parse parses OpenTSDB http lines from req and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from req. // The callback can be called concurrently multiple times for streamed data from req.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(req *http.Request, callback func(rows []Row) error) error { func Parse(req *http.Request, callback func(rows []opentsdbhttp.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(req.Body) wcr := writeconcurrencylimiter.GetReader(req.Body)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r := io.Reader(req.Body) r := io.Reader(req.Body)
@ -62,8 +63,8 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
// Process the request synchronously, since there is no sense in processing a single request asynchronously. // Process the request synchronously, since there is no sense in processing a single request asynchronously.
// Sync code is easier to read and understand. // Sync code is easier to read and understand.
p := getJSONParser() p := opentsdbhttp.GetJSONParser()
defer putJSONParser(p) defer opentsdbhttp.PutJSONParser(p)
v, err := p.ParseBytes(ctx.reqBuf.B) v, err := p.ParseBytes(ctx.reqBuf.B)
if err != nil { if err != nil {
unmarshalErrors.Inc() unmarshalErrors.Inc()
@ -155,15 +156,15 @@ func putStreamContext(ctx *streamContext) {
var streamContextPool sync.Pool var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
func getRows() *Rows { func getRows() *opentsdbhttp.Rows {
v := rowsPool.Get() v := rowsPool.Get()
if v == nil { if v == nil {
return &Rows{} return &opentsdbhttp.Rows{}
} }
return v.(*Rows) return v.(*opentsdbhttp.Rows)
} }
func putRows(rs *Rows) { func putRows(rs *opentsdbhttp.Rows) {
rs.Reset() rs.Reset()
rowsPool.Put(rs) rowsPool.Put(rs)
} }