Statsd protocol compatibility (#5053)

In this PR I added compatibility with [statsd
protocol](https://github.com/b/statsd_spec) with tags to be able to send
metrics directly from statsd clients to vmagent or directly to VM.
For example its compatible with
[statsd-instrument](https://github.com/Shopify/statsd-instrument) and
[dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby) gems

Related issues: #5052, #206, #4600
This commit is contained in:
Oleg 2024-05-07 23:46:08 +04:00 committed by GitHub
parent 55c7dafb35
commit c6c5a5a186
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1344 additions and 4 deletions

View file

@ -86,6 +86,7 @@ VictoriaMetrics has the following prominent features:
* [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
* [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP.
* [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon).
* [Statsd plaintext protocol](#how-to-send-data-from-statsd-compatible-clients)
* [OpenTSDB put message](#sending-data-via-telnet-put-protocol).
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests).
* [JSON line format](#how-to-import-data-in-json-line-format).
@ -701,6 +702,45 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]}
```
## How to send data from Statsd-compatible clients
VictoriaMetrics supports extended statsd protocol with tags. Also it does not support sampling and metric types(it will be ignored).
Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag. For instance,
the following command will enable Statsd receiver in VictoriaMetrics on TCP and UDP port `8125`:
```console
/path/to/victoria-metrics-prod -statsdListenAddr=:8125
```
Example for writing data with Statsd plaintext protocol to local VictoriaMetrics using `nc`:
```console
echo "foo.bar:123|g|#foo:bar" | nc -N localhost 8125
```
Explicit setting of timestamps is not supported for statsd protocol. Timestamp is set to the current time when VictoriaMetrics or vmagent receives it.
An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
<div class="with-copy" markdown="1">
```console
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
```
</div>
The `/api/v1/export` endpoint should return the following response:
```json
{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1560277406000]}
```
Some examples of compatible statsd clients:
- [statsd-instrument](https://github.com/Shopify/statsd-instrument)
- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby)
- [go-statsd-client](https://github.com/cactus/go-statsd-client)
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance,
@ -1356,6 +1396,7 @@ Additionally, VictoriaMetrics can accept metrics via the following popular data
* DataDog `submit metrics` API. See [these docs](#how-to-send-data-from-datadog-agent) for details.
* InfluxDB line protocol. See [these docs](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) for details.
* Graphite plaintext protocol. See [these docs](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) for details.
* Statsd plaintext protocol. See [these docs](#how-to-send-data-from-statsd-compatible-clients) for details.
* OpenTelemetry http API. See [these docs](#sending-data-via-opentelemetry) for details.
* OpenTSDB telnet put protocol. See [these docs](#sending-data-via-telnet-put-protocol) for details.
* OpenTSDB http `/api/put` protocol. See [these docs](#sending-opentsdb-data-via-http-apiput-requests) for details.

View file

@ -24,6 +24,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/prometheusimport"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
@ -36,6 +37,7 @@ import (
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
statsdserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
@ -61,6 +63,10 @@ var (
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
statsdListenAddr = flag.String("statsdListenAddr", "", "TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. "+
"See also -statsdListenAddr.useProxyProtocol")
statsdUseProxyProtocol = flag.Bool("statsdListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -statsdListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpenTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol")
@ -80,6 +86,7 @@ var (
var (
influxServer *influxserver.Server
graphiteServer *graphiteserver.Server
statsdServer *statsdserver.Server
opentsdbServer *opentsdbserver.Server
opentsdbhttpServer *opentsdbhttpserver.Server
)
@ -137,6 +144,9 @@ func main() {
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
}
if len(*statsdListenAddr) > 0 {
statsdServer = statsdserver.MustStart(*statsdListenAddr, *statsdUseProxyProtocol, statsd.InsertHandler)
}
if len(*opentsdbListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, httpInsertHandler)
@ -172,6 +182,9 @@ func main() {
if len(*graphiteListenAddr) > 0 {
graphiteServer.MustStop()
}
if len(*statsdListenAddr) > 0 {
statsdServer.MustStop()
}
if len(*opentsdbListenAddr) > 0 {
opentsdbServer.MustStop()
}

View file

@ -0,0 +1,68 @@
package statsd
import (
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd/stream"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vmagent_rows_inserted_total{type="statsd"}`)
rowsPerInsert = metrics.NewHistogram(`vmagent_rows_per_insert{type="statsd"}`)
)
// InsertHandler processes remote write for statsd plaintext protocol.
//
// See https://github.com/statsd/statsd/blob/master/docs/metric_types.md
func InsertHandler(r io.Reader) error {
return stream.Parse(r, false, func(rows []parser.Row) error {
return insertRows(nil, rows)
})
}
func insertRows(at *auth.Token, rows []parser.Row) error {
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
tssDst := ctx.WriteRequest.Timeseries[:0]
labels := ctx.Labels[:0]
samples := ctx.Samples[:0]
for i := range rows {
r := &rows[i]
labelsLen := len(labels)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: r.Metric,
})
for j := range r.Tags {
tag := &r.Tags[j]
labels = append(labels, prompbmarshal.Label{
Name: tag.Key,
Value: tag.Value,
})
}
samples = append(samples, prompbmarshal.Sample{
Value: r.Value,
Timestamp: r.Timestamp,
})
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[len(samples)-1:],
})
}
ctx.WriteRequest.Timeseries = tssDst
ctx.Labels = labels
ctx.Samples = samples
if !remotewrite.TryPush(at, &ctx.WriteRequest) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return nil
}

