mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vminsert: move ingestion protocol parsers to lib/protoparser, so they could be re-used in the upcoming vmagent
This commit is contained in:
parent
1efea246b7
commit
6456c93dbb
17 changed files with 35 additions and 13 deletions
|
@ -11,6 +11,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -106,7 +107,7 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
type pushCtx struct {
|
type pushCtx struct {
|
||||||
Rows Rows
|
Rows graphite.Rows
|
||||||
Common common.InsertCtx
|
Common common.InsertCtx
|
||||||
|
|
||||||
reqBuf []byte
|
reqBuf []byte
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
@ -171,7 +172,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
type pushCtx struct {
|
type pushCtx struct {
|
||||||
Rows Rows
|
Rows influx.Rows
|
||||||
Common common.InsertCtx
|
Common common.InsertCtx
|
||||||
|
|
||||||
reqBuf []byte
|
reqBuf []byte
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -105,7 +106,7 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
type pushCtx struct {
|
type pushCtx struct {
|
||||||
Rows Rows
|
Rows opentsdb.Rows
|
||||||
Common common.InsertCtx
|
Common common.InsertCtx
|
||||||
|
|
||||||
reqBuf []byte
|
reqBuf []byte
|
||||||
|
|
|
@ -12,8 +12,8 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
|
||||||
"github.com/VictoriaMetrics/metrics"
|
"github.com/VictoriaMetrics/metrics"
|
||||||
"github.com/valyala/fastjson"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request")
|
var maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request")
|
||||||
|
@ -65,8 +65,8 @@ func insertHandlerInternal(req *http.Request) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unmarshal the request to ctx.Rows
|
// Unmarshal the request to ctx.Rows
|
||||||
p := parserPool.Get()
|
p := opentsdbhttp.GetParser()
|
||||||
defer parserPool.Put(p)
|
defer opentsdbhttp.PutParser(p)
|
||||||
v, err := p.ParseBytes(ctx.reqBuf.B)
|
v, err := p.ParseBytes(ctx.reqBuf.B)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
unmarshalErrors.Inc()
|
unmarshalErrors.Inc()
|
||||||
|
@ -113,10 +113,8 @@ func insertHandlerInternal(req *http.Request) error {
|
||||||
|
|
||||||
const secondMask int64 = 0x7FFFFFFF00000000
|
const secondMask int64 = 0x7FFFFFFF00000000
|
||||||
|
|
||||||
var parserPool fastjson.ParserPool
|
|
||||||
|
|
||||||
type pushCtx struct {
|
type pushCtx struct {
|
||||||
Rows Rows
|
Rows opentsdbhttp.Rows
|
||||||
Common common.InsertCtx
|
Common common.InsertCtx
|
||||||
|
|
||||||
reqBuf bytesutil.ByteBuffer
|
reqBuf bytesutil.ByteBuffer
|
||||||
|
|
21
lib/protoparser/opentsdbhttp/parser_pool.go
Normal file
21
lib/protoparser/opentsdbhttp/parser_pool.go
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
package opentsdbhttp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/valyala/fastjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GetParser returns JSON parser.
|
||||||
|
//
|
||||||
|
// The parser must be returned to the pool via PutParser when no longer needed.
|
||||||
|
func GetParser() *fastjson.Parser {
|
||||||
|
return parserPool.Get()
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutParser returns p to the pool.
|
||||||
|
//
|
||||||
|
// p cannot be used after returning to the pool.
|
||||||
|
func PutParser(p *fastjson.Parser) {
|
||||||
|
parserPool.Put(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parserPool fastjson.ParserPool
|
|
@ -9,8 +9,8 @@ func TestRowsUnmarshalFailure(t *testing.T) {
|
||||||
f := func(s string) {
|
f := func(s string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
var rows Rows
|
var rows Rows
|
||||||
p := parserPool.Get()
|
p := GetParser()
|
||||||
defer parserPool.Put(p)
|
defer PutParser(p)
|
||||||
v, err := p.Parse(s)
|
v, err := p.Parse(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Expected JSON parser error
|
// Expected JSON parser error
|
||||||
|
@ -84,8 +84,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
var rows Rows
|
var rows Rows
|
||||||
|
|
||||||
p := parserPool.Get()
|
p := GetParser()
|
||||||
defer parserPool.Put(p)
|
defer PutParser(p)
|
||||||
v, err := p.Parse(s)
|
v, err := p.Parse(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("cannot parse json %s: %s", s, err)
|
t.Fatalf("cannot parse json %s: %s", s, err)
|
Loading…
Reference in a new issue