From 057698f7fbfd1d1322a888f7b26bebead83fe115 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Mon, 13 Feb 2023 18:26:07 +0100 Subject: [PATCH] lib/protoparser/prometheus: move `streamparser` to subpackage (#3814) `lib/protoparser/prometheus` is used by various applications, such as `app/vmalert`. The recent change to the `lib/protoparser/prometheus` package introduced a new dependency of `lib/writeconcurrencylimiter` which exposes some metrics. Because of the dependency, now all applications which have this dependency also expose these metrics. Creating a new `lib/protoparser/prometheus/stream` package helps to remove these metrics from apps which use `lib/protoparser/prometheus` as dependency. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3761 Signed-off-by: hagen1778 --- .../prometheusimport/request_handler.go | 3 +- .../prometheusimport/request_handler.go | 3 +- lib/promscrape/scrapework.go | 5 ++-- .../prometheus/{ => stream}/streamparser.go | 13 +++++---- .../{ => stream}/streamparser_test.go | 29 ++++++++++--------- 5 files changed, 29 insertions(+), 24 deletions(-) rename lib/protoparser/prometheus/{ => stream}/streamparser.go (90%) rename lib/protoparser/prometheus/{ => stream}/streamparser_test.go (78%) diff --git a/app/vmagent/prometheusimport/request_handler.go b/app/vmagent/prometheusimport/request_handler.go index ed5d53a850..2f450e1c29 100644 --- a/app/vmagent/prometheusimport/request_handler.go +++ b/app/vmagent/prometheusimport/request_handler.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" ) @@ -31,7 +32,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error { return err } isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { + return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(at, rows, extraLabels) }, func(s string) { httpserver.LogError(req, s) diff --git a/app/vminsert/prometheusimport/request_handler.go b/app/vminsert/prometheusimport/request_handler.go index 1ef5abb084..09eefc7c40 100644 --- a/app/vminsert/prometheusimport/request_handler.go +++ b/app/vminsert/prometheusimport/request_handler.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream" "github.com/VictoriaMetrics/metrics" ) @@ -28,7 +29,7 @@ func InsertHandler(req *http.Request) error { return err } isGzipped := req.Header.Get("Content-Encoding") == "gzip" - return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { + return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return insertRows(rows, extraLabels) }, func(s string) { httpserver.LogError(req, s) diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 98e564d8d1..961b69c2b5 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -26,6 +26,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" @@ -575,7 +576,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { if err == nil { bodyString = bytesutil.ToUnsafeString(sbr.body) areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString) - err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { + err = stream.Parse(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { mu.Lock() defer mu.Unlock() samplesScraped += len(rows) @@ -796,7 +797,7 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675 var mu sync.Mutex br := bytes.NewBufferString(bodyString) - err := parser.ParseStream(br, timestamp, false, func(rows []parser.Row) error { + err := stream.Parse(br, timestamp, false, func(rows []parser.Row) error { mu.Lock() defer mu.Unlock() for i := range rows { diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/stream/streamparser.go similarity index 90% rename from lib/protoparser/prometheus/streamparser.go rename to lib/protoparser/prometheus/stream/streamparser.go index 0a0b554779..5f34614cce 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/stream/streamparser.go @@ -1,4 +1,4 @@ -package prometheus +package stream import ( "bufio" @@ -10,16 +10,17 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/metrics" ) -// ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows. +// Parse parses lines with Prometheus exposition format from r and calls callback for the parsed rows. // // The callback can be called concurrently multiple times for streamed data from r. // // callback shouldn't hold rows after returning. -func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error, errLogger func(string)) error { +func Parse(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []prometheus.Row) error, errLogger func(string)) error { wcr := writeconcurrencylimiter.GetReader(r) defer writeconcurrencylimiter.PutReader(wcr) r = wcr @@ -137,9 +138,9 @@ var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) type unmarshalWork struct { - rows Rows + rows prometheus.Rows ctx *streamContext - callback func(rows []Row) error + callback func(rows []prometheus.Row) error errLogger func(string) defaultTimestamp int64 reqBuf []byte @@ -154,7 +155,7 @@ func (uw *unmarshalWork) reset() { uw.reqBuf = uw.reqBuf[:0] } -func (uw *unmarshalWork) runCallback(rows []Row) { +func (uw *unmarshalWork) runCallback(rows []prometheus.Row) { ctx := uw.ctx if err := uw.callback(rows); err != nil { ctx.callbackErrLock.Lock() diff --git a/lib/protoparser/prometheus/streamparser_test.go b/lib/protoparser/prometheus/stream/streamparser_test.go similarity index 78% rename from lib/protoparser/prometheus/streamparser_test.go rename to lib/protoparser/prometheus/stream/streamparser_test.go index 3e0a0048fc..d9eed3d522 100644 --- a/lib/protoparser/prometheus/streamparser_test.go +++ b/lib/protoparser/prometheus/stream/streamparser_test.go @@ -1,4 +1,4 @@ -package prometheus +package stream import ( "bytes" @@ -10,6 +10,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" ) func TestParseStream(t *testing.T) { @@ -17,13 +18,13 @@ func TestParseStream(t *testing.T) { defer common.StopUnmarshalWorkers() const defaultTimestamp = 123 - f := func(s string, rowsExpected []Row) { + f := func(s string, rowsExpected []prometheus.Row) { t.Helper() bb := bytes.NewBufferString(s) - var result []Row + var result []prometheus.Row var lock sync.Mutex doneCh := make(chan struct{}) - err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error { + err := Parse(bb, defaultTimestamp, false, func(rows []prometheus.Row) error { lock.Lock() result = appendRowCopies(result, rows) if len(result) == len(rowsExpected) { @@ -56,7 +57,7 @@ func TestParseStream(t *testing.T) { } result = nil doneCh = make(chan struct{}) - err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error { + err = Parse(bb, defaultTimestamp, true, func(rows []prometheus.Row) error { lock.Lock() result = appendRowCopies(result, rows) if len(result) == len(rowsExpected) { @@ -79,12 +80,12 @@ func TestParseStream(t *testing.T) { } } - f("foo 123 456", []Row{{ + f("foo 123 456", []prometheus.Row{{ Metric: "foo", Value: 123, Timestamp: 456000, }}) - f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []Row{ + f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []prometheus.Row{ { Metric: "aaa", Value: 3, @@ -92,7 +93,7 @@ func TestParseStream(t *testing.T) { }, { Metric: "foo", - Tags: []Tag{{ + Tags: []prometheus.Tag{{ Key: "bar", Value: "baz", }}, @@ -100,29 +101,29 @@ func TestParseStream(t *testing.T) { Timestamp: 2000, }, }) - f("foo 23", []Row{{ + f("foo 23", []prometheus.Row{{ Metric: "foo", Value: 23, Timestamp: defaultTimestamp, }}) } -func sortRows(rows []Row) { +func sortRows(rows []prometheus.Row) { sort.Slice(rows, func(i, j int) bool { a, b := rows[i], rows[j] return a.Metric < b.Metric }) } -func appendRowCopies(dst, src []Row) []Row { +func appendRowCopies(dst, src []prometheus.Row) []prometheus.Row { for _, r := range src { - // Make a copy of r, since r may contain garbage after returning from the callback to ParseStream. - var rCopy Row + // Make a copy of r, since r may contain garbage after returning from the callback to Parse. + var rCopy prometheus.Row rCopy.Metric = copyString(r.Metric) rCopy.Value = r.Value rCopy.Timestamp = r.Timestamp for _, tag := range r.Tags { - rCopy.Tags = append(rCopy.Tags, Tag{ + rCopy.Tags = append(rCopy.Tags, prometheus.Tag{ Key: copyString(tag.Key), Value: copyString(tag.Value), })