diff --git a/app/vmagent/README.md b/app/vmagent/README.md index d398389fd..56b2c1ebe 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -24,8 +24,8 @@ additionally to [discovering Prometheus-compatible targets and scraping metrics see [these docs](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter). * Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details. * Can accept data via all the ingestion protocols supported by VictoriaMetrics - see [these docs](#how-to-push-data-to-vmagent). -* Can replicate collected metrics simultaneously to multiple remote storage systems - - see [these docs](#replication-and-high-availability). +* Can aggregate incoming samples by time and by labels before sending them to remote storage - see [these docs](https://docs.victoriametrics.com/stream-aggregation.html). +* Can replicate collected metrics simultaneously to multiple remote storage systems - see [these docs](#replication-and-high-availability). * Works smoothly in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as the connection to the remote storage is repaired. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`. @@ -126,6 +126,12 @@ If you use Prometheus only for scraping metrics from various targets and forward then `vmagent` can replace Prometheus. Typically, `vmagent` requires lower amounts of RAM, CPU and network bandwidth compared with Prometheus. See [these docs](#how-to-collect-metrics-in-prometheus-format) for details. +### Statsd alternative + +`vmagent` can be used as an alternative to [statsd](https://github.com/statsd/statsd) +when [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) is enabled. +See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#statsd-alternative) for details. + ### Flexible metrics relay `vmagent` can accept metrics in [various popular data ingestion protocols](#how-to-push-data-to-vmagent), apply [relabeling](#relabeling) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index d64d38113..5a67c4a34 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -21,6 +21,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" "github.com/cespare/xxhash/v2" @@ -58,6 +59,13 @@ var ( "Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+ "Excess series are logged and dropped. This can be useful for limiting series churn rate. 
See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
+
+	streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html . "+
+		"See also -remoteWrite.streamAggr.keepInput")
+	streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config . "+
+		"By default the input is dropped after the aggregation, so only the aggregated data is sent to the -remoteWrite.url. "+
+		"See https://docs.victoriametrics.com/stream-aggregation.html")
 )
 
 var (
@@ -140,6 +148,7 @@ func Init() {
 		logger.Fatalf("cannot load relabel configs: %s", err)
 	}
 	allRelabelConfigs.Store(rcs)
+
 	configSuccess.Set(1)
 	configTimestamp.Set(fasttime.UnixTimestamp())
 
@@ -435,9 +444,13 @@ var (
 )
 
 type remoteWriteCtx struct {
-	idx int
-	fq  *persistentqueue.FastQueue
-	c   *client
+	idx int
+	fq  *persistentqueue.FastQueue
+	c   *client
+
+	sas                 *streamaggr.Aggregators
+	streamAggrKeepInput bool
+
 	pss        []*pendingSeries
 	pssNextIdx uint64
 
@@ -469,6 +482,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
 	}
 	c.init(argIdx, *queues, sanitizedURL)
 
+	// Initialize pss
 	sf := significantFigures.GetOptionalArgOrDefault(argIdx, 0)
 	rd := roundDigits.GetOptionalArgOrDefault(argIdx, 100)
 	pssLen := *queues
@@ -481,7 +495,8 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
 	for i := range pss {
 		pss[i] = newPendingSeries(fq.MustWriteBlock, sf, rd)
 	}
-	return &remoteWriteCtx{
+
+	rwctx := &remoteWriteCtx{
 		idx: argIdx,
 		fq:  fq,
 		c:   c,
@@ -490,6 +505,19 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
 		rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
 		rowsDroppedByRelabel:   metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
 	}
+
+	// Initialize sas
+	sasFile := streamAggrConfig.GetOptionalArg(argIdx)
+	if sasFile != "" {
+		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal)
+		if err != nil {
+			logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
+		}
+		rwctx.sas = sas
+		rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
+	}
+
+	return rwctx
 }
 
 func (rwctx *remoteWriteCtx) MustStop() {
@@ -501,6 +529,8 @@ func (rwctx *remoteWriteCtx) MustStop() {
 	rwctx.fq.UnblockAllReaders()
 	rwctx.c.MustStop()
 	rwctx.c = nil
+	rwctx.sas.MustStop()
+	rwctx.sas = nil
 
 	rwctx.fq.MustClose()
 	rwctx.fq = nil
@@ -509,6 +539,7 @@
 }
 
 func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
+	// Apply relabeling
 	var rctx *relabelCtx
 	var v *[]prompbmarshal.TimeSeries
 	rcs := allRelabelConfigs.Load().(*relabelConfigs)
@@ -526,11 +557,17 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
 		rowsCountAfterRelabel := getRowsCount(tss)
 		rwctx.rowsDroppedByRelabel.Add(rowsCountBeforeRelabel - rowsCountAfterRelabel)
 	}
-	pss := rwctx.pss
-	idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
 	rowsCount := getRowsCount(tss)
 	rwctx.rowsPushedAfterRelabel.Add(rowsCount)
+
+	// Apply stream aggregation if any.
+	// rwctx.sas is nil when -remoteWrite.streamAggr.config isn't set, so sas.Push must be a no-op on a nil *Aggregators.
+	rwctx.sas.Push(tss)
+	if rwctx.sas == nil || rwctx.streamAggrKeepInput {
+		// Push samples to the remote storage
+		rwctx.pushInternal(tss)
+	}
+
+	// Return the relabeling contexts to the pool
 	if rctx != nil {
 		*v = prompbmarshal.ResetTimeSeries(tss)
 		tssRelabelPool.Put(v)
@@ -538,6 +575,12 @@
 	}
 }
 
+func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
+	pss := rwctx.pss
+	idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
+	pss[idx].Push(tss)
+}
+
 var tssRelabelPool = &sync.Pool{
 	New: func() interface{} {
 		a := []prompbmarshal.TimeSeries{}
diff --git a/app/vmalert/README.md b/app/vmalert/README.md
index 7ff7d747a..107529ad6 100644
--- a/app/vmalert/README.md
+++ b/app/vmalert/README.md
@@ -69,16 +69,17 @@ Then configure `vmalert` accordingly:
     -external.label=replica=a  # Multiple external labels may be set
 ```
 
-Note there's a separate `remoteWrite.url` to allow writing results of
+Note there's a separate `-remoteWrite.url` command-line flag to allow writing results of
 alerting/recording rules into a different storage than the initial data that's queried.
 This allows using `vmalert` to aggregate data from a short-term, high-frequency, high-cardinality storage
 into a long-term storage with decreased cardinality and a bigger interval between samples.
+See also [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html).
 
 See the full list of configuration flags in [configuration](#configuration) section.
 
 If you run multiple `vmalert` services for the same datastore or AlertManager - do not forget
-to specify different `external.label` flags in order to define which `vmalert` generated rules or alerts.
+to specify different `-external.label` command-line flags in order to define which `vmalert` generated rules or alerts.
 
 Configuration for [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/)
 and [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) rules is very
@@ -514,8 +515,8 @@ groups:
       expr: avg_over_time(http_requests[5m])
 ```
 
-Ability of `vmalert` to be configured with different `datasource.url` and `remoteWrite.url` allows
-reading data from one data source and backfilling results to another. This helps to build a system
+Ability of `vmalert` to be configured with different `-datasource.url` and `-remoteWrite.url` command-line flags
+allows reading data from one data source and backfilling results to another. This helps to build a system
 for aggregating and downsampling the data.
 
 The following example shows how to build a topology where `vmalert` will process data from one cluster
@@ -539,7 +540,7 @@ Please note, [replay](#rules-backfilling) feature may be used for transforming h
 
 Flags `-remoteRead.url` and `-notifier.url` are omitted since we assume only recording rules are used.
 
-See also [downsampling docs](https://docs.victoriametrics.com/#downsampling).
+See also [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) and [downsampling](https://docs.victoriametrics.com/#downsampling).
 
 #### Multiple remote writes
 
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 4be9c377e..80b8d0313 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## tip
 
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels.
See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to explore metrics exported by a particular `job` / `instance`. See [these docs](https://docs.victoriametrics.com/#metrics-explorer) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3386). * FEATURE: allow passing partial `RFC3339` date/time to `time`, `start` and `end` query args at [querying APIs](https://docs.victoriametrics.com/#prometheus-querying-api-usage) and [export APIs](https://docs.victoriametrics.com/#how-to-export-time-series). For example, `2022` is equivalent to `2022-01-01T00:00:00Z`, while `2022-01-30T14` is equivalent to `2022-01-30T14:00:00Z`. See [these docs](https://docs.victoriametrics.com/#timestamp-formats). * FEATURE: [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling): add support for `keepequal` and `dropequal` relabeling actions, which are supported by Prometheus starting from [v2.41.0](https://github.com/prometheus/prometheus/releases/tag/v2.41.0). These relabeling actions are almost identical to `keep_if_equal` and `drop_if_equal` relabeling actions supported by VictoriaMetrics since `v1.38.0` - see [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements) - so it is recommended sticking to `keep_if_equal` and `drop_if_equal` actions instead of switching to `keepequal` and `dropequal`. diff --git a/docs/README.md b/docs/README.md index 280c0bf0b..56ea4e324 100644 --- a/docs/README.md +++ b/docs/README.md @@ -83,6 +83,7 @@ VictoriaMetrics has the following prominent features: * [Arbitrary CSV data](#how-to-import-csv-data). * [Native binary format](#how-to-import-data-in-native-format). * [DataDog agent or DogStatsD](#how-to-send-data-from-datadog-agent). +* It supports powerful [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html), which can be used as a [statsd](https://github.com/statsd/statsd) alternative. * It supports metrics [relabeling](#relabeling). * It can deal with [high cardinality issues](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues via [series limiter](#cardinality-limiter). diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 7100424a0..4fe577056 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -86,6 +86,7 @@ VictoriaMetrics has the following prominent features: * [Arbitrary CSV data](#how-to-import-csv-data). * [Native binary format](#how-to-import-data-in-native-format). * [DataDog agent or DogStatsD](#how-to-send-data-from-datadog-agent). +* It supports powerful [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html), which can be used as a [statsd](https://github.com/statsd/statsd) alternative. * It supports metrics [relabeling](#relabeling). * It can deal with [high cardinality issues](https://docs.victoriametrics.com/FAQ.html#what-is-high-cardinality) and [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) issues via [series limiter](#cardinality-limiter). 
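The `remotewrite` wiring above reduces to the following usage pattern of the new `lib/streamaggr` package. This is a sketch based only on the API visible in this patch (`LoadFromFile`, `Push`, `MustStop`); the config path and the callback body are hypothetical:

```go
package main

import (
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	// pushFunc receives the aggregated series once per configured interval.
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		log.Printf("flushing %d aggregated series", len(tss))
	}
	// "aggr.yml" is a hypothetical path to a stream aggregation config file.
	sas, err := streamaggr.LoadFromFile("aggr.yml", pushFunc)
	if err != nil {
		log.Fatalf("cannot load stream aggregation config: %s", err)
	}
	defer sas.MustStop()

	// Incoming samples are fed into the aggregators; matching samples are
	// aggregated and emitted via pushFunc according to the config.
	var tss []prompbmarshal.TimeSeries // filled from scrape or ingestion in vmagent
	sas.Push(tss)
}
```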
diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
new file mode 100644
index 000000000..50234ae8d
--- /dev/null
+++ b/docs/stream-aggregation.md
@@ -0,0 +1,438 @@
+---
+sort: 98
+---
+
+# Streaming aggregation
+
+[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
+can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels.
+The aggregation is applied to all the metrics received via any [supported data ingestion protocol](https://docs.victoriametrics.com/#how-to-import-time-series-data)
+and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter).
+
+Stream aggregation is configured via the following command-line flags:
+
+- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
+  This flag can be specified individually per each `-remoteWrite.url`.
+  This allows writing different aggregates to different remote storage destinations.
+- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
+
+These flags must point to a file containing [stream aggregation config](#stream-aggregation-config).
+
+By default, only the aggregated data is written to the storage. If the original incoming samples must be written to the storage too,
+then the following command-line flags must be specified:
+
+- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
+  This flag can be specified individually per each `-remoteWrite.url`.
+  This allows writing both raw and aggregate data to different remote storage destinations.
+- `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
+
+Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+It expects that the ingested samples have timestamps close to the current time.
+
+## Use cases
+
+Stream aggregation can be used in the following cases:
+
+* [Statsd alternative](#statsd-alternative)
+* [Recording rules alternative](#recording-rules-alternative)
+* [Reducing the number of stored samples](#reducing-the-number-of-stored-samples)
+* [Reducing the number of stored series](#reducing-the-number-of-stored-series)
+
+### Statsd alternative
+
+Stream aggregation can be used as a [statsd](https://github.com/statsd/statsd) alternative in the following cases:
+
+* [Counting input samples](#counting-input-samples)
+* [Summing input metrics](#summing-input-metrics)
+* [Quantiles over input metrics](#quantiles-over-input-metrics)
+* [Histograms over input metrics](#histograms-over-input-metrics)
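+
+The following sketch shows how an application could emit such event-based samples to `vmagent`,
+which then aggregates them instead of statsd. The `vmagent` address and the metric name are illustrative:
+
+```go
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+)
+
+// reportHit pushes a single `hits` sample to vmagent via the
+// Prometheus text exposition import endpoint.
+func reportHit(page string) error {
+	sample := fmt.Sprintf(`hits{page=%q} 1`, page)
+	resp, err := http.Post("http://vmagent:8429/api/v1/import/prometheus",
+		"text/plain", strings.NewReader(sample))
+	if err != nil {
+		return err
+	}
+	return resp.Body.Close()
+}
+
+func main() {
+	if err := reportHit("checkout"); err != nil {
+		fmt.Println(err)
+	}
+}
+```
+
+With a `count_samples` or `sum_samples` aggregation configured, only the per-interval aggregates reach the storage.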
+
+### Recording rules alternative
+
+Sometimes [alerting queries](https://docs.victoriametrics.com/vmalert.html#alerting-rules) may require non-trivial amounts of CPU, RAM,
+disk IO and network bandwidth at the metrics storage side. For example, if the `http_request_duration_seconds` histogram is generated by thousands
+of app instances, then the alerting query `histogram_quantile(0.99, sum(increase(http_request_duration_seconds_bucket[5m])) without (instance)) > 0.5`
+can become slow, since it needs to scan a large number of unique [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series)
+with the `http_request_duration_seconds_bucket` name. This alerting query can be sped up by pre-calculating
+the `sum(increase(http_request_duration_seconds_bucket[5m])) without (instance)` via a [recording rule](https://docs.victoriametrics.com/vmalert.html#recording-rules).
+But this recording rule itself may take too much time to execute. In this case the slow recording rule can be substituted
+with the following [stream aggregation config](#stream-aggregation-config):
+
+```yaml
+- match: 'http_request_duration_seconds_bucket'
+  interval: 5m
+  without: [instance]
+  outputs: [total]
+```
+
+This stream aggregation generates the `http_request_duration_seconds_bucket:5m_without_instance_total` output series according to [output metric naming](#output-metric-names).
+Then these series can be used in [alerting rules](https://docs.victoriametrics.com/vmalert.html#alerting-rules):
+
+```metricsql
+histogram_quantile(0.99, last_over_time(http_request_duration_seconds_bucket:5m_without_instance_total[5m])) > 0.5
+```
+
+This query executes much faster than the original query, because it needs to scan a much smaller number of time series.
+
+See [the list of aggregation outputs](#aggregation-outputs), which can be specified in the `outputs` field.
+See also [aggregating by labels](#aggregating-by-labels).
+
+
+### Reducing the number of stored samples
+
+If per-[series](https://docs.victoriametrics.com/keyConcepts.html#time-series) samples are ingested at high frequency,
+then this may result in high disk space usage, since too much data must be stored to disk. This also may result
+in slow queries, since too much data must be processed during queries.
+
+This can be fixed with stream aggregation by increasing the interval between per-series samples stored in the database.
+
+For example, the following [stream aggregation config](#stream-aggregation-config) reduces the frequency of input samples
+to one sample per 5 minutes for each input time series (this operation is also known as downsampling):
+
+```yaml
+  # Aggregate metrics ending with _total with the `total` output.
+  # See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs
+- match: '{__name__=~".+_total"}'
+  interval: 5m
+  outputs: [total]
+
+  # Downsample other metrics with `count_samples`, `sum_samples`, `min` and `max` outputs.
+  # See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs
+- match: '{__name__!~".+_total"}'
+  interval: 5m
+  outputs: [count_samples, sum_samples, min, max]
+```
+
+The aggregated output metrics have the following names according to [output metric naming](#output-metric-names):
+
+```
+# For input metrics ending with _total
+some_metric_total:5m_total
+
+# For input metrics not ending with _total
+some_metric:5m_count_samples
+some_metric:5m_sum_samples
+some_metric:5m_min
+some_metric:5m_max
+```
+
+See [the list of aggregation outputs](#aggregation-outputs), which can be specified in the `outputs` field.
+See also [aggregating by labels](#aggregating-by-labels).
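+
+The output names above follow a simple rule, sketched here in Go for illustration
+(see [output metric names](#output-metric-names) for the authoritative description):
+
+```go
+package main
+
+import (
+	"fmt"
+	"strings"
+)
+
+// outputMetricName builds
+// <metric_name>:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
+func outputMetricName(metric, interval string, by, without []string, output string) string {
+	s := metric + ":" + interval
+	if len(by) > 0 {
+		s += "_by_" + strings.Join(by, "_")
+	}
+	if len(without) > 0 {
+		s += "_without_" + strings.Join(without, "_")
+	}
+	return s + "_" + output
+}
+
+func main() {
+	fmt.Println(outputMetricName("some_metric_total", "5m", nil, nil, "total")) // some_metric_total:5m_total
+	fmt.Println(outputMetricName("some_metric", "5m", nil, nil, "count_samples")) // some_metric:5m_count_samples
+}
+```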
+
+### Reducing the number of stored series
+
+Sometimes apps may generate too many [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series).
+For example, the `http_requests_total` metric may have a `path` or `user` label with a large number of unique values.
+In this case the following stream aggregation can be used for reducing the number of series stored in VictoriaMetrics:
+
+```yaml
+- match: 'http_requests_total'
+  interval: 30s
+  without: [path, user]
+  outputs: [total]
+```
+
+This config specifies the labels, which must be removed from the aggregate output, in the `without` list.
+See [these docs](#aggregating-by-labels) for more details.
+
+The aggregated output metric has the following name according to [output metric naming](#output-metric-names):
+
+```
+http_requests_total:30s_without_path_user_total
+```
+
+See [the list of aggregation outputs](#aggregation-outputs), which can be specified in the `outputs` field.
+
+
+### Counting input samples
+
+If the monitored app generates event-based metrics, then it may be useful to count the number of such metrics
+at stream aggregation level.
+
+For example, if an advertising server generates `hits{some="labels"} 1` and `clicks{some="labels"} 1` metrics
+per each incoming hit and click, then the following [stream aggregation config](#stream-aggregation-config)
+can be used for counting these metrics over every 30-second interval:
+
+```yml
+- match: '{__name__=~"hits|clicks"}'
+  interval: 30s
+  outputs: [count_samples]
+```
+
+This config generates the following output metrics for `hits` and `clicks` input metrics
+according to [output metric naming](#output-metric-names):
+
+```
+hits:30s_count_samples count1
+clicks:30s_count_samples count2
+```
+
+See [the list of aggregation outputs](#aggregation-outputs), which can be specified in the `outputs` field.
+See also [aggregating by labels](#aggregating-by-labels).
+
+
+### Summing input metrics
+
+If the monitored app calculates some events and then sends the calculated number of events to VictoriaMetrics
+at irregular intervals or at too high frequency, then stream aggregation can be used for summing such events
+and writing the aggregate sums to the storage at regular intervals.
+
+For example, if an advertising server generates `hits{some="labels"} N` and `clicks{some="labels"} M` metrics
+at irregular intervals, then the following [stream aggregation config](#stream-aggregation-config)
+can be used for summing these metrics every minute:
+
+```yml
+- match: '{__name__=~"hits|clicks"}'
+  interval: 1m
+  outputs: [sum_samples]
+```
+
+This config generates the following output metrics according to [output metric naming](#output-metric-names):
+
+```
+hits:1m_sum_samples sum1
+clicks:1m_sum_samples sum2
+```
+
+See [the list of aggregation outputs](#aggregation-outputs), which can be specified in the `outputs` field.
+See also [aggregating by labels](#aggregating-by-labels).
+
+
+### Quantiles over input metrics
+
+If the monitored app generates measurement metrics for each request, then it may be useful to calculate
+a pre-defined set of [percentiles](https://en.wikipedia.org/wiki/Percentile) over these measurements.
+
+For example, if the monitored app generates `request_duration_seconds N` and `response_size_bytes M` metrics
+per each incoming request, then the following [stream aggregation config](#stream-aggregation-config)
+can be used for calculating 50th and 99th percentiles for these metrics every 30 seconds:
+
+```yaml
+- match: '{__name__=~"request_duration_seconds|response_size_bytes"}'
+  interval: 30s
+  outputs: ["quantiles(0.50, 0.99)"]
+```
+
+This config generates the following output metrics according to [output metric naming](#output-metric-names):
+
+```
+request_duration_seconds:30s_quantiles{quantile="0.50"} value1
+request_duration_seconds:30s_quantiles{quantile="0.99"} value2
+
+response_size_bytes:30s_quantiles{quantile="0.50"} value1
+response_size_bytes:30s_quantiles{quantile="0.99"} value2
+```
+
+See [the list of aggregation outputs](#aggregation-outputs), which can be specified in the `outputs` field.
+See also [histograms over input metrics](#histograms-over-input-metrics) and [aggregating by labels](#aggregating-by-labels).
+
+### Histograms over input metrics
+
+If the monitored app generates measurement metrics per each request, then it may be useful to calculate
+a [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram) over these metrics.
+
+For example, if the monitored app generates `request_duration_seconds N` and `response_size_bytes M` metrics
+per each incoming request, then the following [stream aggregation config](#stream-aggregation-config)
+can be used for calculating [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350)
+for these metrics every 60 seconds:
+
+```yaml
+- match: '{__name__=~"request_duration_seconds|response_size_bytes"}'
+  interval: 60s
+  outputs: [histogram_bucket]
+```
+
+This config generates the following output metrics according to [output metric naming](#output-metric-names):
+
+```
+request_duration_seconds:60s_histogram_bucket{vmrange="start1...end1"} count1
+request_duration_seconds:60s_histogram_bucket{vmrange="start2...end2"} count2
+...
+request_duration_seconds:60s_histogram_bucket{vmrange="startN...endN"} countN
+
+response_size_bytes:60s_histogram_bucket{vmrange="start1...end1"} count1
+response_size_bytes:60s_histogram_bucket{vmrange="start2...end2"} count2
+...
+response_size_bytes:60s_histogram_bucket{vmrange="startN...endN"} countN
+```
+
+The resulting histogram buckets can be queried with [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html) in the following ways:
+
+1. Estimated 50th and 99th [percentiles](https://en.wikipedia.org/wiki/Percentile) of the request duration over the last hour:
+
+   ```metricsql
+   histogram_quantiles("quantile", 0.50, 0.99, sum(increase(request_duration_seconds:60s_histogram_bucket[1h])) by (vmrange))
+   ```
+
+   This query uses the [histogram_quantiles](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantiles) function.
+
+2. Estimated [standard deviation](https://en.wikipedia.org/wiki/Standard_deviation) of the request duration over the last hour:
+
+   ```metricsql
+   histogram_stddev(sum(increase(request_duration_seconds:60s_histogram_bucket[1h])) by (vmrange))
+   ```
+
+   This query uses the [histogram_stddev](https://docs.victoriametrics.com/MetricsQL.html#histogram_stddev) function.
+
+3. Estimated share of requests with the duration smaller than `0.5s` over the last hour:
+
+   ```metricsql
+   histogram_share(0.5, sum(increase(request_duration_seconds:60s_histogram_bucket[1h])) by (vmrange))
+   ```
+
+   This query uses the [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share) function.
+
+See [the list of aggregation outputs](#aggregation-outputs), which can be specified in the `outputs` field.
+See also [quantiles over input metrics](#quantiles-over-input-metrics) and [aggregating by labels](#aggregating-by-labels).
+
+
+## Output metric names
+
+Output metric names for stream aggregation are constructed according to the following pattern:
+
+```
+<metric_name>:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
+```
+
+- `<metric_name>` is the original metric name.
+- `<interval>` is the interval specified in the [stream aggregation config](#stream-aggregation-config).
+- `<by_labels>` is a `_`-delimited list of `by` labels specified in the [stream aggregation config](#stream-aggregation-config).
+  If the `by` list is missing in the config, then the `_by_<by_labels>` part isn't included in the output metric name.
+- `<without_labels>` is an optional `_`-delimited list of `without` labels specified in the [stream aggregation config](#stream-aggregation-config).
+  If the `without` list is missing in the config, then the `_without_<without_labels>` part isn't included in the output metric name.
+- `<output>` is the aggregate used for constructing the output metric. The aggregate name is taken from the `outputs` list
+  in the corresponding [stream aggregation config](#stream-aggregation-config).
+
+Both input and output metric names can be modified if needed via relabeling according to [these docs](#relabeling).
+
+
+## Relabeling
+
+It is possible to apply [arbitrary relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) to input and output metrics
+during stream aggregation via `input_relabel_configs` and `output_relabel_configs` options in [stream aggregation config](#stream-aggregation-config).
+
+For example, the following config removes the `:1m_sum_samples` suffix added [to the output metric name](#output-metric-names):
+
+```yml
+- interval: 1m
+  outputs: [sum_samples]
+  output_relabel_configs:
+  - source_labels: [__name__]
+    target_label: __name__
+    regex: "(.+):.+"
+```
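+
+For intuition, the rule above behaves like the following Go snippet. Prometheus relabeling regexes
+are anchored and the default `replacement` is `$1`, which is assumed here:
+
+```go
+package main
+
+import (
+	"fmt"
+	"regexp"
+)
+
+func main() {
+	// source_labels: [__name__], regex: "(.+):.+", target_label: __name__
+	re := regexp.MustCompile(`^(.+):.+$`)
+	fmt.Println(re.ReplaceAllString("foo:1m_sum_samples", "$1")) // foo
+}
+```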
+
+## Aggregation outputs
+
+The following aggregation outputs are supported in the `outputs` list of the [stream aggregation config](#stream-aggregation-config):
+
+* `total` generates an output [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) by summing the input counters.
+  The `total` handler properly handles input counter resets.
+  The `total` handler returns garbage when something other than a [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) is passed to the input.
+* `increase` returns the increase of input [counters](https://docs.victoriametrics.com/keyConcepts.html#counter).
+  The `increase` handler properly handles input counter resets.
+  The `increase` handler returns garbage when something other than a [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) is passed to the input.
+* `count_series` counts the number of unique [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series).
+* `count_samples` counts the number of input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `sum_samples` sums input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `last` returns the last input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `min` returns the minimum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `max` returns the maximum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `avg` returns the average input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `stddev` returns the [standard deviation](https://en.wikipedia.org/wiki/Standard_deviation) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `stdvar` returns the [standard variance](https://en.wikipedia.org/wiki/Variance) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350)
+  for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+* `quantiles(phi1, ..., phiN)` returns [percentiles](https://en.wikipedia.org/wiki/Percentile) for the given `phi*`
+  over the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+  The `phi` must be in the range `[0..1]`, where `0` means the `0th` percentile, while `1` means the `100th` percentile.
+
+The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config)
+and then sent to the storage.
+
+If `by` and `without` lists are specified in the [config](#stream-aggregation-config),
+then the [aggregation by labels](#aggregating-by-labels) is performed in addition to the aggregation by `interval`.
+
+
+## Aggregating by labels
+
+All the labels for the input metrics are preserved by default in the output metrics. For example,
+the input metric `foo{app="bar",instance="host1"}` results in the output metric `foo:1m_sum_samples{app="bar",instance="host1"}`
+when the following [stream aggregation config](#stream-aggregation-config) is used:
+
+```yaml
+- interval: 1m
+  outputs: [sum_samples]
+```
+
+The input labels can be removed via the `without` list specified in the config. For example, the following config
+removes the `instance` label from the output metrics by summing input samples across all the instances:
+
+```yaml
+- interval: 1m
+  without: [instance]
+  outputs: [sum_samples]
+```
+
+In this case the `foo{app="bar",instance="..."}` input metrics are transformed into the `foo:1m_without_instance_sum_samples{app="bar"}`
+output metric.
+
+It is possible to specify the exact list of labels in the output metrics via the `by` list.
+For example, the following config sums input samples by the `app` label:
+
+```yaml
+- interval: 1m
+  by: [app]
+  outputs: [sum_samples]
+```
+
+In this case the `foo{app="bar",instance="..."}` input metrics are transformed into the `foo:1m_by_app_sum_samples{app="bar"}`
+output metric.
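+
+The grouping logic above can be sketched as follows (illustrative only, not the actual implementation):
+
+```go
+package main
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+)
+
+// outputKey drops the labels listed in `without`; the remaining labels
+// identify the output series all matching input samples are merged into.
+func outputKey(labels map[string]string, without []string) string {
+	drop := make(map[string]bool, len(without))
+	for _, name := range without {
+		drop[name] = true
+	}
+	pairs := []string{}
+	for name, value := range labels {
+		if !drop[name] {
+			pairs = append(pairs, fmt.Sprintf("%s=%q", name, value))
+		}
+	}
+	sort.Strings(pairs)
+	return "{" + strings.Join(pairs, ",") + "}"
+}
+
+func main() {
+	in := map[string]string{"app": "bar", "instance": "host1"}
+	fmt.Println(outputKey(in, []string{"instance"})) // {app="bar"}
+}
+```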
+
+## Stream aggregation config
+
+Below is the format of the stream aggregation config file, which may be referenced via the `-remoteWrite.streamAggr.config` command-line flag
+at [vmagent](https://docs.victoriametrics.com/vmagent.html) or via the `-streamAggr.config` command-line flag
+at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html):
+
+```yaml
+  # match is an optional filter for incoming samples to aggregate.
+  # It can contain an arbitrary Prometheus series selector
+  # according to https://docs.victoriametrics.com/keyConcepts.html#filtering .
+  # If match is missing, then all the incoming samples are aggregated.
+- match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}'
+
+  # interval is the interval for the aggregation.
+  # The aggregated stats are sent to the remote storage once per interval.
+  interval: 1m
+
+  # without is an optional list of labels, which must be removed from the output aggregation.
+  # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
+  without: [instance]
+
+  # by is an optional list of labels, which must be preserved in the output aggregation.
+  # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
+  # by: [job, vmrange]
+
+  # outputs is the list of aggregations to perform on the input data.
+  # See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs
+  outputs: [total]
+
+  # input_relabel_configs is an optional set of relabeling rules,
+  # which are applied to the incoming samples after they pass the match filter
+  # and before being aggregated.
+  # See https://docs.victoriametrics.com/stream-aggregation.html#relabeling
+  input_relabel_configs:
+  - target_label: vmaggr
+    replacement: before
+
+  # output_relabel_configs is an optional set of relabeling rules,
+  # which are applied to the aggregated output metrics.
+  output_relabel_configs:
+  - target_label: vmaggr
+    replacement: after
+```
+
+The file can contain multiple aggregation configs. The aggregation is performed independently
+per each specified config entry.
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 9b25c8700..ff853c436 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -28,8 +28,8 @@ additionally to [discovering Prometheus-compatible targets and scraping metrics
 see [these docs](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter).
 * Can add, remove and modify labels (aka tags) via Prometheus relabeling. Can filter data before sending it to remote storage. See [these docs](#relabeling) for details.
 * Can accept data via all the ingestion protocols supported by VictoriaMetrics - see [these docs](#how-to-push-data-to-vmagent).
-* Can replicate collected metrics simultaneously to multiple remote storage systems -
-  see [these docs](#replication-and-high-availability).
+* Can aggregate incoming samples by time and by labels before sending them to remote storage - see [these docs](https://docs.victoriametrics.com/stream-aggregation.html).
+* Can replicate collected metrics simultaneously to multiple remote storage systems - see [these docs](#replication-and-high-availability).
 * Works smoothly in environments with unstable connections to remote storage. If the remote storage is unavailable, the collected metrics
   are buffered at `-remoteWrite.tmpDataPath`. The buffered metrics are sent to remote storage as soon as the connection
   to the remote storage is repaired. The maximum disk usage for the buffer can be limited with `-remoteWrite.maxDiskUsagePerURL`.
@@ -130,6 +130,12 @@ If you use Prometheus only for scraping metrics from various targets and forward
 then `vmagent` can replace Prometheus. Typically, `vmagent` requires lower amounts of RAM, CPU and network bandwidth compared with Prometheus.
 See [these docs](#how-to-collect-metrics-in-prometheus-format) for details.
 
+### Statsd alternative
+
+`vmagent` can be used as an alternative to [statsd](https://github.com/statsd/statsd)
+when [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) is enabled.
+See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#statsd-alternative) for details.
+ ### Flexible metrics relay `vmagent` can accept metrics in [various popular data ingestion protocols](#how-to-push-data-to-vmagent), apply [relabeling](#relabeling) diff --git a/docs/vmalert.md b/docs/vmalert.md index 1afa9a977..08e33b64d 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -73,16 +73,17 @@ Then configure `vmalert` accordingly: -external.label=replica=a # Multiple external labels may be set ``` -Note there's a separate `remoteWrite.url` to allow writing results of +Note there's a separate `-remoteWrite.url` command-line flag to allow writing results of alerting/recording rules into a different storage than the initial data that's queried. This allows using `vmalert` to aggregate data from a short-term, high-frequency, high-cardinality storage into a long-term storage with decreased cardinality and a bigger interval between samples. +See also [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html). See the full list of configuration flags in [configuration](#configuration) section. If you run multiple `vmalert` services for the same datastore or AlertManager - do not forget -to specify different `external.label` flags in order to define which `vmalert` generated rules or alerts. +to specify different `-external.label` command-line flags in order to define which `vmalert` generated rules or alerts. Configuration for [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/) and [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) rules is very @@ -518,8 +519,8 @@ groups: expr: avg_over_time(http_requests[5m]) ``` -Ability of `vmalert` to be configured with different `datasource.url` and `remoteWrite.url` allows -reading data from one data source and backfilling results to another. This helps to build a system +Ability of `vmalert` to be configured with different `-datasource.url` and `-remoteWrite.url` command-line flags +allows reading data from one data source and backfilling results to another. This helps to build a system for aggregating and downsampling the data. The following example shows how to build a topology where `vmalert` will process data from one cluster @@ -543,7 +544,7 @@ Please note, [replay](#rules-backfilling) feature may be used for transforming h Flags `-remoteRead.url` and `-notifier.url` are omitted since we assume only recording rules are used. -See also [downsampling docs](https://docs.victoriametrics.com/#downsampling). +See also [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) and [downsampling](https://docs.victoriametrics.com/#downsampling). #### Multiple remote writes diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go new file mode 100644 index 000000000..ce9becc82 --- /dev/null +++ b/lib/streamaggr/avg.go @@ -0,0 +1,74 @@ +package streamaggr + +import ( + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// avgAggrState calculates output=avg, e.g. the average value over input samples. +type avgAggrState struct { + m sync.Map +} + +type avgStateValue struct { + mu sync.Mutex + sum float64 + count int64 + deleted bool +} + +func newAvgAggrState() *avgAggrState { + return &avgAggrState{} +} + +func (as *avgAggrState) pushSample(inputKey, outputKey string, value float64) { +again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. 
+		v = &avgStateValue{
+			sum:   value,
+			count: 1,
+		}
+		vNew, loaded := as.m.LoadOrStore(outputKey, v)
+		if !loaded {
+			// The entry has been successfully stored
+			return
+		}
+		// Update the entry created by a concurrent goroutine.
+		v = vNew
+	}
+	sv := v.(*avgStateValue)
+	sv.mu.Lock()
+	deleted := sv.deleted
+	if !deleted {
+		sv.sum += value
+		sv.count++
+	}
+	sv.mu.Unlock()
+	if deleted {
+		// The entry has been deleted by the concurrent call to appendSeriesForFlush
+		// Try obtaining and updating the entry again.
+		goto again
+	}
+}
+
+func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
+	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
+	m := &as.m
+	m.Range(func(k, v interface{}) bool {
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
+
+		sv := v.(*avgStateValue)
+		sv.mu.Lock()
+		avg := sv.sum / float64(sv.count)
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
+		sv.mu.Unlock()
+		key := k.(string)
+		ctx.appendSeries(key, "avg", currentTimeMsec, avg)
+		return true
+	})
+}
diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go
new file mode 100644
index 000000000..ddf582e8c
--- /dev/null
+++ b/lib/streamaggr/count_samples.go
@@ -0,0 +1,71 @@
+package streamaggr
+
+import (
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+)
+
+// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples.
+type countSamplesAggrState struct {
+	m sync.Map
+}
+
+type countSamplesStateValue struct {
+	mu      sync.Mutex
+	n       uint64
+	deleted bool
+}
+
+func newCountSamplesAggrState() *countSamplesAggrState {
+	return &countSamplesAggrState{}
+}
+
+func (as *countSamplesAggrState) pushSample(inputKey, outputKey string, value float64) {
+again:
+	v, ok := as.m.Load(outputKey)
+	if !ok {
+		// The entry is missing in the map. Try creating it.
+		v = &countSamplesStateValue{
+			n: 1,
+		}
+		vNew, loaded := as.m.LoadOrStore(outputKey, v)
+		if !loaded {
+			// The new entry has been successfully created.
+			return
+		}
+		// Use the entry created by a concurrent goroutine.
+		v = vNew
+	}
+	sv := v.(*countSamplesStateValue)
+	sv.mu.Lock()
+	deleted := sv.deleted
+	if !deleted {
+		sv.n++
+	}
+	sv.mu.Unlock()
+	if deleted {
+		// The entry has been deleted by the concurrent call to appendSeriesForFlush
+		// Try obtaining and updating the entry again.
+		goto again
+	}
+}
+
+func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
+	m := &as.m
+	m.Range(func(k, v interface{}) bool {
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
+
+		sv := v.(*countSamplesStateValue)
+		sv.mu.Lock()
+		n := sv.n
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
+		sv.mu.Unlock()
+		key := k.(string)
+		ctx.appendSeries(key, "count_samples", currentTimeMsec, float64(n))
+		return true
+	})
+}
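All the `*AggrState` implementations in this package share the same concurrency pattern, distilled below for reference (a sketch, not part of the patch): `sync.Map.LoadOrStore` creates map entries race-free, while the `deleted` flag closes the race between a flush that removes an entry and a concurrent `pushSample` that still holds a reference to it.

```go
package main

import "sync"

type stateValue struct {
	mu      sync.Mutex
	n       uint64
	deleted bool
}

func push(m *sync.Map, key string) {
again:
	v, ok := m.Load(key)
	if !ok {
		var loaded bool
		v, loaded = m.LoadOrStore(key, &stateValue{n: 1})
		if !loaded {
			// This goroutine created the entry with the first sample accounted for.
			return
		}
		// Another goroutine created the entry first; fall through and update it.
	}
	sv := v.(*stateValue)
	sv.mu.Lock()
	deleted := sv.deleted
	if !deleted {
		sv.n++
	}
	sv.mu.Unlock()
	if deleted {
		// The entry was flushed and marked deleted concurrently; retry with a fresh entry.
		goto again
	}
}

func main() {
	var m sync.Map
	push(&m, "series")
}
```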
diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go
new file mode 100644
index 000000000..ffb8e5578
--- /dev/null
+++ b/lib/streamaggr/count_series.go
@@ -0,0 +1,78 @@
+package streamaggr
+
+import (
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+)
+
+// countSeriesAggrState calculates output=count_series, e.g. the number of unique series.
+type countSeriesAggrState struct {
+	m sync.Map
+}
+
+type countSeriesStateValue struct {
+	mu            sync.Mutex
+	countedSeries map[string]struct{}
+	n             uint64
+	deleted       bool
+}
+
+func newCountSeriesAggrState() *countSeriesAggrState {
+	return &countSeriesAggrState{}
+}
+
+func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, value float64) {
+again:
+	v, ok := as.m.Load(outputKey)
+	if !ok {
+		// The entry is missing in the map. Try creating it.
+		v = &countSeriesStateValue{
+			countedSeries: map[string]struct{}{
+				inputKey: {},
+			},
+			n: 1,
+		}
+		vNew, loaded := as.m.LoadOrStore(outputKey, v)
+		if !loaded {
+			// The entry has been added to the map.
+			return
+		}
+		// Update the entry created by a concurrent goroutine.
+		v = vNew
+	}
+	sv := v.(*countSeriesStateValue)
+	sv.mu.Lock()
+	deleted := sv.deleted
+	if !deleted {
+		if _, ok := sv.countedSeries[inputKey]; !ok {
+			sv.countedSeries[inputKey] = struct{}{}
+			sv.n++
+		}
+	}
+	sv.mu.Unlock()
+	if deleted {
+		// The entry has been deleted by the concurrent call to appendSeriesForFlush
+		// Try obtaining and updating the entry again.
+		goto again
+	}
+}
+
+func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
+	m := &as.m
+	m.Range(func(k, v interface{}) bool {
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
+
+		sv := v.(*countSeriesStateValue)
+		sv.mu.Lock()
+		n := sv.n
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
+		sv.mu.Unlock()
+		key := k.(string)
+		ctx.appendSeries(key, "count_series", currentTimeMsec, float64(n))
+		return true
+	})
+}
diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go
new file mode 100644
index 000000000..b3c087332
--- /dev/null
+++ b/lib/streamaggr/histogram_bucket.go
@@ -0,0 +1,102 @@
+package streamaggr
+
+import (
+	"sync"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
+type histogramBucketAggrState struct {
+	m sync.Map
+
+	ignoreInputDeadline uint64
+	intervalSecs        uint64
+}
+
+type histogramBucketStateValue struct {
+	mu             sync.Mutex
+	h              metrics.Histogram
+	deleteDeadline uint64
+	deleted        bool
+}
+
+func newHistogramBucketAggrState(interval time.Duration) *histogramBucketAggrState {
+	intervalSecs := uint64(interval.Seconds() + 1)
+	return &histogramBucketAggrState{
+		intervalSecs: intervalSecs,
+	}
+}
+
+func (as *histogramBucketAggrState) pushSample(inputKey, outputKey string, value float64) {
+	currentTime := fasttime.UnixTimestamp()
+	deleteDeadline := currentTime + 2*as.intervalSecs
+
+again:
+	v, ok := as.m.Load(outputKey)
+	if !ok {
+		// The entry is missing in the map. Try creating it.
+		v = &histogramBucketStateValue{}
+		vNew, loaded := as.m.LoadOrStore(outputKey, v)
+		if loaded {
+			// Use the entry created by a concurrent goroutine.
+			v = vNew
+		}
+	}
+	sv := v.(*histogramBucketStateValue)
+	sv.mu.Lock()
+	deleted := sv.deleted
+	if !deleted {
+		sv.h.Update(value)
+		sv.deleteDeadline = deleteDeadline
+	}
+	sv.mu.Unlock()
+	if deleted {
+		// The entry has been deleted by the concurrent call to appendSeriesForFlush
+		// Try obtaining and updating the entry again.
+ goto again + } +} + +func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) { + m := &as.m + m.Range(func(k, v interface{}) bool { + sv := v.(*histogramBucketStateValue) + + sv.mu.Lock() + deleted := currentTime > sv.deleteDeadline + if deleted { + // Mark the current entry as deleted + sv.deleted = deleted + } + sv.mu.Unlock() + + if deleted { + m.Delete(k) + } + return true + }) +} + +func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) { + currentTime := fasttime.UnixTimestamp() + currentTimeMsec := int64(currentTime) * 1000 + + as.removeOldEntries(currentTime) + + m := &as.m + m.Range(func(k, v interface{}) bool { + sv := v.(*histogramBucketStateValue) + sv.mu.Lock() + if !sv.deleted { + key := k.(string) + sv.h.VisitNonZeroBuckets(func(vmrange string, count uint64) { + ctx.appendSeriesWithExtraLabel(key, "histogram_bucket", currentTimeMsec, float64(count), "vmrange", vmrange) + }) + } + sv.mu.Unlock() + return true + }) +} diff --git a/lib/streamaggr/increase.go b/lib/streamaggr/increase.go new file mode 100644 index 000000000..0161d60f1 --- /dev/null +++ b/lib/streamaggr/increase.go @@ -0,0 +1,129 @@ +package streamaggr + +import ( + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// increaseAggrState calculates output=increase, e.g. the increase over input counters. +type increaseAggrState struct { + m sync.Map + + ignoreInputDeadline uint64 + intervalSecs uint64 +} + +type increaseStateValue struct { + mu sync.Mutex + lastValues map[string]*lastValueState + total float64 + increase float64 + deleteDeadline uint64 + deleted bool +} + +func newIncreaseAggrState(interval time.Duration) *increaseAggrState { + currentTime := fasttime.UnixTimestamp() + intervalSecs := uint64(interval.Seconds() + 1) + return &increaseAggrState{ + ignoreInputDeadline: currentTime + intervalSecs, + intervalSecs: intervalSecs, + } +} + +func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) { + currentTime := fasttime.UnixTimestamp() + deleteDeadline := currentTime + 2*as.intervalSecs + +again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &increaseStateValue{ + lastValues: make(map[string]*lastValueState), + } + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. + v = vNew + } + } + sv := v.(*increaseStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + lv, ok := sv.lastValues[inputKey] + if !ok { + lv = &lastValueState{} + sv.lastValues[inputKey] = lv + } + d := value + if ok && lv.value <= value { + d = value - lv.value + } + if ok || currentTime > as.ignoreInputDeadline { + sv.total += d + } + lv.value = value + lv.deleteDeadline = deleteDeadline + sv.deleteDeadline = deleteDeadline + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. 
+ goto again + } +} + +func (as *increaseAggrState) removeOldEntries(currentTime uint64) { + m := &as.m + m.Range(func(k, v interface{}) bool { + sv := v.(*increaseStateValue) + + sv.mu.Lock() + deleted := currentTime > sv.deleteDeadline + if deleted { + // Mark the current entry as deleted + sv.deleted = deleted + } else { + // Delete outdated entries in sv.lastValues + m := sv.lastValues + for k1, v1 := range m { + if currentTime > v1.deleteDeadline { + delete(m, k1) + } + } + } + sv.mu.Unlock() + + if deleted { + m.Delete(k) + } + return true + }) +} + +func (as *increaseAggrState) appendSeriesForFlush(ctx *flushCtx) { + currentTime := fasttime.UnixTimestamp() + currentTimeMsec := int64(currentTime) * 1000 + + as.removeOldEntries(currentTime) + + m := &as.m + m.Range(func(k, v interface{}) bool { + sv := v.(*increaseStateValue) + sv.mu.Lock() + increase := sv.total + sv.total = 0 + deleted := sv.deleted + sv.mu.Unlock() + if !deleted { + key := k.(string) + ctx.appendSeries(key, "increase", currentTimeMsec, increase) + } + return true + }) +} diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go new file mode 100644 index 000000000..c81a41db6 --- /dev/null +++ b/lib/streamaggr/last.go @@ -0,0 +1,71 @@ +package streamaggr + +import ( + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// lastAggrState calculates output=last, e.g. the last value over input samples. +type lastAggrState struct { + m sync.Map +} + +type lastStateValue struct { + mu sync.Mutex + last float64 + deleted bool +} + +func newLastAggrState() *lastAggrState { + return &lastAggrState{} +} + +func (as *lastAggrState) pushSample(inputKey, outputKey string, value float64) { +again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &lastStateValue{ + last: value, + } + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + return + } + // Use the entry created by a concurrent goroutine. + v = vNew + } + sv := v.(*lastStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + sv.last = value + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again + } +} + +func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) { + currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 + m := &as.m + m.Range(func(k, v interface{}) bool { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + + sv := v.(*lastStateValue) + sv.mu.Lock() + last := sv.last + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + sv.mu.Unlock() + key := k.(string) + ctx.appendSeries(key, "last", currentTimeMsec, last) + return true + }) +} diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go new file mode 100644 index 000000000..09122ee64 --- /dev/null +++ b/lib/streamaggr/max.go @@ -0,0 +1,73 @@ +package streamaggr + +import ( + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// maxAggrState calculates output=max, e.g. the maximum value over input samples. 
+type maxAggrState struct { + m sync.Map +} + +type maxStateValue struct { + mu sync.Mutex + max float64 + deleted bool +} + +func newMaxAggrState() *maxAggrState { + return &maxAggrState{} +} + +func (as *maxAggrState) pushSample(inputKey, outputKey string, value float64) { +again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &maxStateValue{ + max: value, + } + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + return + } + // Use the entry created by a concurrent goroutine. + v = vNew + } + sv := v.(*maxStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + if value > sv.max { + sv.max = value + } + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again + } +} + +func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) { + currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 + m := &as.m + m.Range(func(k, v interface{}) bool { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + + sv := v.(*maxStateValue) + sv.mu.Lock() + max := sv.max + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + sv.mu.Unlock() + key := k.(string) + ctx.appendSeries(key, "max", currentTimeMsec, max) + return true + }) +} diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go new file mode 100644 index 000000000..e9e5dfcd2 --- /dev/null +++ b/lib/streamaggr/min.go @@ -0,0 +1,73 @@ +package streamaggr + +import ( + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// minAggrState calculates output=min, e.g. the minimum value over input samples. +type minAggrState struct { + m sync.Map +} + +type minStateValue struct { + mu sync.Mutex + min float64 + deleted bool +} + +func newMinAggrState() *minAggrState { + return &minAggrState{} +} + +func (as *minAggrState) pushSample(inputKey, outputKey string, value float64) { +again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &minStateValue{ + min: value, + } + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + return + } + // Use the entry created by a concurrent goroutine. + v = vNew + } + sv := v.(*minStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + if value < sv.min { + sv.min = value + } + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again + } +} + +func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) { + currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 + m := &as.m + m.Range(func(k, v interface{}) bool { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + + sv := v.(*minStateValue) + sv.mu.Lock() + min := sv.min + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. 
+		sv.deleted = true
+		sv.mu.Unlock()
+		key := k.(string)
+		ctx.appendSeries(key, "min", currentTimeMsec, min)
+		return true
+	})
+}
diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go
new file mode 100644
index 000000000..ebd9590fb
--- /dev/null
+++ b/lib/streamaggr/quantiles.go
@@ -0,0 +1,87 @@
+package streamaggr
+
+import (
+	"strconv"
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/valyala/histogram"
+)
+
+// quantilesAggrState calculates output=quantiles, e.g. the given quantiles over the input samples.
+type quantilesAggrState struct {
+	m sync.Map
+
+	phis []float64
+}
+
+type quantilesStateValue struct {
+	mu      sync.Mutex
+	h       *histogram.Fast
+	deleted bool
+}
+
+func newQuantilesAggrState(phis []float64) *quantilesAggrState {
+	return &quantilesAggrState{
+		phis: phis,
+	}
+}
+
+func (as *quantilesAggrState) pushSample(inputKey, outputKey string, value float64) {
+again:
+	v, ok := as.m.Load(outputKey)
+	if !ok {
+		// The entry is missing in the map. Try creating it.
+		h := histogram.GetFast()
+		v = &quantilesStateValue{
+			h: h,
+		}
+		vNew, loaded := as.m.LoadOrStore(outputKey, v)
+		if loaded {
+			// Use the entry created by a concurrent goroutine.
+			histogram.PutFast(h)
+			v = vNew
+		}
+	}
+	sv := v.(*quantilesStateValue)
+	sv.mu.Lock()
+	deleted := sv.deleted
+	if !deleted {
+		sv.h.Update(value)
+	}
+	sv.mu.Unlock()
+	if deleted {
+		// The entry has been deleted by the concurrent call to appendSeriesForFlush
+		// Try obtaining and updating the entry again.
+		goto again
+	}
+}
+
+func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
+	m := &as.m
+	phis := as.phis
+	var quantiles []float64
+	var b []byte
+	m.Range(func(k, v interface{}) bool {
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
+
+		sv := v.(*quantilesStateValue)
+		sv.mu.Lock()
+		quantiles = sv.h.Quantiles(quantiles[:0], phis)
+		histogram.PutFast(sv.h)
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
+		sv.mu.Unlock()
+
+		key := k.(string)
+		for i, quantile := range quantiles {
+			b = strconv.AppendFloat(b[:0], phis[i], 'g', -1, 64)
+			phiStr := bytesutil.InternBytes(b)
+			ctx.appendSeriesWithExtraLabel(key, "quantiles", currentTimeMsec, quantile, "quantile", phiStr)
+		}
+		return true
+	})
+}
diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go
new file mode 100644
index 000000000..107c78896
--- /dev/null
+++ b/lib/streamaggr/stddev.go
@@ -0,0 +1,74 @@
+package streamaggr
+
+import (
+	"math"
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+)
+
+// stddevAggrState calculates output=stddev, e.g. the standard deviation over input samples.
+type stddevAggrState struct {
+	m sync.Map
+}
+
+type stddevStateValue struct {
+	mu      sync.Mutex
+	count   float64
+	avg     float64
+	q       float64
+	deleted bool
+}
+
+func newStddevAggrState() *stddevAggrState {
+	return &stddevAggrState{}
+}
+
+func (as *stddevAggrState) pushSample(inputKey, outputKey string, value float64) {
+again:
+	v, ok := as.m.Load(outputKey)
+	if !ok {
+		// The entry is missing in the map. Try creating it.
+		v = &stddevStateValue{}
+		vNew, loaded := as.m.LoadOrStore(outputKey, v)
+		if loaded {
+			// Use the entry created by a concurrent goroutine.
+			v = vNew
+		}
+	}
+	sv := v.(*stddevStateValue)
+	sv.mu.Lock()
+	deleted := sv.deleted
+	if !deleted {
+		// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
+		sv.count++
+		avg := sv.avg + (value-sv.avg)/sv.count
+		sv.q += (value - sv.avg) * (value - avg)
+		sv.avg = avg
+	}
+	sv.mu.Unlock()
+	if deleted {
+		// The entry has been deleted by the concurrent call to appendSeriesForFlush
+		// Try obtaining and updating the entry again.
+		goto again
+	}
+}
+
+func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
+	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
+	m := &as.m
+	m.Range(func(k, v interface{}) bool {
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
+
+		sv := v.(*stddevStateValue)
+		sv.mu.Lock()
+		stddev := math.Sqrt(sv.q / sv.count)
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
+		sv.mu.Unlock()
+		key := k.(string)
+		ctx.appendSeries(key, "stddev", currentTimeMsec, stddev)
+		return true
+	})
+}
diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go
new file mode 100644
index 000000000..d9a19e592
--- /dev/null
+++ b/lib/streamaggr/stdvar.go
@@ -0,0 +1,73 @@
+package streamaggr
+
+import (
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+)
+
+// stdvarAggrState calculates output=stdvar, e.g. the standard variance over input samples.
+type stdvarAggrState struct {
+	m sync.Map
+}
+
+type stdvarStateValue struct {
+	mu      sync.Mutex
+	count   float64
+	avg     float64
+	q       float64
+	deleted bool
+}
+
+func newStdvarAggrState() *stdvarAggrState {
+	return &stdvarAggrState{}
+}
+
+func (as *stdvarAggrState) pushSample(inputKey, outputKey string, value float64) {
+again:
+	v, ok := as.m.Load(outputKey)
+	if !ok {
+		// The entry is missing in the map. Try creating it.
+		v = &stdvarStateValue{}
+		vNew, loaded := as.m.LoadOrStore(outputKey, v)
+		if loaded {
+			// Use the entry created by a concurrent goroutine.
+			v = vNew
+		}
+	}
+	sv := v.(*stdvarStateValue)
+	sv.mu.Lock()
+	deleted := sv.deleted
+	if !deleted {
+		// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
+		sv.count++
+		avg := sv.avg + (value-sv.avg)/sv.count
+		sv.q += (value - sv.avg) * (value - avg)
+		sv.avg = avg
+	}
+	sv.mu.Unlock()
+	if deleted {
+		// The entry has been deleted by the concurrent call to appendSeriesForFlush
+		// Try obtaining and updating the entry again.
+		goto again
+	}
+}
+
+func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
+	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
+	m := &as.m
+	m.Range(func(k, v interface{}) bool {
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
+
+		sv := v.(*stdvarStateValue)
+		sv.mu.Lock()
+		stdvar := sv.q / sv.count
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
+		sv.mu.Unlock()
+		key := k.(string)
+		ctx.appendSeries(key, "stdvar", currentTimeMsec, stdvar)
+		return true
+	})
+}
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
new file mode 100644
index 000000000..ff9bab6a6
--- /dev/null
+++ b/lib/streamaggr/streamaggr.go
@@ -0,0 +1,641 @@
+package streamaggr
+
+import (
+	"fmt"
+	"math"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"gopkg.in/yaml.v2"
+)
+
+var supportedOutputs = []string{
+	"total",
+	"increase",
+	"count_series",
+	"count_samples",
+	"sum_samples",
+	"last",
+	"min",
+	"max",
+	"avg",
+	"stddev",
+	"stdvar",
+	"histogram_bucket",
+	"quantiles(phi1, ..., phiN)",
+}
+
+// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
+//
+// The returned Aggregators must be stopped with MustStop() when no longer needed.
+func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
+	data, err := fs.ReadFileOrHTTP(path)
+	if err != nil {
+		return nil, fmt.Errorf("cannot load aggregators: %w", err)
+	}
+	as, err := NewAggregatorsFromData(data, pushFunc)
+	if err != nil {
+		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
+	}
+	return as, nil
+}
+
+// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
+//
+// The returned Aggregators must be stopped with MustStop() when no longer needed.
+func NewAggregatorsFromData(data []byte, pushFunc PushFunc) (*Aggregators, error) {
+	var cfgs []*Config
+	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
+		return nil, err
+	}
+	return NewAggregators(cfgs, pushFunc)
+}
+
+// Config is a configuration for a single stream aggregation.
+type Config struct {
+	// Match is a label selector for filtering the input time series.
+	//
+	// If the match isn't set, then all the input time series are processed.
+	Match *promrelabel.IfExpression `yaml:"match,omitempty"`
+
+	// Interval is the interval between aggregations.
+	Interval string `yaml:"interval"`
+
+	// Outputs is a list of output aggregate functions to produce.
+	//
+	// The following names are allowed:
+	//
+	// - total - aggregates input counters
+	// - increase - counts the increase over input counters
+	// - count_series - counts the input series
+	// - count_samples - counts the input samples
+	// - sum_samples - sums the input samples
+	// - last - the last sample value
+	// - min - the minimum sample value
+	// - max - the maximum sample value
+	// - avg - the average value across all the samples
+	// - stddev - standard deviation across all the samples
+	// - stdvar - standard variance across all the samples
+	// - histogram_bucket - creates VictoriaMetrics histogram for input samples
+	// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
+	//
+	// The output time series will have the following names:
+	//
+	//   input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
+	//
+	Outputs []string `yaml:"outputs"`
+
+	// By is an optional list of labels for grouping input series.
+	//
+	// See also Without.
+	//
+	// If neither By nor Without are set, then the Outputs are calculated
+	// individually per each input time series.
+	By []string `yaml:"by,omitempty"`
+
+	// Without is an optional list of labels, which must be excluded when grouping input series.
+	//
+	// See also By.
+	//
+	// If neither By nor Without are set, then the Outputs are calculated
+	// individually per each input time series.
+	Without []string `yaml:"without,omitempty"`
+
+	// InputRelabelConfigs is an optional list of relabeling rules, which are applied to the input
+	// before aggregation.
+	InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"`
+
+	// OutputRelabelConfigs is an optional list of relabeling rules, which are applied
+	// to the aggregated output before it is sent to remote storage.
+	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
+}
+
+// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
+type Aggregators struct {
+	as []*aggregator
+}
+
+// NewAggregators creates Aggregators from the given cfgs.
+//
+// pushFunc is called when the aggregated data must be flushed.
+//
+// MustStop must be called on the returned Aggregators when they are no longer needed.
+func NewAggregators(cfgs []*Config, pushFunc PushFunc) (*Aggregators, error) {
+	if len(cfgs) == 0 {
+		return nil, nil
+	}
+	as := make([]*aggregator, len(cfgs))
+	for i, cfg := range cfgs {
+		a, err := newAggregator(cfg, pushFunc)
+		if err != nil {
+			return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
+		}
+		as[i] = a
+	}
+	return &Aggregators{
+		as: as,
+	}, nil
+}
+
+// MustStop stops a.
+func (a *Aggregators) MustStop() {
+	if a == nil {
+		return
+	}
+	for _, aggr := range a.as {
+		aggr.MustStop()
+	}
+}
+
+// Push pushes tss to a.
+func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
+	if a == nil {
+		return
+	}
+	for _, aggr := range a.as {
+		aggr.Push(tss)
+	}
+}
+
+// aggregator aggregates input series according to the config passed to newAggregator
+type aggregator struct {
+	match *promrelabel.IfExpression
+
+	inputRelabeling  *promrelabel.ParsedConfigs
+	outputRelabeling *promrelabel.ParsedConfigs
+
+	by                  []string
+	without             []string
+	aggregateOnlyByTime bool
+
+	// aggrStates contains aggregate states for the given outputs
+	aggrStates []aggrState
+
+	pushFunc PushFunc
+
+	// suffix contains a suffix, which should be added to aggregate metric names
+	//
+	// It contains the interval plus the labels in (by, without); the output name
+	// is appended to it at flush time.
+	// For example, the foo_bar metric name is transformed to foo_bar:1m_by_job_<output>
+	// for `interval: 1m`, `by: [job]`
+	suffix string
+
+	wg     sync.WaitGroup
+	stopCh chan struct{}
+}
+
+type aggrState interface {
+	pushSample(inputKey, outputKey string, value float64)
+	appendSeriesForFlush(ctx *flushCtx)
+}
+
+// PushFunc is called by Aggregators when it needs to push its state to metrics storage
+type PushFunc func(tss []prompbmarshal.TimeSeries)
+
+// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
+//
+// The returned aggregator must be stopped when no longer needed by calling MustStop().
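+//
+// A minimal config sketch (one list entry per aggregator, as parsed by
+// NewAggregatorsFromData; the selector, interval, labels and outputs below
+// are illustrative, not prescriptive):
+//
+//	- match: 'http_requests_total{env="prod"}'
+//	  interval: 1m
+//	  by: [job]
+//	  outputs: [total, "quantiles(0.5, 0.99)"]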
+func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
+	// check cfg.Interval
+	interval, err := time.ParseDuration(cfg.Interval)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse `interval: %q`: %w", cfg.Interval, err)
+	}
+	if interval < time.Second {
+		return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval)
+	}
+
+	// initialize input_relabel_configs and output_relabel_configs
+	inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse input_relabel_configs: %w", err)
+	}
+	outputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.OutputRelabelConfigs)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse output_relabel_configs: %w", err)
+	}
+
+	// check by and without lists
+	by := cfg.By
+	without := cfg.Without
+	if len(by) > 0 && len(without) > 0 {
+		return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously", by, without)
+	}
+	aggregateOnlyByTime := (len(by) == 0 && len(without) == 0)
+	if !aggregateOnlyByTime && len(without) == 0 {
+		by = addMissingUnderscoreName(by)
+	}
+
+	// initialize outputs list
+	if len(cfg.Outputs) == 0 {
+		return nil, fmt.Errorf("`outputs` list must contain at least one entry from the list %s; "+
+			"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", supportedOutputs)
+	}
+	aggrStates := make([]aggrState, len(cfg.Outputs))
+	for i, output := range cfg.Outputs {
+		if strings.HasPrefix(output, "quantiles(") {
+			if !strings.HasSuffix(output, ")") {
+				return nil, fmt.Errorf("missing closing parenthesis for `quantiles()` output")
+			}
+			argsStr := output[len("quantiles(") : len(output)-1]
+			if len(argsStr) == 0 {
+				return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
+			}
+			args := strings.Split(argsStr, ",")
+			phis := make([]float64, len(args))
+			for j, arg := range args {
+				arg = strings.TrimSpace(arg)
+				phi, err := strconv.ParseFloat(arg, 64)
+				if err != nil {
+					return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
+				}
+				if phi < 0 || phi > 1 {
+					return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
+				}
+				phis[j] = phi
+			}
+			aggrStates[i] = newQuantilesAggrState(phis)
+			continue
+		}
+		switch output {
+		case "total":
+			aggrStates[i] = newTotalAggrState(interval)
+		case "increase":
+			aggrStates[i] = newIncreaseAggrState(interval)
+		case "count_series":
+			aggrStates[i] = newCountSeriesAggrState()
+		case "count_samples":
+			aggrStates[i] = newCountSamplesAggrState()
+		case "sum_samples":
+			aggrStates[i] = newSumSamplesAggrState()
+		case "last":
+			aggrStates[i] = newLastAggrState()
+		case "min":
+			aggrStates[i] = newMinAggrState()
+		case "max":
+			aggrStates[i] = newMaxAggrState()
+		case "avg":
+			aggrStates[i] = newAvgAggrState()
+		case "stddev":
+			aggrStates[i] = newStddevAggrState()
+		case "stdvar":
+			aggrStates[i] = newStdvarAggrState()
+		case "histogram_bucket":
+			aggrStates[i] = newHistogramBucketAggrState(interval)
+		default:
+			return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
+				"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", output, supportedOutputs)
+		}
+	}
+
+	// initialize suffix to add to metric names after aggregation
+	suffix := ":" + cfg.Interval
+	if labels := removeUnderscoreName(by); len(labels) > 0 {
+		suffix += fmt.Sprintf("_by_%s", strings.Join(labels, "_"))
+	}
+	if labels := removeUnderscoreName(without); len(labels) > 0 {
+		suffix += fmt.Sprintf("_without_%s", strings.Join(labels, "_"))
+	}
+	suffix += "_"
+
+	// initialize the aggregator
+	a := &aggregator{
+		match: cfg.Match,
+
+		inputRelabeling:  inputRelabeling,
+		outputRelabeling: outputRelabeling,
+
+		by:                  by,
+		without:             without,
+		aggregateOnlyByTime: aggregateOnlyByTime,
+
+		aggrStates: aggrStates,
+		pushFunc:   pushFunc,
+
+		suffix: suffix,
+
+		stopCh: make(chan struct{}),
+	}
+
+	a.wg.Add(1)
+	go func() {
+		defer a.wg.Done()
+		a.runFlusher(interval)
+	}()
+
+	return a, nil
+}
+
+func (a *aggregator) runFlusher(interval time.Duration) {
+	t := time.NewTicker(interval)
+	defer t.Stop()
+	for {
+		select {
+		case <-a.stopCh:
+			return
+		case <-t.C:
+		}
+		a.flush()
+	}
+}
+
+func (a *aggregator) flush() {
+	ctx := &flushCtx{
+		suffix: a.suffix,
+	}
+	for _, as := range a.aggrStates {
+		ctx.reset()
+		as.appendSeriesForFlush(ctx)
+
+		tss := ctx.tss
+
+		// Apply output relabeling
+		if a.outputRelabeling != nil {
+			dst := tss[:0]
+			for _, ts := range tss {
+				ts.Labels = a.outputRelabeling.Apply(ts.Labels, 0)
+				if len(ts.Labels) == 0 {
+					// The metric has been deleted by the relabeling
+					continue
+				}
+				dst = append(dst, ts)
+			}
+			tss = dst
+		}
+
+		// Push the output metrics
+		a.pushFunc(tss)
+	}
+}
+
+// MustStop stops the aggregator.
+//
+// The aggregator stops pushing the aggregated metrics after this call.
+func (a *aggregator) MustStop() {
+	close(a.stopCh)
+	a.wg.Wait()
+}
+
+// Push pushes series to a.
+func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
+	labels := promutils.GetLabels()
+	tmpLabels := promutils.GetLabels()
+	bb := bbPool.Get()
+	for _, ts := range tss {
+		if !a.match.Match(ts.Labels) {
+			continue
+		}
+
+		labels.Labels = append(labels.Labels[:0], ts.Labels...)
+		labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
+		if len(labels.Labels) == 0 {
+			// The metric has been deleted by the relabeling
+			continue
+		}
+		labels.Sort()
+
+		if a.aggregateOnlyByTime {
+			bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
+		} else {
+			tmpLabels.Labels = removeUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
+			bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels)
+		}
+		outputKey := bytesutil.InternBytes(bb.B)
+		inputKey := ""
+		if !a.aggregateOnlyByTime {
+			tmpLabels.Labels = extractUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
+			bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels)
+			inputKey = bytesutil.InternBytes(bb.B)
+		}
+
+		for _, sample := range ts.Samples {
+			a.pushSample(inputKey, outputKey, sample.Value)
+		}
+	}
+	bbPool.Put(bb)
+	promutils.PutLabels(tmpLabels)
+	promutils.PutLabels(labels)
+}
+
+var bbPool bytesutil.ByteBufferPool
+
+func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {
+	if math.IsNaN(value) {
+		// Skip NaN samples
+		return
+	}
+	for _, as := range a.aggrStates {
+		as.pushSample(inputKey, outputKey, value)
+	}
+}
+
+func extractUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
+	if len(without) > 0 {
+		for _, label := range labels {
+			if hasInArray(label.Name, without) {
+				dst = append(dst, label)
+			}
+		}
+	} else {
+		for _, label := range labels {
+			if !hasInArray(label.Name, by) {
+				dst = append(dst, label)
+			}
+		}
+	}
+	return dst
+}
+
+func removeUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
+	if len(without) > 0 {
+		for _, label := range labels {
+			if !hasInArray(label.Name, without) {
+				dst = append(dst, label)
+			}
+		}
+	} else {
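+		// `by` mode: keep only the labels listed in `by`.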
for _, label := range labels { + if hasInArray(label.Name, by) { + dst = append(dst, label) + } + } + } + return dst +} + +func hasInArray(name string, a []string) bool { + for _, s := range a { + if name == s { + return true + } + } + return false +} + +func marshalLabelsFast(dst []byte, labels []prompbmarshal.Label) []byte { + dst = encoding.MarshalUint32(dst, uint32(len(labels))) + for _, label := range labels { + dst = encoding.MarshalUint32(dst, uint32(len(label.Name))) + dst = append(dst, label.Name...) + dst = encoding.MarshalUint32(dst, uint32(len(label.Value))) + dst = append(dst, label.Value...) + } + return dst +} + +func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal.Label, error) { + if len(src) < 4 { + return dst, fmt.Errorf("cannot unmarshal labels count from %d bytes; needs at least 4 bytes", len(src)) + } + n := encoding.UnmarshalUint32(src) + src = src[4:] + for i := uint32(0); i < n; i++ { + // Unmarshal label name + if len(src) < 4 { + return dst, fmt.Errorf("cannot unmarshal label name length from %d bytes; needs at least 4 bytes", len(src)) + } + labelNameLen := encoding.UnmarshalUint32(src) + src = src[4:] + if uint32(len(src)) < labelNameLen { + return dst, fmt.Errorf("cannot unmarshal label name from %d bytes; needs at least %d bytes", len(src), labelNameLen) + } + labelName := bytesutil.InternBytes(src[:labelNameLen]) + src = src[labelNameLen:] + + // Unmarshal label value + if len(src) < 4 { + return dst, fmt.Errorf("cannot unmarshal label value length from %d bytes; needs at least 4 bytes", len(src)) + } + labelValueLen := encoding.UnmarshalUint32(src) + src = src[4:] + if uint32(len(src)) < labelValueLen { + return dst, fmt.Errorf("cannot unmarshal label value from %d bytes; needs at least %d bytes", len(src), labelValueLen) + } + labelValue := bytesutil.InternBytes(src[:labelValueLen]) + src = src[labelValueLen:] + + dst = append(dst, prompbmarshal.Label{ + Name: labelName, + Value: labelValue, + }) + } + if len(src) > 0 { + return dst, fmt.Errorf("unexpected non-empty tail after unmarshaling labels; tail length is %d bytes", len(src)) + } + return dst, nil +} + +type flushCtx struct { + suffix string + + tss []prompbmarshal.TimeSeries + labels []prompbmarshal.Label + samples []prompbmarshal.Sample +} + +func (ctx *flushCtx) reset() { + ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss) + promrelabel.CleanLabels(ctx.labels) + ctx.labels = ctx.labels[:0] + ctx.samples = ctx.samples[:0] +} + +func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int64, value float64) { + var err error + labelsLen := len(ctx.labels) + samplesLen := len(ctx.samples) + ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled)) + if err != nil { + logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err) + } + ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix) + ctx.samples = append(ctx.samples, prompbmarshal.Sample{ + Timestamp: timestamp, + Value: value, + }) + ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{ + Labels: ctx.labels[labelsLen:], + Samples: ctx.samples[samplesLen:], + }) +} + +func (ctx *flushCtx) appendSeriesWithExtraLabel(labelsMarshaled, suffix string, timestamp int64, value float64, extraName, extraValue string) { + var err error + labelsLen := len(ctx.labels) + samplesLen := len(ctx.samples) + ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled)) + if err != nil { + logger.Panicf("BUG: cannot unmarshal labels 
from output key: %s", err) + } + ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix) + ctx.labels = append(ctx.labels, prompbmarshal.Label{ + Name: extraName, + Value: extraValue, + }) + ctx.samples = append(ctx.samples, prompbmarshal.Sample{ + Timestamp: timestamp, + Value: value, + }) + ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{ + Labels: ctx.labels[labelsLen:], + Samples: ctx.samples[samplesLen:], + }) +} + +func addMetricSuffix(labels []prompbmarshal.Label, offset int, firstSuffix, lastSuffix string) []prompbmarshal.Label { + src := labels[offset:] + for i := range src { + label := &src[i] + if label.Name != "__name__" { + continue + } + bb := bbPool.Get() + bb.B = append(bb.B, label.Value...) + bb.B = append(bb.B, firstSuffix...) + bb.B = append(bb.B, lastSuffix...) + label.Value = bytesutil.InternBytes(bb.B) + bbPool.Put(bb) + return labels + } + // The __name__ isn't found. Add it + bb := bbPool.Get() + bb.B = append(bb.B, firstSuffix...) + bb.B = append(bb.B, lastSuffix...) + labelValue := bytesutil.InternBytes(bb.B) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: labelValue, + }) + return labels +} + +func addMissingUnderscoreName(labels []string) []string { + result := []string{"__name__"} + for _, s := range labels { + if s == "__name__" { + continue + } + result = append(result, s) + } + return result +} + +func removeUnderscoreName(labels []string) []string { + var result []string + for _, s := range labels { + if s == "__name__" { + continue + } + result = append(result, s) + } + return result +} diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go new file mode 100644 index 000000000..bc2a11388 --- /dev/null +++ b/lib/streamaggr/streamaggr_test.go @@ -0,0 +1,662 @@ +package streamaggr + +import ( + "fmt" + "sort" + "strings" + "sync" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" +) + +func TestAggregatorsFailure(t *testing.T) { + f := func(config string) { + t.Helper() + pushFunc := func(tss []prompbmarshal.TimeSeries) { + panic(fmt.Errorf("pushFunc shouldn't be called")) + } + a, err := NewAggregatorsFromData([]byte(config), pushFunc) + if err == nil { + t.Fatalf("expecting non-nil error") + } + if a != nil { + t.Fatalf("expecting nil a") + } + } + + // Invalid config + f(`foobar`) + + // Unknown option + f(` +- interval: 1m + outputs: [total] + foobar: baz +`) + + // missing interval + f(` +- outputs: [total] +`) + + // missing outputs + f(` +- interval: 1m +`) + + // Invalid output + f(` +- interval: 1m + outputs: [foobar] +`) + + // Negative interval + f(`- interval: -5m`) + // Too small interval + f(`- interval: 10ms`) + + // Invalid input_relabel_configs + f(` +- interval: 1m + outputs: [total] + input_relabel_configs: + - foo: bar +`) + f(` +- interval: 1m + outputs: [total] + input_relabel_configs: + - action: replace +`) + + // Invalid output_relabel_configs + f(` +- interval: 1m + outputs: [total] + output_relabel_configs: + - foo: bar +`) + f(` +- interval: 1m + outputs: [total] + output_relabel_configs: + - action: replace +`) + + // Both by and without are non-empty + f(` +- interval: 1m + outputs: [total] + by: [foo] + without: [bar] +`) + + // Invalid quantiles() + f(` +- interval: 1m + outputs: ["quantiles("] +`) + f(` +- interval: 1m + outputs: ["quantiles()"] +`) + f(` +- interval: 1m + outputs: 
["quantiles(foo)"] +`) + f(` +- interval: 1m + outputs: ["quantiles(-0.5)"] +`) + f(` +- interval: 1m + outputs: ["quantiles(1.5)"] +`) +} + +func TestAggregatorsSuccess(t *testing.T) { + f := func(config, inputMetrics, outputMetricsExpected string) { + t.Helper() + + // Initialize Aggregators + var tssOutput []prompbmarshal.TimeSeries + var tssOutputLock sync.Mutex + pushFunc := func(tss []prompbmarshal.TimeSeries) { + tssOutputLock.Lock() + for _, ts := range tss { + labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...) + samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...) + tssOutput = append(tssOutput, prompbmarshal.TimeSeries{ + Labels: labelsCopy, + Samples: samplesCopy, + }) + } + tssOutputLock.Unlock() + } + a, err := NewAggregatorsFromData([]byte(config), pushFunc) + if err != nil { + t.Fatalf("cannot initialize aggregators: %s", err) + } + + // Push the inputMetrics to Aggregators + tssInput := mustParsePromMetrics(inputMetrics) + a.Push(tssInput) + if a != nil { + for _, aggr := range a.as { + aggr.flush() + } + } + a.MustStop() + + // Verify the tssOutput contains the expected metrics + tsStrings := make([]string, len(tssOutput)) + for i, ts := range tssOutput { + tsStrings[i] = timeSeriesToString(ts) + } + sort.Strings(tsStrings) + outputMetrics := strings.Join(tsStrings, "") + if outputMetrics != outputMetricsExpected { + t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected) + } + } + + // Empty config + f(``, ``, ``) + f(``, `foo{bar="baz"} 1`, ``) + f(``, "foo 1\nbaz 2", ``) + + // Empty by list - aggregate only by time + f(` +- interval: 1m + outputs: [count_samples, sum_samples, count_series, last] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_count_samples 1 +bar:1m_count_series 1 +bar:1m_last 5 +bar:1m_sum_samples 5 +foo:1m_count_samples{abc="123"} 2 +foo:1m_count_samples{abc="456",de="fg"} 1 +foo:1m_count_series{abc="123"} 1 +foo:1m_count_series{abc="456",de="fg"} 1 +foo:1m_last{abc="123"} 8.5 +foo:1m_last{abc="456",de="fg"} 8 +foo:1m_sum_samples{abc="123"} 12.5 +foo:1m_sum_samples{abc="456",de="fg"} 8 +`) + + // Special case: __name__ in by list + f(` +- interval: 1m + by: [__name__] + outputs: [count_samples, sum_samples, count_series] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_count_samples 1 +bar:1m_count_series 1 +bar:1m_sum_samples 5 +foo:1m_count_samples 3 +foo:1m_count_series 2 +foo:1m_sum_samples 20.5 +`) + + // Non-empty by list with non-existing labels + f(` +- interval: 1m + by: [foo, bar] + outputs: [count_samples, sum_samples, count_series] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_by_foo_bar_count_samples 1 +bar:1m_by_foo_bar_count_series 1 +bar:1m_by_foo_bar_sum_samples 5 +foo:1m_by_foo_bar_count_samples 3 +foo:1m_by_foo_bar_count_series 2 +foo:1m_by_foo_bar_sum_samples 20.5 +`) + + // Non-empty by list with existing label + f(` +- interval: 1m + by: [abc] + outputs: [count_samples, sum_samples, count_series] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_by_abc_count_samples 1 +bar:1m_by_abc_count_series 1 +bar:1m_by_abc_sum_samples 5 +foo:1m_by_abc_count_samples{abc="123"} 2 +foo:1m_by_abc_count_samples{abc="456"} 1 +foo:1m_by_abc_count_series{abc="123"} 1 +foo:1m_by_abc_count_series{abc="456"} 1 +foo:1m_by_abc_sum_samples{abc="123"} 12.5 +foo:1m_by_abc_sum_samples{abc="456"} 8 +`) + + // Non-empty without list with 
non-existing labels + f(` +- interval: 1m + without: [foo] + outputs: [count_samples, sum_samples, count_series] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_without_foo_count_samples 1 +bar:1m_without_foo_count_series 1 +bar:1m_without_foo_sum_samples 5 +foo:1m_without_foo_count_samples{abc="123"} 2 +foo:1m_without_foo_count_samples{abc="456",de="fg"} 1 +foo:1m_without_foo_count_series{abc="123"} 1 +foo:1m_without_foo_count_series{abc="456",de="fg"} 1 +foo:1m_without_foo_sum_samples{abc="123"} 12.5 +foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8 +`) + + // Non-empty without list with existing labels + f(` +- interval: 1m + without: [abc] + outputs: [count_samples, sum_samples, count_series] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_without_abc_count_samples 1 +bar:1m_without_abc_count_series 1 +bar:1m_without_abc_sum_samples 5 +foo:1m_without_abc_count_samples 2 +foo:1m_without_abc_count_samples{de="fg"} 1 +foo:1m_without_abc_count_series 1 +foo:1m_without_abc_count_series{de="fg"} 1 +foo:1m_without_abc_sum_samples 12.5 +foo:1m_without_abc_sum_samples{de="fg"} 8 +`) + + // Special case: __name__ in without list + f(` +- interval: 1m + without: [__name__] + outputs: [count_samples, sum_samples, count_series] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `:1m_count_samples 1 +:1m_count_samples{abc="123"} 2 +:1m_count_samples{abc="456",de="fg"} 1 +:1m_count_series 1 +:1m_count_series{abc="123"} 1 +:1m_count_series{abc="456",de="fg"} 1 +:1m_sum_samples 5 +:1m_sum_samples{abc="123"} 12.5 +:1m_sum_samples{abc="456",de="fg"} 8 +`) + + // drop some input metrics + f(` +- interval: 1m + without: [abc] + outputs: [count_samples, sum_samples, count_series] + input_relabel_configs: + - if: 'foo' + action: drop +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_without_abc_count_samples 1 +bar:1m_without_abc_count_series 1 +bar:1m_without_abc_sum_samples 5 +`) + + // rename output metrics + f(` +- interval: 1m + without: [abc] + outputs: [count_samples, sum_samples, count_series] + output_relabel_configs: + - action: replace_all + source_labels: [__name__] + regex: ":|_" + replacement: "-" + target_label: __name__ + - action: drop + source_labels: [de] + regex: fg +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar-1m-without-abc-count-samples 1 +bar-1m-without-abc-count-series 1 +bar-1m-without-abc-sum-samples 5 +foo-1m-without-abc-count-samples 2 +foo-1m-without-abc-count-series 1 +foo-1m-without-abc-sum-samples 12.5 +`) + + // match doesn't match anything + f(` +- interval: 1m + without: [abc] + outputs: [count_samples, sum_samples, count_series] + match: '{non_existing_label!=""}' +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, ``) + + // match matches foo series with non-empty abc label + f(` +- interval: 1m + by: [abc] + outputs: [count_samples, sum_samples, count_series] + match: 'foo{abc=~".+"}' +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `foo:1m_by_abc_count_samples{abc="123"} 2 +foo:1m_by_abc_count_samples{abc="456"} 1 +foo:1m_by_abc_count_series{abc="123"} 1 +foo:1m_by_abc_count_series{abc="456"} 1 +foo:1m_by_abc_sum_samples{abc="123"} 12.5 +foo:1m_by_abc_sum_samples{abc="456"} 8 +`) + + // total output for non-repeated series + f(` +- interval: 1m + outputs: [total] +`, ` +foo 123 +bar{baz="qwe"} 4.34 +`, 
`bar:1m_total{baz="qwe"} 0 +foo:1m_total 0 +`) + + // total output for repeated series + f(` +- interval: 1m + outputs: [total] +`, ` +foo 123 +bar{baz="qwe"} 1.32 +bar{baz="qwe"} 4.34 +bar{baz="qwe"} 2 +foo{baz="qwe"} -5 +bar{baz="qwer"} 343 +bar{baz="qwer"} 344 +foo{baz="qwe"} 10 +`, `bar:1m_total{baz="qwe"} 5.02 +bar:1m_total{baz="qwer"} 1 +foo:1m_total 0 +foo:1m_total{baz="qwe"} 15 +`) + + // total output for repeated series with group by __name__ + f(` +- interval: 1m + by: [__name__] + outputs: [total] +`, ` +foo 123 +bar{baz="qwe"} 1.32 +bar{baz="qwe"} 4.34 +bar{baz="qwe"} 2 +foo{baz="qwe"} -5 +bar{baz="qwer"} 343 +bar{baz="qwer"} 344 +foo{baz="qwe"} 10 +`, `bar:1m_total 6.02 +foo:1m_total 15 +`) + + // increase output for non-repeated series + f(` +- interval: 1m + outputs: [increase] +`, ` +foo 123 +bar{baz="qwe"} 4.34 +`, `bar:1m_increase{baz="qwe"} 0 +foo:1m_increase 0 +`) + + // increase output for repeated series + f(` +- interval: 1m + outputs: [increase] +`, ` +foo 123 +bar{baz="qwe"} 1.32 +bar{baz="qwe"} 4.34 +bar{baz="qwe"} 2 +foo{baz="qwe"} -5 +bar{baz="qwer"} 343 +bar{baz="qwer"} 344 +foo{baz="qwe"} 10 +`, `bar:1m_increase{baz="qwe"} 5.02 +bar:1m_increase{baz="qwer"} 1 +foo:1m_increase 0 +foo:1m_increase{baz="qwe"} 15 +`) + + // multiple aggregate configs + f(` +- interval: 1m + outputs: [count_series, sum_samples] +- interval: 5m + by: [bar] + outputs: [sum_samples] +`, ` +foo 1 +foo{bar="baz"} 2 +foo 3.3 +`, `foo:1m_count_series 1 +foo:1m_count_series{bar="baz"} 1 +foo:1m_sum_samples 4.3 +foo:1m_sum_samples{bar="baz"} 2 +foo:5m_by_bar_sum_samples 4.3 +foo:5m_by_bar_sum_samples{bar="baz"} 2 +`) + + // min and max outputs + f(` +- interval: 1m + outputs: [min, max] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_max 5 +bar:1m_min 5 +foo:1m_max{abc="123"} 8.5 +foo:1m_max{abc="456",de="fg"} 8 +foo:1m_min{abc="123"} 4 +foo:1m_min{abc="456",de="fg"} 8 +`) + + // avg output + f(` +- interval: 1m + outputs: [avg] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_avg 5 +foo:1m_avg{abc="123"} 6.25 +foo:1m_avg{abc="456",de="fg"} 8 +`) + + // stddev output + f(` +- interval: 1m + outputs: [stddev] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_stddev 0 +foo:1m_stddev{abc="123"} 2.25 +foo:1m_stddev{abc="456",de="fg"} 0 +`) + + // stdvar output + f(` +- interval: 1m + outputs: [stdvar] +`, ` +foo{abc="123"} 4 +bar 5 +foo{abc="123"} 8.5 +foo{abc="456",de="fg"} 8 +`, `bar:1m_stdvar 0 +foo:1m_stdvar{abc="123"} 5.0625 +foo:1m_stdvar{abc="456",de="fg"} 0 +`) + + // histogram_bucket output + f(` +- interval: 1m + outputs: [histogram_bucket] +`, ` +cpu_usage{cpu="1"} 12.5 +cpu_usage{cpu="1"} 13.3 +cpu_usage{cpu="1"} 13 +cpu_usage{cpu="1"} 12 +cpu_usage{cpu="1"} 14 +cpu_usage{cpu="1"} 25 +cpu_usage{cpu="2"} 90 +`, `cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.136e+01...1.292e+01"} 2 +cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3 +cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1 +cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1 +`) + + // histogram_bucket output without cpu + f(` +- interval: 1m + without: [cpu] + outputs: [histogram_bucket] +`, ` +cpu_usage{cpu="1"} 12.5 +cpu_usage{cpu="1"} 13.3 +cpu_usage{cpu="1"} 13 +cpu_usage{cpu="1"} 12 +cpu_usage{cpu="1"} 14 +cpu_usage{cpu="1"} 25 +cpu_usage{cpu="2"} 90 +`, `cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.136e+01...1.292e+01"} 2 
+cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3 +cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1 +cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1 +`) + + // quantiles output + f(` +- interval: 1m + outputs: ["quantiles(0, 0.5, 1)"] +`, ` +cpu_usage{cpu="1"} 12.5 +cpu_usage{cpu="1"} 13.3 +cpu_usage{cpu="1"} 13 +cpu_usage{cpu="1"} 12 +cpu_usage{cpu="1"} 14 +cpu_usage{cpu="1"} 25 +cpu_usage{cpu="2"} 90 +`, `cpu_usage:1m_quantiles{cpu="1",quantile="0"} 12 +cpu_usage:1m_quantiles{cpu="1",quantile="0.5"} 13.3 +cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25 +cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90 +cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90 +cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90 +`) + + // quantiles output without cpu + f(` +- interval: 1m + without: [cpu] + outputs: ["quantiles(0, 0.5, 1)"] +`, ` +cpu_usage{cpu="1"} 12.5 +cpu_usage{cpu="1"} 13.3 +cpu_usage{cpu="1"} 13 +cpu_usage{cpu="1"} 12 +cpu_usage{cpu="1"} 14 +cpu_usage{cpu="1"} 25 +cpu_usage{cpu="2"} 90 +`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12 +cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3 +cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 +`) +} + +func timeSeriesToString(ts prompbmarshal.TimeSeries) string { + labelsString := promrelabel.LabelsToString(ts.Labels) + if len(ts.Samples) != 1 { + panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", labelsString, len(ts.Samples))) + } + return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value) +} + +func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries { + var rows prometheus.Rows + errLogger := func(s string) { + panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s)) + } + rows.UnmarshalWithErrLogger(s, errLogger) + var tss []prompbmarshal.TimeSeries + samples := make([]prompbmarshal.Sample, 0, len(rows.Rows)) + for _, row := range rows.Rows { + labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1) + labels = append(labels, prompbmarshal.Label{ + Name: "__name__", + Value: row.Metric, + }) + for _, tag := range row.Tags { + labels = append(labels, prompbmarshal.Label{ + Name: tag.Key, + Value: tag.Value, + }) + } + samples = append(samples, prompbmarshal.Sample{ + Value: row.Value, + Timestamp: row.Timestamp, + }) + ts := prompbmarshal.TimeSeries{ + Labels: labels, + Samples: samples[len(samples)-1:], + } + tss = append(tss, ts) + } + return tss +} diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go new file mode 100644 index 000000000..6ff151241 --- /dev/null +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -0,0 +1,73 @@ +package streamaggr + +import ( + "fmt" + "strings" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func BenchmarkAggregatorsPushByJobAvg(b *testing.B) { + for _, output := range []string{ + "total", + "increase", + "count_series", + "count_samples", + "sum_samples", + "last", + "min", + "max", + "avg", + "stddev", + "stdvar", + "histogram_bucket", + "quantiles(0, 0.5, 1)", + } { + b.Run(fmt.Sprintf("output=%s", output), func(b *testing.B) { + benchmarkAggregatorsPush(b, output) + }) + } +} + +func benchmarkAggregatorsPush(b *testing.B, output string) { + config := fmt.Sprintf(` +- match: http_requests_total + interval: 24h + without: [job] + outputs: [%q] +`, output) + pushFunc := func(tss []prompbmarshal.TimeSeries) { + panic(fmt.Errorf("unexpected pushFunc call")) + } + a, err := 
NewAggregatorsFromData([]byte(config), pushFunc)
+	if err != nil {
+		b.Fatalf("unexpected error when initializing aggregators: %s", err)
+	}
+	defer a.MustStop()
+
+	b.ReportAllocs()
+	b.SetBytes(int64(len(benchSeries)))
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			a.Push(benchSeries)
+		}
+	})
+}
+
+func newBenchSeries(seriesCount, samplesPerSeries int) []prompbmarshal.TimeSeries {
+	a := make([]string, 0, seriesCount*samplesPerSeries)
+	for i := 0; i < samplesPerSeries; i++ {
+		for j := 0; j < seriesCount; j++ {
+			s := fmt.Sprintf(`http_requests_total{path="/foo/%d",job="foo",instance="bar"} %d`, j, i*10)
+			a = append(a, s)
+		}
+	}
+	metrics := strings.Join(a, "\n")
+	return mustParsePromMetrics(metrics)
+}
+
+const seriesCount = 10000
+const samplesPerSeries = 10
+
+var benchSeries = newBenchSeries(seriesCount, samplesPerSeries)
diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go
new file mode 100644
index 000000000..ba349a080
--- /dev/null
+++ b/lib/streamaggr/sum_samples.go
@@ -0,0 +1,71 @@
+package streamaggr
+
+import (
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+)
+
+// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
+type sumSamplesAggrState struct {
+	m sync.Map
+}
+
+type sumSamplesStateValue struct {
+	mu      sync.Mutex
+	sum     float64
+	deleted bool
+}
+
+func newSumSamplesAggrState() *sumSamplesAggrState {
+	return &sumSamplesAggrState{}
+}
+
+func (as *sumSamplesAggrState) pushSample(inputKey, outputKey string, value float64) {
+again:
+	v, ok := as.m.Load(outputKey)
+	if !ok {
+		// The entry is missing in the map. Try creating it.
+		v = &sumSamplesStateValue{
+			sum: value,
+		}
+		vNew, loaded := as.m.LoadOrStore(outputKey, v)
+		if !loaded {
+			// The new entry has been successfully created.
+			return
+		}
+		// Use the entry created by a concurrent goroutine.
+		v = vNew
+	}
+	sv := v.(*sumSamplesStateValue)
+	sv.mu.Lock()
+	deleted := sv.deleted
+	if !deleted {
+		sv.sum += value
+	}
+	sv.mu.Unlock()
+	if deleted {
+		// The entry has been deleted by the concurrent call to appendSeriesForFlush
+		// Try obtaining and updating the entry again.
+		goto again
+	}
+}
+
+func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
+	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
+	m := &as.m
+	m.Range(func(k, v interface{}) bool {
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
+
+		sv := v.(*sumSamplesStateValue)
+		sv.mu.Lock()
+		sum := sv.sum
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
+		sv.mu.Unlock()
+		key := k.(string)
+		ctx.appendSeries(key, "sum_samples", currentTimeMsec, sum)
+		return true
+	})
+}
diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go
new file mode 100644
index 000000000..b2e935acc
--- /dev/null
+++ b/lib/streamaggr/total.go
@@ -0,0 +1,137 @@
+package streamaggr
+
+import (
+	"math"
+	"sync"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+)
+
+// totalAggrState calculates output=total, e.g. the summary counter over input counters.
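+//
+// A worked example of the counter-reset handling implemented in pushSample below
+// (values are illustrative): for an input counter observed as 1.32 -> 4.34 -> 2
+// within the interval, the computed total is (4.34-1.32) + 2 = 5.02, i.e. a reset
+// contributes the new value as-is. This matches the `total` expectations in
+// streamaggr_test.go above.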
+type totalAggrState struct { + m sync.Map + + ignoreInputDeadline uint64 + intervalSecs uint64 +} + +type totalStateValue struct { + mu sync.Mutex + lastValues map[string]*lastValueState + total float64 + deleteDeadline uint64 + deleted bool +} + +type lastValueState struct { + value float64 + deleteDeadline uint64 +} + +func newTotalAggrState(interval time.Duration) *totalAggrState { + currentTime := fasttime.UnixTimestamp() + intervalSecs := uint64(interval.Seconds() + 1) + return &totalAggrState{ + ignoreInputDeadline: currentTime + intervalSecs, + intervalSecs: intervalSecs, + } +} + +func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) { + currentTime := fasttime.UnixTimestamp() + deleteDeadline := currentTime + as.intervalSecs + (as.intervalSecs >> 1) + +again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &totalStateValue{ + lastValues: make(map[string]*lastValueState), + } + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. + v = vNew + } + } + sv := v.(*totalStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + lv, ok := sv.lastValues[inputKey] + if !ok { + lv = &lastValueState{} + sv.lastValues[inputKey] = lv + } + d := value + if ok && lv.value <= value { + d = value - lv.value + } + if ok || currentTime > as.ignoreInputDeadline { + sv.total += d + } + lv.value = value + lv.deleteDeadline = deleteDeadline + sv.deleteDeadline = deleteDeadline + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again + } +} + +func (as *totalAggrState) removeOldEntries(currentTime uint64) { + m := &as.m + m.Range(func(k, v interface{}) bool { + sv := v.(*totalStateValue) + + sv.mu.Lock() + deleted := currentTime > sv.deleteDeadline + if deleted { + // Mark the current entry as deleted + sv.deleted = deleted + } else { + // Delete outdated entries in sv.lastValues + m := sv.lastValues + for k1, v1 := range m { + if currentTime > v1.deleteDeadline { + delete(m, k1) + } + } + } + sv.mu.Unlock() + + if deleted { + m.Delete(k) + } + return true + }) +} + +func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) { + currentTime := fasttime.UnixTimestamp() + currentTimeMsec := int64(currentTime) * 1000 + + as.removeOldEntries(currentTime) + + m := &as.m + m.Range(func(k, v interface{}) bool { + sv := v.(*totalStateValue) + sv.mu.Lock() + total := sv.total + if math.Abs(sv.total) >= (1 << 53) { + // It is time to reset the entry, since it starts losing float64 precision + sv.total = 0 + } + deleted := sv.deleted + sv.mu.Unlock() + if !deleted { + key := k.(string) + ctx.appendSeries(key, "total", currentTimeMsec, total) + } + return true + }) +}
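For orientation, here is a minimal usage sketch of the streamaggr API introduced by this diff (NewAggregatorsFromData, Aggregators.Push, Aggregators.MustStop). The config, metric names and labels are illustrative; note that in this version aggregated series are flushed only by the interval ticker, so the process must outlive the interval for pushFunc to fire:

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	config := []byte(`
- interval: 1m
  by: [job]
  outputs: [sum_samples]
`)
	// pushFunc receives the aggregated series once per interval.
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		for _, ts := range tss {
			fmt.Printf("%v %v\n", ts.Labels, ts.Samples)
		}
	}
	a, err := streamaggr.NewAggregatorsFromData(config, pushFunc)
	if err != nil {
		panic(err)
	}

	// Input samples are aggregated in memory, keyed by the `by` labels.
	a.Push([]prompbmarshal.TimeSeries{{
		Labels: []prompbmarshal.Label{
			{Name: "__name__", Value: "http_requests_total"},
			{Name: "job", Value: "api"},
			{Name: "instance", Value: "host-1"},
		},
		Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: time.Now().UnixMilli()}},
	}})

	// Keep the process alive long enough for the 1m flush to fire.
	time.Sleep(90 * time.Second)

	a.MustStop()
}
```

Per the naming scheme in streamaggr.go, this input would surface as `http_requests_total:1m_by_job_sum_samples{job="api"}` once the ticker fires.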