From 01430a155c79a9306596b48e745e100d4ae98dca Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Tue, 3 Sep 2024 21:12:05 +0300 Subject: [PATCH] vlinsert: added opentelemetry logs support Commit adds the following changes: * Adds support of OpenTelemetry logs for Victoria Logs with protobuf encoded messages * json encoding is not supported for the following reasons: - It brings a lot of fragile code, which works inefficiently. - json encoding is impossible to use with language SDK. * splits metrics and logs structures at lib/protoparser/opentelemetry/pb package. * adds docs with examples for opentelemetry logs. --- Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4839 Co-authored-by: AndrewChubatiuk Co-authored-by: f41gh7 --- app/vlinsert/main.go | 4 + app/vlinsert/opentelemetry/opentelemetry.go | 143 +++++++++ .../opentelemetry/opentelemetry_test.go | 126 ++++++++ .../opentelemetry/opentemetry_timing_test.go | 79 +++++ .../opentelemetry-collector/.gitignore | 1 + .../elasticsearch/README.md | 27 ++ .../elasticsearch/compose.yml | 48 +++ .../elasticsearch/config.yaml | 14 + .../elasticsearch/scrape.yml | 11 + .../opentelemetry-collector/loki/README.md | 27 ++ .../opentelemetry-collector/loki/compose.yml | 48 +++ .../opentelemetry-collector/loki/config.yaml | 13 + .../opentelemetry-collector/loki/scrape.yml | 11 + .../opentelemetry-collector/otlp/README.md | 27 ++ .../opentelemetry-collector/otlp/compose.yml | 48 +++ .../opentelemetry-collector/otlp/config.yaml | 15 + .../opentelemetry-collector/otlp/scrape.yml | 11 + .../opentelemetry-collector/syslog/README.md | 27 ++ .../syslog/compose.yml | 49 +++ .../syslog/config.yaml | 24 ++ .../opentelemetry-collector/syslog/scrape.yml | 11 + docs/VictoriaLogs/data-ingestion/README.md | 23 +- .../data-ingestion/opentelemetry.md | 120 +++++++ docs/changelog/CHANGELOG.md | 1 + lib/filestream/filestream_darwin.go | 2 +- lib/fs/fadvise_darwin.go | 2 +- .../opentelemetry/firehose/parser_test.go | 7 +- lib/protoparser/opentelemetry/pb/common.go | 275 ++++++++++++++++ lib/protoparser/opentelemetry/pb/helpers.go | 3 + lib/protoparser/opentelemetry/pb/logs.go | 294 ++++++++++++++++++ .../opentelemetry/pb/{pb.go => metrics.go} | 261 ---------------- .../opentelemetry/stream/streamparser_test.go | 1 - .../stream/streamparser_timing_test.go | 1 - 33 files changed, 1476 insertions(+), 278 deletions(-) create mode 100644 app/vlinsert/opentelemetry/opentelemetry.go create mode 100644 app/vlinsert/opentelemetry/opentelemetry_test.go create mode 100644 app/vlinsert/opentelemetry/opentemetry_timing_test.go create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/.gitignore create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/README.md create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/compose.yml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/config.yaml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/scrape.yml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/loki/README.md create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/loki/compose.yml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/loki/config.yaml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/loki/scrape.yml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/otlp/README.md create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/otlp/compose.yml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/otlp/config.yaml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/otlp/scrape.yml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/syslog/README.md create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/syslog/compose.yml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/syslog/config.yaml create mode 100644 deployment/docker/victorialogs/opentelemetry-collector/syslog/scrape.yml create mode 100644 docs/VictoriaLogs/data-ingestion/opentelemetry.md create mode 100644 lib/protoparser/opentelemetry/pb/common.go create mode 100644 lib/protoparser/opentelemetry/pb/logs.go rename lib/protoparser/opentelemetry/pb/{pb.go => metrics.go} (74%) diff --git a/app/vlinsert/main.go b/app/vlinsert/main.go index ac9a727d8..a5c01fb98 100644 --- a/app/vlinsert/main.go +++ b/app/vlinsert/main.go @@ -7,6 +7,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/elasticsearch" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/jsonline" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/loki" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/opentelemetry" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/syslog" ) @@ -41,6 +42,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { case strings.HasPrefix(path, "/loki/"): path = strings.TrimPrefix(path, "/loki") return loki.RequestHandler(path, w, r) + case strings.HasPrefix(path, "/opentelemetry/"): + path = strings.TrimPrefix(path, "/opentelemetry") + return opentelemetry.RequestHandler(path, w, r) default: return false } diff --git a/app/vlinsert/opentelemetry/opentelemetry.go b/app/vlinsert/opentelemetry/opentelemetry.go new file mode 100644 index 000000000..b300500ca --- /dev/null +++ b/app/vlinsert/opentelemetry/opentelemetry.go @@ -0,0 +1,143 @@ +package opentelemetry + +import ( + "fmt" + "io" + "net/http" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" + "github.com/VictoriaMetrics/metrics" +) + +// RequestHandler processes Opentelemetry insert requests +func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { + switch path { + // use the same path as opentelemetry collector + // https://opentelemetry.io/docs/specs/otlp/#otlphttp-request + case "/v1/logs": + if r.Header.Get("Content-Type") == "application/json" { + httpserver.Errorf(w, r, "json encoding isn't supported for opentelemetry format. Use protobuf encoding") + return true + } + handleProtobuf(r, w) + return true + default: + return false + } +} + +func handleProtobuf(r *http.Request, w http.ResponseWriter) { + startTime := time.Now() + requestsProtobufTotal.Inc() + reader := r.Body + if r.Header.Get("Content-Encoding") == "gzip" { + zr, err := common.GetGzipReader(reader) + if err != nil { + httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err) + return + } + defer common.PutGzipReader(zr) + reader = zr + } + + wcr := writeconcurrencylimiter.GetReader(reader) + data, err := io.ReadAll(wcr) + writeconcurrencylimiter.PutReader(wcr) + if err != nil { + httpserver.Errorf(w, r, "cannot read request body: %s", err) + return + } + + cp, err := insertutils.GetCommonParams(r) + if err != nil { + httpserver.Errorf(w, r, "cannot parse common params from request: %s", err) + return + } + if err := vlstorage.CanWriteData(); err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + + lmp := cp.NewLogMessageProcessor() + n, err := pushProtobufRequest(data, lmp) + lmp.MustClose() + if err != nil { + httpserver.Errorf(w, r, "cannot parse OpenTelemetry protobuf request: %s", err) + return + } + + rowsIngestedProtobufTotal.Add(n) + + // update requestProtobufDuration only for successfully parsed requests + // There is no need in updating requestProtobufDuration for request errors, + // since their timings are usually much smaller than the timing for successful request parsing. + requestProtobufDuration.UpdateDuration(startTime) +} + +var ( + rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry",format="protobuf"}`) + + requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) + errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) + + requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`) +) + +func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) { + var req pb.ExportLogsServiceRequest + if err := req.UnmarshalProtobuf(data); err != nil { + errorsTotal.Inc() + return 0, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err) + } + + var rowsIngested int + var commonFields []logstorage.Field + for _, rl := range req.ResourceLogs { + attributes := rl.Resource.Attributes + commonFields = slicesutil.SetLength(commonFields, len(attributes)) + for i, attr := range attributes { + commonFields[i].Name = attr.Key + commonFields[i].Value = attr.Value.FormatString() + } + commonFieldsLen := len(commonFields) + for _, sc := range rl.ScopeLogs { + var scopeIngested int + commonFields, scopeIngested = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp) + rowsIngested += scopeIngested + } + } + + return rowsIngested, nil +} + +func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutils.LogMessageProcessor) ([]logstorage.Field, int) { + fields := commonFields + for _, lr := range sc.LogRecords { + fields = fields[:len(commonFields)] + fields = append(fields, logstorage.Field{ + Name: "_msg", + Value: lr.Body.FormatString(), + }) + for _, attr := range lr.Attributes { + fields = append(fields, logstorage.Field{ + Name: attr.Key, + Value: attr.Value.FormatString(), + }) + } + fields = append(fields, logstorage.Field{ + Name: "severity", + Value: lr.FormatSeverity(), + }) + + lmp.AddRow(lr.ExtractTimestampNano(), fields) + } + return fields, len(sc.LogRecords) +} diff --git a/app/vlinsert/opentelemetry/opentelemetry_test.go b/app/vlinsert/opentelemetry/opentelemetry_test.go new file mode 100644 index 000000000..fa856af42 --- /dev/null +++ b/app/vlinsert/opentelemetry/opentelemetry_test.go @@ -0,0 +1,126 @@ +package opentelemetry + +import ( + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" +) + +func TestPushProtoOk(t *testing.T) { + f := func(src []pb.ResourceLogs, timestampsExpected []int64, resultExpected string) { + t.Helper() + lr := pb.ExportLogsServiceRequest{ + ResourceLogs: src, + } + + pData := lr.MarshalProtobuf(nil) + tlp := &insertutils.TestLogMessageProcessor{} + n, err := pushProtobufRequest(pData, tlp) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if err := tlp.Verify(n, timestampsExpected, resultExpected); err != nil { + t.Fatal(err) + } + } + // single line without resource attributes + f([]pb.ResourceLogs{ + { + ScopeLogs: []pb.ScopeLogs{ + { + LogRecords: []pb.LogRecord{ + {Attributes: []*pb.KeyValue{}, TimeUnixNano: 1234, SeverityNumber: 1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}}, + }, + }, + }, + }, + }, + []int64{1234}, + `{"_msg":"log-line-message","severity":"Trace"}`, + ) + // multi-line with resource attributes + f([]pb.ResourceLogs{ + { + Resource: pb.Resource{ + Attributes: []*pb.KeyValue{ + {Key: "logger", Value: &pb.AnyValue{StringValue: ptrTo("context")}}, + {Key: "instance_id", Value: &pb.AnyValue{IntValue: ptrTo[int64](10)}}, + {Key: "node_taints", Value: &pb.AnyValue{KeyValueList: &pb.KeyValueList{ + Values: []*pb.KeyValue{ + {Key: "role", Value: &pb.AnyValue{StringValue: ptrTo("dev")}}, + {Key: "cluster_load_percent", Value: &pb.AnyValue{DoubleValue: ptrTo(0.55)}}, + }, + }}}, + }, + }, + ScopeLogs: []pb.ScopeLogs{ + { + LogRecords: []pb.LogRecord{ + {Attributes: []*pb.KeyValue{}, TimeUnixNano: 1234, SeverityNumber: 1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}}, + {Attributes: []*pb.KeyValue{}, TimeUnixNano: 1235, SeverityNumber: 21, Body: pb.AnyValue{StringValue: ptrTo("log-line-message-msg-2")}}, + }, + }, + }, + }, + }, + []int64{1234, 1235}, + `{"logger":"context","instance_id":"10","node_taints":"[{\"Key\":\"role\",\"Value\":{\"StringValue\":\"dev\",\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":null,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}},{\"Key\":\"cluster_load_percent\",\"Value\":{\"StringValue\":null,\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":0.55,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}}]","_msg":"log-line-message","severity":"Trace"} +{"logger":"context","instance_id":"10","node_taints":"[{\"Key\":\"role\",\"Value\":{\"StringValue\":\"dev\",\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":null,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}},{\"Key\":\"cluster_load_percent\",\"Value\":{\"StringValue\":null,\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":0.55,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}}]","_msg":"log-line-message-msg-2","severity":"Unspecified"}`, + ) + + // multi-scope with resource attributes and multi-line + f([]pb.ResourceLogs{ + { + Resource: pb.Resource{ + Attributes: []*pb.KeyValue{ + {Key: "logger", Value: &pb.AnyValue{StringValue: ptrTo("context")}}, + {Key: "instance_id", Value: &pb.AnyValue{IntValue: ptrTo[int64](10)}}, + {Key: "node_taints", Value: &pb.AnyValue{KeyValueList: &pb.KeyValueList{ + Values: []*pb.KeyValue{ + {Key: "role", Value: &pb.AnyValue{StringValue: ptrTo("dev")}}, + {Key: "cluster_load_percent", Value: &pb.AnyValue{DoubleValue: ptrTo(0.55)}}, + }, + }}}, + }, + }, + ScopeLogs: []pb.ScopeLogs{ + { + LogRecords: []pb.LogRecord{ + {TimeUnixNano: 1234, SeverityNumber: 1, Body: pb.AnyValue{StringValue: ptrTo("log-line-message")}}, + {TimeUnixNano: 1235, SeverityNumber: 5, Body: pb.AnyValue{StringValue: ptrTo("log-line-message-msg-2")}}, + }, + }, + }, + }, + { + ScopeLogs: []pb.ScopeLogs{ + { + LogRecords: []pb.LogRecord{ + {TimeUnixNano: 2345, SeverityNumber: 10, Body: pb.AnyValue{StringValue: ptrTo("log-line-resource-scope-1-0-0")}}, + {TimeUnixNano: 2346, SeverityNumber: 10, Body: pb.AnyValue{StringValue: ptrTo("log-line-resource-scope-1-0-1")}}, + }, + }, + { + LogRecords: []pb.LogRecord{ + {TimeUnixNano: 2347, SeverityNumber: 12, Body: pb.AnyValue{StringValue: ptrTo("log-line-resource-scope-1-1-0")}}, + {ObservedTimeUnixNano: 2348, SeverityNumber: 12, Body: pb.AnyValue{StringValue: ptrTo("log-line-resource-scope-1-1-1")}}, + }, + }, + }, + }, + }, + []int64{1234, 1235, 2345, 2346, 2347, 2348}, + `{"logger":"context","instance_id":"10","node_taints":"[{\"Key\":\"role\",\"Value\":{\"StringValue\":\"dev\",\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":null,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}},{\"Key\":\"cluster_load_percent\",\"Value\":{\"StringValue\":null,\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":0.55,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}}]","_msg":"log-line-message","severity":"Trace"} +{"logger":"context","instance_id":"10","node_taints":"[{\"Key\":\"role\",\"Value\":{\"StringValue\":\"dev\",\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":null,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}},{\"Key\":\"cluster_load_percent\",\"Value\":{\"StringValue\":null,\"BoolValue\":null,\"IntValue\":null,\"DoubleValue\":0.55,\"ArrayValue\":null,\"KeyValueList\":null,\"BytesValue\":null}}]","_msg":"log-line-message-msg-2","severity":"Debug"} +{"_msg":"log-line-resource-scope-1-0-0","severity":"Info2"} +{"_msg":"log-line-resource-scope-1-0-1","severity":"Info2"} +{"_msg":"log-line-resource-scope-1-1-0","severity":"Info4"} +{"_msg":"log-line-resource-scope-1-1-1","severity":"Info4"}`, + ) +} + +func ptrTo[T any](s T) *T { + return &s +} diff --git a/app/vlinsert/opentelemetry/opentemetry_timing_test.go b/app/vlinsert/opentelemetry/opentemetry_timing_test.go new file mode 100644 index 000000000..c5998cca0 --- /dev/null +++ b/app/vlinsert/opentelemetry/opentemetry_timing_test.go @@ -0,0 +1,79 @@ +package opentelemetry + +import ( + "fmt" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" +) + +func BenchmarkParseProtobufRequest(b *testing.B) { + for _, scopes := range []int{1, 2} { + for _, rows := range []int{100, 1000} { + for _, attributes := range []int{5, 10} { + b.Run(fmt.Sprintf("scopes_%d/rows_%d/attributes_%d", scopes, rows, attributes), func(b *testing.B) { + benchmarkParseProtobufRequest(b, scopes, rows, attributes) + }) + } + } + } +} + +func benchmarkParseProtobufRequest(b *testing.B, streams, rows, labels int) { + blp := &insertutils.BenchmarkLogMessageProcessor{} + b.ReportAllocs() + b.SetBytes(int64(streams * rows)) + b.RunParallel(func(pb *testing.PB) { + body := getProtobufBody(streams, rows, labels) + for pb.Next() { + _, err := pushProtobufRequest(body, blp) + if err != nil { + panic(fmt.Errorf("unexpected error: %w", err)) + } + } + }) +} + +func getProtobufBody(scopesCount, rowsCount, attributesCount int) []byte { + msg := "12345678910" + + attrValues := []*pb.AnyValue{ + {StringValue: ptrTo("string-attribute")}, + {IntValue: ptrTo[int64](12345)}, + {DoubleValue: ptrTo(3.14)}, + } + attrs := make([]*pb.KeyValue, attributesCount) + for j := 0; j < attributesCount; j++ { + attrs[j] = &pb.KeyValue{ + Key: fmt.Sprintf("key-%d", j), + Value: attrValues[j%3], + } + } + entries := make([]pb.LogRecord, rowsCount) + for j := 0; j < rowsCount; j++ { + entries[j] = pb.LogRecord{ + TimeUnixNano: 12345678910, ObservedTimeUnixNano: 12345678910, Body: pb.AnyValue{StringValue: &msg}, + } + } + scopes := make([]pb.ScopeLogs, scopesCount) + + for j := 0; j < scopesCount; j++ { + scopes[j] = pb.ScopeLogs{ + LogRecords: entries, + } + } + + pr := pb.ExportLogsServiceRequest{ + ResourceLogs: []pb.ResourceLogs{ + { + Resource: pb.Resource{ + Attributes: attrs, + }, + ScopeLogs: scopes, + }, + }, + } + + return pr.MarshalProtobuf(nil) +} diff --git a/deployment/docker/victorialogs/opentelemetry-collector/.gitignore b/deployment/docker/victorialogs/opentelemetry-collector/.gitignore new file mode 100644 index 000000000..db7323f9a --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/.gitignore @@ -0,0 +1 @@ +**/logs diff --git a/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/README.md b/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/README.md new file mode 100644 index 000000000..30cfc3f2c --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/README.md @@ -0,0 +1,27 @@ +# Docker compose OpenTelemetry Elasticsearch integration with VictoriaLogs for docker + +The folder contains the example of integration of [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) with Victorialogs + +To spin-up environment run the following command: +``` +docker compose up -d +``` + +To shut down the docker-compose environment run the following command: +``` +docker compose down +docker compose rm -f +``` + +The docker compose file contains the following components: + +* collector - vector is configured to collect logs from the `docker`, you can find configuration in the `config.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics. +* VictoriaLogs - the log database, it accepts the data from `collector` by elastic protocol +* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics` + +Querying the data + +* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui` +* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line) + +Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance. diff --git a/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/compose.yml b/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/compose.yml new file mode 100644 index 000000000..9ab142161 --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/compose.yml @@ -0,0 +1,48 @@ +services: + collector: + image: docker.io/otel/opentelemetry-collector-contrib:0.102.1 + restart: on-failure + volumes: + - $PWD/logs:/tmp/logs + - $PWD/config.yaml:/etc/otelcol-contrib/config.yaml + depends_on: + victorialogs: + condition: service_healthy + victoriametrics: + condition: service_healthy + + victorialogs: + image: docker.io/victoriametrics/victoria-logs:v0.28.0-victorialogs + volumes: + - victorialogs-vector-docker-vl:/vlogs + ports: + - '9428:9428' + command: + - -storageDataPath=/vlogs + - -loggerFormat=json + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"] + interval: 1s + timeout: 1s + retries: 10 + + victoriametrics: + image: victoriametrics/victoria-metrics:latest + ports: + - '8428:8428' + command: + - -storageDataPath=/vmsingle + - -promscrape.config=/promscrape.yml + - -loggerFormat=json + volumes: + - victorialogs-vector-docker-vm:/vmsingle + - ./scrape.yml:/promscrape.yml + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"] + interval: 1s + timeout: 1s + retries: 10 + +volumes: + victorialogs-vector-docker-vl: + victorialogs-vector-docker-vm: diff --git a/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/config.yaml b/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/config.yaml new file mode 100644 index 000000000..84f7deea1 --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/config.yaml @@ -0,0 +1,14 @@ +exporters: + elasticsearch: + endpoints: + - http://victorialogs:9428/insert/elasticsearch +receivers: + filelog: + include: [/tmp/logs/*.log] + resource: + region: us-east-1 +service: + pipelines: + logs: + receivers: [filelog] + exporters: [elasticsearch] diff --git a/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/scrape.yml b/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/scrape.yml new file mode 100644 index 000000000..8257db8ef --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/elasticsearch/scrape.yml @@ -0,0 +1,11 @@ +scrape_configs: + - job_name: "victoriametrics" + scrape_interval: 30s + static_configs: + - targets: + - victoriametrics:8428 + - job_name: "victorialogs" + scrape_interval: 30s + static_configs: + - targets: + - victorialogs:9428 \ No newline at end of file diff --git a/deployment/docker/victorialogs/opentelemetry-collector/loki/README.md b/deployment/docker/victorialogs/opentelemetry-collector/loki/README.md new file mode 100644 index 000000000..eabf498e4 --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/loki/README.md @@ -0,0 +1,27 @@ +# Docker compose OpenTelemetry Loki integration with VictoriaLogs for docker + +The folder contains the example of integration of [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) with Victorialogs + +To spin-up environment run the following command: +``` +docker compose up -d +``` + +To shut down the docker-compose environment run the following command: +``` +docker compose down +docker compose rm -f +``` + +The docker compose file contains the following components: + +* collector - vector is configured to collect logs from the `docker`, you can find configuration in the `config.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics. +* VictoriaLogs - the log database, it accepts the data from `collector` by Loki protocol +* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics` + +Querying the data + +* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui` +* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line) + +Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance. diff --git a/deployment/docker/victorialogs/opentelemetry-collector/loki/compose.yml b/deployment/docker/victorialogs/opentelemetry-collector/loki/compose.yml new file mode 100644 index 000000000..0f4b600f1 --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/loki/compose.yml @@ -0,0 +1,48 @@ +services: + collector: + image: docker.io/otel/opentelemetry-collector-contrib:0.102.1 + restart: on-failure + volumes: + - $PWD/logs:/tmp/logs + - $PWD/config.yaml:/etc/otelcol-contrib/config.yaml + depends_on: + victorialogs: + condition: service_healthy + victoriametrics: + condition: service_healthy + + victorialogs: + image: docker.io/victoriametrics/victoria-logs:v0.28.0-victorialogs + volumes: + - victorialogs-vector-docker-vl:/loki + ports: + - '9428:9428' + command: + - -storageDataPath=/loki + - -loggerFormat=json + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"] + interval: 1s + timeout: 1s + retries: 10 + + victoriametrics: + image: victoriametrics/victoria-metrics:latest + ports: + - '8428:8428' + command: + - -storageDataPath=/vmsingle + - -promscrape.config=/promscrape.yml + - -loggerFormat=json + volumes: + - victorialogs-vector-docker-vm:/vmsingle + - ./scrape.yml:/promscrape.yml + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"] + interval: 1s + timeout: 1s + retries: 10 + +volumes: + victorialogs-vector-docker-vl: + victorialogs-vector-docker-vm: diff --git a/deployment/docker/victorialogs/opentelemetry-collector/loki/config.yaml b/deployment/docker/victorialogs/opentelemetry-collector/loki/config.yaml new file mode 100644 index 000000000..63df2e614 --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/loki/config.yaml @@ -0,0 +1,13 @@ +exporters: + loki: + endpoint: http://victorialogs:9428/insert/loki/api/v1/push +receivers: + filelog: + include: [/tmp/logs/*.log] + resource: + region: us-east-1 +service: + pipelines: + logs: + receivers: [filelog] + exporters: [loki] diff --git a/deployment/docker/victorialogs/opentelemetry-collector/loki/scrape.yml b/deployment/docker/victorialogs/opentelemetry-collector/loki/scrape.yml new file mode 100644 index 000000000..8257db8ef --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/loki/scrape.yml @@ -0,0 +1,11 @@ +scrape_configs: + - job_name: "victoriametrics" + scrape_interval: 30s + static_configs: + - targets: + - victoriametrics:8428 + - job_name: "victorialogs" + scrape_interval: 30s + static_configs: + - targets: + - victorialogs:9428 \ No newline at end of file diff --git a/deployment/docker/victorialogs/opentelemetry-collector/otlp/README.md b/deployment/docker/victorialogs/opentelemetry-collector/otlp/README.md new file mode 100644 index 000000000..fc0e993bd --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/otlp/README.md @@ -0,0 +1,27 @@ +# Docker compose OpenTelemetry OTLP integration with VictoriaLogs for docker + +The folder contains the example of integration of [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) with Victorialogs + +To spin-up environment run the following command: +``` +docker compose up -d +``` + +To shut down the docker-compose environment run the following command: +``` +docker compose down +docker compose rm -f +``` + +The docker compose file contains the following components: + +* collector - vector is configured to collect logs from the `docker`, you can find configuration in the `config.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics. +* VictoriaLogs - the log database, it accepts the data from `collector` by otlp protocol +* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics` + +Querying the data + +* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui` +* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line) + +Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance. diff --git a/deployment/docker/victorialogs/opentelemetry-collector/otlp/compose.yml b/deployment/docker/victorialogs/opentelemetry-collector/otlp/compose.yml new file mode 100644 index 000000000..ca1798882 --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/otlp/compose.yml @@ -0,0 +1,48 @@ +services: + collector: + image: docker.io/otel/opentelemetry-collector-contrib:0.102.1 + restart: on-failure + volumes: + - $PWD/logs:/tmp/logs + - $PWD/config.yaml:/etc/otelcol-contrib/config.yaml + depends_on: + victorialogs: + condition: service_healthy + victoriametrics: + condition: service_healthy + + victorialogs: + image: docker.io/victoriametrics/victoria-logs:v0.29.0-victorialogs + volumes: + - victorialogs-vector-docker-vl:/otlp + ports: + - '9428:9428' + command: + - -storageDataPath=/otlp + - -loggerFormat=json + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"] + interval: 1s + timeout: 1s + retries: 10 + + victoriametrics: + image: victoriametrics/victoria-metrics:latest + ports: + - '8428:8428' + command: + - -storageDataPath=/vmsingle + - -promscrape.config=/promscrape.yml + - -loggerFormat=json + volumes: + - victorialogs-vector-docker-vm:/vmsingle + - ./scrape.yml:/promscrape.yml + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"] + interval: 1s + timeout: 1s + retries: 10 + +volumes: + victorialogs-vector-docker-vl: + victorialogs-vector-docker-vm: diff --git a/deployment/docker/victorialogs/opentelemetry-collector/otlp/config.yaml b/deployment/docker/victorialogs/opentelemetry-collector/otlp/config.yaml new file mode 100644 index 000000000..1b56a034f --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/otlp/config.yaml @@ -0,0 +1,15 @@ +exporters: + otlphttp: + logs_endpoint: http://victorialogs:9428/insert/opentelemetry/v1/logs + debug: + verbosity: detailed +receivers: + filelog: + include: [/tmp/logs/*.log] + resource: + region: us-east-1 +service: + pipelines: + logs: + receivers: [filelog] + exporters: [otlphttp, debug] diff --git a/deployment/docker/victorialogs/opentelemetry-collector/otlp/scrape.yml b/deployment/docker/victorialogs/opentelemetry-collector/otlp/scrape.yml new file mode 100644 index 000000000..8257db8ef --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/otlp/scrape.yml @@ -0,0 +1,11 @@ +scrape_configs: + - job_name: "victoriametrics" + scrape_interval: 30s + static_configs: + - targets: + - victoriametrics:8428 + - job_name: "victorialogs" + scrape_interval: 30s + static_configs: + - targets: + - victorialogs:9428 \ No newline at end of file diff --git a/deployment/docker/victorialogs/opentelemetry-collector/syslog/README.md b/deployment/docker/victorialogs/opentelemetry-collector/syslog/README.md new file mode 100644 index 000000000..12a7290ae --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/syslog/README.md @@ -0,0 +1,27 @@ +# Docker compose OpenTelemetry Syslog integration with VictoriaLogs for docker + +The folder contains the example of integration of [OpenTelemetry collector](https://opentelemetry.io/docs/collector/) with Victorialogs + +To spin-up environment run the following command: +``` +docker compose up -d +``` + +To shut down the docker-compose environment run the following command: +``` +docker compose down +docker compose rm -f +``` + +The docker compose file contains the following components: + +* collector - vector is configured to collect logs from the `docker`, you can find configuration in the `config.yaml`. It writes data in VictoriaLogs. It pushes metrics to VictoriaMetrics. +* VictoriaLogs - the log database, it accepts the data from `collector` by syslog protocol +* VictoriaMetrics - collects metrics from `VictoriaLogs` and `VictoriaMetrics` + +Querying the data + +* [vmui](https://docs.victoriametrics.com/victorialogs/querying/#vmui) - a web UI is accessible by `http://localhost:9428/select/vmui` +* for querying the data via command-line please check [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line) + +Please, note that `_stream_fields` parameter must follow recommended [best practices](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) to achieve better performance. diff --git a/deployment/docker/victorialogs/opentelemetry-collector/syslog/compose.yml b/deployment/docker/victorialogs/opentelemetry-collector/syslog/compose.yml new file mode 100644 index 000000000..f2f0c8307 --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/syslog/compose.yml @@ -0,0 +1,49 @@ +services: + collector: + image: docker.io/otel/opentelemetry-collector-contrib:0.107.0 + restart: on-failure + volumes: + - $PWD/logs:/tmp/logs + - $PWD/config.yaml:/etc/otelcol-contrib/config.yaml + depends_on: + victorialogs: + condition: service_healthy + victoriametrics: + condition: service_healthy + + victorialogs: + image: docker.io/victoriametrics/victoria-logs:v0.28.0-victorialogs + volumes: + - victorialogs-vector-docker-vl:/syslog + ports: + - '9428:9428' + command: + - -storageDataPath=/syslog + - -syslog.listenAddr.tcp=:5410 + - -syslog.useLocalTimestamp.tcp + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1:9428/health"] + interval: 1s + timeout: 1s + retries: 10 + + victoriametrics: + image: victoriametrics/victoria-metrics:latest + ports: + - '8428:8428' + command: + - -storageDataPath=/vmsingle + - -promscrape.config=/promscrape.yml + - -loggerFormat=json + volumes: + - victorialogs-vector-docker-vm:/vmsingle + - ./scrape.yml:/promscrape.yml + healthcheck: + test: ["CMD", "wget", "-qO-", "http://127.0.0.1:8428/health"] + interval: 1s + timeout: 1s + retries: 10 + +volumes: + victorialogs-vector-docker-vl: + victorialogs-vector-docker-vm: diff --git a/deployment/docker/victorialogs/opentelemetry-collector/syslog/config.yaml b/deployment/docker/victorialogs/opentelemetry-collector/syslog/config.yaml new file mode 100644 index 000000000..d556ee111 --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/syslog/config.yaml @@ -0,0 +1,24 @@ +exporters: + syslog: + network: tcp + endpoint: victorialogs + port: 5410 + tls: + insecure: true + debug: + verbosity: detailed +processors: + transform: + log_statements: + - context: log + statements: + - set(attributes["message"], body) +receivers: + filelog: + include: [/tmp/logs/*.log] +service: + pipelines: + logs: + receivers: [filelog] + exporters: [syslog, debug] + processors: [transform] diff --git a/deployment/docker/victorialogs/opentelemetry-collector/syslog/scrape.yml b/deployment/docker/victorialogs/opentelemetry-collector/syslog/scrape.yml new file mode 100644 index 000000000..8257db8ef --- /dev/null +++ b/deployment/docker/victorialogs/opentelemetry-collector/syslog/scrape.yml @@ -0,0 +1,11 @@ +scrape_configs: + - job_name: "victoriametrics" + scrape_interval: 30s + static_configs: + - targets: + - victoriametrics:8428 + - job_name: "victorialogs" + scrape_interval: 30s + static_configs: + - targets: + - victorialogs:9428 \ No newline at end of file diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index b2f443416..350c8fd71 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -7,6 +7,7 @@ - Vector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/). - Promtail (aka Grafana Loki) - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). - Telegraf - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/). +- OpenTelemetry Collector - see [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/opentelemetry/). The ingested logs can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/). @@ -22,6 +23,7 @@ VictoriaLogs supports the following data ingestion HTTP APIs: - Elasticsearch bulk API. See [these docs](#elasticsearch-bulk-api). - JSON stream API aka [ndjson](https://jsonlines.org/). See [these docs](#json-stream-api). - Loki JSON API. See [these docs](#loki-json-api). +- OpenTelemetry API. See [these docs](#opentelemetry-api). VictoriaLogs accepts optional [HTTP parameters](#http-parameters) at data ingestion HTTP APIs. @@ -273,13 +275,14 @@ VictoriaLogs exposes various [metrics](https://docs.victoriametrics.com/victoria Here is the list of log collectors and their ingestion formats supported by VictoriaLogs: -| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | Format: syslog | -|----------------------------|-----------------------|---------------------|--------------|----------------| -| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) | -| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) | -| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No | -| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/elasticsearch) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) | -| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html) | [Yes](https://grafana.com/docs/loki/latest/send-data/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) | -| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No | -| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No | -| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) | +| How to setup the collector | Format: Elasticsearch | Format: JSON Stream | Format: Loki | Format: syslog | Format: OpenTelemetry | +|----------------------------|-----------------------|---------------------|--------------|----------------|-----------------------| +| [Rsyslog](https://docs.victoriametrics.com/victorialogs/data-ingestion/syslog/) | [Yes](https://www.rsyslog.com/doc/configuration/modules/omelasticsearch.html) | No | No | [Yes](https://www.rsyslog.com/doc/configuration/modules/omfwd.html) | No | +| [Syslog-ng](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | Yes, [v1](https://support.oneidentity.com/technical-documents/syslog-ng-open-source-edition/3.16/administration-guide/28#TOPIC-956489), [v2](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/29#TOPIC-956494) | No | No | [Yes](https://support.oneidentity.com/technical-documents/doc/syslog-ng-open-source-edition/3.16/administration-guide/44#TOPIC-956553) | No | +| [Filebeat](https://docs.victoriametrics.com/victorialogs/data-ingestion/filebeat/) | [Yes](https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html) | No | No | No | No | +| [Fluentbit](https://docs.victoriametrics.com/victorialogs/data-ingestion/fluentbit/) | No | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/http) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/loki) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/syslog) | [Yes](https://docs.fluentbit.io/manual/pipeline/outputs/opentelemetry) | +| [Logstash](https://docs.victoriametrics.com/victorialogs/data-ingestion/logstash/) | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html) | No | No | [Yes](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-syslog.html) | [Yes](https://github.com/paulgrav/logstash-output-opentelemetry) | +| [Vector](https://docs.victoriametrics.com/victorialogs/data-ingestion/vector/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/elasticsearch/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/http/) | [Yes](https://vector.dev/docs/reference/configuration/sinks/loki/) | No | [Yes](https://vector.dev/docs/reference/configuration/sources/opentelemetry/) | +| [Promtail](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/) | No | No | [Yes](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#clients) | No | No | +| [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/elasticsearchexporter) | No | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/lokiexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/syslogexporter) | [Yes](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter) | +| [Telegraf](https://docs.victoriametrics.com/victorialogs/data-ingestion/telegraf/) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/elasticsearch) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/http) | [Yes](https://github.com/influxdata/telegraf/tree/master/plugins/outputs/loki) | [Yes](https://github.com/influxdata/telegraf/blob/master/plugins/outputs/syslog) | Yes | diff --git a/docs/VictoriaLogs/data-ingestion/opentelemetry.md b/docs/VictoriaLogs/data-ingestion/opentelemetry.md new file mode 100644 index 000000000..b8c7b6fa0 --- /dev/null +++ b/docs/VictoriaLogs/data-ingestion/opentelemetry.md @@ -0,0 +1,120 @@ +--- +weight: 4 +title: OpenTelemetry setup +disableToc: true +menu: + docs: + parent: "victorialogs-data-ingestion" + weight: 4 +aliases: + - /VictoriaLogs/data-ingestion/OpenTelemetry.html +--- + + +VictoriaLogs supports both client open-telemetry [SDK](https://opentelemetry.io/docs/languages/) and [collector](https://opentelemetry.io/docs/collector/). + +## Client SDK + + Specify `EndpointURL` for http-exporter builder. + +Consider the following example for `golang` `SDK`: + +```go + // Create the OTLP log exporter that sends logs to configured destination + logExporter, err := otlploghttp.New(ctx, + otlploghttp.WithEndpointURL("http://victorialogs:9428/insert/opentelemetry/v1/logs"), + ) +``` + + Optionally, [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) could be defined via headers: + +```go + // Create the OTLP log exporter that sends logs to configured destination + logExporter, err := otlploghttp.New(ctx, + otlploghttp.WithEndpointURL("http://victorialogs:9428/insert/opentelemetry/v1/logs"), + otlploghttp.WithHeaders(map[string]string{"VL-Stream-Fields": "telemetry.sdk.language,severity"}), + ) + +``` + + Given config defines 2 stream fields - `severity` and `telemetry.sdk.language`. + +See also [HTTP headers](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-headers) + +## Collector configuration + +VictoriaLogs supports given below OpenTelemetry collector exporters: + +* [Elasticsearch](#elasticsearch) +* [Loki](#loki) +* [OpenTelemetry](#opentelemetry) + +### Elasticsearch + +```yaml +exporters: + elasticsearch: + endpoints: + - http://victorialogs:9428/insert/elasticsearch +receivers: + filelog: + include: [/tmp/logs/*.log] + resource: + region: us-east-1 +service: + pipelines: + logs: + receivers: [filelog] + exporters: [elasticsearch] +``` + +### Loki + +```yaml +exporters: + loki: + endpoint: http://victorialogs:9428/insert/loki/api/v1/push +receivers: + filelog: + include: [/tmp/logs/*.log] + resource: + region: us-east-1 +service: + pipelines: + logs: + receivers: [filelog] + exporters: [loki] +``` + +### OpenTelemetry + +Specify logs endpoint for [OTLP/HTTP exporter](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/otlphttpexporter/README.md) in configuration file +for sending the collected logs to [VictoriaLogs](https://docs.victoriametrics.com/VictoriaLogs/): + +```yaml +exporters: + otlphttp: + logs_endpoint: http://localhost:9428/insert/opentelemetry/v1/logs +``` + + Optionally, [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) could be defined via headers: + +```yaml +exporters: + otlphttp: + logs_endpoint: http://localhost:9428/insert/opentelemetry/v1/logs + headers: + VL-Stream-Fields: telemetry.sdk.language,severity +``` + +See also [HTTP headers](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-headers) + +Substitute `localhost:9428` address inside `exporters.oltphttp.logs_endpoint` with the real address of VictoriaLogs. + +The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/). + +See also: + +* [Data ingestion troubleshooting](https://docs.victoriametrics.com/victorialogs/data-ingestion/#troubleshooting). +* [How to query VictoriaLogs](https://docs.victoriametrics.com/victorialogs/querying/). +* [Docker-compose demo for OpenTelemetry collector integration with VictoriaLogs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker/victorialogs/opentelemetry-collector). diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index 418c1afa6..127d3cafc 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -202,6 +202,7 @@ Released at 2024-06-07 * SECURITY: upgrade base docker image (Alpine) from 3.19.1 to 3.20.0. See [alpine 3.20.0 release notes](https://www.alpinelinux.org/posts/Alpine-3.20.0-released.html). * SECURITY: add release images built from scratch image. Such images could be more preferable for using in environments with higher security standards. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6386). +* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/): added OpenTelemetry logs ingestion support. * FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): support selecting of multiple instances on the dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5869) for details. * FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): properly display version in the Stats row for the custom builds of VictoriaMetrics. * FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): add `Network Usage` panel to `Resource Usage` row. diff --git a/lib/filestream/filestream_darwin.go b/lib/filestream/filestream_darwin.go index 040660083..f498d099f 100644 --- a/lib/filestream/filestream_darwin.go +++ b/lib/filestream/filestream_darwin.go @@ -1,6 +1,6 @@ package filestream -func (st *streamTracker) adviseDontNeed(n int, fdatasync bool) error { +func (st *streamTracker) adviseDontNeed(_ int, _ bool) error { return nil } diff --git a/lib/fs/fadvise_darwin.go b/lib/fs/fadvise_darwin.go index 73cfe81a7..69d9175ec 100644 --- a/lib/fs/fadvise_darwin.go +++ b/lib/fs/fadvise_darwin.go @@ -4,7 +4,7 @@ import ( "os" ) -func fadviseSequentialRead(f *os.File, prefetch bool) error { +func fadviseSequentialRead(_ *os.File, _ bool) error { // TODO: implement this properly return nil } diff --git a/lib/protoparser/opentelemetry/firehose/parser_test.go b/lib/protoparser/opentelemetry/firehose/parser_test.go index 070ace641..f290ad023 100644 --- a/lib/protoparser/opentelemetry/firehose/parser_test.go +++ b/lib/protoparser/opentelemetry/firehose/parser_test.go @@ -3,12 +3,13 @@ package firehose import ( "bytes" "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream" "strings" "sync/atomic" "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/stream" ) func TestProcessRequestBody(t *testing.T) { diff --git a/lib/protoparser/opentelemetry/pb/common.go b/lib/protoparser/opentelemetry/pb/common.go new file mode 100644 index 000000000..b58d6a7fb --- /dev/null +++ b/lib/protoparser/opentelemetry/pb/common.go @@ -0,0 +1,275 @@ +package pb + +import ( + "bytes" + "fmt" + "strings" + + "github.com/VictoriaMetrics/easyproto" +) + +// Resource represents the corresponding OTEL protobuf message +type Resource struct { + Attributes []*KeyValue +} + +// marshalProtobuf marshals +func (r *Resource) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, a := range r.Attributes { + a.marshalProtobuf(mm.AppendMessage(1)) + } +} + +// unmarshalProtobuf unmarshals r from protobuf message at src. +func (r *Resource) unmarshalProtobuf(src []byte) (err error) { + // message Resource { + // repeated KeyValue attributes = 1; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in Resource: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Attribute data") + } + r.Attributes = append(r.Attributes, &KeyValue{}) + a := r.Attributes[len(r.Attributes)-1] + if err := a.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Attribute: %w", err) + } + } + } + return nil +} + +// KeyValue represents the corresponding OTEL protobuf message +type KeyValue struct { + Key string + Value *AnyValue +} + +func (kv *KeyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendString(1, kv.Key) + if kv.Value != nil { + kv.Value.marshalProtobuf(mm.AppendMessage(2)) + } +} + +// unmarshalProtobuf unmarshals kv from protobuf message at src. +func (kv *KeyValue) unmarshalProtobuf(src []byte) (err error) { + // message KeyValue { + // string key = 1; + // AnyValue value = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in KeyValue: %w", err) + } + switch fc.FieldNum { + case 1: + key, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read Key") + } + kv.Key = strings.Clone(key) + case 2: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Value") + } + kv.Value = &AnyValue{} + if err := kv.Value.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Value: %w", err) + } + } + } + return nil +} + +// AnyValue represents the corresponding OTEL protobuf message +type AnyValue struct { + StringValue *string + BoolValue *bool + IntValue *int64 + DoubleValue *float64 + ArrayValue *ArrayValue + KeyValueList *KeyValueList + BytesValue *[]byte +} + +func (av *AnyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { + switch { + case av.StringValue != nil: + mm.AppendString(1, *av.StringValue) + case av.BoolValue != nil: + mm.AppendBool(2, *av.BoolValue) + case av.IntValue != nil: + mm.AppendInt64(3, *av.IntValue) + case av.DoubleValue != nil: + mm.AppendDouble(4, *av.DoubleValue) + case av.ArrayValue != nil: + av.ArrayValue.marshalProtobuf(mm.AppendMessage(5)) + case av.KeyValueList != nil: + av.KeyValueList.marshalProtobuf(mm.AppendMessage(6)) + case av.BytesValue != nil: + mm.AppendBytes(7, *av.BytesValue) + } +} + +// unmarshalProtobuf unmarshals av from protobuf message at src. +func (av *AnyValue) unmarshalProtobuf(src []byte) (err error) { + // message AnyValue { + // oneof value { + // string string_value = 1; + // bool bool_value = 2; + // int64 int_value = 3; + // double double_value = 4; + // ArrayValue array_value = 5; + // KeyValueList kvlist_value = 6; + // bytes bytes_value = 7; + // } + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in AnyValue") + } + switch fc.FieldNum { + case 1: + stringValue, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read StringValue") + } + stringValue = strings.Clone(stringValue) + av.StringValue = &stringValue + case 2: + boolValue, ok := fc.Bool() + if !ok { + return fmt.Errorf("cannot read BoolValue") + } + av.BoolValue = &boolValue + case 3: + intValue, ok := fc.Int64() + if !ok { + return fmt.Errorf("cannot read IntValue") + } + av.IntValue = &intValue + case 4: + doubleValue, ok := fc.Double() + if !ok { + return fmt.Errorf("cannot read DoubleValue") + } + av.DoubleValue = &doubleValue + case 5: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read ArrayValue") + } + av.ArrayValue = &ArrayValue{} + if err := av.ArrayValue.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal ArrayValue: %w", err) + } + case 6: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read KeyValueList") + } + av.KeyValueList = &KeyValueList{} + if err := av.KeyValueList.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal KeyValueList: %w", err) + } + case 7: + bytesValue, ok := fc.Bytes() + if !ok { + return fmt.Errorf("cannot read BytesValue") + } + bytesValue = bytes.Clone(bytesValue) + av.BytesValue = &bytesValue + } + } + return nil +} + +// ArrayValue represents the corresponding OTEL protobuf message +type ArrayValue struct { + Values []*AnyValue +} + +func (av *ArrayValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, v := range av.Values { + v.marshalProtobuf(mm.AppendMessage(1)) + } +} + +// unmarshalProtobuf unmarshals av from protobuf message at src. +func (av *ArrayValue) unmarshalProtobuf(src []byte) (err error) { + // message ArrayValue { + // repeated AnyValue values = 1; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ArrayValue") + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Value data") + } + av.Values = append(av.Values, &AnyValue{}) + v := av.Values[len(av.Values)-1] + if err := v.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Value: %w", err) + } + } + } + return nil +} + +// KeyValueList represents the corresponding OTEL protobuf message +type KeyValueList struct { + Values []*KeyValue +} + +func (kvl *KeyValueList) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, v := range kvl.Values { + v.marshalProtobuf(mm.AppendMessage(1)) + } +} + +// unmarshalProtobuf unmarshals kvl from protobuf message at src. +func (kvl *KeyValueList) unmarshalProtobuf(src []byte) (err error) { + // message KeyValueList { + // repeated KeyValue values = 1; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in KeyValueList") + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Value data") + } + kvl.Values = append(kvl.Values, &KeyValue{}) + v := kvl.Values[len(kvl.Values)-1] + if err := v.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Value: %w", err) + } + } + } + return nil +} diff --git a/lib/protoparser/opentelemetry/pb/helpers.go b/lib/protoparser/opentelemetry/pb/helpers.go index 1c26778fa..d3b574930 100644 --- a/lib/protoparser/opentelemetry/pb/helpers.go +++ b/lib/protoparser/opentelemetry/pb/helpers.go @@ -10,6 +10,9 @@ import ( // FormatString returns string reperesentation for av. func (av *AnyValue) FormatString() string { + if av == nil { + return "" + } switch { case av.StringValue != nil: return *av.StringValue diff --git a/lib/protoparser/opentelemetry/pb/logs.go b/lib/protoparser/opentelemetry/pb/logs.go new file mode 100644 index 000000000..6bbac31b2 --- /dev/null +++ b/lib/protoparser/opentelemetry/pb/logs.go @@ -0,0 +1,294 @@ +package pb + +import ( + "fmt" + "strings" + "time" + + "github.com/VictoriaMetrics/easyproto" +) + +// ExportLogsServiceRequest represents the corresponding OTEL protobuf message +type ExportLogsServiceRequest struct { + ResourceLogs []ResourceLogs +} + +// MarshalProtobuf marshals r to protobuf message, appends it to dst and returns the result. +func (r *ExportLogsServiceRequest) MarshalProtobuf(dst []byte) []byte { + m := mp.Get() + r.marshalProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + mp.Put(m) + return dst +} + +func (r *ExportLogsServiceRequest) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, rm := range r.ResourceLogs { + rm.marshalProtobuf(mm.AppendMessage(1)) + } +} + +// UnmarshalProtobuf unmarshals r from protobuf message at src. +func (r *ExportLogsServiceRequest) UnmarshalProtobuf(src []byte) (err error) { + // message ExportLogsServiceRequest { + // repeated ResourceLogs resource_metrics = 1; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ExportLogsServiceRequest: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read ResourceLogs data") + } + var rl ResourceLogs + + if err := rl.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal ResourceLogs: %w", err) + } + r.ResourceLogs = append(r.ResourceLogs, rl) + } + } + return nil +} + +// ResourceLogs represents the corresponding OTEL protobuf message +type ResourceLogs struct { + Resource Resource `json:"resource"` + ScopeLogs []ScopeLogs `json:"scopeLogs"` +} + +func (rl *ResourceLogs) marshalProtobuf(mm *easyproto.MessageMarshaler) { + rl.Resource.marshalProtobuf(mm.AppendMessage(1)) + for _, sm := range rl.ScopeLogs { + sm.marshalProtobuf(mm.AppendMessage(2)) + } +} + +func (rl *ResourceLogs) unmarshalProtobuf(src []byte) (err error) { + // message ResourceLogs { + // Resource resource = 1; + // repeated ScopeLogs scope_logs = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ResourceLogs: %w", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Resource data") + } + if err := rl.Resource.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot umarshal Resource: %w", err) + } + case 2: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read ScopeLogs data") + } + var sl ScopeLogs + if err := sl.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal ScopeLogs: %w", err) + } + rl.ScopeLogs = append(rl.ScopeLogs, sl) + } + } + return nil +} + +// ScopeLogs represents the corresponding OTEL protobuf message +type ScopeLogs struct { + LogRecords []LogRecord +} + +func (sl *ScopeLogs) marshalProtobuf(mm *easyproto.MessageMarshaler) { + for _, m := range sl.LogRecords { + m.marshalProtobuf(mm.AppendMessage(2)) + } +} + +func (sl *ScopeLogs) unmarshalProtobuf(src []byte) (err error) { + // message ScopeLogs { + // repeated LogRecord log_records = 2; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ScopeLogs: %w", err) + } + switch fc.FieldNum { + case 2: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read LogRecord data") + } + var lr LogRecord + if err := lr.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal LogRecord: %w", err) + } + sl.LogRecords = append(sl.LogRecords, lr) + } + } + return nil +} + +// LogRecord represents the corresponding OTEL protobuf message +// https://github.com/open-telemetry/oteps/blob/main/text/logs/0097-log-data-model.md +type LogRecord struct { + // time_unix_nano is the time when the event occurred. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // Value of 0 indicates unknown or missing timestamp. + TimeUnixNano uint64 + // Time when the event was observed by the collection system. + // For events that originate in OpenTelemetry (e.g. using OpenTelemetry Logging SDK) + // this timestamp is typically set at the generation time and is equal to Timestamp. + // For events originating externally and collected by OpenTelemetry (e.g. using + // Collector) this is the time when OpenTelemetry's code observed the event measured + // by the clock of the OpenTelemetry code. This field MUST be set once the event is + // observed by OpenTelemetry. + // + // For converting OpenTelemetry log data to formats that support only one timestamp or + // when receiving OpenTelemetry log data by recipients that support only one timestamp + // internally the following logic is recommended: + // - Use time_unix_nano if it is present, otherwise use observed_time_unix_nano. + // + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // Value of 0 indicates unknown or missing timestamp. + ObservedTimeUnixNano uint64 + // Numerical value of the severity, normalized to values described in Log Data Model. + SeverityNumber int32 + SeverityText string + Body AnyValue + Attributes []*KeyValue +} + +func (lr *LogRecord) marshalProtobuf(mm *easyproto.MessageMarshaler) { + mm.AppendFixed64(1, lr.TimeUnixNano) + mm.AppendInt32(2, lr.SeverityNumber) + mm.AppendString(3, lr.SeverityText) + lr.Body.marshalProtobuf(mm.AppendMessage(5)) + for _, a := range lr.Attributes { + a.marshalProtobuf(mm.AppendMessage(6)) + } + mm.AppendFixed64(11, lr.ObservedTimeUnixNano) +} + +func (lr *LogRecord) unmarshalProtobuf(src []byte) (err error) { + // message LogRecord { + // fixed64 time_unix_nano = 1; + // fixed64 observed_time_unix_nano = 11; + // SeverityNumber severity_number = 2; + // string severity_text = 3; + // AnyValue body = 5; + // repeated KeyValue attributes = 6; + // } + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in LogRecord: %w", err) + } + switch fc.FieldNum { + case 1: + ts, ok := fc.Fixed64() + if !ok { + return fmt.Errorf("cannot read log record timestamp") + } + lr.TimeUnixNano = ts + case 11: + ts, ok := fc.Fixed64() + if !ok { + return fmt.Errorf("cannot read log record observed timestamp") + } + lr.ObservedTimeUnixNano = ts + case 2: + severityNumber, ok := fc.Int32() + if !ok { + return fmt.Errorf("cannot read severity number") + } + lr.SeverityNumber = severityNumber + case 3: + severityText, ok := fc.String() + if !ok { + return fmt.Errorf("cannot read severity string") + } + lr.SeverityText = strings.Clone(severityText) + case 5: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read Body") + } + if err := lr.Body.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Body: %w", err) + } + case 6: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read attributes data") + } + lr.Attributes = append(lr.Attributes, &KeyValue{}) + a := lr.Attributes[len(lr.Attributes)-1] + if err := a.unmarshalProtobuf(data); err != nil { + return fmt.Errorf("cannot unmarshal Attribute: %w", err) + } + } + } + return nil +} + +// FormatSeverity returns normalized severity for log record +func (lr *LogRecord) FormatSeverity() string { + if lr.SeverityText != "" { + return lr.SeverityText + } + if lr.SeverityNumber > int32(len(logSeverities)-1) { + return logSeverities[0] + } + return logSeverities[lr.SeverityNumber] +} + +// ExtractTimestampNano returns timestamp for log record +func (lr *LogRecord) ExtractTimestampNano() int64 { + switch { + case lr.TimeUnixNano > 0: + return int64(lr.TimeUnixNano) + case lr.ObservedTimeUnixNano > 0: + return int64(lr.ObservedTimeUnixNano) + default: + return time.Now().UnixNano() + } +} + +// https://github.com/open-telemetry/opentelemetry-collector/blob/cd1f7623fe67240e32e74735488c3db111fad47b/pdata/plog/severity_number.go#L41 +var logSeverities = []string{ + "Unspecified", + "Trace", + "Trace2", + "Trace3", + "Trace4", + "Debug", + "Debug2", + "Debug3", + "Debug4", + "Info", + "Info2", + "Info3", + "Info4", + "Error", + "Error2", + "Error3", + "Error4", + "Fatal", + "Fatal2", + "Fatal3", + "Fatal4", +} diff --git a/lib/protoparser/opentelemetry/pb/pb.go b/lib/protoparser/opentelemetry/pb/metrics.go similarity index 74% rename from lib/protoparser/opentelemetry/pb/pb.go rename to lib/protoparser/opentelemetry/pb/metrics.go index 883950e9f..c6f525abb 100644 --- a/lib/protoparser/opentelemetry/pb/pb.go +++ b/lib/protoparser/opentelemetry/pb/metrics.go @@ -1,7 +1,6 @@ package pb import ( - "bytes" "fmt" "strings" @@ -113,43 +112,6 @@ func (rm *ResourceMetrics) unmarshalProtobuf(src []byte) (err error) { return nil } -// Resource represents the corresponding OTEL protobuf message -type Resource struct { - Attributes []*KeyValue -} - -func (r *Resource) marshalProtobuf(mm *easyproto.MessageMarshaler) { - for _, a := range r.Attributes { - a.marshalProtobuf(mm.AppendMessage(1)) - } -} - -func (r *Resource) unmarshalProtobuf(src []byte) (err error) { - // message Resource { - // repeated KeyValue attributes = 1; - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in Resource: %w", err) - } - switch fc.FieldNum { - case 1: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read Attribute data") - } - r.Attributes = append(r.Attributes, &KeyValue{}) - a := r.Attributes[len(r.Attributes)-1] - if err := a.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal Attribute: %w", err) - } - } - } - return nil -} - // ScopeMetrics represents the corresponding OTEL protobuf message type ScopeMetrics struct { Metrics []*Metric @@ -283,229 +245,6 @@ func (m *Metric) unmarshalProtobuf(src []byte) (err error) { return nil } -// KeyValue represents the corresponding OTEL protobuf message -type KeyValue struct { - Key string - Value *AnyValue -} - -func (kv *KeyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { - mm.AppendString(1, kv.Key) - if kv.Value != nil { - kv.Value.marshalProtobuf(mm.AppendMessage(2)) - } -} - -func (kv *KeyValue) unmarshalProtobuf(src []byte) (err error) { - // message KeyValue { - // string key = 1; - // AnyValue value = 2; - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in KeyValue: %w", err) - } - switch fc.FieldNum { - case 1: - key, ok := fc.String() - if !ok { - return fmt.Errorf("cannot read Key") - } - kv.Key = strings.Clone(key) - case 2: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read Value") - } - kv.Value = &AnyValue{} - if err := kv.Value.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal Value: %w", err) - } - } - } - return nil -} - -// AnyValue represents the corresponding OTEL protobuf message -type AnyValue struct { - StringValue *string - BoolValue *bool - IntValue *int64 - DoubleValue *float64 - ArrayValue *ArrayValue - KeyValueList *KeyValueList - BytesValue *[]byte -} - -func (av *AnyValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { - switch { - case av.StringValue != nil: - mm.AppendString(1, *av.StringValue) - case av.BoolValue != nil: - mm.AppendBool(2, *av.BoolValue) - case av.IntValue != nil: - mm.AppendInt64(3, *av.IntValue) - case av.DoubleValue != nil: - mm.AppendDouble(4, *av.DoubleValue) - case av.ArrayValue != nil: - av.ArrayValue.marshalProtobuf(mm.AppendMessage(5)) - case av.KeyValueList != nil: - av.KeyValueList.marshalProtobuf(mm.AppendMessage(6)) - case av.BytesValue != nil: - mm.AppendBytes(7, *av.BytesValue) - } -} - -func (av *AnyValue) unmarshalProtobuf(src []byte) (err error) { - // message AnyValue { - // oneof value { - // string string_value = 1; - // bool bool_value = 2; - // int64 int_value = 3; - // double double_value = 4; - // ArrayValue array_value = 5; - // KeyValueList kvlist_value = 6; - // bytes bytes_value = 7; - // } - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in AnyValue") - } - switch fc.FieldNum { - case 1: - stringValue, ok := fc.String() - if !ok { - return fmt.Errorf("cannot read StringValue") - } - stringValue = strings.Clone(stringValue) - av.StringValue = &stringValue - case 2: - boolValue, ok := fc.Bool() - if !ok { - return fmt.Errorf("cannot read BoolValue") - } - av.BoolValue = &boolValue - case 3: - intValue, ok := fc.Int64() - if !ok { - return fmt.Errorf("cannot read IntValue") - } - av.IntValue = &intValue - case 4: - doubleValue, ok := fc.Double() - if !ok { - return fmt.Errorf("cannot read DoubleValue") - } - av.DoubleValue = &doubleValue - case 5: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read ArrayValue") - } - av.ArrayValue = &ArrayValue{} - if err := av.ArrayValue.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal ArrayValue: %w", err) - } - case 6: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read KeyValueList") - } - av.KeyValueList = &KeyValueList{} - if err := av.KeyValueList.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal KeyValueList: %w", err) - } - case 7: - bytesValue, ok := fc.Bytes() - if !ok { - return fmt.Errorf("cannot read BytesValue") - } - bytesValue = bytes.Clone(bytesValue) - av.BytesValue = &bytesValue - } - } - return nil -} - -// ArrayValue represents the corresponding OTEL protobuf message -type ArrayValue struct { - Values []*AnyValue -} - -func (av *ArrayValue) marshalProtobuf(mm *easyproto.MessageMarshaler) { - for _, v := range av.Values { - v.marshalProtobuf(mm.AppendMessage(1)) - } -} - -func (av *ArrayValue) unmarshalProtobuf(src []byte) (err error) { - // message ArrayValue { - // repeated AnyValue values = 1; - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in ArrayValue") - } - switch fc.FieldNum { - case 1: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read Value data") - } - av.Values = append(av.Values, &AnyValue{}) - v := av.Values[len(av.Values)-1] - if err := v.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal Value: %w", err) - } - } - } - return nil -} - -// KeyValueList represents the corresponding OTEL protobuf message -type KeyValueList struct { - Values []*KeyValue -} - -func (kvl *KeyValueList) marshalProtobuf(mm *easyproto.MessageMarshaler) { - for _, v := range kvl.Values { - v.marshalProtobuf(mm.AppendMessage(1)) - } -} - -func (kvl *KeyValueList) unmarshalProtobuf(src []byte) (err error) { - // message KeyValueList { - // repeated KeyValue values = 1; - // } - var fc easyproto.FieldContext - for len(src) > 0 { - src, err = fc.NextField(src) - if err != nil { - return fmt.Errorf("cannot read next field in KeyValueList") - } - switch fc.FieldNum { - case 1: - data, ok := fc.MessageData() - if !ok { - return fmt.Errorf("cannot read Value data") - } - kvl.Values = append(kvl.Values, &KeyValue{}) - v := kvl.Values[len(kvl.Values)-1] - if err := v.unmarshalProtobuf(data); err != nil { - return fmt.Errorf("cannot unmarshal Value: %w", err) - } - } - } - return nil -} - // Gauge represents the corresponding OTEL protobuf message type Gauge struct { DataPoints []*NumberDataPoint diff --git a/lib/protoparser/opentelemetry/stream/streamparser_test.go b/lib/protoparser/opentelemetry/stream/streamparser_test.go index f363e5dfa..4d5dd253e 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_test.go +++ b/lib/protoparser/opentelemetry/stream/streamparser_test.go @@ -248,7 +248,6 @@ func generateGauge(name, unit string) *pb.Metric { func generateHistogram(name, unit string) *pb.Metric { points := []*pb.HistogramDataPoint{ { - Attributes: attributesFromKV("label2", "value2"), Count: 15, Sum: func() *float64 { v := 30.0; return &v }(), diff --git a/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go b/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go index b0df4135f..0d70c6d35 100644 --- a/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go +++ b/lib/protoparser/opentelemetry/stream/streamparser_timing_test.go @@ -32,5 +32,4 @@ func BenchmarkParseStream(b *testing.B) { } } }) - }