Merge branch 'public-single-node' into victorialogs-wip

Aliaksandr Valialkin 2024-05-20 04:05:25 +02:00
commit d69ed753a7
28 changed files with 1070 additions and 347 deletions


@ -702,45 +702,79 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]}
```
## How to send data from Statsd-compatible clients
## How to send data from StatsD-compatible clients
VictoriaMetrics supports extended statsd protocol with tags. Also it does not support sampling and metric types(it will be ignored).
Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag. For instance,
the following command will enable Statsd receiver in VictoriaMetrics on TCP and UDP port `8125`:
VictoriaMetrics supports the extended StatsD protocol. Currently, it supports the `tags` and `value packing`
extensions provided by [dogstatsd](https://docs.datadoghq.com/developers/dogstatsd/datagram_shell).
During parsing, the metric's `<TYPE>` is added as a special label `__statsd_metric_type__`.
It is strongly advisable to configure streaming aggregation for each metric type. This serves two primary
objectives:
* transforming the StatsD data model into the VictoriaMetrics data model, since VictoriaMetrics requires a consistent
interval between data points;
* minimizing disk space utilization and overall resource consumption during data ingestion.
VictoriaMetrics supports the following metric [types](https://docs.datadoghq.com/metrics/types):
* `c` Counter type.
* `g` Gauge type.
* `ms` Timer type.
* `m` Meters type.
* `h` Histogram type.
* `s` Set type with only numeric values.
* `d` Distribution type.
_The `Not Assigned` type is not supported, since the correct aggregation method cannot be determined
for an undefined metric type._
Enable the StatsD receiver in VictoriaMetrics by setting the `-statsdListenAddr` command-line flag and configuring [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/).
For instance, the following command enables the StatsD receiver in VictoriaMetrics on TCP and UDP port `8125`:
```console
/path/to/victoria-metrics-prod -statsdListenAddr=:8125
/path/to/victoria-metrics-prod -statsdListenAddr=:8125 -streamAggr.config=statsd_aggr.yaml
```
Example for writing data with Statsd plaintext protocol to local VictoriaMetrics using `nc`:
Example of stream aggregation config:
```yaml
# statsd_aggr.yaml
# the `last` output keeps the last sample per `interval`
# for each series matching the `{__statsd_metric_type__="g"}` selector
- match: '{__statsd_metric_type__="g"}'
outputs: [last]
interval: 1m
```
Example for writing data with StatsD plaintext protocol to local VictoriaMetrics using `nc`:
```console
echo "foo.bar:123|g|#foo:bar" | nc -N localhost 8125
echo "foo.bar:123|g|#tag1:baz" | nc -N localhost 8125
```
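Data can also be pushed from code. Below is a minimal, illustrative Go sketch (not part of VictoriaMetrics; the metric names, tags and the packed-value line are made-up examples) that sends a tagged gauge and a value-packed counter to the listener over UDP:
```go
package main

import (
	"fmt"
	"log"
	"net"
)

func main() {
	// Connect to the StatsD listener enabled via -statsdListenAddr=:8125.
	conn, err := net.Dial("udp", "localhost:8125")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// A gauge sample with a dogstatsd-style tag: <name>:<value>|<type>|#<key>:<value>
	fmt.Fprint(conn, "foo.bar:123|g|#tag1:baz\n")

	// Value packing: several samples for the same series in a single line, separated by ':'.
	fmt.Fprint(conn, "foo.requests:1:2:3|c|#env:dev\n")
}
```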
Explicit setting of timestamps is not supported for statsd protocol. Timestamp is set to the current time when VictoriaMetrics or vmagent receives it.
_An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go._
An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
Explicit setting of timestamps is not supported for the StatsD protocol. The timestamp is set to the current time when
VictoriaMetrics or vmagent receives it.
<div class="with-copy" markdown="1">
Once ingested, the data can be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```console
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo.*"}'
```
</div>
_Please note that with stream aggregation enabled, the data becomes available only after the specified aggregation interval._
The `/api/v1/export` endpoint should return the following response:
```json
{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1560277406000]}
{"metric":{"__name__":"foo.bar:1m_last","__statsd_metric_type__":"g","tag1":"baz"},"values":[123],"timestamps":[1715843939000]}
```
Some examples of compatible StatsD clients:
- [statsd-instrument](https://github.com/Shopify/statsd-instrument)
- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby)
- [go-statsd-client](https://github.com/cactus/go-statsd-client)
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance,
@ -3172,6 +3206,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0)
-sortLabels
Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit
-statsd.disableAggregationEnforcement
Whether to disable the streaming aggregation requirement check. It's recommended to run the statsd server with pre-configured streaming aggregation to reduce the load on the database.
-statsdListenAddr string
TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol
-statsdListenAddr.useProxyProtocol
Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
-storage.cacheSizeIndexDBDataBlocks size
Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)


@ -67,6 +67,8 @@ var (
"See also -statsdListenAddr.useProxyProtocol")
statsdUseProxyProtocol = flag.Bool("statsdListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -statsdListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
statsdDisableAggregationEnforcement = flag.Bool(`statsd.disableAggregationEnforcement`, false, "Whether to disable the streaming aggregation requirement check. "+
"It's recommended to run the statsd server with pre-configured streaming aggregation to reduce the load on the database.")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpenTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol")
@ -145,6 +147,9 @@ func main() {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
}
if len(*statsdListenAddr) > 0 {
if !remotewrite.HasAnyStreamAggrConfigured() && !*statsdDisableAggregationEnforcement {
logger.Fatalf("streaming aggregation must be configured when the statsd server is enabled. It's recommended to aggregate metrics received at the statsd listener. This check can be disabled with the -statsd.disableAggregationEnforcement flag")
}
statsdServer = statsdserver.MustStart(*statsdListenAddr, *statsdUseProxyProtocol, statsd.InsertHandler)
}
if len(*opentsdbListenAddr) > 0 {


@ -11,6 +11,8 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -20,7 +22,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
"github.com/VictoriaMetrics/metrics"
)
var (
@ -298,6 +299,11 @@ func (c *client) runWorker() {
if !ok {
return
}
if len(block) == 0 {
// skip empty data blocks from sending
// see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6241
continue
}
go func() {
startTime := time.Now()
ch <- c.sendBlock(block)


@ -87,24 +87,6 @@ var (
maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+
"for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence ."+
"See also -remoteWrite.dropSamplesOnOverload")
@ -139,6 +121,12 @@ func MultitenancyEnabled() bool {
// Contains the current relabelConfigs.
var allRelabelConfigs atomic.Pointer[relabelConfigs]
// Contains the current global stream aggregators.
var sasGlobal atomic.Pointer[streamaggr.Aggregators]
// Contains the current global deduplicator.
var deduplicatorGlobal *streamaggr.Deduplicator
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
var maxQueues = cgroup.AvailableCPUs() * 16
@ -215,6 +203,17 @@ func Init() {
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
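// Initialize the global stream aggregators / deduplicator configured via the -streamAggr.* flags (index -1 refers to the global config).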
sasFile, sasOpts := getStreamAggrOpts(-1)
if sasFile != "" {
sas, err := newStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -streamAggr.config=%q: %s", sasFile, err)
}
sasGlobal.Store(sas)
} else if sasOpts.DedupInterval > 0 {
deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesDropFailed, sasOpts.DedupInterval, sasOpts.DropInputLabels)
}
if len(*remoteWriteURLs) > 0 {
rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
}
@ -301,12 +300,6 @@ var (
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
)
func reloadStreamAggrConfigs() {
for _, rwctx := range rwctxs {
rwctx.reinitStreamAggr()
}
}
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
if len(urls) == 0 {
logger.Panicf("BUG: urls must be non-empty")
@ -342,8 +335,10 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
return rwctxs
}
var configReloaderStopCh = make(chan struct{})
var configReloaderWG sync.WaitGroup
var (
configReloaderStopCh = make(chan struct{})
configReloaderWG sync.WaitGroup
)
// StartIngestionRateLimiter starts ingestion rate limiter.
//
@ -381,6 +376,10 @@ func Stop() {
close(configReloaderStopCh)
configReloaderWG.Wait()
sasGlobal.Load().MustStop()
deduplicatorGlobal.MustStop()
deduplicatorGlobal = nil
for _, rwctx := range rwctxs {
rwctx.MustStop()
}
@ -457,6 +456,8 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
// Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock
sas := sasGlobal.Load()
for len(tss) > 0 {
// Process big tss in smaller blocks in order to reduce the maximum memory usage
samplesCount := 0
@ -491,6 +492,17 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
}
sortLabelsIfNeeded(tssBlock)
tssBlock = limitSeriesCardinality(tssBlock)
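// Apply global stream aggregation or deduplication (if configured) before pushing the block to the per-URL remote write contexts.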
if sas != nil {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tssBlock, matchIdxs.B)
if !*streamAggrGlobalKeepInput {
tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput)
}
matchIdxsPool.Put(matchIdxs)
} else if deduplicatorGlobal != nil {
deduplicatorGlobal.Push(tssBlock)
tssBlock = tssBlock[:0]
}
if !tryPushBlockToRemoteStorages(tssBlock, forceDropSamplesOnFailure) {
return false
}
@ -498,6 +510,12 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
return true
}
func pushToRemoteStoragesDropFailed(tss []prompbmarshal.TimeSeries) {
if tryPushBlockToRemoteStorages(tss, true) {
return
}
}
func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
if len(tssBlock) == 0 {
// Nothing to push
@ -808,17 +826,9 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
}
// Initialize sas
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx)
sasFile, sasOpts := getStreamAggrOpts(argIdx)
if sasFile != "" {
opts := &streamaggr.Options{
DedupInterval: dedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: ignoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
sas, err := newStreamAggrConfig(argIdx, rwctx.pushInternalTrackDropped)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
}
@ -827,8 +837,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
} else if dedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels)
} else if sasOpts.DedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, sasOpts.DedupInterval, sasOpts.DropInputLabels)
}
return rwctx
@ -985,40 +995,6 @@ func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) boo
return ok
}
func (rwctx *remoteWriteCtx) reinitStreamAggr() {
sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx)
if sasFile == "" {
// There is no stream aggregation for rwctx
return
}
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(rwctx.idx),
}
sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
return
}
sas := rwctx.sas.Load()
if !sasNew.Equal(sas) {
sasOld := rwctx.sas.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
} else {
sasNew.MustStop()
logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
var tssPool = &sync.Pool{
New: func() interface{} {
a := []prompbmarshal.TimeSeries{}
@ -1034,27 +1010,6 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int {
return rowsCount
}
// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
func CheckStreamAggrConfigs() error {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
for idx, sasFile := range *streamAggrConfig {
if sasFile == "" {
continue
}
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
}
sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts)
if err != nil {
return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
}
sas.MustStop()
}
return nil
}
func newMapFromStrings(a []string) map[string]struct{} {
m := make(map[string]struct{}, len(a))
for _, s := range a {


@ -0,0 +1,160 @@
package remotewrite
import (
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
var (
// Global config
streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrGlobalDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to remote storages write. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrGlobalDedupInterval = flagutil.NewDuration("streamAggr.dedupInterval", "0s", "Input samples are de-duplicated with this interval on "+
"aggregator before optional aggregation with -streamAggr.config . "+
"See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrGlobalIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the "+
"current aggregation interval for aggregator. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrGlobalIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start for "+
"aggregator. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from "+
"clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrGlobalDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples for aggregator "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
// Per URL config
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+
"aggregation interval for the corresponding -remoteWrite.streamAggr.config . "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if "+
"you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("remoteWrite.streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
)
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
func CheckStreamAggrConfigs() error {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
if _, err := newStreamAggrConfig(-1, pushNoop); err != nil {
return fmt.Errorf("could not load -streamAggr.config stream aggregation config: %w", err)
}
if len(*streamAggrConfig) > len(*remoteWriteURLs) {
return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*streamAggrConfig), len(*remoteWriteURLs))
}
for idx := range *streamAggrConfig {
if _, err := newStreamAggrConfig(idx, pushNoop); err != nil {
return err
}
}
return nil
}
// HasAnyStreamAggrConfigured checks whether any streaming aggregation config is provided
func HasAnyStreamAggrConfigured() bool {
return len(*streamAggrConfig) > 0 || *streamAggrGlobalConfig != ""
}
func reloadStreamAggrConfigs() {
reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
for idx, rwctx := range rwctxs {
reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
}
}
func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
path, opts := getStreamAggrOpts(idx)
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err)
return
}
var sas *streamaggr.Aggregators
if idx < 0 {
sas = sasGlobal.Load()
} else {
sas = rwctxs[idx].sas.Load()
}
if !sasNew.Equal(sas) {
var sasOld *streamaggr.Aggregators
if idx < 0 {
sasOld = sasGlobal.Swap(sasNew)
} else {
sasOld = rwctxs[idx].sas.Swap(sasNew)
}
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
} else {
sasNew.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
}
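// getStreamAggrOpts returns the stream aggregation config path and options for the given -remoteWrite.url index;
// a negative idx refers to the global -streamAggr.* flags.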
func getStreamAggrOpts(idx int) (string, *streamaggr.Options) {
if idx < 0 {
return *streamAggrGlobalConfig, &streamaggr.Options{
DedupInterval: streamAggrGlobalDedupInterval.Duration(),
DropInputLabels: *streamAggrGlobalDropInputLabels,
IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals,
}
}
opts := streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
if len(*streamAggrConfig) == 0 {
return "", &opts
}
return (*streamAggrConfig)[idx], &opts
}
func newStreamAggrConfigWithOpts(pushFunc streamaggr.PushFunc, path string, opts *streamaggr.Options) (*streamaggr.Aggregators, error) {
if len(path) == 0 {
// Skip empty stream aggregation config.
return nil, nil
}
return streamaggr.LoadFromFile(path, pushFunc, opts)
}
func newStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) {
path, opts := getStreamAggrOpts(idx)
return newStreamAggrConfigWithOpts(pushFunc, path, opts)
}


@ -47,13 +47,17 @@ func insertRows(at *auth.Token, rows []parser.Row) error {
Value: tag.Value,
})
}
samples = append(samples, prompbmarshal.Sample{
Value: r.Value,
Timestamp: r.Timestamp,
})
samplesLen := len(samples)
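// r.Values may contain multiple packed values for the same series (dogstatsd value packing); append one sample per value.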
for _, v := range r.Values {
samples = append(samples, prompbmarshal.Sample{
Value: v,
Timestamp: r.Timestamp,
})
}
tssDst = append(tssDst, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[len(samples)-1:],
Samples: samples[samplesLen:],
})
}
ctx.WriteRequest.Timeseries = tssDst


@ -18,6 +18,8 @@ import (
"github.com/VictoriaMetrics/metricsql"
)
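// numReg is compiled once at package level instead of on every parseInputValue call.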
var numReg = regexp.MustCompile(`\D?\d*\D?`)
// series holds input_series defined in the test file
type series struct {
Series string `yaml:"series"`
@ -86,13 +88,12 @@ func writeInputSeries(input []series, interval *promutils.Duration, startStamp t
func parseInputValue(input string, origin bool) ([]sequenceValue, error) {
var res []sequenceValue
items := strings.Split(input, " ")
reg := regexp.MustCompile(`\D?\d*\D?`)
for _, item := range items {
if item == "stale" {
res = append(res, sequenceValue{Value: decimal.StaleNaN})
continue
}
vals := reg.FindAllString(item, -1)
vals := numReg.FindAllString(item, -1)
switch len(vals) {
case 1:
if vals[0] == "_" {

File diff suppressed because one or more lines are too long


@ -7,6 +7,10 @@ import (
"net/http"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/valyala/fastjson"
)
var (
@ -31,27 +35,85 @@ type promResponse struct {
} `json:"stats,omitempty"`
}
// see https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
type promInstant struct {
Result []struct {
Labels map[string]string `json:"metric"`
TV [2]interface{} `json:"value"`
} `json:"result"`
// ms is populated after Unmarshal call
ms []Metric
}
func (r promInstant) metrics() ([]Metric, error) {
result := make([]Metric, len(r.Result))
for i, res := range r.Result {
f, err := strconv.ParseFloat(res.TV[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, res.TV[1], err)
}
var m Metric
m.SetLabels(res.Labels)
m.Timestamps = append(m.Timestamps, int64(res.TV[0].(float64)))
m.Values = append(m.Values, f)
result[i] = m
// metrics returns the parsed Metric slice.
// It must be called only after Unmarshal.
func (pi *promInstant) metrics() ([]Metric, error) {
return pi.ms, nil
}
var jsonParserPool fastjson.ParserPool
// Unmarshal unmarshals the given byte slice into promInstant.
// It uses fastjson to reduce the number of allocations compared to
// the standard json.Unmarshal function.
// Response example:
//
// [{"metric":{"__name__":"up","job":"prometheus"},"value":[1435781451.781,"1"]},
// {"metric":{"__name__":"up","job":"node"},"value":[1435781451.781,"0"]}]
func (pi *promInstant) Unmarshal(b []byte) error {
p := jsonParserPool.Get()
defer jsonParserPool.Put(p)
v, err := p.ParseBytes(b)
if err != nil {
return err
}
return result, nil
rows, err := v.Array()
if err != nil {
return fmt.Errorf("cannot find the top-level array of result objects: %w", err)
}
pi.ms = make([]Metric, len(rows))
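// Decode each result object directly into the pre-allocated slice to avoid extra copies.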
for i, row := range rows {
metric := row.Get("metric")
if metric == nil {
return fmt.Errorf("can't find `metric` object in %q", row)
}
labels := metric.GetObject()
r := &pi.ms[i]
r.Labels = make([]Label, 0, labels.Len())
labels.Visit(func(key []byte, v *fastjson.Value) {
lv, errLocal := v.StringBytes()
if errLocal != nil {
err = fmt.Errorf("error when parsing label value %q: %s", v, errLocal)
return
}
r.Labels = append(r.Labels, Label{
Name: string(key),
Value: string(lv),
})
})
if err != nil {
return fmt.Errorf("error when parsing `metric` object in %q: %w", row, err)
}
value := row.Get("value")
if value == nil {
return fmt.Errorf("can't find `value` object in %q", row)
}
sample := value.GetArray()
if len(sample) != 2 {
return fmt.Errorf("object `value` in %q should contain 2 values, but contains %d instead", row, len(sample))
}
r.Timestamps = []int64{sample[0].GetInt64()}
val, err := sample[1].StringBytes()
if err != nil {
return fmt.Errorf("error when parsing `value` object %q: %s", sample[1], err)
}
f, err := strconv.ParseFloat(bytesutil.ToUnsafeString(val), 64)
if err != nil {
return fmt.Errorf("error when parsing float64 from %s in %q: %w", sample[1], row, err)
}
r.Values = []float64{f}
}
return nil
}
type promRange struct {
@ -118,7 +180,7 @@ func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result
switch r.Data.ResultType {
case rtVector:
var pi promInstant
if err := json.Unmarshal(r.Data.Result, &pi.Result); err != nil {
if err := pi.Unmarshal(r.Data.Result); err != nil {
return res, fmt.Errorf("unmarshal err %w; \n %#v", err, string(r.Data.Result))
}
parseFn = pi.metrics


@ -1,20 +1,73 @@
package datasource
import (
"encoding/json"
"reflect"
"testing"
)
func BenchmarkMetrics(b *testing.B) {
payload := []byte(`[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]},{"metric":{"__name__":"vm_requests", "foo":"bar", "baz": "qux"},"value":[1583786140,"2000"]}]`)
var pi promInstant
if err := json.Unmarshal(payload, &pi.Result); err != nil {
b.Fatalf(err.Error())
}
b.Run("Instant", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = pi.metrics()
func TestPromInstant_UnmarshalPositive(t *testing.T) {
f := func(data string, exp []Metric) {
t.Helper()
var pi promInstant
err := pi.Unmarshal([]byte(data))
if err != nil {
t.Fatalf("unexpected unmarshal err %v; \n %v", err, string(data))
}
got, _ := pi.metrics()
if !reflect.DeepEqual(got, exp) {
t.Fatalf("expected to get:\n%v\ngot instead:\n%v", exp, got)
}
}
f(`[{"metric":{"__name__":"up"},"value":[1583780000,"42"]}]`, []Metric{
{
Labels: []Label{{Name: "__name__", Value: "up"}},
Timestamps: []int64{1583780000},
Values: []float64{42},
},
})
f(`[
{"metric":{"__name__":"up"},"value":[1583780000,"42"]},
{"metric":{"__name__":"foo"},"value":[1583780001,"7"]},
{"metric":{"__name__":"baz", "instance":"bar"},"value":[1583780002,"8"]}]`, []Metric{
{
Labels: []Label{{Name: "__name__", Value: "up"}},
Timestamps: []int64{1583780000},
Values: []float64{42},
},
{
Labels: []Label{{Name: "__name__", Value: "foo"}},
Timestamps: []int64{1583780001},
Values: []float64{7},
},
{
Labels: []Label{{Name: "__name__", Value: "baz"}, {Name: "instance", Value: "bar"}},
Timestamps: []int64{1583780002},
Values: []float64{8},
},
})
}
func TestPromInstant_UnmarshalNegative(t *testing.T) {
f := func(data string) {
t.Helper()
var pi promInstant
err := pi.Unmarshal([]byte(data))
if err == nil {
t.Fatalf("expected to get an error; got nil instead")
}
}
f(``)
f(`foo`)
f(`[{"metric":{"__name__":"up"},"value":[1583780000,"42"]},`)
f(`[{"metric":{"__name__"},"value":[1583780000,"42"]},`)
// no `metric` object
f(`[{"value":[1583780000,"42"]}]`)
// no `value` object
f(`[{"metric":{"__name__":"up"}}]`)
// less than 2 values in `value` object
f(`[{"metric":{"__name__":"up"},"value":["42"]}]`)
f(`[{"metric":{"__name__":"up"},"value":[1583780000]}]`)
// non-numeric sample value
f(`[{"metric":{"__name__":"up"},"value":[1583780000,"foo"]}]`)
}


@ -0,0 +1,43 @@
package datasource
import (
"bytes"
"io"
"net/http"
"os"
"testing"
)
func BenchmarkMetrics(b *testing.B) {
payload := []byte(`[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]},{"metric":{"__name__":"vm_requests", "foo":"bar", "baz": "qux"},"value":[1583786140,"2000"]}]`)
var pi promInstant
if err := pi.Unmarshal(payload); err != nil {
b.Fatalf(err.Error())
}
b.Run("Instant", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _ = pi.metrics()
}
})
}
func BenchmarkParsePrometheusResponse(b *testing.B) {
req, _ := http.NewRequest("GET", "", nil)
resp := &http.Response{StatusCode: http.StatusOK}
data, err := os.ReadFile("testdata/instant_response.json")
if err != nil {
b.Fatalf("error while reading file: %s", err)
}
resp.Body = io.NopCloser(bytes.NewReader(data))
b.Run("Instant", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, err := parsePrometheusResponse(req, resp)
if err != nil {
b.Fatalf("unexpected parse err: %s", err)
}
resp.Body = io.NopCloser(bytes.NewReader(data))
}
})
}


@ -71,6 +71,11 @@ func CheckStreamAggrConfig() error {
return nil
}
// HasStreamAggrConfigured checks whether a streamAggr config is provided
func HasStreamAggrConfigured() bool {
return *streamAggrConfig != ""
}
// InitStreamAggr must be called after flag.Parse and before using the common package.
//
// MustStopStreamAggr must be called when stream aggr is no longer needed.


@ -38,6 +38,7 @@ import (
opentsdbserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdb"
opentsdbhttpserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/opentsdbhttp"
statsdserver "github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver/statsd"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
@ -55,6 +56,8 @@ var (
"See also -statsdListenAddr.useProxyProtocol")
statsdUseProxyProtocol = flag.Bool("statsdListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -statsdListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
statsdDisableAggregationEnforcement = flag.Bool(`statsd.disableAggregationEnforcement`, false, "Whether to disable the streaming aggregation requirement check. "+
"It's recommended to run the statsd server with pre-configured streaming aggregation to reduce the load on the database.")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write . "+
"See also -influxListenAddr.useProxyProtocol")
@ -100,6 +103,9 @@ func Init() {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
}
if len(*statsdListenAddr) > 0 {
if !vminsertCommon.HasStreamAggrConfigured() && !*statsdDisableAggregationEnforcement {
logger.Fatalf("streaming aggregation must be configured when the statsd server is enabled. It's recommended to aggregate metrics received at the statsd listener. This check can be disabled with the -statsd.disableAggregationEnforcement flag")
}
statsdServer = statsdserver.MustStart(*statsdListenAddr, *statsdUseProxyProtocol, statsd.InsertHandler)
}
if len(*influxListenAddr) > 0 {


@ -44,9 +44,15 @@ func insertRows(rows []parser.Row) error {
continue
}
ctx.SortLabelsIfNeeded()
if err := ctx.WriteDataPoint(nil, ctx.Labels, r.Timestamp, r.Value); err != nil {
return err
var metricName []byte
var err error
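// Write one data point per packed value in the parsed StatsD row.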
for _, v := range r.Values {
metricName, err = ctx.WriteDataPointExt(metricName, ctx.Labels, r.Timestamp, v)
if err != nil {
return err
}
}
}
rowsInserted.Add(len(rows))
rowsPerInsert.Update(float64(len(rows)))


@ -3,7 +3,7 @@ import dayjs, { Dayjs } from "dayjs";
import CalendarHeader from "./CalendarHeader/CalendarHeader";
import CalendarBody from "./CalendarBody/CalendarBody";
import YearsList from "./YearsList/YearsList";
import { DATE_TIME_FORMAT } from "../../../../constants/date";
import { DATE_FORMAT, DATE_TIME_FORMAT } from "../../../../constants/date";
import "./style.scss";
import useDeviceDetect from "../../../../hooks/useDeviceDetect";
import classNames from "classnames";
@ -31,8 +31,8 @@ const Calendar: FC<DatePickerProps> = ({
const [viewDate, setViewDate] = useState(dayjs.tz(date));
const [selectDate, setSelectDate] = useState(dayjs.tz(date));
const today = dayjs().startOf("day").tz();
const viewDateIsToday = today.format() === viewDate.format();
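// Compare only the calendar date (DATE_FORMAT), so the time-of-day component cannot break the "today" check.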
const today = dayjs.tz();
const viewDateIsToday = today.format(DATE_FORMAT) === viewDate.format(DATE_FORMAT);
const { isMobile } = useDeviceDetect();
const toggleDisplayYears = () => {


@ -1,6 +1,7 @@
import React, { FC, useMemo } from "preact/compat";
import dayjs, { Dayjs } from "dayjs";
import classNames from "classnames";
import Tooltip from "../../../Tooltip/Tooltip";
interface CalendarBodyProps {
viewDate: Dayjs
@ -10,9 +11,10 @@ interface CalendarBodyProps {
const weekday = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"];
const CalendarBody: FC<CalendarBodyProps> = ({ viewDate, selectDate, onChangeSelectDate }) => {
const CalendarBody: FC<CalendarBodyProps> = ({ viewDate: date, selectDate, onChangeSelectDate }) => {
const format = "YYYY-MM-DD";
const today = dayjs().tz().startOf("day");
const today = dayjs.tz();
const viewDate = dayjs(date.format(format));
const days: (Dayjs|null)[] = useMemo(() => {
const result = new Array(42).fill(null);
@ -32,12 +34,14 @@ const CalendarBody: FC<CalendarBodyProps> = ({ viewDate, selectDate, onChangeSel
return (
<div className="vm-calendar-body">
{weekday.map(w => (
<div
className="vm-calendar-body-cell vm-calendar-body-cell_weekday"
<Tooltip
title={w}
key={w}
>
{w[0]}
</div>
<div className="vm-calendar-body-cell vm-calendar-body-cell_weekday">
{w[0]}
</div>
</Tooltip>
))}
{days.map((d, i) => (


@ -1,6 +1,6 @@
import { ErrorTypes } from "../../../types";
import { useAppState } from "../../../state/common/StateContext";
import { useEffect, useState } from "preact/compat";
import { useEffect, useRef, useState } from "preact/compat";
import { CardinalityRequestsParams, getCardinalityInfo } from "../../../api/tsdb";
import { TSDBStatus } from "../types";
import AppConfigurator from "../appConfigurator";
@ -25,6 +25,7 @@ export const useFetchQuery = (): {
const topN = +(searchParams.get("topN") || 10);
const date = searchParams.get("date") || dayjs().tz().format(DATE_FORMAT);
const prevDate = usePrevious(date);
const prevTotal = useRef<{data: TSDBStatus}>();
const { serverUrl } = useAppState();
const [isLoading, setIsLoading] = useState(false);
@ -72,7 +73,7 @@ export const useFetchQuery = (): {
const prevDayParams = {
...requestParams,
date: dayjs(requestParams.date).subtract(1, "day").tz().format(DATE_FORMAT),
date: dayjs(requestParams.date).subtract(1, "day").format(DATE_FORMAT),
};
const urls = [
@ -80,15 +81,16 @@ export const useFetchQuery = (): {
getCardinalityInfo(serverUrl, prevDayParams),
];
if (prevDate !== date) {
if (prevDate !== date && (requestParams.match || requestParams.focusLabel)) {
urls.push(getCardinalityInfo(serverUrl, totalParams));
}
try {
const [resp, respPrev, respTotals = {}] = await Promise.all(urls.map(getResponseJson));
const [resp, respPrev, respTotals] = await Promise.all(urls.map(getResponseJson));
const prevResult = { ...respPrev.data };
const { data: dataTotal } = respTotals;
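// When the extra totals request was skipped, fall back to the previously cached totals (or the current response).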
const { data: dataTotal } = respTotals || prevTotal.current || resp;
prevTotal.current = { data: dataTotal as TSDBStatus };
const result: TSDBStatus = {
...resp.data,
totalSeries: resp.data?.totalSeries || resp.data?.headStats?.numSeries || 0,


@ -31,7 +31,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
**Update note 1: the `-remoteWrite.multitenantURL` command-line flag at `vmagent` was removed starting from this release. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy).**
**Update note 2: the `-streamAggr.dropInputLabels` command-line flag at `vmagent` was renamed to `-remoteWrite.streamAggr.dropInputLabels`. `-streamAggr.dropInputLabels` is now used for global streaming aggregation.**
* SECURITY: upgrade Go builder from Go1.22.2 to Go1.22.3. See [the list of issues addressed in Go1.22.3](https://github.com/golang/go/issues?q=milestone%3AGo1.22.3+label%3ACherryPickApproved).
@ -40,13 +40,18 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): add `Network Usage` panel to `Resource Usage` row.
* FEATURE: [dashboards/operator](https://grafana.com/grafana/dashboards/17869), [dashboards/backupmanager](https://grafana.com/grafana/dashboards/17798) and [dashboard/tenant-statistic](https://grafana.com/grafana/dashboards/16399): update dashboard to be compatible with Grafana 10+ version.
* FEATURE: [dashboards/cluster](https://grafana.com/grafana/dashboards/11176): add new panel `Concurrent selects` to `vmstorage` row. The panel will show how many ongoing select queries are processed by vmstorage and should help to identify resource bottlenecks. See panel description for more details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support aggregation and deduplication configs before replicating data to configured `-remoteWrite.url` destinations. This saves CPU and memory resources when incoming data needs to be aggregated or deduplicated once and then replicated to multiple destinations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for the implementation!
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix calendar display when `UTC+00:00` timezone is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6239).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): remove redundant requests on the `Explore Cardinality` page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6240).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): prevent potential panic during [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) if more than one `--remoteWrite.streamAggr.dedupInterval` is configured. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6205).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent/): skip empty data blocks before sending to the remote write destination. Thanks to @viperstars for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6241).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `<output>_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix.
* BUGFIX: properly estimate the needed memory for query execution if it has the format [`aggr_func`](https://docs.victoriametrics.com/metricsql/#aggregate-functions)([`rollup_func[d]`](https://docs.victoriametrics.com/metricsql/#rollup-functions) (for example, `sum(rate(request_duration_seconds_bucket[5m]))`). This should allow performing aggregations over bigger number of time series when VictoriaMetrics runs in environments with small amounts of available memory. The issue has been introduced in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/5138eaeea0791caa34bcfab410e0ca9cd253cd8f) in [v1.83.0](https://docs.victoriametrics.com/changelog_2022/#v1830).
* BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): correctly apply `-inmemoryDataFlushInterval` when it's set to minimum supported value 1s.


@ -328,6 +328,7 @@ See also [increases_over_time](#increases_over_time).
`default_rollup(series_selector[d])` is a [rollup function](#rollup-functions), which returns the last [raw sample](https://docs.victoriametrics.com/keyconcepts/#raw-samples)
value on the given lookbehind window `d` per each time series returned from the given [series_selector](https://docs.victoriametrics.com/keyconcepts/#filtering).
Compared to [last_over_time](#last_over_time), it accounts for [staleness markers](https://docs.victoriametrics.com/vmagent/#prometheus-staleness-markers) to detect stale series.
If the lookbehind window is skipped in square brackets, then it is automatically calculated as `max(step, scrape_interval)`, where `step` is the query arg value
passed to [/api/v1/query_range](https://docs.victoriametrics.com/keyconcepts/#range-query) or [/api/v1/query](https://docs.victoriametrics.com/keyconcepts/#instant-query),


@ -705,45 +705,79 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]}
```
## How to send data from Statsd-compatible clients
## How to send data from StatsD-compatible clients
VictoriaMetrics supports extended statsd protocol with tags. Also it does not support sampling and metric types(it will be ignored).
Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag. For instance,
the following command will enable Statsd receiver in VictoriaMetrics on TCP and UDP port `8125`:
VictoriaMetrics supports the extended StatsD protocol. Currently, it supports the `tags` and `value packing`
extensions provided by [dogstatsd](https://docs.datadoghq.com/developers/dogstatsd/datagram_shell).
During parsing, the metric's `<TYPE>` is added as a special label `__statsd_metric_type__`.
It is strongly advisable to configure streaming aggregation for each metric type. This serves two primary
objectives:
* transforming the StatsD data model into the VictoriaMetrics data model, since VictoriaMetrics requires a consistent
interval between data points;
* minimizing disk space utilization and overall resource consumption during data ingestion.
VictoriaMetrics supports the following metric [types](https://docs.datadoghq.com/metrics/types):
* `c` Counter type.
* `g` Gauge type.
* `ms` Timer type.
* `m` Meters type.
* `h` Histogram type.
* `s` Set type with only numeric values.
* `d` Distribution type.
_The `Not Assigned` type is not supported, since the correct aggregation method cannot be determined
for an undefined metric type._
Enable the StatsD receiver in VictoriaMetrics by setting the `-statsdListenAddr` command-line flag and configuring [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/).
For instance, the following command enables the StatsD receiver in VictoriaMetrics on TCP and UDP port `8125`:
```console
/path/to/victoria-metrics-prod -statsdListenAddr=:8125
/path/to/victoria-metrics-prod -statsdListenAddr=:8125 -streamAggr.config=statsd_aggr.yaml
```
Example for writing data with Statsd plaintext protocol to local VictoriaMetrics using `nc`:
Example of stream aggregation config:
```yaml
# statsd_aggr.yaml
# the `last` output keeps the last sample per `interval`
# for each series matching the `{__statsd_metric_type__="g"}` selector
- match: '{__statsd_metric_type__="g"}'
outputs: [last]
interval: 1m
```
Example for writing data with StatsD plaintext protocol to local VictoriaMetrics using `nc`:
```console
echo "foo.bar:123|g|#foo:bar" | nc -N localhost 8125
echo "foo.bar:123|g|#tag1:baz" | nc -N localhost 8125
```
Explicit setting of timestamps is not supported for statsd protocol. Timestamp is set to the current time when VictoriaMetrics or vmagent receives it.
_An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go._
An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
Explicit setting of timestamps is not supported for the StatsD protocol. The timestamp is set to the current time when
VictoriaMetrics or vmagent receives it.
<div class="with-copy" markdown="1">
Once ingested, the data can be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```console
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo.*"}'
```
</div>
_Please note that with stream aggregation enabled, the data becomes available only after the specified aggregation interval._
The `/api/v1/export` endpoint should return the following response:
```json
{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1560277406000]}
{"metric":{"__name__":"foo.bar:1m_last","__statsd_metric_type__":"g","tag1":"baz"},"values":[123],"timestamps":[1715843939000]}
```
Some examples of compatible StatsD clients:
- [statsd-instrument](https://github.com/Shopify/statsd-instrument)
- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby)
- [go-statsd-client](https://github.com/cactus/go-statsd-client)
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance,
@ -3175,6 +3209,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0)
-sortLabels
Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit
-statsd.disableAggregationEnforcement
Whether to disable the streaming aggregation requirement check. It's recommended to run the statsd server with pre-configured streaming aggregation to reduce the load on the database.
-statsdListenAddr string
TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol
-statsdListenAddr.useProxyProtocol
Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
-storage.cacheSizeIndexDBDataBlocks size
Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
View file
@ -713,45 +713,79 @@ The `/api/v1/export` endpoint should return the following response:
{"metric":{"__name__":"measurement_field2","tag1":"value1","tag2":"value2"},"values":[1.23],"timestamps":[1695902762311]}
```
## How to send data from Statsd-compatible clients
## How to send data from StatsD-compatible clients
VictoriaMetrics supports extended statsd protocol with tags. Also it does not support sampling and metric types(it will be ignored).
Enable Statsd receiver in VictoriaMetrics by setting `-statsdListenAddr` command line flag. For instance,
the following command will enable Statsd receiver in VictoriaMetrics on TCP and UDP port `8125`:
VictoriaMetrics supports the extended StatsD protocol. Currently, it supports the `tags` and `value packing`
extensions provided by [dogstatsd](https://docs.datadoghq.com/developers/dogstatsd/datagram_shell).
During parsing, the metric's `<TYPE>` is added as a special label `__statsd_metric_type__`.
It is strongly advisable to configure streaming aggregation for each metric type. This serves two primary
objectives:
* transformation of the StatsD data model into the VictoriaMetrics data model. VictoriaMetrics requires a consistent
interval between data points.
* minimizing disk space utilization and overall resource consumption during data ingestion.
VictoriaMetrics supports the following metric [types](https://docs.datadoghq.com/metrics/types):
* `c` Counter type.
* `g` Gauge type.
* `ms` Timer type.
* `m` Meters type.
* `h` Histogram type.
* `s` Set type with only numeric values.
* `d` Distribution type.
_The `Not Assigned` type is not supported due to the ambiguity surrounding its aggregation method.
The correct aggregation method cannot be determined for the undefined metric._
Enable the StatsD receiver in VictoriaMetrics by setting the `-statsdListenAddr` command-line flag and configuring [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/).
For instance, the following command enables the StatsD receiver in VictoriaMetrics on TCP and UDP port `8125`:
```console
/path/to/victoria-metrics-prod -statsdListenAddr=:8125
/path/to/victoria-metrics-prod -statsdListenAddr=:8125 -streamAggr.config=statsd_aggr.yaml
```
Example for writing data with Statsd plaintext protocol to local VictoriaMetrics using `nc`:
Example of stream aggregation config:
```yaml
# statsd_aggr.yaml
# the `last` output keeps the last sample per `interval`
# for each series that matches the `{__statsd_metric_type__="g"}` selector
- match: '{__statsd_metric_type__="g"}'
outputs: [last]
interval: 1m
```
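The config above covers gauges only. A broader, illustrative sketch could also aggregate counters, timers and histograms; it assumes the `total` and `quantiles` outputs and a `1m` interval suit your workload:
```yaml
# statsd_aggr.yaml (illustrative; adjust outputs and intervals to your needs)
# gauges: keep the last sample per interval
- match: '{__statsd_metric_type__="g"}'
  outputs: [last]
  interval: 1m
# counters: sum the increments into a counter-like series
- match: '{__statsd_metric_type__="c"}'
  outputs: [total]
  interval: 1m
# timers and histograms: track selected quantiles
- match: '{__statsd_metric_type__=~"ms|h"}'
  outputs: ["quantiles(0.5, 0.99)"]
  interval: 1m
```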
Example for writing data with StatsD plaintext protocol to local VictoriaMetrics using `nc`:
```console
echo "foo.bar:123|g|#foo:bar" | nc -N localhost 8125
echo "foo.bar:123|g|#tag1:baz" | nc -N localhost 8125
```
Explicit setting of timestamps is not supported for statsd protocol. Timestamp is set to the current time when VictoriaMetrics or vmagent receives it.
_An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go._
An arbitrary number of lines delimited by `\n` (aka newline char) can be sent in one go.
After that the data may be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
Explicit setting of timestamps is not supported for StatsD protocol. Timestamp is set to the current time when
VictoriaMetrics or vmagent receives it.
<div class="with-copy" markdown="1">
Once ingested, the data can be read via [/api/v1/export](#how-to-export-data-in-json-line-format) endpoint:
```console
curl -G 'http://localhost:8428/api/v1/export' -d 'match=foo.bar.baz'
curl -G 'http://localhost:8428/api/v1/export' -d 'match={__name__=~"foo.*"}'
```
</div>
_Please note, with stream aggregation enabled, the data becomes available only after the specified aggregation interval._
The `/api/v1/export` endpoint should return the following response:
```json
{"metric":{"__name__":"foo.bar.baz","tag1":"value1","tag2":"value2"},"values":[123],"timestamps":[1560277406000]}
{"metric":{"__name__":"foo.bar:1m_last","__statsd_metric_type__":"g","tag1":"baz"},"values":[123],"timestamps":[1715843939000]}
```
Some examples of compatible StatsD clients:
- [statsd-instrument](https://github.com/Shopify/statsd-instrument)
- [dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby)
- [go-statsd-client](https://github.com/cactus/go-statsd-client)
## How to send data from Graphite-compatible agents such as [StatsD](https://github.com/etsy/statsd)
Enable Graphite receiver in VictoriaMetrics by setting `-graphiteListenAddr` command line flag. For instance,
@ -3183,6 +3217,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0)
-sortLabels
Whether to sort labels for incoming samples before writing them to storage. This may be needed for reducing memory usage at storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}. Enabled sorting for labels can slow down ingestion performance a bit
-statsd.disableAggregationEnforcement
Whether to disable streaming aggregation requirement check. It's recommended to run statsdServer with pre-configured streaming aggregation to decrease load at database.
-statsdListenAddr string
TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol
-statsdListenAddr.useProxyProtocol
Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
-storage.cacheSizeIndexDBDataBlocks size
Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/single-server-victoriametrics/#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
View file
@ -19,15 +19,16 @@ The aggregation is applied to all the metrics received via any [supported data i
and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter)
after applying all the configured [relabeling stages](https://docs.victoriametrics.com/vmagent/#relabeling).
By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples).
It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples).
_By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples).
It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples)._
Stream aggregation can be configured via the following command-line flags:
- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent/).
This flag can be specified individually per each `-remoteWrite.url`.
- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and at [vmagent](https://docs.victoriametrics.com/vmagent/).
- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent/) only.
This flag can be specified individually per each `-remoteWrite.url` and aggregation will happen independently for each of them.
This allows writing different aggregates to different remote storage destinations.
- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/).
These flags must point to a file containing [stream aggregation config](#stream-aggregation-config).
The file may contain `%{ENV_VAR}` placeholders which are substituted by the corresponding `ENV_VAR` environment variable values.
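As an illustration of the placeholder substitution, the aggregation interval may be taken from the environment. The file name, variable name and match expression below are hypothetical:
```yaml
# aggr.yaml (hypothetical); %{AGGR_INTERVAL} is replaced with the value of the
# AGGR_INTERVAL environment variable when the config file is loaded
- match: '{__name__=~".+"}'
  interval: '%{AGGR_INTERVAL}'
  outputs: [avg]
```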
@ -39,29 +40,61 @@ By default, the following data is written to the storage when stream aggregation
This behaviour can be changed via the following command-line flags:
- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent/) and `-streamAggr.keepInput`
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/).
- `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`.
If one of these flags is set, then all the input samples are written to the storage alongside the aggregated samples.
The `-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`.
- `-remoteWrite.streamAggr.dropInput` at [vmagent](https://docs.victoriametrics.com/vmagent/) and `-streamAggr.dropInput`
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/).
- `-streamAggr.dropInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.dropInput` flag can be specified individually per each `-remoteWrite.url`.
If one of these flags is set, then all the input samples are dropped, while only the aggregated samples are written to the storage.
The `-remoteWrite.streamAggr.dropInput` flag can be specified individually per each `-remoteWrite.url`.
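For instance, the following single-node invocations illustrate both modes; the config file name is a placeholder:
```console
# write both the raw and the aggregated samples
/path/to/victoria-metrics-prod -streamAggr.config=aggr.yaml -streamAggr.keepInput

# write only the aggregated samples
/path/to/victoria-metrics-prod -streamAggr.config=aggr.yaml -streamAggr.dropInput
```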
## Routing
[Single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) supports relabeling,
deduplication and stream aggregation for all the received data, scraped or pushed.
The processed data is then stored in local storage and **can't be forwarded further**.
[vmagent](https://docs.victoriametrics.com/vmagent/) supports relabeling, deduplication and stream aggregation for all
the received data, scraped or pushed. Then, the collected data will be forwarded to specified `-remoteWrite.url` destinations.
The data processing order is the following:
1. All the received data is [relabeled](https://docs.victoriametrics.com/vmagent/#relabeling) according to
specified `-remoteWrite.relabelConfig`;
1. All the received data is [deduplicated](https://docs.victoriametrics.com/stream-aggregation/#deduplication)
according to specified `-streamAggr.dedupInterval`;
1. All the received data is aggregated according to specified `-streamAggr.config`;
1. The resulting data from points 1 and 2 is then replicated to each `-remoteWrite.url`;
1. Data sent to each `-remoteWrite.url` can be additionally relabeled according to the
corresponding `-remoteWrite.urlRelabelConfig` (set individually per URL);
1. Data sent to each `-remoteWrite.url` can be additionally deduplicated according to the
corresponding `-remoteWrite.streamAggr.dedupInterval` (set individually per URL);
1. Data sent to each `-remoteWrite.url` can be additionally aggregated according to the
corresponding `-remoteWrite.streamAggr.config` (set individually per URL). Please note, it is not recommended
to use `-streamAggr.config` and `-remoteWrite.streamAggr.config` together, unless you understand the complications.
Typical scenarios for data routing with vmagent:
1. **Aggregate incoming data and replicate to N destinations**. For this one should configure `-streamAggr.config`
to aggregate the incoming data before replicating it to all the configured `-remoteWrite.url` destinations.
2. **Individually aggregate incoming data for each destination**. For this one should configure `-remoteWrite.streamAggr.config`
for each `-remoteWrite.url` destination. [Relabeling](https://docs.victoriametrics.com/vmagent/#relabeling)
via `-remoteWrite.urlRelabelConfig` can be used for routing only selected metrics to each `-remoteWrite.url` destination, as shown in the sketch below.
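The sketch below illustrates the second scenario; the URLs and config file names are placeholders, and each
`-remoteWrite.streamAggr.config` value applies to the `-remoteWrite.url` at the same position:
```console
/path/to/vmagent \
  -remoteWrite.url=http://vm-a:8428/api/v1/write \
  -remoteWrite.url=http://vm-b:8428/api/v1/write \
  -remoteWrite.streamAggr.config=aggr-a.yaml \
  -remoteWrite.streamAggr.config=aggr-b.yaml
```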
## Deduplication
[vmagent](https://docs.victoriametrics.com/vmagent/) supports online [de-duplication](https://docs.victoriametrics.com/#deduplication) of samples
before sending them to the configured `-remoteWrite.url`. The de-duplication can be enabled via the following options:
- By specifying the desired de-duplication interval via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url`.
- By specifying the desired de-duplication interval via `-streamAggr.dedupInterval` command-line flag for all received data
or via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url` destination.
For example, `./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=30s` instructs `vmagent` to leave
only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
The de-duplication is performed after applying `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling).
The de-duplication is performed after applying [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling) and
before performing the aggregation.
If `-remoteWrite.streamAggr.config` and/or `-streamAggr.config` is set, then the de-duplication is performed individually per each
[stream aggregation config](#stream-aggregation-config) for the matching samples after applying [input_relabel_configs](#relabeling).
If the `-remoteWrite.streamAggr.config` is set, then the de-duplication is performed individually per each [stream aggregation config](#stream-aggregation-config)
for the matching samples after applying [input_relabel_configs](#relabeling).
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-remoteWrite.streamAggr.config`.
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config)
in `-remoteWrite.streamAggr.config` or `-streamAggr.config` configs.
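For instance, a per-config `dedup_interval` might look like the following sketch (the selector and values are illustrative):
```yaml
- match: '{job="node_exporter"}'
  interval: 1m
  dedup_interval: 30s
  outputs: [avg]
```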
[Single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) supports two types of de-duplication:
- After storing the duplicate samples to local storage. See [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication) command-line option.
@ -82,11 +115,12 @@ The online de-duplication uses the same logic as [`-dedup.minScrapeInterval` com
## Ignoring old samples
By default, all the input samples are taken into account during stream aggregation. If samples with old timestamps outside the current [aggregation interval](#stream-aggregation-config)
must be ignored, then the following options can be used:
By default, all the input samples are taken into account during stream aggregation. If samples with old timestamps
outside the current [aggregation interval](#stream-aggregation-config) must be ignored, then the following options can be used:
- To pass `-remoteWrite.streamAggr.ignoreOldSamples` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/)
or `-streamAggr.ignoreOldSamples` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/).
- To pass `-streamAggr.ignoreOldSamples` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/)
or to [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.ignoreOldSamples` flag can be specified individually per each `-remoteWrite.url`.
This enables ignoring old samples for all the [aggregation configs](#stream-aggregation-config).
- To set `ignore_old_samples: true` option at the particular [aggregation config](#stream-aggregation-config).
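A minimal per-config sketch (the selector and values are illustrative):
```yaml
- match: '{__name__=~"custom_metric_.+"}'
  interval: 1m
  ignore_old_samples: true
  outputs: [max]
```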
@ -99,11 +133,12 @@ received from clients that maintain a queue of unsent data, such as Prometheus o
cleared within the aggregation `interval`, only a portion of the time series may be processed, leading to distorted
calculations. To mitigate this, consider the following options:
- Set `-remoteWrite.streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/)
or `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/)
to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config)
from persisting to the storage. It is expected that all incomplete or queued data will be processed during
specified `<intervalsCount>` and all subsequent aggregation intervals will produce correct data.
- Set `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/)
or to [vmagent](https://docs.victoriametrics.com/vmagent/) to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config)
from persisting to the storage. At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.ignoreFirstIntervals=<intervalsCount>` flag can be specified individually per each `-remoteWrite.url`.
It is expected that all incomplete or queued data will be processed during specified `<intervalsCount>`
and all subsequent aggregation intervals will produce correct data.
- Set `ignore_first_intervals: <intervalsCount>` option individually per [aggregation config](#stream-aggregation-config).
This enables ignoring first `<intervalsCount>` aggregation intervals for that particular aggregation config.
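For example, skipping the first two intervals for one aggregation only could look like this (other values are illustrative):
```yaml
- match: '{__name__=~"queued_.+"}'
  interval: 1m
  ignore_first_intervals: 2
  outputs: [sum_samples]
```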
@ -480,7 +515,8 @@ See also [dropping unneded labels](#dropping-unneeded-labels).
If you need dropping some labels from input samples before [input relabeling](#relabeling), [de-duplication](#deduplication)
and [stream aggregation](#aggregation-outputs), then the following options exist:
- To specify comma-separated list of label names to drop in `-streamAggr.dropInputLabels` command-line flag.
- To specify comma-separated list of label names to drop in `-streamAggr.dropInputLabels` command-line flag
or via `-remoteWrite.streamAggr.dropInputLabels` individually per each `-remoteWrite.url`.
For example, `-streamAggr.dropInputLabels=replica,az` instructs to drop `replica` and `az` labels from input samples
before applying de-duplication and stream aggregation.
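Combined with the other flags, an illustrative single-node invocation could look like this:
```console
/path/to/victoria-metrics-prod -streamAggr.config=aggr.yaml -streamAggr.dropInputLabels=replica,az -streamAggr.dedupInterval=30s
```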
@ -879,9 +915,10 @@ See also [aggregation outputs](#aggregation-outputs).
## Stream aggregation config
Below is the format for stream aggregation config file, which may be referred via `-remoteWrite.streamAggr.config` command-line flag
at [vmagent](https://docs.victoriametrics.com/vmagent/) or via `-streamAggr.config` command-line flag
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/):
Below is the format for stream aggregation config file, which may be referred via `-streamAggr.config` command-line flag at
[single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/).
At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.config` command-line flag can be
specified individually per each `-remoteWrite.url`:
```yaml
# match is an optional filter for incoming samples to aggregate.
@ -957,13 +994,13 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-
# ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval.
# See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
# See also -streamAggr.ignoreOldSamples command-line flag.
# See also -remoteWrite.streamAggr.ignoreOldSamples or -streamAggr.ignoreOldSamples command-line flag.
#
# ignore_old_samples: false
# ignore_first_intervals instructs ignoring first N aggregation intervals after process start.
# See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
# See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals
# See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals command-line flag.
#
# ignore_first_intervals: false
@ -1019,6 +1056,8 @@ support the following approaches for hot reloading stream aggregation configs fr
The following outputs track the last seen per-series values in order to properly calculate output values:
- [rate_sum](#rate_sum)
- [rate_avg](#rate_avg)
- [total](#total)
- [total_prometheus](#total_prometheus)
- [increase](#increase)
View file
@ -121,7 +121,8 @@ additionally to pull-based Prometheus-compatible targets' scraping:
`vmagent` should be restarted in order to update config options set via command-line args.
`vmagent` supports multiple approaches for reloading configs from updated config files such as
`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig`, `-streamAggr.config`
and `-remoteWrite.streamAggr.config`:
* Sending `SIGHUP` signal to `vmagent` process:
@ -1972,6 +1973,8 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs/#nomad_sd_configs for details (default 30s)
-promscrape.openstackSDCheckInterval duration
Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs/#openstack_sd_configs for details (default 30s)
-promscrape.scrapeExemplars
Whether to enable scraping of exemplars from scrape targets.
-promscrape.seriesLimitPerTarget int
Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter for more info
-promscrape.streamParse
@ -2173,6 +2176,10 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to false.
-remoteWrite.streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.streamAggr.ignoreFirstIntervals int
Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
-remoteWrite.streamAggr.ignoreOldSamples array
@ -2221,22 +2228,41 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
The compression level for VictoriaMetrics remote write protocol. Higher values reduce network traffic at the cost of higher CPU usage. Negative values reduce CPU usage at the cost of increased network traffic. See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol
-sortLabels
Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit
-statsd.disableAggregationEnforcement
Whether to disable streaming aggregation requirement check. It's recommended to run statsdServer with pre-configured streaming aggregation to decrease load at database.
-statsdListenAddr string
TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol
-statsdListenAddr.useProxyProtocol
Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval value
Input samples are de-duplicated with this interval on aggregator before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0s)
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to remote storages write. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/
-streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
An optional list of labels to drop from samples for aggregator before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.ignoreFirstIntervals int
Number of aggregation intervals to skip after the start for aggregator. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
-streamAggr.ignoreOldSamples
Whether to ignore input samples with old timestamps outside the current aggregation interval for aggregator. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
-streamAggr.keepInput
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/
-tls array
Whether to enable TLS for incoming HTTP requests at the given -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set. See also -mtls
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to false.
-tlsAutocertCacheDir string
Directory to store TLS certificates issued via Let's Encrypt. Certificates are lost on restarts if this flag isn't set. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise/
-tlsAutocertEmail string
Contact email for the issued Let's Encrypt TLS certificates. See also -tlsAutocertHosts and -tlsAutocertCacheDir .This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise/
-tlsAutocertHosts array
Optional hostnames for automatic issuing of Let's Encrypt TLS certificates. These hostnames must be reachable at -httpListenAddr . The -httpListenAddr must listen tcp port 443 . The -tlsAutocertHosts overrides -tlsCertFile and -tlsKeyFile . See also -tlsAutocertEmail and -tlsAutocertCacheDir . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise/
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
Whether to enable TLS for incoming HTTP requests at the given -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set. See also -mtls
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to false.
-tlsAutocertCacheDir string
Directory to store TLS certificates issued via Let's Encrypt. Certificates are lost on restarts if this flag isn't set. This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise/
-tlsAutocertEmail string
Contact email for the issued Let's Encrypt TLS certificates. See also -tlsAutocertHosts and -tlsAutocertCacheDir .This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise/
-tlsAutocertHosts array
Optional hostnames for automatic issuing of Let's Encrypt TLS certificates. These hostnames must be reachable at -httpListenAddr . The -httpListenAddr must listen tcp port 443 . The -tlsAutocertHosts overrides -tlsCertFile and -tlsKeyFile . See also -tlsAutocertEmail and -tlsAutocertCacheDir . This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise/
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-tlsCertFile array
Path to file with TLS certificate for the corresponding -httpListenAddr if -tls is set. Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. See also -tlsAutocertHosts
Supports an array of values separated by comma or specified via multiple flags.
View file
@ -5,15 +5,48 @@ import (
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat"
)
// Statsd metric format with tags: MetricName:value|type|@sample_rate|#tag1:value,tag1...
const statsdSeparator = '|'
const statsdPairsSeparator = ':'
const statsdTagsStartSeparator = '#'
const statsdTagsSeparator = ','
// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell?tab=metrics#the-dogstatsd-protocol
const (
statsdSeparator = '|'
statsdPairsSeparator = ':'
statsdTagsStartSeparator = '#'
statsdTagsSeparator = ','
)
const statsdTypeTagName = "__statsd_metric_type__"
// https://github.com/b/statsd_spec
var validTypes = []string{
// counter
"c",
// gauge
"g",
// histogram
"h",
// timer
"ms",
// distribution
"d",
// set
"s",
// meters
"m",
}
func isValidType(src string) bool {
for _, t := range validTypes {
if src == t {
return true
}
}
return false
}
// Rows contains parsed statsd rows.
type Rows struct {
@ -48,14 +81,14 @@ func (rs *Rows) Unmarshal(s string) {
type Row struct {
Metric string
Tags []Tag
Value float64
Values []float64
Timestamp int64
}
func (r *Row) reset() {
r.Metric = ""
r.Tags = nil
r.Value = 0
r.Values = r.Values[:0]
r.Timestamp = 0
}
@ -63,42 +96,72 @@ func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
r.reset()
originalString := s
s = stripTrailingWhitespace(s)
separatorPosition := strings.IndexByte(s, statsdSeparator)
if separatorPosition < 0 {
s = stripTrailingWhitespace(s)
} else {
s = stripTrailingWhitespace(s[:separatorPosition])
nextSeparator := strings.IndexByte(s, statsdSeparator)
if nextSeparator <= 0 {
return tagsPool, fmt.Errorf("cannot find type separator %q position at: %q", statsdSeparator, originalString)
}
metricWithValues := s[:nextSeparator]
s = s[nextSeparator+1:]
valuesSeparatorPosition := strings.IndexByte(metricWithValues, statsdPairsSeparator)
if valuesSeparatorPosition <= 0 {
return tagsPool, fmt.Errorf("cannot find metric name value separator=%q at: %q; original line: %q", statsdPairsSeparator, metricWithValues, originalString)
}
valuesSeparatorPosition := strings.LastIndexByte(s, statsdPairsSeparator)
if valuesSeparatorPosition == 0 {
return tagsPool, fmt.Errorf("cannot find metric name for %q", s)
r.Metric = metricWithValues[:valuesSeparatorPosition]
metricWithValues = metricWithValues[valuesSeparatorPosition+1:]
// datadog extension v1.1 for statsd allows multiple packed values at single line
for {
nextSeparator = strings.IndexByte(metricWithValues, statsdPairsSeparator)
if nextSeparator <= 0 {
// last element
metricWithValues = stripTrailingWhitespace(metricWithValues)
v, err := fastfloat.Parse(metricWithValues)
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", metricWithValues, err, originalString)
}
r.Values = append(r.Values, v)
break
}
valueStr := metricWithValues[:nextSeparator]
v, err := fastfloat.Parse(valueStr)
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", valueStr, err, originalString)
}
r.Values = append(r.Values, v)
metricWithValues = metricWithValues[nextSeparator+1:]
}
if valuesSeparatorPosition < 0 {
return tagsPool, fmt.Errorf("cannot find separator for %q", s)
// search for the type end
nextSeparator = strings.IndexByte(s, statsdSeparator)
typeValue := s
if nextSeparator >= 0 {
typeValue = s[:nextSeparator]
s = s[nextSeparator+1:]
}
r.Metric = s[:valuesSeparatorPosition]
valueStr := s[valuesSeparatorPosition+1:]
v, err := fastfloat.Parse(valueStr)
if err != nil {
return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", valueStr, err, originalString)
if !isValidType(typeValue) {
return tagsPool, fmt.Errorf("provided type=%q is not supported; original line: %q", typeValue, originalString)
}
r.Value = v
tagsStart := len(tagsPool)
tagsPool = slicesutil.SetLength(tagsPool, len(tagsPool)+1)
// add metric type as tag
tag := &tagsPool[len(tagsPool)-1]
tag.Key = statsdTypeTagName
tag.Value = typeValue
// parsing tags
tagsSeparatorPosition := strings.LastIndexByte(originalString, statsdTagsStartSeparator)
if tagsSeparatorPosition < 0 {
// no tags
// process tags
nextSeparator = strings.IndexByte(s, statsdTagsStartSeparator)
if nextSeparator < 0 {
tags := tagsPool[tagsStart:]
r.Tags = tags[:len(tags):len(tags)]
return tagsPool, nil
}
tagsStr := s[nextSeparator+1:]
// search for end of tags
nextSeparator = strings.IndexByte(tagsStr, statsdSeparator)
if nextSeparator >= 0 {
tagsStr = tagsStr[:nextSeparator]
}
tagsStart := len(tagsPool)
tagsPool = unmarshalTags(tagsPool, originalString[tagsSeparatorPosition+1:])
tagsPool = unmarshalTags(tagsPool, tagsStr)
tags := tagsPool[tagsStart:]
r.Tags = tags[:len(tags):len(tags)]
@ -147,11 +210,7 @@ var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="statsd"}`)
func unmarshalTags(dst []Tag, s string) []Tag {
for {
if cap(dst) > len(dst) {
dst = dst[:len(dst)+1]
} else {
dst = append(dst, Tag{})
}
dst = slicesutil.SetLength(dst, len(dst)+1)
tag := &dst[len(dst)-1]
n := strings.IndexByte(s, statsdTagsSeparator)
View file
@ -115,28 +115,65 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
f("\n\r\n", &Rows{})
// Single line
f(" 123:455", &Rows{
f(" 123:455|c", &Rows{
Rows: []Row{{
Metric: "123",
Value: 455,
Values: []float64{455},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
}},
})
// multiple values statsd dog v1.1
f(" 123:455:456|c", &Rows{
Rows: []Row{{
Metric: "123",
Values: []float64{455, 456},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
}},
})
f("123:455 |c", &Rows{
Rows: []Row{{
Metric: "123",
Value: 455,
Values: []float64{455},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
}},
})
f("foobar:-123.456|c", &Rows{
Rows: []Row{{
Metric: "foobar",
Value: -123.456,
Values: []float64{-123.456},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
}},
})
f("foo.bar:123.456|c\n", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 123.456,
Values: []float64{123.456},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
}},
})
@ -144,23 +181,40 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
f("foo.bar:1|c|@0.1", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 1,
Values: []float64{1},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
}},
})
// without specifying metric unit
f("foo.bar:123", &Rows{
f("foo.bar:123|h", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 123,
Values: []float64{123},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "h",
},
},
}},
})
// without specifying metric unit but with tags
f("foo.bar:123|#foo:bar", &Rows{
f("foo.bar:123|s|#foo:bar", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 123,
Values: []float64{123},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "s",
},
{
Key: "foo",
Value: "bar",
@ -172,8 +226,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
f("foo.bar:123.456|c|#foo:bar,qwe:asd", &Rows{
Rows: []Row{{
Metric: "foo.bar",
Value: 123.456,
Values: []float64{123.456},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
{
Key: "foo",
Value: "bar",
@ -190,8 +249,12 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
f("s a:1|c|#ta g1:aaa1,tag2:bb b2", &Rows{
Rows: []Row{{
Metric: "s a",
Value: 1,
Values: []float64{1},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
{
Key: "ta g1",
Value: "aaa1",
@ -208,29 +271,49 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
f("foo:1|c", &Rows{
Rows: []Row{{
Metric: "foo",
Value: 1,
Values: []float64{1},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
}},
})
// Empty tag name
f("foo:1|#:123", &Rows{
Rows: []Row{{
Metric: "foo",
Tags: []Tag{},
Value: 1,
}},
})
// Empty tag value
f("foo:1|#tag1:", &Rows{
Rows: []Row{{
Metric: "foo",
Tags: []Tag{},
Value: 1,
}},
})
f("foo:1|#bar:baz,aa:,x:y,:z", &Rows{
f("foo:1|d|#:123", &Rows{
Rows: []Row{{
Metric: "foo",
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "d",
},
},
Values: []float64{1},
}},
})
// Empty tag value
f("foo:1|s|#tag1:", &Rows{
Rows: []Row{{
Metric: "foo",
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "s",
},
},
Values: []float64{1},
}},
})
f("foo:1|d|#bar:baz,aa:,x:y,:z", &Rows{
Rows: []Row{{
Metric: "foo",
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "d",
},
{
Key: "bar",
Value: "baz",
@ -240,7 +323,7 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
Value: "y",
},
},
Value: 1,
Values: []float64{1},
}},
})
@ -249,15 +332,33 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
Rows: []Row{
{
Metric: "foo",
Value: 0.3,
Values: []float64{0.3},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
},
{
Metric: "aaa",
Value: 3,
Values: []float64{3},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "g",
},
},
},
{
Metric: "bar.baz",
Value: 0.34,
Values: []float64{0.34},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
},
},
})
@ -266,8 +367,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
Rows: []Row{
{
Metric: "foo",
Value: 0.3,
Values: []float64{0.3},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
{
Key: "tag1",
Value: "1",
@ -280,8 +386,13 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
},
{
Metric: "aaa",
Value: 3,
Values: []float64{3},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "g",
},
{
Key: "tag3",
Value: "3",
@ -296,40 +407,87 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
})
// Multi lines with invalid line
f("foo:0.3|c\naaa\nbar.baz:0.34\n", &Rows{
f("foo:0.3|c\naaa\nbar.baz:0.34|c\n", &Rows{
Rows: []Row{
{
Metric: "foo",
Value: 0.3,
Values: []float64{0.3},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
},
{
Metric: "bar.baz",
Value: 0.34,
Values: []float64{0.34},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
},
},
})
// Whitespace after at the end
f("foo.baz:125|c\na:1.34\t ", &Rows{
f("foo.baz:125|c\na:1.34|h\t ", &Rows{
Rows: []Row{
{
Metric: "foo.baz",
Value: 125,
Values: []float64{125},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
},
},
{
Metric: "a",
Value: 1.34,
Values: []float64{1.34},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "h",
},
},
},
},
})
// ignores sample rate
f("foo.baz:125|c|@0.5#tag1:12", &Rows{
f("foo.baz:125|c|@0.5|#tag1:12", &Rows{
Rows: []Row{
{
Metric: "foo.baz",
Value: 125,
Values: []float64{125},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
{
Key: "tag1",
Value: "12",
},
},
},
},
})
// ignores container and timestamp
f("foo.baz:125|c|@0.5|#tag1:12|c:83c0a99c0a54c0c187f461c7980e9b57f3f6a8b0c918c8d93df19a9de6f3fe1d|T1656581400", &Rows{
Rows: []Row{
{
Metric: "foo.baz",
Values: []float64{125},
Tags: []Tag{
{
Key: statsdTypeTagName,
Value: "c",
},
{
Key: "tag1",
Value: "12",
@ -364,4 +522,10 @@ func TestRowsUnmarshalFailure(t *testing.T) {
// empty metric name
f(":12")
// empty type
f("foo:12")
// bad values
f("foo:12:baz|c")
}
View file
@ -2,11 +2,9 @@ package stream
import (
"bufio"
"flag"
"fmt"
"io"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@ -17,11 +15,6 @@ import (
"github.com/VictoriaMetrics/metrics"
)
var (
trimTimestamp = flag.Duration("statsdTrimTimestamp", time.Second, "Trim timestamps for Statsd data to this duration. "+
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
)
// Parse parses Statsd lines from r and calls callback for the parsed rows.
//
// The callback can be called concurrently multiple times for streamed data from r.
@ -141,8 +134,10 @@ func putStreamContext(ctx *streamContext) {
}
}
var streamContextPool sync.Pool
var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
var (
streamContextPool sync.Pool
streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
)
type unmarshalWork struct {
rows statsd.Rows
@ -181,20 +176,7 @@ func (uw *unmarshalWork) Unmarshal() {
for i := range rows {
r := &rows[i]
if r.Timestamp == 0 || r.Timestamp == -1 {
r.Timestamp = currentTimestamp
}
}
// Convert timestamps from seconds to milliseconds.
for i := range rows {
rows[i].Timestamp *= 1e3
}
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
r.Timestamp = currentTimestamp * 1e3
}
}
View file
@ -40,8 +40,14 @@ func Test_streamContext_Read(t *testing.T) {
// Full line without tags
f("aaa:1123|c", &statsd.Rows{
Rows: []statsd.Row{{
Metric: "aaa",
Value: 1123,
Metric: "aaa",
Tags: []statsd.Tag{
{
Key: "__statsd_metric_type__",
Value: "c",
},
},
Values: []float64{1123},
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
@ -49,11 +55,17 @@ func Test_streamContext_Read(t *testing.T) {
f("aaa:1123|c|#x:y", &statsd.Rows{
Rows: []statsd.Row{{
Metric: "aaa",
Tags: []statsd.Tag{{
Key: "x",
Value: "y",
}},
Value: 1123,
Tags: []statsd.Tag{
{
Key: "__statsd_metric_type__",
Value: "c",
},
{
Key: "x",
Value: "y",
},
},
Values: []float64{1123},
Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
}},
})
View file
@ -27,6 +27,8 @@ import (
)
var supportedOutputs = []string{
"rate_sum",
"rate_avg",
"total",
"total_prometheus",
"increase",