VictoriaMetrics/docs/guides/getting-started-with-opentelemetry-app.go.example

167 lines
5.3 KiB
Text
Raw Normal View History

package main
import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)
var (
collectorEndpoint = flag.String("vm.endpoint", "localhost:8428", "VictoriaMetrics endpoint - host:port")
collectorURL = flag.String("vm.ingestPath", "/opentelemetry/api/v1/push", "url path for ingestion path")
isSecure = flag.Bool("vm.isSecure", false, "enables https connection for metrics push")
pushInterval = flag.Duration("vm.pushInterval", 10*time.Second, "how often push samples, aka scrapeInterval at pull model")
jobName = flag.String("metrics.jobName", "otlp", "job name for web-application")
instanceName = flag.String("metrics.instance", "localhost", "hostname of web-application instance")
)
func main() {
flag.Parse()
log.Printf("Starting web server...")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mux := http.NewServeMux()
mux.HandleFunc("/api/fast", func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusOK)
writer.Write([]byte(`fast ok`))
})
mux.HandleFunc("/api/slow", func(writer http.ResponseWriter, request *http.Request) {
time.Sleep(time.Second * 2)
writer.WriteHeader(http.StatusOK)
writer.Write([]byte(`slow ok`))
})
mw, err := newMetricsMiddleware(ctx, mux)
if err != nil {
panic(fmt.Sprintf("cannot build metricMiddleWare: %q", err))
}
mustStop := make(chan os.Signal, 1)
signal.Notify(mustStop, os.Interrupt, syscall.SIGTERM)
go func() {
http.ListenAndServe("localhost:8081", mw)
}()
log.Printf("web server started at localhost:8081.")
<-mustStop
log.Println("receive shutdown signal, stopping webserver")
if err := mw.onShutdown(ctx); err != nil {
log.Println("cannot shutdown metric provider ", err)
}
cancel()
log.Printf("Done!")
}
func newMetricsController(ctx context.Context) (*controller.Controller, error) {
options := []otlpmetrichttp.Option{
otlpmetrichttp.WithEndpoint(*collectorEndpoint),
otlpmetrichttp.WithURLPath(*collectorURL),
}
if !*isSecure {
options = append(options, otlpmetrichttp.WithInsecure())
}
metricExporter, err := otlpmetrichttp.New(ctx, options...)
if err != nil {
return nil, fmt.Errorf("cannot create otlphttp exporter: %w", err)
}
resourceConfig, err := resource.New(ctx, resource.WithAttributes(attribute.String("job", *jobName), attribute.String("instance", *instanceName)))
if err != nil {
return nil, fmt.Errorf("cannot create meter resource: %w", err)
}
meterController := controller.New(
processor.NewFactory(
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries([]float64{0.01, 0.05, 0.1, 0.5, 0.9, 1.0, 5.0, 10.0, 100.0}),
),
aggregation.CumulativeTemporalitySelector(),
processor.WithMemory(true),
),
controller.WithExporter(metricExporter),
controller.WithCollectPeriod(*pushInterval),
controller.WithResource(resourceConfig),
)
if err := meterController.Start(ctx); err != nil {
return nil, fmt.Errorf("cannot start meter controller: %w", err)
}
return meterController, nil
}
func newMetricsMiddleware(ctx context.Context, h http.Handler) (*metricMiddleWare, error) {
mw := &metricMiddleWare{
ctx: ctx,
h: h,
}
mc, err := newMetricsController(ctx)
if err != nil {
return nil, fmt.Errorf("cannot build metrics collector: %w", err)
}
global.SetMeterProvider(mc)
prov := mc.Meter("")
mw.requestsLatency, err = prov.SyncFloat64().Histogram("http_request_latency_seconds")
if err != nil {
return nil, fmt.Errorf("cannot create histogram: %w", err)
}
mw.requestsCount, err = prov.SyncInt64().Counter("http_requests_total")
if err != nil {
return nil, fmt.Errorf("cannot create syncInt64 counter: %w", err)
}
ar, err := prov.AsyncInt64().Gauge("http_active_requests")
if err != nil {
return nil, fmt.Errorf("cannot create AsyncInt64 gauge: %w", err)
}
if err := prov.RegisterCallback([]instrument.Asynchronous{ar}, func(ctx context.Context) {
ar.Observe(ctx, atomic.LoadInt64(&mw.activeRequests))
}); err != nil {
return nil, fmt.Errorf("cannot Register int64 gauge: %w", err)
}
mw.onShutdown = mc.Stop
return mw, nil
}
type metricMiddleWare struct {
ctx context.Context
h http.Handler
requestsCount syncint64.Counter
requestsLatency syncfloat64.Histogram
activeRequests int64
onShutdown func(ctx context.Context) error
}
func (m *metricMiddleWare) ServeHTTP(w http.ResponseWriter, r *http.Request) {
t := time.Now()
path := r.URL.Path
m.requestsCount.Add(m.ctx, 1, attribute.String("path", path))
atomic.AddInt64(&m.activeRequests, 1)
defer func() {
atomic.AddInt64(&m.activeRequests, -1)
m.requestsLatency.Record(m.ctx, time.Since(t).Seconds(), attribute.String("path", path))
}()
m.h.ServeHTTP(w, r)
}