View file

@ -26,6 +26,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prompush"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -36,6 +37,7 @@ import (
influxserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/influx"
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
statsdserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
@ -49,6 +51,10 @@ var (
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
statsdListenAddr = flag.String("statsdListenAddr", "", "TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. "+
"See also -statsdListenAddr.useProxyProtocol")
statsdUseProxyProtocol = flag.Bool("statsdListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -statsdListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write . "+
"See also -influxListenAddr.useProxyProtocol")
@ -72,6 +78,7 @@ var (
var (
graphiteServer *graphiteserver.Server
statsdServer *statsdserver.Server
influxServer *influxserver.Server
opentsdbServer *opentsdbserver.Server
opentsdbhttpServer *opentsdbhttpserver.Server
@ -92,6 +99,9 @@ func Init() {
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
}
if len(*statsdListenAddr) > 0 {
statsdServer = statsdserver.MustStart(*statsdListenAddr, *statsdUseProxyProtocol, statsd.InsertHandler)
}
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, influx.InsertHandlerForReader)
}
@ -112,6 +122,9 @@ func Stop() {
if len(*graphiteListenAddr) > 0 {
graphiteServer.MustStop()
}
if len(*statsdListenAddr) > 0 {
statsdServer.MustStop()
}
if len(*influxListenAddr) > 0 {
influxServer.MustStop()
}

View file

@ -0,0 +1,54 @@
package statsd
import (
"io"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd/stream"
"github.com/VictoriaMetrics/metrics"
)
var (
rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="statsd"}`)
rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="statsd"}`)
)
// InsertHandler processes remote write for statsd protocol with tags.
//
// https://github.com/statsd/statsd/blob/master/docs/metric_types.md
func InsertHandler(r io.Reader) error {
return stream.Parse(r, false, insertRows)
}
func insertRows(rows []parser.Row) error {
ctx := common.GetInsertCtx()
defer common.PutInsertCtx(ctx)
ctx.Reset(len(rows))
hasRelabeling := relabel.HasRelabeling()
for i := range rows {
r := &rows[i]
ctx.Labels = ctx.Labels[:0]
ctx.AddLabel("", r.Metric)
for j := range r.Tags {
tag := &r.Tags[j]
ctx.AddLabel(tag.Key, tag.Value)
}
if hasRelabeling {
ctx.ApplyRelabeling()
}
if len(ctx.Labels) == 0 {
// Skip metric without labels.
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
}
}
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))
return ctx.FlushBufs()
}

View file

