From 76af930e4ad6b45fe3a3f561da4e5085d872dca8 Mon Sep 17 00:00:00 2001 From: Oleg Date: Tue, 7 May 2024 21:46:08 +0200 Subject: [PATCH] Statsd protocol compatibility (#5053) In this PR I added compatibility with [statsd protocol](https://github.com/b/statsd_spec) with tags to be able to send metrics directly from statsd clients to vmagent or directly to VM. For example its compatible with [statsd-instrument](https://github.com/Shopify/statsd-instrument) and [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby) gems Related issues: #5052, #206, #4600 (cherry picked from commit c6c5a5a18695ab5f0be1f68bbb63bf90f6f15657) Signed-off-by: hagen1778 --- app/vmagent/main.go | 13 +++++ app/vmagent/statsd/request_handler.go | 68 ++++++++++++++++++++++++++ app/vminsert/main.go | 13 +++++ app/vminsert/statsd/request_handler.go | 54 ++++++++++++++++++++ docs/README.md | 40 +++++++++++++++ docs/Single-server-VictoriaMetrics.md | 40 +++++++++++++++ docs/stream-aggregation.md | 5 +- docs/vmagent.md | 5 ++ 8 files changed, 234 insertions(+), 4 deletions(-) create mode 100644 app/vmagent/statsd/request_handler.go create mode 100644 app/vminsert/statsd/request_handler.go diff --git a/app/vmagent/main.go b/app/vmagent/main.go index 0d8c910ed..e22fb4703 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -24,6 +24,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/statsd" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" @@ -36,6 +37,7 @@ import ( influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx" opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb" opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" + statsdserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/statsd" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" @@ -61,6 +63,10 @@ var ( "See also -graphiteListenAddr.useProxyProtocol") graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+ "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") + statsdListenAddr = flag.String("statsdListenAddr", "", "TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. "+ + "See also -statsdListenAddr.useProxyProtocol") + statsdUseProxyProtocol = flag.Bool("statsdListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -statsdListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpenTSDB metrics. "+ "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ "Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol") @@ -80,6 +86,7 @@ var ( var ( influxServer *influxserver.Server graphiteServer *graphiteserver.Server + statsdServer *statsdserver.Server opentsdbServer *opentsdbserver.Server opentsdbhttpServer *opentsdbhttpserver.Server ) @@ -137,6 +144,9 @@ func main() { if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler) } + if len(*statsdListenAddr) > 0 { + statsdServer = statsdserver.MustStart(*statsdListenAddr, *statsdUseProxyProtocol, statsd.InsertHandler) + } if len(*opentsdbListenAddr) > 0 { httpInsertHandler := getOpenTSDBHTTPInsertHandler() opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, httpInsertHandler) @@ -172,6 +182,9 @@ func main() { if len(*graphiteListenAddr) > 0 { graphiteServer.MustStop() } + if len(*statsdListenAddr) > 0 { + statsdServer.MustStop() + } if len(*opentsdbListenAddr) > 0 { opentsdbServer.MustStop() } diff --git a/app/vmagent/statsd/request_handler.go b/app/vmagent/statsd/request_handler.go new file mode 100644 index 000000000..9cf8fd667 --- /dev/null +++ b/app/vmagent/statsd/request_handler.go @@ -0,0 +1,68 @@ +package statsd + +import ( + "io" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd/stream" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="statsd"}`) + rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="statsd"}`) +) + +// InsertHandler processes remote write for statsd plaintext protocol. +// +// See https://github.com/statsd/statsd/blob/master/docs/metric_types.md +func InsertHandler(r io.Reader) error { + return stream.Parse(r, false, func(rows []parser.Row) error { + return insertRows(nil, rows) + }) +} + +func insertRows(at *auth.Token, rows []parser.Row) error { + ctx := common.GetPushCtx() + defer common.PutPushCtx(ctx) + + tssDst := ctx.WriteRequest.Timeseries[:0] + labels := ctx.Labels[:0] + samples := ctx.Samples[:0] + for i := range rows { + r := &rows[i] + labelsLen := len(labels) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: r.Metric, + }) + for j := range r.Tags { + tag := &r.Tags[j] + labels = append(labels, prompbmarshal.Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: r.Value, + Timestamp: r.Timestamp, + }) + tssDst = append(tssDst, prompbmarshal.TimeSeries{ + Labels: labels[labelsLen:], + Samples: samples[len(samples)-1:], + }) + } + ctx.WriteRequest.Timeseries = tssDst + ctx.Labels = labels + ctx.Samples = samples + if !remotewrite.TryPush(at, &ctx.WriteRequest) { + return remotewrite.ErrQueueFullHTTPRetry + } + rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) + return nil +} diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 959ad77d5..34c6953e6 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -28,6 +28,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/statsd" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" @@ -41,6 +42,7 @@ import ( influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx" opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb" opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" + statsdserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/statsd" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -56,6 +58,10 @@ var ( "See also -graphiteListenAddr.useProxyProtocol") graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+ "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") + statsdListenAddr = flag.String("statsdListenAddr", "", "TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. "+ + "See also -statsdListenAddr.useProxyProtocol") + statsdUseProxyProtocol = flag.Bool("statsdListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -statsdListenAddr . "+ + "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+ "This flag isn't needed when ingesting data over HTTP - just send it to http://:8428/write . "+ "See also -influxListenAddr.useProxyProtocol") @@ -85,6 +91,7 @@ var ( var ( clusternativeServer *clusternativeserver.Server graphiteServer *graphiteserver.Server + statsdServer *statsdserver.Server influxServer *influxserver.Server opentsdbServer *opentsdbserver.Server opentsdbhttpServer *opentsdbhttpserver.Server @@ -133,6 +140,9 @@ func main() { return graphite.InsertHandler(nil, r) }) } + if len(*statsdListenAddr) > 0 { + statsdServer = statsdserver.MustStart(*statsdListenAddr, *statsdUseProxyProtocol, statsd.InsertHandler) + } if len(*influxListenAddr) > 0 { influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error { return influx.InsertHandlerForReader(nil, r) @@ -171,6 +181,9 @@ func main() { if len(*graphiteListenAddr) > 0 { graphiteServer.MustStop() } + if len(*statsdListenAddr) > 0 { + statsdServer.MustStop() + } if len(*influxListenAddr) > 0 { influxServer.MustStop() } diff --git a/app/vminsert/statsd/request_handler.go b/app/vminsert/statsd/request_handler.go new file mode 100644 index 000000000..6206aba1a --- /dev/null +++ b/app/vminsert/statsd/request_handler.go @@ -0,0 +1,54 @@ +package statsd + +import ( + "io" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd/stream" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="statsd"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="statsd"}`) +) + +// InsertHandler processes remote write for statsd protocol with tags. +// +// https://github.com/statsd/statsd/blob/master/docs/metric_types.md +func InsertHandler(r io.Reader) error { + return stream.Parse(r, false, insertRows) +} + +func insertRows(rows []parser.Row) error { + ctx := netstorage.GetInsertCtx() + defer netstorage.PutInsertCtx(ctx) + + ctx.Reset() + hasRelabeling := relabel.HasRelabeling() + for i := range rows { + r := &rows[i] + ctx.Labels = ctx.Labels[:0] + ctx.AddLabel("", r.Metric) + for j := range r.Tags { + tag := &r.Tags[j] + ctx.AddLabel(tag.Key, tag.Value) + } + if hasRelabeling { + ctx.ApplyRelabeling() + } + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + ctx.SortLabelsIfNeeded() + if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil { + return err + } + } + rowsInserted.Add(len(rows)) + rowsPerInsert.Update(float64(len(rows))) + return ctx.FlushBufs() +} diff --git a/docs/README.md b/docs/README.md index 486003f45..cdceb283e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -89,6 +89,7 @@ VictoriaMetrics has the following prominent features: * [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format). * [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP. * [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon). + * [Statsd plaintext protocol](#how-to-send-data-from-statsd-compatible-clients) * [OpenTSDB put message](#sending-data-via-telnet-put-protocol). * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests). * [JSON line format](#how-to-import-data-in-json-line-format). @@ -704,6 +705,45 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]} ``` +## How to send data from Statsd-compatible clients + +VictoriaMetrics supports extended statsd protocol with tags. Also it does not support sampling and metric types(it will be ignored). +Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag. For instance, +the following command will enable Statsd receiver in VictoriaMetrics on TCP and UDP port `8125`: + +```console +/path/to/victoria-metrics-prod -statsdListenAddr=:8125 +``` + +Example for writing data with Statsd plaintext protocol to local VictoriaMetrics using `nc`: + +```console +echo "foo.bar:123|g|#foo:bar" | nc -N localhost 8125 +``` + +Explicit setting of timestamps is not supported for statsd protocol. Timestamp is set to the current time when VictoriaMetrics or vmagent receives it. + +An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go. +After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: + +
+ +```console +curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz' +``` + +
+ +The `/api/v1/export` endpoint should return the following response: + +```json +{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1560277406000]} +``` + +Some examples of compatible statsd clients: +- [statsd-instrument](https://github.com/Shopify/statsd-instrument) +- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby) +- [go-statsd-client](https://github.com/cactus/go-statsd-client) ## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd) Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance, diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index c98f0420b..2de774ed5 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -97,6 +97,7 @@ VictoriaMetrics has the following prominent features: * [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format). * [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP. * [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon). + * [Statsd plaintext protocol](#how-to-send-data-from-statsd-compatible-clients) * [OpenTSDB put message](#sending-data-via-telnet-put-protocol). * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests). * [JSON line format](#how-to-import-data-in-json-line-format). @@ -712,6 +713,45 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]} ``` +## How to send data from Statsd-compatible clients + +VictoriaMetrics supports extended statsd protocol with tags. Also it does not support sampling and metric types(it will be ignored). +Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag. For instance, +the following command will enable Statsd receiver in VictoriaMetrics on TCP and UDP port `8125`: + +```console +/path/to/victoria-metrics-prod -statsdListenAddr=:8125 +``` + +Example for writing data with Statsd plaintext protocol to local VictoriaMetrics using `nc`: + +```console +echo "foo.bar:123|g|#foo:bar" | nc -N localhost 8125 +``` + +Explicit setting of timestamps is not supported for statsd protocol. Timestamp is set to the current time when VictoriaMetrics or vmagent receives it. + +An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go. +After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: + +
+ +```console +curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz' +``` + +
+ +The `/api/v1/export` endpoint should return the following response: + +```json +{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1560277406000]} +``` + +Some examples of compatible statsd clients: +- [statsd-instrument](https://github.com/Shopify/statsd-instrument) +- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby) +- [go-statsd-client](https://github.com/cactus/go-statsd-client) ## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd) Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance, diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index be2c2b5ba..820ca4eef 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -133,7 +133,7 @@ Stream aggregation can be used in the following cases: ### Statsd alternative -Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) alternative in the following cases: +Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) drop-in replacement in the following cases: * [Counting input samples](#counting-input-samples) * [Summing input metrics](#summing-input-metrics) @@ -141,9 +141,6 @@ Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) alt * [Histograms over input metrics](#histograms-over-input-metrics) * [Aggregating histograms](#aggregating-histograms) -Currently, streaming aggregation is available only for [supported data ingestion protocols](https://docs.victoriametrics.com/#how-to-import-time-series-data) -and not available for [Statsd metrics format](https://github.com/statsd/statsd/blob/master/docs/metric_types.md). - ### Recording rules alternative Sometimes [alerting queries](https://docs.victoriametrics.com/vmalert/#alerting-rules) may require non-trivial amounts of CPU, RAM, diff --git a/docs/vmagent.md b/docs/vmagent.md index 227c6e45d..4c7d8421b 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -107,6 +107,7 @@ additionally to pull-based Prometheus-compatible targets' scraping: * DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-datadog-agent). * InfluxDB line protocol via `http://:8429/write`. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf). * Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). +* Statsd plaintext protocol if `-statsdListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-statsd-compatible-clients). * OpenTelemetry http API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#sending-data-via-opentelemetry). * NewRelic API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-newrelic-agent). * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-opentsdb-compatible-agents). @@ -1707,6 +1708,10 @@ See the docs at https://docs.victoriametrics.com/vmagent/ . Whether to use proxy protocol for connections accepted at -graphiteListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt -graphiteTrimTimestamp duration Trim timestamps for Graphite data to this duration. Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data (default 1s) + -statsdListenAddr string + TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol + -statsdListenAddr.useProxyProtocol + Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt -http.connTimeout duration Incoming connections to -httpListenAddr are closed after the configured timeout. This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections (default 2m0s) -http.disableResponseCompression