From 172ae1adf7d45741c6dbd4abdb47ff84a38ecdd5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 3 Jul 2024 23:51:56 +0200 Subject: [PATCH] Revert c6c5a5a18695ab5f0be1f68bbb63bf90f6f15657 and b2765c45d03b914a66155fb82adb92f8da93b494 Reason for revert: There are many statsd servers exist: - https://github.com/statsd/statsd - classical statsd server - https://docs.datadoghq.com/developers/dogstatsd/ - statsd server from DataDog built into DatDog Agent ( https://docs.datadoghq.com/agent/ ) - https://github.com/avito-tech/bioyino - high-performance statsd server - https://github.com/atlassian/gostatsd - statsd server in Go - https://github.com/prometheus/statsd_exporter - statsd server, which exposes the aggregated data as Prometheus metrics These servers can be used for efficient aggregating of statsd data and sending it to VictoriaMetrics according to https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd ( the https://github.com/prometheus/statsd_exporter can be scraped as usual Prometheus target according to https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter ). Adding support for statsd data ingestion protocol into VictoriaMetrics makes sense only if it provides significant advantages over the existing statsd servers, while has no significant drawbacks comparing to existing statsd servers. The main advantage of statsd server built into VictoriaMetrics and vmagent - getting rid of additional statsd server. The main drawback is non-trivial and inconvenient streaming aggregation configs, which must be used for the ingested statsd metrics ( see https://docs.victoriametrics.com/stream-aggregation/ ). These configs are incompatible with the configs for standalone statsd servers. So you need to manually translate configs of the used statsd server to stream aggregation configs when migrating from standalone statsd server to statsd server built into VictoriaMetrics (or vmagent). Another important drawback is that it is very easy to shoot yourself in the foot when using built-in statsd server with the -statsd.disableAggregationEnforcement command-line flag or with improperly configured streaming aggregation. In this case the ingested statsd metrics will be stored to VictoriaMetrics as is without any aggregation. This may result in high CPU usage during data ingestion, high disk space usage for storing all the unaggregated statsd metrics and high CPU usage during querying, since all the unaggregated metrics must be read, unpacked and processed during querying. P.S. Built-in statsd server can be added to VictoriaMetrics and vmagent after figuring out more ergonomic specialized configuration for aggregating of statsd metrics. The main requirements for this configuration: - easy to write, read and update (ideally it should work out of the box for most cases without additional configuration) - hard to misconfigure (e.g. hard to shoot yourself in the foot) It would be great if this configuration will be compatible with the configuration of the most widely used statsd server. In the mean time it is recommended continue using external statsd server. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6265 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5053 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5052 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/206 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4600 --- app/vmagent/main.go | 18 - app/vmagent/remotewrite/streamaggr.go | 5 - app/vmagent/statsd/request_handler.go | 72 --- docs/README.md | 81 --- docs/Single-server-VictoriaMetrics.md | 81 --- docs/stream-aggregation.md | 5 +- docs/vmagent.md | 11 - lib/ingestserver/statsd/server.go | 173 ------ lib/protoparser/statsd/parser.go | 285 ---------- lib/protoparser/statsd/parser_test.go | 531 ------------------ lib/protoparser/statsd/parser_timing_test.go | 25 - lib/protoparser/statsd/stream/streamparser.go | 200 ------- .../statsd/stream/streamparser_test.go | 72 --- 13 files changed, 4 insertions(+), 1555 deletions(-) delete mode 100644 app/vmagent/statsd/request_handler.go delete mode 100644 lib/ingestserver/statsd/server.go delete mode 100644 lib/protoparser/statsd/parser.go delete mode 100644 lib/protoparser/statsd/parser_test.go delete mode 100644 lib/protoparser/statsd/parser_timing_test.go delete mode 100644 lib/protoparser/statsd/stream/streamparser.go delete mode 100644 lib/protoparser/statsd/stream/streamparser_test.go diff --git a/app/vmagent/main.go b/app/vmagent/main.go index c1c53ea7a..f67cdffd6 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -24,7 +24,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/statsd" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" @@ -37,7 +36,6 @@ import ( influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx" opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb" opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp" - statsdserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/statsd" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape" @@ -63,12 +61,6 @@ var ( "See also -graphiteListenAddr.useProxyProtocol") graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+ "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") - statsdListenAddr = flag.String("statsdListenAddr", "", "TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. "+ - "See also -statsdListenAddr.useProxyProtocol") - statsdUseProxyProtocol = flag.Bool("statsdListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -statsdListenAddr . "+ - "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") - statsdDisableAggregationEnforcemenet = flag.Bool(`statsd.disableAggregationEnforcement`, false, "Whether to disable streaming aggregation requirement check. "+ - "It's recommended to run statsdServer with pre-configured streaming aggregation to decrease load at database.") opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpenTSDB metrics. "+ "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ "Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol") @@ -88,7 +80,6 @@ var ( var ( influxServer *influxserver.Server graphiteServer *graphiteserver.Server - statsdServer *statsdserver.Server opentsdbServer *opentsdbserver.Server opentsdbhttpServer *opentsdbhttpserver.Server ) @@ -146,12 +137,6 @@ func main() { if len(*graphiteListenAddr) > 0 { graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler) } - if len(*statsdListenAddr) > 0 { - if !remotewrite.HasAnyStreamAggrConfigured() && !*statsdDisableAggregationEnforcemenet { - logger.Fatalf("streaming aggregation must be configured with enabled statsd server. It's recommended to aggregate metrics received at statsd listener. This check could be disabled with flag -statsd.disableAggregationEnforcement") - } - statsdServer = statsdserver.MustStart(*statsdListenAddr, *statsdUseProxyProtocol, statsd.InsertHandler) - } if len(*opentsdbListenAddr) > 0 { httpInsertHandler := getOpenTSDBHTTPInsertHandler() opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, httpInsertHandler) @@ -187,9 +172,6 @@ func main() { if len(*graphiteListenAddr) > 0 { graphiteServer.MustStop() } - if len(*statsdListenAddr) > 0 { - statsdServer.MustStop() - } if len(*opentsdbListenAddr) > 0 { opentsdbServer.MustStop() } diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go index 6a7a38a86..c1cba412e 100644 --- a/app/vmagent/remotewrite/streamaggr.go +++ b/app/vmagent/remotewrite/streamaggr.go @@ -78,11 +78,6 @@ func CheckStreamAggrConfigs() error { return nil } -// HasAnyStreamAggrConfigured checks if any streaming aggregation config provided -func HasAnyStreamAggrConfigured() bool { - return len(*streamAggrConfig) > 0 || *streamAggrGlobalConfig != "" -} - func reloadStreamAggrConfigs() { reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed) for idx, rwctx := range rwctxs { diff --git a/app/vmagent/statsd/request_handler.go b/app/vmagent/statsd/request_handler.go deleted file mode 100644 index e2bd3193b..000000000 --- a/app/vmagent/statsd/request_handler.go +++ /dev/null @@ -1,72 +0,0 @@ -package statsd - -import ( - "io" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" - parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd/stream" - "github.com/VictoriaMetrics/metrics" -) - -var ( - rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="statsd"}`) - rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="statsd"}`) -) - -// InsertHandler processes remote write for statsd plaintext protocol. -// -// See https://github.com/statsd/statsd/blob/master/docs/metric_types.md -func InsertHandler(r io.Reader) error { - return stream.Parse(r, false, func(rows []parser.Row) error { - return insertRows(nil, rows) - }) -} - -func insertRows(at *auth.Token, rows []parser.Row) error { - ctx := common.GetPushCtx() - defer common.PutPushCtx(ctx) - - tssDst := ctx.WriteRequest.Timeseries[:0] - labels := ctx.Labels[:0] - samples := ctx.Samples[:0] - for i := range rows { - r := &rows[i] - labelsLen := len(labels) - labels = append(labels, prompbmarshal.Label{ - Name: "__name__", - Value: r.Metric, - }) - for j := range r.Tags { - tag := &r.Tags[j] - labels = append(labels, prompbmarshal.Label{ - Name: tag.Key, - Value: tag.Value, - }) - } - samplesLen := len(samples) - for _, v := range r.Values { - samples = append(samples, prompbmarshal.Sample{ - Value: v, - Timestamp: r.Timestamp, - }) - } - - tssDst = append(tssDst, prompbmarshal.TimeSeries{ - Labels: labels[labelsLen:], - Samples: samples[samplesLen:], - }) - } - ctx.WriteRequest.Timeseries = tssDst - ctx.Labels = labels - ctx.Samples = samples - if !remotewrite.TryPush(at, &ctx.WriteRequest) { - return remotewrite.ErrQueueFullHTTPRetry - } - rowsInserted.Add(len(rows)) - rowsPerInsert.Update(float64(len(rows))) - return nil -} diff --git a/docs/README.md b/docs/README.md index 406e54e26..069ae10ef 100644 --- a/docs/README.md +++ b/docs/README.md @@ -88,7 +88,6 @@ VictoriaMetrics has the following prominent features: * [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format). * [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP. * [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon). - * [Statsd plaintext protocol](#how-to-send-data-from-statsd-compatible-clients) * [OpenTSDB put message](#sending-data-via-telnet-put-protocol). * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests). * [JSON line format](#how-to-import-data-in-json-line-format). @@ -686,79 +685,6 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]} ``` -## How to send data from StatsD-compatible clients - -VictoriaMetrics supports extended StatsD protocol. Currently, it supports `tags` and `value packing` -extensions provided by [dogstatsd](https://docs.datadoghq.com/developers/dogstatsd/datagram_shell). -During parsing, metric's `` is added as a special label `__statsd_metric_type__`. - -It is strongly advisable to configure streaming aggregation for each metric type. This process serves two primary -objectives: -* transformation of the StatsD data model into the VictoriaMetrics data model. VictoriaMetrics requires a consistent -interval between data points. -* minimizing of the disk space utilization and overall resource consumption during data ingestion. - -VictoriaMetrics supports the following metric [types](https://docs.datadoghq.com/metrics/types): - -* `c` Counter type. -* `g` Gauge type. -* `ms` Timer type. -* `m` Meters type. -* `h` Histogram type. -* `s` Set type with only numeric values. -* `d` Distribution type. - -_The `Not Assigned` type is not supported due to the ambiguity surrounding its aggregation method. -The correct aggregation method cannot be determined for the undefined metric._ - -Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag and configure [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/). -For instance, the following command will enable StatsD receiver in VictoriaMetrics on TCP and UDP port `8125`: - -```console -/path/to/victoria-metrics-prod -statsdListenAddr=:8125 -streamAggr.config=statsd_aggr.yaml -``` - -Example of stream aggregation config: - -```yaml -# statsd_aggr.yaml -# `last` output will keep the last sample on `interval` -# for each series that match `{__statsd_metric_type__="g"}` selector -- match: '{__statsd_metric_type__="g"}' - outputs: [last] - interval: 1m -``` - -Example for writing data with StatsD plaintext protocol to local VictoriaMetrics using `nc`: - -```console -echo "foo.bar:123|g|#tag1:baz" | nc -N localhost 8125 -``` - -_An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go._ - -Explicit setting of timestamps is not supported for StatsD protocol. Timestamp is set to the current time when -VictoriaMetrics or vmagent receives it. - -Once ingested, the data can be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: - -```console -curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo.*"}' -``` - -_Please note, with stream aggregation enabled data will become available only after specified aggregation interval._ - -The `/api/v1/export` endpoint should return the following response: - -```json -{"metric":{"__name__":"foo.bar:1m_last","__statsd_metric_type__":"g","tag1":"baz"},"values":[123],"timestamps":[1715843939000]} -``` - -Some examples of compatible statsd clients: -- [statsd-instrument](https://github.com/Shopify/statsd-instrument) -- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby) -- [go-statsd-client](https://github.com/cactus/go-statsd-client) - ## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd) Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance, @@ -1421,7 +1347,6 @@ Additionally, VictoriaMetrics can accept metrics via the following popular data * DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details. * InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. * Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details. -* Statsd plaintext protocol. See [these docs](#how-to-send-data-from-statsd-compatible-clients) for details. * OpenTelemetry http API. See [these docs](#sending-data-via-opentelemetry) for details. * OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details. * OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details. @@ -3249,12 +3174,6 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0) -sortLabels Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit - -statsd.disableAggregationEnforcement - Whether to disable streaming aggregation requirement check. It's recommended to run statsdServer with pre-configured streaming aggregation to decrease load at database. - -statsdListenAddr string - TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol - -statsdListenAddr.useProxyProtocol - Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt -storage.cacheSizeIndexDBDataBlocks size Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 09e35c8ef..9f0fe25f6 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -96,7 +96,6 @@ VictoriaMetrics has the following prominent features: * [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format). * [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP. * [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon). - * [Statsd plaintext protocol](#how-to-send-data-from-statsd-compatible-clients) * [OpenTSDB put message](#sending-data-via-telnet-put-protocol). * [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests). * [JSON line format](#how-to-import-data-in-json-line-format). @@ -694,79 +693,6 @@ The `/api/v1/export` endpoint should return the following response: {"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]} ``` -## How to send data from StatsD-compatible clients - -VictoriaMetrics supports extended StatsD protocol. Currently, it supports `tags` and `value packing` -extensions provided by [dogstatsd](https://docs.datadoghq.com/developers/dogstatsd/datagram_shell). -During parsing, metric's `` is added as a special label `__statsd_metric_type__`. - -It is strongly advisable to configure streaming aggregation for each metric type. This process serves two primary -objectives: -* transformation of the StatsD data model into the VictoriaMetrics data model. VictoriaMetrics requires a consistent -interval between data points. -* minimizing of the disk space utilization and overall resource consumption during data ingestion. - -VictoriaMetrics supports the following metric [types](https://docs.datadoghq.com/metrics/types): - -* `c` Counter type. -* `g` Gauge type. -* `ms` Timer type. -* `m` Meters type. -* `h` Histogram type. -* `s` Set type with only numeric values. -* `d` Distribution type. - -_The `Not Assigned` type is not supported due to the ambiguity surrounding its aggregation method. -The correct aggregation method cannot be determined for the undefined metric._ - -Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag and configure [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/). -For instance, the following command will enable StatsD receiver in VictoriaMetrics on TCP and UDP port `8125`: - -```console -/path/to/victoria-metrics-prod -statsdListenAddr=:8125 -streamAggr.config=statsd_aggr.yaml -``` - -Example of stream aggregation config: - -```yaml -# statsd_aggr.yaml -# `last` output will keep the last sample on `interval` -# for each series that match `{__statsd_metric_type__="g"}` selector -- match: '{__statsd_metric_type__="g"}' - outputs: [last] - interval: 1m -``` - -Example for writing data with StatsD plaintext protocol to local VictoriaMetrics using `nc`: - -```console -echo "foo.bar:123|g|#tag1:baz" | nc -N localhost 8125 -``` - -_An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go._ - -Explicit setting of timestamps is not supported for StatsD protocol. Timestamp is set to the current time when -VictoriaMetrics or vmagent receives it. - -Once ingested, the data can be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint: - -```console -curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo.*"}' -``` - -_Please note, with stream aggregation enabled data will become available only after specified aggregation interval._ - -The `/api/v1/export` endpoint should return the following response: - -```json -{"metric":{"__name__":"foo.bar:1m_last","__statsd_metric_type__":"g","tag1":"baz"},"values":[123],"timestamps":[1715843939000]} -``` - -Some examples of compatible statsd clients: -- [statsd-instrument](https://github.com/Shopify/statsd-instrument) -- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby) -- [go-statsd-client](https://github.com/cactus/go-statsd-client) - ## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd) Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance, @@ -1429,7 +1355,6 @@ Additionally, VictoriaMetrics can accept metrics via the following popular data * DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details. * InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details. * Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details. -* Statsd plaintext protocol. See [these docs](#how-to-send-data-from-statsd-compatible-clients) for details. * OpenTelemetry http API. See [these docs](#sending-data-via-opentelemetry) for details. * OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details. * OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details. @@ -3257,12 +3182,6 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0) -sortLabels Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit - -statsd.disableAggregationEnforcement - Whether to disable streaming aggregation requirement check. It's recommended to run statsdServer with pre-configured streaming aggregation to decrease load at database. - -statsdListenAddr string - TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol - -statsdListenAddr.useProxyProtocol - Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt -storage.cacheSizeIndexDBDataBlocks size Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0) diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 971f6275c..68ee15830 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -180,7 +180,7 @@ Stream aggregation can be used in the following cases: ### Statsd alternative -Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) drop-in replacement in the following cases: +Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) alternative in the following cases: * [Counting input samples](#counting-input-samples) * [Summing input metrics](#summing-input-metrics) @@ -188,6 +188,9 @@ Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) dro * [Histograms over input metrics](#histograms-over-input-metrics) * [Aggregating histograms](#aggregating-histograms) +Currently, streaming aggregation is available only for [supported data ingestion protocols](https://docs.victoriametrics.com/#how-to-import-time-series-data) +and not available for [Statsd metrics format](https://github.com/statsd/statsd/blob/master/docs/metric_types.md). + ### Recording rules alternative Sometimes [alerting queries](https://docs.victoriametrics.com/vmalert/#alerting-rules) may require non-trivial amounts of CPU, RAM, diff --git a/docs/vmagent.md b/docs/vmagent.md index f2d810735..466fcf8c1 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -107,7 +107,6 @@ additionally to pull-based Prometheus-compatible targets' scraping: * DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-datadog-agent). * InfluxDB line protocol via `http://:8429/write`. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf). * Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). -* Statsd plaintext protocol if `-statsdListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-statsd-compatible-clients). * OpenTelemetry http API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#sending-data-via-opentelemetry). * NewRelic API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-newrelic-agent). * OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-opentsdb-compatible-agents). @@ -1717,10 +1716,6 @@ See the docs at https://docs.victoriametrics.com/vmagent/ . Whether to use proxy protocol for connections accepted at -graphiteListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt -graphiteTrimTimestamp duration Trim timestamps for Graphite data to this duration. Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data (default 1s) - -statsdListenAddr string - TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol - -statsdListenAddr.useProxyProtocol - Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt -http.connTimeout duration Incoming connections to -httpListenAddr are closed after the configured timeout. This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections (default 2m0s) -http.disableResponseCompression @@ -2236,12 +2231,6 @@ See the docs at https://docs.victoriametrics.com/vmagent/ . The compression level for VictoriaMetrics remote write protocol. Higher values reduce network traffic at the cost of higher CPU usage. Negative values reduce CPU usage at the cost of increased network traffic. See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol -sortLabels Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit - -statsd.disableAggregationEnforcement - Whether to disable streaming aggregation requirement check. It's recommended to run statsdServer with pre-configured streaming aggregation to decrease load at database. - -statsdListenAddr string - TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol - -statsdListenAddr.useProxyProtocol - Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt -streamAggr.config string Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval value diff --git a/lib/ingestserver/statsd/server.go b/lib/ingestserver/statsd/server.go deleted file mode 100644 index 533d2ed1b..000000000 --- a/lib/ingestserver/statsd/server.go +++ /dev/null @@ -1,173 +0,0 @@ -package statsd - -import ( - "errors" - "io" - "net" - "strings" - "sync" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" - "github.com/VictoriaMetrics/metrics" -) - -var ( - writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="statsd", name="write", net="tcp"}`) - writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="statsd", name="write", net="tcp"}`) - - writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_requests_total{type="statsd", name="write", net="udp"}`) - writeErrorsUDP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="statsd", name="write", net="udp"}`) -) - -// Server accepts Statsd plaintext lines over TCP and UDP. -type Server struct { - addr string - lnTCP net.Listener - lnUDP net.PacketConn - wg sync.WaitGroup - cm ingestserver.ConnsMap -} - -// MustStart starts statsd server on the given addr. -// -// The incoming connections are processed with insertHandler. -// -// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol. -// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt -// -// MustStop must be called on the returned server when it is no longer needed. -func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server { - logger.Infof("starting TCP Statsd server at %q", addr) - lnTCP, err := netutil.NewTCPListener("statsd", addr, useProxyProtocol, nil) - if err != nil { - logger.Fatalf("cannot start TCP Statsd server at %q: %s", addr, err) - } - - logger.Infof("starting UDP Statsd server at %q", addr) - lnUDP, err := net.ListenPacket(netutil.GetUDPNetwork(), addr) - if err != nil { - logger.Fatalf("cannot start UDP Statsd server at %q: %s", addr, err) - } - - s := &Server{ - addr: addr, - lnTCP: lnTCP, - lnUDP: lnUDP, - } - s.cm.Init("statsd") - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.serveTCP(insertHandler) - logger.Infof("stopped TCP Statsd server at %q", addr) - }() - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.serveUDP(insertHandler) - logger.Infof("stopped UDP Statsd server at %q", addr) - }() - return s -} - -// MustStop stops the server. -func (s *Server) MustStop() { - logger.Infof("stopping TCP Statsd server at %q...", s.addr) - if err := s.lnTCP.Close(); err != nil { - logger.Errorf("cannot close TCP Statsd server: %s", err) - } - logger.Infof("stopping UDP Statsd server at %q...", s.addr) - if err := s.lnUDP.Close(); err != nil { - logger.Errorf("cannot close UDP Statsd server: %s", err) - } - s.cm.CloseAll(0) - s.wg.Wait() - logger.Infof("TCP and UDP Statsd servers at %q have been stopped", s.addr) -} - -func (s *Server) serveTCP(insertHandler func(r io.Reader) error) { - var wg sync.WaitGroup - for { - c, err := s.lnTCP.Accept() - if err != nil { - var ne net.Error - if errors.As(err, &ne) { - if ne.Temporary() { - logger.Errorf("statsd: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err) - time.Sleep(time.Second) - continue - } - if strings.Contains(err.Error(), "use of closed network connection") { - break - } - logger.Fatalf("unrecoverable error when accepting TCP Statsd connections: %s", err) - } - logger.Fatalf("unexpected error when accepting TCP Statsd connections: %s", err) - } - if !s.cm.Add(c) { - _ = c.Close() - break - } - wg.Add(1) - go func() { - defer func() { - s.cm.Delete(c) - _ = c.Close() - wg.Done() - }() - writeRequestsTCP.Inc() - if err := insertHandler(c); err != nil { - writeErrorsTCP.Inc() - logger.Errorf("error in TCP Statsd conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err) - } - }() - } - wg.Wait() -} - -func (s *Server) serveUDP(insertHandler func(r io.Reader) error) { - gomaxprocs := cgroup.AvailableCPUs() - var wg sync.WaitGroup - for i := 0; i < gomaxprocs; i++ { - wg.Add(1) - go func() { - defer wg.Done() - var bb bytesutil.ByteBuffer - bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024) - for { - bb.Reset() - bb.B = bb.B[:cap(bb.B)] - n, addr, err := s.lnUDP.ReadFrom(bb.B) - if err != nil { - writeErrorsUDP.Inc() - var ne net.Error - if errors.As(err, &ne) { - if ne.Temporary() { - logger.Errorf("statsd: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err) - time.Sleep(time.Second) - continue - } - if strings.Contains(err.Error(), "use of closed network connection") { - break - } - } - logger.Errorf("cannot read Statsd UDP data: %s", err) - continue - } - bb.B = bb.B[:n] - writeRequestsUDP.Inc() - if err := insertHandler(bb.NewReader()); err != nil { - writeErrorsUDP.Inc() - logger.Errorf("error in UDP Statsd conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err) - continue - } - } - }() - } - wg.Wait() -} diff --git a/lib/protoparser/statsd/parser.go b/lib/protoparser/statsd/parser.go deleted file mode 100644 index 99a8770b4..000000000 --- a/lib/protoparser/statsd/parser.go +++ /dev/null @@ -1,285 +0,0 @@ -package statsd - -import ( - "fmt" - "strings" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" - "github.com/VictoriaMetrics/metrics" - "github.com/valyala/fastjson/fastfloat" -) - -// Statsd metric format with tags: MetricName:value|type|@sample_rate|#tag1:value,tag1... -// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell?tab=metrics#the-dogstatsd-protocol -const ( - statsdSeparator = '|' - statsdPairsSeparator = ':' - statsdTagsStartSeparator = '#' - statsdTagsSeparator = ',' -) - -const statsdTypeTagName = "__statsd_metric_type__" - -// https://github.com/b/statsd_spec -var validTypes = []string{ - // counter - "c", - // gauge - "g", - // histogram - "h", - // timer - "ms", - // distribution - "d", - // set - "s", - // meters - "m", -} - -func isValidType(src string) bool { - for _, t := range validTypes { - if src == t { - return true - } - } - return false -} - -// Rows contains parsed statsd rows. -type Rows struct { - Rows []Row - - tagsPool []Tag -} - -// Reset resets rs. -func (rs *Rows) Reset() { - // Reset items, so they can be GC'ed - - for i := range rs.Rows { - rs.Rows[i].reset() - } - rs.Rows = rs.Rows[:0] - - for i := range rs.tagsPool { - rs.tagsPool[i].reset() - } - rs.tagsPool = rs.tagsPool[:0] -} - -// Unmarshal unmarshals statsd plaintext protocol rows from s. -// -// s shouldn't be modified when rs is in use. -func (rs *Rows) Unmarshal(s string) { - rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0]) -} - -// Row is a single statsd row. -type Row struct { - Metric string - Tags []Tag - Values []float64 - Timestamp int64 -} - -func (r *Row) reset() { - r.Metric = "" - r.Tags = nil - r.Values = r.Values[:0] - r.Timestamp = 0 -} - -func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) { - r.reset() - originalString := s - s = stripTrailingWhitespace(s) - nextSeparator := strings.IndexByte(s, statsdSeparator) - if nextSeparator <= 0 { - return tagsPool, fmt.Errorf("cannot find type separator %q position at: %q", statsdSeparator, originalString) - } - metricWithValues := s[:nextSeparator] - s = s[nextSeparator+1:] - valuesSeparatorPosition := strings.IndexByte(metricWithValues, statsdPairsSeparator) - if valuesSeparatorPosition <= 0 { - return tagsPool, fmt.Errorf("cannot find metric name value separator=%q at: %q; original line: %q", statsdPairsSeparator, metricWithValues, originalString) - } - - r.Metric = metricWithValues[:valuesSeparatorPosition] - metricWithValues = metricWithValues[valuesSeparatorPosition+1:] - // datadog extension v1.1 for statsd allows multiple packed values at single line - for { - nextSeparator = strings.IndexByte(metricWithValues, statsdPairsSeparator) - if nextSeparator <= 0 { - // last element - metricWithValues = stripTrailingWhitespace(metricWithValues) - v, err := fastfloat.Parse(metricWithValues) - if err != nil { - return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", metricWithValues, err, originalString) - } - r.Values = append(r.Values, v) - break - } - valueStr := metricWithValues[:nextSeparator] - v, err := fastfloat.Parse(valueStr) - if err != nil { - return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", valueStr, err, originalString) - } - r.Values = append(r.Values, v) - metricWithValues = metricWithValues[nextSeparator+1:] - } - // search for the type end - nextSeparator = strings.IndexByte(s, statsdSeparator) - typeValue := s - if nextSeparator >= 0 { - typeValue = s[:nextSeparator] - s = s[nextSeparator+1:] - } - if !isValidType(typeValue) { - return tagsPool, fmt.Errorf("provided type=%q is not supported; original line: %q", typeValue, originalString) - } - tagsStart := len(tagsPool) - tagsPool = slicesutil.SetLength(tagsPool, len(tagsPool)+1) - // add metric type as tag - tag := &tagsPool[len(tagsPool)-1] - tag.Key = statsdTypeTagName - tag.Value = typeValue - - // process tags - nextSeparator = strings.IndexByte(s, statsdTagsStartSeparator) - if nextSeparator < 0 { - tags := tagsPool[tagsStart:] - r.Tags = tags[:len(tags):len(tags)] - return tagsPool, nil - } - tagsStr := s[nextSeparator+1:] - // search for end of tags - nextSeparator = strings.IndexByte(tagsStr, statsdSeparator) - if nextSeparator >= 0 { - tagsStr = tagsStr[:nextSeparator] - } - - tagsPool = unmarshalTags(tagsPool, tagsStr) - tags := tagsPool[tagsStart:] - r.Tags = tags[:len(tags):len(tags)] - - return tagsPool, nil -} - -func unmarshalRows(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) { - for len(s) > 0 { - n := strings.IndexByte(s, '\n') - if n < 0 { - // The last line. - return unmarshalRow(dst, s, tagsPool) - } - dst, tagsPool = unmarshalRow(dst, s[:n], tagsPool) - s = s[n+1:] - } - return dst, tagsPool -} - -func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) { - if len(s) > 0 && s[len(s)-1] == '\r' { - s = s[:len(s)-1] - } - s = stripLeadingWhitespace(s) - if len(s) == 0 { - // Skip empty line - return dst, tagsPool - } - if cap(dst) > len(dst) { - dst = dst[:len(dst)+1] - } else { - dst = append(dst, Row{}) - } - r := &dst[len(dst)-1] - var err error - tagsPool, err = r.unmarshal(s, tagsPool) - if err != nil { - dst = dst[:len(dst)-1] - logger.Errorf("cannot unmarshal Statsd line %q: %s", s, err) - invalidLines.Inc() - } - return dst, tagsPool -} - -var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="statsd"}`) - -func unmarshalTags(dst []Tag, s string) []Tag { - for { - dst = slicesutil.SetLength(dst, len(dst)+1) - tag := &dst[len(dst)-1] - - n := strings.IndexByte(s, statsdTagsSeparator) - - if n < 0 { - // The last tag found - tag.unmarshal(s) - if len(tag.Key) == 0 || len(tag.Value) == 0 { - // Skip empty tag - dst = dst[:len(dst)-1] - } - return dst - } - tag.unmarshal(s[:n]) - s = s[n+1:] - if len(tag.Key) == 0 || len(tag.Value) == 0 { - // Skip empty tag - dst = dst[:len(dst)-1] - } - } -} - -// Tag is a statsd tag. -type Tag struct { - Key string - Value string -} - -func (t *Tag) reset() { - t.Key = "" - t.Value = "" -} - -func (t *Tag) unmarshal(s string) { - t.reset() - n := strings.IndexByte(s, statsdPairsSeparator) - if n < 0 { - // Empty tag value. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1100 - t.Key = s - t.Value = s[len(s):] - } else { - t.Key = s[:n] - t.Value = s[n+1:] - } -} - -func stripTrailingWhitespace(s string) string { - n := len(s) - for { - n-- - if n < 0 { - return "" - } - ch := s[n] - - if ch != ' ' && ch != '\t' { - return s[:n+1] - } - } -} - -func stripLeadingWhitespace(s string) string { - for len(s) > 0 { - ch := s[0] - if ch != ' ' && ch != '\t' { - return s - } - s = s[1:] - } - return "" -} diff --git a/lib/protoparser/statsd/parser_test.go b/lib/protoparser/statsd/parser_test.go deleted file mode 100644 index f97ba9813..000000000 --- a/lib/protoparser/statsd/parser_test.go +++ /dev/null @@ -1,531 +0,0 @@ -package statsd - -import ( - "reflect" - "testing" -) - -func TestUnmarshalTagsSuccess(t *testing.T) { - f := func(dst []Tag, s string, tagsPoolExpected []Tag) { - t.Helper() - - tagsPool := unmarshalTags(dst, s) - if !reflect.DeepEqual(tagsPool, tagsPoolExpected) { - t.Fatalf("unexpected tags;\ngot\n%+v;\nwant\n%+v", tagsPool, tagsPoolExpected) - } - - // Try unmarshaling again - tagsPool = unmarshalTags(dst, s) - if !reflect.DeepEqual(tagsPool, tagsPoolExpected) { - t.Fatalf("unexpected tags on second unmarshal;\ngot\n%+v;\nwant\n%+v", tagsPool, tagsPoolExpected) - } - } - - f([]Tag{}, "foo:bar", []Tag{ - { - Key: "foo", - Value: "bar", - }, - }) - - f([]Tag{}, "foo:bar,qwe:123", []Tag{ - { - Key: "foo", - Value: "bar", - }, - { - Key: "qwe", - Value: "123", - }, - }) - - f([]Tag{}, "foo.qwe:bar", []Tag{ - { - Key: "foo.qwe", - Value: "bar", - }, - }) - - f([]Tag{}, "foo:10", []Tag{ - { - Key: "foo", - Value: "10", - }, - }) - - f([]Tag{}, "foo: _qwe", []Tag{ - { - Key: "foo", - Value: " _qwe", - }, - }) - - f([]Tag{}, "foo:qwe ", []Tag{ - { - Key: "foo", - Value: "qwe ", - }, - }) - - f([]Tag{}, "foo asd:qwe ", []Tag{ - { - Key: "foo asd", - Value: "qwe ", - }, - }) - - f([]Tag{}, "foo:var:123", []Tag{ - { - Key: "foo", - Value: "var:123", - }, - }) - - // invalid tags - f([]Tag{}, ":bar", []Tag{}) - f([]Tag{}, "foo:", []Tag{}) - f([]Tag{}, " ", []Tag{}) -} - -func TestRowsUnmarshalSuccess(t *testing.T) { - f := func(s string, rowsExpected *Rows) { - t.Helper() - var rows Rows - rows.Unmarshal(s) - if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { - t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) - } - - // Try unmarshaling again - rows.Unmarshal(s) - if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { - t.Fatalf("unexpected rows on second unmarshal;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) - } - - rows.Reset() - if len(rows.Rows) != 0 { - t.Fatalf("non-empty rows after reset: %+v", rows.Rows) - } - } - - // Empty line - f("", &Rows{}) - f("\r", &Rows{}) - f("\n\n", &Rows{}) - f("\n\r\n", &Rows{}) - - // Single line - f(" 123:455|c", &Rows{ - Rows: []Row{{ - Metric: "123", - Values: []float64{455}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }}, - }) - // multiple values statsd dog v1.1 - f(" 123:455:456|c", &Rows{ - Rows: []Row{{ - Metric: "123", - Values: []float64{455, 456}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }}, - }) - f("123:455 |c", &Rows{ - Rows: []Row{{ - Metric: "123", - Values: []float64{455}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }}, - }) - f("foobar:-123.456|c", &Rows{ - Rows: []Row{{ - Metric: "foobar", - Values: []float64{-123.456}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }}, - }) - f("foo.bar:123.456|c\n", &Rows{ - Rows: []Row{{ - Metric: "foo.bar", - Values: []float64{123.456}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }}, - }) - - // with sample rate - f("foo.bar:1|c|@0.1", &Rows{ - Rows: []Row{{ - Metric: "foo.bar", - Values: []float64{1}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }}, - }) - - // without specifying metric unit - f("foo.bar:123|h", &Rows{ - Rows: []Row{{ - Metric: "foo.bar", - Values: []float64{123}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "h", - }, - }, - }}, - }) - // without specifying metric unit but with tags - f("foo.bar:123|s|#foo:bar", &Rows{ - Rows: []Row{{ - Metric: "foo.bar", - Values: []float64{123}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "s", - }, - - { - Key: "foo", - Value: "bar", - }, - }, - }}, - }) - - f("foo.bar:123.456|c|#foo:bar,qwe:asd", &Rows{ - Rows: []Row{{ - Metric: "foo.bar", - Values: []float64{123.456}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - - { - Key: "foo", - Value: "bar", - }, - { - Key: "qwe", - Value: "asd", - }, - }, - }}, - }) - - // Whitespace in metric name, tag name and tag value - f("s a:1|c|#ta g1:aaa1,tag2:bb b2", &Rows{ - Rows: []Row{{ - Metric: "s a", - Values: []float64{1}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - { - Key: "ta g1", - Value: "aaa1", - }, - { - Key: "tag2", - Value: "bb b2", - }, - }, - }}, - }) - - // Tags - f("foo:1|c", &Rows{ - Rows: []Row{{ - Metric: "foo", - Values: []float64{1}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }}, - }) - // Empty tag name - f("foo:1|d|#:123", &Rows{ - Rows: []Row{{ - Metric: "foo", - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "d", - }, - }, - Values: []float64{1}, - }}, - }) - // Empty tag value - f("foo:1|s|#tag1:", &Rows{ - Rows: []Row{{ - Metric: "foo", - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "s", - }, - }, - Values: []float64{1}, - }}, - }) - f("foo:1|d|#bar:baz,aa:,x:y,:z", &Rows{ - Rows: []Row{{ - Metric: "foo", - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "d", - }, - { - Key: "bar", - Value: "baz", - }, - { - Key: "x", - Value: "y", - }, - }, - Values: []float64{1}, - }}, - }) - - // Multi lines - f("foo:0.3|c\naaa:3|g\nbar.baz:0.34|c\n", &Rows{ - Rows: []Row{ - { - Metric: "foo", - Values: []float64{0.3}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }, - { - Metric: "aaa", - Values: []float64{3}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "g", - }, - }, - }, - { - Metric: "bar.baz", - Values: []float64{0.34}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }, - }, - }) - - f("foo:0.3|c|#tag1:1,tag2:2\naaa:3|g|#tag3:3,tag4:4", &Rows{ - Rows: []Row{ - { - Metric: "foo", - Values: []float64{0.3}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - - { - Key: "tag1", - Value: "1", - }, - { - Key: "tag2", - Value: "2", - }, - }, - }, - { - Metric: "aaa", - Values: []float64{3}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "g", - }, - - { - Key: "tag3", - Value: "3", - }, - { - Key: "tag4", - Value: "4", - }, - }, - }, - }, - }) - - // Multi lines with invalid line - f("foo:0.3|c\naaa\nbar.baz:0.34|c\n", &Rows{ - Rows: []Row{ - { - Metric: "foo", - Values: []float64{0.3}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }, - { - Metric: "bar.baz", - Values: []float64{0.34}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }, - }, - }) - - // Whitespace after at the end - f("foo.baz:125|c\na:1.34|h\t ", &Rows{ - Rows: []Row{ - { - Metric: "foo.baz", - Values: []float64{125}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - }, - }, - { - Metric: "a", - Values: []float64{1.34}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "h", - }, - }, - }, - }, - }) - - // ignores sample rate - f("foo.baz:125|c|@0.5|#tag1:12", &Rows{ - Rows: []Row{ - { - Metric: "foo.baz", - Values: []float64{125}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - { - Key: "tag1", - Value: "12", - }, - }, - }, - }, - }) - // ignores container and timestamp - f("foo.baz:125|c|@0.5|#tag1:12|c:83c0a99c0a54c0c187f461c7980e9b57f3f6a8b0c918c8d93df19a9de6f3fe1d|T1656581400", &Rows{ - Rows: []Row{ - { - Metric: "foo.baz", - Values: []float64{125}, - Tags: []Tag{ - { - Key: statsdTypeTagName, - Value: "c", - }, - { - Key: "tag1", - Value: "12", - }, - }, - }, - }, - }) -} - -func TestRowsUnmarshalFailure(t *testing.T) { - f := func(s string) { - t.Helper() - var rows Rows - rows.Unmarshal(s) - if len(rows.Rows) != 0 { - t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows)) - } - - // Try again - rows.Unmarshal(s) - if len(rows.Rows) != 0 { - t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows)) - } - } - - // random string - f("aaa") - - // empty value - f("foo:") - - // empty metric name - f(":12") - - // empty type - f("foo:12") - - // bad values - f("foo:12:baz|c") -} diff --git a/lib/protoparser/statsd/parser_timing_test.go b/lib/protoparser/statsd/parser_timing_test.go deleted file mode 100644 index d9e3c6479..000000000 --- a/lib/protoparser/statsd/parser_timing_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package statsd - -import ( - "fmt" - "testing" -) - -func BenchmarkRowsUnmarshal(b *testing.B) { - s := `cpu.usage_user:1.23|c -cpu.usage_system:23.344|c -cpu.usage_iowait:3.3443|c -cpu.usage_irq:0.34432|c -` - b.SetBytes(int64(len(s))) - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - var rows Rows - for pb.Next() { - rows.Unmarshal(s) - if len(rows.Rows) != 4 { - panic(fmt.Errorf("unexpected number of rows unmarshaled: got %d; want 4", len(rows.Rows))) - } - } - }) -} diff --git a/lib/protoparser/statsd/stream/streamparser.go b/lib/protoparser/statsd/stream/streamparser.go deleted file mode 100644 index 7d82ce5d9..000000000 --- a/lib/protoparser/statsd/stream/streamparser.go +++ /dev/null @@ -1,200 +0,0 @@ -package stream - -import ( - "bufio" - "fmt" - "io" - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" - "github.com/VictoriaMetrics/metrics" -) - -// Parse parses Statsd lines from r and calls callback for the parsed rows. -// -// The callback can be called concurrently multiple times for streamed data from r. -// -// callback shouldn't hold rows after returning. -func Parse(r io.Reader, isGzipped bool, callback func(rows []statsd.Row) error) error { - wcr := writeconcurrencylimiter.GetReader(r) - defer writeconcurrencylimiter.PutReader(wcr) - r = wcr - - if isGzipped { - zr, err := common.GetGzipReader(r) - if err != nil { - return fmt.Errorf("cannot read gzipped statsd data: %w", err) - } - defer common.PutGzipReader(zr) - r = zr - } - - ctx := getStreamContext(r) - defer putStreamContext(ctx) - - for ctx.Read() { - uw := getUnmarshalWork() - uw.ctx = ctx - uw.callback = callback - uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf - ctx.wg.Add(1) - common.ScheduleUnmarshalWork(uw) - wcr.DecConcurrency() - } - ctx.wg.Wait() - if err := ctx.Error(); err != nil { - return err - } - return ctx.callbackErr -} - -func (ctx *streamContext) Read() bool { - readCalls.Inc() - if ctx.err != nil || ctx.hasCallbackError() { - return false - } - ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf) - if ctx.err != nil { - if ctx.err != io.EOF { - readErrors.Inc() - ctx.err = fmt.Errorf("cannot read statsd plaintext protocol data: %w", ctx.err) - } - return false - } - return true -} - -type streamContext struct { - br *bufio.Reader - reqBuf []byte - tailBuf []byte - err error - - wg sync.WaitGroup - callbackErrLock sync.Mutex - callbackErr error -} - -func (ctx *streamContext) Error() error { - if ctx.err == io.EOF { - return nil - } - return ctx.err -} - -func (ctx *streamContext) hasCallbackError() bool { - ctx.callbackErrLock.Lock() - ok := ctx.callbackErr != nil - ctx.callbackErrLock.Unlock() - return ok -} - -func (ctx *streamContext) reset() { - ctx.br.Reset(nil) - ctx.reqBuf = ctx.reqBuf[:0] - ctx.tailBuf = ctx.tailBuf[:0] - ctx.err = nil - ctx.callbackErr = nil -} - -var ( - readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="statsd"}`) - readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="statsd"}`) - rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="statsd"}`) -) - -func getStreamContext(r io.Reader) *streamContext { - select { - case ctx := <-streamContextPoolCh: - ctx.br.Reset(r) - return ctx - default: - if v := streamContextPool.Get(); v != nil { - ctx := v.(*streamContext) - ctx.br.Reset(r) - return ctx - } - return &streamContext{ - br: bufio.NewReaderSize(r, 64*1024), - } - } -} - -func putStreamContext(ctx *streamContext) { - ctx.reset() - select { - case streamContextPoolCh <- ctx: - default: - streamContextPool.Put(ctx) - } -} - -var ( - streamContextPool sync.Pool - streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs()) -) - -type unmarshalWork struct { - rows statsd.Rows - ctx *streamContext - callback func(rows []statsd.Row) error - reqBuf []byte -} - -func (uw *unmarshalWork) reset() { - uw.rows.Reset() - uw.ctx = nil - uw.callback = nil - uw.reqBuf = uw.reqBuf[:0] -} - -func (uw *unmarshalWork) runCallback(rows []statsd.Row) { - ctx := uw.ctx - if err := uw.callback(rows); err != nil { - ctx.callbackErrLock.Lock() - if ctx.callbackErr == nil { - ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err) - } - ctx.callbackErrLock.Unlock() - } - ctx.wg.Done() -} - -// Unmarshal implements common.UnmarshalWork -func (uw *unmarshalWork) Unmarshal() { - uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf)) - rows := uw.rows.Rows - rowsRead.Add(len(rows)) - - // Fill missing timestamps with the current timestamp rounded to seconds. - currentTimestamp := int64(fasttime.UnixTimestamp()) - for i := range rows { - r := &rows[i] - if r.Timestamp == 0 || r.Timestamp == -1 { - r.Timestamp = currentTimestamp * 1e3 - } - } - - uw.runCallback(rows) - putUnmarshalWork(uw) -} - -func getUnmarshalWork() *unmarshalWork { - v := unmarshalWorkPool.Get() - if v == nil { - return &unmarshalWork{} - } - return v.(*unmarshalWork) -} - -func putUnmarshalWork(uw *unmarshalWork) { - uw.reset() - unmarshalWorkPool.Put(uw) -} - -var unmarshalWorkPool sync.Pool diff --git a/lib/protoparser/statsd/stream/streamparser_test.go b/lib/protoparser/statsd/stream/streamparser_test.go deleted file mode 100644 index 626229817..000000000 --- a/lib/protoparser/statsd/stream/streamparser_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package stream - -import ( - "reflect" - "strings" - "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd" -) - -func Test_streamContext_Read(t *testing.T) { - f := func(s string, rowsExpected *statsd.Rows) { - t.Helper() - ctx := getStreamContext(strings.NewReader(s)) - if !ctx.Read() { - t.Fatalf("expecting successful read") - } - uw := getUnmarshalWork() - callbackCalls := 0 - uw.ctx = ctx - uw.callback = func(rows []statsd.Row) error { - callbackCalls++ - if len(rows) != len(rowsExpected.Rows) { - t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) - } - if !reflect.DeepEqual(rows, rowsExpected.Rows) { - t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows) - } - return nil - } - uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...) - ctx.wg.Add(1) - uw.Unmarshal() - if callbackCalls != 1 { - t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls) - } - } - - // Full line without tags - f("aaa:1123|c", &statsd.Rows{ - Rows: []statsd.Row{{ - Metric: "aaa", - Tags: []statsd.Tag{ - { - Key: "__statsd_metric_type__", - Value: "c", - }, - }, - Values: []float64{1123}, - Timestamp: int64(fasttime.UnixTimestamp()) * 1000, - }}, - }) - // Full line with tags - f("aaa:1123|c|#x:y", &statsd.Rows{ - Rows: []statsd.Row{{ - Metric: "aaa", - Tags: []statsd.Tag{ - { - Key: "__statsd_metric_type__", - Value: "c", - }, - { - Key: "x", - Value: "y", - }, - }, - Values: []float64{1123}, - Timestamp: int64(fasttime.UnixTimestamp()) * 1000, - }}, - }) -}