2023-01-04 06:19:18 +00:00
|
|
|
package streamaggr
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"sort"
|
2023-07-24 23:44:09 +00:00
|
|
|
"strconv"
|
2023-01-04 06:19:18 +00:00
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"testing"
|
2023-01-25 17:14:49 +00:00
|
|
|
"time"
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestAggregatorsFailure(t *testing.T) {
|
|
|
|
f := func(config string) {
|
|
|
|
t.Helper()
|
2024-04-02 20:16:24 +00:00
|
|
|
pushFunc := func(_ []prompbmarshal.TimeSeries) {
|
2023-01-04 06:19:18 +00:00
|
|
|
panic(fmt.Errorf("pushFunc shouldn't be called"))
|
|
|
|
}
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
|
2023-01-04 06:19:18 +00:00
|
|
|
if err == nil {
|
|
|
|
t.Fatalf("expecting non-nil error")
|
|
|
|
}
|
|
|
|
if a != nil {
|
|
|
|
t.Fatalf("expecting nil a")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Invalid config
|
|
|
|
f(`foobar`)
|
|
|
|
|
|
|
|
// Unknown option
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total]
|
|
|
|
foobar: baz
|
|
|
|
`)
|
|
|
|
|
|
|
|
// missing interval
|
|
|
|
f(`
|
|
|
|
- outputs: [total]
|
|
|
|
`)
|
|
|
|
|
|
|
|
// missing outputs
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
`)
|
|
|
|
|
2024-07-12 09:06:45 +00:00
|
|
|
// Bad interval
|
|
|
|
f(`
|
|
|
|
- interval: 1foo
|
|
|
|
outputs: [total]
|
|
|
|
`)
|
|
|
|
|
2023-01-04 06:19:18 +00:00
|
|
|
// Invalid output
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [foobar]
|
|
|
|
`)
|
|
|
|
|
|
|
|
// Negative interval
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
f(`
|
|
|
|
- outputs: [total]
|
|
|
|
interval: -5m
|
|
|
|
`)
|
2023-01-04 06:19:18 +00:00
|
|
|
// Too small interval
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
f(`
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 10ms
|
|
|
|
`)
|
|
|
|
|
2024-07-12 09:06:45 +00:00
|
|
|
// bad dedup_interval
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
dedup_interval: 1foo
|
|
|
|
outputs: ["quantiles"]
|
|
|
|
`)
|
|
|
|
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
// interval isn't multiple of dedup_interval
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
dedup_interval: 35s
|
|
|
|
outputs: ["quantiles"]
|
|
|
|
`)
|
|
|
|
|
|
|
|
// dedup_interval is bigger than dedup_interval
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
dedup_interval: 1h
|
|
|
|
outputs: ["quantiles"]
|
|
|
|
`)
|
|
|
|
|
2024-07-12 09:06:45 +00:00
|
|
|
// bad staleness_interval
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
staleness_interval: 1foo
|
|
|
|
outputs: ["quantiles"]
|
|
|
|
`)
|
|
|
|
|
|
|
|
// staleness_interval should be > interval
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
staleness_interval: 30s
|
|
|
|
outputs: ["quantiles"]
|
|
|
|
`)
|
|
|
|
|
|
|
|
// staleness_interval should be multiple of interval
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
staleness_interval: 100s
|
|
|
|
outputs: ["quantiles"]
|
|
|
|
`)
|
|
|
|
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across big number of time series.
- Reduce memory usage by up to 5x when aggregating across big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
// keep_metric_names is set for multiple inputs
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
keep_metric_names: true
|
|
|
|
outputs: ["total", "increase"]
|
|
|
|
`)
|
|
|
|
|
|
|
|
// keep_metric_names is set for unsupported input
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
keep_metric_names: true
|
|
|
|
outputs: ["histogram_bucket"]
|
|
|
|
`)
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// Invalid input_relabel_configs
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total]
|
|
|
|
input_relabel_configs:
|
|
|
|
- foo: bar
|
|
|
|
`)
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total]
|
|
|
|
input_relabel_configs:
|
|
|
|
- action: replace
|
|
|
|
`)
|
|
|
|
|
|
|
|
// Invalid output_relabel_configs
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total]
|
|
|
|
output_relabel_configs:
|
|
|
|
- foo: bar
|
|
|
|
`)
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total]
|
|
|
|
output_relabel_configs:
|
|
|
|
- action: replace
|
|
|
|
`)
|
|
|
|
|
|
|
|
// Both by and without are non-empty
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total]
|
|
|
|
by: [foo]
|
|
|
|
without: [bar]
|
|
|
|
`)
|
|
|
|
|
|
|
|
// Invalid quantiles()
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: ["quantiles("]
|
|
|
|
`)
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: ["quantiles()"]
|
|
|
|
`)
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: ["quantiles(foo)"]
|
|
|
|
`)
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: ["quantiles(-0.5)"]
|
|
|
|
`)
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: ["quantiles(1.5)"]
|
2024-07-12 08:56:07 +00:00
|
|
|
`)
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total, total]
|
|
|
|
`)
|
|
|
|
// "quantiles(0.5)", "quantiles(0.9)" should be set as "quantiles(0.5, 0.9)"
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: ["quantiles(0.5)", "quantiles(0.9)"]
|
2023-01-04 06:19:18 +00:00
|
|
|
`)
|
|
|
|
}
|
|
|
|
|
2023-04-01 04:27:45 +00:00
|
|
|
func TestAggregatorsEqual(t *testing.T) {
|
|
|
|
f := func(a, b string, expectedResult bool) {
|
|
|
|
t.Helper()
|
|
|
|
|
2024-04-02 20:16:24 +00:00
|
|
|
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
aa, err := LoadFromData([]byte(a), pushFunc, nil, "some_alias")
|
2023-04-01 04:27:45 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
|
|
}
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
ab, err := LoadFromData([]byte(b), pushFunc, nil, "some_alias")
|
2023-04-01 04:27:45 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
|
|
}
|
|
|
|
result := aa.Equal(ab)
|
|
|
|
if result != expectedResult {
|
|
|
|
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
f("", "", true)
|
|
|
|
f(`
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 5m
|
|
|
|
`, ``, false)
|
|
|
|
f(`
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 5m
|
|
|
|
`, `
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 5m
|
|
|
|
`, true)
|
|
|
|
f(`
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 3m
|
|
|
|
`, `
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 5m
|
2024-01-26 21:45:23 +00:00
|
|
|
`, false)
|
|
|
|
f(`
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 5m
|
|
|
|
flush_on_shutdown: true
|
|
|
|
`, `
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 5m
|
|
|
|
flush_on_shutdown: false
|
2023-04-01 04:27:45 +00:00
|
|
|
`, false)
|
2024-04-22 12:22:59 +00:00
|
|
|
f(`
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 5m
|
|
|
|
ignore_first_intervals: 2
|
|
|
|
`, `
|
|
|
|
- outputs: [total]
|
|
|
|
interval: 5m
|
|
|
|
ignore_first_intervals: 4`, false)
|
2023-04-01 04:27:45 +00:00
|
|
|
}
|
|
|
|
|
2023-01-04 06:19:18 +00:00
|
|
|
func TestAggregatorsSuccess(t *testing.T) {
|
2023-07-24 23:44:09 +00:00
|
|
|
f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
|
2023-01-04 06:19:18 +00:00
|
|
|
t.Helper()
|
|
|
|
|
|
|
|
// Initialize Aggregators
|
|
|
|
var tssOutput []prompbmarshal.TimeSeries
|
|
|
|
var tssOutputLock sync.Mutex
|
|
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
|
|
tssOutputLock.Lock()
|
2024-03-04 22:45:22 +00:00
|
|
|
tssOutput = appendClonedTimeseries(tssOutput, tss)
|
2023-01-04 06:19:18 +00:00
|
|
|
tssOutputLock.Unlock()
|
|
|
|
}
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
opts := &Options{
|
2024-03-04 03:42:55 +00:00
|
|
|
FlushOnShutdown: true,
|
|
|
|
NoAlignFlushToInterval: true,
|
|
|
|
}
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias")
|
2023-01-04 06:19:18 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Push the inputMetrics to Aggregators
|
2024-07-03 13:10:09 +00:00
|
|
|
offsetMsecs := time.Now().UnixMilli()
|
|
|
|
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
|
2023-07-24 23:44:09 +00:00
|
|
|
matchIdxs := a.Push(tssInput, nil)
|
2023-01-04 06:19:18 +00:00
|
|
|
a.MustStop()
|
|
|
|
|
2023-07-24 23:44:09 +00:00
|
|
|
// Verify matchIdxs equals to matchIdxsExpected
|
|
|
|
matchIdxsStr := ""
|
|
|
|
for _, v := range matchIdxs {
|
|
|
|
matchIdxsStr += strconv.Itoa(int(v))
|
|
|
|
}
|
|
|
|
if matchIdxsStr != matchIdxsStrExpected {
|
|
|
|
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
|
|
|
|
}
|
|
|
|
|
2023-01-04 06:19:18 +00:00
|
|
|
// Verify the tssOutput contains the expected metrics
|
2024-03-04 22:45:22 +00:00
|
|
|
outputMetrics := timeSeriessToString(tssOutput)
|
2023-01-04 06:19:18 +00:00
|
|
|
if outputMetrics != outputMetricsExpected {
|
|
|
|
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Empty config
|
2023-07-24 23:44:09 +00:00
|
|
|
f(``, ``, ``, "")
|
|
|
|
f(``, `foo{bar="baz"} 1`, ``, "0")
|
|
|
|
f(``, "foo 1\nbaz 2", ``, "00")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// Empty by list - aggregate only by time
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [count_samples, sum_samples, count_series, last]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
2024-03-17 23:02:28 +00:00
|
|
|
bar 5 100
|
|
|
|
bar 34 10
|
2023-01-04 06:19:18 +00:00
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
2024-03-17 23:02:28 +00:00
|
|
|
`, `bar:1m_count_samples 2
|
2023-01-04 06:19:18 +00:00
|
|
|
bar:1m_count_series 1
|
|
|
|
bar:1m_last 5
|
2024-03-17 23:02:28 +00:00
|
|
|
bar:1m_sum_samples 39
|
2023-01-04 06:19:18 +00:00
|
|
|
foo:1m_count_samples{abc="123"} 2
|
|
|
|
foo:1m_count_samples{abc="456",de="fg"} 1
|
|
|
|
foo:1m_count_series{abc="123"} 1
|
|
|
|
foo:1m_count_series{abc="456",de="fg"} 1
|
|
|
|
foo:1m_last{abc="123"} 8.5
|
|
|
|
foo:1m_last{abc="456",de="fg"} 8
|
|
|
|
foo:1m_sum_samples{abc="123"} 12.5
|
|
|
|
foo:1m_sum_samples{abc="456",de="fg"} 8
|
2024-03-17 23:02:28 +00:00
|
|
|
`, "11111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
2023-01-05 10:08:24 +00:00
|
|
|
// Special case: __name__ in `by` list - this is the same as empty `by` list
|
2023-01-04 06:19:18 +00:00
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
by: [__name__]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_count_samples 1
|
|
|
|
bar:1m_count_series 1
|
|
|
|
bar:1m_sum_samples 5
|
|
|
|
foo:1m_count_samples 3
|
|
|
|
foo:1m_count_series 2
|
|
|
|
foo:1m_sum_samples 20.5
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
2023-01-05 10:08:24 +00:00
|
|
|
// Non-empty `by` list with non-existing labels
|
2023-01-04 06:19:18 +00:00
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
by: [foo, bar]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
2023-01-05 10:08:24 +00:00
|
|
|
`, `bar:1m_by_bar_foo_count_samples 1
|
|
|
|
bar:1m_by_bar_foo_count_series 1
|
|
|
|
bar:1m_by_bar_foo_sum_samples 5
|
|
|
|
foo:1m_by_bar_foo_count_samples 3
|
|
|
|
foo:1m_by_bar_foo_count_series 2
|
|
|
|
foo:1m_by_bar_foo_sum_samples 20.5
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
2023-01-05 10:08:24 +00:00
|
|
|
// Non-empty `by` list with existing label
|
2023-01-04 06:19:18 +00:00
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
by: [abc]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_by_abc_count_samples 1
|
|
|
|
bar:1m_by_abc_count_series 1
|
|
|
|
bar:1m_by_abc_sum_samples 5
|
|
|
|
foo:1m_by_abc_count_samples{abc="123"} 2
|
|
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
2023-01-05 10:08:24 +00:00
|
|
|
// Non-empty `by` list with duplicate existing label
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
by: [abc, abc]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_by_abc_count_samples 1
|
|
|
|
bar:1m_by_abc_count_series 1
|
|
|
|
bar:1m_by_abc_sum_samples 5
|
|
|
|
foo:1m_by_abc_count_samples{abc="123"} 2
|
|
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-05 10:08:24 +00:00
|
|
|
|
|
|
|
// Non-empty `without` list with non-existing labels
|
2023-01-04 06:19:18 +00:00
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [foo]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_without_foo_count_samples 1
|
|
|
|
bar:1m_without_foo_count_series 1
|
|
|
|
bar:1m_without_foo_sum_samples 5
|
|
|
|
foo:1m_without_foo_count_samples{abc="123"} 2
|
|
|
|
foo:1m_without_foo_count_samples{abc="456",de="fg"} 1
|
|
|
|
foo:1m_without_foo_count_series{abc="123"} 1
|
|
|
|
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
|
|
|
|
foo:1m_without_foo_sum_samples{abc="123"} 12.5
|
|
|
|
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
2023-01-05 10:08:24 +00:00
|
|
|
// Non-empty `without` list with existing labels
|
2023-01-04 06:19:18 +00:00
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [abc]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_without_abc_count_samples 1
|
|
|
|
bar:1m_without_abc_count_series 1
|
|
|
|
bar:1m_without_abc_sum_samples 5
|
|
|
|
foo:1m_without_abc_count_samples 2
|
|
|
|
foo:1m_without_abc_count_samples{de="fg"} 1
|
|
|
|
foo:1m_without_abc_count_series 1
|
|
|
|
foo:1m_without_abc_count_series{de="fg"} 1
|
|
|
|
foo:1m_without_abc_sum_samples 12.5
|
|
|
|
foo:1m_without_abc_sum_samples{de="fg"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
2023-01-05 10:08:24 +00:00
|
|
|
// Special case: __name__ in `without` list
|
2023-01-04 06:19:18 +00:00
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [__name__]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `:1m_count_samples 1
|
|
|
|
:1m_count_samples{abc="123"} 2
|
|
|
|
:1m_count_samples{abc="456",de="fg"} 1
|
|
|
|
:1m_count_series 1
|
|
|
|
:1m_count_series{abc="123"} 1
|
|
|
|
:1m_count_series{abc="456",de="fg"} 1
|
|
|
|
:1m_sum_samples 5
|
|
|
|
:1m_sum_samples{abc="123"} 12.5
|
|
|
|
:1m_sum_samples{abc="456",de="fg"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// drop some input metrics
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [abc]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
input_relabel_configs:
|
|
|
|
- if: 'foo'
|
|
|
|
action: drop
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_without_abc_count_samples 1
|
|
|
|
bar:1m_without_abc_count_series 1
|
|
|
|
bar:1m_without_abc_sum_samples 5
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// rename output metrics
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [abc]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
output_relabel_configs:
|
|
|
|
- action: replace_all
|
|
|
|
source_labels: [__name__]
|
|
|
|
regex: ":|_"
|
|
|
|
replacement: "-"
|
|
|
|
target_label: __name__
|
|
|
|
- action: drop
|
|
|
|
source_labels: [de]
|
|
|
|
regex: fg
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar-1m-without-abc-count-samples 1
|
|
|
|
bar-1m-without-abc-count-series 1
|
|
|
|
bar-1m-without-abc-sum-samples 5
|
|
|
|
foo-1m-without-abc-count-samples 2
|
|
|
|
foo-1m-without-abc-count-series 1
|
|
|
|
foo-1m-without-abc-sum-samples 12.5
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// match doesn't match anything
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [abc]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
match: '{non_existing_label!=""}'
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have a consistent set of labels compared to the rest of the streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figure out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for an unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
name: foobar
|
2023-01-04 06:19:18 +00:00
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, ``, "0000")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// match matches foo series with non-empty abc label
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
by: [abc]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have a consistent set of labels compared to the rest of the streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figure out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for an unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
name: abcdef
|
2023-07-24 23:10:47 +00:00
|
|
|
match:
|
|
|
|
- foo{abc=~".+"}
|
|
|
|
- '{non_existing_label!=""}'
|
2023-01-04 06:19:18 +00:00
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `foo:1m_by_abc_count_samples{abc="123"} 2
|
|
|
|
foo:1m_by_abc_count_samples{abc="456"} 1
|
|
|
|
foo:1m_by_abc_count_series{abc="123"} 1
|
|
|
|
foo:1m_by_abc_count_series{abc="456"} 1
|
|
|
|
foo:1m_by_abc_sum_samples{abc="123"} 12.5
|
|
|
|
foo:1m_by_abc_sum_samples{abc="456"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1011")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// total output for non-repeated series
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 4.34
|
2024-03-03 23:49:26 +00:00
|
|
|
`, `bar:1m_total{baz="qwe"} 0
|
|
|
|
foo:1m_total 0
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across a big number of time series.
- Reduce memory usage by up to 5x when aggregating across a big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
`, "11")
|
|
|
|
|
|
|
|
// total_prometheus output for non-repeated series
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total_prometheus]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 4.34
|
2024-05-08 11:11:30 +00:00
|
|
|
`, `bar:1m_total_prometheus{baz="qwe"} 0
|
|
|
|
foo:1m_total_prometheus 0
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "11")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// total output for repeated series
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total]
|
|
|
|
`, `
|
|
|
|
foo 123
|
2024-03-17 20:03:01 +00:00
|
|
|
bar{baz="qwe"} 1.31
|
|
|
|
bar{baz="qwe"} 4.34 1000
|
2023-01-04 06:19:18 +00:00
|
|
|
bar{baz="qwe"} 2
|
|
|
|
foo{baz="qwe"} -5
|
|
|
|
bar{baz="qwer"} 343
|
|
|
|
bar{baz="qwer"} 344
|
|
|
|
foo{baz="qwe"} 10
|
2024-03-17 20:03:01 +00:00
|
|
|
`, `bar:1m_total{baz="qwe"} 3.03
|
2024-03-03 23:49:26 +00:00
|
|
|
bar:1m_total{baz="qwer"} 1
|
|
|
|
foo:1m_total 0
|
|
|
|
foo:1m_total{baz="qwe"} 15
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across a big number of time series.
- Reduce memory usage by up to 5x when aggregating across a big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
`, "11111111")
|
|
|
|
|
|
|
|
// total_prometheus output for repeated series
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [total_prometheus]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 1.32
|
|
|
|
bar{baz="qwe"} 4.34
|
|
|
|
bar{baz="qwe"} 2
|
|
|
|
foo{baz="qwe"} -5
|
|
|
|
bar{baz="qwer"} 343
|
|
|
|
bar{baz="qwer"} 344
|
|
|
|
foo{baz="qwe"} 10
|
2024-05-08 11:11:30 +00:00
|
|
|
`, `bar:1m_total_prometheus{baz="qwe"} 5.02
|
|
|
|
bar:1m_total_prometheus{baz="qwer"} 1
|
|
|
|
foo:1m_total_prometheus 0
|
|
|
|
foo:1m_total_prometheus{baz="qwe"} 15
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "11111111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// total output for repeated series with group by __name__
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
by: [__name__]
|
|
|
|
outputs: [total]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 1.32
|
|
|
|
bar{baz="qwe"} 4.34
|
|
|
|
bar{baz="qwe"} 2
|
|
|
|
foo{baz="qwe"} -5
|
|
|
|
bar{baz="qwer"} 343
|
|
|
|
bar{baz="qwer"} 344
|
|
|
|
foo{baz="qwe"} 10
|
2024-03-03 23:49:26 +00:00
|
|
|
`, `bar:1m_total 6.02
|
|
|
|
foo:1m_total 15
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across a big number of time series.
- Reduce memory usage by up to 5x when aggregating across a big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
`, "11111111")
|
|
|
|
|
|
|
|
// total_prometheus output for repeated series with group by __name__
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
by: [__name__]
|
|
|
|
outputs: [total_prometheus]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 1.32
|
|
|
|
bar{baz="qwe"} 4.34
|
|
|
|
bar{baz="qwe"} 2
|
|
|
|
foo{baz="qwe"} -5
|
|
|
|
bar{baz="qwer"} 343
|
|
|
|
bar{baz="qwer"} 344
|
|
|
|
foo{baz="qwe"} 10
|
2024-05-08 11:11:30 +00:00
|
|
|
`, `bar:1m_total_prometheus 6.02
|
|
|
|
foo:1m_total_prometheus 15
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "11111111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// increase output for non-repeated series
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [increase]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 4.34
|
2024-03-03 23:49:26 +00:00
|
|
|
`, `bar:1m_increase{baz="qwe"} 0
|
|
|
|
foo:1m_increase 0
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across a big number of time series.
- Reduce memory usage by up to 5x when aggregating across a big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
`, "11")
|
|
|
|
|
|
|
|
// increase_prometheus output for non-repeated series
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [increase_prometheus]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 4.34
|
2024-05-08 11:11:30 +00:00
|
|
|
`, `bar:1m_increase_prometheus{baz="qwe"} 0
|
|
|
|
foo:1m_increase_prometheus 0
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "11")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// increase output for repeated series
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [increase]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 1.32
|
|
|
|
bar{baz="qwe"} 4.34
|
|
|
|
bar{baz="qwe"} 2
|
|
|
|
foo{baz="qwe"} -5
|
|
|
|
bar{baz="qwer"} 343
|
|
|
|
bar{baz="qwer"} 344
|
|
|
|
foo{baz="qwe"} 10
|
2024-03-03 23:49:26 +00:00
|
|
|
`, `bar:1m_increase{baz="qwe"} 5.02
|
|
|
|
bar:1m_increase{baz="qwer"} 1
|
|
|
|
foo:1m_increase 0
|
|
|
|
foo:1m_increase{baz="qwe"} 15
|
lib/streamaggr: huge pile of changes
- Reduce memory usage by up to 5x when de-duplicating samples across a big number of time series.
- Reduce memory usage by up to 5x when aggregating across a big number of output time series.
- Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components
for reducing memory usage for marshaled []prompbmarshal.Label.
- Add `dedup_interval` option at aggregation config, which allows setting individual
deduplication intervals per each aggregation.
- Add `keep_metric_names` option at aggregation config, which allows keeping the original
metric names in the output samples.
- Add `unique_samples` output, which counts the number of unique sample values.
- Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample
per each newly encountered time series.
- Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output.
This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579
- Expose various metrics, which may help debugging stream aggregation:
- vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication
- vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures
- vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures
- vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor
- vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes
- vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes
- vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes,
which took longer than the configured interval
- vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes,
which took longer than the configured dedup_interval
- Actualize docs/stream-aggregation.md
The memory usage reduction increases CPU usage during stream aggregation by up to 30%.
This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
|
|
|
`, "11111111")
|
|
|
|
|
|
|
|
// increase_prometheus output for repeated series
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [increase_prometheus]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 1.32
|
|
|
|
bar{baz="qwe"} 4.34
|
|
|
|
bar{baz="qwe"} 2
|
|
|
|
foo{baz="qwe"} -5
|
|
|
|
bar{baz="qwer"} 343
|
|
|
|
bar{baz="qwer"} 344
|
|
|
|
foo{baz="qwe"} 10
|
2024-05-08 11:11:30 +00:00
|
|
|
`, `bar:1m_increase_prometheus{baz="qwe"} 5.02
|
|
|
|
bar:1m_increase_prometheus{baz="qwer"} 1
|
|
|
|
foo:1m_increase_prometheus 0
|
|
|
|
foo:1m_increase_prometheus{baz="qwe"} 15
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "11111111")
|
2024-03-03 23:49:26 +00:00
|
|
|
|
2023-01-04 06:19:18 +00:00
|
|
|
// multiple aggregate configs
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [count_series, sum_samples]
|
|
|
|
- interval: 5m
|
|
|
|
by: [bar]
|
|
|
|
outputs: [sum_samples]
|
|
|
|
`, `
|
|
|
|
foo 1
|
|
|
|
foo{bar="baz"} 2
|
|
|
|
foo 3.3
|
|
|
|
`, `foo:1m_count_series 1
|
|
|
|
foo:1m_count_series{bar="baz"} 1
|
|
|
|
foo:1m_sum_samples 4.3
|
|
|
|
foo:1m_sum_samples{bar="baz"} 2
|
|
|
|
foo:5m_by_bar_sum_samples 4.3
|
|
|
|
foo:5m_by_bar_sum_samples{bar="baz"} 2
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// min and max outputs
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [min, max]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_max 5
|
|
|
|
bar:1m_min 5
|
|
|
|
foo:1m_max{abc="123"} 8.5
|
|
|
|
foo:1m_max{abc="456",de="fg"} 8
|
|
|
|
foo:1m_min{abc="123"} 4
|
|
|
|
foo:1m_min{abc="456",de="fg"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// avg output
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [avg]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_avg 5
|
|
|
|
foo:1m_avg{abc="123"} 6.25
|
|
|
|
foo:1m_avg{abc="456",de="fg"} 8
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// stddev output
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [stddev]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_stddev 0
|
|
|
|
foo:1m_stddev{abc="123"} 2.25
|
|
|
|
foo:1m_stddev{abc="456",de="fg"} 0
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// stdvar output
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [stdvar]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar:1m_stdvar 0
|
|
|
|
foo:1m_stdvar{abc="123"} 5.0625
|
|
|
|
foo:1m_stdvar{abc="456",de="fg"} 0
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// histogram_bucket output
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [histogram_bucket]
|
|
|
|
`, `
|
|
|
|
cpu_usage{cpu="1"} 12.5
|
|
|
|
cpu_usage{cpu="1"} 13.3
|
|
|
|
cpu_usage{cpu="1"} 13
|
|
|
|
cpu_usage{cpu="1"} 12
|
|
|
|
cpu_usage{cpu="1"} 14
|
|
|
|
cpu_usage{cpu="1"} 25
|
|
|
|
cpu_usage{cpu="2"} 90
|
|
|
|
`, `cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.136e+01...1.292e+01"} 2
|
|
|
|
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
|
|
|
|
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
|
|
|
|
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// histogram_bucket output without cpu
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [cpu]
|
|
|
|
outputs: [histogram_bucket]
|
|
|
|
`, `
|
|
|
|
cpu_usage{cpu="1"} 12.5
|
|
|
|
cpu_usage{cpu="1"} 13.3
|
|
|
|
cpu_usage{cpu="1"} 13
|
|
|
|
cpu_usage{cpu="1"} 12
|
|
|
|
cpu_usage{cpu="1"} 14
|
|
|
|
cpu_usage{cpu="1"} 25
|
|
|
|
cpu_usage{cpu="2"} 90
|
|
|
|
`, `cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.136e+01...1.292e+01"} 2
|
|
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
|
|
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
|
|
|
|
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// quantiles output
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: ["quantiles(0, 0.5, 1)"]
|
|
|
|
`, `
|
|
|
|
cpu_usage{cpu="1"} 12.5
|
|
|
|
cpu_usage{cpu="1"} 13.3
|
|
|
|
cpu_usage{cpu="1"} 13
|
|
|
|
cpu_usage{cpu="1"} 12
|
|
|
|
cpu_usage{cpu="1"} 14
|
|
|
|
cpu_usage{cpu="1"} 25
|
|
|
|
cpu_usage{cpu="2"} 90
|
|
|
|
`, `cpu_usage:1m_quantiles{cpu="1",quantile="0"} 12
|
|
|
|
cpu_usage:1m_quantiles{cpu="1",quantile="0.5"} 13.3
|
|
|
|
cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
|
|
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
|
|
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
|
|
|
|
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111111")
|
2023-01-04 06:19:18 +00:00
|
|
|
|
|
|
|
// quantiles output without cpu
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [cpu]
|
|
|
|
outputs: ["quantiles(0, 0.5, 1)"]
|
|
|
|
`, `
|
|
|
|
cpu_usage{cpu="1"} 12.5
|
|
|
|
cpu_usage{cpu="1"} 13.3
|
|
|
|
cpu_usage{cpu="1"} 13
|
|
|
|
cpu_usage{cpu="1"} 12
|
|
|
|
cpu_usage{cpu="1"} 14
|
|
|
|
cpu_usage{cpu="1"} 25
|
|
|
|
cpu_usage{cpu="2"} 90
|
|
|
|
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
|
|
|
|
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
|
|
|
|
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "1111111")
|
2023-11-29 08:03:04 +00:00
|
|
|
|
|
|
|
// append additional label
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
without: [abc]
|
|
|
|
outputs: [count_samples, sum_samples, count_series]
|
|
|
|
output_relabel_configs:
|
|
|
|
- action: replace_all
|
|
|
|
source_labels: [__name__]
|
|
|
|
regex: ":|_"
|
|
|
|
replacement: "-"
|
|
|
|
target_label: __name__
|
|
|
|
- action: drop
|
|
|
|
source_labels: [de]
|
|
|
|
regex: fg
|
|
|
|
- target_label: new_label
|
|
|
|
replacement: must_keep_metric_name
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
2024-05-13 13:39:49 +00:00
|
|
|
foo{abc="123"} 8.5 10
|
2023-11-29 08:03:04 +00:00
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 1
|
|
|
|
bar-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
|
|
|
bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5
|
|
|
|
foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2
|
|
|
|
foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
|
|
|
|
foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5
|
2024-05-13 13:39:49 +00:00
|
|
|
`, "1111")
|
|
|
|
|
|
|
|
// test rate_sum and rate_avg
|
2024-07-03 13:10:09 +00:00
|
|
|
f(`
|
2024-05-13 13:39:49 +00:00
|
|
|
- interval: 1m
|
|
|
|
by: [cde]
|
|
|
|
outputs: [rate_sum, rate_avg]
|
|
|
|
`, `
|
|
|
|
foo{abc="123", cde="1"} 4
|
|
|
|
foo{abc="123", cde="1"} 8.5 10
|
|
|
|
foo{abc="456", cde="1"} 8
|
|
|
|
foo{abc="456", cde="1"} 10 10
|
2024-07-14 15:23:59 +00:00
|
|
|
foo 12 34
|
2024-05-13 13:39:49 +00:00
|
|
|
`, `foo:1m_by_cde_rate_avg{cde="1"} 0.325
|
|
|
|
foo:1m_by_cde_rate_sum{cde="1"} 0.65
|
2024-07-14 15:23:59 +00:00
|
|
|
`, "11111")
|
2024-03-06 16:34:04 +00:00
|
|
|
|
2024-07-14 15:23:59 +00:00
|
|
|
// rate_sum and rate_avg with duplicated events
|
2024-07-03 13:10:09 +00:00
|
|
|
f(`
|
2024-06-14 08:06:22 +00:00
|
|
|
- interval: 1m
|
|
|
|
outputs: [rate_sum, rate_avg]
|
|
|
|
`, `
|
|
|
|
foo{abc="123", cde="1"} 4 10
|
|
|
|
foo{abc="123", cde="1"} 4 10
|
2024-07-14 15:23:59 +00:00
|
|
|
`, ``, "11")
|
|
|
|
|
|
|
|
// rate_sum and rate_avg for a single sample
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [rate_sum, rate_avg]
|
|
|
|
`, `
|
|
|
|
foo 4 10
|
|
|
|
bar 5 10
|
|
|
|
`, ``, "11")
|
2024-06-14 08:06:22 +00:00
|
|
|
|
2024-07-12 09:06:45 +00:00
|
|
|
// unique_samples output
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [unique_samples]
|
|
|
|
`, `
|
|
|
|
foo 1 10
|
|
|
|
foo 2 20
|
|
|
|
foo 1 10
|
|
|
|
foo 2 20
|
|
|
|
foo 3 20
|
|
|
|
`, `foo:1m_unique_samples 3
|
|
|
|
`, "11111")
|
|
|
|
|
2024-03-06 16:34:04 +00:00
|
|
|
// keep_metric_names
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
keep_metric_names: true
|
|
|
|
outputs: [count_samples]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
bar -34.3
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar 2
|
|
|
|
foo{abc="123"} 2
|
|
|
|
foo{abc="456",de="fg"} 1
|
|
|
|
`, "11111")
|
|
|
|
|
|
|
|
// drop_input_labels
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
drop_input_labels: [abc]
|
|
|
|
keep_metric_names: true
|
|
|
|
outputs: [count_samples]
|
|
|
|
`, `
|
|
|
|
foo{abc="123"} 4
|
|
|
|
bar 5
|
|
|
|
foo{abc="123"} 8.5
|
|
|
|
bar -34.3
|
|
|
|
foo{abc="456",de="fg"} 8
|
|
|
|
`, `bar 2
|
|
|
|
foo 2
|
|
|
|
foo{de="fg"} 1
|
|
|
|
`, "11111")
|
2023-01-04 06:19:18 +00:00
|
|
|
}
|
|
|
|
|
2023-01-25 17:14:49 +00:00
|
|
|
func TestAggregatorsWithDedupInterval(t *testing.T) {
|
2023-07-24 23:44:09 +00:00
|
|
|
f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
|
2023-01-25 17:14:49 +00:00
|
|
|
t.Helper()
|
|
|
|
|
|
|
|
// Initialize Aggregators
|
|
|
|
var tssOutput []prompbmarshal.TimeSeries
|
|
|
|
var tssOutputLock sync.Mutex
|
|
|
|
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
|
|
|
tssOutputLock.Lock()
|
|
|
|
for _, ts := range tss {
|
|
|
|
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
|
|
|
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
|
|
|
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
|
|
|
Labels: labelsCopy,
|
|
|
|
Samples: samplesCopy,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
tssOutputLock.Unlock()
|
|
|
|
}
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
opts := &Options{
|
2024-03-04 03:42:55 +00:00
|
|
|
DedupInterval: 30 * time.Second,
|
|
|
|
FlushOnShutdown: true,
|
|
|
|
}
|
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691
- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
This improves code maintainability a bit.
- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
This prevents from potential resource leaks.
- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.
- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.
- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
This label should simplify investigation of the exposed metrics.
- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
it is hard to use these labels in query filters and aggregation functions.
- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
This metric shows the number of samples generated by the corresponding streaming aggregation rule.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462
- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
which could modify the behaviour of the constructed streaming aggregator.
Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.
- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
This also allows passing nil instead of Options when default options are enough.
- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
so they have consistent set of labels comparing to the rest of streaming aggregation metrics.
- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
`aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
`output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604
- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 .
- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
|
|
|
a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias")
|
2023-01-25 17:14:49 +00:00
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("cannot initialize aggregators: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Push the inputMetrics to Aggregators
|
2024-07-03 13:10:09 +00:00
|
|
|
offsetMsecs := time.Now().UnixMilli()
|
|
|
|
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
|
2023-07-24 23:44:09 +00:00
|
|
|
matchIdxs := a.Push(tssInput, nil)
|
2023-01-25 17:14:49 +00:00
|
|
|
a.MustStop()
|
|
|
|
|
2023-07-24 23:44:09 +00:00
|
|
|
// Verify matchIdxs equals to matchIdxsExpected
|
|
|
|
matchIdxsStr := ""
|
|
|
|
for _, v := range matchIdxs {
|
|
|
|
matchIdxsStr += strconv.Itoa(int(v))
|
|
|
|
}
|
|
|
|
if matchIdxsStr != matchIdxsStrExpected {
|
|
|
|
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
|
|
|
|
}
|
|
|
|
|
2023-01-25 17:14:49 +00:00
|
|
|
// Verify the tssOutput contains the expected metrics
|
|
|
|
tsStrings := make([]string, len(tssOutput))
|
|
|
|
for i, ts := range tssOutput {
|
|
|
|
tsStrings[i] = timeSeriesToString(ts)
|
|
|
|
}
|
|
|
|
sort.Strings(tsStrings)
|
|
|
|
outputMetrics := strings.Join(tsStrings, "")
|
|
|
|
if outputMetrics != outputMetricsExpected {
|
|
|
|
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [sum_samples]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar 567
|
|
|
|
`, `bar:1m_sum_samples 567
|
|
|
|
foo:1m_sum_samples 123
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "11")
|
2023-01-25 17:14:49 +00:00
|
|
|
|
|
|
|
f(`
|
|
|
|
- interval: 1m
|
|
|
|
outputs: [sum_samples]
|
|
|
|
`, `
|
|
|
|
foo 123
|
|
|
|
bar{baz="qwe"} 1.32
|
|
|
|
bar{baz="qwe"} 4.34
|
|
|
|
bar{baz="qwe"} 2
|
|
|
|
foo{baz="qwe"} -5
|
|
|
|
bar{baz="qwer"} 343
|
|
|
|
bar{baz="qwer"} 344
|
|
|
|
foo{baz="qwe"} 10
|
2024-03-12 21:47:29 +00:00
|
|
|
`, `bar:1m_sum_samples{baz="qwe"} 4.34
|
2023-01-25 17:14:49 +00:00
|
|
|
bar:1m_sum_samples{baz="qwer"} 344
|
|
|
|
foo:1m_sum_samples 123
|
|
|
|
foo:1m_sum_samples{baz="qwe"} 10
|
2023-07-24 23:44:09 +00:00
|
|
|
`, "11111111")
|
2023-01-25 17:14:49 +00:00
|
|
|
}
|
|
|
|
|
2024-03-04 22:45:22 +00:00
|
|
|
func timeSeriessToString(tss []prompbmarshal.TimeSeries) string {
|
|
|
|
a := make([]string, len(tss))
|
|
|
|
for i, ts := range tss {
|
|
|
|
a[i] = timeSeriesToString(ts)
|
|
|
|
}
|
|
|
|
sort.Strings(a)
|
|
|
|
return strings.Join(a, "")
|
|
|
|
}
|
|
|
|
|
2023-01-04 06:19:18 +00:00
|
|
|
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
|
|
|
|
labelsString := promrelabel.LabelsToString(ts.Labels)
|
|
|
|
if len(ts.Samples) != 1 {
|
|
|
|
panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", labelsString, len(ts.Samples)))
|
|
|
|
}
|
|
|
|
return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value)
|
|
|
|
}
|
|
|
|
|
2024-03-04 22:45:22 +00:00
|
|
|
func appendClonedTimeseries(dst, src []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
|
|
|
|
for _, ts := range src {
|
|
|
|
dst = append(dst, prompbmarshal.TimeSeries{
|
|
|
|
Labels: append(ts.Labels[:0:0], ts.Labels...),
|
|
|
|
Samples: append(ts.Samples[:0:0], ts.Samples...),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
return dst
|
|
|
|
}
|