lib/protoparser/prometheus: move streamparser to subpackage (#3814)

`lib/protoparser/prometheus` is used by various applications,
such as `app/vmalert`. The recent change to the
`lib/protoparser/prometheus` package introduced a new dependency
on `lib/writeconcurrencylimiter`, which exposes some metrics.
Because of the dependency, now all applications which have this
dependency also expose these metrics.

Creating a new `lib/protoparser/prometheus/stream` package helps
to remove these metrics from apps which use `lib/protoparser/prometheus`
as a dependency.

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3761

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2023-02-13 18:26:07 +01:00 committed by GitHub
parent a645a95bd6
commit 057698f7fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 29 additions and 24 deletions

View file

@@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@@ -31,7 +32,7 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
return err return err
} }
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(at, rows, extraLabels) return insertRows(at, rows, extraLabels)
}, func(s string) { }, func(s string) {
httpserver.LogError(req, s) httpserver.LogError(req, s)

View file

@@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" parserCommon "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@@ -28,7 +29,7 @@ func InsertHandler(req *http.Request) error {
return err return err
} }
isGzipped := req.Header.Get("Content-Encoding") == "gzip" isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error { return stream.Parse(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels) return insertRows(rows, extraLabels)
}, func(s string) { }, func(s string) {
httpserver.LogError(req, s) httpserver.LogError(req, s)

View file

@@ -26,6 +26,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@@ -575,7 +576,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
if err == nil { if err == nil {
bodyString = bytesutil.ToUnsafeString(sbr.body) bodyString = bytesutil.ToUnsafeString(sbr.body)
areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString) areIdenticalSeries = sw.areIdenticalSeries(lastScrape, bodyString)
err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error { err = stream.Parse(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
samplesScraped += len(rows) samplesScraped += len(rows)
@@ -796,7 +797,7 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675 // and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675
var mu sync.Mutex var mu sync.Mutex
br := bytes.NewBufferString(bodyString) br := bytes.NewBufferString(bodyString)
err := parser.ParseStream(br, timestamp, false, func(rows []parser.Row) error { err := stream.Parse(br, timestamp, false, func(rows []parser.Row) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
for i := range rows { for i := range rows {

View file

@@ -1,4 +1,4 @@
package prometheus package stream
import ( import (
"bufio" "bufio"
@@ -10,16 +10,17 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
// ParseStream parses lines with Prometheus exposition format from r and calls callback for the parsed rows. // Parse parses lines with Prometheus exposition format from r and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error, errLogger func(string)) error { func Parse(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []prometheus.Row) error, errLogger func(string)) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@@ -137,9 +138,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows prometheus.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []prometheus.Row) error
errLogger func(string) errLogger func(string)
defaultTimestamp int64 defaultTimestamp int64
reqBuf []byte reqBuf []byte
@@ -154,7 +155,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []prometheus.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()

View file

@@ -1,4 +1,4 @@
package prometheus package stream
import ( import (
"bytes" "bytes"
@@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
) )
func TestParseStream(t *testing.T) { func TestParseStream(t *testing.T) {
@@ -17,13 +18,13 @@ func TestParseStream(t *testing.T) {
defer common.StopUnmarshalWorkers() defer common.StopUnmarshalWorkers()
const defaultTimestamp = 123 const defaultTimestamp = 123
f := func(s string, rowsExpected []Row) { f := func(s string, rowsExpected []prometheus.Row) {
t.Helper() t.Helper()
bb := bytes.NewBufferString(s) bb := bytes.NewBufferString(s)
var result []Row var result []prometheus.Row
var lock sync.Mutex var lock sync.Mutex
doneCh := make(chan struct{}) doneCh := make(chan struct{})
err := ParseStream(bb, defaultTimestamp, false, func(rows []Row) error { err := Parse(bb, defaultTimestamp, false, func(rows []prometheus.Row) error {
lock.Lock() lock.Lock()
result = appendRowCopies(result, rows) result = appendRowCopies(result, rows)
if len(result) == len(rowsExpected) { if len(result) == len(rowsExpected) {
@@ -56,7 +57,7 @@ func TestParseStream(t *testing.T) {
} }
result = nil result = nil
doneCh = make(chan struct{}) doneCh = make(chan struct{})
err = ParseStream(bb, defaultTimestamp, true, func(rows []Row) error { err = Parse(bb, defaultTimestamp, true, func(rows []prometheus.Row) error {
lock.Lock() lock.Lock()
result = appendRowCopies(result, rows) result = appendRowCopies(result, rows)
if len(result) == len(rowsExpected) { if len(result) == len(rowsExpected) {
@@ -79,12 +80,12 @@ func TestParseStream(t *testing.T) {
} }
} }
f("foo 123 456", []Row{{ f("foo 123 456", []prometheus.Row{{
Metric: "foo", Metric: "foo",
Value: 123, Value: 123,
Timestamp: 456000, Timestamp: 456000,
}}) }})
f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []Row{ f(`foo{bar="baz"} 1 2`+"\n"+`aaa{} 3 4`, []prometheus.Row{
{ {
Metric: "aaa", Metric: "aaa",
Value: 3, Value: 3,
@@ -92,7 +93,7 @@ func TestParseStream(t *testing.T) {
}, },
{ {
Metric: "foo", Metric: "foo",
Tags: []Tag{{ Tags: []prometheus.Tag{{
Key: "bar", Key: "bar",
Value: "baz", Value: "baz",
}}, }},
@@ -100,29 +101,29 @@ func TestParseStream(t *testing.T) {
Timestamp: 2000, Timestamp: 2000,
}, },
}) })
f("foo 23", []Row{{ f("foo 23", []prometheus.Row{{
Metric: "foo", Metric: "foo",
Value: 23, Value: 23,
Timestamp: defaultTimestamp, Timestamp: defaultTimestamp,
}}) }})
} }
func sortRows(rows []Row) { func sortRows(rows []prometheus.Row) {
sort.Slice(rows, func(i, j int) bool { sort.Slice(rows, func(i, j int) bool {
a, b := rows[i], rows[j] a, b := rows[i], rows[j]
return a.Metric < b.Metric return a.Metric < b.Metric
}) })
} }
func appendRowCopies(dst, src []Row) []Row { func appendRowCopies(dst, src []prometheus.Row) []prometheus.Row {
for _, r := range src { for _, r := range src {
// Make a copy of r, since r may contain garbage after returning from the callback to ParseStream. // Make a copy of r, since r may contain garbage after returning from the callback to Parse.
var rCopy Row var rCopy prometheus.Row
rCopy.Metric = copyString(r.Metric) rCopy.Metric = copyString(r.Metric)
rCopy.Value = r.Value rCopy.Value = r.Value
rCopy.Timestamp = r.Timestamp rCopy.Timestamp = r.Timestamp
for _, tag := range r.Tags { for _, tag := range r.Tags {
rCopy.Tags = append(rCopy.Tags, Tag{ rCopy.Tags = append(rCopy.Tags, prometheus.Tag{
Key: copyString(tag.Key), Key: copyString(tag.Key),
Value: copyString(tag.Value), Value: copyString(tag.Value),
}) })