@ -89,6 +89,7 @@ VictoriaMetrics has the following prominent features:
* [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
* [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP.
* [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon).
* [Statsd plaintext protocol](#how-to-send-data-from-statsd-compatible-clients)
* [OpenTSDB put message](#sending-data-via-telnet-put-protocol).
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests).
* [JSON line format](#how-to-import-data-in-json-line-format).
@ -704,6 +705,45 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]}
```
## How to send data from Statsd-compatible clients
VictoriaMetrics supports extended statsd protocol with tags. Also it does not support sampling and metric types(it will be ignored).
Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag. For instance,
the following command will enable Statsd receiver in VictoriaMetrics on TCP and UDP port `8125`:
```console
/path/to/victoria-metrics-prod -statsdListenAddr=:8125
```
Example for writing data with Statsd plaintext protocol to local VictoriaMetrics using `nc`:
```console
echo "foo.bar:123|g|#foo:bar" | nc -N localhost 8125
```
Explicit setting of timestamps is not supported for statsd protocol. Timestamp is set to the current time when VictoriaMetrics or vmagent receives it.
An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
<div class="with-copy" markdown="1">
```console
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
```
</div>
The `/api/v1/export` endpoint should return the following response:
```json
{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1560277406000]}
```
Some examples of compatible statsd clients:
- [statsd-instrument](https://github.com/Shopify/statsd-instrument)
- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby)
- [go-statsd-client](https://github.com/cactus/go-statsd-client)
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance,

View file

@ -97,6 +97,7 @@ VictoriaMetrics has the following prominent features:
* [Prometheus exposition format](#how-to-import-data-in-prometheus-exposition-format).
* [InfluxDB line protocol](#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf) over HTTP, TCP and UDP.
* [Graphite plaintext protocol](#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) with [tags](https://graphite.readthedocs.io/en/latest/tags.html#carbon).
* [Statsd plaintext protocol](#how-to-send-data-from-statsd-compatible-clients)
* [OpenTSDB put message](#sending-data-via-telnet-put-protocol).
* [HTTP OpenTSDB /api/put requests](#sending-opentsdb-data-via-http-apiput-requests).
* [JSON line format](#how-to-import-data-in-json-line-format).
@ -712,6 +713,45 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]}
```
## How to send data from Statsd-compatible clients
VictoriaMetrics supports extended statsd protocol with tags. Also it does not support sampling and metric types(it will be ignored).
Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag. For instance,
the following command will enable Statsd receiver in VictoriaMetrics on TCP and UDP port `8125`:
```console
/path/to/victoria-metrics-prod -statsdListenAddr=:8125
```
Example for writing data with Statsd plaintext protocol to local VictoriaMetrics using `nc`:
```console
echo "foo.bar:123|g|#foo:bar" | nc -N localhost 8125
```
Explicit setting of timestamps is not supported for statsd protocol. Timestamp is set to the current time when VictoriaMetrics or vmagent receives it.
An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
<div class="with-copy" markdown="1">
```console
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
```
</div>
The `/api/v1/export` endpoint should return the following response:
```json
{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1560277406000]}
```
Some examples of compatible statsd clients:
- [statsd-instrument](https://github.com/Shopify/statsd-instrument)
- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby)
- [go-statsd-client](https://github.com/cactus/go-statsd-client)
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance,

View file

@ -133,7 +133,7 @@ Stream aggregation can be used in the following cases:
### Statsd alternative
Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) alternative in the following cases:
Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) drop-in replacement in the following cases:
* [Counting input samples](#counting-input-samples)
* [Summing input metrics](#summing-input-metrics)
@ -141,9 +141,6 @@ Stream aggregation can be used as [statsd](https://github.com/statsd/statsd) alt
* [Histograms over input metrics](#histograms-over-input-metrics)
* [Aggregating histograms](#aggregating-histograms)
Currently, streaming aggregation is available only for [supported data ingestion protocols](https://docs.victoriametrics.com/#how-to-import-time-series-data)
and not available for [Statsd metrics format](https://github.com/statsd/statsd/blob/master/docs/metric_types.md).
### Recording rules alternative
Sometimes [alerting queries](https://docs.victoriametrics.com/vmalert/#alerting-rules) may require non-trivial amounts of CPU, RAM,

View file

@ -107,6 +107,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
* DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-datadog-agent).
* InfluxDB line protocol via `http://<vmagent>:8429/write`. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
* Graphite plaintext protocol if `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd).
* Statsd plaintext protocol if `-statsdListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-statsd-compatible-clients).
* OpenTelemetry http API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#sending-data-via-opentelemetry).
* NewRelic API. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-newrelic-agent).
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#how-to-send-data-from-opentsdb-compatible-agents).
@ -1707,6 +1708,10 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Whether to use proxy protocol for connections accepted at -graphiteListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
-graphiteTrimTimestamp duration
Trim timestamps for Graphite data to this duration. Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data (default 1s)
-statsdListenAddr string
TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol
-statsdListenAddr.useProxyProtocol
Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
-http.connTimeout duration
Incoming connections to -httpListenAddr are closed after the configured timeout. This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections (default 2m0s)
-http.disableResponseCompression

View file

@ -0,0 +1,173 @@
package statsd
import (
"errors"
"io"
"net"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
)
var (
writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="statsd", name="write", net="tcp"}`)
writeErrorsTCP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="statsd", name="write", net="tcp"}`)
writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_requests_total{type="statsd", name="write", net="udp"}`)
writeErrorsUDP = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="statsd", name="write", net="udp"}`)
)
// Server accepts Statsd plaintext lines over TCP and UDP.
type Server struct {
addr string
lnTCP net.Listener
lnUDP net.PacketConn
wg sync.WaitGroup
cm ingestserver.ConnsMap
}
// MustStart starts statsd server on the given addr.
//
// The incoming connections are processed with insertHandler.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
logger.Infof("starting TCP Statsd server at %q", addr)
lnTCP, err := netutil.NewTCPListener("statsd", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start TCP Statsd server at %q: %s", addr, err)
}
logger.Infof("starting UDP Statsd server at %q", addr)
lnUDP, err := net.ListenPacket(netutil.GetUDPNetwork(), addr)
if err != nil {
logger.Fatalf("cannot start UDP Statsd server at %q: %s", addr, err)
}
s := &Server{
addr: addr,
lnTCP: lnTCP,
lnUDP: lnUDP,
}
s.cm.Init("statsd")
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.serveTCP(insertHandler)
logger.Infof("stopped TCP Statsd server at %q", addr)
}()
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.serveUDP(insertHandler)
logger.Infof("stopped UDP Statsd server at %q", addr)
}()
return s
}
// MustStop stops the server.
func (s *Server) MustStop() {
logger.Infof("stopping TCP Statsd server at %q...", s.addr)
if err := s.lnTCP.Close(); err != nil {
logger.Errorf("cannot close TCP Statsd server: %s", err)
}
logger.Infof("stopping UDP Statsd server at %q...", s.addr)
if err := s.lnUDP.Close(); err != nil {
logger.Errorf("cannot close UDP Statsd server: %s", err)
}
s.cm.CloseAll(0)
s.wg.Wait()
logger.Infof("TCP and UDP Statsd servers at %q have been stopped", s.addr)
}
func (s *Server) serveTCP(insertHandler func(r io.Reader) error) {
var wg sync.WaitGroup
for {
c, err := s.lnTCP.Accept()
if err != nil {
var ne net.Error
if errors.As(err, &ne) {
if ne.Temporary() {
logger.Errorf("statsd: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err)
time.Sleep(time.Second)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
break
}
logger.Fatalf("unrecoverable error when accepting TCP Statsd connections: %s", err)
}
logger.Fatalf("unexpected error when accepting TCP Statsd connections: %s", err)
}
if !s.cm.Add(c) {
_ = c.Close()
break
}
wg.Add(1)
go func() {
defer func() {
s.cm.Delete(c)
_ = c.Close()
wg.Done()
}()
writeRequestsTCP.Inc()
if err := insertHandler(c); err != nil {
writeErrorsTCP.Inc()
logger.Errorf("error in TCP Statsd conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
}
}()
}
wg.Wait()
}
func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
gomaxprocs := cgroup.AvailableCPUs()
var wg sync.WaitGroup
for i := 0; i < gomaxprocs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
var bb bytesutil.ByteBuffer
bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
for {
bb.Reset()
bb.B = bb.B[:cap(bb.B)]
n, addr, err := s.lnUDP.ReadFrom(bb.B)
if err != nil {
writeErrorsUDP.Inc()
var ne net.Error
if errors.As(err, &ne) {
if ne.Temporary() {
logger.Errorf("statsd: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err)
time.Sleep(time.Second)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
break
}
}
logger.Errorf("cannot read Statsd UDP data: %s", err)
continue
}
bb.B = bb.B[:n]
writeRequestsUDP.Inc()
if err := insertHandler(bb.NewReader()); err != nil {
writeErrorsUDP.Inc()
logger.Errorf("error in UDP Statsd conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err)
continue
}
}
}()
}
wg.Wait()
}

View file

@ -0,0 +1,226 @@
package statsd
import (
"fmt"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat"
)
// Statsd metric format with tags: MetricName:value|type|@sample_rate|#tag1:value,tag1...
const statsdSeparator = '|'
const statsdPairsSeparator = ':'
const statsdTagsStartSeparator = '#'
const statsdTagsSeparator = ','
// Rows contains parsed statsd rows.
type Rows struct {
Rows []Row
tagsPool []Tag
}
// Reset resets rs.
func (rs *Rows) Reset() {
// Reset items, so they can be GC'ed
for i := range rs.Rows {
rs.Rows[i].reset()
}
rs.Rows = rs.Rows[:0]
for i := range rs.tagsPool {
rs.tagsPool[i].reset()
}
rs.tagsPool = rs.tagsPool[:0]
}
// Unmarshal unmarshals statsd plaintext protocol rows from s.
//
// s shouldn't be modified when rs is in use.
func (rs *Rows) Unmarshal(s string) {
rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0])
}
// Row is a single statsd row.
type Row struct {
Metric string
Tags []Tag
Value float64
Timestamp int64
}
func (r *Row) reset() {
r.Metric = ""
r.Tags = nil
r.Value = 0
r.Timestamp = 0
}
func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
r.reset()
originalString := s
s = stripTrailingWhitespace(s)
separatorPosition := strings.IndexByte(s, statsdSeparator)
if separatorPosition < 0 {
s = stripTrailingWhitespace(s)
} else {
s = stripTrailingWhitespace(s[:separatorPosition])
}
valuesSeparatorPosition := strings.LastIndexByte(s, statsdPairsSeparator)
if valuesSeparatorPosition == 0 {
return tagsPool, fmt.Errorf("cannot find metric name for %q", s)
}
if valuesSeparatorPosition < 0 {
return tagsPool, fmt.Errorf("cannot find separator for %q", s)
}
r.Metric = s[:valuesSeparatorPosition]
valueStr := s[valuesSeparatorPosition+1:]
v, err := fastfloat.Parse(valueStr)
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", valueStr, err, originalString)
}
r.Value = v
// parsing tags
tagsSeparatorPosition := strings.LastIndexByte(originalString, statsdTagsStartSeparator)
if tagsSeparatorPosition < 0 {
// no tags
return tagsPool, nil
}
tagsStart := len(tagsPool)
tagsPool = unmarshalTags(tagsPool, originalString[tagsSeparatorPosition+1:])
tags := tagsPool[tagsStart:]
r.Tags = tags[:len(tags):len(tags)]
return tagsPool, nil
}
func unmarshalRows(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
for len(s) > 0 {
n := strings.IndexByte(s, '\n')
if n < 0 {
// The last line.
return unmarshalRow(dst, s, tagsPool)
}
dst, tagsPool = unmarshalRow(dst, s[:n], tagsPool)
s = s[n+1:]
}
return dst, tagsPool
}
func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
if len(s) > 0 && s[len(s)-1] == '\r' {
s = s[:len(s)-1]
}
s = stripLeadingWhitespace(s)
if len(s) == 0 {
// Skip empty line
return dst, tagsPool
}
if cap(dst) > len(dst) {
dst = dst[:len(dst)+1]
} else {
dst = append(dst, Row{})
}
r := &dst[len(dst)-1]
var err error
tagsPool, err = r.unmarshal(s, tagsPool)
if err != nil {
dst = dst[:len(dst)-1]
logger.Errorf("cannot unmarshal Statsd line %q: %s", s, err)
invalidLines.Inc()
}
return dst, tagsPool
}
var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="statsd"}`)
func unmarshalTags(dst []Tag, s string) []Tag {
for {
if cap(dst) > len(dst) {
dst = dst[:len(dst)+1]
} else {
dst = append(dst, Tag{})
}
tag := &dst[len(dst)-1]
n := strings.IndexByte(s, statsdTagsSeparator)
if n < 0 {
// The last tag found
tag.unmarshal(s)
if len(tag.Key) == 0 || len(tag.Value) == 0 {
// Skip empty tag
dst = dst[:len(dst)-1]
}
return dst
}
tag.unmarshal(s[:n])
s = s[n+1:]
if len(tag.Key) == 0 || len(tag.Value) == 0 {
// Skip empty tag
dst = dst[:len(dst)-1]
}
}
}
// Tag is a statsd tag.
type Tag struct {
Key string
Value string
}
func (t *Tag) reset() {
t.Key = ""
t.Value = ""
}
func (t *Tag) unmarshal(s string) {
t.reset()
n := strings.IndexByte(s, statsdPairsSeparator)
if n < 0 {
// Empty tag value.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1100
t.Key = s
t.Value = s[len(s):]
} else {
t.Key = s[:n]
t.Value = s[n+1:]
}
}
func stripTrailingWhitespace(s string) string {
n := len(s)
for {
n--
if n < 0 {
return ""
}
ch := s[n]
if ch != ' ' && ch != '\t' {
return s[:n+1]
}
}
}
func stripLeadingWhitespace(s string) string {
for len(s) > 0 {
ch := s[0]
if ch != ' ' && ch != '\t' {
return s
}
s = s[1:]
}
return ""
}

View file

@ -0,0 +1,367 @@
package statsd
import (
"reflect"
"testing"
)
func TestUnmarshalTagsSuccess(t *testing.T) {
f := func(dst []Tag, s string, tagsPoolExpected []Tag) {
t.Helper()
tagsPool := unmarshalTags(dst, s)
if !reflect.DeepEqual(tagsPool, tagsPoolExpected) {
t.Fatalf("unexpected tags;\ngot\n%+v;\nwant\n%+v", tagsPool, tagsPoolExpected)
}
// Try unmarshaling again
tagsPool = unmarshalTags(dst, s)
if !reflect.DeepEqual(tagsPool, tagsPoolExpected) {
t.Fatalf("unexpected tags on second unmarshal;\ngot\n%+v;\nwant\n%+v", tagsPool, tagsPoolExpected)
}
}
f([]Tag{}, "foo:bar", []Tag{
{
Key: "foo",
Value: "bar",
},
})
f([]Tag{}, "foo:bar,qwe:123", []Tag{
{
Key: "foo",
Value: "bar",
},
{
Key: "qwe",
Value: "123",
},
})
f([]Tag{}, "foo.qwe:bar", []Tag{
{
Key: "foo.qwe",
Value: "bar",
},
})
f([]Tag{}, "foo:10", []Tag{
{
Key: "foo",
Value: "10",
},
})
f([]Tag{}, "foo: _qwe", []Tag{
{
Key: "foo",
Value: " _qwe",
},
})
f([]Tag{}, "foo:qwe ", []Tag{
{
Key: "foo",
Value: "qwe ",
},
})
f([]Tag{}, "foo asd:qwe ", []Tag{
{
Key: "foo asd",
Value: "qwe ",
},
})
f([]Tag{}, "foo:var:123", []Tag{
{
Key: "foo",
Value: "var:123",
},
})
// invalid tags
f([]Tag{}, ":bar", []Tag{})
f([]Tag{}, "foo:", []Tag{})
f([]Tag{}, " ", []Tag{})
}
func TestRowsUnmarshalSuccess(t *testing.T) {
f := func(s string, rowsExpected *Rows) {
t.Helper()
var rows Rows
rows.Unmarshal(s)
if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
}
// Try unmarshaling again
rows.Unmarshal(s)
if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows on second unmarshal;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
}
rows.Reset()
if len(rows.Rows) != 0 {
t.Fatalf("non-empty rows after reset: %+v", rows.Rows)
}
}
// Empty line
f("", &Rows{})
f("\r", &Rows{})
f("\n\n", &Rows{})
f("\n\r\n", &Rows{})
// Single line
f(" 123:455", &Rows{
Rows: []Row{{
Metric: "123",
Value: 455,
}},
})
f("123:455 |c", &Rows{
Rows: []Row{{
Metric: "123",
Value: 455,
}},
})
f("foobar:-123.456|c", &Rows{
Rows: []Row{{
Metric: "foobar",
Value: -123.456,
}},
})
f("foo.bar:123.456|c\n", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 123.456,
}},
})
// with sample rate
f("foo.bar:1|c|@0.1", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 1,
}},
})
// without specifying metric unit
f("foo.bar:123", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 123,
}},
})
// without specifying metric unit but with tags
f("foo.bar:123|#foo:bar", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 123,
Tags: []Tag{
{
Key: "foo",
Value: "bar",
},
},
}},
})
f("foo.bar:123.456|c|#foo:bar,qwe:asd", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 123.456,
Tags: []Tag{
{
Key: "foo",
Value: "bar",
},
{
Key: "qwe",
Value: "asd",
},
},
}},
})
// Whitespace in metric name, tag name and tag value
f("s a:1|c|#ta g1:aaa1,tag2:bb b2", &Rows{
Rows: []Row{{
Metric: "s a",
Value: 1,
Tags: []Tag{
{
Key: "ta g1",
Value: "aaa1",
},
{
Key: "tag2",
Value: "bb b2",
},
},
}},
})
// Tags
f("foo:1|c", &Rows{
Rows: []Row{{
Metric: "foo",
Value: 1,
}},
})
// Empty tag name
f("foo:1|#:123", &Rows{
Rows: []Row{{
Metric: "foo",
Tags: []Tag{},
Value: 1,
}},
})
// Empty tag value
f("foo:1|#tag1:", &Rows{
Rows: []Row{{
Metric: "foo",
Tags: []Tag{},
Value: 1,
}},
})
f("foo:1|#bar:baz,aa:,x:y,:z", &Rows{
Rows: []Row{{
Metric: "foo",
Tags: []Tag{
{
Key: "bar",
Value: "baz",
},
{
Key: "x",
Value: "y",
},
},
Value: 1,
}},
})
// Multi lines
f("foo:0.3|c\naaa:3|g\nbar.baz:0.34|c\n", &Rows{
Rows: []Row{
{
Metric: "foo",
Value: 0.3,
},
{
Metric: "aaa",
Value: 3,
},
{
Metric: "bar.baz",
Value: 0.34,
},
},
})
f("foo:0.3|c|#tag1:1,tag2:2\naaa:3|g|#tag3:3,tag4:4", &Rows{
Rows: []Row{
{
Metric: "foo",
Value: 0.3,
Tags: []Tag{
{
Key: "tag1",
Value: "1",
},
{
Key: "tag2",
Value: "2",
},
},
},
{
Metric: "aaa",
Value: 3,
Tags: []Tag{
{
Key: "tag3",
Value: "3",
},
{
Key: "tag4",
Value: "4",
},
},
},
},
})
// Multi lines with invalid line
f("foo:0.3|c\naaa\nbar.baz:0.34\n", &Rows{
Rows: []Row{
{
Metric: "foo",
Value: 0.3,
},
{
Metric: "bar.baz",
Value: 0.34,
},
},
})
// Whitespace after at the end
f("foo.baz:125|c\na:1.34\t ", &Rows{
Rows: []Row{
{
Metric: "foo.baz",
Value: 125,
},
{
Metric: "a",
Value: 1.34,
},
},
})
// ignores sample rate
f("foo.baz:125|c|@0.5#tag1:12", &Rows{
Rows: []Row{
{
Metric: "foo.baz",
Value: 125,
Tags: []Tag{
{
Key: "tag1",
Value: "12",
},
},
},
},
})
}
func TestRowsUnmarshalFailure(t *testing.T) {
f := func(s string) {
t.Helper()
var rows Rows
rows.Unmarshal(s)
if len(rows.Rows) != 0 {
t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows))
}
// Try again
rows.Unmarshal(s)
if len(rows.Rows) != 0 {
t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows))
}
}
// random string
f("aaa")
// empty value
f("foo:")
// empty metric name
f(":12")
}

View file

@ -0,0 +1,25 @@
package statsd
import (
"fmt"
"testing"
)
func BenchmarkRowsUnmarshal(b *testing.B) {
s := `cpu.usage_user:1.23|c
cpu.usage_system:23.344|c
cpu.usage_iowait:3.3443|c
cpu.usage_irq:0.34432|c
`
b.SetBytes(int64(len(s)))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
var rows Rows
for pb.Next() {
rows.Unmarshal(s)
if len(rows.Rows) != 4 {
panic(fmt.Errorf("unexpected number of rows unmarshaled: got %d; want 4", len(rows.Rows)))
}
}
})
}

View file

@ -0,0 +1,218 @@
package stream
import (
"bufio"
"flag"
"fmt"
"io"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
var (
trimTimestamp = flag.Duration("statsdTrimTimestamp", time.Second, "Trim timestamps for Statsd data to this duration. "+
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
)
// Parse parses Statsd lines from r and calls callback for the parsed rows.
//
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold rows after returning.
func Parse(r io.Reader, isGzipped bool, callback func(rows []statsd.Row) error) error {
wcr := writeconcurrencylimiter.GetReader(r)
defer writeconcurrencylimiter.PutReader(wcr)
r = wcr
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped statsd data: %w", err)
}
defer common.PutGzipReader(zr)
r = zr
}
ctx := getStreamContext(r)
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.ctx = ctx
uw.callback = callback
uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
ctx.wg.Add(1)
common.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
ctx.wg.Wait()
if err := ctx.Error(); err != nil {
return err
}
return ctx.callbackErr
}
func (ctx *streamContext) Read() bool {
readCalls.Inc()
if ctx.err != nil || ctx.hasCallbackError() {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()
ctx.err = fmt.Errorf("cannot read statsd plaintext protocol data: %w", ctx.err)
}
return false
}
return true
}
type streamContext struct {
br *bufio.Reader
reqBuf []byte
tailBuf []byte
err error
wg sync.WaitGroup
callbackErrLock sync.Mutex
callbackErr error
}
func (ctx *streamContext) Error() error {
if ctx.err == io.EOF {
return nil
}
return ctx.err
}
func (ctx *streamContext) hasCallbackError() bool {
ctx.callbackErrLock.Lock()
ok := ctx.callbackErr != nil
ctx.callbackErrLock.Unlock()
return ok
}
func (ctx *streamContext) reset() {
ctx.br.Reset(nil)
ctx.reqBuf = ctx.reqBuf[:0]
ctx.tailBuf = ctx.tailBuf[:0]
ctx.err = nil
ctx.callbackErr = nil
}
var (
readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="statsd"}`)
readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="statsd"}`)
rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="statsd"}`)
)
func getStreamContext(r io.Reader) *streamContext {
select {
case ctx := <-streamContextPoolCh:
ctx.br.Reset(r)
return ctx
default:
if v := streamContextPool.Get(); v != nil {
ctx := v.(*streamContext)
ctx.br.Reset(r)
return ctx
}
return &streamContext{
br: bufio.NewReaderSize(r, 64*1024),
}
}
}
func putStreamContext(ctx *streamContext) {
ctx.reset()
select {
case streamContextPoolCh <- ctx:
default:
streamContextPool.Put(ctx)
}
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows statsd.Rows
ctx *streamContext
callback func(rows []statsd.Row) error
reqBuf []byte
}
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.ctx = nil
uw.callback = nil
uw.reqBuf = uw.reqBuf[:0]
}
func (uw *unmarshalWork) runCallback(rows []statsd.Row) {
ctx := uw.ctx
if err := uw.callback(rows); err != nil {
ctx.callbackErrLock.Lock()
if ctx.callbackErr == nil {
ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
}
ctx.callbackErrLock.Unlock()
}
ctx.wg.Done()
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
rows := uw.rows.Rows
rowsRead.Add(len(rows))
// Fill missing timestamps with the current timestamp rounded to seconds.
currentTimestamp := int64(fasttime.UnixTimestamp())
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 || r.Timestamp == -1 {
r.Timestamp = currentTimestamp
}
}
// Convert timestamps from seconds to milliseconds.
for i := range rows {
rows[i].Timestamp *= 1e3
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
uw.runCallback(rows)
putUnmarshalWork(uw)
}
func getUnmarshalWork() *unmarshalWork {
v := unmarshalWorkPool.Get()
if v == nil {
return &unmarshalWork{}
}
return v.(*unmarshalWork)
}
func putUnmarshalWork(uw *unmarshalWork) {
uw.reset()
unmarshalWorkPool.Put(uw)
}
var unmarshalWorkPool sync.Pool

View file

@ -0,0 +1,60 @@
package stream
import (
"reflect"
"strings"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd"
)
func Test_streamContext_Read(t *testing.T) {
f := func(s string, rowsExpected *statsd.Rows) {
t.Helper()
ctx := getStreamContext(strings.NewReader(s))
if !ctx.Read() {
t.Fatalf("expecting successful read")
}
uw := getUnmarshalWork()
callbackCalls := 0
uw.ctx = ctx
uw.callback = func(rows []statsd.Row) error {
callbackCalls++
if len(rows) != len(rowsExpected.Rows) {
t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
if !reflect.DeepEqual(rows, rowsExpected.Rows) {
t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
}
return nil
}
uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
ctx.wg.Add(1)
uw.Unmarshal()
if callbackCalls != 1 {
t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
}
}
// Full line without tags
f("aaa:1123|c", &statsd.Rows{
Rows: []statsd.Row{{
Metric: "aaa",
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
// Full line with tags
f("aaa:1123|c|#x:y", &statsd.Rows{
Rows: []statsd.Row{{
Metric: "aaa",
Tags: []statsd.Tag{{
Key: "x",
Value: "y",
}},
Value: 1123,
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
}