diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 994a5c99a..b88f03ba1 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -30,6 +30,12 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). This is expecially visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%. +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add support for `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`. +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs, which can be used for `increase` and `total` aggregations when the first sample of every new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) must be ignored. +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_flush_timeouts_total` and `vm_streamaggr_dedup_flush_timeouts_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting flush timeouts for stream aggregation states. Expose also `vm_streamaggr_flush_duration_seconds` and `vm_streamaggr_dedup_flush_duration_seconds` [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the real flush durations of stream aggregation states. ## [v1.99.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.99.0) diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 59ca6239e..4f9553a28 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -13,7 +13,8 @@ aliases: # Streaming aggregation [vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) -can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels before data is written to remote storage. +can aggregate incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) in streaming mode by time and by labels before data is written to remote storage +(or local storage for single-node VictoriaMetrics). The aggregation is applied to all the metrics received via any [supported data ingestion protocol](https://docs.victoriametrics.com/#how-to-import-time-series-data) and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter) after applying all the configured [relabeling stages](https://docs.victoriametrics.com/vmagent.html#relabeling). @@ -40,22 +41,26 @@ This behaviour can be changed via the following command-line flags: - `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html) and `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). - If one of these flags are set, then all the input samples are written to the storage alongside the aggregated samples. + If one of these flags is set, then all the input samples are written to the storage alongside the aggregated samples. The `-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`. - `-remoteWrite.streamAggr.dropInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html) and `-streamAggr.dropInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). If one of these flags are set, then all the input samples are dropped, while only the aggregated samples are written to the storage. The `-remoteWrite.streamAggr.dropInput` flag can be specified individually per each `-remoteWrite.url`. -By default, all the input samples are aggregated. Sometimes it is needed to de-duplicate samples before the aggregation. -For example, if the samples are received from replicated sources. -The following command-line flag can be used for enabling the [de-duplication](https://docs.victoriametrics.com/#deduplication) -before aggregation in this case: +## Deduplication -- `-remoteWrite.streamAggr.dedupInterval` at [vmagent](https://docs.victoriametrics.com/vmagent.html). +By default, all the input samples are aggregated. Sometimes it is needed to de-duplicate samples for the same [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) +before the aggregation. For example, if the samples are received from replicated sources. +The [de-duplication](https://docs.victoriametrics.com/#deduplication) can be enabled via the following options: + +- `-remoteWrite.streamAggr.dedupInterval` command-line flag at [vmagent](https://docs.victoriametrics.com/vmagent.html). This flag can be specified individually per each `-remoteWrite.url`. This allows setting different de-duplication intervals per each configured remote storage. -- `-streamAggr.dedupInterval` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). +- `-streamAggr.dedupInterval` command-line flag at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). +- `dedup_interval` option per each [aggregate config](#stream-aggregation-config). + +De-duplicatation is performed after performing the input relabeling with `input_relabel_configs` - see [these docs](#relabeling). ## Use cases @@ -387,6 +392,8 @@ Output metric names for stream aggregation are constructed according to the foll Both input and output metric names can be modified if needed via relabeling according to [these docs](#relabeling). +It is possible to leave the original metric name after the aggregation by specifying `keep_metric_names: true` option at [stream aggregation config](#stream-aggregation-config). +The `keep_metric_names` option can be used only if only a single output is set in [`outputs` list](#aggregation-outputs). ## Relabeling @@ -410,7 +417,7 @@ For example, the following config removes the `:1m_sum_samples` suffix added [to ## Aggregation outputs The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config) -and then sent to the storage once per `interval`. +and then sent to the storage once per `interval`. The aggregated samples are named according to [output metric naming](#output-metric-names). If `by` and `without` lists are specified in the [config](#stream-aggregation-config), then the [aggregation by labels](#aggregating-by-labels) is performed additionally to aggregation by `interval`. @@ -419,159 +426,312 @@ On vmagent shutdown or [configuration reload](#configuration-update) unfinished as they might produce lower values than user expects. It is possible to specify `flush_on_shutdown: true` setting in aggregation config to make vmagent to send unfinished states to the remote storage. -Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config). +Below are aggregation functions that can be put in the `outputs` list at [stream aggregation config](#stream-aggregation-config): -### total +* [avg](#avg) +* [count_samples](#count_samples) +* [count_series](#count_series) +* [increase](#increase) +* [increase_prometheus](#increase_prometheus) +* [histogram_bucket](#histogram_bucket) +* [last](#last) +* [max](#max) +* [min](#min) +* [stddev](#stddev) +* [stdvar](#stdvar) +* [sum_samples](#sum_samples) +* [total](#total) +* [total_prometheus](#total_prometheus) +* [unique_samples](#unique_samples) +* [quantiles](#quantiles) -`total` generates output [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) by summing the input counters. -`total` only makes sense for aggregating [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) metrics. +### avg -The results of `total` is equal to the `sum(some_counter)` query. +`avg` returns the average over input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). +`avg` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge). + +The results of `avg` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +sum(sum_over_time(some_metric[interval])) / sum(count_over_time(some_metric[interval])) +``` For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query: -total aggregation +avg aggregation -`total` is not affected by [counter resets](https://docs.victoriametrics.com/keyConcepts.html#counter) - -it continues to increase monotonically with respect to the previous value. -The counters are most often reset when the application is restarted. +See also [min](#min), [max](#max), [sum_samples](#sum_samples) and [count_samples](#count_samples). -For example: +### count_samples -total aggregation counter reset +`count_samples` counts the number of input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`. -The same behavior will occur when creating or deleting new series in an aggregation group - -`total` will increase monotonically considering the values of the series set. -An example of changing a set of series can be restarting a pod in the Kubernetes. -This changes a label with pod's name in the series, but `total` account for such a scenario and do not reset the state of aggregated metric. +The results of `count_samples` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: -Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) -or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_inteval](#stream-aggregation-config). +```metricsql +sum(count_over_time(some_metric[interval])) +``` + +See also [count_series](#count_series) and [sum_samples](#sum_samples). + +### count_series + +`count_series` counts the number of unique [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series) over the given `interval`. + +The results of `count_series` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +count(last_over_time(some_metric[interval])) +``` + +See also [count_samples](#count_samples) and [unique_samples](#unique_samples). ### increase -`increase` returns the increase of input [counters](https://docs.victoriametrics.com/keyConcepts.html#counter). -`increase` only makes sense for aggregating [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) metrics. +`increase` returns the increase of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given 'interval'. +`increase` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyConcepts.html#counter). -The results of `increase` with aggregation interval of `1m` is equal to the `increase(some_counter[1m])` query. +The results of `increase` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: -For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query: +```metricsql +sum(increase_pure(some_counter[interval])) +``` + +`increase` assumes that all the counters start from 0. For example, if the fist seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) +is `10`, then `increase` assumes that the time series has been increased by `10`. If you need ignoring the first sample for new time series, +then take a look at [increase_prometheus](#increase_prometheus). + +For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query: increase aggregation `increase` can be used as an alternative for [rate](https://docs.victoriametrics.com/MetricsQL.html#rate) function. -For example, if we have `increase` with `interval` of `5m` for a counter `some_counter`, then to get `rate` we should divide -the resulting aggregation by the `interval` in seconds: `some_counter:5m_increase / 5m` is similar to `rate(some_counter[5m])`. +For example, if `increase` is calculated for `some_counter` with `interval: 5m`, then `rate` can be calculated +by dividing the resulting aggregation by `5m`: + +```metricsql +some_counter:5m_increase / 5m +``` + +This is similar to `rate(some_counter[5m])`. + Please note, opposite to [rate](https://docs.victoriametrics.com/MetricsQL.html#rate), `increase` aggregations can be combined safely afterwards. This is helpful when the aggregation is calculated by more than one vmagent. Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_inteval](#stream-aggregation-config). -### count_series +See also [increase_prometheus](#increase_prometheus) and [total](#total). -`count_series` counts the number of unique [time series](https://docs.victoriametrics.com/keyConcepts.html#time-series). +### increase_prometheus -The results of `count_series` is equal to the `count(some_metric)` query. +`increase_prometheus` returns the increase of input [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) over the given `interval`. +`increase_prometheus` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyConcepts.html#counter). -### count_samples +The results of `increase_prometheus` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: -`count_samples` counts the number of input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). +```metricsql +sum(increase_prometheus(some_counter[interval])) +``` -The results of `count_samples` with aggregation interval of `1m` is equal to the `count_over_time(some_metric[1m])` query. +`increase_prometheus` skips the first seen sample value per each [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). +If you need taking into account the first sample per time series, then take a look at [increase](#increase). -### sum_samples +See also [increase](#increase), [total](#total) and [total_prometheus](#total_prometheus). -`sum_samples` sums input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). -`sum_samples` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics. +### histogram_bucket -The results of `sum_samples` with aggregation interval of `1m` is equal to the `sum_over_time(some_metric[1m])` query. +`histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) + for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`. +`histogram_bucket` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge). +See how to aggregate regular histograms [here](#aggregating-histograms). -For example, see below time series produced by config with aggregation interval `1m` and the regular query: +The results of `histogram_bucket` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: -sum_samples aggregation +```metricsql +sum(histogram_over_time(some_histogram_bucket[interval])) by (vmrange) +``` + +See also [quantiles](#quantiles), [min](#min), [max](#max) and [avg](#avg). ### last -`last` returns the last input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). +`last` returns the last input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`. -The results of `last` with aggregation interval of `1m` is equal to the `last_over_time(some_metric[1m])` query. +The results of `last` is roughly equal to the the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: -This aggregation output doesn't make much sense with `by` lists specified in the [config](#stream-aggregation-config). -The result of aggregation by labels in this case will be undetermined, because it depends on the order of processing the time series. +```metricsql +last_over_time(some_metric[interval]) +``` -### min - -`min` returns the minimum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). - -The results of `min` with aggregation interval of `1m` is equal to the `min_over_time(some_metric[1m])` query. - -For example, see below time series produced by config with aggregation interval `1m` and the regular query: - -min aggregation +See also [min](#min), [max](#max) and [avg](#avg). ### max -`max` returns the maximum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). +`max` returns the maximum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`. -The results of `max` with aggregation interval of `1m` is equal to the `max_over_time(some_metric[1m])` query. +The results of `max` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +max(max_over_time(some_metric[interval])) +``` For example, see below time series produced by config with aggregation interval `1m` and the regular query: total aggregation -### avg +See also [min](#min) and [avg](#avg). -`avg` returns the average input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). +### min -The results of `avg` with aggregation interval of `1m` is equal to the `avg_over_time(some_metric[1m])` query. +`min` returns the minimum input [sample value](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`. -For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query: +The results of `min` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: -avg aggregation +```metricsql +min(min_over_time(some_metric[interval])) +``` + +For example, see below time series produced by config with aggregation interval `1m` and the regular query: + +min aggregation + +See also [max](#max) and [avg](#avg). ### stddev -`stddev` returns [standard deviation](https://en.wikipedia.org/wiki/Standard_deviation) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). -`stddev` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics. +`stddev` returns [standard deviation](https://en.wikipedia.org/wiki/Standard_deviation) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) +over the given `interval`. +`stddev` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge). -The results of `stddev` with aggregation interval of `1m` is equal to the `stddev_over_time(some_metric[1m])` query. +The results of `stddev` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +histogram_stddev(sum(histogram_over_time(some_metric[interval])) by (vmrange)) +``` + +See also [stdvar](#stdvar) and [avg](#avg). ### stdvar -`stdvar` returns [standard variance](https://en.wikipedia.org/wiki/Variance) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). -`stdvar` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics. +`stdvar` returns [standard variance](https://en.wikipedia.org/wiki/Variance) for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) +over the given `interval`. +`stdvar` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge). -The results of `stdvar` with aggregation interval of `1m` is equal to the `stdvar_over_time(some_metric[1m])` query. +The results of `stdvar` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +histogram_stdvar(sum(histogram_over_time(some_metric[interval])) by (vmrange)) +``` For example, see below time series produced by config with aggregation interval `1m` and the regular query: stdvar aggregation -### histogram_bucket +See also [stddev](#stddev) and [avg](#avg). -`histogram_bucket` returns [VictoriaMetrics histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) - for the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). -`histogram_bucket` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics. -See how to aggregate regular histograms [here](#aggregating-histograms). +### sum_samples -The results of `histogram_bucket` with aggregation interval of `1m` is equal to the `histogram_over_time(some_histogram_bucket[1m])` query. +`sum_samples` sums input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) over the given `interval`. +`sum_samples` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge). + +The results of `sum_samples` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +sum(sum_over_time(some_metric[interval])) +``` + +For example, see below time series produced by config with aggregation interval `1m` and the regular query: + +sum_samples aggregation + +See also [count_samples](#count_samples) and [count_series](#count_series). + +### total + +`total` generates output [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) by summing the input counters over the given `interval`. +`total` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyConcepts.html#counter). + +The results of `total` is roughly equal to the the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +sum(running_sum(increase_pure(some_counter))) +``` + +`total` assumes that all the counters start from 0. For example, if the fist seen sample for new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) +is `10`, then `total` assumes that the time series has been increased by `10`. If you need ignoring the first sample for new time series, +then take a look at [total_prometheus](#total_prometheus). + +For example, see below time series produced by config with aggregation interval `1m` and `by: ["instance"]` and the regular query: + +total aggregation + +`total` is not affected by [counter resets](https://docs.victoriametrics.com/keyConcepts.html#counter) - +it continues to increase monotonically with respect to the previous value. +The counters are most often reset when the application is restarted. + +For example: + +total aggregation counter reset + +The same behavior occurs when creating or deleting new series in an aggregation group - +`total` output increases monotonically considering the values of the series set. +An example of changing a set of series can be restarting a pod in the Kubernetes. +This changes pod name label, but the `total` accounts for such a scenario and doesn't reset the state of aggregated metric. Aggregating irregular and sporadic metrics (received from [Lambdas](https://aws.amazon.com/lambda/) or [Cloud Functions](https://cloud.google.com/functions)) can be controlled via [staleness_inteval](#stream-aggregation-config). +See also [total_prometheus](#total_prometheus), [increase](#increase) and [increase_prometheus](#increase_prometheus). + +### total_prometheus + +`total_prometheus` generates output [counter](https://docs.victoriametrics.com/keyConcepts.html#counter) by summing the input counters over the given `interval`. +`total_prometheus` makes sense only for aggregating [counters](https://docs.victoriametrics.com/keyConcepts.html#counter). + +The results of `total_prometheus` is roughly equal to the the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +sum(running_sum(increase_prometheus(some_counter))) +``` + +`total_prometheus` skips the first seen sample value per each [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). +If you need taking into account the first sample per time series, then take a look at [total](#total). + +`total_prometheus` is not affected by [counter resets](https://docs.victoriametrics.com/keyConcepts.html#counter) - +it continues to increase monotonically with respect to the previous value. +The counters are most often reset when the application is restarted. + +See also [total](#total), [increase](#increase) and [increase_prometheus](#increase_prometheus). + +### unique_samples + +`unique_samples` counts the number of unique sample values over the given `interval`. +`unique_samples` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge). + +The results of `unique_samples` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +count(count_values_over_time(some_metric[interval])) +``` + +See also [sum_samples](#sum_samples) and [count_series](#count_series). + ### quantiles `quantiles(phi1, ..., phiN)` returns [percentiles](https://en.wikipedia.org/wiki/Percentile) for the given `phi*` -over the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). -The `phi` must be in the range `[0..1]`, where `0` means `0th` percentile, while `1` means `100th` percentile. -`quantiles(...)` makes sense only for aggregating [gauge](https://docs.victoriametrics.com/keyConcepts.html#gauge) metrics. +over the input [sample values](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) on the given `interval`. +`phi` must be in the range `[0..1]`, where `0` means `0th` percentile, while `1` means `100th` percentile. +`quantiles(...)` makes sense only for aggregating [gauges](https://docs.victoriametrics.com/keyConcepts.html#gauge). -The results of `quantiles(phi1, ..., phiN)` with aggregation interval of `1m` -is equal to the `quantiles_over_time("quantile", phi1, ..., phiN, some_histogram_bucket[1m])` query. +The results of `quantiles(phi1, ..., phiN)` is equal to the following [MetricsQL](https://docs.victoriametrics.com/metricsql/) query: + +```metricsql +histogram_quantiles("quantile", phi1, ..., phiN, sum(histogram_over_time(some_metric[interval])) by (vmrange)) +``` + +See also [histogram_bucket](#histogram_bucket), [min](#min), [max](#max) and [avg](#avg). -Please note, `quantiles` aggregation won't produce correct results when vmagent is in [cluster mode](#cluster-mode) -since percentiles should be calculated only on the whole matched data set. ## Aggregating by labels @@ -635,13 +795,21 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # interval: 1m - # staleness_interval defines an interval after which the series state will be reset if no samples have been sent during it. - # It means that: + # dedup_interval is an optional interval for de-duplication of input samples before the aggregation. + # Samples are de-duplicated on a per-series basis. See https://docs.victoriametrics.com/keyconcepts/#time-series + # and https://docs.victoriametrics.com/#deduplication + # The deduplication is performed after input_relabel_configs relabeling is applied. + # By default the deduplication is disabled. + # + # dedup_interval: 30s + + # staleness_interval is an optional interval after which the series state will be reset if no samples have been sent during it. + # This means that: # - no data point will be written for a resulting time series if it didn't receive any updates during configured interval, # - if the series receives updates after the configured interval again, then the time series will be calculated from the initial state # (it's like this series didn't exist until now). # Increase this parameter if it is expected for matched metrics to be delayed or collected with irregular intervals exceeding the `interval` value. - # By default, is equal to x2 of the `interval` field. + # By default, it equals to x2 of the `interval` field. # The parameter is only relevant for outputs: total, increase and histogram_bucket. # # staleness_interval: 2m @@ -649,7 +817,8 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # flush_on_shutdown defines whether to flush the unfinished aggregation states on process restarts # or config reloads. It is not recommended changing this setting, unless unfinished aggregations states # are preferred to missing data points. - # Is `false` by default. + # Unfinished aggregation states aren't flushed on shutdown by default. + # # flush_on_shutdown: false # without is an optional list of labels, which must be removed from the output aggregation. @@ -667,6 +836,13 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # outputs: [total] + # keep_metric_names instructs keeping the original metric names for the aggregated samples. + # This option can be set only if outputs list contains only a single output. + # By default a special suffix is added to original metric names in the aggregated samples. + # See https://docs.victoriametrics.com/stream-aggregation/#output-metric-names + # + # keep_metric_names: false + # input_relabel_configs is an optional relabeling rules, # which are applied to the incoming samples after they pass the match filter # and before being aggregated. diff --git a/lib/promutils/labelscompressor.go b/lib/promutils/labelscompressor.go new file mode 100644 index 000000000..0a481dd6d --- /dev/null +++ b/lib/promutils/labelscompressor.go @@ -0,0 +1,125 @@ +package promutils + +import ( + "sync" + "sync/atomic" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +// LabelsCompressor compresses []prompbmarshal.Label into short binary strings +type LabelsCompressor struct { + labelToIdx sync.Map + idxToLabel sync.Map + + nextIdx atomic.Uint64 + + totalSizeBytes atomic.Uint64 +} + +// SizeBytes returns the size of lc data in bytes +func (lc *LabelsCompressor) SizeBytes() uint64 { + return uint64(unsafe.Sizeof(*lc)) + lc.totalSizeBytes.Load() +} + +// ItemsCount returns the number of items in lc +func (lc *LabelsCompressor) ItemsCount() uint64 { + return lc.nextIdx.Load() +} + +// Compress compresses labels, appends the compressed labels to dst and returns the result. +func (lc *LabelsCompressor) Compress(dst []byte, labels []prompbmarshal.Label) []byte { + if len(labels) == 0 { + // Fast path + return append(dst, 0) + } + + a := encoding.GetUint64s(len(labels) + 1) + a.A[0] = uint64(len(labels)) + lc.compress(a.A[1:], labels) + dst = encoding.MarshalVarUint64s(dst, a.A) + encoding.PutUint64s(a) + return dst +} + +func (lc *LabelsCompressor) compress(dst []uint64, labels []prompbmarshal.Label) { + if len(labels) == 0 { + return + } + _ = dst[len(labels)-1] + for i := range labels { + label := &labels[i] + v, ok := lc.labelToIdx.Load(*label) + if !ok { + v = lc.nextIdx.Add(1) + labelCopy := cloneLabel(label) + lc.idxToLabel.Store(v, labelCopy) + lc.labelToIdx.Store(*labelCopy, v) + + // Update lc.totalSizeBytes + labelSizeBytes := uint64(len(label.Name) + len(label.Value)) + entrySizeBytes := labelSizeBytes + uint64(unsafe.Sizeof(label)+unsafe.Sizeof(*label)+2*unsafe.Sizeof(v)) + lc.totalSizeBytes.Add(entrySizeBytes) + } + dst[i] = v.(uint64) + } +} + +func cloneLabel(label *prompbmarshal.Label) *prompbmarshal.Label { + // pre-allocate memory for label name and value + n := len(label.Name) + len(label.Value) + buf := make([]byte, 0, n) + + buf = append(buf, label.Name...) + labelName := bytesutil.ToUnsafeString(buf) + + buf = append(buf, label.Value...) + labelValue := bytesutil.ToUnsafeString(buf[len(labelName):]) + return &prompbmarshal.Label{ + Name: labelName, + Value: labelValue, + } +} + +// Decompress decompresses src into []prompbmarshal.Label, appends it to dst and returns the result. +func (lc *LabelsCompressor) Decompress(dst []prompbmarshal.Label, src []byte) []prompbmarshal.Label { + tail, labelsLen, err := encoding.UnmarshalVarUint64(src) + if err != nil { + logger.Panicf("BUG: cannot unmarshal labels length: %s", err) + } + if labelsLen == 0 { + // fast path - nothing to decode + if len(tail) > 0 { + logger.Panicf("BUG: unexpected non-empty tail left; len(tail)=%d; tail=%X", len(tail), tail) + } + return dst + } + + a := encoding.GetUint64s(int(labelsLen)) + tail, err = encoding.UnmarshalVarUint64s(a.A, tail) + if err != nil { + logger.Panicf("BUG: cannot unmarshal label indexes: %s", err) + } + if len(tail) > 0 { + logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail) + } + dst = lc.decompress(dst, a.A) + encoding.PutUint64s(a) + return dst +} + +func (lc *LabelsCompressor) decompress(dst []prompbmarshal.Label, src []uint64) []prompbmarshal.Label { + for _, idx := range src { + v, ok := lc.idxToLabel.Load(idx) + if !ok { + logger.Panicf("BUG: missing label for idx=%d", idx) + } + label := *(v.(*prompbmarshal.Label)) + dst = append(dst, label) + } + return dst +} diff --git a/lib/promutils/labelscompressor_test.go b/lib/promutils/labelscompressor_test.go new file mode 100644 index 000000000..b5d787be7 --- /dev/null +++ b/lib/promutils/labelscompressor_test.go @@ -0,0 +1,119 @@ +package promutils + +import ( + "fmt" + "sync" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func TestLabelsCompressorSerial(t *testing.T) { + var lc LabelsCompressor + + f := func(labels []prompbmarshal.Label) { + t.Helper() + + sExpected := labelsToString(labels) + + data := lc.Compress(nil, labels) + labelsResult := lc.Decompress(nil, data) + + sResult := labelsToString(labelsResult) + if sExpected != sResult { + t.Fatalf("unexpected result; got %s; want %s", sResult, sExpected) + } + + if len(labels) > 0 { + if n := lc.SizeBytes(); n == 0 { + t.Fatalf("Unexpected zero SizeBytes()") + } + if n := lc.ItemsCount(); n == 0 { + t.Fatalf("Unexpected zero ItemsCount()") + } + } + } + + // empty labels + f(nil) + f([]prompbmarshal.Label{}) + + // non-empty labels + f([]prompbmarshal.Label{ + { + Name: "instance", + Value: "12345.4342.342.3", + }, + { + Name: "job", + Value: "kube-pod-12323", + }, + }) + f([]prompbmarshal.Label{ + { + Name: "instance", + Value: "12345.4342.342.3", + }, + { + Name: "job", + Value: "kube-pod-12323", + }, + { + Name: "pod", + Value: "foo-bar-baz", + }, + }) +} + +func TestLabelsCompressorConcurrent(t *testing.T) { + const concurrency = 5 + var lc LabelsCompressor + + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + series := newTestSeries(100, 20) + for i, labels := range series { + sExpected := labelsToString(labels) + data := lc.Compress(nil, labels) + labelsResult := lc.Decompress(nil, data) + sResult := labelsToString(labelsResult) + if sExpected != sResult { + panic(fmt.Errorf("unexpected result on iteration %d; got %s; want %s", i, sResult, sExpected)) + } + } + }() + } + wg.Wait() + + if n := lc.SizeBytes(); n == 0 { + t.Fatalf("Unexpected zero SizeBytes()") + } + if n := lc.ItemsCount(); n == 0 { + t.Fatalf("Unexpected zero ItemsCount()") + } +} + +func labelsToString(labels []prompbmarshal.Label) string { + l := Labels{ + Labels: labels, + } + return l.String() +} + +func newTestSeries(seriesCount, labelsPerSeries int) [][]prompbmarshal.Label { + series := make([][]prompbmarshal.Label, seriesCount) + for i := 0; i < seriesCount; i++ { + labels := make([]prompbmarshal.Label, labelsPerSeries) + for j := 0; j < labelsPerSeries; j++ { + labels[j] = prompbmarshal.Label{ + Name: fmt.Sprintf("label_%d", j), + Value: fmt.Sprintf("value_%d_%d", i, j), + } + } + series[i] = labels + } + return series +} diff --git a/lib/promutils/labelscompressor_timing_test.go b/lib/promutils/labelscompressor_timing_test.go new file mode 100644 index 000000000..11f6c4f1c --- /dev/null +++ b/lib/promutils/labelscompressor_timing_test.go @@ -0,0 +1,54 @@ +package promutils + +import ( + "sync/atomic" + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func BenchmarkLabelsCompressorCompress(b *testing.B) { + var lc LabelsCompressor + series := newTestSeries(100, 10) + + b.ReportAllocs() + b.SetBytes(int64(len(series))) + + b.RunParallel(func(pb *testing.PB) { + var dst []byte + for pb.Next() { + dst = dst[:0] + for _, labels := range series { + dst = lc.Compress(dst, labels) + } + Sink.Add(uint64(len(dst))) + } + }) +} + +func BenchmarkLabelsCompressorDecompress(b *testing.B) { + var lc LabelsCompressor + series := newTestSeries(100, 10) + datas := make([][]byte, len(series)) + var dst []byte + for i, labels := range series { + dstLen := len(dst) + dst = lc.Compress(dst, labels) + datas[i] = dst[dstLen:] + } + + b.ReportAllocs() + b.SetBytes(int64(len(series))) + + b.RunParallel(func(pb *testing.PB) { + var labels []prompbmarshal.Label + for pb.Next() { + for _, data := range datas { + labels = lc.Decompress(labels[:0], data) + } + Sink.Add(uint64(len(labels))) + } + }) +} + +var Sink atomic.Uint64 diff --git a/lib/streamaggr/avg.go b/lib/streamaggr/avg.go index 3f5463649..9a5e6df5f 100644 --- a/lib/streamaggr/avg.go +++ b/lib/streamaggr/avg.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -22,35 +23,41 @@ func newAvgAggrState() *avgAggrState { return &avgAggrState{} } -func (as *avgAggrState) pushSample(_, outputKey string, value float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &avgStateValue{ - sum: value, - count: 1, +func (as *avgAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &avgStateValue{ + sum: s.value, + count: 1, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The entry has been successfully stored + continue + } + // Update the entry created by a concurrent goroutine. + v = vNew } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if !loaded { - // The entry has been successfully stored - return + sv := v.(*avgStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + sv.sum += s.value + sv.count++ + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - // Update the entry created by a concurrent goroutine. - v = vNew - } - sv := v.(*avgStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.sum += value - sv.count++ - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } diff --git a/lib/streamaggr/count_samples.go b/lib/streamaggr/count_samples.go index b83d83268..cd59829ae 100644 --- a/lib/streamaggr/count_samples.go +++ b/lib/streamaggr/count_samples.go @@ -1,12 +1,13 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" ) -// countSamplesAggrState calculates output=countSamples, e.g. the count of input samples. +// countSamplesAggrState calculates output=count_samples, e.g. the count of input samples. type countSamplesAggrState struct { m sync.Map } @@ -21,33 +22,39 @@ func newCountSamplesAggrState() *countSamplesAggrState { return &countSamplesAggrState{} } -func (as *countSamplesAggrState) pushSample(_, outputKey string, _ float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &countSamplesStateValue{ - n: 1, +func (as *countSamplesAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &countSamplesStateValue{ + n: 1, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + continue + } + // Use the entry created by a concurrent goroutine. + v = vNew } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if !loaded { - // The new entry has been successfully created. - return + sv := v.(*countSamplesStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + sv.n++ + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - // Use the entry created by a concurrent goroutine. - v = vNew - } - sv := v.(*countSamplesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.n++ - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } diff --git a/lib/streamaggr/count_series.go b/lib/streamaggr/count_series.go index b4a409e71..04b7383c8 100644 --- a/lib/streamaggr/count_series.go +++ b/lib/streamaggr/count_series.go @@ -1,9 +1,12 @@ package streamaggr import ( + "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/cespare/xxhash/v2" ) // countSeriesAggrState calculates output=count_series, e.g. the number of unique series. @@ -12,50 +15,57 @@ type countSeriesAggrState struct { } type countSeriesStateValue struct { - mu sync.Mutex - countedSeries map[string]struct{} - n uint64 - deleted bool + mu sync.Mutex + m map[uint64]struct{} + deleted bool } func newCountSeriesAggrState() *countSeriesAggrState { return &countSeriesAggrState{} } -func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, _ float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &countSeriesStateValue{ - countedSeries: map[string]struct{}{ - inputKey: {}, - }, - n: 1, +func (as *countSeriesAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + inputKey, outputKey := getInputOutputKey(s.key) + + // Count unique hashes over the inputKeys instead of unique inputKey values. + // This reduces memory usage at the cost of possible hash collisions for distinct inputKey values. + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(inputKey)) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &countSeriesStateValue{ + m: map[uint64]struct{}{ + h: {}, + }, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The entry has been added to the map. + continue + } + // Update the entry created by a concurrent goroutine. + v = vNew } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if !loaded { - // The entry has been added to the map. - return + sv := v.(*countSeriesStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + if _, ok := sv.m[h]; !ok { + sv.m[h] = struct{}{} + } } - // Update the entry created by a concurrent goroutine. - v = vNew - } - sv := v.(*countSeriesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - if _, ok := sv.countedSeries[inputKey]; !ok { - sv.countedSeries[inputKey] = struct{}{} - sv.n++ + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again - } } func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) { @@ -67,7 +77,7 @@ func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) { sv := v.(*countSeriesStateValue) sv.mu.Lock() - n := sv.n + n := len(sv.m) // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. sv.deleted = true sv.mu.Unlock() diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go new file mode 100644 index 000000000..7de6ca908 --- /dev/null +++ b/lib/streamaggr/dedup.go @@ -0,0 +1,185 @@ +package streamaggr + +import ( + "strings" + "sync" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/cespare/xxhash/v2" +) + +const dedupAggrShardsCount = 128 + +type dedupAggr struct { + shards []dedupAggrShard +} + +type dedupAggrShard struct { + dedupAggrShardNopad + + // The padding prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(dedupAggrShardNopad{})%128]byte +} + +type dedupAggrShardNopad struct { + mu sync.Mutex + m map[string]*dedupAggrSample +} + +type dedupAggrSample struct { + value float64 +} + +func newDedupAggr() *dedupAggr { + shards := make([]dedupAggrShard, dedupAggrShardsCount) + return &dedupAggr{ + shards: shards, + } +} + +func (da *dedupAggr) sizeBytes() uint64 { + n := uint64(unsafe.Sizeof(*da)) + for i := range da.shards { + n += da.shards[i].sizeBytes() + } + return n +} + +func (da *dedupAggr) itemsCount() uint64 { + n := uint64(0) + for i := range da.shards { + n += da.shards[i].itemsCount() + } + return n +} + +func (das *dedupAggrShard) sizeBytes() uint64 { + das.mu.Lock() + n := uint64(unsafe.Sizeof(*das)) + for k, s := range das.m { + n += uint64(len(k)) + uint64(unsafe.Sizeof(k)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)) + } + das.mu.Unlock() + return n +} + +func (das *dedupAggrShard) itemsCount() uint64 { + das.mu.Lock() + n := uint64(len(das.m)) + das.mu.Unlock() + return n +} + +func (da *dedupAggr) pushSamples(samples []pushSample) { + pss := getPerShardSamples() + shards := pss.shards + for _, sample := range samples { + h := xxhash.Sum64(bytesutil.ToUnsafeBytes(sample.key)) + idx := h % uint64(len(shards)) + shards[idx] = append(shards[idx], sample) + } + for i, shardSamples := range shards { + if len(shardSamples) == 0 { + continue + } + da.shards[i].pushSamples(shardSamples) + } + putPerShardSamples(pss) +} + +type dedupFlushCtx struct { + samples []pushSample +} + +func (ctx *dedupFlushCtx) reset() { + clear(ctx.samples) + ctx.samples = ctx.samples[:0] +} + +func (da *dedupAggr) flush(f func(samples []pushSample)) { + ctx := &dedupFlushCtx{} + shards := da.shards + for i := range shards { + ctx.reset() + shards[i].flush(ctx, f) + } +} + +type perShardSamples struct { + shards [][]pushSample +} + +func (pss *perShardSamples) reset() { + shards := pss.shards + for i, shardSamples := range shards { + if len(shardSamples) > 0 { + clear(shardSamples) + shards[i] = shardSamples[:0] + } + } +} + +func getPerShardSamples() *perShardSamples { + v := perShardSamplesPool.Get() + if v == nil { + return &perShardSamples{ + shards: make([][]pushSample, dedupAggrShardsCount), + } + } + return v.(*perShardSamples) +} + +func putPerShardSamples(pss *perShardSamples) { + pss.reset() + perShardSamplesPool.Put(pss) +} + +var perShardSamplesPool sync.Pool + +func (das *dedupAggrShard) pushSamples(samples []pushSample) { + das.mu.Lock() + defer das.mu.Unlock() + + m := das.m + if m == nil { + m = make(map[string]*dedupAggrSample, len(samples)) + das.m = m + } + for _, sample := range samples { + s, ok := m[sample.key] + if ok { + s.value = sample.value + } else { + key := strings.Clone(sample.key) + m[key] = &dedupAggrSample{ + value: sample.value, + } + } + } +} + +func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) { + das.mu.Lock() + + m := das.m + das.m = nil + + das.mu.Unlock() + + if len(m) == 0 { + return + } + + dstSamples := ctx.samples + for key, s := range m { + dstSamples = append(dstSamples, pushSample{ + key: key, + value: s.value, + }) + } + ctx.samples = dstSamples + + f(dstSamples) +} diff --git a/lib/streamaggr/dedup_test.go b/lib/streamaggr/dedup_test.go new file mode 100644 index 000000000..247e3ccc4 --- /dev/null +++ b/lib/streamaggr/dedup_test.go @@ -0,0 +1,88 @@ +package streamaggr + +import ( + "fmt" + "reflect" + "strings" + "sync" + "sync/atomic" + "testing" +) + +func TestDedupAggrSerial(t *testing.T) { + da := newDedupAggr() + + const seriesCount = 100_000 + expectedSamplesMap := make(map[string]pushSample) + for i := 0; i < 2; i++ { + samples := make([]pushSample, seriesCount) + for j := range samples { + sample := &samples[j] + sample.key = fmt.Sprintf("key_%d", j) + sample.value = float64(i + j) + expectedSamplesMap[sample.key] = *sample + } + da.pushSamples(samples) + } + + if n := da.sizeBytes(); n > 4_200_000 { + t.Fatalf("too big dedupAggr state before flush: %d bytes; it shouldn't exceed 4_200_000 bytes", n) + } + if n := da.itemsCount(); n != seriesCount { + t.Fatalf("unexpected itemsCount; got %d; want %d", n, seriesCount) + } + + flushedSamplesMap := make(map[string]pushSample) + flushSamples := func(samples []pushSample) { + for _, sample := range samples { + sample.key = strings.Clone(sample.key) + flushedSamplesMap[sample.key] = sample + } + } + da.flush(flushSamples) + + if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) { + t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap) + } + + if n := da.sizeBytes(); n > 17_000 { + t.Fatalf("too big dedupAggr state after flush; %d bytes; it shouldn't exceed 17_000 bytes", n) + } + if n := da.itemsCount(); n != 0 { + t.Fatalf("unexpected non-zero itemsCount after flush; got %d", n) + } +} + +func TestDedupAggrConcurrent(t *testing.T) { + const concurrency = 5 + const seriesCount = 10_000 + da := newDedupAggr() + + var samplesFlushed atomic.Int64 + flushSamples := func(samples []pushSample) { + samplesFlushed.Add(int64(len(samples))) + } + + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + samples := make([]pushSample, seriesCount) + for j := range samples { + sample := &samples[j] + sample.key = fmt.Sprintf("key_%d", j) + sample.value = float64(i + j) + } + da.pushSamples(samples) + } + da.flush(flushSamples) + }() + } + wg.Wait() + + if n := samplesFlushed.Load(); n < seriesCount { + t.Fatalf("too small number of series flushed; got %d; want at least %d", n, seriesCount) + } +} diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go new file mode 100644 index 000000000..85e9db78e --- /dev/null +++ b/lib/streamaggr/dedup_timing_test.go @@ -0,0 +1,48 @@ +package streamaggr + +import ( + "fmt" + "sync/atomic" + "testing" +) + +func BenchmarkDedupAggr(b *testing.B) { + for _, samplesPerPush := range []int{1, 10, 100, 1_000, 10_000, 100_000} { + b.Run(fmt.Sprintf("samplesPerPush_%d", samplesPerPush), func(b *testing.B) { + benchmarkDedupAggr(b, samplesPerPush) + }) + } +} + +func benchmarkDedupAggr(b *testing.B, samplesPerPush int) { + flushSamples := func(samples []pushSample) { + Sink.Add(uint64(len(samples))) + } + + const loops = 2 + benchSamples := newBenchSamples(samplesPerPush) + da := newDedupAggr() + + b.ReportAllocs() + b.SetBytes(int64(samplesPerPush * loops)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for i := 0; i < loops; i++ { + da.pushSamples(benchSamples) + } + da.flush(flushSamples) + } + }) +} + +func newBenchSamples(count int) []pushSample { + samples := make([]pushSample, count) + for i := range samples { + sample := &samples[i] + sample.key = fmt.Sprintf("key_%d", i) + sample.value = float64(i) + } + return samples +} + +var Sink atomic.Uint64 diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 32568179c..bc91d2f23 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "time" @@ -9,7 +10,7 @@ import ( "github.com/VictoriaMetrics/metrics" ) -// histogramBucketAggrState calculates output=histogramBucket, e.g. VictoriaMetrics histogram over input samples. +// histogramBucketAggrState calculates output=histogram_bucket, e.g. VictoriaMetrics histogram over input samples. type histogramBucketAggrState struct { m sync.Map @@ -30,33 +31,38 @@ func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBuck } } -func (as *histogramBucketAggrState) pushSample(_, outputKey string, value float64) { +func (as *histogramBucketAggrState) pushSamples(samples []pushSample) { currentTime := fasttime.UnixTimestamp() deleteDeadline := currentTime + as.stalenessSecs + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &histogramBucketStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &histogramBucketStateValue{} + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. + v = vNew + } + } + sv := v.(*histogramBucketStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + sv.h.Update(s.value) + sv.deleteDeadline = deleteDeadline + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - } - sv := v.(*histogramBucketStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.h.Update(value) - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } diff --git a/lib/streamaggr/increase.go b/lib/streamaggr/increase.go deleted file mode 100644 index 7e76c0a4c..000000000 --- a/lib/streamaggr/increase.go +++ /dev/null @@ -1,129 +0,0 @@ -package streamaggr - -import ( - "sync" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" -) - -// increaseAggrState calculates output=increase, e.g. the increase over input counters. -type increaseAggrState struct { - m sync.Map - - ignoreInputDeadline uint64 - stalenessSecs uint64 -} - -type increaseStateValue struct { - mu sync.Mutex - lastValues map[string]*lastValueState - total float64 - deleteDeadline uint64 - deleted bool -} - -func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duration) *increaseAggrState { - currentTime := fasttime.UnixTimestamp() - intervalSecs := roundDurationToSecs(interval) - stalenessSecs := roundDurationToSecs(stalenessInterval) - return &increaseAggrState{ - ignoreInputDeadline: currentTime + intervalSecs, - stalenessSecs: stalenessSecs, - } -} - -func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) { - currentTime := fasttime.UnixTimestamp() - deleteDeadline := currentTime + as.stalenessSecs - -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &increaseStateValue{ - lastValues: make(map[string]*lastValueState), - } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*increaseStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - lv, ok := sv.lastValues[inputKey] - if !ok { - lv = &lastValueState{} - sv.lastValues[inputKey] = lv - } - d := value - if ok && lv.value <= value { - d = value - lv.value - } - if ok || currentTime > as.ignoreInputDeadline { - sv.total += d - } - lv.value = value - lv.deleteDeadline = deleteDeadline - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again - } -} - -func (as *increaseAggrState) removeOldEntries(currentTime uint64) { - m := &as.m - m.Range(func(k, v interface{}) bool { - sv := v.(*increaseStateValue) - - sv.mu.Lock() - deleted := currentTime > sv.deleteDeadline - if deleted { - // Mark the current entry as deleted - sv.deleted = deleted - } else { - // Delete outdated entries in sv.lastValues - m := sv.lastValues - for k1, v1 := range m { - if currentTime > v1.deleteDeadline { - delete(m, k1) - } - } - } - sv.mu.Unlock() - - if deleted { - m.Delete(k) - } - return true - }) -} - -func (as *increaseAggrState) appendSeriesForFlush(ctx *flushCtx) { - currentTime := fasttime.UnixTimestamp() - currentTimeMsec := int64(currentTime) * 1000 - - as.removeOldEntries(currentTime) - - m := &as.m - m.Range(func(k, v interface{}) bool { - sv := v.(*increaseStateValue) - sv.mu.Lock() - increase := sv.total - sv.total = 0 - deleted := sv.deleted - sv.mu.Unlock() - if !deleted { - key := k.(string) - ctx.appendSeries(key, "increase", currentTimeMsec, increase) - } - return true - }) -} diff --git a/lib/streamaggr/last.go b/lib/streamaggr/last.go index b51a8baf3..b4710b21f 100644 --- a/lib/streamaggr/last.go +++ b/lib/streamaggr/last.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -21,33 +22,39 @@ func newLastAggrState() *lastAggrState { return &lastAggrState{} } -func (as *lastAggrState) pushSample(_, outputKey string, value float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &lastStateValue{ - last: value, +func (as *lastAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &lastStateValue{ + last: s.value, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + continue + } + // Use the entry created by a concurrent goroutine. + v = vNew } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if !loaded { - // The new entry has been successfully created. - return + sv := v.(*lastStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + sv.last = s.value + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - // Use the entry created by a concurrent goroutine. - v = vNew - } - sv := v.(*lastStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.last = value - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index fdb91aa27..45d248cbb 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -21,36 +22,42 @@ func newMaxAggrState() *maxAggrState { return &maxAggrState{} } -func (as *maxAggrState) pushSample(_, outputKey string, value float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &maxStateValue{ - max: value, +func (as *maxAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &maxStateValue{ + max: s.value, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + continue + } + // Use the entry created by a concurrent goroutine. + v = vNew } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if !loaded { - // The new entry has been successfully created. - return + sv := v.(*maxStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + if s.value > sv.max { + sv.max = s.value + } } - // Use the entry created by a concurrent goroutine. - v = vNew - } - sv := v.(*maxStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - if value > sv.max { - sv.max = value + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again - } } func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) { diff --git a/lib/streamaggr/min.go b/lib/streamaggr/min.go index 707ffb7ec..866b95ff9 100644 --- a/lib/streamaggr/min.go +++ b/lib/streamaggr/min.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -21,36 +22,42 @@ func newMinAggrState() *minAggrState { return &minAggrState{} } -func (as *minAggrState) pushSample(_, outputKey string, value float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &minStateValue{ - min: value, +func (as *minAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &minStateValue{ + min: s.value, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + continue + } + // Use the entry created by a concurrent goroutine. + v = vNew } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if !loaded { - // The new entry has been successfully created. - return + sv := v.(*minStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + if s.value < sv.min { + sv.min = s.value + } } - // Use the entry created by a concurrent goroutine. - v = vNew - } - sv := v.(*minStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - if value < sv.min { - sv.min = value + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again - } } func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) { diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index 31b7affe7..5142e6558 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -2,6 +2,7 @@ package streamaggr import ( "strconv" + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -28,33 +29,39 @@ func newQuantilesAggrState(phis []float64) *quantilesAggrState { } } -func (as *quantilesAggrState) pushSample(_, outputKey string, value float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - h := histogram.GetFast() - v = &quantilesStateValue{ - h: h, +func (as *quantilesAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + h := histogram.GetFast() + v = &quantilesStateValue{ + h: h, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. + histogram.PutFast(h) + v = vNew + } } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - histogram.PutFast(h) - v = vNew + sv := v.(*quantilesStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + sv.h.Update(s.value) + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - } - sv := v.(*quantilesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.h.Update(value) - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } diff --git a/lib/streamaggr/stddev.go b/lib/streamaggr/stddev.go index 9d270bcb5..592787efe 100644 --- a/lib/streamaggr/stddev.go +++ b/lib/streamaggr/stddev.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -24,33 +25,39 @@ func newStddevAggrState() *stddevAggrState { return &stddevAggrState{} } -func (as *stddevAggrState) pushSample(_, outputKey string, value float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &stddevStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew +func (as *stddevAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &stddevStateValue{} + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. + v = vNew + } + } + sv := v.(*stddevStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation + sv.count++ + avg := sv.avg + (s.value-sv.avg)/sv.count + sv.q += (s.value - sv.avg) * (s.value - avg) + sv.avg = avg + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - } - sv := v.(*stddevStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation - sv.count++ - avg := sv.avg + (value-sv.avg)/sv.count - sv.q += (value - sv.avg) * (value - avg) - sv.avg = avg - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } diff --git a/lib/streamaggr/stdvar.go b/lib/streamaggr/stdvar.go index f2e03313b..585ec22e4 100644 --- a/lib/streamaggr/stdvar.go +++ b/lib/streamaggr/stdvar.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -23,33 +24,39 @@ func newStdvarAggrState() *stdvarAggrState { return &stdvarAggrState{} } -func (as *stdvarAggrState) pushSample(_, outputKey string, value float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &stdvarStateValue{} - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew +func (as *stdvarAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &stdvarStateValue{} + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. + v = vNew + } + } + sv := v.(*stdvarStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation + sv.count++ + avg := sv.avg + (s.value-sv.avg)/sv.count + sv.q += (s.value - sv.avg) * (s.value - avg) + sv.avg = avg + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - } - sv := v.(*stdvarStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation - sv.count++ - avg := sv.avg + (value-sv.avg)/sv.count - sv.q += (value - sv.avg) * (value - avg) - sv.avg = avg - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 4c27b6d69..bc3dcf71b 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "math" + "slices" "sort" "strconv" "strings" @@ -19,14 +20,18 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "github.com/VictoriaMetrics/metrics" "gopkg.in/yaml.v2" ) var supportedOutputs = []string{ "total", + "total_prometheus", "increase", + "increase_prometheus", "count_series", "count_samples", + "unique_samples", "sum_samples", "last", "min", @@ -80,8 +85,11 @@ type Config struct { // Interval is the interval between aggregations. Interval string `yaml:"interval"` + // DedupInterval is an optional interval for deduplication. + DedupInterval string `yaml:"dedup_interval,omitempty"` + // Staleness interval is interval after which the series state will be reset if no samples have been sent during it. - // The parameter is only relevant for outputs: total, increase and histogram_bucket. + // The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket. StalenessInterval string `yaml:"staleness_interval,omitempty"` // Outputs is a list of output aggregate functions to produce. @@ -89,10 +97,13 @@ type Config struct { // The following names are allowed: // // - total - aggregates input counters - // - increase - counts the increase over input counters - // - count_series - counts the input series + // - total_prometheus - aggregates input counters, ignoring the first sample in new time series + // - increase - calculates the increase over input series + // - increase_prometheus - calculates the increase over input series, ignoring the first sample in new time series + // - count_series - counts the number of unique input series // - count_samples - counts the input samples - // - sum_samples - sums the input samples + // - unique_samples - counts the number of unique sample values + // - sum_samples - sums the input sample values // - last - the last biggest sample value // - min - the minimum sample value // - max - the maximum sample value @@ -102,12 +113,18 @@ type Config struct { // - histogram_bucket - creates VictoriaMetrics histogram for input samples // - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1] // - // The output time series will have the following names: + // The output time series will have the following names by default: // - // input_name:aggr__ + // input_name:[_by_][_without_]_ + // + // See also KeepMetricNames // Outputs []string `yaml:"outputs"` + // KeepMetricNames instructs to leave metric names as is for the output time series + // without adding any suffix. + KeepMetricNames bool `yaml:"keep_metric_names,omitempty"` + // By is an optional list of labels for grouping input series. // // See also Without. @@ -144,6 +161,8 @@ type Aggregators struct { // configData contains marshaled configs passed to NewAggregators(). // It is used in Equal() for comparing Aggregators. configData []byte + + ms *metrics.Set } // NewAggregators creates Aggregators from the given cfgs. @@ -155,9 +174,10 @@ type Aggregators struct { // // MustStop must be called on the returned Aggregators when they are no longer needed. func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { + ms := metrics.NewSet() as := make([]*aggregator, len(cfgs)) for i, cfg := range cfgs { - a, err := newAggregator(cfg, pushFunc, dedupInterval) + a, err := newAggregator(cfg, pushFunc, ms, dedupInterval) if err != nil { // Stop already initialized aggregators before returning the error. for _, a := range as[:i] { @@ -171,9 +191,46 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati if err != nil { logger.Panicf("BUG: cannot marshal the provided configs: %s", err) } + + _ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 { + n := uint64(0) + for _, aggr := range as { + if aggr.da != nil { + n += aggr.da.sizeBytes() + } + } + return float64(n) + }) + _ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 { + n := uint64(0) + for _, aggr := range as { + if aggr.da != nil { + n += aggr.da.itemsCount() + } + } + return float64(n) + }) + + _ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 { + n := uint64(0) + for _, aggr := range as { + n += aggr.lc.SizeBytes() + } + return float64(n) + }) + _ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 { + n := uint64(0) + for _, aggr := range as { + n += aggr.lc.ItemsCount() + } + return float64(n) + }) + + metrics.RegisterSet(ms) return &Aggregators{ as: as, configData: configData, + ms: ms, }, nil } @@ -182,9 +239,14 @@ func (a *Aggregators) MustStop() { if a == nil { return } + + metrics.UnregisterSet(a.ms) + a.ms = nil + for _, aggr := range a.as { aggr.MustStop() } + a.as = nil } // Equal returns true if a and b are initialized from identical configs. @@ -208,12 +270,14 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []b for i := 0; i < len(matchIdxs); i++ { matchIdxs[i] = 0 } - - if a != nil { - for _, aggr := range a.as { - aggr.Push(tss, matchIdxs) - } + if a == nil { + return matchIdxs } + + for _, aggr := range a.as { + aggr.Push(tss, matchIdxs) + } + return matchIdxs } @@ -224,17 +288,22 @@ type aggregator struct { inputRelabeling *promrelabel.ParsedConfigs outputRelabeling *promrelabel.ParsedConfigs + keepMetricNames bool + by []string without []string aggregateOnlyByTime bool - // dedupAggr is set to non-nil if input samples must be de-duplicated according + // da is set to non-nil if input samples must be de-duplicated according // to the dedupInterval passed to newAggregator(). - dedupAggr *lastAggrState + da *dedupAggr // aggrStates contains aggregate states for the given outputs aggrStates []aggrState + // lc is used for compressing series keys before passing them to dedupAggr and aggrState. + lc promutils.LabelsCompressor + pushFunc PushFunc // suffix contains a suffix, which should be added to aggregate metric names @@ -250,10 +319,16 @@ type aggregator struct { wg sync.WaitGroup stopCh chan struct{} + + flushDuration *metrics.Histogram + dedupFlushDuration *metrics.Histogram + + flushTimeouts *metrics.Counter + dedupFlushTimeouts *metrics.Counter } type aggrState interface { - pushSample(inputKey, outputKey string, value float64) + pushSamples(samples []pushSample) appendSeriesForFlush(ctx *flushCtx) } @@ -266,7 +341,7 @@ type PushFunc func(tss []prompbmarshal.TimeSeries) // e.g. only the last sample per each time series per each dedupInterval is aggregated. // // The returned aggregator must be stopped when no longer needed by calling MustStop(). -func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) { +func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterval time.Duration) (*aggregator, error) { // check cfg.Interval interval, err := time.ParseDuration(cfg.Interval) if err != nil { @@ -276,6 +351,21 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) return nil, fmt.Errorf("aggregation interval cannot be smaller than 1s; got %s", interval) } + // check cfg.DedupInterval + if cfg.DedupInterval != "" { + di, err := time.ParseDuration(cfg.DedupInterval) + if err != nil { + return nil, fmt.Errorf("cannot parse `dedup_interval: %q`: %w", cfg.DedupInterval, err) + } + dedupInterval = di + } + if dedupInterval > interval { + return nil, fmt.Errorf("dedup_interval=%s cannot exceed interval=%s", dedupInterval, interval) + } + if dedupInterval > 0 && interval%dedupInterval != 0 { + return nil, fmt.Errorf("interval=%s must be a multiple of dedup_interval=%s", interval, dedupInterval) + } + // check cfg.StalenessInterval stalenessInterval := interval * 2 if cfg.StalenessInterval != "" { @@ -284,7 +374,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) return nil, fmt.Errorf("cannot parse `staleness_interval: %q`: %w", cfg.StalenessInterval, err) } if stalenessInterval < interval { - return nil, fmt.Errorf("staleness_interval cannot be less than interval (%s); got %s", cfg.Interval, cfg.StalenessInterval) + return nil, fmt.Errorf("interval=%s cannot exceed staleness_interval=%s", cfg.Interval, cfg.StalenessInterval) } } @@ -309,6 +399,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) by = addMissingUnderscoreName(by) } + // check cfg.KeepMetricNames + if cfg.KeepMetricNames { + if len(cfg.Outputs) != 1 { + return nil, fmt.Errorf("`ouputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs) + } + if cfg.Outputs[0] == "histogram_bucket" || strings.HasPrefix(cfg.Outputs[0], "quantiles(") && strings.Contains(cfg.Outputs[0], ",") { + return nil, fmt.Errorf("`keep_metric_names` cannot be applied to `outputs: %q`, since they can generate multiple time series", cfg.Outputs) + } + } + // initialize outputs list if len(cfg.Outputs) == 0 { return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+ @@ -342,13 +442,19 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) } switch output { case "total": - aggrStates[i] = newTotalAggrState(interval, stalenessInterval) + aggrStates[i] = newTotalAggrState(stalenessInterval, false, true) + case "total_prometheus": + aggrStates[i] = newTotalAggrState(stalenessInterval, false, false) case "increase": - aggrStates[i] = newIncreaseAggrState(interval, stalenessInterval) + aggrStates[i] = newTotalAggrState(stalenessInterval, true, true) + case "increase_prometheus": + aggrStates[i] = newTotalAggrState(stalenessInterval, true, false) case "count_series": aggrStates[i] = newCountSeriesAggrState() case "count_samples": aggrStates[i] = newCountSamplesAggrState() + case "unique_samples": + aggrStates[i] = newUniqueSamplesAggrState() case "sum_samples": aggrStates[i] = newSumSamplesAggrState() case "last": @@ -381,11 +487,6 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) } suffix += "_" - var dedupAggr *lastAggrState - if dedupInterval > 0 { - dedupAggr = newLastAggrState() - } - // initialize the aggregator a := &aggregator{ match: cfg.Match, @@ -393,11 +494,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) inputRelabeling: inputRelabeling, outputRelabeling: outputRelabeling, + keepMetricNames: cfg.KeepMetricNames, + by: by, without: without, aggregateOnlyByTime: aggregateOnlyByTime, - dedupAggr: dedupAggr, aggrStates: aggrStates, pushFunc: pushFunc, @@ -405,75 +507,84 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) flushOnShutdown: cfg.FlushOnShutdown, stopCh: make(chan struct{}), + + flushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_flush_duration_seconds`), + dedupFlushDuration: ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`), + + flushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_flush_timeouts_total`), + dedupFlushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`), + } + if dedupInterval > 0 { + a.da = newDedupAggr() } - if dedupAggr != nil { - a.wg.Add(1) - go func() { - a.runDedupFlusher(dedupInterval) - a.wg.Done() - }() - } a.wg.Add(1) go func() { - a.runFlusher(interval) + a.runFlusher(interval, dedupInterval) a.wg.Done() }() return a, nil } -func (a *aggregator) runDedupFlusher(interval time.Duration) { - t := time.NewTicker(interval) - defer t.Stop() - for { - select { - case <-a.stopCh: - return - case <-t.C: - } +func (a *aggregator) runFlusher(interval, dedupInterval time.Duration) { + tickerFlush := time.NewTicker(interval) + defer tickerFlush.Stop() - // Globally limit the concurrency for metrics' flush - // in order to limit memory usage when big number of aggregators - // are flushed at the same time. - flushConcurrencyCh <- struct{}{} - a.dedupFlush() - <-flushConcurrencyCh + var dedupTickerCh <-chan time.Time + if dedupInterval > 0 { + t := time.NewTicker(dedupInterval) + defer t.Stop() + dedupTickerCh = t.C } -} -func (a *aggregator) runFlusher(interval time.Duration) { - t := time.NewTicker(interval) - defer t.Stop() for { select { case <-a.stopCh: return - case <-t.C: - } + case <-tickerFlush.C: + startTime := time.Now() - // Globally limit the concurrency for metrics' flush - // in order to limit memory usage when big number of aggregators - // are flushed at the same time. - flushConcurrencyCh <- struct{}{} - a.flush() - <-flushConcurrencyCh + flushConcurrencyCh <- struct{}{} + a.flush() + <-flushConcurrencyCh + + d := time.Since(startTime) + a.flushDuration.Update(d.Seconds()) + if d > interval { + a.flushTimeouts.Inc() + logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %s; "+ + "possible solutions: increase interval; use match filter matching smaller number of series; "+ + "reduce samples' ingestion rate to stream aggregation", interval, d) + } + case <-dedupTickerCh: + startTime := time.Now() + + flushConcurrencyCh <- struct{}{} + a.dedupFlush() + <-flushConcurrencyCh + + d := time.Since(startTime) + a.dedupFlushDuration.Update(d.Seconds()) + if d > dedupInterval { + a.dedupFlushTimeouts.Inc() + logger.Warnf("stream aggregation deduplication couldn't be finished in the configured dedup_interval=%s; it took %s; "+ + "possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+ + "reduce samples' ingestion rate to stream aggregation", dedupInterval, d) + } + } } } var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) func (a *aggregator) dedupFlush() { - ctx := &flushCtx{ - skipAggrSuffix: true, - } - a.dedupAggr.appendSeriesForFlush(ctx) - a.push(ctx.tss, nil) + a.da.flush(a.pushSamples) } func (a *aggregator) flush() { ctx := &flushCtx{ - suffix: a.suffix, + a: a, } for _, as := range a.aggrStates { ctx.reset() @@ -521,7 +632,7 @@ func (a *aggregator) MustStop() { // Flush the remaining data from the last interval if needed. flushConcurrencyCh <- struct{}{} - if a.dedupAggr != nil { + if a.da != nil { a.dedupFlush() } a.flush() @@ -530,18 +641,15 @@ func (a *aggregator) MustStop() { // Push pushes tss to a. func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { - if a.dedupAggr == nil { - // Deduplication is disabled. - a.push(tss, matchIdxs) - return - } + ctx := getPushCtx() + defer putPushCtx(ctx) + + samples := ctx.samples + labels := &ctx.labels + inputLabels := &ctx.inputLabels + outputLabels := &ctx.outputLabels + buf := ctx.buf - // Deduplication is enabled. - // push samples to dedupAggr, so later they will be pushed to the configured aggregators. - pushSample := a.dedupAggr.pushSample - inputKey := "" - bb := bbPool.Get() - labels := promutils.GetLabels() for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue @@ -556,173 +664,140 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { } labels.Sort() - bb.B = marshalLabelsFast(bb.B[:0], labels.Labels) - outputKey := bytesutil.InternBytes(bb.B) - for _, sample := range ts.Samples { - pushSample(inputKey, outputKey, sample.Value) - } - } - promutils.PutLabels(labels) - bbPool.Put(bb) -} - -func (a *aggregator) push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { - labels := promutils.GetLabels() - tmpLabels := promutils.GetLabels() - bb := bbPool.Get() - applyFilters := matchIdxs != nil - for idx, ts := range tss { - if applyFilters { - if !a.match.Match(ts.Labels) { - continue - } - matchIdxs[idx] = 1 - } - - labels.Labels = append(labels.Labels[:0], ts.Labels...) - if applyFilters { - labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) - if len(labels.Labels) == 0 { - // The metric has been deleted by the relabeling - continue - } - labels.Sort() - } - - if a.aggregateOnlyByTime { - bb.B = marshalLabelsFast(bb.B[:0], labels.Labels) - } else { - tmpLabels.Labels = removeUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without) - bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels) - } - outputKey := bytesutil.InternBytes(bb.B) - inputKey := "" + inputLabels.Reset() + outputLabels.Reset() if !a.aggregateOnlyByTime { - tmpLabels.Labels = extractUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without) - bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels) - inputKey = bytesutil.InternBytes(bb.B) + inputLabels.Labels, outputLabels.Labels = getInputOutputLabels(inputLabels.Labels, outputLabels.Labels, labels.Labels, a.by, a.without) + } else { + outputLabels.Labels = append(outputLabels.Labels, labels.Labels...) } + bufLen := len(buf) + buf = a.compressLabels(buf, inputLabels.Labels, outputLabels.Labels) for _, sample := range ts.Samples { - a.pushSample(inputKey, outputKey, sample.Value) + if math.IsNaN(sample.Value) { + // Skip NaN values + continue + } + samples = append(samples, pushSample{ + key: bytesutil.ToUnsafeString(buf[bufLen:]), + value: sample.Value, + }) } } + ctx.samples = samples + ctx.buf = buf + + if a.da != nil { + a.da.pushSamples(samples) + } else { + a.pushSamples(samples) + } +} + +func (a *aggregator) compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte { + bb := bbPool.Get() + bb.B = a.lc.Compress(bb.B, inputLabels) + dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B))) + dst = append(dst, bb.B...) bbPool.Put(bb) - promutils.PutLabels(tmpLabels) - promutils.PutLabels(labels) + dst = a.lc.Compress(dst, outputLabels) + return dst } -var bbPool bytesutil.ByteBufferPool +func (a *aggregator) decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Label { + dst = a.lc.Decompress(dst, bytesutil.ToUnsafeBytes(key)) + return dst +} -func (a *aggregator) pushSample(inputKey, outputKey string, value float64) { - if math.IsNaN(value) { - // Skip nan samples - return +func getOutputKey(key string) string { + src := bytesutil.ToUnsafeBytes(key) + tail, inputKeyLen, err := encoding.UnmarshalVarUint64(src) + if err != nil { + logger.Panicf("BUG: cannot unmarshal inputKeyLen: %s", err) } + outputKey := tail[inputKeyLen:] + return bytesutil.ToUnsafeString(outputKey) +} + +func getInputOutputKey(key string) (string, string) { + src := bytesutil.ToUnsafeBytes(key) + tail, inputKeyLen, err := encoding.UnmarshalVarUint64(src) + if err != nil { + logger.Panicf("BUG: cannot unmarshal inputKeyLen: %s", err) + } + inputKey := tail[:inputKeyLen] + outputKey := tail[inputKeyLen:] + return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey) +} + +func (a *aggregator) pushSamples(samples []pushSample) { for _, as := range a.aggrStates { - as.pushSample(inputKey, outputKey, value) + as.pushSamples(samples) } } -func extractUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label { +type pushCtx struct { + samples []pushSample + labels promutils.Labels + inputLabels promutils.Labels + outputLabels promutils.Labels + buf []byte +} + +func (ctx *pushCtx) reset() { + clear(ctx.samples) + ctx.samples = ctx.samples[:0] + + ctx.labels.Reset() + ctx.inputLabels.Reset() + ctx.outputLabels.Reset() + ctx.buf = ctx.buf[:0] +} + +type pushSample struct { + key string + value float64 +} + +func getPushCtx() *pushCtx { + v := pushCtxPool.Get() + if v == nil { + return &pushCtx{} + } + return v.(*pushCtx) +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + pushCtxPool.Put(ctx) +} + +var pushCtxPool sync.Pool + +func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, without []string) ([]prompbmarshal.Label, []prompbmarshal.Label) { if len(without) > 0 { for _, label := range labels { - if hasInArray(label.Name, without) { - dst = append(dst, label) + if slices.Contains(without, label.Name) { + dstInput = append(dstInput, label) + } else { + dstOutput = append(dstOutput, label) } } } else { for _, label := range labels { - if !hasInArray(label.Name, by) { - dst = append(dst, label) + if !slices.Contains(by, label.Name) { + dstInput = append(dstInput, label) + } else { + dstOutput = append(dstOutput, label) } } } - return dst -} - -func removeUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label { - if len(without) > 0 { - for _, label := range labels { - if !hasInArray(label.Name, without) { - dst = append(dst, label) - } - } - } else { - for _, label := range labels { - if hasInArray(label.Name, by) { - dst = append(dst, label) - } - } - } - return dst -} - -func hasInArray(name string, a []string) bool { - for _, s := range a { - if name == s { - return true - } - } - return false -} - -func marshalLabelsFast(dst []byte, labels []prompbmarshal.Label) []byte { - dst = encoding.MarshalUint32(dst, uint32(len(labels))) - for _, label := range labels { - dst = encoding.MarshalUint32(dst, uint32(len(label.Name))) - dst = append(dst, label.Name...) - dst = encoding.MarshalUint32(dst, uint32(len(label.Value))) - dst = append(dst, label.Value...) - } - return dst -} - -func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal.Label, error) { - if len(src) < 4 { - return dst, fmt.Errorf("cannot unmarshal labels count from %d bytes; needs at least 4 bytes", len(src)) - } - n := encoding.UnmarshalUint32(src) - src = src[4:] - for i := uint32(0); i < n; i++ { - // Unmarshal label name - if len(src) < 4 { - return dst, fmt.Errorf("cannot unmarshal label name length from %d bytes; needs at least 4 bytes", len(src)) - } - labelNameLen := encoding.UnmarshalUint32(src) - src = src[4:] - if uint32(len(src)) < labelNameLen { - return dst, fmt.Errorf("cannot unmarshal label name from %d bytes; needs at least %d bytes", len(src), labelNameLen) - } - labelName := bytesutil.InternBytes(src[:labelNameLen]) - src = src[labelNameLen:] - - // Unmarshal label value - if len(src) < 4 { - return dst, fmt.Errorf("cannot unmarshal label value length from %d bytes; needs at least 4 bytes", len(src)) - } - labelValueLen := encoding.UnmarshalUint32(src) - src = src[4:] - if uint32(len(src)) < labelValueLen { - return dst, fmt.Errorf("cannot unmarshal label value from %d bytes; needs at least %d bytes", len(src), labelValueLen) - } - labelValue := bytesutil.InternBytes(src[:labelValueLen]) - src = src[labelValueLen:] - - dst = append(dst, prompbmarshal.Label{ - Name: labelName, - Value: labelValue, - }) - } - if len(src) > 0 { - return dst, fmt.Errorf("unexpected non-empty tail after unmarshaling labels; tail length is %d bytes", len(src)) - } - return dst, nil + return dstInput, dstOutput } type flushCtx struct { - skipAggrSuffix bool - suffix string + a *aggregator tss []prompbmarshal.TimeSeries labels []prompbmarshal.Label @@ -736,16 +811,12 @@ func (ctx *flushCtx) reset() { ctx.samples = ctx.samples[:0] } -func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int64, value float64) { - var err error +func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value float64) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) - ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled)) - if err != nil { - logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err) - } - if !ctx.skipAggrSuffix { - ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix) + ctx.labels = ctx.a.decompressLabels(ctx.labels, key) + if !ctx.a.keepMetricNames { + ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) } ctx.samples = append(ctx.samples, prompbmarshal.Sample{ Timestamp: timestamp, @@ -757,15 +828,13 @@ func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int6 }) } -func (ctx *flushCtx) appendSeriesWithExtraLabel(labelsMarshaled, suffix string, timestamp int64, value float64, extraName, extraValue string) { - var err error +func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp int64, value float64, extraName, extraValue string) { labelsLen := len(ctx.labels) samplesLen := len(ctx.samples) - ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled)) - if err != nil { - logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err) + ctx.labels = ctx.a.decompressLabels(ctx.labels, key) + if !ctx.a.keepMetricNames { + ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.a.suffix, suffix) } - ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix) ctx.labels = append(ctx.labels, prompbmarshal.Label{ Name: extraName, Value: extraValue, @@ -843,3 +912,5 @@ func sortAndRemoveDuplicates(a []string) []string { } return dst } + +var bbPool bytesutil.ByteBufferPool diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 4df257d8b..0bac5bee0 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -56,9 +56,43 @@ func TestAggregatorsFailure(t *testing.T) { `) // Negative interval - f(`- interval: -5m`) + f(` +- outputs: [total] + interval: -5m +`) // Too small interval - f(`- interval: 10ms`) + f(` +- outputs: [total] + interval: 10ms +`) + + // interval isn't multiple of dedup_interval + f(` +- interval: 1m + dedup_interval: 35s + outputs: ["quantiles"] +`) + + // dedup_interval is bigger than dedup_interval + f(` +- interval: 1m + dedup_interval: 1h + outputs: ["quantiles"] +`) + + // keep_metric_names is set for multiple inputs + f(` +- interval: 1m + keep_metric_names: true + outputs: ["total", "increase"] +`) + + // keep_metric_names is set for unsupported input + f(` +- interval: 1m + keep_metric_names: true + outputs: ["histogram_bucket"] +`) // Invalid input_relabel_configs f(` @@ -477,6 +511,17 @@ foo:1m_by_abc_sum_samples{abc="456"} 8 `, ` foo 123 bar{baz="qwe"} 4.34 +`, `bar:1m_total{baz="qwe"} 4.34 +foo:1m_total 123 +`, "11") + + // total_prometheus output for non-repeated series + f(` +- interval: 1m + outputs: [total_prometheus] +`, ` +foo 123 +bar{baz="qwe"} 4.34 `, `bar:1m_total{baz="qwe"} 0 foo:1m_total 0 `, "11") @@ -494,6 +539,25 @@ foo{baz="qwe"} -5 bar{baz="qwer"} 343 bar{baz="qwer"} 344 foo{baz="qwe"} 10 +`, `bar:1m_total{baz="qwe"} 6.34 +bar:1m_total{baz="qwer"} 344 +foo:1m_total 123 +foo:1m_total{baz="qwe"} 10 +`, "11111111") + + // total_prometheus output for repeated series + f(` +- interval: 1m + outputs: [total_prometheus] +`, ` +foo 123 +bar{baz="qwe"} 1.32 +bar{baz="qwe"} 4.34 +bar{baz="qwe"} 2 +foo{baz="qwe"} -5 +bar{baz="qwer"} 343 +bar{baz="qwer"} 344 +foo{baz="qwe"} 10 `, `bar:1m_total{baz="qwe"} 5.02 bar:1m_total{baz="qwer"} 1 foo:1m_total 0 @@ -514,6 +578,24 @@ foo{baz="qwe"} -5 bar{baz="qwer"} 343 bar{baz="qwer"} 344 foo{baz="qwe"} 10 +`, `bar:1m_total 350.34 +foo:1m_total 133 +`, "11111111") + + // total_prometheus output for repeated series with group by __name__ + f(` +- interval: 1m + by: [__name__] + outputs: [total_prometheus] +`, ` +foo 123 +bar{baz="qwe"} 1.32 +bar{baz="qwe"} 4.34 +bar{baz="qwe"} 2 +foo{baz="qwe"} -5 +bar{baz="qwer"} 343 +bar{baz="qwer"} 344 +foo{baz="qwe"} 10 `, `bar:1m_total 6.02 foo:1m_total 15 `, "11111111") @@ -525,6 +607,17 @@ foo:1m_total 15 `, ` foo 123 bar{baz="qwe"} 4.34 +`, `bar:1m_increase{baz="qwe"} 4.34 +foo:1m_increase 123 +`, "11") + + // increase_prometheus output for non-repeated series + f(` +- interval: 1m + outputs: [increase_prometheus] +`, ` +foo 123 +bar{baz="qwe"} 4.34 `, `bar:1m_increase{baz="qwe"} 0 foo:1m_increase 0 `, "11") @@ -542,12 +635,30 @@ foo{baz="qwe"} -5 bar{baz="qwer"} 343 bar{baz="qwer"} 344 foo{baz="qwe"} 10 +`, `bar:1m_increase{baz="qwe"} 6.34 +bar:1m_increase{baz="qwer"} 344 +foo:1m_increase 123 +foo:1m_increase{baz="qwe"} 10 +`, "11111111") + + // increase_prometheus output for repeated series + f(` +- interval: 1m + outputs: [increase_prometheus] +`, ` +foo 123 +bar{baz="qwe"} 1.32 +bar{baz="qwe"} 4.34 +bar{baz="qwe"} 2 +foo{baz="qwe"} -5 +bar{baz="qwer"} 343 +bar{baz="qwer"} 344 +foo{baz="qwe"} 10 `, `bar:1m_increase{baz="qwe"} 5.02 bar:1m_increase{baz="qwer"} 1 foo:1m_increase 0 foo:1m_increase{baz="qwe"} 15 `, "11111111") - // multiple aggregate configs f(` - interval: 1m @@ -750,7 +861,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { } tssOutputLock.Unlock() } - const dedupInterval = time.Hour + const dedupInterval = 30 * time.Second a, err := newAggregatorsFromData([]byte(config), pushFunc, dedupInterval) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 25712ce1c..b56957ec2 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -11,9 +11,12 @@ import ( func BenchmarkAggregatorsPushByJobAvg(b *testing.B) { for _, output := range []string{ "total", + "total_prometheus", "increase", + "increase_prometheus", "count_series", "count_samples", + "unique_samples", "sum_samples", "last", "min", @@ -44,7 +47,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { } defer a.MustStop() - const loops = 5 + const loops = 10 b.ReportAllocs() b.SetBytes(int64(len(benchSeries) * loops)) @@ -52,7 +55,17 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { var matchIdxs []byte for pb.Next() { for i := 0; i < loops; i++ { - matchIdxs = a.Push(benchSeries, matchIdxs) + series := benchSeries + for len(series) > 0 { + chunk := series + if len(chunk) > 1_000 { + chunk = series[:1_000] + series = series[len(chunk):] + } else { + series = nil + } + matchIdxs = a.Push(chunk, matchIdxs) + } } } }) diff --git a/lib/streamaggr/sum_samples.go b/lib/streamaggr/sum_samples.go index 918ecf449..2ea43d46d 100644 --- a/lib/streamaggr/sum_samples.go +++ b/lib/streamaggr/sum_samples.go @@ -1,6 +1,7 @@ package streamaggr import ( + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" @@ -21,33 +22,39 @@ func newSumSamplesAggrState() *sumSamplesAggrState { return &sumSamplesAggrState{} } -func (as *sumSamplesAggrState) pushSample(_, outputKey string, value float64) { -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &sumSamplesStateValue{ - sum: value, +func (as *sumSamplesAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &sumSamplesStateValue{ + sum: s.value, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + continue + } + // Use the entry created by a concurrent goroutine. + v = vNew } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if !loaded { - // The new entry has been successfully created. - return + sv := v.(*sumSamplesStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + sv.sum += s.value + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - // Use the entry created by a concurrent goroutine. - v = vNew - } - sv := v.(*sumSamplesStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - sv.sum += value - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index b611966f4..67adbae7f 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -2,6 +2,7 @@ package streamaggr import ( "math" + "strings" "sync" "time" @@ -12,8 +13,10 @@ import ( type totalAggrState struct { m sync.Map - ignoreInputDeadline uint64 - stalenessSecs uint64 + suffix string + resetTotalOnFlush bool + keepFirstSample bool + stalenessSecs uint64 } type totalStateValue struct { @@ -29,58 +32,68 @@ type lastValueState struct { deleteDeadline uint64 } -func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration) *totalAggrState { - currentTime := fasttime.UnixTimestamp() - intervalSecs := roundDurationToSecs(interval) +func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState { stalenessSecs := roundDurationToSecs(stalenessInterval) + suffix := "total" + if resetTotalOnFlush { + suffix = "increase" + } return &totalAggrState{ - ignoreInputDeadline: currentTime + intervalSecs, - stalenessSecs: stalenessSecs, + suffix: suffix, + resetTotalOnFlush: resetTotalOnFlush, + keepFirstSample: keepFirstSample, + stalenessSecs: stalenessSecs, } } -func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) { - currentTime := fasttime.UnixTimestamp() - deleteDeadline := currentTime + as.stalenessSecs +func (as *totalAggrState) pushSamples(samples []pushSample) { + deleteDeadline := fasttime.UnixTimestamp() + as.stalenessSecs + for i := range samples { + s := &samples[i] + inputKey, outputKey := getInputOutputKey(s.key) -again: - v, ok := as.m.Load(outputKey) - if !ok { - // The entry is missing in the map. Try creating it. - v = &totalStateValue{ - lastValues: make(map[string]*lastValueState), - } - vNew, loaded := as.m.LoadOrStore(outputKey, v) - if loaded { - // Use the entry created by a concurrent goroutine. - v = vNew - } - } - sv := v.(*totalStateValue) - sv.mu.Lock() - deleted := sv.deleted - if !deleted { - lv, ok := sv.lastValues[inputKey] + again: + v, ok := as.m.Load(outputKey) if !ok { - lv = &lastValueState{} - sv.lastValues[inputKey] = lv + // The entry is missing in the map. Try creating it. + v = &totalStateValue{ + lastValues: make(map[string]*lastValueState), + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if loaded { + // Use the entry created by a concurrent goroutine. + v = vNew + } } - d := value - if ok && lv.value <= value { - d = value - lv.value + sv := v.(*totalStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + lv, ok := sv.lastValues[inputKey] + if !ok { + lv = &lastValueState{} + inputKey = strings.Clone(inputKey) + sv.lastValues[inputKey] = lv + } + if ok || as.keepFirstSample { + if s.value >= lv.value { + sv.total += s.value - lv.value + } else { + // counter reset + sv.total += s.value + } + } + lv.value = s.value + lv.deleteDeadline = deleteDeadline + sv.deleteDeadline = deleteDeadline } - if ok || currentTime > as.ignoreInputDeadline { - sv.total += d + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again } - lv.value = value - lv.deleteDeadline = deleteDeadline - sv.deleteDeadline = deleteDeadline - } - sv.mu.Unlock() - if deleted { - // The entry has been deleted by the concurrent call to appendSeriesForFlush - // Try obtaining and updating the entry again. - goto again } } @@ -123,7 +136,9 @@ func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) { sv := v.(*totalStateValue) sv.mu.Lock() total := sv.total - if math.Abs(sv.total) >= (1 << 53) { + if as.resetTotalOnFlush { + sv.total = 0 + } else if math.Abs(sv.total) >= (1 << 53) { // It is time to reset the entry, since it starts losing float64 precision sv.total = 0 } @@ -131,7 +146,7 @@ func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) { sv.mu.Unlock() if !deleted { key := k.(string) - ctx.appendSeries(key, "total", currentTimeMsec, total) + ctx.appendSeries(key, as.suffix, currentTimeMsec, total) } return true }) diff --git a/lib/streamaggr/unique_samples.go b/lib/streamaggr/unique_samples.go new file mode 100644 index 000000000..4db18f750 --- /dev/null +++ b/lib/streamaggr/unique_samples.go @@ -0,0 +1,82 @@ +package streamaggr + +import ( + "strings" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +// uniqueSamplesAggrState calculates output=unique_samples, e.g. the number of unique sample values. +type uniqueSamplesAggrState struct { + m sync.Map +} + +type uniqueSamplesStateValue struct { + mu sync.Mutex + m map[float64]struct{} + deleted bool +} + +func newUniqueSamplesAggrState() *uniqueSamplesAggrState { + return &uniqueSamplesAggrState{} +} + +func (as *uniqueSamplesAggrState) pushSamples(samples []pushSample) { + for i := range samples { + s := &samples[i] + outputKey := getOutputKey(s.key) + + again: + v, ok := as.m.Load(outputKey) + if !ok { + // The entry is missing in the map. Try creating it. + v = &uniqueSamplesStateValue{ + m: map[float64]struct{}{ + s.value: {}, + }, + } + outputKey = strings.Clone(outputKey) + vNew, loaded := as.m.LoadOrStore(outputKey, v) + if !loaded { + // The new entry has been successfully created. + continue + } + // Use the entry created by a concurrent goroutine. + v = vNew + } + sv := v.(*uniqueSamplesStateValue) + sv.mu.Lock() + deleted := sv.deleted + if !deleted { + if _, ok := sv.m[s.value]; !ok { + sv.m[s.value] = struct{}{} + } + } + sv.mu.Unlock() + if deleted { + // The entry has been deleted by the concurrent call to appendSeriesForFlush + // Try obtaining and updating the entry again. + goto again + } + } +} + +func (as *uniqueSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) { + currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000 + m := &as.m + m.Range(func(k, v interface{}) bool { + // Atomically delete the entry from the map, so new entry is created for the next flush. + m.Delete(k) + + sv := v.(*uniqueSamplesStateValue) + sv.mu.Lock() + n := len(sv.m) + // Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls. + sv.deleted = true + sv.mu.Unlock() + key := k.(string) + ctx.appendSeries(key, "unique_samples", currentTimeMsec, float64(n)) + return true + }) +}