mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vminsert: move ingestion protocol parsers to lib/protoparser, so they could be re-used in the upcoming vmagent
This commit is contained in:
parent
a9802fcb72
commit
0cda6afa8e
17 changed files with 35 additions and 13 deletions
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/fastjson/fastfloat"
|
||||
|
@ -124,7 +125,7 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
|
|||
}
|
||||
|
||||
type pushCtx struct {
|
||||
Rows Rows
|
||||
Rows graphite.Rows
|
||||
Common netstorage.InsertCtx
|
||||
|
||||
reqBuf []byte
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
|
@ -177,7 +178,7 @@ var (
|
|||
)
|
||||
|
||||
type pushCtx struct {
|
||||
Rows Rows
|
||||
Rows influx.Rows
|
||||
Common netstorage.InsertCtx
|
||||
|
||||
reqBuf []byte
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/fastjson/fastfloat"
|
||||
|
@ -123,7 +124,7 @@ func (ctx *pushCtx) Read(r io.Reader) bool {
|
|||
}
|
||||
|
||||
type pushCtx struct {
|
||||
Rows Rows
|
||||
Rows opentsdb.Rows
|
||||
Common netstorage.InsertCtx
|
||||
|
||||
reqBuf []byte
|
||||
|
|
|
@ -14,9 +14,9 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/fastjson"
|
||||
)
|
||||
|
||||
var maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request")
|
||||
|
@ -68,8 +68,8 @@ func insertHandlerInternal(at *auth.Token, req *http.Request) error {
|
|||
}
|
||||
|
||||
// Unmarshal the request to ctx.Rows
|
||||
p := parserPool.Get()
|
||||
defer parserPool.Put(p)
|
||||
p := opentsdbhttp.GetParser()
|
||||
defer opentsdbhttp.PutParser(p)
|
||||
v, err := p.ParseBytes(ctx.reqBuf.B)
|
||||
if err != nil {
|
||||
unmarshalErrors.Inc()
|
||||
|
@ -118,10 +118,8 @@ func insertHandlerInternal(at *auth.Token, req *http.Request) error {
|
|||
|
||||
const secondMask int64 = 0x7FFFFFFF00000000
|
||||
|
||||
var parserPool fastjson.ParserPool
|
||||
|
||||
type pushCtx struct {
|
||||
Rows Rows
|
||||
Rows opentsdbhttp.Rows
|
||||
Common netstorage.InsertCtx
|
||||
|
||||
reqBuf bytesutil.ByteBuffer
|
||||
|
|
21
lib/protoparser/opentsdbhttp/parser_pool.go
Normal file
21
lib/protoparser/opentsdbhttp/parser_pool.go
Normal file
|
@ -0,0 +1,21 @@
|
|||
package opentsdbhttp
|
||||
|
||||
import (
|
||||
"github.com/valyala/fastjson"
|
||||
)
|
||||
|
||||
// GetParser returns JSON parser.
|
||||
//
|
||||
// The parser must be returned to the pool via PutParser when no longer needed.
|
||||
func GetParser() *fastjson.Parser {
|
||||
return parserPool.Get()
|
||||
}
|
||||
|
||||
// PutParser returns p to the pool.
|
||||
//
|
||||
// p cannot be used after returning to the pool.
|
||||
func PutParser(p *fastjson.Parser) {
|
||||
parserPool.Put(p)
|
||||
}
|
||||
|
||||
var parserPool fastjson.ParserPool
|
|
@ -9,8 +9,8 @@ func TestRowsUnmarshalFailure(t *testing.T) {
|
|||
f := func(s string) {
|
||||
t.Helper()
|
||||
var rows Rows
|
||||
p := parserPool.Get()
|
||||
defer parserPool.Put(p)
|
||||
p := GetParser()
|
||||
defer PutParser(p)
|
||||
v, err := p.Parse(s)
|
||||
if err != nil {
|
||||
// Expected JSON parser error
|
||||
|
@ -84,8 +84,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
|
|||
t.Helper()
|
||||
var rows Rows
|
||||
|
||||
p := parserPool.Get()
|
||||
defer parserPool.Put(p)
|
||||
p := GetParser()
|
||||
defer PutParser(p)
|
||||
v, err := p.Parse(s)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse json %s: %s", s, err)
|
Loading…
Reference in a new issue