From 6456c93dbb72012009f116f0537313a411aa1794 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 24 Jan 2020 16:52:48 +0200 Subject: [PATCH] app/vminsert: move ingestion protocol parsers to lib/protoparser, so they could be re-used in the upcoming vmagent --- app/vminsert/graphite/request_handler.go | 3 ++- app/vminsert/influx/request_handler.go | 3 ++- app/vminsert/opentsdb/request_handler.go | 3 ++- app/vminsert/opentsdbhttp/request_handler.go | 10 ++++----- .../protoparser}/graphite/parser.go | 0 .../protoparser}/graphite/parser_test.go | 0 .../graphite/parser_timing_test.go | 0 .../protoparser}/influx/parser.go | 0 .../protoparser}/influx/parser_test.go | 0 .../protoparser}/influx/parser_timing_test.go | 0 .../protoparser}/opentsdb/parser.go | 0 .../protoparser}/opentsdb/parser_test.go | 0 .../opentsdb/parser_timing_test.go | 0 .../protoparser}/opentsdbhttp/parser.go | 0 lib/protoparser/opentsdbhttp/parser_pool.go | 21 +++++++++++++++++++ .../protoparser}/opentsdbhttp/parser_test.go | 8 +++---- .../opentsdbhttp/parser_timing_test.go | 0 17 files changed, 35 insertions(+), 13 deletions(-) rename {app/vminsert => lib/protoparser}/graphite/parser.go (100%) rename {app/vminsert => lib/protoparser}/graphite/parser_test.go (100%) rename {app/vminsert => lib/protoparser}/graphite/parser_timing_test.go (100%) rename {app/vminsert => lib/protoparser}/influx/parser.go (100%) rename {app/vminsert => lib/protoparser}/influx/parser_test.go (100%) rename {app/vminsert => lib/protoparser}/influx/parser_timing_test.go (100%) rename {app/vminsert => lib/protoparser}/opentsdb/parser.go (100%) rename {app/vminsert => lib/protoparser}/opentsdb/parser_test.go (100%) rename {app/vminsert => lib/protoparser}/opentsdb/parser_timing_test.go (100%) rename {app/vminsert => lib/protoparser}/opentsdbhttp/parser.go (100%) create mode 100644 lib/protoparser/opentsdbhttp/parser_pool.go rename {app/vminsert => lib/protoparser}/opentsdbhttp/parser_test.go (98%) rename {app/vminsert => lib/protoparser}/opentsdbhttp/parser_timing_test.go (100%) diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index 2f57e7a5d..adaac204d 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" "github.com/VictoriaMetrics/metrics" ) @@ -106,7 +107,7 @@ func (ctx *pushCtx) Read(r io.Reader) bool { } type pushCtx struct { - Rows Rows + Rows graphite.Rows Common common.InsertCtx reqBuf []byte diff --git a/app/vminsert/influx/request_handler.go b/app/vminsert/influx/request_handler.go index 0e397c29d..1453ccdbb 100644 --- a/app/vminsert/influx/request_handler.go +++ b/app/vminsert/influx/request_handler.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/influx" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" ) @@ -171,7 +172,7 @@ var ( ) type pushCtx struct { - Rows Rows + Rows influx.Rows Common common.InsertCtx reqBuf []byte diff --git a/app/vminsert/opentsdb/request_handler.go b/app/vminsert/opentsdb/request_handler.go index 7ad872fc9..2a0c5a7cf 100644 --- a/app/vminsert/opentsdb/request_handler.go +++ b/app/vminsert/opentsdb/request_handler.go @@ -11,6 +11,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdb" "github.com/VictoriaMetrics/metrics" ) @@ -105,7 +106,7 @@ func (ctx *pushCtx) Read(r io.Reader) bool { } type pushCtx struct { - Rows Rows + Rows opentsdb.Rows Common common.InsertCtx reqBuf []byte diff --git a/app/vminsert/opentsdbhttp/request_handler.go b/app/vminsert/opentsdbhttp/request_handler.go index 7f3910274..b8385a993 100644 --- a/app/vminsert/opentsdbhttp/request_handler.go +++ b/app/vminsert/opentsdbhttp/request_handler.go @@ -12,8 +12,8 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentsdbhttp" "github.com/VictoriaMetrics/metrics" - "github.com/valyala/fastjson" ) var maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request") @@ -65,8 +65,8 @@ func insertHandlerInternal(req *http.Request) error { } // Unmarshal the request to ctx.Rows - p := parserPool.Get() - defer parserPool.Put(p) + p := opentsdbhttp.GetParser() + defer opentsdbhttp.PutParser(p) v, err := p.ParseBytes(ctx.reqBuf.B) if err != nil { unmarshalErrors.Inc() @@ -113,10 +113,8 @@ func insertHandlerInternal(req *http.Request) error { const secondMask int64 = 0x7FFFFFFF00000000 -var parserPool fastjson.ParserPool - type pushCtx struct { - Rows Rows + Rows opentsdbhttp.Rows Common common.InsertCtx reqBuf bytesutil.ByteBuffer diff --git a/app/vminsert/graphite/parser.go b/lib/protoparser/graphite/parser.go similarity index 100% rename from app/vminsert/graphite/parser.go rename to lib/protoparser/graphite/parser.go diff --git a/app/vminsert/graphite/parser_test.go b/lib/protoparser/graphite/parser_test.go similarity index 100% rename from app/vminsert/graphite/parser_test.go rename to lib/protoparser/graphite/parser_test.go diff --git a/app/vminsert/graphite/parser_timing_test.go b/lib/protoparser/graphite/parser_timing_test.go similarity index 100% rename from app/vminsert/graphite/parser_timing_test.go rename to lib/protoparser/graphite/parser_timing_test.go diff --git a/app/vminsert/influx/parser.go b/lib/protoparser/influx/parser.go similarity index 100% rename from app/vminsert/influx/parser.go rename to lib/protoparser/influx/parser.go diff --git a/app/vminsert/influx/parser_test.go b/lib/protoparser/influx/parser_test.go similarity index 100% rename from app/vminsert/influx/parser_test.go rename to lib/protoparser/influx/parser_test.go diff --git a/app/vminsert/influx/parser_timing_test.go b/lib/protoparser/influx/parser_timing_test.go similarity index 100% rename from app/vminsert/influx/parser_timing_test.go rename to lib/protoparser/influx/parser_timing_test.go diff --git a/app/vminsert/opentsdb/parser.go b/lib/protoparser/opentsdb/parser.go similarity index 100% rename from app/vminsert/opentsdb/parser.go rename to lib/protoparser/opentsdb/parser.go diff --git a/app/vminsert/opentsdb/parser_test.go b/lib/protoparser/opentsdb/parser_test.go similarity index 100% rename from app/vminsert/opentsdb/parser_test.go rename to lib/protoparser/opentsdb/parser_test.go diff --git a/app/vminsert/opentsdb/parser_timing_test.go b/lib/protoparser/opentsdb/parser_timing_test.go similarity index 100% rename from app/vminsert/opentsdb/parser_timing_test.go rename to lib/protoparser/opentsdb/parser_timing_test.go diff --git a/app/vminsert/opentsdbhttp/parser.go b/lib/protoparser/opentsdbhttp/parser.go similarity index 100% rename from app/vminsert/opentsdbhttp/parser.go rename to lib/protoparser/opentsdbhttp/parser.go diff --git a/lib/protoparser/opentsdbhttp/parser_pool.go b/lib/protoparser/opentsdbhttp/parser_pool.go new file mode 100644 index 000000000..f64763107 --- /dev/null +++ b/lib/protoparser/opentsdbhttp/parser_pool.go @@ -0,0 +1,21 @@ +package opentsdbhttp + +import ( + "github.com/valyala/fastjson" +) + +// GetParser returns JSON parser. +// +// The parser must be returned to the pool via PutParser when no longer needed. +func GetParser() *fastjson.Parser { + return parserPool.Get() +} + +// PutParser returns p to the pool. +// +// p cannot be used after returning to the pool. +func PutParser(p *fastjson.Parser) { + parserPool.Put(p) +} + +var parserPool fastjson.ParserPool diff --git a/app/vminsert/opentsdbhttp/parser_test.go b/lib/protoparser/opentsdbhttp/parser_test.go similarity index 98% rename from app/vminsert/opentsdbhttp/parser_test.go rename to lib/protoparser/opentsdbhttp/parser_test.go index 819c22543..fc43804e9 100644 --- a/app/vminsert/opentsdbhttp/parser_test.go +++ b/lib/protoparser/opentsdbhttp/parser_test.go @@ -9,8 +9,8 @@ func TestRowsUnmarshalFailure(t *testing.T) { f := func(s string) { t.Helper() var rows Rows - p := parserPool.Get() - defer parserPool.Put(p) + p := GetParser() + defer PutParser(p) v, err := p.Parse(s) if err != nil { // Expected JSON parser error @@ -84,8 +84,8 @@ func TestRowsUnmarshalSuccess(t *testing.T) { t.Helper() var rows Rows - p := parserPool.Get() - defer parserPool.Put(p) + p := GetParser() + defer PutParser(p) v, err := p.Parse(s) if err != nil { t.Fatalf("cannot parse json %s: %s", s, err) diff --git a/app/vminsert/opentsdbhttp/parser_timing_test.go b/lib/protoparser/opentsdbhttp/parser_timing_test.go similarity index 100% rename from app/vminsert/opentsdbhttp/parser_timing_test.go rename to lib/protoparser/opentsdbhttp/parser_timing_test.go