mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-01 15:33:35 +00:00

This metric tracks an approximate amounts of bytes processed when parsing the ingested logs. The metric is exposed individually per every supported data ingestion protocol. The protocol name is exposed via "type" label in order to be consistent with vl_rows_ingested_total metric. Thanks to @tenmozes for the initial idea and implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7682 While at it, remove the unneeded "format" label from vl_rows_ingested_total metric. The "type" label must be enough for encoding the data ingestion format.
143 lines
4.6 KiB
Go
143 lines
4.6 KiB
Go
package opentelemetry
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlinsert/insertutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
)
|
|
|
|
// RequestHandler processes Opentelemetry insert requests
|
|
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
|
|
switch path {
|
|
// use the same path as opentelemetry collector
|
|
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
|
|
case "/v1/logs":
|
|
if r.Header.Get("Content-Type") == "application/json" {
|
|
httpserver.Errorf(w, r, "json encoding isn't supported for opentelemetry format. Use protobuf encoding")
|
|
return true
|
|
}
|
|
handleProtobuf(r, w)
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func handleProtobuf(r *http.Request, w http.ResponseWriter) {
|
|
startTime := time.Now()
|
|
requestsProtobufTotal.Inc()
|
|
reader := r.Body
|
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
|
zr, err := common.GetGzipReader(reader)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot initialize gzip reader: %s", err)
|
|
return
|
|
}
|
|
defer common.PutGzipReader(zr)
|
|
reader = zr
|
|
}
|
|
|
|
wcr := writeconcurrencylimiter.GetReader(reader)
|
|
data, err := io.ReadAll(wcr)
|
|
writeconcurrencylimiter.PutReader(wcr)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot read request body: %s", err)
|
|
return
|
|
}
|
|
|
|
cp, err := insertutils.GetCommonParams(r)
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot parse common params from request: %s", err)
|
|
return
|
|
}
|
|
if err := vlstorage.CanWriteData(); err != nil {
|
|
httpserver.Errorf(w, r, "%s", err)
|
|
return
|
|
}
|
|
|
|
lmp := cp.NewLogMessageProcessor("opentelelemtry_protobuf")
|
|
n, err := pushProtobufRequest(data, lmp)
|
|
lmp.MustClose()
|
|
if err != nil {
|
|
httpserver.Errorf(w, r, "cannot parse OpenTelemetry protobuf request: %s", err)
|
|
return
|
|
}
|
|
|
|
rowsIngestedProtobufTotal.Add(n)
|
|
|
|
// update requestProtobufDuration only for successfully parsed requests
|
|
// There is no need in updating requestProtobufDuration for request errors,
|
|
// since their timings are usually much smaller than the timing for successful request parsing.
|
|
requestProtobufDuration.UpdateDuration(startTime)
|
|
}
|
|
|
|
var (
|
|
rowsIngestedProtobufTotal = metrics.NewCounter(`vl_rows_ingested_total{type="opentelemetry_protobuf"}`)
|
|
|
|
requestsProtobufTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
|
errorsTotal = metrics.NewCounter(`vl_http_errors_total{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
|
|
|
requestProtobufDuration = metrics.NewHistogram(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
|
|
)
|
|
|
|
func pushProtobufRequest(data []byte, lmp insertutils.LogMessageProcessor) (int, error) {
|
|
var req pb.ExportLogsServiceRequest
|
|
if err := req.UnmarshalProtobuf(data); err != nil {
|
|
errorsTotal.Inc()
|
|
return 0, fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
|
|
}
|
|
|
|
var rowsIngested int
|
|
var commonFields []logstorage.Field
|
|
for _, rl := range req.ResourceLogs {
|
|
attributes := rl.Resource.Attributes
|
|
commonFields = slicesutil.SetLength(commonFields, len(attributes))
|
|
for i, attr := range attributes {
|
|
commonFields[i].Name = attr.Key
|
|
commonFields[i].Value = attr.Value.FormatString()
|
|
}
|
|
commonFieldsLen := len(commonFields)
|
|
for _, sc := range rl.ScopeLogs {
|
|
var scopeIngested int
|
|
commonFields, scopeIngested = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp)
|
|
rowsIngested += scopeIngested
|
|
}
|
|
}
|
|
|
|
return rowsIngested, nil
|
|
}
|
|
|
|
func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutils.LogMessageProcessor) ([]logstorage.Field, int) {
|
|
fields := commonFields
|
|
for _, lr := range sc.LogRecords {
|
|
fields = fields[:len(commonFields)]
|
|
fields = append(fields, logstorage.Field{
|
|
Name: "_msg",
|
|
Value: lr.Body.FormatString(),
|
|
})
|
|
for _, attr := range lr.Attributes {
|
|
fields = append(fields, logstorage.Field{
|
|
Name: attr.Key,
|
|
Value: attr.Value.FormatString(),
|
|
})
|
|
}
|
|
fields = append(fields, logstorage.Field{
|
|
Name: "severity",
|
|
Value: lr.FormatSeverity(),
|
|
})
|
|
|
|
lmp.AddRow(lr.ExtractTimestampNano(), fields)
|
|
}
|
|
return fields, len(sc.LogRecords)
|
|
}
|