mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
vlinsert: added opentelemetry logs support
Commit adds the following changes: * Adds support of OpenTelemetry logs for Victoria Logs with protobuf encoded messages * json encoding is not supported for the following reasons: - It brings a lot of fragile code, which works inefficiently. - json encoding is impossible to use with language SDK. * splits metrics and logs structures at lib/protoparser/opentelemetry/pb package. * adds docs with examples for opentelemetry logs. --- Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4839 Co-authored-by: AndrewChubatiuk <andrew.chubatiuk@gmail.com> Co-authored-by: f41gh7 <nik@victoriametrics.com>
This commit is contained in:
parent
dcc525b388
commit
711f2cc4f2
33 changed files with 1476 additions and 278 deletions
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/opentelemetry"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/syslog"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/syslog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -41,6 +42,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||||
case strings.HasPrefix(path, "/loki/"):
|
case strings.HasPrefix(path, "/loki/"):
|
||||||
path = strings.TrimPrefix(path, "/loki")
|
path = strings.TrimPrefix(path, "/loki")
|
||||||
return loki.RequestHandler(path, w, r)
|
return loki.RequestHandler(path, w, r)
|
||||||
|
case strings.HasPrefix(path, "/opentelemetry/"):
|
||||||
|
path = strings.TrimPrefix(path, "/opentelemetry")
|
||||||
|
return opentelemetry.RequestHandler(path, w, r)
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
143
app/vlinsert/opentelemetry/opentelemetry.go
Normal file
143
app/vlinsert/opentelemetry/opentelemetry.go
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
package opentelemetry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RequestHandler processes Opentelemetry insert requests
|
||||||
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
||||||
|
switch path {
|
||||||
|
// use the same path as opentelemetry collector
|
||||||
|
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
|
||||||
|
case "/v1/logs":
|
||||||
|
if r.Header.Get("Content-Type") == "application/json" {
|
||||||
|
httpserver.Errorf(w, r, "json encoding isn't supported for opentelemetry format. Use protobuf encoding")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
handleProtobuf(r, w)
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleProtobuf(r *http.Request, w http.ResponseWriter) {
|
||||||
|
startTime := time.Now()
|
||||||
|
requestsProtobufTotal.Inc()
|
||||||
|
reader := r.Body
|
||||||
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||||
|
zr, err := common.GetGzipReader(reader)
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer common.PutGzipReader(zr)
|
||||||
|
reader = zr
|
||||||
|
}
|
||||||
|
|
||||||
|
wcr := writeconcurrencylimiter.GetReader(reader)
|
||||||
|
data, err := io.ReadAll(wcr)
|
||||||
|
writeconcurrencylimiter.PutReader(wcr)
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cp, err := insertutils.GetCommonParams(r)
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := vlstorage.CanWriteData(); err != nil {
|
||||||
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
lmp := cp.NewLogMessageProcessor()
|
||||||
|
n, err := pushProtobufRequest(data, lmp)
|
||||||
|
lmp.MustClose()
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, r, "cannot parse OpenTelemetry protobuf request: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rowsIngestedProtobufTotal.Add(n)
|
||||||
|
|
||||||
|
// update requestProtobufDuration only for successfully parsed requests
|
||||||
|
// There is no need in updating requestProtobufDuration for request errors,
|
||||||
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
||||||
|
requestProtobufDuration.UpdateDuration(startTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry",format="protobuf"}`)
|
||||||
|
|
||||||
|
requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
||||||
|
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
||||||
|
|
||||||
|
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
||||||
|
)
|
||||||
|
|
||||||
|
func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) {
|
||||||
|
var req pb.ExportLogsServiceRequest
|
||||||
|
if err := req.UnmarshalProtobuf(data); err != nil {
|
||||||
|
errorsTotal.Inc()
|
||||||
|
return 0, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var rowsIngested int
|
||||||
|
var commonFields []logstorage.Field
|
||||||
|
for _, rl := range req.ResourceLogs {
|
||||||
|
attributes := rl.Resource.Attributes
|
||||||
|
commonFields = slicesutil.SetLength(commonFields, len(attributes))
|
||||||
|
for i, attr := range attributes {
|
||||||
|
commonFields[i].Name = attr.Key
|
||||||
|
commonFields[i].Value = attr.Value.FormatString()
|
||||||
|
}
|
||||||
|
commonFieldsLen := len(commonFields)
|
||||||
|
for _, sc := range rl.ScopeLogs {
|
||||||
|
var scopeIngested int
|
||||||
|
commonFields, scopeIngested = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp)
|
||||||
|
rowsIngested += scopeIngested
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rowsIngested, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutils.LogMessageProcessor) ([]logstorage.Field, int) {
|
||||||
|
fields := commonFields
|
||||||
|
for _, lr := range sc.LogRecords {
|
||||||
|
fields = fields[:len(commonFields)]
|
||||||
|
fields = append(fields, logstorage.Field{
|
||||||
|
Name: "_msg",
|
||||||
|
Value: lr.Body.FormatString(),
|
||||||
|
})
|
||||||
|
for _, attr := range lr.Attributes {
|
||||||
|
fields = append(fields, logstorage.Field{
|
||||||
|
Name: attr.Key,
|
||||||
|
Value: attr.Value.FormatString(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fields = append(fields, logstorage.Field{
|
||||||
|
Name: "severity",
|
||||||
|
Value: lr.FormatSeverity(),
|
||||||
|
})
|
||||||
|
|
||||||
|
lmp.AddRow(lr.ExtractTimestampNano(), fields)
|
||||||
|
}
|
||||||
|
return fields, len(sc.LogRecords)
|
||||||
|
}
|
126
app/vlinsert/opentelemetry/opentelemetry_test.go
Normal file
126
app/vlinsert/opentelemetry/opentelemetry_test.go
Normal file
|
@ -0,0 +1,126 @@
|
||||||
|
package opentelemetry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPushProtoOk(t *testing.T) {
|
||||||
|
f := func(src []pb.ResourceLogs, timestampsExpected []int64, resultExpected string) {
|
||||||
|
t.Helper()
|
||||||
|
lr := pb.ExportLogsServiceRequest{
|
||||||
|
ResourceLogs: src,
|
||||||
|
}
|
||||||
|
|
||||||
|
pData := lr.MarshalProtobuf(nil)
|
||||||
|
tlp := &insertutils.TestLogMessageProcessor{}
|
||||||
|
n, err := pushProtobufRequest(pData, tlp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tlp.Verify(n, timestampsExpected, resultExpected); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// single line without resource attributes
|
||||||
|
f([]pb.ResourceLogs{
|
||||||
|
{
|
||||||
|
ScopeLogs: []pb.ScopeLogs{
|
||||||
|
{
|
||||||
|
LogRecords: []pb.LogRecord{
|
||||||
|
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1234, SeverityNumber: 1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
[]int64{1234},
|
||||||
|
`{"_msg":"log-line-message","severity":"Trace"}`,
|
||||||
|
)
|
||||||
|
// multi-line with resource attributes
|
||||||
|
f([]pb.ResourceLogs{
|
||||||
|
{
|
||||||
|
Resource: pb.Resource{
|
||||||
|
Attributes: []*pb.KeyValue{
|
||||||
|
{Key: "logger", Value: &pb.AnyValue{StringValue: ptrTo("context")}},
|
||||||
|
{Key: "instance_id", Value: &pb.AnyValue{IntValue: ptrTo[int64](10)}},
|
||||||
|
{Key: "node_taints", Value: &pb.AnyValue{KeyValueList: &pb.KeyValueList{
|
||||||
|
Values: []*pb.KeyValue{
|
||||||
|
{Key: "role", Value: &pb.AnyValue{StringValue: ptrTo("dev")}},
|
||||||
|
{Key: "cluster_load_percent", Value: &pb.AnyValue{DoubleValue: ptrTo(0.55)}},
|
||||||
|
},
|
||||||
|
}}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ScopeLogs: []pb.ScopeLogs{
|
||||||
|
{
|
||||||
|
LogRecords: []pb.LogRecord{
|
||||||
|
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1234, SeverityNumber: 1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}},
|
||||||
|
{Attributes: []*pb.KeyValue{}, TimeUnixNano: 1235, SeverityNumber: 21, Body: pb.AnyValue{StringValue: ptrTo("log-line-message-msg-2")}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
[]int64{1234, 1235},
|
||||||
|
`{"logger":"context","instance_id":"10","node_taints":"[{\"Key\":\"role\",\"Value\":{\"StringValue\":\"dev\",\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":null,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}},{\"Key\":\"cluster_load_percent\",\"Value\":{\"StringValue\":null,\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":0.55,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}}]","_msg":"log-line-message","severity":"Trace"}
|
||||||
|
{"logger":"context","instance_id":"10","node_taints":"[{\"Key\":\"role\",\"Value\":{\"StringValue\":\"dev\",\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":null,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}},{\"Key\":\"cluster_load_percent\",\"Value\":{\"StringValue\":null,\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":0.55,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}}]","_msg":"log-line-message-msg-2","severity":"Unspecified"}`,
|
||||||
|
)
|
||||||
|
|
||||||
|
// multi-scope with resource attributes and multi-line
|
||||||
|
f([]pb.ResourceLogs{
|
||||||
|
{
|
||||||
|
Resource: pb.Resource{
|
||||||
|
Attributes: []*pb.KeyValue{
|
||||||
|
{Key: "logger", Value: &pb.AnyValue{StringValue: ptrTo("context")}},
|
||||||
|
{Key: "instance_id", Value: &pb.AnyValue{IntValue: ptrTo[int64](10)}},
|
||||||
|
{Key: "node_taints", Value: &pb.AnyValue{KeyValueList: &pb.KeyValueList{
|
||||||
|
Values: []*pb.KeyValue{
|
||||||
|
{Key: "role", Value: &pb.AnyValue{StringValue: ptrTo("dev")}},
|
||||||
|
{Key: "cluster_load_percent", Value: &pb.AnyValue{DoubleValue: ptrTo(0.55)}},
|
||||||
|
},
|
||||||
|
}}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ScopeLogs: []pb.ScopeLogs{
|
||||||
|
{
|
||||||
|
LogRecords: []pb.LogRecord{
|
||||||
|
{TimeUnixNano: 1234, SeverityNumber: 1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}},
|
||||||
|
{TimeUnixNano: 1235, SeverityNumber: 5, Body: pb.AnyValue{StringValue: ptrTo("log-line-message-msg-2")}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ScopeLogs: []pb.ScopeLogs{
|
||||||
|
{
|
||||||
|
LogRecords: []pb.LogRecord{
|
||||||
|
{TimeUnixNano: 2345, SeverityNumber: 10, Body: pb.AnyValue{StringValue: ptrTo("log-line-resource-scope-1-0-0")}},
|
||||||
|
{TimeUnixNano: 2346, SeverityNumber: 10, Body: pb.AnyValue{StringValue: ptrTo("log-line-resource-scope-1-0-1")}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
LogRecords: []pb.LogRecord{
|
||||||
|
{TimeUnixNano: 2347, SeverityNumber: 12, Body: pb.AnyValue{StringValue: ptrTo("log-line-resource-scope-1-1-0")}},
|
||||||
|
{ObservedTimeUnixNano: 2348, SeverityNumber: 12, Body: pb.AnyValue{StringValue: ptrTo("log-line-resource-scope-1-1-1")}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
[]int64{1234, 1235, 2345, 2346, 2347, 2348},
|
||||||
|
`{"logger":"context","instance_id":"10","node_taints":"[{\"Key\":\"role\",\"Value\":{\"StringValue\":\"dev\",\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":null,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}},{\"Key\":\"cluster_load_percent\",\"Value\":{\"StringValue\":null,\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":0.55,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}}]","_msg":"log-line-message","severity":"Trace"}
|
||||||
|
{"logger":"context","instance_id":"10","node_taints":"[{\"Key\":\"role\",\"Value\":{\"StringValue\":\"dev\",\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":null,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}},{\"Key\":\"cluster_load_percent\",\"Value\":{\"StringValue\":null,\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":0.55,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}}]","_msg":"log-line-message-msg-2","severity":"Debug"}
|
||||||
|
{"_msg":"log-line-resource-scope-1-0-0","severity":"Info2"}
|
||||||
|
{"_msg":"log-line-resource-scope-1-0-1","severity":"Info2"}
|
||||||
|
{"_msg":"log-line-resource-scope-1-1-0","severity":"Info4"}
|
||||||
|
{"_msg":"log-line-resource-scope-1-1-1","severity":"Info4"}`,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ptrTo[T any](s T) *T {
|
||||||
|
return &s
|
||||||
|
}
|
79
app/vlinsert/opentelemetry/opentemetry_timing_test.go
Normal file
79
app/vlinsert/opentelemetry/opentemetry_timing_test.go
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
package opentelemetry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkParseProtobufRequest(b *testing.B) {
|
||||||
|
for _, scopes := range []int{1, 2} {
|
||||||
|
for _, rows := range []int{100, 1000} {
|
||||||
|
for _, attributes := range []int{5, 10} {
|
||||||
|
b.Run(fmt.Sprintf("scopes_%d/rows_%d/attributes_%d", scopes, rows, attributes), func(b *testing.B) {
|
||||||
|
benchmarkParseProtobufRequest(b, scopes, rows, attributes)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) {
|
||||||
|
blp := &insertutils.BenchmarkLogMessageProcessor{}
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.SetBytes(int64(streams * rows))
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
body := getProtobufBody(streams, rows, labels)
|
||||||
|
for pb.Next() {
|
||||||
|
_, err := pushProtobufRequest(body, blp)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("unexpected error: %w", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func getProtobufBody(scopesCount, rowsCount, attributesCount int) []byte {
|
||||||
|
msg := "12345678910"
|
||||||
|
|
||||||
|
attrValues := []*pb.AnyValue{
|
||||||
|
{StringValue: ptrTo("string-attribute")},
|
||||||
|
{IntValue: ptrTo[int64](12345)},
|
||||||
|
{DoubleValue: ptrTo(3.14)},
|
||||||
|
}
|
||||||
|
attrs := make([]*pb.KeyValue, attributesCount)
|
||||||
|
for j := 0; j < attributesCount; j++ {
|
||||||
|
attrs[j] = &pb.KeyValue{
|
||||||
|
Key: fmt.Sprintf("key-%d", j),
|
||||||
|
Value: attrValues[j%3],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
entries := make([]pb.LogRecord, rowsCount)
|
||||||
|
for j := 0; j < rowsCount; j++ {
|
||||||
|
entries[j] = pb.LogRecord{
|
||||||
|
TimeUnixNano: 12345678910, ObservedTimeUnixNano: 12345678910, Body: pb.AnyValue{StringValue: &msg},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
scopes := make([]pb.ScopeLogs, scopesCount)
|
||||||
|
|
||||||
|
for j := 0; j < scopesCount; j++ {
|
||||||
|
scopes[j] = pb.ScopeLogs{
|
||||||
|
LogRecords: entries,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pr := pb.ExportLogsServiceRequest{
|
||||||
|
ResourceLogs: []pb.ResourceLogs{
|
||||||
|
{
|
||||||
|
Resource: pb.Resource{
|
||||||
|
Attributes: attrs,
|
||||||
|
},
|
||||||
|
ScopeLogs: scopes,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
return pr.MarshalProtobuf(nil)
|
||||||
|
}
|
1
deployment/docker/victorialogs/opentelemetry-collector/.gitignore
vendored
Normal file
1
deployment/docker/victorialogs/opentelemetry-collector/.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
||||||
|
**/logs
|
|
@ -0,0 +1,27 @@
|
||||||
|
# Docker compose OpenTelemetry Elasticsearch integration with VictoriaLogs for docker
|
||||||
|
|
||||||
|
The folder contains the example of integration of [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) with Victorialogs
|
||||||
|
|
||||||
|
To spin-up environment run the following command:
|
||||||
|
```
|
||||||
|
docker compose up -d
|
||||||
|
```
|
||||||
|
|
||||||
|
To shut down the docker-compose environment run the following command:
|
||||||
|
```
|
||||||
|
docker compose down
|
||||||
|
docker compose rm -f
|
||||||
|
```
|
||||||
|
|
||||||
|
The docker compose file contains the following components:
|
||||||
|
|
||||||
|
* collector - vector is configured to collect logs from the `docker`, you can find configuration in the `config.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics.
|
||||||
|
* VictoriaLogs - the log database, it accepts the data from `collector` by elastic protocol
|
||||||
|
* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics`
|
||||||
|
|
||||||
|
Querying the data
|
||||||
|
|
||||||
|
* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui`
|
||||||
|
* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line)
|
||||||
|
|
||||||
|
Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance.
|
|
@ -0,0 +1,48 @@
|
||||||
|
services:
|
||||||
|
collector:
|
||||||
|
image: docker.io/otel/opentelemetry-collector-contrib:0.102.1
|
||||||
|
restart: on-failure
|
||||||
|
volumes:
|
||||||
|
- $PWD/logs:/tmp/logs
|
||||||
|
- $PWD/config.yaml:/etc/otelcol-contrib/config.yaml
|
||||||
|
depends_on:
|
||||||
|
victorialogs:
|
||||||
|
condition: service_healthy
|
||||||
|
victoriametrics:
|
||||||
|
condition: service_healthy
|
||||||
|
|
||||||
|
victorialogs:
|
||||||
|
image: docker.io/victoriametrics/victoria-logs:v0.28.0-victorialogs
|
||||||
|
volumes:
|
||||||
|
- victorialogs-vector-docker-vl:/vlogs
|
||||||
|
ports:
|
||||||
|
- '9428:9428'
|
||||||
|
command:
|
||||||
|
- -storageDataPath=/vlogs
|
||||||
|
- -loggerFormat=json
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"]
|
||||||
|
interval: 1s
|
||||||
|
timeout: 1s
|
||||||
|
retries: 10
|
||||||
|
|
||||||
|
victoriametrics:
|
||||||
|
image: victoriametrics/victoria-metrics:latest
|
||||||
|
ports:
|
||||||
|
- '8428:8428'
|
||||||
|
command:
|
||||||
|
- -storageDataPath=/vmsingle
|
||||||
|
- -promscrape.config=/promscrape.yml
|
||||||
|
- -loggerFormat=json
|
||||||
|
volumes:
|
||||||
|
- victorialogs-vector-docker-vm:/vmsingle
|
||||||
|
- ./scrape.yml:/promscrape.yml
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"]
|
||||||
|
interval: 1s
|
||||||
|
timeout: 1s
|
||||||
|
retries: 10
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
victorialogs-vector-docker-vl:
|
||||||
|
victorialogs-vector-docker-vm:
|
|
@ -0,0 +1,14 @@
|
||||||
|
exporters:
|
||||||
|
elasticsearch:
|
||||||
|
endpoints:
|
||||||
|
- http://victorialogs:9428/insert/elasticsearch
|
||||||
|
receivers:
|
||||||
|
filelog:
|
||||||
|
include: [/tmp/logs/*.log]
|
||||||
|
resource:
|
||||||
|
region: us-east-1
|
||||||
|
service:
|
||||||
|
pipelines:
|
||||||
|
logs:
|
||||||
|
receivers: [filelog]
|
||||||
|
exporters: [elasticsearch]
|
|
@ -0,0 +1,11 @@
|
||||||
|
scrape_configs:
|
||||||
|
- job_name: "victoriametrics"
|
||||||
|
scrape_interval: 30s
|
||||||
|
static_configs:
|
||||||
|
- targets:
|
||||||
|
- victoriametrics:8428
|
||||||
|
- job_name: "victorialogs"
|
||||||
|
scrape_interval: 30s
|
||||||
|
static_configs:
|
||||||
|
- targets:
|
||||||
|
- victorialogs:9428
|
|
@ -0,0 +1,27 @@
|
||||||
|
# Docker compose OpenTelemetry Loki integration with VictoriaLogs for docker
|
||||||
|
|
||||||
|
The folder contains the example of integration of [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) with Victorialogs
|
||||||
|
|
||||||
|
To spin-up environment run the following command:
|
||||||
|
```
|
||||||
|
docker compose up -d
|
||||||
|
```
|
||||||
|
|
||||||
|
To shut down the docker-compose environment run the following command:
|
||||||
|
```
|
||||||
|
docker compose down
|
||||||
|
docker compose rm -f
|
||||||
|
```
|
||||||
|
|
||||||
|
The docker compose file contains the following components:
|
||||||
|
|
||||||
|
* collector - vector is configured to collect logs from the `docker`, you can find configuration in the `config.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics.
|
||||||
|
* VictoriaLogs - the log database, it accepts the data from `collector` by Loki protocol
|
||||||
|
* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics`
|
||||||
|
|
||||||
|
Querying the data
|
||||||
|
|
||||||
|
* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui`
|
||||||
|
* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line)
|
||||||
|
|
||||||
|
Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance.
|
|
@ -0,0 +1,48 @@
|
||||||
|
services:
|
||||||
|
collector:
|
||||||
|
image: docker.io/otel/opentelemetry-collector-contrib:0.102.1
|
||||||
|
restart: on-failure
|
||||||
|
volumes:
|
||||||
|
- $PWD/logs:/tmp/logs
|
||||||
|
- $PWD/config.yaml:/etc/otelcol-contrib/config.yaml
|
||||||
|
depends_on:
|
||||||
|
victorialogs:
|
||||||
|
condition: service_healthy
|
||||||
|
victoriametrics:
|
||||||
|
condition: service_healthy
|
||||||
|
|
||||||
|
victorialogs:
|
||||||
|
image: docker.io/victoriametrics/victoria-logs:v0.28.0-victorialogs
|
||||||
|
volumes:
|
||||||
|
- victorialogs-vector-docker-vl:/loki
|
||||||
|
ports:
|
||||||
|
- '9428:9428'
|
||||||
|
command:
|
||||||
|
- -storageDataPath=/loki
|
||||||
|
- -loggerFormat=json
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"]
|
||||||
|
interval: 1s
|
||||||
|
timeout: 1s
|
||||||
|
retries: 10
|
||||||
|
|
||||||
|
victoriametrics:
|
||||||
|
image: victoriametrics/victoria-metrics:latest
|
||||||
|
ports:
|
||||||
|
- '8428:8428'
|
||||||
|
command:
|
||||||
|
- -storageDataPath=/vmsingle
|
||||||
|
- -promscrape.config=/promscrape.yml
|
||||||
|
- -loggerFormat=json
|
||||||
|
volumes:
|
||||||
|
- victorialogs-vector-docker-vm:/vmsingle
|
||||||
|
- ./scrape.yml:/promscrape.yml
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"]
|
||||||
|
interval: 1s
|
||||||
|
timeout: 1s
|
||||||
|
retries: 10
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
victorialogs-vector-docker-vl:
|
||||||
|
victorialogs-vector-docker-vm:
|
|
@ -0,0 +1,13 @@
|
||||||
|
exporters:
|
||||||
|
loki:
|
||||||
|
endpoint: http://victorialogs:9428/insert/loki/api/v1/push
|
||||||
|
receivers:
|
||||||
|
filelog:
|
||||||
|
include: [/tmp/logs/*.log]
|
||||||
|
resource:
|
||||||
|
region: us-east-1
|
||||||
|
service:
|
||||||
|
pipelines:
|
||||||
|
logs:
|
||||||
|
receivers: [filelog]
|
||||||
|
exporters: [loki]
|
|
@ -0,0 +1,11 @@
|
||||||
|
scrape_configs:
|
||||||
|
- job_name: "victoriametrics"
|
||||||
|
scrape_interval: 30s
|
||||||
|
static_configs:
|
||||||
|
- targets:
|
||||||
|
- victoriametrics:8428
|
||||||
|
- job_name: "victorialogs"
|
||||||
|
scrape_interval: 30s
|
||||||
|
static_configs:
|
||||||
|
- targets:
|
||||||
|
- victorialogs:9428
|
|
@ -0,0 +1,27 @@
|
||||||
|
# Docker compose OpenTelemetry OTLP integration with VictoriaLogs for docker
|
||||||
|
|
||||||
|
The folder contains the example of integration of [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) with Victorialogs
|
||||||
|
|
||||||
|
To spin-up environment run the following command:
|
||||||
|
```
|
||||||
|
docker compose up -d
|
||||||
|
```
|
||||||
|
|
||||||
|
To shut down the docker-compose environment run the following command:
|
||||||
|
```
|
||||||
|
docker compose down
|
||||||
|
docker compose rm -f
|
||||||
|
```
|
||||||
|
|
||||||
|
The docker compose file contains the following components:
|
||||||
|
|
||||||
|
* collector - vector is configured to collect logs from the `docker`, you can find configuration in the `config.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics.
|
||||||
|
* VictoriaLogs - the log database, it accepts the data from `collector` by otlp protocol
|
||||||
|
* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics`
|
||||||
|
|
||||||
|
Querying the data
|
||||||
|
|
||||||
|
* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui`
|
||||||
|
* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line)
|
||||||
|
|
||||||
|
Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance.
|
|
@ -0,0 +1,48 @@
|
||||||
|
services:
|
||||||
|
collector:
|
||||||
|
image: docker.io/otel/opentelemetry-collector-contrib:0.102.1
|
||||||
|
restart: on-failure
|
||||||
|
volumes:
|
||||||
|
- $PWD/logs:/tmp/logs
|
||||||
|
- $PWD/config.yaml:/etc/otelcol-contrib/config.yaml
|
||||||
|
depends_on:
|
||||||
|
victorialogs:
|
||||||
|
condition: service_healthy
|
||||||
|
victoriametrics:
|
||||||
|
condition: service_healthy
|
||||||
|
|
||||||
|
victorialogs:
|
||||||
|
image: docker.io/victoriametrics/victoria-logs:v0.29.0-victorialogs
|
||||||
|
volumes:
|
||||||
|
- victorialogs-vector-docker-vl:/otlp
|
||||||
|
ports:
|
||||||
|
- '9428:9428'
|
||||||
|
command:
|
||||||
|
- -storageDataPath=/otlp
|
||||||
|
- -loggerFormat=json
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"]
|
||||||
|
interval: 1s
|
||||||
|
timeout: 1s
|
||||||
|
retries: 10
|
||||||
|
|
||||||
|
victoriametrics:
|
||||||
|
image: victoriametrics/victoria-metrics:latest
|
||||||
|
ports:
|
||||||
|
- '8428:8428'
|
||||||
|
command:
|
||||||
|
- -storageDataPath=/vmsingle
|
||||||
|
- -promscrape.config=/promscrape.yml
|
||||||
|
- -loggerFormat=json
|
||||||
|
volumes:
|
||||||
|
- victorialogs-vector-docker-vm:/vmsingle
|
||||||
|
- ./scrape.yml:/promscrape.yml
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"]
|
||||||
|
interval: 1s
|
||||||
|
timeout: 1s
|
||||||
|
retries: 10
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
victorialogs-vector-docker-vl:
|
||||||
|
victorialogs-vector-docker-vm:
|
|
@ -0,0 +1,15 @@
|
||||||
|
exporters:
|
||||||
|
otlphttp:
|
||||||
|
logs_endpoint: http://victorialogs:9428/insert/opentelemetry/v1/logs
|
||||||
|
debug:
|
||||||
|
verbosity: detailed
|
||||||
|
receivers:
|
||||||
|
filelog:
|
||||||
|
include: [/tmp/logs/*.log]
|
||||||
|
resource:
|
||||||
|
region: us-east-1
|
||||||
|
service:
|
||||||
|
pipelines:
|
||||||
|
logs:
|
||||||
|
receivers: [filelog]
|
||||||
|
exporters: [otlphttp, debug]
|
|
@ -0,0 +1,11 @@
|
||||||
|
scrape_configs:
|
||||||
|
- job_name: "victoriametrics"
|
||||||
|
scrape_interval: 30s
|
||||||
|
static_configs:
|
||||||
|
- targets:
|
||||||
|
- victoriametrics:8428
|
||||||
|
- job_name: "victorialogs"
|
||||||
|
scrape_interval: 30s
|
||||||
|
static_configs:
|
||||||
|
- targets:
|
||||||
|
- victorialogs:9428
|
|
@ -0,0 +1,27 @@
|
||||||
|
# Docker compose OpenTelemetry Syslog integration with VictoriaLogs for docker
|
||||||
|
|
||||||
|
The folder contains the example of integration of [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) with Victorialogs
|
||||||
|
|
||||||
|
To spin-up environment run the following command:
|
||||||
|
```
|
||||||
|
docker compose up -d
|
||||||
|
```
|
||||||
|
|
||||||
|
To shut down the docker-compose environment run the following command:
|
||||||
|
```
|
||||||
|
docker compose down
|
||||||
|
docker compose rm -f
|
||||||
|
```
|
||||||
|
|
||||||
|
The docker compose file contains the following components:
|
||||||
|
|
||||||
|
* collector - vector is configured to collect logs from the `docker`, you can find configuration in the `config.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics.
|
||||||
|
* VictoriaLogs - the log database, it accepts the data from `collector` by syslog protocol
|
||||||
|
* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics`
|
||||||
|
|
||||||
|
Querying the data
|
||||||
|
|
||||||
|
* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui`
|
||||||
|
* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line)
|
||||||
|
|
||||||
|
Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance.
|
|
@ -0,0 +1,49 @@
|
||||||
|
services:
|
||||||
|
collector:
|
||||||
|
image: docker.io/otel/opentelemetry-collector-contrib:0.107.0
|
||||||
|
restart: on-failure
|
||||||
|
volumes:
|
||||||
|
- $PWD/logs:/tmp/logs
|
||||||
|
- $PWD/config.yaml:/etc/otelcol-contrib/config.yaml
|
||||||
|
depends_on:
|
||||||
|
victorialogs:
|
||||||
|
condition: service_healthy
|
||||||
|
victoriametrics:
|
||||||
|
condition: service_healthy
|
||||||
|
|
||||||
|
victorialogs:
|
||||||
|
image: docker.io/victoriametrics/victoria-logs:v0.28.0-victorialogs
|
||||||
|
volumes:
|
||||||
|
- victorialogs-vector-docker-vl:/syslog
|
||||||
|
ports:
|
||||||
|
- '9428:9428'
|
||||||
|
command:
|
||||||
|
- -storageDataPath=/syslog
|
||||||
|
- -syslog.listenAddr.tcp=:5410
|
||||||
|
- -syslog.useLocalTimestamp.tcp
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"]
|
||||||
|
interval: 1s
|
||||||
|
timeout: 1s
|
||||||
|
retries: 10
|
||||||
|
|
||||||
|
victoriametrics:
|
||||||
|
image: victoriametrics/victoria-metrics:latest
|
||||||
|
ports:
|
||||||
|
- '8428:8428'
|
||||||
|
command:
|
||||||
|
- -storageDataPath=/vmsingle
|
||||||
|
- -promscrape.config=/promscrape.yml
|
||||||
|
- -loggerFormat=json
|
||||||
|
volumes:
|
||||||
|
- victorialogs-vector-docker-vm:/vmsingle
|
||||||
|
- ./scrape.yml:/promscrape.yml
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"]
|
||||||
|
interval: 1s
|
||||||
|
timeout: 1s
|
||||||
|
retries: 10
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
victorialogs-vector-docker-vl:
|
||||||
|
victorialogs-vector-docker-vm:
|
|
@ -0,0 +1,24 @@
|
||||||
|
exporters:
|
||||||
|
syslog:
|
||||||
|
network: tcp
|
||||||
|
endpoint: victorialogs
|
||||||
|
port: 5410
|
||||||
|
tls:
|
||||||
|
insecure: true
|
||||||
|
debug:
|
||||||
|
verbosity: detailed
|
||||||
|
processors:
|
||||||
|
transform:
|
||||||
|
log_statements:
|
||||||
|
- context: log
|
||||||
|
statements:
|
||||||
|
- set(attributes["message"], body)
|
||||||
|
receivers:
|
||||||
|
filelog:
|
||||||
|
include: [/tmp/logs/*.log]
|
||||||
|
service:
|
||||||
|
pipelines:
|
||||||
|
logs:
|
||||||
|
receivers: [filelog]
|
||||||
|
exporters: [syslog, debug]
|
||||||
|
processors: [transform]
|
|
@ -0,0 +1,11 @@
|
||||||
|
scrape_configs:
|
||||||
|
- job_name: "victoriametrics"
|
||||||
|
scrape_interval: 30s
|
||||||
|
static_configs:
|
||||||
|
- targets:
|
||||||
|
- victoriametrics:8428
|
||||||
|
- job_name: "victorialogs"
|
||||||
|
scrape_interval: 30s
|
||||||
|
static_configs:
|
||||||
|
- targets:
|
||||||
|
- victorialogs:9428
|
|
@ -7,6 +7,7 @@
|
||||||
- Vector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/).
|
- Vector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/).
|
||||||
- Promtail (aka Grafana Loki) - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/).
|
- Promtail (aka Grafana Loki) - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/).
|
||||||
- Telegraf - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/).
|
- Telegraf - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/).
|
||||||
|
- OpenTelemetry Collector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/opentelemetry/).
|
||||||
|
|
||||||
The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/).
|
The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/).
|
||||||
|
|
||||||
|
@ -22,6 +23,7 @@ VictoriaLogs supports the following data ingestion HTTP APIs:
|
||||||
- Elasticsearch bulk API. See [these docs](#elasticsearch-bulk-api).
|
- Elasticsearch bulk API. See [these docs](#elasticsearch-bulk-api).
|
||||||
- JSON stream API aka [ndjson](https://jsonlines.org/). See [these docs](#json-stream-api).
|
- JSON stream API aka [ndjson](https://jsonlines.org/). See [these docs](#json-stream-api).
|
||||||
- Loki JSON API. See [these docs](#loki-json-api).
|
- Loki JSON API. See [these docs](#loki-json-api).
|
||||||
|
- OpenTelemetry API. See [these docs](#opentelemetry-api).
|
||||||
|
|
||||||
VictoriaLogs accepts optional [HTTP parameters](#http-parameters) at data ingestion HTTP APIs.
|
VictoriaLogs accepts optional [HTTP parameters](#http-parameters) at data ingestion HTTP APIs.
|
||||||
|
|
||||||
|
@ -273,13 +275,14 @@ VictoriaLogs exposes various [metrics](https://docs.victoriametrics.com/victoria
|
||||||
|
|
||||||
Here is the list of log collectors and their ingestion formats supported by VictoriaLogs:
|
Here is the list of log collectors and their ingestion formats supported by VictoriaLogs:
|
||||||
|
|
||||||
| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | Format: syslog |
|
| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | Format: syslog | Format: OpenTelemetry |
|
||||||
|----------------------------|-----------------------|---------------------|--------------|----------------|
|
|----------------------------|-----------------------|---------------------|--------------|----------------|-----------------------|
|
||||||
| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) |
|
| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) | No |
|
||||||
| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) |
|
| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) | No |
|
||||||
| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No |
|
| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No | No |
|
||||||
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/elasticsearch) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) |
|
| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/opentelemetry) |
|
||||||
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html) | [Yes](https://grafana.com/docs/loki/latest/send-data/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) |
|
| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) | [Yes](https://github.com/paulgrav/logstash-output-opentelemetry) |
|
||||||
| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No |
|
| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No | [Yes](https://vector.dev/docs/reference/configuration/sources/opentelemetry/) |
|
||||||
| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No |
|
| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No | No |
|
||||||
| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) |
|
| [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/elasticsearchexporter) | No | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/lokiexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/syslogexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter) |
|
||||||
|
| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) | Yes |
|
||||||
|
|
120
docs/VictoriaLogs/data-ingestion/opentelemetry.md
Normal file
120
docs/VictoriaLogs/data-ingestion/opentelemetry.md
Normal file
|
@ -0,0 +1,120 @@
|
||||||
|
---
|
||||||
|
weight: 4
|
||||||
|
title: OpenTelemetry setup
|
||||||
|
disableToc: true
|
||||||
|
menu:
|
||||||
|
docs:
|
||||||
|
parent: "victorialogs-data-ingestion"
|
||||||
|
weight: 4
|
||||||
|
aliases:
|
||||||
|
- /VictoriaLogs/data-ingestion/OpenTelemetry.html
|
||||||
|
---
|
||||||
|
|
||||||
|
|
||||||
|
VictoriaLogs supports both client open-telemetry [SDK](https://opentelemetry.io/docs/languages/) and [collector](https://opentelemetry.io/docs/collector/).
|
||||||
|
|
||||||
|
## Client SDK
|
||||||
|
|
||||||
|
Specify `EndpointURL` for http-exporter builder.
|
||||||
|
|
||||||
|
Consider the following example for the Go SDK:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Create the OTLP log exporter that sends logs to configured destination
|
||||||
|
logExporter, err := otlploghttp.New(ctx,
|
||||||
|
otlploghttp.WithEndpointURL("http://victorialogs:9428/insert/opentelemetry/v1/logs"),
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
Optionally, [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) could be defined via headers:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Create the OTLP log exporter that sends logs to configured destination
|
||||||
|
logExporter, err := otlploghttp.New(ctx,
|
||||||
|
otlploghttp.WithEndpointURL("http://victorialogs:9428/insert/opentelemetry/v1/logs"),
|
||||||
|
otlploghttp.WithHeaders(map[string]string{"VL-Stream-Fields": "telemetry.sdk.language,severity"}),
|
||||||
|
)
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
Given config defines 2 stream fields - `severity` and `telemetry.sdk.language`.
|
||||||
|
|
||||||
|
See also [HTTP headers](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-headers)
|
||||||
|
|
||||||
|
## Collector configuration
|
||||||
|
|
||||||
|
VictoriaLogs supports given below OpenTelemetry collector exporters:
|
||||||
|
|
||||||
|
* [Elasticsearch](#elasticsearch)
|
||||||
|
* [Loki](#loki)
|
||||||
|
* [OpenTelemetry](#opentelemetry)
|
||||||
|
|
||||||
|
### Elasticsearch
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
exporters:
|
||||||
|
elasticsearch:
|
||||||
|
endpoints:
|
||||||
|
- http://victorialogs:9428/insert/elasticsearch
|
||||||
|
receivers:
|
||||||
|
filelog:
|
||||||
|
include: [/tmp/logs/*.log]
|
||||||
|
resource:
|
||||||
|
region: us-east-1
|
||||||
|
service:
|
||||||
|
pipelines:
|
||||||
|
logs:
|
||||||
|
receivers: [filelog]
|
||||||
|
exporters: [elasticsearch]
|
||||||
|
```
|
||||||
|
|
||||||
|
### Loki
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
exporters:
|
||||||
|
loki:
|
||||||
|
endpoint: http://victorialogs:9428/insert/loki/api/v1/push
|
||||||
|
receivers:
|
||||||
|
filelog:
|
||||||
|
include: [/tmp/logs/*.log]
|
||||||
|
resource:
|
||||||
|
region: us-east-1
|
||||||
|
service:
|
||||||
|
pipelines:
|
||||||
|
logs:
|
||||||
|
receivers: [filelog]
|
||||||
|
exporters: [loki]
|
||||||
|
```
|
||||||
|
|
||||||
|
### OpenTelemetry
|
||||||
|
|
||||||
|
Specify logs endpoint for [OTLP/HTTP exporter](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/otlphttpexporter/README.md) in configuration file
|
||||||
|
for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/):
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
exporters:
|
||||||
|
otlphttp:
|
||||||
|
logs_endpoint: http://localhost:9428/insert/opentelemetry/v1/logs
|
||||||
|
```
|
||||||
|
|
||||||
|
Optionally, [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) could be defined via headers:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
exporters:
|
||||||
|
otlphttp:
|
||||||
|
logs_endpoint: http://localhost:9428/insert/opentelemetry/v1/logs
|
||||||
|
headers:
|
||||||
|
VL-Stream-Fields: telemetry.sdk.language,severity
|
||||||
|
```
|
||||||
|
|
||||||
|
See also [HTTP headers](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-headers)
|
||||||
|
|
||||||
|
Substitute `localhost:9428` address inside `exporters.otlphttp.logs_endpoint` with the real address of VictoriaLogs.
|
||||||
|
|
||||||
|
The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/).
|
||||||
|
|
||||||
|
See also:
|
||||||
|
|
||||||
|
* [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting).
|
||||||
|
* [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/).
|
||||||
|
* [Docker-compose demo for OpenTelemetry collector integration with VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker/victorialogs/opentelemetry-collector).
|
|
@ -202,6 +202,7 @@ Released at 2024-06-07
|
||||||
* SECURITY: upgrade base docker image (Alpine) from 3.19.1 to 3.20.0. See [alpine 3.20.0 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.0-released.html).
|
* SECURITY: upgrade base docker image (Alpine) from 3.19.1 to 3.20.0. See [alpine 3.20.0 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.0-released.html).
|
||||||
* SECURITY: add release images built from scratch image. Such images could be more preferable for using in environments with higher security standards. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6386).
|
* SECURITY: add release images built from scratch image. Such images could be more preferable for using in environments with higher security standards. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6386).
|
||||||
|
|
||||||
|
* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/): added OpenTelemetry logs ingestion support.
|
||||||
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): support selecting of multiple instances on the dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5869) for details.
|
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): support selecting of multiple instances on the dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5869) for details.
|
||||||
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): properly display version in the Stats row for the custom builds of VictoriaMetrics.
|
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): properly display version in the Stats row for the custom builds of VictoriaMetrics.
|
||||||
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): add `Network Usage` panel to `Resource Usage` row.
|
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): add `Network Usage` panel to `Resource Usage` row.
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package filestream
|
package filestream
|
||||||
|
|
||||||
func (st *streamTracker) adviseDontNeed(n int, fdatasync bool) error {
|
func (st *streamTracker) adviseDontNeed(_ int, _ bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
func fadviseSequentialRead(f *os.File, prefetch bool) error {
|
func fadviseSequentialRead(_ *os.File, _ bool) error {
|
||||||
// TODO: implement this properly
|
// TODO: implement this properly
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,12 +3,13 @@ package firehose
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestProcessRequestBody(t *testing.T) {
|
func TestProcessRequestBody(t *testing.T) {
|
||||||
|
|
275
lib/protoparser/opentelemetry/pb/common.go
Normal file
275
lib/protoparser/opentelemetry/pb/common.go
Normal file
|
@ -0,0 +1,275 @@
|
||||||
|
package pb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/easyproto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Resource represents the corresponding OTEL protobuf message
|
||||||
|
type Resource struct {
|
||||||
|
Attributes []*KeyValue
|
||||||
|
}
|
||||||
|
|
||||||
|
// marshalProtobuf marshals
|
||||||
|
func (r *Resource) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
for _, a := range r.Attributes {
|
||||||
|
a.marshalProtobuf(mm.AppendMessage(1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshalProtobuf unmarshals r from protobuf message at src.
|
||||||
|
func (r *Resource) unmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message Resource {
|
||||||
|
// repeated KeyValue attributes = 1;
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in Resource: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read Attribute data")
|
||||||
|
}
|
||||||
|
r.Attributes = append(r.Attributes, &KeyValue{})
|
||||||
|
a := r.Attributes[len(r.Attributes)-1]
|
||||||
|
if err := a.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal Attribute: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyValue represents the corresponding OTEL protobuf message
|
||||||
|
type KeyValue struct {
|
||||||
|
Key string
|
||||||
|
Value *AnyValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kv *KeyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
mm.AppendString(1, kv.Key)
|
||||||
|
if kv.Value != nil {
|
||||||
|
kv.Value.marshalProtobuf(mm.AppendMessage(2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshalProtobuf unmarshals kv from protobuf message at src.
|
||||||
|
func (kv *KeyValue) unmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message KeyValue {
|
||||||
|
// string key = 1;
|
||||||
|
// AnyValue value = 2;
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in KeyValue: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
key, ok := fc.String()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read Key")
|
||||||
|
}
|
||||||
|
kv.Key = strings.Clone(key)
|
||||||
|
case 2:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read Value")
|
||||||
|
}
|
||||||
|
kv.Value = &AnyValue{}
|
||||||
|
if err := kv.Value.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal Value: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AnyValue represents the corresponding OTEL protobuf message
|
||||||
|
type AnyValue struct {
|
||||||
|
StringValue *string
|
||||||
|
BoolValue *bool
|
||||||
|
IntValue *int64
|
||||||
|
DoubleValue *float64
|
||||||
|
ArrayValue *ArrayValue
|
||||||
|
KeyValueList *KeyValueList
|
||||||
|
BytesValue *[]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (av *AnyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
switch {
|
||||||
|
case av.StringValue != nil:
|
||||||
|
mm.AppendString(1, *av.StringValue)
|
||||||
|
case av.BoolValue != nil:
|
||||||
|
mm.AppendBool(2, *av.BoolValue)
|
||||||
|
case av.IntValue != nil:
|
||||||
|
mm.AppendInt64(3, *av.IntValue)
|
||||||
|
case av.DoubleValue != nil:
|
||||||
|
mm.AppendDouble(4, *av.DoubleValue)
|
||||||
|
case av.ArrayValue != nil:
|
||||||
|
av.ArrayValue.marshalProtobuf(mm.AppendMessage(5))
|
||||||
|
case av.KeyValueList != nil:
|
||||||
|
av.KeyValueList.marshalProtobuf(mm.AppendMessage(6))
|
||||||
|
case av.BytesValue != nil:
|
||||||
|
mm.AppendBytes(7, *av.BytesValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshalProtobuf unmarshals av from protobuf message at src.
|
||||||
|
func (av *AnyValue) unmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message AnyValue {
|
||||||
|
// oneof value {
|
||||||
|
// string string_value = 1;
|
||||||
|
// bool bool_value = 2;
|
||||||
|
// int64 int_value = 3;
|
||||||
|
// double double_value = 4;
|
||||||
|
// ArrayValue array_value = 5;
|
||||||
|
// KeyValueList kvlist_value = 6;
|
||||||
|
// bytes bytes_value = 7;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in AnyValue")
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
stringValue, ok := fc.String()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read StringValue")
|
||||||
|
}
|
||||||
|
stringValue = strings.Clone(stringValue)
|
||||||
|
av.StringValue = &stringValue
|
||||||
|
case 2:
|
||||||
|
boolValue, ok := fc.Bool()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read BoolValue")
|
||||||
|
}
|
||||||
|
av.BoolValue = &boolValue
|
||||||
|
case 3:
|
||||||
|
intValue, ok := fc.Int64()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read IntValue")
|
||||||
|
}
|
||||||
|
av.IntValue = &intValue
|
||||||
|
case 4:
|
||||||
|
doubleValue, ok := fc.Double()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read DoubleValue")
|
||||||
|
}
|
||||||
|
av.DoubleValue = &doubleValue
|
||||||
|
case 5:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read ArrayValue")
|
||||||
|
}
|
||||||
|
av.ArrayValue = &ArrayValue{}
|
||||||
|
if err := av.ArrayValue.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal ArrayValue: %w", err)
|
||||||
|
}
|
||||||
|
case 6:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read KeyValueList")
|
||||||
|
}
|
||||||
|
av.KeyValueList = &KeyValueList{}
|
||||||
|
if err := av.KeyValueList.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal KeyValueList: %w", err)
|
||||||
|
}
|
||||||
|
case 7:
|
||||||
|
bytesValue, ok := fc.Bytes()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read BytesValue")
|
||||||
|
}
|
||||||
|
bytesValue = bytes.Clone(bytesValue)
|
||||||
|
av.BytesValue = &bytesValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ArrayValue represents the corresponding OTEL protobuf message
|
||||||
|
type ArrayValue struct {
|
||||||
|
Values []*AnyValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (av *ArrayValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
for _, v := range av.Values {
|
||||||
|
v.marshalProtobuf(mm.AppendMessage(1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshalProtobuf unmarshals av from protobuf message at src.
|
||||||
|
func (av *ArrayValue) unmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message ArrayValue {
|
||||||
|
// repeated AnyValue values = 1;
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in ArrayValue")
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read Value data")
|
||||||
|
}
|
||||||
|
av.Values = append(av.Values, &AnyValue{})
|
||||||
|
v := av.Values[len(av.Values)-1]
|
||||||
|
if err := v.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal Value: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyValueList represents the corresponding OTEL protobuf message
|
||||||
|
type KeyValueList struct {
|
||||||
|
Values []*KeyValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kvl *KeyValueList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
for _, v := range kvl.Values {
|
||||||
|
v.marshalProtobuf(mm.AppendMessage(1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshalProtobuf unmarshals kvl from protobuf message at src.
|
||||||
|
func (kvl *KeyValueList) unmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message KeyValueList {
|
||||||
|
// repeated KeyValue values = 1;
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in KeyValueList")
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read Value data")
|
||||||
|
}
|
||||||
|
kvl.Values = append(kvl.Values, &KeyValue{})
|
||||||
|
v := kvl.Values[len(kvl.Values)-1]
|
||||||
|
if err := v.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal Value: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -10,6 +10,9 @@ import (
|
||||||
|
|
||||||
// FormatString returns string representation for av.
|
// FormatString returns string representation for av.
|
||||||
func (av *AnyValue) FormatString() string {
|
func (av *AnyValue) FormatString() string {
|
||||||
|
if av == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
switch {
|
switch {
|
||||||
case av.StringValue != nil:
|
case av.StringValue != nil:
|
||||||
return *av.StringValue
|
return *av.StringValue
|
||||||
|
|
294
lib/protoparser/opentelemetry/pb/logs.go
Normal file
294
lib/protoparser/opentelemetry/pb/logs.go
Normal file
|
@ -0,0 +1,294 @@
|
||||||
|
package pb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/easyproto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ExportLogsServiceRequest represents the corresponding OTEL protobuf message
|
||||||
|
type ExportLogsServiceRequest struct {
|
||||||
|
ResourceLogs []ResourceLogs
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalProtobuf marshals r to protobuf message, appends it to dst and returns the result.
|
||||||
|
func (r *ExportLogsServiceRequest) MarshalProtobuf(dst []byte) []byte {
|
||||||
|
m := mp.Get()
|
||||||
|
r.marshalProtobuf(m.MessageMarshaler())
|
||||||
|
dst = m.Marshal(dst)
|
||||||
|
mp.Put(m)
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ExportLogsServiceRequest) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
for _, rm := range r.ResourceLogs {
|
||||||
|
rm.marshalProtobuf(mm.AppendMessage(1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalProtobuf unmarshals r from protobuf message at src.
|
||||||
|
func (r *ExportLogsServiceRequest) UnmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message ExportLogsServiceRequest {
|
||||||
|
// repeated ResourceLogs resource_metrics = 1;
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in ExportLogsServiceRequest: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read ResourceLogs data")
|
||||||
|
}
|
||||||
|
var rl ResourceLogs
|
||||||
|
|
||||||
|
if err := rl.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal ResourceLogs: %w", err)
|
||||||
|
}
|
||||||
|
r.ResourceLogs = append(r.ResourceLogs, rl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResourceLogs represents the corresponding OTEL protobuf message
type ResourceLogs struct {
	// Resource describes the entity that emitted the log records below.
	Resource Resource `json:"resource"`
	// ScopeLogs holds the log records attached to Resource.
	ScopeLogs []ScopeLogs `json:"scopeLogs"`
	// NOTE(review): this is the only type in the package carrying json tags;
	// siblings (ScopeLogs, LogRecord) have none — confirm whether the tags
	// are actually needed or are leftovers.
}
|
||||||
|
|
||||||
|
func (rl *ResourceLogs) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
rl.Resource.marshalProtobuf(mm.AppendMessage(1))
|
||||||
|
for _, sm := range rl.ScopeLogs {
|
||||||
|
sm.marshalProtobuf(mm.AppendMessage(2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rl *ResourceLogs) unmarshalProtobuf(src []byte) (err error) {
|
||||||
|
// message ResourceLogs {
|
||||||
|
// Resource resource = 1;
|
||||||
|
// repeated ScopeLogs scope_logs = 2;
|
||||||
|
// }
|
||||||
|
var fc easyproto.FieldContext
|
||||||
|
for len(src) > 0 {
|
||||||
|
src, err = fc.NextField(src)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot read next field in ResourceLogs: %w", err)
|
||||||
|
}
|
||||||
|
switch fc.FieldNum {
|
||||||
|
case 1:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read Resource data")
|
||||||
|
}
|
||||||
|
if err := rl.Resource.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot umarshal Resource: %w", err)
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
data, ok := fc.MessageData()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot read ScopeLogs data")
|
||||||
|
}
|
||||||
|
var sl ScopeLogs
|
||||||
|
if err := sl.unmarshalProtobuf(data); err != nil {
|
||||||
|
return fmt.Errorf("cannot unmarshal ScopeLogs: %w", err)
|
||||||
|
}
|
||||||
|
rl.ScopeLogs = append(rl.ScopeLogs, sl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ScopeLogs represents the corresponding OTEL protobuf message
type ScopeLogs struct {
	// LogRecords holds the individual log entries of this scope.
	LogRecords []LogRecord
}
|
||||||
|
|
||||||
|
func (sl *ScopeLogs) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
for _, m := range sl.LogRecords {
|
||||||
|
m.marshalProtobuf(mm.AppendMessage(2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshalProtobuf unmarshals sl from protobuf message at src.
//
// Decoded records are appended to sl.LogRecords; unknown field numbers are
// skipped.
func (sl *ScopeLogs) unmarshalProtobuf(src []byte) (err error) {
	// message ScopeLogs {
	//   repeated LogRecord log_records = 2;
	// }
	var fc easyproto.FieldContext
	for len(src) > 0 {
		src, err = fc.NextField(src)
		if err != nil {
			return fmt.Errorf("cannot read next field in ScopeLogs: %w", err)
		}
		switch fc.FieldNum {
		case 2:
			data, ok := fc.MessageData()
			if !ok {
				return fmt.Errorf("cannot read LogRecord data")
			}
			var lr LogRecord
			if err := lr.unmarshalProtobuf(data); err != nil {
				return fmt.Errorf("cannot unmarshal LogRecord: %w", err)
			}
			sl.LogRecords = append(sl.LogRecords, lr)
		}
	}
	return nil
}
|
||||||
|
|
||||||
|
// LogRecord represents the corresponding OTEL protobuf message
// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md
type LogRecord struct {
	// time_unix_nano is the time when the event occurred.
	// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
	// Value of 0 indicates unknown or missing timestamp.
	TimeUnixNano uint64
	// Time when the event was observed by the collection system.
	// For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK)
	// this timestamp is typically set at the generation time and is equal to Timestamp.
	// For events originating externally and collected by OpenTelemetry (e.g. using
	// Collector) this is the time when OpenTelemetry's code observed the event measured
	// by the clock of the OpenTelemetry code. This field MUST be set once the event is
	// observed by OpenTelemetry.
	//
	// For converting OpenTelemetry log data to formats that support only one timestamp or
	// when receiving OpenTelemetry log data by recipients that support only one timestamp
	// internally the following logic is recommended:
	//   - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano.
	//
	// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
	// Value of 0 indicates unknown or missing timestamp.
	ObservedTimeUnixNano uint64
	// Numerical value of the severity, normalized to values described in Log Data Model.
	SeverityNumber int32
	// SeverityText is the original severity string as supplied by the sender;
	// it takes precedence over SeverityNumber in FormatSeverity.
	SeverityText string
	// Body is the log message payload (an AnyValue, so it may be a string,
	// a number, a list, a key-value list, or bytes).
	Body AnyValue
	// Attributes holds additional key-value metadata of this record.
	Attributes []*KeyValue
}
|
||||||
|
|
||||||
|
func (lr *LogRecord) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
||||||
|
mm.AppendFixed64(1, lr.TimeUnixNano)
|
||||||
|
mm.AppendInt32(2, lr.SeverityNumber)
|
||||||
|
mm.AppendString(3, lr.SeverityText)
|
||||||
|
lr.Body.marshalProtobuf(mm.AppendMessage(5))
|
||||||
|
for _, a := range lr.Attributes {
|
||||||
|
a.marshalProtobuf(mm.AppendMessage(6))
|
||||||
|
}
|
||||||
|
mm.AppendFixed64(11, lr.ObservedTimeUnixNano)
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshalProtobuf unmarshals lr from protobuf message at src.
//
// Unknown field numbers are skipped. String and message data returned by
// easyproto are views into src, so strings are cloned before being stored.
func (lr *LogRecord) unmarshalProtobuf(src []byte) (err error) {
	// message LogRecord {
	//   fixed64 time_unix_nano = 1;
	//   fixed64 observed_time_unix_nano = 11;
	//   SeverityNumber severity_number = 2;
	//   string severity_text = 3;
	//   AnyValue body = 5;
	//   repeated KeyValue attributes = 6;
	// }
	var fc easyproto.FieldContext
	for len(src) > 0 {
		src, err = fc.NextField(src)
		if err != nil {
			return fmt.Errorf("cannot read next field in LogRecord: %w", err)
		}
		switch fc.FieldNum {
		case 1:
			ts, ok := fc.Fixed64()
			if !ok {
				return fmt.Errorf("cannot read log record timestamp")
			}
			lr.TimeUnixNano = ts
		case 11:
			ts, ok := fc.Fixed64()
			if !ok {
				return fmt.Errorf("cannot read log record observed timestamp")
			}
			lr.ObservedTimeUnixNano = ts
		case 2:
			severityNumber, ok := fc.Int32()
			if !ok {
				return fmt.Errorf("cannot read severity number")
			}
			lr.SeverityNumber = severityNumber
		case 3:
			severityText, ok := fc.String()
			if !ok {
				return fmt.Errorf("cannot read severity string")
			}
			// clone: severityText aliases src
			lr.SeverityText = strings.Clone(severityText)
		case 5:
			data, ok := fc.MessageData()
			if !ok {
				return fmt.Errorf("cannot read Body")
			}
			if err := lr.Body.unmarshalProtobuf(data); err != nil {
				return fmt.Errorf("cannot unmarshal Body: %w", err)
			}
		case 6:
			data, ok := fc.MessageData()
			if !ok {
				return fmt.Errorf("cannot read attributes data")
			}
			// append a fresh KeyValue and decode into it in place
			lr.Attributes = append(lr.Attributes, &KeyValue{})
			a := lr.Attributes[len(lr.Attributes)-1]
			if err := a.unmarshalProtobuf(data); err != nil {
				return fmt.Errorf("cannot unmarshal Attribute: %w", err)
			}
		}
	}
	return nil
}
|
||||||
|
|
||||||
|
// FormatSeverity returns normalized severity for log record
|
||||||
|
func (lr *LogRecord) FormatSeverity() string {
|
||||||
|
if lr.SeverityText != "" {
|
||||||
|
return lr.SeverityText
|
||||||
|
}
|
||||||
|
if lr.SeverityNumber > int32(len(logSeverities)-1) {
|
||||||
|
return logSeverities[0]
|
||||||
|
}
|
||||||
|
return logSeverities[lr.SeverityNumber]
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExtractTimestampNano returns timestamp for log record
|
||||||
|
func (lr *LogRecord) ExtractTimestampNano() int64 {
|
||||||
|
switch {
|
||||||
|
case lr.TimeUnixNano > 0:
|
||||||
|
return int64(lr.TimeUnixNano)
|
||||||
|
case lr.ObservedTimeUnixNano > 0:
|
||||||
|
return int64(lr.ObservedTimeUnixNano)
|
||||||
|
default:
|
||||||
|
return time.Now().UnixNano()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// logSeverities maps an OTLP SeverityNumber (used as the slice index) to its
// human-readable name; index 0 ("Unspecified") doubles as the fallback for
// out-of-range numbers in FormatSeverity.
// https://github.com/open-telemetry/opentelemetry-collector/blob/cd1f7623fe67240e32e74735488c3db111fad47b/pdata/plog/severity_number.go#L41
var logSeverities = []string{
	"Unspecified",
	"Trace",
	"Trace2",
	"Trace3",
	"Trace4",
	"Debug",
	"Debug2",
	"Debug3",
	"Debug4",
	"Info",
	"Info2",
	"Info3",
	"Info4",
	"Error",
	"Error2",
	"Error3",
	"Error4",
	"Fatal",
	"Fatal2",
	"Fatal3",
	"Fatal4",
}
|
|
@ -1,7 +1,6 @@
|
||||||
package pb
|
package pb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -113,43 +112,6 @@ func (rm *ResourceMetrics) unmarshalProtobuf(src []byte) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resource represents the corresponding OTEL protobuf message
type Resource struct {
	// Attributes holds the key-value pairs describing the resource.
	Attributes []*KeyValue
}
|
|
||||||
|
|
||||||
func (r *Resource) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
|
||||||
for _, a := range r.Attributes {
|
|
||||||
a.marshalProtobuf(mm.AppendMessage(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// unmarshalProtobuf unmarshals r from protobuf message at src.
//
// Decoded attributes are appended to r.Attributes; unknown field numbers are
// skipped.
func (r *Resource) unmarshalProtobuf(src []byte) (err error) {
	// message Resource {
	//   repeated KeyValue attributes = 1;
	// }
	var fc easyproto.FieldContext
	for len(src) > 0 {
		src, err = fc.NextField(src)
		if err != nil {
			return fmt.Errorf("cannot read next field in Resource: %w", err)
		}
		switch fc.FieldNum {
		case 1:
			data, ok := fc.MessageData()
			if !ok {
				return fmt.Errorf("cannot read Attribute data")
			}
			// append a fresh KeyValue and decode into it in place
			r.Attributes = append(r.Attributes, &KeyValue{})
			a := r.Attributes[len(r.Attributes)-1]
			if err := a.unmarshalProtobuf(data); err != nil {
				return fmt.Errorf("cannot unmarshal Attribute: %w", err)
			}
		}
	}
	return nil
}
|
|
||||||
|
|
||||||
// ScopeMetrics represents the corresponding OTEL protobuf message
|
// ScopeMetrics represents the corresponding OTEL protobuf message
|
||||||
type ScopeMetrics struct {
|
type ScopeMetrics struct {
|
||||||
Metrics []*Metric
|
Metrics []*Metric
|
||||||
|
@ -283,229 +245,6 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KeyValue represents the corresponding OTEL protobuf message
type KeyValue struct {
	// Key is the attribute name.
	Key string
	// Value is the attribute value; nil means no value was present on the wire.
	Value *AnyValue
}
|
|
||||||
|
|
||||||
func (kv *KeyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
|
||||||
mm.AppendString(1, kv.Key)
|
|
||||||
if kv.Value != nil {
|
|
||||||
kv.Value.marshalProtobuf(mm.AppendMessage(2))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// unmarshalProtobuf unmarshals kv from protobuf message at src.
//
// Unknown field numbers are skipped. The key string returned by easyproto is
// a view into src and is therefore cloned before being stored.
func (kv *KeyValue) unmarshalProtobuf(src []byte) (err error) {
	// message KeyValue {
	//   string key = 1;
	//   AnyValue value = 2;
	// }
	var fc easyproto.FieldContext
	for len(src) > 0 {
		src, err = fc.NextField(src)
		if err != nil {
			return fmt.Errorf("cannot read next field in KeyValue: %w", err)
		}
		switch fc.FieldNum {
		case 1:
			key, ok := fc.String()
			if !ok {
				return fmt.Errorf("cannot read Key")
			}
			// clone: key aliases src
			kv.Key = strings.Clone(key)
		case 2:
			data, ok := fc.MessageData()
			if !ok {
				return fmt.Errorf("cannot read Value")
			}
			kv.Value = &AnyValue{}
			if err := kv.Value.unmarshalProtobuf(data); err != nil {
				return fmt.Errorf("cannot unmarshal Value: %w", err)
			}
		}
	}
	return nil
}
|
|
||||||
|
|
||||||
// AnyValue represents the corresponding OTEL protobuf message.
//
// It models a protobuf oneof: at most one of the pointer fields is expected
// to be non-nil, and marshalProtobuf emits only the first non-nil field in
// the order listed below.
type AnyValue struct {
	StringValue  *string
	BoolValue    *bool
	IntValue     *int64
	DoubleValue  *float64
	ArrayValue   *ArrayValue
	KeyValueList *KeyValueList
	BytesValue   *[]byte
}
|
|
||||||
|
|
||||||
func (av *AnyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
|
||||||
switch {
|
|
||||||
case av.StringValue != nil:
|
|
||||||
mm.AppendString(1, *av.StringValue)
|
|
||||||
case av.BoolValue != nil:
|
|
||||||
mm.AppendBool(2, *av.BoolValue)
|
|
||||||
case av.IntValue != nil:
|
|
||||||
mm.AppendInt64(3, *av.IntValue)
|
|
||||||
case av.DoubleValue != nil:
|
|
||||||
mm.AppendDouble(4, *av.DoubleValue)
|
|
||||||
case av.ArrayValue != nil:
|
|
||||||
av.ArrayValue.marshalProtobuf(mm.AppendMessage(5))
|
|
||||||
case av.KeyValueList != nil:
|
|
||||||
av.KeyValueList.marshalProtobuf(mm.AppendMessage(6))
|
|
||||||
case av.BytesValue != nil:
|
|
||||||
mm.AppendBytes(7, *av.BytesValue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (av *AnyValue) unmarshalProtobuf(src []byte) (err error) {
|
|
||||||
// message AnyValue {
|
|
||||||
// oneof value {
|
|
||||||
// string string_value = 1;
|
|
||||||
// bool bool_value = 2;
|
|
||||||
// int64 int_value = 3;
|
|
||||||
// double double_value = 4;
|
|
||||||
// ArrayValue array_value = 5;
|
|
||||||
// KeyValueList kvlist_value = 6;
|
|
||||||
// bytes bytes_value = 7;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
var fc easyproto.FieldContext
|
|
||||||
for len(src) > 0 {
|
|
||||||
src, err = fc.NextField(src)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot read next field in AnyValue")
|
|
||||||
}
|
|
||||||
switch fc.FieldNum {
|
|
||||||
case 1:
|
|
||||||
stringValue, ok := fc.String()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read StringValue")
|
|
||||||
}
|
|
||||||
stringValue = strings.Clone(stringValue)
|
|
||||||
av.StringValue = &stringValue
|
|
||||||
case 2:
|
|
||||||
boolValue, ok := fc.Bool()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read BoolValue")
|
|
||||||
}
|
|
||||||
av.BoolValue = &boolValue
|
|
||||||
case 3:
|
|
||||||
intValue, ok := fc.Int64()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read IntValue")
|
|
||||||
}
|
|
||||||
av.IntValue = &intValue
|
|
||||||
case 4:
|
|
||||||
doubleValue, ok := fc.Double()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read DoubleValue")
|
|
||||||
}
|
|
||||||
av.DoubleValue = &doubleValue
|
|
||||||
case 5:
|
|
||||||
data, ok := fc.MessageData()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read ArrayValue")
|
|
||||||
}
|
|
||||||
av.ArrayValue = &ArrayValue{}
|
|
||||||
if err := av.ArrayValue.unmarshalProtobuf(data); err != nil {
|
|
||||||
return fmt.Errorf("cannot unmarshal ArrayValue: %w", err)
|
|
||||||
}
|
|
||||||
case 6:
|
|
||||||
data, ok := fc.MessageData()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read KeyValueList")
|
|
||||||
}
|
|
||||||
av.KeyValueList = &KeyValueList{}
|
|
||||||
if err := av.KeyValueList.unmarshalProtobuf(data); err != nil {
|
|
||||||
return fmt.Errorf("cannot unmarshal KeyValueList: %w", err)
|
|
||||||
}
|
|
||||||
case 7:
|
|
||||||
bytesValue, ok := fc.Bytes()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read BytesValue")
|
|
||||||
}
|
|
||||||
bytesValue = bytes.Clone(bytesValue)
|
|
||||||
av.BytesValue = &bytesValue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ArrayValue represents the corresponding OTEL protobuf message
type ArrayValue struct {
	// Values holds the array elements in wire order.
	Values []*AnyValue
}
|
|
||||||
|
|
||||||
func (av *ArrayValue) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
|
||||||
for _, v := range av.Values {
|
|
||||||
v.marshalProtobuf(mm.AppendMessage(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (av *ArrayValue) unmarshalProtobuf(src []byte) (err error) {
|
|
||||||
// message ArrayValue {
|
|
||||||
// repeated AnyValue values = 1;
|
|
||||||
// }
|
|
||||||
var fc easyproto.FieldContext
|
|
||||||
for len(src) > 0 {
|
|
||||||
src, err = fc.NextField(src)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot read next field in ArrayValue")
|
|
||||||
}
|
|
||||||
switch fc.FieldNum {
|
|
||||||
case 1:
|
|
||||||
data, ok := fc.MessageData()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read Value data")
|
|
||||||
}
|
|
||||||
av.Values = append(av.Values, &AnyValue{})
|
|
||||||
v := av.Values[len(av.Values)-1]
|
|
||||||
if err := v.unmarshalProtobuf(data); err != nil {
|
|
||||||
return fmt.Errorf("cannot unmarshal Value: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyValueList represents the corresponding OTEL protobuf message
type KeyValueList struct {
	// Values holds the key-value entries in wire order.
	Values []*KeyValue
}
|
|
||||||
|
|
||||||
func (kvl *KeyValueList) marshalProtobuf(mm *easyproto.MessageMarshaler) {
|
|
||||||
for _, v := range kvl.Values {
|
|
||||||
v.marshalProtobuf(mm.AppendMessage(1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kvl *KeyValueList) unmarshalProtobuf(src []byte) (err error) {
|
|
||||||
// message KeyValueList {
|
|
||||||
// repeated KeyValue values = 1;
|
|
||||||
// }
|
|
||||||
var fc easyproto.FieldContext
|
|
||||||
for len(src) > 0 {
|
|
||||||
src, err = fc.NextField(src)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cannot read next field in KeyValueList")
|
|
||||||
}
|
|
||||||
switch fc.FieldNum {
|
|
||||||
case 1:
|
|
||||||
data, ok := fc.MessageData()
|
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("cannot read Value data")
|
|
||||||
}
|
|
||||||
kvl.Values = append(kvl.Values, &KeyValue{})
|
|
||||||
v := kvl.Values[len(kvl.Values)-1]
|
|
||||||
if err := v.unmarshalProtobuf(data); err != nil {
|
|
||||||
return fmt.Errorf("cannot unmarshal Value: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gauge represents the corresponding OTEL protobuf message
|
// Gauge represents the corresponding OTEL protobuf message
|
||||||
type Gauge struct {
|
type Gauge struct {
|
||||||
DataPoints []*NumberDataPoint
|
DataPoints []*NumberDataPoint
|
|
@ -248,7 +248,6 @@ func generateGauge(name, unit string) *pb.Metric {
|
||||||
func generateHistogram(name, unit string) *pb.Metric {
|
func generateHistogram(name, unit string) *pb.Metric {
|
||||||
points := []*pb.HistogramDataPoint{
|
points := []*pb.HistogramDataPoint{
|
||||||
{
|
{
|
||||||
|
|
||||||
Attributes: attributesFromKV("label2", "value2"),
|
Attributes: attributesFromKV("label2", "value2"),
|
||||||
Count: 15,
|
Count: 15,
|
||||||
Sum: func() *float64 { v := 30.0; return &v }(),
|
Sum: func() *float64 { v := 30.0; return &v }(),
|
||||||
|
|
|
@ -32,5 +32,4 @@ func BenchmarkParseStream(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue