2024-09-03 18:12:05 +00:00
|
|
|
package opentelemetry
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net/http"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
|
|
"github.com/VictoriaMetrics/metrics"
|
|
|
|
)
|
|
|
|
|
|
|
|
// RequestHandler processes Opentelemetry insert requests
|
|
|
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
|
|
|
switch path {
|
|
|
|
// use the same path as opentelemetry collector
|
|
|
|
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
|
|
|
|
case "/v1/logs":
|
|
|
|
if r.Header.Get("Content-Type") == "application/json" {
|
|
|
|
httpserver.Errorf(w, r, "json encoding isn't supported for opentelemetry format. Use protobuf encoding")
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
handleProtobuf(r, w)
|
|
|
|
return true
|
|
|
|
default:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func handleProtobuf(r *http.Request, w http.ResponseWriter) {
|
|
|
|
startTime := time.Now()
|
|
|
|
requestsProtobufTotal.Inc()
|
|
|
|
reader := r.Body
|
|
|
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
|
|
|
zr, err := common.GetGzipReader(reader)
|
|
|
|
if err != nil {
|
|
|
|
httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer common.PutGzipReader(zr)
|
|
|
|
reader = zr
|
|
|
|
}
|
|
|
|
|
|
|
|
wcr := writeconcurrencylimiter.GetReader(reader)
|
|
|
|
data, err := io.ReadAll(wcr)
|
|
|
|
writeconcurrencylimiter.PutReader(wcr)
|
|
|
|
if err != nil {
|
|
|
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
cp, err := insertutils.GetCommonParams(r)
|
|
|
|
if err != nil {
|
|
|
|
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if err := vlstorage.CanWriteData(); err != nil {
|
|
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-11-30 16:19:13 +00:00
|
|
|
lmp := cp.NewLogMessageProcessor("opentelelemtry_protobuf")
|
2024-12-04 12:57:20 +00:00
|
|
|
useDefaultStreamFields := len(cp.StreamFields) == 0
|
|
|
|
err = pushProtobufRequest(data, lmp, useDefaultStreamFields)
|
2024-09-03 18:12:05 +00:00
|
|
|
lmp.MustClose()
|
|
|
|
if err != nil {
|
|
|
|
httpserver.Errorf(w, r, "cannot parse OpenTelemetry protobuf request: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// update requestProtobufDuration only for successfully parsed requests
|
|
|
|
// There is no need in updating requestProtobufDuration for request errors,
|
|
|
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
|
|
|
requestProtobufDuration.UpdateDuration(startTime)
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
|
|
|
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
|
|
|
|
|
|
|
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
|
|
|
)
|
|
|
|
|
2024-12-04 12:57:20 +00:00
|
|
|
func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor, useDefaultStreamFields bool) error {
|
2024-09-03 18:12:05 +00:00
|
|
|
var req pb.ExportLogsServiceRequest
|
|
|
|
if err := req.UnmarshalProtobuf(data); err != nil {
|
|
|
|
errorsTotal.Inc()
|
2024-12-04 09:46:17 +00:00
|
|
|
return fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
|
2024-09-03 18:12:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var commonFields []logstorage.Field
|
|
|
|
for _, rl := range req.ResourceLogs {
|
|
|
|
attributes := rl.Resource.Attributes
|
|
|
|
commonFields = slicesutil.SetLength(commonFields, len(attributes))
|
|
|
|
for i, attr := range attributes {
|
|
|
|
commonFields[i].Name = attr.Key
|
|
|
|
commonFields[i].Value = attr.Value.FormatString()
|
|
|
|
}
|
|
|
|
commonFieldsLen := len(commonFields)
|
|
|
|
for _, sc := range rl.ScopeLogs {
|
2024-12-04 12:57:20 +00:00
|
|
|
commonFields = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp, useDefaultStreamFields)
|
2024-09-03 18:12:05 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-12-04 09:46:17 +00:00
|
|
|
return nil
|
2024-09-03 18:12:05 +00:00
|
|
|
}
|
|
|
|
|
2024-12-04 12:57:20 +00:00
|
|
|
func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutils.LogMessageProcessor, useDefaultStreamFields bool) []logstorage.Field {
|
2024-09-03 18:12:05 +00:00
|
|
|
fields := commonFields
|
|
|
|
for _, lr := range sc.LogRecords {
|
|
|
|
fields = fields[:len(commonFields)]
|
|
|
|
fields = append(fields, logstorage.Field{
|
|
|
|
Name: "_msg",
|
|
|
|
Value: lr.Body.FormatString(),
|
|
|
|
})
|
|
|
|
for _, attr := range lr.Attributes {
|
|
|
|
fields = append(fields, logstorage.Field{
|
|
|
|
Name: attr.Key,
|
|
|
|
Value: attr.Value.FormatString(),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
fields = append(fields, logstorage.Field{
|
|
|
|
Name: "severity",
|
|
|
|
Value: lr.FormatSeverity(),
|
|
|
|
})
|
|
|
|
|
2024-12-04 12:57:20 +00:00
|
|
|
var streamFields []logstorage.Field
|
|
|
|
if useDefaultStreamFields {
|
|
|
|
streamFields = commonFields
|
|
|
|
}
|
|
|
|
lmp.AddRow(lr.ExtractTimestampNano(), fields, streamFields)
|
2024-09-03 18:12:05 +00:00
|
|
|
}
|
2024-12-04 09:46:17 +00:00
|
|
|
return fields
|
2024-09-03 18:12:05 +00:00
|
|
|
}
|