mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00

- Properly decode protobuf-encoded Loki request if it has no Content-Encoding header. Protobuf Loki message is snappy-encoded by default, so snappy decoding must be used when Content-Encoding header is missing. - Return back the previous signatures of parseJSONRequest and parseProtobufRequest functions. This eliminates the churn in tests for these functions. This also fixes broken benchmarks BenchmarkParseJSONRequest and BenchmarkParseProtobufRequest, which consume the whole request body on the first iteration and do nothing on subsequent iterations. - Put the CHANGELOG entries into correct places, since they were incorrectly put into already released versions of VictoriaMetrics and VictoriaLogs. - Add support for reading zstd-compressed data ingestion requests into the remaining protocols at VictoriaLogs and VictoriaMetrics. - Remove the `encoding` arg from PutUncompressedReader() - it has enough information about the passed reader arg in order to properly deal with it. - Add ReadUncompressedData to lib/protoparser/common for reading uncompressed data from the reader until EOF. This allows removing repeated code across request-based protocol parsers without streaming mode. - Consistently limit data ingestion request sizes, which can be read by ReadUncompressedData function. Previously this wasn't the case for all the supported protocols. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8416 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8380 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8300
218 lines
5.5 KiB
Go
218 lines
5.5 KiB
Go
package loki
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
)
|
|
|
|
type testLogMessageProcessor struct {
|
|
pr PushRequest
|
|
}
|
|
|
|
func (tlp *testLogMessageProcessor) AddRow(timestamp int64, fields, streamFields []logstorage.Field) {
|
|
if streamFields != nil {
|
|
panic(fmt.Errorf("unexpected non-nil streamFields: %v", streamFields))
|
|
}
|
|
msg := ""
|
|
for _, f := range fields {
|
|
if f.Name == "_msg" {
|
|
msg = f.Value
|
|
}
|
|
}
|
|
var a []string
|
|
for _, f := range fields {
|
|
if f.Name == "_msg" {
|
|
continue
|
|
}
|
|
item := fmt.Sprintf("%s=%q", f.Name, f.Value)
|
|
a = append(a, item)
|
|
}
|
|
labels := "{" + strings.Join(a, ", ") + "}"
|
|
tlp.pr.Streams = append(tlp.pr.Streams, Stream{
|
|
Labels: labels,
|
|
Entries: []Entry{
|
|
{
|
|
Timestamp: time.Unix(0, timestamp),
|
|
Line: strings.Clone(msg),
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
func (tlp *testLogMessageProcessor) MustClose() {
|
|
}
|
|
|
|
func TestParseProtobufRequest_Success(t *testing.T) {
|
|
f := func(s string, timestampsExpected []int64, resultExpected string) {
|
|
t.Helper()
|
|
|
|
tlp := &testLogMessageProcessor{}
|
|
if err := parseJSONRequest([]byte(s), tlp, nil, false, false); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
if len(tlp.pr.Streams) != len(timestampsExpected) {
|
|
t.Fatalf("unexpected number of streams; got %d; want %d", len(tlp.pr.Streams), len(timestampsExpected))
|
|
}
|
|
|
|
data := tlp.pr.MarshalProtobuf(nil)
|
|
|
|
tlp2 := &insertutils.TestLogMessageProcessor{}
|
|
if err := parseProtobufRequest(data, tlp2, nil, false, false); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
if err := tlp2.Verify(timestampsExpected, resultExpected); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// Empty streams
|
|
f(`{"streams":[]}`, nil, ``)
|
|
f(`{"streams":[{"values":[]}]}`, nil, ``)
|
|
f(`{"streams":[{"stream":{},"values":[]}]}`, nil, ``)
|
|
f(`{"streams":[{"stream":{"foo":"bar"},"values":[]}]}`, nil, ``)
|
|
|
|
// Empty stream labels
|
|
f(`{"streams":[{"values":[["1577836800000000001", "foo bar"]]}]}`, []int64{1577836800000000001}, `{"_msg":"foo bar"}`)
|
|
f(`{"streams":[{"stream":{},"values":[["1577836800000000001", "foo bar"]]}]}`, []int64{1577836800000000001}, `{"_msg":"foo bar"}`)
|
|
|
|
// Non-empty stream labels
|
|
f(`{"streams":[{"stream":{
|
|
"label1": "value1",
|
|
"label2": "value2"
|
|
},"values":[
|
|
["1577836800000000001", "foo bar"],
|
|
["1477836900005000002", "abc"],
|
|
["147.78369e9", "foobar"]
|
|
]}]}`, []int64{1577836800000000001, 1477836900005000002, 147783690000000000}, `{"label1":"value1","label2":"value2","_msg":"foo bar"}
|
|
{"label1":"value1","label2":"value2","_msg":"abc"}
|
|
{"label1":"value1","label2":"value2","_msg":"foobar"}`)
|
|
|
|
// Multiple streams
|
|
f(`{
|
|
"streams": [
|
|
{
|
|
"stream": {
|
|
"foo": "bar",
|
|
"a": "b"
|
|
},
|
|
"values": [
|
|
["1577836800000000001", "foo bar"],
|
|
["1577836900005000002", "abc"]
|
|
]
|
|
},
|
|
{
|
|
"stream": {
|
|
"x": "y"
|
|
},
|
|
"values": [
|
|
["1877836900005000002", "yx"]
|
|
]
|
|
}
|
|
]
|
|
}`, []int64{1577836800000000001, 1577836900005000002, 1877836900005000002}, `{"foo":"bar","a":"b","_msg":"foo bar"}
|
|
{"foo":"bar","a":"b","_msg":"abc"}
|
|
{"x":"y","_msg":"yx"}`)
|
|
}
|
|
|
|
func TestParseProtobufRequest_ParseMessage(t *testing.T) {
|
|
f := func(s string, msgFields []string, timestampsExpected []int64, resultExpected string) {
|
|
t.Helper()
|
|
|
|
tlp := &testLogMessageProcessor{}
|
|
if err := parseJSONRequest([]byte(s), tlp, nil, false, false); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
if len(tlp.pr.Streams) != len(timestampsExpected) {
|
|
t.Fatalf("unexpected number of streams; got %d; want %d", len(tlp.pr.Streams), len(timestampsExpected))
|
|
}
|
|
|
|
data := tlp.pr.MarshalProtobuf(nil)
|
|
|
|
tlp2 := &insertutils.TestLogMessageProcessor{}
|
|
if err := parseProtobufRequest(data, tlp2, msgFields, false, true); err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
if err := tlp2.Verify(timestampsExpected, resultExpected); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
f(`{
|
|
"streams": [
|
|
{
|
|
"stream": {
|
|
"foo": "bar",
|
|
"a": "b"
|
|
},
|
|
"values": [
|
|
["1577836800000000001", "{\"user_id\":\"123\"}"],
|
|
["1577836900005000002", "abc", {"trace_id":"pqw"}],
|
|
["1577836900005000003", "{def}"]
|
|
]
|
|
},
|
|
{
|
|
"stream": {
|
|
"x": "y"
|
|
},
|
|
"values": [
|
|
["1877836900005000004", "{\"trace_id\":\"432\",\"parent_id\":\"qwerty\"}"]
|
|
]
|
|
}
|
|
]
|
|
}`, []string{"a", "trace_id"}, []int64{1577836800000000001, 1577836900005000002, 1577836900005000003, 1877836900005000004}, `{"foo":"bar","a":"b","user_id":"123"}
|
|
{"foo":"bar","a":"b","trace_id":"pqw","_msg":"abc"}
|
|
{"foo":"bar","a":"b","_msg":"{def}"}
|
|
{"x":"y","_msg":"432","parent_id":"qwerty"}`)
|
|
}
|
|
|
|
func TestParsePromLabels_Success(t *testing.T) {
|
|
f := func(s string) {
|
|
t.Helper()
|
|
fields, err := parsePromLabels(nil, s)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error: %s", err)
|
|
}
|
|
|
|
var a []string
|
|
for _, f := range fields {
|
|
a = append(a, fmt.Sprintf("%s=%q", f.Name, f.Value))
|
|
}
|
|
result := "{" + strings.Join(a, ", ") + "}"
|
|
if result != s {
|
|
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, s)
|
|
}
|
|
}
|
|
|
|
f("{}")
|
|
f(`{foo="bar"}`)
|
|
f(`{foo="bar", baz="x", y="z"}`)
|
|
f(`{foo="ba\"r\\z\n", a="", b="\"\\"}`)
|
|
}
|
|
|
|
func TestParsePromLabels_Failure(t *testing.T) {
|
|
f := func(s string) {
|
|
t.Helper()
|
|
fields, err := parsePromLabels(nil, s)
|
|
if err == nil {
|
|
t.Fatalf("expecting non-nil error")
|
|
}
|
|
if len(fields) > 0 {
|
|
t.Fatalf("unexpected non-empty fields: %s", fields)
|
|
}
|
|
}
|
|
|
|
f("")
|
|
f("{")
|
|
f(`{foo}`)
|
|
f(`{foo=bar}`)
|
|
f(`{foo="bar}`)
|
|
f(`{foo="ba\",r}`)
|
|
f(`{foo="bar" baz="aa"}`)
|
|
f(`foobar`)
|
|
f(`foo{bar="baz"}`)
|
|
}
|