lib/streamaggr: huge pile of changes

- Reduce memory usage by up to 5x when de-duplicating samples across a big number of time series.
- Reduce memory usage by up to 5x when aggregating across a big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
  for reducing the memory usage of marshaled []prompbmarshal.Label (see the usage sketch after this list).
- Add `dedup_interval` option at aggregation config, which allows setting individual
  deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
  metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
  per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
  This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
  - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
  - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
  - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
  - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
  - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
  - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
  - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
    which took longer than the configured interval
  - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
    which took longer than the configured dedup_interval
- Update docs/stream-aggregation.md
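
A minimal usage sketch of the new `lib/promutils.LabelsCompressor` (the package-main wrapper and the label values are illustrative; the `Compress`/`Decompress` signatures are the ones added in this commit):

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)

func main() {
	var lc promutils.LabelsCompressor

	labels := []prompbmarshal.Label{
		{Name: "instance", Value: "host-123:9100"},
		{Name: "job", Value: "node-exporter"},
	}

	// Compress appends a short binary key for the label set to dst.
	// The same label set maps to the same key, so the key can be used
	// as a compact map key instead of the fully marshaled labels.
	key := lc.Compress(nil, labels)

	// Decompress restores the original labels from the compressed key.
	restored := lc.Decompress(nil, key)

	fmt.Printf("compressed to %d bytes; restored %d labels\n", len(key), len(restored))
}
```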

The memory usage reduction increases CPU usage during stream aggregation by up to 30%.

This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
Author: Aliaksandr Valialkin, 2024-03-02 02:42:26 +02:00
parent eb8e95516f
commit 28a9e92b5e
25 changed files with 1834 additions and 791 deletions


@ -30,6 +30,12 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over a big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). This is especially visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs, which can be used for `increase` and `total` aggregations when the first sample of every new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) must be ignored.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_flush_timeouts_total` and `vm_streamaggr_dedup_flush_timeouts_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting flush timeouts for stream aggregation states. Expose also `vm_streamaggr_flush_duration_seconds` and `vm_streamaggr_dedup_flush_duration_seconds` [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the real flush durations of stream aggregation states.
## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0)


@ -13,7 +13,8 @@ aliases:
# Streaming aggregation
[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels before data is written to remote storage.
can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels before data is written to remote storage
(or local storage for single-node VictoriaMetrics).
The aggregation is applied to all the metrics received via any [supported data ingestion protocol](https://docs.victoriametrics.com/#how-to-import-time-series-data)
and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter)
after applying all the configured [relabeling stages](https://docs.victoriametrics.com/vmagent.html#relabeling).
@ -40,22 +41,26 @@ This behaviour can be changed via the following command-line flags:
- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html) and `-streamAggr.keepInput`
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
If one of these flags are set, then all the input samples are written to the storage alongside the aggregated samples.
If one of these flags is set, then all the input samples are written to the storage alongside the aggregated samples.
The `-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`.
- `-remoteWrite.streamAggr.dropInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html) and `-streamAggr.dropInput`
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
If one of these flags is set, then all the input samples are dropped, while only the aggregated samples are written to the storage.
The `-remoteWrite.streamAggr.dropInput` flag can be specified individually per each `-remoteWrite.url`.
By default, all the input samples are aggregated. Sometimes it is needed to de-duplicate samples before the aggregation.
For example, if the samples are received from replicated sources.
The following command-line flag can be used for enabling the [de-duplication](https://docs.victoriametrics.com/#deduplication)
before aggregation in this case:
## Deduplication
- `-remoteWrite.streamAggr.dedupInterval` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
By default, all the input samples are aggregated. Sometimes it is needed to de-duplicate samples for the same [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
before the aggregation. For example, if the samples are received from replicated sources.
The [de-duplication](https://docs.victoriametrics.com/#deduplication) can be enabled via the following options:
- `-remoteWrite.streamAggr.dedupInterval` command-line flag at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each `-remoteWrite.url`.
This allows setting different de-duplication intervals per each configured remote storage.
- `-streamAggr.dedupInterval` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
- `-streamAggr.dedupInterval` command-line flag at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
- `dedup_interval` option per each [aggregate config](#stream-aggregation-config).
De-duplication is performed after applying the input relabeling with `input_relabel_configs` - see [these docs](#relabeling).
## Use cases
@ -387,6 +392,8 @@ Output metric names for stream aggregation are constructed according to the foll
Both input and output metric names can be modified if needed via relabeling according to [these docs](#relabeling).
It is possible to leave the original metric name after the aggregation by specifying `keep_metric_names: true` option at [stream aggregation config](#stream-aggregation-config).
The `keep_metric_names` option can be used only if only a single output is set in [`outputs` list](#aggregation-outputs).
## Relabeling
@ -410,7 +417,7 @@ For example, the following config removes the `:1m_sum_samples` suffix added [to
## Aggregation outputs
The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config)
and then sent to the storage once per `interval`.
and then sent to the storage once per `interval`. The aggregated samples are named according to [output metric naming](#output-metric-names).
If `by` and `without` lists are specified in the [config](#stream-aggregation-config),
then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`.
@ -419,159 +426,312 @@ On vmagent shutdown or [configuration reload](#configuration-update) unfinished
as they might produce lower values than the user expects. It is possible to specify the `flush_on_shutdown: true` setting in
aggregation config to make vmagent send unfinished states to the remote storage.
Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config).
Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config):
### total
* [avg](#avg)
* [count_samples](#count_samples)
* [count_series](#count_series)
* [increase](#increase)
* [increase_prometheus](#increase_prometheus)
* [histogram_bucket](#histogram_bucket)
* [last](#last)
* [max](#max)
* [min](#min)
* [stddev](#stddev)
* [stdvar](#stdvar)
* [sum_samples](#sum_samples)
* [total](#total)
* [total_prometheus](#total_prometheus)
* [unique_samples](#unique_samples)
* [quantiles](#quantiles)
`total` generates output [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) by summing the input counters.
`total` only makes sense for aggregating [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) metrics.
### avg
The results of `total` is equal to the `sum(some_counter)` query.
`avg` returns the average over input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
`avg` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge).
The results of `avg` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
sum(sum_over_time(some_metric[interval])) / sum(count_over_time(some_metric[interval]))
```
For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query:
<img alt="total aggregation" src="stream-aggregation-check-total.webp">
<img alt="avg aggregation" src="stream-aggregation-check-avg.webp">
`total` is not affected by [counter resets](https://docs.victoriametrics.com/keyConcepts.html#counter) -
it continues to increase monotonically with respect to the previous value.
The counters are most often reset when the application is restarted.
See also [min](#min), [max](#max), [sum_samples](#sum_samples) and [count_samples](#count_samples).
For example:
### count_samples
<img alt="total aggregation counter reset" src="stream-aggregation-check-total-reset.webp">
`count_samples` counts the number of input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`.
The same behavior will occur when creating or deleting new series in an aggregation group -
`total` will increase monotonically considering the values of the series set.
An example of changing a set of series can be restarting a pod in the Kubernetes.
This changes a label with pod's name in the series, but `total` account for such a scenario and do not reset the state of aggregated metric.
The results of `count_samples` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#stream-aggregation-config).
```metricsql
sum(count_over_time(some_metric[interval]))
```
See also [count_series](#count_series) and [sum_samples](#sum_samples).
### count_series
`count_series` counts the number of unique [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series) over the given `interval`.
The results of `count_series` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
count(last_over_time(some_metric[interval]))
```
See also [count_samples](#count_samples) and [unique_samples](#unique_samples).
### increase
`increase` returns the increase of input [counters](https://docs.victoriametrics.com/keyConcepts.html#counter).
`increase` only makes sense for aggregating [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) metrics.
`increase` returns the increase of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given 'interval'.
`increase` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyConcepts.html#counter).
The results of `increase` with aggregation interval of `1m` is equal to the `increase(some_counter[1m])` query.
The results of `increase` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query:
```metricsql
sum(increase_pure(some_counter[interval]))
```
`increase` assumes that all the counters start from 0. For example, if the first seen sample for a new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
is `10`, then `increase` assumes that the time series has been increased by `10`. If you need to ignore the first sample for new time series,
then take a look at [increase_prometheus](#increase_prometheus).
For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query:
<img alt="increase aggregation" src="stream-aggregation-check-increase.webp">
`increase` can be used as an alternative for [rate](https://docs.victoriametrics.com/MetricsQL.html#rate) function.
For example, if we have `increase` with `interval` of `5m` for a counter `some_counter`, then to get `rate` we should divide
the resulting aggregation by the `interval` in seconds: `some_counter:5m_increase / 5m` is similar to `rate(some_counter[5m])`.
For example, if `increase` is calculated for `some_counter` with `interval: 5m`, then `rate` can be calculated
by dividing the resulting aggregation by `5m`:
```metricsql
some_counter:5m_increase / 5m
```
This is similar to `rate(some_counter[5m])`.
Please note, opposite to [rate](https://docs.victoriametrics.com/MetricsQL.html#rate), `increase` aggregations can be
combined safely afterwards. This is helpful when the aggregation is calculated by more than one vmagent.
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#stream-aggregation-config).
### count_series
See also [increase_prometheus](#increase_prometheus) and [total](#total).
`count_series` counts the number of unique [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series).
### increase_prometheus
The results of `count_series` is equal to the `count(some_metric)` query.
`increase_prometheus` returns the increase of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`.
`increase_prometheus` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyConcepts.html#counter).
### count_samples
The results of `increase_prometheus` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
`count_samples` counts the number of input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
```metricsql
sum(increase_prometheus(some_counter[interval]))
```
The results of `count_samples` with aggregation interval of `1m` is equal to the `count_over_time(some_metric[1m])` query.
`increase_prometheus` skips the first seen sample value per each [time series](https://docs.victoriametrics.com/keyconcepts/#time-series).
If you need to take into account the first sample per time series, then take a look at [increase](#increase).
### sum_samples
See also [increase](#increase), [total](#total) and [total_prometheus](#total_prometheus).
`sum_samples` sums input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
`sum_samples` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics.
### histogram_bucket
The results of `sum_samples` with aggregation interval of `1m` is equal to the `sum_over_time(some_metric[1m])` query.
`histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350)
for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`.
`histogram_bucket` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge).
See how to aggregate regular histograms [here](#aggregating-histograms).
For example, see below time series produced by config with aggregation interval `1m` and the regular query:
The results of `histogram_bucket` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
<img alt="sum_samples aggregation" src="stream-aggregation-check-sum-samples.webp">
```metricsql
sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange)
```
See also [quantiles](#quantiles), [min](#min), [max](#max) and [avg](#avg).
### last
`last` returns the last input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
`last` returns the last input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`.
The results of `last` with aggregation interval of `1m` is equal to the `last_over_time(some_metric[1m])` query.
The results of `last` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
This aggregation output doesn't make much sense with `by` lists specified in the [config](#stream-aggregation-config).
The result of aggregation by labels in this case will be undetermined, because it depends on the order of processing the time series.
```metricsql
last_over_time(some_metric[interval])
```
### min
`min` returns the minimum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
The results of `min` with aggregation interval of `1m` is equal to the `min_over_time(some_metric[1m])` query.
For example, see below time series produced by config with aggregation interval `1m` and the regular query:
<img alt="min aggregation" src="stream-aggregation-check-min.webp">
See also [min](#min), [max](#max) and [avg](#avg).
### max
`max` returns the maximum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
`max` returns the maximum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`.
The results of `max` with aggregation interval of `1m` is equal to the `max_over_time(some_metric[1m])` query.
The results of `max` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
max(max_over_time(some_metric[interval]))
```
For example, see below time series produced by config with aggregation interval `1m` and the regular query:
<img alt="total aggregation" src="stream-aggregation-check-max.webp">
### avg
See also [min](#min) and [avg](#avg).
`avg` returns the average input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
### min
The results of `avg` with aggregation interval of `1m` is equal to the `avg_over_time(some_metric[1m])` query.
`min` returns the minimum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`.
For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query:
The results of `min` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
<img alt="avg aggregation" src="stream-aggregation-check-avg.webp">
```metricsql
min(min_over_time(some_metric[interval]))
```
For example, see below time series produced by config with aggregation interval `1m` and the regular query:
<img alt="min aggregation" src="stream-aggregation-check-min.webp">
See also [max](#max) and [avg](#avg).
### stddev
`stddev` returns [standard deviation](https://en.wikipedia.org/wiki/Standard_deviation) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
`stddev` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics.
`stddev` returns [standard deviation](https://en.wikipedia.org/wiki/Standard_deviation) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
over the given `interval`.
`stddev` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge).
The results of `stddev` with aggregation interval of `1m` is equal to the `stddev_over_time(some_metric[1m])` query.
The results of `stddev` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
histogram_stddev(sum(histogram_over_time(some_metric[interval])) by (vmrange))
```
See also [stdvar](#stdvar) and [avg](#avg).
### stdvar
`stdvar` returns [standard variance](https://en.wikipedia.org/wiki/Variance) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
`stdvar` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics.
`stdvar` returns [standard variance](https://en.wikipedia.org/wiki/Variance) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples)
over the given `interval`.
`stdvar` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge).
The results of `stdvar` with aggregation interval of `1m` is equal to the `stdvar_over_time(some_metric[1m])` query.
The results of `stdvar` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
histogram_stdvar(sum(histogram_over_time(some_metric[interval])) by (vmrange))
```
For example, see below time series produced by config with aggregation interval `1m` and the regular query:
<img alt="stdvar aggregation" src="stream-aggregation-check-stdvar.webp">
### histogram_bucket
See also [stddev](#stddev) and [avg](#avg).
`histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350)
for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
`histogram_bucket` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics.
See how to aggregate regular histograms [here](#aggregating-histograms).
### sum_samples
The results of `histogram_bucket` with aggregation interval of `1m` is equal to the `histogram_over_time(some_histogram_bucket[1m])` query.
`sum_samples` sums input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`.
`sum_samples` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge).
The results of `sum_samples` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
sum(sum_over_time(some_metric[interval]))
```
For example, see below time series produced by config with aggregation interval `1m` and the regular query:
<img alt="sum_samples aggregation" src="stream-aggregation-check-sum-samples.webp">
See also [count_samples](#count_samples) and [count_series](#count_series).
### total
`total` generates output [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) by summing the input counters over the given `interval`.
`total` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyConcepts.html#counter).
The results of `total` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
sum(running_sum(increase_pure(some_counter)))
```
`total` assumes that all the counters start from 0. For example, if the first seen sample for a new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
is `10`, then `total` assumes that the time series has been increased by `10`. If you need to ignore the first sample for new time series,
then take a look at [total_prometheus](#total_prometheus).
For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query:
<img alt="total aggregation" src="stream-aggregation-check-total.webp">
`total` is not affected by [counter resets](https://docs.victoriametrics.com/keyConcepts.html#counter) -
it continues to increase monotonically with respect to the previous value.
The counters are most often reset when the application is restarted.
For example:
<img alt="total aggregation counter reset" src="stream-aggregation-check-total-reset.webp">
The same behavior occurs when series are created or deleted in an aggregation group -
the `total` output keeps increasing monotonically, taking into account the values of the current series set.
An example of changing the set of series is a pod restart in Kubernetes.
This changes the pod name label, but `total` accounts for such a scenario and doesn't reset the state of the aggregated metric.
Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/)
or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_interval](#stream-aggregation-config).
See also [total_prometheus](#total_prometheus), [increase](#increase) and [increase_prometheus](#increase_prometheus).
### total_prometheus
`total_prometheus` generates output [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) by summing the input counters over the given `interval`.
`total_prometheus` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyConcepts.html#counter).
The results of `total_prometheus` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
sum(running_sum(increase_prometheus(some_counter)))
```
`total_prometheus` skips the first seen sample value per each [time series](https://docs.victoriametrics.com/keyconcepts/#time-series).
If you need to take into account the first sample per time series, then take a look at [total](#total).
`total_prometheus` is not affected by [counter resets](https://docs.victoriametrics.com/keyConcepts.html#counter) -
it continues to increase monotonically with respect to the previous value.
The counters are most often reset when the application is restarted.
See also [total](#total), [increase](#increase) and [increase_prometheus](#increase_prometheus).
### unique_samples
`unique_samples` counts the number of unique sample values over the given `interval`.
`unique_samples` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge).
The results of `unique_samples` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
count(count_values_over_time(some_metric[interval]))
```
See also [sum_samples](#sum_samples) and [count_series](#count_series).
### quantiles
`quantiles(phi1, ..., phiN)` returns [percentiles](https://en.wikipedia.org/wiki/Percentile) for the given `phi*`
over the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
The `phi` must be in the range `[0..1]`, where `0` means `0th` percentile, while `1` means `100th` percentile.
`quantiles(...)` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics.
over the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) on the given `interval`.
`phi` must be in the range `[0..1]`, where `0` means `0th` percentile, while `1` means `100th` percentile.
`quantiles(...)` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge).
The results of `quantiles(phi1, ..., phiN)` with aggregation interval of `1m`
is equal to the `quantiles_over_time("quantile", phi1, ..., phiN, some_histogram_bucket[1m])` query.
The results of `quantiles(phi1, ..., phiN)` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query:
```metricsql
histogram_quantiles("quantile", phi1, ..., phiN, sum(histogram_over_time(some_metric[interval])) by (vmrange))
```
See also [histogram_bucket](#histogram_bucket), [min](#min), [max](#max) and [avg](#avg).
Please note, `quantiles` aggregation won't produce correct results when vmagent is in [cluster mode](#cluster-mode)
since percentiles should be calculated only on the whole matched data set.
## Aggregating by labels
@ -635,13 +795,21 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
#
interval: 1m
# staleness_interval defines an interval after which the series state will be reset if no samples have been sent during it.
# It means that:
# dedup_interval is an optional interval for de-duplication of input samples before the aggregation.
# Samples are de-duplicated on a per-series basis. See https://docs.victoriametrics.com/keyconcepts/#time-series
# and https://docs.victoriametrics.com/#deduplication
# The deduplication is performed after input_relabel_configs relabeling is applied.
# By default the deduplication is disabled.
#
# dedup_interval: 30s
# staleness_interval is an optional interval after which the series state will be reset if no samples have been sent during it.
# This means that:
# - no data point will be written for a resulting time series if it didn't receive any updates during configured interval,
# - if the series receives updates after the configured interval again, then the time series will be calculated from the initial state
# (it's like this series didn't exist until now).
# Increase this parameter if it is expected for matched metrics to be delayed or collected with irregular intervals exceeding the `interval` value.
# By default, is equal to x2 of the `interval` field.
# By default, it equals 2x the `interval` value.
# The parameter is only relevant for outputs: total, increase and histogram_bucket.
#
# staleness_interval: 2m
@ -649,7 +817,8 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
# flush_on_shutdown defines whether to flush the unfinished aggregation states on process restarts
# or config reloads. It is not recommended changing this setting, unless unfinished aggregations states
# are preferred to missing data points.
# Is `false` by default.
# Unfinished aggregation states aren't flushed on shutdown by default.
#
# flush_on_shutdown: false
# without is an optional list of labels, which must be removed from the output aggregation.
@ -667,6 +836,13 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
#
outputs: [total]
# keep_metric_names instructs keeping the original metric names for the aggregated samples.
# This option can be set only if outputs list contains only a single output.
# By default a special suffix is added to original metric names in the aggregated samples.
# See https://docs.victoriametrics.com/stream-aggregation/#output-metric-names
#
# keep_metric_names: false
# input_relabel_configs is an optional list of relabeling rules,
# which are applied to the incoming samples after they pass the match filter
# and before being aggregated.


@ -0,0 +1,125 @@
package promutils
import (
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// LabelsCompressor compresses []prompbmarshal.Label into short binary strings
type LabelsCompressor struct {
labelToIdx sync.Map
idxToLabel sync.Map
nextIdx atomic.Uint64
totalSizeBytes atomic.Uint64
}
// SizeBytes returns the size of lc data in bytes
func (lc *LabelsCompressor) SizeBytes() uint64 {
return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load()
}
// ItemsCount returns the number of items in lc
func (lc *LabelsCompressor) ItemsCount() uint64 {
return lc.nextIdx.Load()
}
// Compress compresses labels, appends the compressed labels to dst and returns the result.
func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label) []byte {
if len(labels) == 0 {
// Fast path
return append(dst, 0)
}
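// The compressed form is a varint-encoded label count followed by a varint-encoded per-label index.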
a := encoding.GetUint64s(len(labels) + 1)
a.A[0] = uint64(len(labels))
lc.compress(a.A[1:], labels)
dst = encoding.MarshalVarUint64s(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label) {
if len(labels) == 0 {
return
}
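// Hint to the compiler that dst has at least len(labels) elements, so it can eliminate bounds checks in the loop below.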
_ = dst[len(labels)-1]
for i := range labels {
label := &labels[i]
v, ok := lc.labelToIdx.Load(*label)
if !ok {
v = lc.nextIdx.Add(1)
labelCopy := cloneLabel(label)
lc.idxToLabel.Store(v, labelCopy)
lc.labelToIdx.Store(*labelCopy, v)
// Update lc.totalSizeBytes
labelSizeBytes := uint64(len(label.Name) + len(label.Value))
entrySizeBytes := labelSizeBytes + uint64(unsafe.Sizeof(label)+unsafe.Sizeof(*label)+2*unsafe.Sizeof(v))
lc.totalSizeBytes.Add(entrySizeBytes)
}
dst[i] = v.(uint64)
}
}
func cloneLabel(label *prompbmarshal.Label) *prompbmarshal.Label {
// pre-allocate memory for label name and value
n := len(label.Name) + len(label.Value)
buf := make([]byte, 0, n)
buf = append(buf, label.Name...)
labelName := bytesutil.ToUnsafeString(buf)
buf = append(buf, label.Value...)
labelValue := bytesutil.ToUnsafeString(buf[len(labelName):])
return &prompbmarshal.Label{
Name: labelName,
Value: labelValue,
}
}
// Decompress decompresses src into []prompbmarshal.Label, appends it to dst and returns the result.
func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []prompbmarshal.Label {
tail, labelsLen, err := encoding.UnmarshalVarUint64(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal labels length: %s", err)
}
if labelsLen == 0 {
// fast path - nothing to decode
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left; len(tail)=%d; tail=%X", len(tail), tail)
}
return dst
}
a := encoding.GetUint64s(int(labelsLen))
tail, err = encoding.UnmarshalVarUint64s(a.A, tail)
if err != nil {
logger.Panicf("BUG: cannot unmarshal label indexes: %s", err)
}
if len(tail) > 0 {
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail)
}
dst = lc.decompress(dst, a.A)
encoding.PutUint64s(a)
return dst
}
func (lc *LabelsCompressor) decompress(dst []prompbmarshal.Label, src []uint64) []prompbmarshal.Label {
for _, idx := range src {
v, ok := lc.idxToLabel.Load(idx)
if !ok {
logger.Panicf("BUG: missing label for idx=%d", idx)
}
label := *(v.(*prompbmarshal.Label))
dst = append(dst, label)
}
return dst
}


@ -0,0 +1,119 @@
package promutils
import (
"fmt"
"sync"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestLabelsCompressorSerial(t *testing.T) {
var lc LabelsCompressor
f := func(labels []prompbmarshal.Label) {
t.Helper()
sExpected := labelsToString(labels)
data := lc.Compress(nil, labels)
labelsResult := lc.Decompress(nil, data)
sResult := labelsToString(labelsResult)
if sExpected != sResult {
t.Fatalf("unexpected result; got %s; want %s", sResult, sExpected)
}
if len(labels) > 0 {
if n := lc.SizeBytes(); n == 0 {
t.Fatalf("Unexpected zero SizeBytes()")
}
if n := lc.ItemsCount(); n == 0 {
t.Fatalf("Unexpected zero ItemsCount()")
}
}
}
// empty labels
f(nil)
f([]prompbmarshal.Label{})
// non-empty labels
f([]prompbmarshal.Label{
{
Name: "instance",
Value: "12345.4342.342.3",
},
{
Name: "job",
Value: "kube-pod-12323",
},
})
f([]prompbmarshal.Label{
{
Name: "instance",
Value: "12345.4342.342.3",
},
{
Name: "job",
Value: "kube-pod-12323",
},
{
Name: "pod",
Value: "foo-bar-baz",
},
})
}
func TestLabelsCompressorConcurrent(t *testing.T) {
const concurrency = 5
var lc LabelsCompressor
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
series := newTestSeries(100, 20)
for i, labels := range series {
sExpected := labelsToString(labels)
data := lc.Compress(nil, labels)
labelsResult := lc.Decompress(nil, data)
sResult := labelsToString(labelsResult)
if sExpected != sResult {
panic(fmt.Errorf("unexpected result on iteration %d; got %s; want %s", i, sResult, sExpected))
}
}
}()
}
wg.Wait()
if n := lc.SizeBytes(); n == 0 {
t.Fatalf("Unexpected zero SizeBytes()")
}
if n := lc.ItemsCount(); n == 0 {
t.Fatalf("Unexpected zero ItemsCount()")
}
}
func labelsToString(labels []prompbmarshal.Label) string {
l := Labels{
Labels: labels,
}
return l.String()
}
func newTestSeries(seriesCount, labelsPerSeries int) [][]prompbmarshal.Label {
series := make([][]prompbmarshal.Label, seriesCount)
for i := 0; i < seriesCount; i++ {
labels := make([]prompbmarshal.Label, labelsPerSeries)
for j := 0; j < labelsPerSeries; j++ {
labels[j] = prompbmarshal.Label{
Name: fmt.Sprintf("label_%d", j),
Value: fmt.Sprintf("value_%d_%d", i, j),
}
}
series[i] = labels
}
return series
}


@ -0,0 +1,54 @@
package promutils
import (
"sync/atomic"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func BenchmarkLabelsCompressorCompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var dst []byte
for pb.Next() {
dst = dst[:0]
for _, labels := range series {
dst = lc.Compress(dst, labels)
}
Sink.Add(uint64(len(dst)))
}
})
}
func BenchmarkLabelsCompressorDecompress(b *testing.B) {
var lc LabelsCompressor
series := newTestSeries(100, 10)
datas := make([][]byte, len(series))
var dst []byte
for i, labels := range series {
dstLen := len(dst)
dst = lc.Compress(dst, labels)
datas[i] = dst[dstLen:]
}
b.ReportAllocs()
b.SetBytes(int64(len(series)))
b.RunParallel(func(pb *testing.PB) {
var labels []prompbmarshal.Label
for pb.Next() {
for _, data := range datas {
labels = lc.Decompress(labels[:0], data)
}
Sink.Add(uint64(len(labels)))
}
})
}
var Sink atomic.Uint64


@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -22,35 +23,41 @@ func newAvgAggrState() *avgAggrState {
return &avgAggrState{}
}
func (as *avgAggrState) pushSample(_, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &avgStateValue{
sum: value,
count: 1,
func (as *avgAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &avgStateValue{
sum: s.value,
count: 1,
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been successfully stored
continue
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been successfully stored
return
sv := v.(*avgStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += s.value
sv.count++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*avgStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += value
sv.count++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}


@ -1,12 +1,13 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSamplesAggrState calculates output=countSamples, e.g. the count of input samples.
// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples.
type countSamplesAggrState struct {
m sync.Map
}
@ -21,33 +22,39 @@ func newCountSamplesAggrState() *countSamplesAggrState {
return &countSamplesAggrState{}
}
func (as *countSamplesAggrState) pushSample(_, outputKey string, _ float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSamplesStateValue{
n: 1,
func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSamplesStateValue{
n: 1,
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.n++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.n++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}


@ -1,9 +1,12 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/cespare/xxhash/v2"
)
// countSeriesAggrState calculates output=count_series, e.g. the number of unique series.
@ -12,50 +15,57 @@ type countSeriesAggrState struct {
}
type countSeriesStateValue struct {
mu sync.Mutex
countedSeries map[string]struct{}
n uint64
deleted bool
mu sync.Mutex
m map[uint64]struct{}
deleted bool
}
func newCountSeriesAggrState() *countSeriesAggrState {
return &countSeriesAggrState{}
}
func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, _ float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSeriesStateValue{
countedSeries: map[string]struct{}{
inputKey: {},
},
n: 1,
func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
// Count unique hashes over the inputKeys instead of unique inputKey values.
// This reduces memory usage at the cost of possible hash collisions for distinct inputKey values.
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(inputKey))
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSeriesStateValue{
m: map[uint64]struct{}{
h: {},
},
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been added to the map.
continue
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The entry has been added to the map.
return
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if _, ok := sv.m[h]; !ok {
sv.m[h] = struct{}{}
}
}
// Update the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if _, ok := sv.countedSeries[inputKey]; !ok {
sv.countedSeries[inputKey] = struct{}{}
sv.n++
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
@ -67,7 +77,7 @@ func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
n := sv.n
n := len(sv.m)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()

lib/streamaggr/dedup.go (new file, 185 lines)

@ -0,0 +1,185 @@
package streamaggr
import (
"strings"
"sync"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/cespare/xxhash/v2"
)
const dedupAggrShardsCount = 128
type dedupAggr struct {
shards []dedupAggrShard
}
type dedupAggrShard struct {
dedupAggrShardNopad
// The padding prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(dedupAggrShardNopad{})%128]byte
}
type dedupAggrShardNopad struct {
mu sync.Mutex
m map[string]*dedupAggrSample
}
type dedupAggrSample struct {
value float64
}
func newDedupAggr() *dedupAggr {
shards := make([]dedupAggrShard, dedupAggrShardsCount)
return &dedupAggr{
shards: shards,
}
}
func (da *dedupAggr) sizeBytes() uint64 {
n := uint64(unsafe.Sizeof(*da))
for i := range da.shards {
n += da.shards[i].sizeBytes()
}
return n
}
func (da *dedupAggr) itemsCount() uint64 {
n := uint64(0)
for i := range da.shards {
n += da.shards[i].itemsCount()
}
return n
}
func (das *dedupAggrShard) sizeBytes() uint64 {
das.mu.Lock()
n := uint64(unsafe.Sizeof(*das))
for k, s := range das.m {
n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s)+unsafe.Sizeof(*s))
}
das.mu.Unlock()
return n
}
func (das *dedupAggrShard) itemsCount() uint64 {
das.mu.Lock()
n := uint64(len(das.m))
das.mu.Unlock()
return n
}
func (da *dedupAggr) pushSamples(samples []pushSample) {
pss := getPerShardSamples()
shards := pss.shards
for _, sample := range samples {
h := xxhash.Sum64(bytesutil.ToUnsafeBytes(sample.key))
idx := h % uint64(len(shards))
shards[idx] = append(shards[idx], sample)
}
for i, shardSamples := range shards {
if len(shardSamples) == 0 {
continue
}
da.shards[i].pushSamples(shardSamples)
}
putPerShardSamples(pss)
}
type dedupFlushCtx struct {
samples []pushSample
}
func (ctx *dedupFlushCtx) reset() {
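// Zero the slice elements before truncating, so the key strings they reference can be garbage collected earlier.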
clear(ctx.samples)
ctx.samples = ctx.samples[:0]
}
func (da *dedupAggr) flush(f func(samples []pushSample)) {
ctx := &dedupFlushCtx{}
shards := da.shards
for i := range shards {
ctx.reset()
shards[i].flush(ctx, f)
}
}
type perShardSamples struct {
shards [][]pushSample
}
func (pss *perShardSamples) reset() {
shards := pss.shards
for i, shardSamples := range shards {
if len(shardSamples) > 0 {
clear(shardSamples)
shards[i] = shardSamples[:0]
}
}
}
func getPerShardSamples() *perShardSamples {
v := perShardSamplesPool.Get()
if v == nil {
return &perShardSamples{
shards: make([][]pushSample, dedupAggrShardsCount),
}
}
return v.(*perShardSamples)
}
func putPerShardSamples(pss *perShardSamples) {
pss.reset()
perShardSamplesPool.Put(pss)
}
var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample) {
das.mu.Lock()
defer das.mu.Unlock()
m := das.m
if m == nil {
m = make(map[string]*dedupAggrSample, len(samples))
das.m = m
}
for _, sample := range samples {
s, ok := m[sample.key]
if ok {
s.value = sample.value
} else {
key := strings.Clone(sample.key)
m[key] = &dedupAggrSample{
value: sample.value,
}
}
}
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
das.mu.Lock()
m := das.m
das.m = nil
das.mu.Unlock()
if len(m) == 0 {
return
}
dstSamples := ctx.samples
for key, s := range m {
dstSamples = append(dstSamples, pushSample{
key: key,
value: s.value,
})
}
ctx.samples = dstSamples
f(dstSamples)
}
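
For orientation, below is a hedged sketch of how the dedup state is intended to be driven from inside the `streamaggr` package; the `exampleDedupFlow` function and the `pushToAggrState` callback are illustrative names, not part of this commit:

```go
package streamaggr

// exampleDedupFlow shows the intended flow: duplicate samples pushed for the
// same series key between flushes are collapsed, and only the most recent
// value per key is handed to the aggregation states on flush.
func exampleDedupFlow(pushToAggrState func(samples []pushSample)) {
	da := newDedupAggr()

	// The key is treated as an opaque series identifier; in the aggregator it
	// is expected to hold the compressed label set produced by LabelsCompressor.
	da.pushSamples([]pushSample{
		{key: "series-a", value: 1},
		{key: "series-a", value: 2}, // overwrites the previous value for series-a
		{key: "series-b", value: 5},
	})

	// flush is called once per dedup_interval. It passes the de-duplicated
	// samples to the callback and resets the per-shard state.
	da.flush(pushToAggrState)
}
```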


@ -0,0 +1,88 @@
package streamaggr
import (
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
)
func TestDedupAggrSerial(t *testing.T) {
da := newDedupAggr()
const seriesCount = 100_000
expectedSamplesMap := make(map[string]pushSample)
for i := 0; i < 2; i++ {
samples := make([]pushSample, seriesCount)
for j := range samples {
sample := &samples[j]
sample.key = fmt.Sprintf("key_%d", j)
sample.value = float64(i + j)
expectedSamplesMap[sample.key] = *sample
}
da.pushSamples(samples)
}
if n := da.sizeBytes(); n > 4_200_000 {
t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n)
}
if n := da.itemsCount(); n != seriesCount {
t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount)
}
flushedSamplesMap := make(map[string]pushSample)
flushSamples := func(samples []pushSample) {
for _, sample := range samples {
sample.key = strings.Clone(sample.key)
flushedSamplesMap[sample.key] = sample
}
}
da.flush(flushSamples)
if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
}
if n := da.sizeBytes(); n > 17_000 {
t.Fatalf("too big dedupAggr state after flush; %d bytes; it shouldn't exceed 17_000 bytes", n)
}
if n := da.itemsCount(); n != 0 {
t.Fatalf("unexpected non-zero itemsCount after flush; got %d", n)
}
}
func TestDedupAggrConcurrent(t *testing.T) {
const concurrency = 5
const seriesCount = 10_000
da := newDedupAggr()
var samplesFlushed atomic.Int64
flushSamples := func(samples []pushSample) {
samplesFlushed.Add(int64(len(samples)))
}
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
samples := make([]pushSample, seriesCount)
for j := range samples {
sample := &samples[j]
sample.key = fmt.Sprintf("key_%d", j)
sample.value = float64(i + j)
}
da.pushSamples(samples)
}
da.flush(flushSamples)
}()
}
wg.Wait()
if n := samplesFlushed.Load(); n < seriesCount {
t.Fatalf("too small number of series flushed; got %d; want at least %d", n, seriesCount)
}
}


@ -0,0 +1,48 @@
package streamaggr
import (
"fmt"
"sync/atomic"
"testing"
)
func BenchmarkDedupAggr(b *testing.B) {
for _, samplesPerPush := range []int{1, 10, 100, 1_000, 10_000, 100_000} {
b.Run(fmt.Sprintf("samplesPerPush_%d", samplesPerPush), func(b *testing.B) {
benchmarkDedupAggr(b, samplesPerPush)
})
}
}
func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
flushSamples := func(samples []pushSample) {
Sink.Add(uint64(len(samples)))
}
const loops = 2
benchSamples := newBenchSamples(samplesPerPush)
da := newDedupAggr()
b.ReportAllocs()
b.SetBytes(int64(samplesPerPush * loops))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for i := 0; i < loops; i++ {
da.pushSamples(benchSamples)
}
da.flush(flushSamples)
}
})
}
func newBenchSamples(count int) []pushSample {
samples := make([]pushSample, count)
for i := range samples {
sample := &samples[i]
sample.key = fmt.Sprintf("key_%d", i)
sample.value = float64(i)
}
return samples
}
var Sink atomic.Uint64


@ -2,6 +2,7 @@ package streamaggr
import (
"math"
"strings"
"sync"
"time"
@ -9,7 +10,7 @@ import (
"github.com/VictoriaMetrics/metrics"
)
// histogramBucketAggrState calculates output=histogramBucket, e.g. VictoriaMetrics histogram over input samples.
// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples.
type histogramBucketAggrState struct {
m sync.Map
@ -30,33 +31,38 @@ func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBuck
}
}
func (as *histogramBucketAggrState) pushSample(_, outputKey string, value float64) {
func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &histogramBucketStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &histogramBucketStateValue{}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(s.value)
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
sv := v.(*histogramBucketStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(value)
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}


@ -1,129 +0,0 @@
package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// increaseAggrState calculates output=increase, e.g. the increase over input counters.
type increaseAggrState struct {
m sync.Map
ignoreInputDeadline uint64
stalenessSecs uint64
}
type increaseStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
deleteDeadline uint64
deleted bool
}
func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duration) *increaseAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &increaseAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
stalenessSecs: stalenessSecs,
}
}
func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &increaseStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*increaseStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
sv.lastValues[inputKey] = lv
}
d := value
if ok && lv.value <= value {
d = value - lv.value
}
if ok || currentTime > as.ignoreInputDeadline {
sv.total += d
}
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *increaseAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*increaseStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *increaseAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*increaseStateValue)
sv.mu.Lock()
increase := sv.total
sv.total = 0
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, "increase", currentTimeMsec, increase)
}
return true
})
}
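The dedicated increase state is removed: as the output switch in streamaggr.go below shows, `increase` and `increase_prometheus` are now just `totalAggrState` configured with resetTotalOnFlush=true, with keepFirstSample controlling whether the first sample of a newly seen series is counted.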

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -21,33 +22,39 @@ func newLastAggrState() *lastAggrState {
return &lastAggrState{}
}
func (as *lastAggrState) pushSample(_, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &lastStateValue{
last: value,
func (as *lastAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &lastStateValue{
last: s.value,
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
sv := v.(*lastStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.last = s.value
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*lastStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.last = value
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -21,36 +22,42 @@ func newMaxAggrState() *maxAggrState {
return &maxAggrState{}
}
func (as *maxAggrState) pushSample(_, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &maxStateValue{
max: value,
func (as *maxAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &maxStateValue{
max: s.value,
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
sv := v.(*maxStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if s.value > sv.max {
sv.max = s.value
}
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*maxStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if value > sv.max {
sv.max = value
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -21,36 +22,42 @@ func newMinAggrState() *minAggrState {
return &minAggrState{}
}
func (as *minAggrState) pushSample(_, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &minStateValue{
min: value,
func (as *minAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &minStateValue{
min: s.value,
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
sv := v.(*minStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if s.value < sv.min {
sv.min = s.value
}
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*minStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if value < sv.min {
sv.min = value
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"strconv"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -28,33 +29,39 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState {
}
}
func (as *quantilesAggrState) pushSample(_, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
h := histogram.GetFast()
v = &quantilesStateValue{
h: h,
func (as *quantilesAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
h := histogram.GetFast()
v = &quantilesStateValue{
h: h,
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
histogram.PutFast(h)
v = vNew
}
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
histogram.PutFast(h)
v = vNew
sv := v.(*quantilesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(s.value)
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
sv := v.(*quantilesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.h.Update(value)
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"math"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -24,33 +25,39 @@ func newStddevAggrState() *stddevAggrState {
return &stddevAggrState{}
}
func (as *stddevAggrState) pushSample(_, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &stddevStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
func (as *stddevAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &stddevStateValue{}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*stddevStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (s.value-sv.avg)/sv.count
sv.q += (s.value - sv.avg) * (s.value - avg)
sv.avg = avg
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
sv := v.(*stddevStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (value-sv.avg)/sv.count
sv.q += (value - sv.avg) * (value - avg)
sv.avg = avg
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -23,33 +24,39 @@ func newStdvarAggrState() *stdvarAggrState {
return &stdvarAggrState{}
}
func (as *stdvarAggrState) pushSample(_, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &stdvarStateValue{}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
func (as *stdvarAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &stdvarStateValue{}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*stdvarStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (s.value-sv.avg)/sv.count
sv.q += (s.value - sv.avg) * (s.value - avg)
sv.avg = avg
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
sv := v.(*stdvarStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
sv.count++
avg := sv.avg + (value-sv.avg)/sv.count
sv.q += (value - sv.avg) * (value - avg)
sv.avg = avg
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}

View file

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"math"
"slices"
"sort"
"strconv"
"strings"
@ -19,14 +20,18 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
"gopkg.in/yaml.v2"
)
var supportedOutputs = []string{
"total",
"total_prometheus",
"increase",
"increase_prometheus",
"count_series",
"count_samples",
"unique_samples",
"sum_samples",
"last",
"min",
@ -80,8 +85,11 @@ type Config struct {
// Interval is the interval between aggregations.
Interval string `yaml:"interval"`
// DedupInterval is an optional interval for deduplication.
DedupInterval string `yaml:"dedup_interval,omitempty"`
// Staleness interval is interval after which the series state will be reset if no samples have been sent during it.
// The parameter is only relevant for outputs: total, increase and histogram_bucket.
// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket.
StalenessInterval string `yaml:"staleness_interval,omitempty"`
// Outputs is a list of output aggregate functions to produce.
@ -89,10 +97,13 @@ type Config struct {
// The following names are allowed:
//
// - total - aggregates input counters
// - increase - counts the increase over input counters
// - count_series - counts the input series
// - total_prometheus - aggregates input counters, ignoring the first sample in new time series
// - increase - calculates the increase over input series
// - increase_prometheus - calculates the increase over input series, ignoring the first sample in new time series
// - count_series - counts the number of unique input series
// - count_samples - counts the input samples
// - sum_samples - sums the input samples
// - unique_samples - counts the number of unique sample values
// - sum_samples - sums the input sample values
// - last - the last input sample value
// - min - the minimum sample value
// - max - the maximum sample value
@ -102,12 +113,18 @@ type Config struct {
// - histogram_bucket - creates VictoriaMetrics histogram for input samples
// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
//
// The output time series will have the following names:
// The output time series will have the following names by default:
//
// input_name:aggr_<interval>_<output>
// input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
//
// See also KeepMetricNames
//
Outputs []string `yaml:"outputs"`
// KeepMetricNames instructs to leave metric names as is for the output time series
// without adding any suffix.
KeepMetricNames bool `yaml:"keep_metric_names,omitempty"`
// By is an optional list of labels for grouping input series.
//
// See also Without.
@ -144,6 +161,8 @@ type Aggregators struct {
// configData contains marshaled configs passed to NewAggregators().
// It is used in Equal() for comparing Aggregators.
configData []byte
ms *metrics.Set
}
// NewAggregators creates Aggregators from the given cfgs.
@ -155,9 +174,10 @@ type Aggregators struct {
//
// MustStop must be called on the returned Aggregators when they are no longer needed.
func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
a, err := newAggregator(cfg, pushFunc, dedupInterval)
a, err := newAggregator(cfg, pushFunc, ms, dedupInterval)
if err != nil {
// Stop already initialized aggregators before returning the error.
for _, a := range as[:i] {
@ -171,9 +191,46 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
if err != nil {
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
}
_ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 {
n := uint64(0)
for _, aggr := range as {
if aggr.da != nil {
n += aggr.da.sizeBytes()
}
}
return float64(n)
})
_ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 {
n := uint64(0)
for _, aggr := range as {
if aggr.da != nil {
n += aggr.da.itemsCount()
}
}
return float64(n)
})
_ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
n := uint64(0)
for _, aggr := range as {
n += aggr.lc.SizeBytes()
}
return float64(n)
})
_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
n := uint64(0)
for _, aggr := range as {
n += aggr.lc.ItemsCount()
}
return float64(n)
})
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
configData: configData,
ms: ms,
}, nil
}
@ -182,9 +239,14 @@ func (a *Aggregators) MustStop() {
if a == nil {
return
}
metrics.UnregisterSet(a.ms)
a.ms = nil
for _, aggr := range a.as {
aggr.MustStop()
}
a.as = nil
}
// Equal returns true if a and b are initialized from identical configs.
@ -208,12 +270,14 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []b
for i := 0; i < len(matchIdxs); i++ {
matchIdxs[i] = 0
}
if a != nil {
for _, aggr := range a.as {
aggr.Push(tss, matchIdxs)
}
if a == nil {
return matchIdxs
}
for _, aggr := range a.as {
aggr.Push(tss, matchIdxs)
}
return matchIdxs
}
@ -224,17 +288,22 @@ type aggregator struct {
inputRelabeling *promrelabel.ParsedConfigs
outputRelabeling *promrelabel.ParsedConfigs
keepMetricNames bool
by []string
without []string
aggregateOnlyByTime bool
// dedupAggr is set to non-nil if input samples must be de-duplicated according
// da is set to non-nil if input samples must be de-duplicated according
// to the dedupInterval passed to newAggregator().
dedupAggr *lastAggrState
da *dedupAggr
// aggrStates contains aggregate states for the given outputs
aggrStates []aggrState
// lc is used for compressing series keys before passing them to dedupAggr and aggrState.
lc promutils.LabelsCompressor
pushFunc PushFunc
// suffix contains a suffix, which should be added to aggregate metric names
@ -250,10 +319,16 @@ type aggregator struct {
wg sync.WaitGroup
stopCh chan struct{}
flushDuration *metrics.Histogram
dedupFlushDuration *metrics.Histogram
flushTimeouts *metrics.Counter
dedupFlushTimeouts *metrics.Counter
}
type aggrState interface {
pushSample(inputKey, outputKey string, value float64)
pushSamples(samples []pushSample)
appendSeriesForFlush(ctx *flushCtx)
}
@ -266,7 +341,7 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// The returned aggregator must be stopped when no longer needed by calling MustStop().
func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) {
func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterval time.Duration) (*aggregator, error) {
// check cfg.Interval
interval, err := time.ParseDuration(cfg.Interval)
if err != nil {
@ -276,6 +351,21 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
return nil, fmt.Errorf("aggregation interval cannot be smaller than 1s; got %s", interval)
}
// check cfg.DedupInterval
if cfg.DedupInterval != "" {
di, err := time.ParseDuration(cfg.DedupInterval)
if err != nil {
return nil, fmt.Errorf("cannot parse `dedup_interval: %q`: %w", cfg.DedupInterval, err)
}
dedupInterval = di
}
if dedupInterval > interval {
return nil, fmt.Errorf("dedup_interval=%s cannot exceed interval=%s", dedupInterval, interval)
}
if dedupInterval > 0 && interval%dedupInterval != 0 {
return nil, fmt.Errorf("interval=%s must be a multiple of dedup_interval=%s", interval, dedupInterval)
}
// check cfg.StalenessInterval
stalenessInterval := interval * 2
if cfg.StalenessInterval != "" {
@ -284,7 +374,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
return nil, fmt.Errorf("cannot parse `staleness_interval: %q`: %w", cfg.StalenessInterval, err)
}
if stalenessInterval < interval {
return nil, fmt.Errorf("staleness_interval cannot be less than interval (%s); got %s", cfg.Interval, cfg.StalenessInterval)
return nil, fmt.Errorf("interval=%s cannot exceed staleness_interval=%s", cfg.Interval, cfg.StalenessInterval)
}
}
@ -309,6 +399,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
by = addMissingUnderscoreName(by)
}
// check cfg.KeepMetricNames
if cfg.KeepMetricNames {
if len(cfg.Outputs) != 1 {
return nil, fmt.Errorf("`outputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs)
}
if cfg.Outputs[0] == "histogram_bucket" || strings.HasPrefix(cfg.Outputs[0], "quantiles(") && strings.Contains(cfg.Outputs[0], ",") {
return nil, fmt.Errorf("`keep_metric_names` cannot be applied to `outputs: %q`, since they can generate multiple time series", cfg.Outputs)
}
}
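For reference, a config that passes the dedup_interval, staleness_interval and keep_metric_names checks above could look like this (values are purely illustrative, not part of the diff):

- interval: 1m
  dedup_interval: 30s      # 30s <= 1m and 1m is a multiple of 30s
  staleness_interval: 2m   # must not be smaller than interval
  keep_metric_names: true  # allowed: exactly one output that yields a single series
  outputs: [total]
  by: [job]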
// initialize outputs list
if len(cfg.Outputs) == 0 {
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
@ -342,13 +442,19 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
}
switch output {
case "total":
aggrStates[i] = newTotalAggrState(interval, stalenessInterval)
aggrStates[i] = newTotalAggrState(stalenessInterval, false, true)
case "total_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, false)
case "increase":
aggrStates[i] = newIncreaseAggrState(interval, stalenessInterval)
aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
case "increase_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
case "count_samples":
aggrStates[i] = newCountSamplesAggrState()
case "unique_samples":
aggrStates[i] = newUniqueSamplesAggrState()
case "sum_samples":
aggrStates[i] = newSumSamplesAggrState()
case "last":
@ -381,11 +487,6 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
}
suffix += "_"
var dedupAggr *lastAggrState
if dedupInterval > 0 {
dedupAggr = newLastAggrState()
}
// initialize the aggregator
a := &aggregator{
match: cfg.Match,
@ -393,11 +494,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
inputRelabeling: inputRelabeling,
outputRelabeling: outputRelabeling,
keepMetricNames: cfg.KeepMetricNames,
by: by,
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
dedupAggr: dedupAggr,
aggrStates: aggrStates,
pushFunc: pushFunc,
@ -405,75 +507,84 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
flushOnShutdown: cfg.FlushOnShutdown,
stopCh: make(chan struct{}),
flushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_flush_duration_seconds`),
dedupFlushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`),
flushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_flush_timeouts_total`),
dedupFlushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`),
}
if dedupInterval > 0 {
a.da = newDedupAggr()
}
if dedupAggr != nil {
a.wg.Add(1)
go func() {
a.runDedupFlusher(dedupInterval)
a.wg.Done()
}()
}
a.wg.Add(1)
go func() {
a.runFlusher(interval)
a.runFlusher(interval, dedupInterval)
a.wg.Done()
}()
return a, nil
}
func (a *aggregator) runDedupFlusher(interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-a.stopCh:
return
case <-t.C:
}
func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) {
tickerFlush := time.NewTicker(interval)
defer tickerFlush.Stop()
// Globally limit the concurrency for metrics' flush
// in order to limit memory usage when big number of aggregators
// are flushed at the same time.
flushConcurrencyCh <- struct{}{}
a.dedupFlush()
<-flushConcurrencyCh
var dedupTickerCh <-chan time.Time
if dedupInterval > 0 {
t := time.NewTicker(dedupInterval)
defer t.Stop()
dedupTickerCh = t.C
}
}
func (a *aggregator) runFlusher(interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-a.stopCh:
return
case <-t.C:
}
case <-tickerFlush.C:
startTime := time.Now()
// Globally limit the concurrency for metrics' flush
// in order to limit memory usage when big number of aggregators
// are flushed at the same time.
flushConcurrencyCh <- struct{}{}
a.flush()
<-flushConcurrencyCh
flushConcurrencyCh <- struct{}{}
a.flush()
<-flushConcurrencyCh
d := time.Since(startTime)
a.flushDuration.Update(d.Seconds())
if d > interval {
a.flushTimeouts.Inc()
logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %s; "+
"possible solutions: increase interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", interval, d)
}
case <-dedupTickerCh:
startTime := time.Now()
flushConcurrencyCh <- struct{}{}
a.dedupFlush()
<-flushConcurrencyCh
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
if d > dedupInterval {
a.dedupFlushTimeouts.Inc()
logger.Warnf("stream aggregation deduplication couldn't be finished in the configured dedup_interval=%s; it took %s; "+
"possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", dedupInterval, d)
}
}
}
}
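Note that when deduplication is disabled, dedupTickerCh stays nil; receiving from a nil channel blocks forever, so that select case never fires and the loop is driven solely by tickerFlush.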
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
func (a *aggregator) dedupFlush() {
ctx := &flushCtx{
skipAggrSuffix: true,
}
a.dedupAggr.appendSeriesForFlush(ctx)
a.push(ctx.tss, nil)
a.da.flush(a.pushSamples)
}
func (a *aggregator) flush() {
ctx := &flushCtx{
suffix: a.suffix,
a: a,
}
for _, as := range a.aggrStates {
ctx.reset()
@ -521,7 +632,7 @@ func (a *aggregator) MustStop() {
// Flush the remaining data from the last interval if needed.
flushConcurrencyCh <- struct{}{}
if a.dedupAggr != nil {
if a.da != nil {
a.dedupFlush()
}
a.flush()
@ -530,18 +641,15 @@ func (a *aggregator) MustStop() {
// Push pushes tss to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
if a.dedupAggr == nil {
// Deduplication is disabled.
a.push(tss, matchIdxs)
return
}
ctx := getPushCtx()
defer putPushCtx(ctx)
samples := ctx.samples
labels := &ctx.labels
inputLabels := &ctx.inputLabels
outputLabels := &ctx.outputLabels
buf := ctx.buf
// Deduplication is enabled.
// push samples to dedupAggr, so later they will be pushed to the configured aggregators.
pushSample := a.dedupAggr.pushSample
inputKey := ""
bb := bbPool.Get()
labels := promutils.GetLabels()
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
@ -556,173 +664,140 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
}
labels.Sort()
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
outputKey := bytesutil.InternBytes(bb.B)
for _, sample := range ts.Samples {
pushSample(inputKey, outputKey, sample.Value)
}
}
promutils.PutLabels(labels)
bbPool.Put(bb)
}
func (a *aggregator) push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
labels := promutils.GetLabels()
tmpLabels := promutils.GetLabels()
bb := bbPool.Get()
applyFilters := matchIdxs != nil
for idx, ts := range tss {
if applyFilters {
if !a.match.Match(ts.Labels) {
continue
}
matchIdxs[idx] = 1
}
labels.Labels = append(labels.Labels[:0], ts.Labels...)
if applyFilters {
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
if len(labels.Labels) == 0 {
// The metric has been deleted by the relabeling
continue
}
labels.Sort()
}
if a.aggregateOnlyByTime {
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
} else {
tmpLabels.Labels = removeUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels)
}
outputKey := bytesutil.InternBytes(bb.B)
inputKey := ""
inputLabels.Reset()
outputLabels.Reset()
if !a.aggregateOnlyByTime {
tmpLabels.Labels = extractUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels)
inputKey = bytesutil.InternBytes(bb.B)
inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.by, a.without)
} else {
outputLabels.Labels = append(outputLabels.Labels, labels.Labels...)
}
bufLen := len(buf)
buf = a.compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
for _, sample := range ts.Samples {
a.pushSample(inputKey, outputKey, sample.Value)
if math.IsNaN(sample.Value) {
// Skip NaN values
continue
}
samples = append(samples, pushSample{
key: bytesutil.ToUnsafeString(buf[bufLen:]),
value: sample.Value,
})
}
}
ctx.samples = samples
ctx.buf = buf
if a.da != nil {
a.da.pushSamples(samples)
} else {
a.pushSamples(samples)
}
}
func (a *aggregator) compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
bb := bbPool.Get()
bb.B = a.lc.Compress(bb.B, inputLabels)
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
dst = append(dst, bb.B...)
bbPool.Put(bb)
promutils.PutLabels(tmpLabels)
promutils.PutLabels(labels)
dst = a.lc.Compress(dst, outputLabels)
return dst
}
var bbPool bytesutil.ByteBufferPool
func (a *aggregator) decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label {
dst = a.lc.Decompress(dst, bytesutil.ToUnsafeBytes(key))
return dst
}
func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {
if math.IsNaN(value) {
// Skip nan samples
return
func getOutputKey(key string) string {
src := bytesutil.ToUnsafeBytes(key)
tail, inputKeyLen, err := encoding.UnmarshalVarUint64(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal inputKeyLen: %s", err)
}
outputKey := tail[inputKeyLen:]
return bytesutil.ToUnsafeString(outputKey)
}
func getInputOutputKey(key string) (string, string) {
src := bytesutil.ToUnsafeBytes(key)
tail, inputKeyLen, err := encoding.UnmarshalVarUint64(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal inputKeyLen: %s", err)
}
inputKey := tail[:inputKeyLen]
outputKey := tail[inputKeyLen:]
return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
}
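Each sample key produced by compressLabels is the varint-encoded length of the compressed input labels, followed by the compressed input labels and then the compressed output labels; getOutputKey and getInputOutputKey just slice that buffer back apart. A minimal sketch of the layout, not part of the diff, using plain strings in place of the compressed label blobs (composeKey is a made-up helper name):

// composeKey mirrors the layout written by compressLabels:
// varint(len(inputKey)) + inputKey + outputKey.
func composeKey(dst []byte, inputKey, outputKey string) []byte {
	dst = encoding.MarshalVarUint64(dst, uint64(len(inputKey)))
	dst = append(dst, inputKey...)
	dst = append(dst, outputKey...)
	return dst
}

// key := string(composeKey(nil, "in", "out"))
// getOutputKey(key)      -> "out"
// getInputOutputKey(key) -> "in", "out"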
func (a *aggregator) pushSamples(samples []pushSample) {
for _, as := range a.aggrStates {
as.pushSample(inputKey, outputKey, value)
as.pushSamples(samples)
}
}
func extractUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
type pushCtx struct {
samples []pushSample
labels promutils.Labels
inputLabels promutils.Labels
outputLabels promutils.Labels
buf []byte
}
func (ctx *pushCtx) reset() {
clear(ctx.samples)
ctx.samples = ctx.samples[:0]
ctx.labels.Reset()
ctx.inputLabels.Reset()
ctx.outputLabels.Reset()
ctx.buf = ctx.buf[:0]
}
type pushSample struct {
key string
value float64
}
func getPushCtx() *pushCtx {
v := pushCtxPool.Get()
if v == nil {
return &pushCtx{}
}
return v.(*pushCtx)
}
func putPushCtx(ctx *pushCtx) {
ctx.reset()
pushCtxPool.Put(ctx)
}
var pushCtxPool sync.Pool
func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, without []string) ([]prompbmarshal.Label, []prompbmarshal.Label) {
if len(without) > 0 {
for _, label := range labels {
if hasInArray(label.Name, without) {
dst = append(dst, label)
if slices.Contains(without, label.Name) {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
}
}
} else {
for _, label := range labels {
if !hasInArray(label.Name, by) {
dst = append(dst, label)
if !slices.Contains(by, label.Name) {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
}
}
}
return dst
}
func removeUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
if len(without) > 0 {
for _, label := range labels {
if !hasInArray(label.Name, without) {
dst = append(dst, label)
}
}
} else {
for _, label := range labels {
if hasInArray(label.Name, by) {
dst = append(dst, label)
}
}
}
return dst
}
func hasInArray(name string, a []string) bool {
for _, s := range a {
if name == s {
return true
}
}
return false
}
func marshalLabelsFast(dst []byte, labels []prompbmarshal.Label) []byte {
dst = encoding.MarshalUint32(dst, uint32(len(labels)))
for _, label := range labels {
dst = encoding.MarshalUint32(dst, uint32(len(label.Name)))
dst = append(dst, label.Name...)
dst = encoding.MarshalUint32(dst, uint32(len(label.Value)))
dst = append(dst, label.Value...)
}
return dst
}
func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal.Label, error) {
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal labels count from %d bytes; needs at least 4 bytes", len(src))
}
n := encoding.UnmarshalUint32(src)
src = src[4:]
for i := uint32(0); i < n; i++ {
// Unmarshal label name
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal label name length from %d bytes; needs at least 4 bytes", len(src))
}
labelNameLen := encoding.UnmarshalUint32(src)
src = src[4:]
if uint32(len(src)) < labelNameLen {
return dst, fmt.Errorf("cannot unmarshal label name from %d bytes; needs at least %d bytes", len(src), labelNameLen)
}
labelName := bytesutil.InternBytes(src[:labelNameLen])
src = src[labelNameLen:]
// Unmarshal label value
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal label value length from %d bytes; needs at least 4 bytes", len(src))
}
labelValueLen := encoding.UnmarshalUint32(src)
src = src[4:]
if uint32(len(src)) < labelValueLen {
return dst, fmt.Errorf("cannot unmarshal label value from %d bytes; needs at least %d bytes", len(src), labelValueLen)
}
labelValue := bytesutil.InternBytes(src[:labelValueLen])
src = src[labelValueLen:]
dst = append(dst, prompbmarshal.Label{
Name: labelName,
Value: labelValue,
})
}
if len(src) > 0 {
return dst, fmt.Errorf("unexpected non-empty tail after unmarshaling labels; tail length is %d bytes", len(src))
}
return dst, nil
return dstInput, dstOutput
}
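In other words, every label listed in `by` ends up in the output label set (identifying the aggregated series), while everything else becomes an input label used only to distinguish input series inside states such as total. A small illustrative call, not part of the diff:

	inputLabels, outputLabels := getInputOutputLabels(nil, nil, []prompbmarshal.Label{
		{Name: "instance", Value: "host1"},
		{Name: "job", Value: "api"},
	}, []string{"job"}, nil)
	// inputLabels:  [{instance host1}] - aggregated away
	// outputLabels: [{job api}]        - kept on the output series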
type flushCtx struct {
skipAggrSuffix bool
suffix string
a *aggregator
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
@ -736,16 +811,12 @@ func (ctx *flushCtx) reset() {
ctx.samples = ctx.samples[:0]
}
func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int64, value float64) {
var err error
func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
if err != nil {
logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
}
if !ctx.skipAggrSuffix {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
ctx.labels = ctx.a.decompressLabels(ctx.labels, key)
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
ctx.samples = append(ctx.samples, prompbmarshal.Sample{
Timestamp: timestamp,
@ -757,15 +828,13 @@ func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int6
})
}
func (ctx *flushCtx) appendSeriesWithExtraLabel(labelsMarshaled, suffix string, timestamp int64, value float64, extraName, extraValue string) {
var err error
func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) {
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
if err != nil {
logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
ctx.labels = ctx.a.decompressLabels(ctx.labels, key)
if !ctx.a.keepMetricNames {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix)
}
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
ctx.labels = append(ctx.labels, prompbmarshal.Label{
Name: extraName,
Value: extraValue,
@ -843,3 +912,5 @@ func sortAndRemoveDuplicates(a []string) []string {
}
return dst
}
var bbPool bytesutil.ByteBufferPool

View file

@ -56,9 +56,43 @@ func TestAggregatorsFailure(t *testing.T) {
`)
// Negative interval
f(`- interval: -5m`)
f(`
- outputs: [total]
interval: -5m
`)
// Too small interval
f(`- interval: 10ms`)
f(`
- outputs: [total]
interval: 10ms
`)
// interval isn't a multiple of dedup_interval
f(`
- interval: 1m
dedup_interval: 35s
outputs: ["quantiles"]
`)
// dedup_interval is bigger than interval
f(`
- interval: 1m
dedup_interval: 1h
outputs: ["quantiles"]
`)
// keep_metric_names is set for multiple outputs
f(`
- interval: 1m
keep_metric_names: true
outputs: ["total", "increase"]
`)
// keep_metric_names is set for unsupported output
f(`
- interval: 1m
keep_metric_names: true
outputs: ["histogram_bucket"]
`)
// Invalid input_relabel_configs
f(`
@ -477,6 +511,17 @@ foo:1m_by_abc_sum_samples{abc="456"} 8
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 4.34
foo:1m_total 123
`, "11")
// total_prometheus output for non-repeated series
f(`
- interval: 1m
outputs: [total_prometheus]
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`, "11")
@ -494,6 +539,25 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total{baz="qwe"} 6.34
bar:1m_total{baz="qwer"} 344
foo:1m_total 123
foo:1m_total{baz="qwe"} 10
`, "11111111")
// total_prometheus output for repeated series
f(`
- interval: 1m
outputs: [total_prometheus]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total{baz="qwe"} 5.02
bar:1m_total{baz="qwer"} 1
foo:1m_total 0
@ -514,6 +578,24 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 350.34
foo:1m_total 133
`, "11111111")
// total_prometheus output for repeated series with group by __name__
f(`
- interval: 1m
by: [__name__]
outputs: [total_prometheus]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 6.02
foo:1m_total 15
`, "11111111")
@ -525,6 +607,17 @@ foo:1m_total 15
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 4.34
foo:1m_increase 123
`, "11")
// increase_prometheus output for non-repeated series
f(`
- interval: 1m
outputs: [increase_prometheus]
`, `
foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 0
`, "11")
@ -542,12 +635,30 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_increase{baz="qwe"} 6.34
bar:1m_increase{baz="qwer"} 344
foo:1m_increase 123
foo:1m_increase{baz="qwe"} 10
`, "11111111")
// increase_prometheus output for repeated series
f(`
- interval: 1m
outputs: [increase_prometheus]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_increase{baz="qwe"} 5.02
bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15
`, "11111111")
// multiple aggregate configs
f(`
- interval: 1m
@ -750,7 +861,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
}
tssOutputLock.Unlock()
}
const dedupInterval = time.Hour
const dedupInterval = 30 * time.Second
a, err := newAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)

View file

@ -11,9 +11,12 @@ import (
func BenchmarkAggregatorsPushByJobAvg(b *testing.B) {
for _, output := range []string{
"total",
"total_prometheus",
"increase",
"increase_prometheus",
"count_series",
"count_samples",
"unique_samples",
"sum_samples",
"last",
"min",
@ -44,7 +47,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
}
defer a.MustStop()
const loops = 5
const loops = 10
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
@ -52,7 +55,17 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
var matchIdxs []byte
for pb.Next() {
for i := 0; i < loops; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
series := benchSeries
for len(series) > 0 {
chunk := series
if len(chunk) > 1_000 {
chunk = series[:1_000]
series = series[len(chunk):]
} else {
series = nil
}
matchIdxs = a.Push(chunk, matchIdxs)
}
}
}
})

View file

@ -1,6 +1,7 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -21,33 +22,39 @@ func newSumSamplesAggrState() *sumSamplesAggrState {
return &sumSamplesAggrState{}
}
func (as *sumSamplesAggrState) pushSample(_, outputKey string, value float64) {
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &sumSamplesStateValue{
sum: value,
func (as *sumSamplesAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &sumSamplesStateValue{
sum: s.value,
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += s.value
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += value
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"math"
"strings"
"sync"
"time"
@ -12,8 +13,10 @@ import (
type totalAggrState struct {
m sync.Map
ignoreInputDeadline uint64
stalenessSecs uint64
suffix string
resetTotalOnFlush bool
keepFirstSample bool
stalenessSecs uint64
}
type totalStateValue struct {
@ -29,58 +32,68 @@ type lastValueState struct {
deleteDeadline uint64
}
func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration) *totalAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := roundDurationToSecs(interval)
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
suffix := "total"
if resetTotalOnFlush {
suffix = "increase"
}
return &totalAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
stalenessSecs: stalenessSecs,
suffix: suffix,
resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
stalenessSecs: stalenessSecs,
}
}
func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
func (as *totalAggrState) pushSamples(samples []pushSample) {
deleteDeadline := fasttime.UnixTimestamp() + as.stalenessSecs
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &totalStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
again:
v, ok := as.m.Load(outputKey)
if !ok {
lv = &lastValueState{}
sv.lastValues[inputKey] = lv
// The entry is missing in the map. Try creating it.
v = &totalStateValue{
lastValues: make(map[string]*lastValueState),
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
d := value
if ok && lv.value <= value {
d = value - lv.value
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
inputKey = strings.Clone(inputKey)
sv.lastValues[inputKey] = lv
}
if ok || as.keepFirstSample {
if s.value >= lv.value {
sv.total += s.value - lv.value
} else {
// counter reset
sv.total += s.value
}
}
lv.value = s.value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
}
if ok || currentTime > as.ignoreInputDeadline {
sv.total += d
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
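For a single input series that reports 5, 8, 3 and 7 within one aggregation window, the accumulated deltas are 3 (8-5), 3 (the drop to 3 is treated as a counter reset, so the full new value is added) and 4 (7-3). With keepFirstSample=true (`total`, `increase`) the first value 5 is also counted, so the state accumulates 15; with keepFirstSample=false (`total_prometheus`, `increase_prometheus`) the first sample is ignored and the state accumulates 10. resetTotalOnFlush then decides whether that running sum is kept across flushes (`total*`) or zeroed on every flush (`increase*`).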
@ -123,7 +136,9 @@ func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv := v.(*totalStateValue)
sv.mu.Lock()
total := sv.total
if math.Abs(sv.total) >= (1 << 53) {
if as.resetTotalOnFlush {
sv.total = 0
} else if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
@ -131,7 +146,7 @@ func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, "total", currentTimeMsec, total)
ctx.appendSeries(key, as.suffix, currentTimeMsec, total)
}
return true
})

View file

@ -0,0 +1,82 @@
package streamaggr
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values.
type uniqueSamplesAggrState struct {
m sync.Map
}
type uniqueSamplesStateValue struct {
mu sync.Mutex
m map[float64]struct{}
deleted bool
}
func newUniqueSamplesAggrState() *uniqueSamplesAggrState {
return &uniqueSamplesAggrState{}
}
func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) {
for i := range samples {
s := &samples[i]
outputKey := getOutputKey(s.key)
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &uniqueSamplesStateValue{
m: map[float64]struct{}{
s.value: {},
},
}
outputKey = strings.Clone(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
continue
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
if _, ok := sv.m[s.value]; !ok {
sv.m[s.value] = struct{}{}
}
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *uniqueSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so a new entry is created for the next flush.
m.Delete(k)
sv := v.(*uniqueSamplesStateValue)
sv.mu.Lock()
n := len(sv.m)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSamples() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n))
return true
})
}
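As a concrete example, pushing the values 1, 2, 2, 3 and 1 for one output series within a single interval produces `unique_samples` = 3. Because appendSeriesForFlush deletes each entry from the map on flush, the per-series set of seen values is dropped and the count starts from scratch in the next interval.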