VictoriaMetrics/lib/streamaggr/total.go

187 lines
4.8 KiB
Go
Raw Normal View History

package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
// totalAggrState calculates output=total, total_prometheus, increase and increase_prometheus.
type totalAggrState struct {
m sync.Map
// Whether to reset the output value on every flushState call.
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
resetTotalOnFlush bool
// Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
// Time series state is dropped if no new samples are received during stalenessSecs.
//
// Aslo, the first sample per each new series is ignored during stalenessSecs even if keepFirstSample is set.
// see ignoreFirstSampleDeadline for more details.
stalenessSecs uint64
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
}
type totalStateValue struct {
mu sync.Mutex
lastValues map[string]totalLastValueState
total float64
deleteDeadline uint64
deleted bool
}
type totalLastValueState struct {
value float64
timestamp int64
deleteDeadline uint64
}
func newTotalAggrState(stalenessInterval, ignoreFirstSampleInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + roundDurationToSecs(ignoreFirstSampleInterval)
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691 - Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go . This improves code maintainability a bit. - Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs(). This prevents from potential resource leaks. - Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation. This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions. - Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics. - Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation. This label should simplify investigation of the exposed metrics. - Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability: it is hard to use these labels in query filters and aggregation functions. - Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` . This metric shows the number of samples generated by the corresponding streaming aggregation rule. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params, which could modify the behaviour of the constructed streaming aggregator. Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory. - Pass Options arg to LoadFromFile() function by reference, since this structure is quite big. This also allows passing nil instead of Options when default options are enough. - Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics, so they have consistent set of labels comparing to the rest of streaming aggregation metrics. - Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604 - Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . - Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function. While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
return &totalAggrState{
resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
stalenessSecs: stalenessSecs,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
}
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
func (as *totalAggrState) pushSamples(samples []pushSample) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
keepFirstSample := as.keepFirstSample && currentTime >= as.ignoreFirstSampleDeadline
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
for i := range samples {
s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key)
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
again:
v, ok := as.m.Load(outputKey)
if !ok {
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
// The entry is missing in the map. Try creating it.
v = &totalStateValue{
lastValues: make(map[string]totalLastValueState),
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
}
outputKey = bytesutil.InternString(outputKey)
vNew, loaded := as.m.LoadOrStore(outputKey, v)
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
sv := v.(*totalStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if ok || keepFirstSample {
if s.timestamp < lv.timestamp {
// Skip out of order sample
sv.mu.Unlock()
continue
}
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
if s.value >= lv.value {
sv.total += s.value - lv.value
} else {
// counter reset
sv.total += s.value
}
}
lv.value = s.value
lv.timestamp = s.timestamp
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
lv.deleteDeadline = deleteDeadline
inputKey = bytesutil.InternString(inputKey)
sv.lastValues[inputKey] = lv
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
sv.deleteDeadline = deleteDeadline
}
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to flushState
lib/streamaggr: huge pile of changes - Reduce memory usage by up to 5x when de-duplicating samples across big number of time series. - Reduce memory usage by up to 5x when aggregating across big number of output time series. - Add lib/promutils.LabelsCompressor, which is going to be used by other VictoriaMetrics components for reducing memory usage for marshaled []prompbmarshal.Label. - Add `dedup_interval` option at aggregation config, which allows setting individual deduplication intervals per each aggregation. - Add `keep_metric_names` option at aggregation config, which allows keeping the original metric names in the output samples. - Add `unique_samples` output, which counts the number of unique sample values. - Add `increase_prometheus` and `total_prometheus` outputs, which ignore the first sample per each newly encountered time series. - Use 64-bit hashes instead of marshaled labels as map keys when calculating `count_series` output. This makes obsolete https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5579 - Expose various metrics, which may help debugging stream aggregation: - vm_streamaggr_dedup_state_size_bytes - the size of data structures responsible for deduplication - vm_streamaggr_dedup_state_items_count - the number of items in the deduplication data structures - vm_streamaggr_labels_compressor_size_bytes - the size of labels compressor data structures - vm_streamaggr_labels_compressor_items_count - the number of entries in the labels compressor - vm_streamaggr_flush_duration_seconds - a histogram, which shows the duration of stream aggregation flushes - vm_streamaggr_dedup_flush_duration_seconds - a histogram, which shows the duration of deduplication flushes - vm_streamaggr_flush_timeouts_total - counter for timed out stream aggregation flushes, which took longer than the configured interval - vm_streamaggr_dedup_flush_timeouts_total - counter for timed out deduplication flushes, which took longer than the configured dedup_interval - Actualize docs/stream-aggregation.md The memory usage reduction increases CPU usage during stream aggregation by up to 30%. This commit is based on https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5850 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5898
2024-03-02 00:42:26 +00:00
// Try obtaining and updating the entry again.
goto again
}
}
}
func (as *totalAggrState) flushState(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691 - Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go . This improves code maintainability a bit. - Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs(). This prevents from potential resource leaks. - Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation. This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions. - Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics. - Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation. This label should simplify investigation of the exposed metrics. - Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability: it is hard to use these labels in query filters and aggregation functions. - Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` . This metric shows the number of samples generated by the corresponding streaming aggregation rule. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params, which could modify the behaviour of the constructed streaming aggregator. Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory. - Pass Options arg to LoadFromFile() function by reference, since this structure is quite big. This also allows passing nil instead of Options when default options are enough. - Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics, so they have consistent set of labels comparing to the rest of streaming aggregation metrics. - Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604 - Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . - Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function. While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
suffix := as.getSuffix()
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v any) bool {
sv := v.(*totalStateValue)
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691 - Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go . This improves code maintainability a bit. - Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs(). This prevents from potential resource leaks. - Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation. This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions. - Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics. - Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation. This label should simplify investigation of the exposed metrics. - Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability: it is hard to use these labels in query filters and aggregation functions. - Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` . This metric shows the number of samples generated by the corresponding streaming aggregation rule. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params, which could modify the behaviour of the constructed streaming aggregator. Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory. - Pass Options arg to LoadFromFile() function by reference, since this structure is quite big. This also allows passing nil instead of Options when default options are enough. - Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics, so they have consistent set of labels comparing to the rest of streaming aggregation metrics. - Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604 - Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . - Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function. While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
sv.mu.Lock()
total := sv.total
if as.resetTotalOnFlush {
sv.total = 0
} else if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
deleted := sv.deleted
sv.mu.Unlock()
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691 - Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go . This improves code maintainability a bit. - Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs(). This prevents from potential resource leaks. - Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation. This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions. - Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics. - Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation. This label should simplify investigation of the exposed metrics. - Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability: it is hard to use these labels in query filters and aggregation functions. - Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` . This metric shows the number of samples generated by the corresponding streaming aggregation rule. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params, which could modify the behaviour of the constructed streaming aggregator. Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory. - Pass Options arg to LoadFromFile() function by reference, since this structure is quite big. This also allows passing nil instead of Options when default options are enough. - Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics, so they have consistent set of labels comparing to the rest of streaming aggregation metrics. - Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604 - Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . - Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function. While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
if !deleted {
key := k.(string)
ctx.appendSeries(key, suffix, total)
}
return true
})
}
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
app/vmagent/remotewrite: follow-up for f153f54d11250da050aa93bc4fa9b7ba9e144691 - Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go . This improves code maintainability a bit. - Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs(). This prevents from potential resource leaks. - Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation. This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions. - Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics. - Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation. This label should simplify investigation of the exposed metrics. - Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability: it is hard to use these labels in query filters and aggregation functions. - Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` . This metric shows the number of samples generated by the corresponding streaming aggregation rule. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice. This metric has been added in the commit 861852f2624895e01f93ce196607c72616ce2a94 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462 - Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params, which could modify the behaviour of the constructed streaming aggregator. Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory. - Pass Options arg to LoadFromFile() function by reference, since this structure is quite big. This also allows passing nil instead of Options when default options are enough. - Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics, so they have consistent set of labels comparing to the rest of streaming aggregation metrics. - Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output metrics. This is a follow-up for the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604 - Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason in the commit 2eb1bc4f814037ae87ac6556011ae0d3caee6bc8 . - Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function. While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
2024-07-15 16:01:37 +00:00
func (as *totalAggrState) getSuffix() string {
// Note: this function is at hot path, so it shouldn't allocate.
if as.resetTotalOnFlush {
if as.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if as.keepFirstSample {
return "total"
}
return "total_prometheus"
}
func (as *totalAggrState) removeOldEntries(currentTime uint64) {
lib/streamaggr: follow-up for 9c3d44c8c961d072f545e5e09f98f99bb7dfb2c0 - Consistently enumerate stream aggregation outputs in alphabetical order across the source code and docs. This should simplify future maintenance of the corresponding code and docs. - Fix the link to `rate_sum()` at `see also` section of `rate_avg()` docs. - Make more clear the docs for `rate_sum()` and `rate_avg()` outputs. - Encapsulate output metric suffix inside rateAggrState. This eliminates possible bugs related to incorrect suffix passing to newRateAggrState(). - Rename rateAggrState.total field to less misleading rateAggrState.increase name, since it calculates counter increase in the current aggregation window. - Set rateLastValueState.prevTimestamp on the first sample in time series instead of the second sample. This makes more clear the code logic. - Move the code for removing outdated entries at rateAggrState into removeOldEntries() function. This make the code logic inside rateAggrState.flushState() more clear. - Do not write output sample with zero value if there are no input series, which could be used for calculating the rate, e.g. if only a single sample is registered for every input series. - Do not take into account input series with a single registered sample when calculating rate_avg(), since this leads to incorrect results. - Move {rate,total}AggrState.flushState() function to the end of rate.go and total.go files, so they look more similar. This shuld simplify future mantenance. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6243
2024-07-14 15:23:59 +00:00
m := &as.m
m.Range(func(k, v any) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
if currentTime > sv.deleteDeadline {
// Mark the current entry as deleted
sv.deleted = true
sv.mu.Unlock()
m.Delete(k)
return true
}
// Delete outdated entries in sv.lastValues
lvs := sv.lastValues
for k1, lv := range lvs {
if currentTime > lv.deleteDeadline {
delete(lvs, k1)
}
}
sv.mu.Unlock()
return true
})
}