lib/protoparser/graphite: extract stream parsing code into a separate stream package

This commit is contained in:
Aliaksandr Valialkin 2023-02-13 10:32:36 -08:00
parent 1801fa6c5c
commit 75cf5a8939
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
5 changed files with 89 additions and 79 deletions

View file

@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite/stream"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -19,7 +20,7 @@ var (
// //
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol // See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(r io.Reader) error { func InsertHandler(r io.Reader) error {
return parser.ParseStream(r, insertRows) return stream.Parse(r, insertRows)
} }
func insertRows(rows []parser.Row) error { func insertRows(rows []parser.Row) error {

View file

@ -7,6 +7,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite/stream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -21,7 +22,7 @@ var (
// //
// See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol // See https://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
func InsertHandler(at *auth.Token, r io.Reader) error { func InsertHandler(at *auth.Token, r io.Reader) error {
return parser.ParseStream(r, func(rows []parser.Row) error { return stream.Parse(r, func(rows []parser.Row) error {
return insertRows(at, rows) return insertRows(at, rows)
}) })
} }

View file

@ -2,10 +2,7 @@ package graphite
import ( import (
"reflect" "reflect"
"strings"
"testing" "testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
) )
func TestUnmarshalMetricAndTagsFailure(t *testing.T) { func TestUnmarshalMetricAndTagsFailure(t *testing.T) {
@ -383,71 +380,3 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}}, }},
}) })
} }
func Test_streamContext_Read(t *testing.T) {
f := func(s string, rowsExpected *Rows) {
t.Helper()
ctx := getStreamContext(strings.NewReader(s))
if !ctx.Read() {
t.Fatalf("expecting successful read")
}
uw := getUnmarshalWork()
callbackCalls := 0
uw.ctx = ctx
uw.callback = func(rows []Row) error {
callbackCalls++
if len(rows) != len(rowsExpected.Rows) {
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
if !reflect.DeepEqual(rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
return nil
}
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
ctx.wg.Add(1)
uw.Unmarshal()
if callbackCalls != 1 {
t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
}
}
// Full line without tags
f("aaa 1123 345", &Rows{
Rows: []Row{{
Metric: "aaa",
Value: 1123,
Timestamp: 345 * 1000,
}},
})
// Full line with tags
f("aaa;x=y 1123 345", &Rows{
Rows: []Row{{
Metric: "aaa",
Tags: []Tag{{
Key: "x",
Value: "y",
}},
Value: 1123,
Timestamp: 345 * 1000,
}},
})
// missing timestamp.
// Note that this test may be flaky due to timing issues. TODO: fix it
f("aaa 1123", &Rows{
Rows: []Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
// -1 timestamp. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/610
// Note that this test may be flaky due to timing issues. TODO: fix it.
f("aaa 1123 -1", &Rows{
Rows: []Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
}

View file

@ -1,4 +1,4 @@
package graphite package stream
import ( import (
"bufio" "bufio"
@ -12,6 +12,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -21,12 +22,12 @@ var (
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
) )
// ParseStream parses Graphite lines from r and calls callback for the parsed rows. // Parse parses Graphite lines from r and calls callback for the parsed rows.
// //
// The callback can be called concurrently multiple times for streamed data from r. // The callback can be called concurrently multiple times for streamed data from r.
// //
// callback shouldn't hold rows after returning. // callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, callback func(rows []Row) error) error { func Parse(r io.Reader, callback func(rows []graphite.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r) wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr) defer writeconcurrencylimiter.PutReader(wcr)
r = wcr r = wcr
@ -135,9 +136,9 @@ var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct { type unmarshalWork struct {
rows Rows rows graphite.Rows
ctx *streamContext ctx *streamContext
callback func(rows []Row) error callback func(rows []graphite.Row) error
reqBuf []byte reqBuf []byte
} }
@ -148,7 +149,7 @@ func (uw *unmarshalWork) reset() {
uw.reqBuf = uw.reqBuf[:0] uw.reqBuf = uw.reqBuf[:0]
} }
func (uw *unmarshalWork) runCallback(rows []Row) { func (uw *unmarshalWork) runCallback(rows []graphite.Row) {
ctx := uw.ctx ctx := uw.ctx
if err := uw.callback(rows); err != nil { if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock() ctx.callbackErrLock.Lock()

View file

@ -0,0 +1,78 @@
package stream
import (
"reflect"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/graphite"
)
func Test_streamContext_Read(t *testing.T) {
f := func(s string, rowsExpected *graphite.Rows) {
t.Helper()
ctx := getStreamContext(strings.NewReader(s))
if !ctx.Read() {
t.Fatalf("expecting successful read")
}
uw := getUnmarshalWork()
callbackCalls := 0
uw.ctx = ctx
uw.callback = func(rows []graphite.Row) error {
callbackCalls++
if len(rows) != len(rowsExpected.Rows) {
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
if !reflect.DeepEqual(rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
return nil
}
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
ctx.wg.Add(1)
uw.Unmarshal()
if callbackCalls != 1 {
t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
}
}
// Full line without tags
f("aaa 1123 345", &graphite.Rows{
Rows: []graphite.Row{{
Metric: "aaa",
Value: 1123,
Timestamp: 345 * 1000,
}},
})
// Full line with tags
f("aaa;x=y 1123 345", &graphite.Rows{
Rows: []graphite.Row{{
Metric: "aaa",
Tags: []graphite.Tag{{
Key: "x",
Value: "y",
}},
Value: 1123,
Timestamp: 345 * 1000,
}},
})
// missing timestamp.
// Note that this test may be flaky due to timing issues. TODO: fix it
f("aaa 1123", &graphite.Rows{
Rows: []graphite.Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
// -1 timestamp. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/610
// Note that this test may be flaky due to timing issues. TODO: fix it.
f("aaa 1123 -1", &graphite.Rows{
Rows: []graphite.Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
}