docs/guides/otel: added logs integration, updated old otel dependencies

### Describe Your Changes

- added VictoriaLogs to OpenTelemetry guide
- updated deprecated dependencies
- added deltatocumulative processor to the example, and a delta temporality selector to one of the examples so that counters use delta temporality by default
- added exponential histograms to example

---
Signed-off-by: Andrii Chubatiuk <andrew.chubatiuk@gmail.com>
Andrii Chubatiuk 2024-12-19 13:32:41 +02:00 committed by f41gh7
parent c422236280
commit 56b94e06ec
GPG key ID: 4558311CF775EC72
12 changed files with 491 additions and 288 deletions

VictoriaMetrics and VictoriaLogs support ingestion of OpenTelemetry [metrics](https://docs.victoriametrics.com/single-server-victoriametrics/#sending-data-via-opentelemetry) and [logs](https://docs.victoriametrics.com/victorialogs/data-ingestion/opentelemetry/) respectively.
This guide covers data ingestion via [opentelemetry-collector](https://opentelemetry.io/docs/collector/) and direct metrics and logs push from application.
## Pre-Requirements
* [kubectl](https://kubernetes.io/docs/tasks/tools/#kubectl)
* [helm](https://helm.sh/docs/intro/install/)
### Install VictoriaMetrics and VictoriaLogs
Install VictoriaMetrics helm repo:
```sh
helm repo add vm https://victoriametrics.github.io/helm-charts/
helm repo update
```
Add VictoriaMetrics chart values to sanitize OTEL metric names:
```sh
cat << EOF > vm-values.yaml
server:
  extraArgs:
    opentelemetry.usePrometheusNaming: true
EOF
```
Install VictoriaMetrics single-server version:
```sh
helm install victoria-metrics vm/victoria-metrics-single -f vm-values.yaml
```
Verify it's up and running:
```sh
kubectl get pods
# NAME READY STATUS RESTARTS AGE
# victoria-metrics-victoria-metrics-single-server-0 1/1 Running 0 3m1s
```
VictoriaMetrics helm chart provides the following URL for writing data:
```text
Write URL inside the kubernetes cluster:
http://victoria-metrics-victoria-metrics-single-server.default.svc.cluster.local.:8428/<protocol-specific-write-endpoint>
All supported write endpoints can be found at https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-import-time-series-data.
```
For OpenTelemetry, the VictoriaMetrics write endpoint is:
```text
http://victoria-metrics-victoria-metrics-single-server.default.svc.cluster.local.:8428/opentelemetry/v1/metrics
```
Install VictoriaLogs single-server version:
```sh
helm install victoria-logs vm/victoria-logs-single
```
Verify it's up and running:
```sh
kubectl get pods
# NAME READY STATUS RESTARTS AGE
# victoria-logs-victoria-logs-single-server-0 1/1 Running 0 1m10s
```
VictoriaLogs helm chart provides the following URL for writing data:
```text
Write URL inside the kubernetes cluster:
http://victoria-logs-victoria-logs-single-server.default.svc.cluster.local.:9428/<protocol-specific-write-endpoint>
All supported write endpoints can be found at https://docs.victoriametrics.com/victorialogs/data-ingestion/
```
For OpenTelemetry, the VictoriaLogs write endpoint is:
```text
http://victoria-logs-victoria-logs-single-server.default.svc.cluster.local.:9428/insert/opentelemetry/v1/logs
```
## Using opentelemetry-collector with VictoriaMetrics and VictoriaLogs
![OTEL Collector](collector.webp)
### Deploy opentelemetry-collector and configure metrics and logs forwarding
Add the OpenTelemetry helm repo:
```sh
helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
helm repo update
```
Add OTEL Collector values:
```sh
cat << EOF > otel-values.yaml
mode: deployment
image:
  repository: "otel/opentelemetry-collector-contrib"
presets:
  clusterMetrics:
    enabled: true
  logsCollection:
    enabled: true
config:
  # deltatocumulative processor is needed only to support metrics with delta temporality, which is not supported by VictoriaMetrics
  processors:
    deltatocumulative:
      max_stale: 5m
  receivers:
    otlp:
      protocols:
        http:
          endpoint: 0.0.0.0:4318
  exporters:
    otlphttp/victoriametrics:
      compression: gzip
      encoding: proto
      # Setting below will work for sending data to VictoriaMetrics single-node version.
      # Cluster version of VictoriaMetrics will require a different URL - https://docs.victoriametrics.com/cluster-victoriametrics/#url-format
      metrics_endpoint: http://victoria-metrics-victoria-metrics-single-server.default.svc.cluster.local:8428/opentelemetry/v1/metrics
      logs_endpoint: http://victoria-logs-victoria-logs-single-server.default.svc.cluster.local:9428/insert/opentelemetry/v1/logs
      tls:
        insecure: true
  service:
    pipelines:
      logs:
        receivers: [otlp]
        processors: []
        exporters: [otlphttp/victoriametrics]
      metrics:
        receivers: [otlp]
        processors: [deltatocumulative]
        exporters: [otlphttp/victoriametrics]
EOF
```
Install the OTEL Collector helm chart:
```sh
helm upgrade -i otel open-telemetry/opentelemetry-collector -f otel-values.yaml
```
Check that the OTEL Collector pod is healthy:
```sh
kubectl get pod
# NAME READY STATUS RESTARTS AGE
# otel-opentelemetry-collector-7467bbb559-2pq2n 1/1 Running 0 23m
```
Forward the VictoriaMetrics port to your local machine to verify metrics are ingested:
```sh
kubectl port-forward svc/victoria-metrics-victoria-metrics-single-server 8428
```
Check metric `k8s_container_ready` via browser `http://localhost:8428/vmui/#/?g0.expr=k8s_container_ready`
Forward the VictoriaLogs port to your local machine to verify logs are ingested:
```sh
kubectl port-forward svc/victoria-logs-victoria-logs-single-server 9428
```
Check ingested logs in browser at `http://localhost:9428/select/vmui`
The full version of possible configuration options can be found in [OpenTelemetry docs](https://opentelemetry.io/docs/collector/configuration/).
## Sending to VictoriaMetrics and VictoriaLogs via OpenTelemetry
Metrics and logs can be sent to VictoriaMetrics and VictoriaLogs via OpenTelemetry instrumentation libraries. You can use any compatible OpenTelemetry instrumentation [clients](https://opentelemetry.io/docs/languages/).
In our example, we'll create a WEB server in [Golang](https://go.dev/) and instrument it with metrics and logs.
### Building the Go application instrumented with metrics and logs
Copy the go file from [here](app.go-collector.example). This will give you a basic implementation of a dice roll WEB server with the URLs for opentelemetry-collector pointing to `localhost:4318`.
In the same directory run the following command to create the `go.mod` file:
```sh
go mod init vm/otel
```
For demo purposes, we'll add the following dependencies to `go.mod` file:
```go
require (
	go.opentelemetry.io/contrib/bridges/otelslog v0.7.0
	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0
	go.opentelemetry.io/otel v1.32.0
	go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.8.0
	go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0
	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0
	go.opentelemetry.io/otel/log v0.8.0
	go.opentelemetry.io/otel/metric v1.32.0
	go.opentelemetry.io/otel/sdk v1.32.0
	go.opentelemetry.io/otel/sdk/log v0.8.0
	go.opentelemetry.io/otel/sdk/metric v1.32.0
)
require (
	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
	github.com/felixge/httpsnoop v1.0.4 // indirect
	github.com/go-logr/logr v1.4.2 // indirect
	github.com/go-logr/stdr v1.2.2 // indirect
	github.com/google/uuid v1.6.0 // indirect
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect
	go.opentelemetry.io/otel/trace v1.32.0 // indirect
	go.opentelemetry.io/proto/otlp v1.4.0 // indirect
	golang.org/x/net v0.32.0 // indirect
	golang.org/x/sys v0.28.0 // indirect
	golang.org/x/text v0.21.0 // indirect
	google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
	google.golang.org/grpc v1.68.1 // indirect
	google.golang.org/protobuf v1.35.2 // indirect
)
```
Now you can run the application:
```sh
go run .
```
### Test ingestion
By default, the application will be available at `localhost:8080`. You can start sending requests to /rolldice endpoint to generate metrics. The following command will send 20 requests to the /rolldice endpoint:
```sh
for i in `seq 1 20`; do curl http://localhost:8080/rolldice; done
```
After a few seconds you should start to see metrics sent to VictoriaMetrics by visiting `http://localhost:8428/vmui/#/?g0.expr=dice_rolls_total` in your browser or by querying the metric `dice_rolls_total` in vmui.
![Dice roll metrics](vmui-dice-roll-metrics.webp)
Logs should be available by visiting `http://localhost:9428/select/vmui` using query `service.name: unknown_service:otel`.
![Dice roll logs](vmui-dice-roll-logs.webp)
## Direct metrics and logs push
Metrics and logs can be ingested into VictoriaMetrics directly with HTTP requests. You can use any compatible OpenTelemetry
instrumentation [clients](https://opentelemetry.io/docs/languages/).
In our example, we'll create a WEB server in [Golang](https://go.dev/) and instrument it with metrics and logs.
![OTEL direct](direct.webp)
### Building the Go application instrumented with metrics and logs
See the full source code of the example [here](app.go.example).
The list of OpenTelemetry dependencies for `go.mod` is the following:
```go
go 1.23.4
require (
	go.opentelemetry.io/contrib/bridges/otelslog v0.7.0
	go.opentelemetry.io/otel v1.32.0
	go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.8.0
	go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.32.0
	go.opentelemetry.io/otel/log v0.8.0
	go.opentelemetry.io/otel/metric v1.32.0
	go.opentelemetry.io/otel/sdk v1.32.0
	go.opentelemetry.io/otel/sdk/log v0.8.0
	go.opentelemetry.io/otel/sdk/metric v1.32.0
)
require (
	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
	github.com/go-logr/logr v1.4.2 // indirect
	github.com/go-logr/stdr v1.2.2 // indirect
	github.com/google/uuid v1.6.0 // indirect
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
	go.opentelemetry.io/otel/trace v1.32.0 // indirect
	go.opentelemetry.io/proto/otlp v1.4.0 // indirect
	golang.org/x/net v0.32.0 // indirect
	golang.org/x/sys v0.28.0 // indirect
	golang.org/x/text v0.21.0 // indirect
	google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
	google.golang.org/grpc v1.68.1 // indirect
	google.golang.org/protobuf v1.35.2 // indirect
)
```
Let's create a new file `main.go` with a basic implementation of the WEB server:
```go
package main

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	mux := http.NewServeMux()
	mux.HandleFunc("/api/fast", func(writer http.ResponseWriter, request *http.Request) {
		logger.InfoContext(ctx, "Anonymous access to fast endpoint")
		writer.WriteHeader(http.StatusOK)
		writer.Write([]byte(`fast ok`))
	})
	mux.HandleFunc("/api/slow", func(writer http.ResponseWriter, request *http.Request) {
		time.Sleep(time.Second * 2)
		logger.InfoContext(ctx, "Anonymous access to slow endpoint")
		writer.WriteHeader(http.StatusOK)
		writer.Write([]byte(`slow ok`))
	})

	mw, err := newMiddleware(ctx, mux)
	if err != nil {
		panic(fmt.Sprintf("cannot build middleware: %q", err))
	}

	mustStop := make(chan os.Signal, 1)
	signal.Notify(mustStop, os.Interrupt, syscall.SIGTERM)
	go func() {
		http.ListenAndServe("localhost:8081", mw)
	}()
	log.Printf("web server started at localhost:8081.")

	<-mustStop
	log.Println("receive shutdown signal, stopping webserver")
	for _, shutdown := range mw.onShutdown {
		if err := shutdown(ctx); err != nil {
			log.Println("cannot shutdown metric provider ", err)
		}
	}
	log.Printf("Done!")
}
```
In the code above, we used the `newMiddleware` function to create a `handler` for our server.
Let's define it below:
```go
type middleware struct {
	ctx             context.Context
	h               http.Handler
	requestsCount   metric.Int64Counter
	requestsLatency metric.Float64Histogram
	activeRequests  int64
	onShutdown      []func(ctx context.Context) error
}

func newMiddleware(ctx context.Context, h http.Handler) (*middleware, error) {
	mw := &middleware{
		ctx: ctx,
		h:   h,
	}
	lp, err := newLoggerProvider(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot build logs provider: %w", err)
	}
	global.SetLoggerProvider(lp)

	mp, err := newMeterProvider(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot build metrics provider: %w", err)
	}
	otel.SetMeterProvider(mp)
	meter := mp.Meter("")

	mw.requestsLatency, err = meter.Float64Histogram("http.requests.latency")
	if err != nil {
		return nil, fmt.Errorf("cannot create histogram: %w", err)
	}
	mw.requestsCount, err = meter.Int64Counter("http.requests")
	if err != nil {
		return nil, fmt.Errorf("cannot create int64 counter: %w", err)
	}
	cb := func(c context.Context, o metric.Int64Observer) error {
		o.Observe(atomic.LoadInt64(&mw.activeRequests))
		return nil
	}
	_, err = meter.Int64ObservableGauge("http.requests.active", metric.WithInt64Callback(cb))
	if err != nil {
		return nil, fmt.Errorf("cannot create Int64ObservableGauge: %w", err)
	}
	mw.onShutdown = append(mw.onShutdown, mp.Shutdown, lp.Shutdown)
	return mw, nil
}
```
You will also find a `logger` there, which is an OTEL logger:
```go
var (
	logger = otelslog.NewLogger("rolldice")
)
```
and initialized in `newLoggerProvider`:
```go
func newLoggerProvider(ctx context.Context) (*sdklog.LoggerProvider, error) {
	exporter, err := otlploghttp.New(ctx, otlploghttp.WithEndpointURL(*logsEndpoint))
	if err != nil {
		return nil, err
	}
	provider := sdklog.NewLoggerProvider(
		sdklog.WithProcessor(sdklog.NewBatchProcessor(exporter)),
	)
	return provider, nil
}
```
The new type `middleware` is instrumented with 3 [metrics](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#timeseries-model)
initialized in the `newMiddleware` function:
* counter `http.requests`
* histogram `http.requests.latency`
* gauge `http.requests.active`
Let's implement the http.Handler interface for `middleware` by adding a `ServeHTTP` method:
```go
func (m *middleware) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	t := time.Now()
	path := r.URL.Path
	m.requestsCount.Add(m.ctx, 1, metric.WithAttributes(attribute.String("path", path)))
	atomic.AddInt64(&m.activeRequests, 1)
	defer func() {
		atomic.AddInt64(&m.activeRequests, -1)
		m.requestsLatency.Record(m.ctx, time.Since(t).Seconds(), metric.WithAttributes(attribute.String("path", path)))
	}()
	m.h.ServeHTTP(w, r)
}
```
In the method above, our middleware processes received HTTP requests and updates metrics with each new request.
But for these metrics to be shipped, we need to add a new function `newMeterProvider` to organize metrics collection:
```go
func newMeterProvider(ctx context.Context) (*sdkmetric.MeterProvider, error) {
	exporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpointURL(*metricsEndpoint))
	if err != nil {
		return nil, fmt.Errorf("cannot create otlphttp exporter: %w", err)
	}
	res, err := resource.New(ctx,
		resource.WithAttributes(
			attribute.String("job", *jobName),
			attribute.String("instance", *instanceName),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("cannot create meter resource: %w", err)
	}
	expView := sdkmetric.NewView(
		sdkmetric.Instrument{
			Name: "http.requests.latency",
			Kind: sdkmetric.InstrumentKindHistogram,
		},
		sdkmetric.Stream{
			Name: "http.requests.latency.exp",
			Aggregation: sdkmetric.AggregationBase2ExponentialHistogram{
				MaxSize:  160,
				MaxScale: 20,
			},
		},
	)
	return sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(*pushInterval))),
		sdkmetric.WithResource(res),
		sdkmetric.WithView(expView),
	), nil
}
```
This provider will collect and push the collected metrics to the VictoriaMetrics address configured via the command-line flags.
See the full source code of the example [here](app.go.example).
### Test ingestion
In order to push metrics and logs of our WEB server to VictoriaMetrics and VictoriaLogs it is necessary to ensure that both services are available locally.
In previous steps we already deployed a single-server VictoriaMetrics and VictoriaLogs, so let's make them available locally:
```sh
# port-forward victoriametrics to ingest metrics
kubectl port-forward victoria-metrics-victoria-metrics-single-server-0 8428
# port-forward victorialogs to ingest logs
kubectl port-forward victoria-logs-victoria-logs-single-server-0 9428
```
Now let's run our WEB server and call its APIs:
```sh
curl http://localhost:8081/api/fast
curl http://localhost:8081/api/slow
```
Open VictoriaMetrics at `http://localhost:8428/vmui` and query `http_requests_total` or `http_requests_active`
![OTEL Metrics VMUI](vmui-direct-metrics.webp)
Open VictoriaLogs UI at `http://localhost:9428/select/vmui` and query `service.name: unknown_service:otel`
![OTEL Logs VMUI](vmui-direct-logs.webp)
## Limitations
* VictoriaMetrics and VictoriaLogs do not support experimental JSON encoding [format](https://github.com/open-telemetry/opentelemetry-proto/blob/main/examples/metrics.json).
* VictoriaMetrics supports only the `AggregationTemporalityCumulative` type for [histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#histogram) and [summary](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#summary-legacy). Either use cumulative temporality, or use the [`deltatocumulative` processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/deltatocumulativeprocessor) to convert delta temporality to cumulative in the OTEL Collector.


---
weight: 5
title: How to use OpenTelemetry with VictoriaMetrics and VictoriaLogs
menu:
  docs:
    parent: "guides"
    weight: 5
aliases:
- /guides/how-to-use-opentelemetry-metrics-with-victoriametrics/
- /guides/how-to-use-opentelemetry-with-victoriametrics-and-victorialogs/
---
{{% content "README.md" %}}


import (
	"strconv"
	"time"

	"go.opentelemetry.io/contrib/bridges/otelslog"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/log/global"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/propagation"
	sdklog "go.opentelemetry.io/otel/sdk/log"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/resource"
	"go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)
func main() {
var (
	tracer  = otel.Tracer("rolldice")
	meter   = otel.Meter("rolldice")
	logger  = otelslog.NewLogger("rolldice")
	rollCnt metric.Int64Counter
)
func init() {
	var err error
	rollCnt, err = meter.Int64Counter("dice.rolls",
		metric.WithDescription("The number of rolls by roll value"),
		metric.WithUnit("{roll}"))
	if err != nil {
		panic(err)
	}
func rolldice(w http.ResponseWriter, r *http.Request) {
	rollValueAttr := attribute.Int("roll.value", roll)
	span.SetAttributes(rollValueAttr)

	rollCnt.Add(ctx, 1, metric.WithAttributes(rollValueAttr))
	logger.InfoContext(ctx, "Anonymous player is rolling the dice", "result", roll)

	resp := strconv.Itoa(roll) + "\n"
	if _, err := io.WriteString(w, resp); err != nil {
func setupOTelSDK(ctx context.Context) (shutdown func(context.Context) error, err error) {
	shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown)
	otel.SetMeterProvider(meterProvider)

	// Set up log provider.
	logProvider, err := newLoggerProvider(ctx)
	if err != nil {
		handleErr(err)
		return
	}
	global.SetLoggerProvider(logProvider)
	shutdownFuncs = append(shutdownFuncs, logProvider.Shutdown)

	return
}
}
func newTraceProvider(ctx context.Context) (*trace.TracerProvider, error) {
	exporter, err := otlptracehttp.New(ctx, otlptracehttp.WithInsecure(), otlptracehttp.WithEndpoint("localhost:4318"))
	if err != nil {
		return nil, err
	}
	provider := trace.NewTracerProvider(
		trace.WithBatcher(exporter,
			// Default is 5s. Set to 1s for demonstrative purposes.
			trace.WithBatchTimeout(time.Second)),
	)
	return provider, nil
}
func newMeterProvider(ctx context.Context) (*sdkmetric.MeterProvider, error) {
	exporter, err := otlpmetrichttp.New(ctx,
		otlpmetrichttp.WithInsecure(),
		otlpmetrichttp.WithEndpoint("localhost:4318"),
		otlpmetrichttp.WithTemporalitySelector(
			func(kind sdkmetric.InstrumentKind) metricdata.Temporality {
				return metricdata.DeltaTemporality
			},
		),
	)
	if err != nil {
		return nil, err
	}
	res, err := resource.Merge(resource.Default(),
		resource.NewWithAttributes(semconv.SchemaURL,
			semconv.ServiceName("dice-roller"),
	if err != nil {
		return nil, err
	}
	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithResource(res),
		sdkmetric.WithReader(
			sdkmetric.NewPeriodicReader(
				exporter,
				sdkmetric.WithInterval(3*time.Second),
			),
		),
	)
	return provider, nil
}
func newLoggerProvider(ctx context.Context) (*sdklog.LoggerProvider, error) {
	exporter, err := otlploghttp.New(ctx, otlploghttp.WithInsecure(), otlploghttp.WithEndpoint("localhost:4318"))
	if err != nil {
		return nil, err
	}
	provider := sdklog.NewLoggerProvider(
		sdklog.WithProcessor(sdklog.NewBatchProcessor(exporter)),
	)
	return provider, nil
}


import (
	"context"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync/atomic"
"syscall"
"time"
"go.opentelemetry.io/contrib/bridges/otelslog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/metric"
sdklog "go.opentelemetry.io/otel/sdk/log"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
var (
metricsEndpoint = flag.String("vm.endpoint", "http://localhost:8428/opentelemetry/v1/metrics", "VictoriaMetrics endpoint")
logsEndpoint = flag.String("vl.endpoint", "http://localhost:9428/insert/opentelemetry/v1/logs", "VictoriaLogs endpoint")
pushInterval = flag.Duration("vm.pushInterval", 10*time.Second, "how often push samples, aka scrapeInterval at pull model")
jobName = flag.String("metrics.jobName", "otlp", "job name for web-application")
instanceName = flag.String("metrics.instance", "localhost", "hostname of web-application instance")
)
var (
logger = otelslog.NewLogger("rolldice")
)
func main() {
	ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mux := http.NewServeMux()
mux.HandleFunc("/api/fast", func(writer http.ResponseWriter, request *http.Request) {
logger.InfoContext(ctx, "Anonymous access to fast endpoint")
writer.WriteHeader(http.StatusOK)
writer.Write([]byte(`fast ok`))
})
mux.HandleFunc("/api/slow", func(writer http.ResponseWriter, request *http.Request) {
time.Sleep(time.Second * 2)
logger.InfoContext(ctx, "Anonymous access to slow endpoint")
writer.WriteHeader(http.StatusOK)
writer.Write([]byte(`slow ok`))
})
mw, err := newMiddleware(ctx, mux)
if err != nil {
panic(fmt.Sprintf("cannot build middleware: %q", err))
}
mustStop := make(chan os.Signal, 1)
signal.Notify(mustStop, os.Interrupt, syscall.SIGTERM)
<-mustStop
log.Println("receive shutdown signal, stopping webserver")
for _, shutdown := range mw.onShutdown {
if err := shutdown(ctx); err != nil {
log.Println("cannot shutdown metric provider ", err)
}
}
cancel()
log.Printf("Done!")
}
func newMeterProvider(ctx context.Context) (*sdkmetric.MeterProvider, error) {
exporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpointURL(*metricsEndpoint))
if err != nil {
return nil, fmt.Errorf("cannot create otlphttp exporter: %w", err)
}
res, err := resource.New(ctx,
resource.WithAttributes(
attribute.String("job", *jobName),
attribute.String("instance", *instanceName),
),
)
if err != nil {
return nil, fmt.Errorf("cannot create meter resource: %w", err)
}
expView := sdkmetric.NewView(
sdkmetric.Instrument{
Name: "http.requests.latency",
Kind: sdkmetric.InstrumentKindHistogram,
},
sdkmetric.Stream{
Name: "http.requests.latency.exp",
Aggregation: sdkmetric.AggregationBase2ExponentialHistogram{
MaxSize: 160,
MaxScale: 20,
},
},
)
return sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(*pushInterval))),
sdkmetric.WithResource(res),
sdkmetric.WithView(expView),
), nil
}
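The `AggregationBase2ExponentialHistogram` view above replaces fixed explicit buckets with buckets whose boundaries grow by a constant factor of 2^(2^-scale); `MaxScale` caps the resolution and the SDK automatically downscales so the series never exceeds `MaxSize` buckets. A small stdlib-only sketch of the positive-range index mapping used by the base-2 variant (illustrative, not the SDK's internal code):

```go
package main

import (
	"fmt"
	"math"
)

// bucketIndex maps a positive value to its base-2 exponential histogram
// bucket at the given scale: index = floor(log2(v) * 2^scale).
// Adjacent bucket boundaries differ by a factor of 2^(2^-scale).
func bucketIndex(v float64, scale int) int {
	return int(math.Floor(math.Log2(v) * math.Exp2(float64(scale))))
}

func main() {
	for _, scale := range []int{0, 1, 2} {
		base := math.Exp2(math.Exp2(-float64(scale)))
		fmt.Printf("scale=%d base=%.4f index(10)=%d\n", scale, base, bucketIndex(10, scale))
	}
}
```

Higher scales give finer buckets: at scale 0 each bucket spans a factor of 2, at scale 1 a factor of √2, and so on.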
func newLoggerProvider(ctx context.Context) (*sdklog.LoggerProvider, error) {
exporter, err := otlploghttp.New(ctx, otlploghttp.WithEndpointURL(*logsEndpoint))
if err != nil {
return nil, err
}
provider := sdklog.NewLoggerProvider(
sdklog.WithProcessor(sdklog.NewBatchProcessor(exporter)),
)
return provider, nil
}
func newMiddleware(ctx context.Context, h http.Handler) (*middleware, error) {
mw := &middleware{
ctx: ctx,
h: h,
}
lp, err := newLoggerProvider(ctx)
if err != nil {
return nil, fmt.Errorf("cannot build logs provider: %w", err)
}
global.SetLoggerProvider(lp)
mp, err := newMeterProvider(ctx)
if err != nil {
return nil, fmt.Errorf("cannot build metrics provider: %w", err)
}
otel.SetMeterProvider(mp)
meter := mp.Meter("")
mw.requestsLatency, err = meter.Float64Histogram("http.requests.latency")
if err != nil {
return nil, fmt.Errorf("cannot create histogram: %w", err)
}
mw.requestsCount, err = meter.Int64Counter("http.requests")
if err != nil {
return nil, fmt.Errorf("cannot create int64 counter: %w", err)
}
cb := func(c context.Context, o metric.Int64Observer) error {
o.Observe(atomic.LoadInt64(&mw.activeRequests))
return nil
}
_, err = meter.Int64ObservableGauge("http.requests.active", metric.WithInt64Callback(cb))
if err != nil {
return nil, fmt.Errorf("cannot create Int64ObservableGauge: %w", err)
}
mw.onShutdown = append(mw.onShutdown, mp.Shutdown, lp.Shutdown)
return mw, nil
}
type middleware struct {
ctx context.Context
h http.Handler
requestsCount metric.Int64Counter
requestsLatency metric.Float64Histogram
activeRequests int64
onShutdown []func(ctx context.Context) error
}
func (m *middleware) ServeHTTP(w http.ResponseWriter, r *http.Request) {
t := time.Now()
path := r.URL.Path
m.requestsCount.Add(m.ctx, 1, metric.WithAttributes(attribute.String("path", path)))
atomic.AddInt64(&m.activeRequests, 1)
defer func() {
atomic.AddInt64(&m.activeRequests, -1)
m.requestsLatency.Record(m.ctx, time.Since(t).Seconds(), metric.WithAttributes(attribute.String("path", path)))
}()
	m.h.ServeHTTP(w, r)
}
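Once the application is running and pushing data, ingestion can be spot-checked with the HTTP query APIs of both backends. The queries below are illustrative: they assume the default ports used above, and the stored metric names depend on how VictoriaMetrics normalizes the OpenTelemetry names (for example, dots may be converted to underscores), so adjust them to what your instance actually stores.

```sh
# metrics: query series pushed by the application (VictoriaMetrics query API)
curl 'http://localhost:8428/api/v1/query' --data-urlencode 'query={job="otlp"}'

# logs: fetch recent log entries via LogsQL (VictoriaLogs query API)
curl 'http://localhost:9428/select/logsql/query' --data-urlencode 'query=_time:5m'
```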
