From 75cf5a89391895ca87442879e3dda896cd6b7f81 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 13 Feb 2023 10:32:36 -0800 Subject: [PATCH] lib/protoparser/graphite: extract stream parsing code into a separate stream package --- app/vmagent/graphite/request_handler.go | 3 +- app/vminsert/graphite/request_handler.go | 3 +- lib/protoparser/graphite/parser_test.go | 71 ----------------- .../graphite/{ => stream}/streamparser.go | 13 ++-- .../graphite/stream/streamparser_test.go | 78 +++++++++++++++++++ 5 files changed, 89 insertions(+), 79 deletions(-) rename lib/protoparser/graphite/{ => stream}/streamparser.go (92%) create mode 100644 lib/protoparser/graphite/stream/streamparser_test.go diff --git a/app/vmagent/graphite/request_handler.go b/app/vmagent/graphite/request_handler.go index d9ff2a1b2..2c8180667 100644 --- a/app/vmagent/graphite/request_handler.go +++ b/app/vmagent/graphite/request_handler.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite/stream" "github.com/VictoriaMetrics/metrics" ) @@ -19,7 +20,7 @@ var ( // // See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol func InsertHandler(r io.Reader) error { - return parser.ParseStream(r, insertRows) + return stream.Parse(r, insertRows) } func insertRows(rows []parser.Row) error { diff --git a/app/vminsert/graphite/request_handler.go b/app/vminsert/graphite/request_handler.go index 5cbd57478..675db423f 100644 --- a/app/vminsert/graphite/request_handler.go +++ b/app/vminsert/graphite/request_handler.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -21,7 +22,7 @@ var ( // // See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol func InsertHandler(at *auth.Token, r io.Reader) error { - return parser.ParseStream(r, func(rows []parser.Row) error { + return stream.Parse(r, func(rows []parser.Row) error { return insertRows(at, rows) }) } diff --git a/lib/protoparser/graphite/parser_test.go b/lib/protoparser/graphite/parser_test.go index 467d5d9e4..f4e6cce7b 100644 --- a/lib/protoparser/graphite/parser_test.go +++ b/lib/protoparser/graphite/parser_test.go @@ -2,10 +2,7 @@ package graphite import ( "reflect" - "strings" "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) func TestUnmarshalMetricAndTagsFailure(t *testing.T) { @@ -383,71 +380,3 @@ func TestRowsUnmarshalSuccess(t *testing.T) { }}, }) } - -func Test_streamContext_Read(t *testing.T) { - f := func(s string, rowsExpected *Rows) { - t.Helper() - ctx := getStreamContext(strings.NewReader(s)) - if !ctx.Read() { - t.Fatalf("expecting successful read") - } - uw := getUnmarshalWork() - callbackCalls := 0 - uw.ctx = ctx - uw.callback = func(rows []Row) error { - callbackCalls++ - if len(rows) != len(rowsExpected.Rows) { - t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) - } - if !reflect.DeepEqual(rows, rowsExpected.Rows) { - t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) - } - return nil - } - uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...) - ctx.wg.Add(1) - uw.Unmarshal() - if callbackCalls != 1 { - t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls) - } - } - - // Full line without tags - f("aaa 1123 345", &Rows{ - Rows: []Row{{ - Metric: "aaa", - Value: 1123, - Timestamp: 345 * 1000, - }}, - }) - // Full line with tags - f("aaa;x=y 1123 345", &Rows{ - Rows: []Row{{ - Metric: "aaa", - Tags: []Tag{{ - Key: "x", - Value: "y", - }}, - Value: 1123, - Timestamp: 345 * 1000, - }}, - }) - // missing timestamp. - // Note that this test may be flaky due to timing issues. TODO: fix it - f("aaa 1123", &Rows{ - Rows: []Row{{ - Metric: "aaa", - Value: 1123, - Timestamp: int64(fasttime.UnixTimestamp()) * 1000, - }}, - }) - // -1 timestamp. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/610 - // Note that this test may be flaky due to timing issues. TODO: fix it. - f("aaa 1123 -1", &Rows{ - Rows: []Row{{ - Metric: "aaa", - Value: 1123, - Timestamp: int64(fasttime.UnixTimestamp()) * 1000, - }}, - }) -} diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/stream/streamparser.go similarity index 92% rename from lib/protoparser/graphite/streamparser.go rename to lib/protoparser/graphite/stream/streamparser.go index 1fb94fffd..16a472ae2 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/stream/streamparser.go @@ -1,4 +1,4 @@ -package graphite +package stream import ( "bufio" @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) @@ -21,12 +22,12 @@ var ( "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") ) -// ParseStream parses Graphite lines from r and calls callback for the parsed rows. +// Parse parses Graphite lines from r and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. // // callback shouldn't hold rows after returning. -func ParseStream(r io.Reader, callback func(rows []Row) error) error { +func Parse(r io.Reader, callback func(rows []graphite.Row) error) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -135,9 +136,9 @@ var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { - rows Rows + rows graphite.Rows ctx *streamContext - callback func(rows []Row) error + callback func(rows []graphite.Row) error reqBuf []byte } @@ -148,7 +149,7 @@ func (uw *unmarshalWork) reset() { uw.reqBuf = uw.reqBuf[:0] } -func (uw *unmarshalWork) runCallback(rows []Row) { +func (uw *unmarshalWork) runCallback(rows []graphite.Row) { ctx := uw.ctx if err := uw.callback(rows); err != nil { ctx.callbackErrLock.Lock() diff --git a/lib/protoparser/graphite/stream/streamparser_test.go b/lib/protoparser/graphite/stream/streamparser_test.go new file mode 100644 index 000000000..416473b62 --- /dev/null +++ b/lib/protoparser/graphite/stream/streamparser_test.go @@ -0,0 +1,78 @@ +package stream + +import ( + "reflect" + "strings" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" +) + +func Test_streamContext_Read(t *testing.T) { + f := func(s string, rowsExpected *graphite.Rows) { + t.Helper() + ctx := getStreamContext(strings.NewReader(s)) + if !ctx.Read() { + t.Fatalf("expecting successful read") + } + uw := getUnmarshalWork() + callbackCalls := 0 + uw.ctx = ctx + uw.callback = func(rows []graphite.Row) error { + callbackCalls++ + if len(rows) != len(rowsExpected.Rows) { + t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) + } + if !reflect.DeepEqual(rows, rowsExpected.Rows) { + t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) + } + return nil + } + uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...) + ctx.wg.Add(1) + uw.Unmarshal() + if callbackCalls != 1 { + t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls) + } + } + + // Full line without tags + f("aaa 1123 345", &graphite.Rows{ + Rows: []graphite.Row{{ + Metric: "aaa", + Value: 1123, + Timestamp: 345 * 1000, + }}, + }) + // Full line with tags + f("aaa;x=y 1123 345", &graphite.Rows{ + Rows: []graphite.Row{{ + Metric: "aaa", + Tags: []graphite.Tag{{ + Key: "x", + Value: "y", + }}, + Value: 1123, + Timestamp: 345 * 1000, + }}, + }) + // missing timestamp. + // Note that this test may be flaky due to timing issues. TODO: fix it + f("aaa 1123", &graphite.Rows{ + Rows: []graphite.Row{{ + Metric: "aaa", + Value: 1123, + Timestamp: int64(fasttime.UnixTimestamp()) * 1000, + }}, + }) + // -1 timestamp. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/610 + // Note that this test may be flaky due to timing issues. TODO: fix it. + f("aaa 1123 -1", &graphite.Rows{ + Rows: []graphite.Row{{ + Metric: "aaa", + Value: 1123, + Timestamp: int64(fasttime.UnixTimestamp()) * 1000, + }}, + }) +}