mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/{vmagent,vminsert}: add an ability to ignore input samples outside the current aggregation interval for stream aggregation
See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples
This commit is contained in:
parent
6a465f6e29
commit
1cedaf61cb
9 changed files with 101 additions and 26 deletions
|
@ -3116,6 +3116,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-streamAggr.ignoreOldSamples
|
||||
Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples
|
||||
-streamAggr.keepInput
|
||||
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
|
||||
-tls array
|
||||
|
|
|
@ -92,6 +92,8 @@ var (
|
|||
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
|
||||
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
|
||||
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication")
|
||||
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+
|
||||
"for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples")
|
||||
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
|
||||
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels")
|
||||
|
||||
|
@ -745,10 +747,12 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
|
|||
// Initialize sas
|
||||
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
|
||||
dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
|
||||
ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx)
|
||||
if sasFile != "" {
|
||||
opts := &streamaggr.Options{
|
||||
DedupInterval: dedupInterval,
|
||||
DropInputLabels: *streamAggrDropInputLabels,
|
||||
IgnoreOldSamples: ignoreOldSamples,
|
||||
}
|
||||
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
|
||||
if err != nil {
|
||||
|
@ -917,6 +921,8 @@ func (rwctx *remoteWriteCtx) reinitStreamAggr() {
|
|||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
|
||||
opts := &streamaggr.Options{
|
||||
DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx),
|
||||
DropInputLabels: *streamAggrDropInputLabels,
|
||||
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(rwctx.idx),
|
||||
}
|
||||
sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
|
||||
if err != nil {
|
||||
|
@ -962,6 +968,8 @@ func CheckStreamAggrConfigs() error {
|
|||
}
|
||||
opts := &streamaggr.Options{
|
||||
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
|
||||
DropInputLabels: *streamAggrDropInputLabels,
|
||||
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
|
||||
}
|
||||
sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts)
|
||||
if err != nil {
|
||||
|
|
|
@ -32,6 +32,8 @@ var (
|
|||
"See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication")
|
||||
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
|
||||
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels")
|
||||
streamAggrIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the current aggregation interval. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples")
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -55,6 +57,8 @@ func CheckStreamAggrConfig() error {
|
|||
pushNoop := func(tss []prompbmarshal.TimeSeries) {}
|
||||
opts := &streamaggr.Options{
|
||||
DedupInterval: *streamAggrDedupInterval,
|
||||
DropInputLabels: *streamAggrDropInputLabels,
|
||||
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
|
||||
}
|
||||
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts)
|
||||
if err != nil {
|
||||
|
@ -82,6 +86,7 @@ func InitStreamAggr() {
|
|||
opts := &streamaggr.Options{
|
||||
DedupInterval: *streamAggrDedupInterval,
|
||||
DropInputLabels: *streamAggrDropInputLabels,
|
||||
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
|
||||
}
|
||||
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
|
||||
if err != nil {
|
||||
|
@ -113,6 +118,8 @@ func reloadStreamAggrConfig() {
|
|||
|
||||
opts := &streamaggr.Options{
|
||||
DedupInterval: *streamAggrDedupInterval,
|
||||
DropInputLabels: *streamAggrDropInputLabels,
|
||||
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
|
||||
}
|
||||
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
|
||||
if err != nil {
|
||||
|
|
|
@ -43,6 +43,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
|||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): use the same logic in [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) as in [the deduplication at VictoriaMetrics](https://docs.victoriametrics.com/#deduplication). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5643).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): ignore out of order samples samples when calculating [`increase`](https://docs.victoriametrics.com/stream-aggregation/#increase), [`increase_prometheus`](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus), [`total`](https://docs.victoriametrics.com/stream-aggregation/#total) and [`total_prometheus`](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs. Thanks to @edma2 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add an ability to ignore input samples with old timestamps outside the current aggregation interval. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples) for details.
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`.
|
||||
|
|
|
@ -1679,8 +1679,14 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical
|
|||
This allows saving CPU and RAM when executing unexpected heavy queries.
|
||||
- `-search.maxConcurrentRequests` limits the number of concurrent requests VictoriaMetrics can process. Bigger number of concurrent requests usually means
|
||||
bigger memory usage. For example, if a single query needs 100 MiB of additional memory during its execution, then 100 concurrent queries may need `100 * 100 MiB = 10 GiB`
|
||||
of additional memory. So it is better to limit the number of concurrent queries, while suspending additional incoming queries if the concurrency limit is reached.
|
||||
VictoriaMetrics provides `-search.maxQueueDuration` command-line flag for limiting the max wait time for suspended queries. See also `-search.maxMemoryPerQuery` command-line flag.
|
||||
of additional memory. So it is better to limit the number of concurrent queries, while pausing additional incoming queries if the concurrency limit is reached.
|
||||
VictoriaMetrics provides `-search.maxQueueDuration` command-line flag for limiting the max wait time for paused queries. See also `-search.maxMemoryPerQuery` command-line flag.
|
||||
- `-search.maxQueueDuration` limits the maximum duration queries may wait for execution when `-search.maxConcurrentRequests` concurrent queries are executed.
|
||||
- `-search.ignoreExtraFiltersAtLabelsAPI` enables ignoring of `match[]`, [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements)
|
||||
query args at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels) and
|
||||
[/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues).
|
||||
This may be useful for reducing the load on VictoriaMetrics if the provided extra filters match too many time series.
|
||||
The downside is that the endpoints can return labels and series, which do not match the provided extra filters.
|
||||
- `-search.maxSamplesPerSeries` limits the number of raw samples the query can process per each time series. VictoriaMetrics sequentially processes
|
||||
raw samples per each found time series during the query. It unpacks raw samples on the selected time range per each time series into memory
|
||||
and then applies the given [rollup function](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions). The `-search.maxSamplesPerSeries` command-line flag
|
||||
|
@ -1723,11 +1729,6 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical
|
|||
when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate).
|
||||
In this case it might be useful to set the `-search.maxLabelsAPIDuration` to quite low value in order to limit CPU and memory usage.
|
||||
See also `-search.maxLabelsAPISeries` and `-search.ignoreExtraFiltersAtLabelsAPI`.
|
||||
- `-search.ignoreExtraFiltersAtLabelsAPI` enables ignoring of `match[]`, [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements)
|
||||
query args at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels) and
|
||||
[/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues).
|
||||
This may be useful for reducing the load on VictoriaMetrics if the provided extra filters match too many time series.
|
||||
The downside is that the endpoints can return labels and series, which do not match the provided extra filters.
|
||||
- `-search.maxTagValueSuffixesPerSearch` limits the number of entries, which may be returned from `/metrics/find` endpoint. See [Graphite Metrics API usage docs](#graphite-metrics-api-usage).
|
||||
|
||||
See also [resource usage limits at VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#resource-usage-limits),
|
||||
|
@ -2790,7 +2791,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
-loggerWarnsPerSecondLimit int
|
||||
Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
|
||||
-maxConcurrentInserts int
|
||||
The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 32)
|
||||
The maximum number of concurrent insert requests. Default value depends on the number of CPU cores and should work for most cases since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration
|
||||
-maxInsertRequestSize size
|
||||
The maximum size in bytes of a single Prometheus remote_write API request
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 33554432)
|
||||
|
@ -3118,6 +3119,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-streamAggr.ignoreOldSamples
|
||||
Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples
|
||||
-streamAggr.keepInput
|
||||
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
|
||||
-tls array
|
||||
|
|
|
@ -1687,8 +1687,14 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical
|
|||
This allows saving CPU and RAM when executing unexpected heavy queries.
|
||||
- `-search.maxConcurrentRequests` limits the number of concurrent requests VictoriaMetrics can process. Bigger number of concurrent requests usually means
|
||||
bigger memory usage. For example, if a single query needs 100 MiB of additional memory during its execution, then 100 concurrent queries may need `100 * 100 MiB = 10 GiB`
|
||||
of additional memory. So it is better to limit the number of concurrent queries, while suspending additional incoming queries if the concurrency limit is reached.
|
||||
VictoriaMetrics provides `-search.maxQueueDuration` command-line flag for limiting the max wait time for suspended queries. See also `-search.maxMemoryPerQuery` command-line flag.
|
||||
of additional memory. So it is better to limit the number of concurrent queries, while pausing additional incoming queries if the concurrency limit is reached.
|
||||
VictoriaMetrics provides `-search.maxQueueDuration` command-line flag for limiting the max wait time for paused queries. See also `-search.maxMemoryPerQuery` command-line flag.
|
||||
- `-search.maxQueueDuration` limits the maximum duration queries may wait for execution when `-search.maxConcurrentRequests` concurrent queries are executed.
|
||||
- `-search.ignoreExtraFiltersAtLabelsAPI` enables ignoring of `match[]`, [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements)
|
||||
query args at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels) and
|
||||
[/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues).
|
||||
This may be useful for reducing the load on VictoriaMetrics if the provided extra filters match too many time series.
|
||||
The downside is that the endpoints can return labels and series, which do not match the provided extra filters.
|
||||
- `-search.maxSamplesPerSeries` limits the number of raw samples the query can process per each time series. VictoriaMetrics sequentially processes
|
||||
raw samples per each found time series during the query. It unpacks raw samples on the selected time range per each time series into memory
|
||||
and then applies the given [rollup function](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions). The `-search.maxSamplesPerSeries` command-line flag
|
||||
|
@ -1731,11 +1737,6 @@ By default, VictoriaMetrics is tuned for an optimal resource usage under typical
|
|||
when the database contains big number of unique time series because of [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate).
|
||||
In this case it might be useful to set the `-search.maxLabelsAPIDuration` to quite low value in order to limit CPU and memory usage.
|
||||
See also `-search.maxLabelsAPISeries` and `-search.ignoreExtraFiltersAtLabelsAPI`.
|
||||
- `-search.ignoreExtraFiltersAtLabelsAPI` enables ignoring of `match[]`, [`extra_filters[]` and `extra_label`](https://docs.victoriametrics.com/#prometheus-querying-api-enhancements)
|
||||
query args at [/api/v1/labels](https://docs.victoriametrics.com/url-examples/#apiv1labels) and
|
||||
[/api/v1/label/.../values](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues).
|
||||
This may be useful for reducing the load on VictoriaMetrics if the provided extra filters match too many time series.
|
||||
The downside is that the endpoints can return labels and series, which do not match the provided extra filters.
|
||||
- `-search.maxTagValueSuffixesPerSearch` limits the number of entries, which may be returned from `/metrics/find` endpoint. See [Graphite Metrics API usage docs](#graphite-metrics-api-usage).
|
||||
|
||||
See also [resource usage limits at VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#resource-usage-limits),
|
||||
|
@ -3126,6 +3127,8 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-streamAggr.ignoreOldSamples
|
||||
Whether to ignore input samples with old timestamps outside the current aggregation interval. See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples
|
||||
-streamAggr.keepInput
|
||||
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
|
||||
-tls array
|
||||
|
|
|
@ -80,6 +80,18 @@ It is possible to drop the given labels before applying the de-duplication. See
|
|||
|
||||
The online de-duplication uses the same logic as [`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication) at VictoriaMetrics.
|
||||
|
||||
## Ignoring old samples
|
||||
|
||||
By default all the input samples are taken into account during stream aggregation. If samples with old timestamps outside the current [aggregation interval](#stream-aggregation-config)
|
||||
must be ignored, then the following options can be used:
|
||||
|
||||
- To pass `-remoteWrite.streamAggr.ignoreOldSamples` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/)
|
||||
or `-streamAggr.ignoreOldSamples` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/).
|
||||
This enables ignoring old samples for all the [aggregation configs](#stream-aggregation-config).
|
||||
|
||||
- To set `ignore_old_samples: true` option at the particular [aggregation config](#stream-aggregation-config).
|
||||
This enables ignoring old samples for that particular aggregation config.
|
||||
|
||||
## Flush time alignment
|
||||
|
||||
By default the time for aggregated data flush is aligned by the `interval` option specified in [aggregate config](#stream-aggregation-config).
|
||||
|
@ -915,6 +927,11 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
|
|||
#
|
||||
# keep_metric_names: false
|
||||
|
||||
# ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval.
|
||||
# See also -streamAggr.ignoreOldSamples command-line flag.
|
||||
#
|
||||
# ignore_old_samples: false
|
||||
|
||||
# drop_input_labels instructs dropping the given labels from input samples.
|
||||
# The labels' dropping is performed before input_relabel_configs are applied.
|
||||
# This also means that the labels are dropped before de-duplication ( https://docs.victoriametrics.com/stream-aggregation.html#deduplication )
|
||||
|
|
|
@ -2125,6 +2125,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
|
||||
Supports array of values separated by comma or specified via multiple flags.
|
||||
Empty values are set to false.
|
||||
-remoteWrite.streamAggr.ignoreOldSamples array
|
||||
Whether to ignore input samples with old timestamps outside the current aggregation interval for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples
|
||||
Supports array of values separated by comma or specified via multiple flags.
|
||||
Empty values are set to false.
|
||||
-remoteWrite.streamAggr.keepInput array
|
||||
Whether to keep all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
|
||||
Supports array of values separated by comma or specified via multiple flags.
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -101,8 +102,15 @@ type Options struct {
|
|||
//
|
||||
// input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
|
||||
//
|
||||
// This option can be overriden individually per each aggregation via keep_metric_names option.
|
||||
// This option can be overridden individually per each aggregation via keep_metric_names option.
|
||||
KeepMetricNames bool
|
||||
|
||||
// IgnoreOldSamples instructs to ignore samples with timestamps older than the current aggregation interval.
|
||||
//
|
||||
// By default all the samples are taken into account.
|
||||
//
|
||||
// This option can be overridden individually per each aggregation via ignore_old_samples option.
|
||||
IgnoreOldSamples bool
|
||||
}
|
||||
|
||||
// Config is a configuration for a single stream aggregation.
|
||||
|
@ -164,6 +172,9 @@ type Config struct {
|
|||
// KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix.
|
||||
KeepMetricNames *bool `yaml:"keep_metric_names,omitempty"`
|
||||
|
||||
// IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval.
|
||||
IgnoreOldSamples *bool `yaml:"ignore_old_samples,omitempty"`
|
||||
|
||||
// By is an optional list of labels for grouping input series.
|
||||
//
|
||||
// See also Without.
|
||||
|
@ -328,6 +339,7 @@ type aggregator struct {
|
|||
outputRelabeling *promrelabel.ParsedConfigs
|
||||
|
||||
keepMetricNames bool
|
||||
ignoreOldSamples bool
|
||||
|
||||
by []string
|
||||
without []string
|
||||
|
@ -342,6 +354,9 @@ type aggregator struct {
|
|||
// lc is used for compressing series keys before passing them to dedupAggr and aggrState
|
||||
lc promutils.LabelsCompressor
|
||||
|
||||
// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
|
||||
minTimestamp atomic.Int64
|
||||
|
||||
// suffix contains a suffix, which should be added to aggregate metric names
|
||||
//
|
||||
// It contains the interval, labels in (by, without), plus output name.
|
||||
|
@ -458,6 +473,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
|
|||
}
|
||||
}
|
||||
|
||||
// check cfg.IgnoreOldSamples
|
||||
ignoreOldSamples := opts.IgnoreOldSamples
|
||||
if v := cfg.IgnoreOldSamples; v != nil {
|
||||
ignoreOldSamples = *v
|
||||
}
|
||||
|
||||
// initialize outputs list
|
||||
if len(cfg.Outputs) == 0 {
|
||||
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
|
||||
|
@ -545,6 +566,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
|
|||
outputRelabeling: outputRelabeling,
|
||||
|
||||
keepMetricNames: keepMetricNames,
|
||||
ignoreOldSamples: ignoreOldSamples,
|
||||
|
||||
by: by,
|
||||
without: without,
|
||||
|
@ -710,6 +732,8 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
|
|||
}
|
||||
wg.Wait()
|
||||
|
||||
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
|
||||
|
||||
d := time.Since(startTime)
|
||||
a.flushDuration.Update(d.Seconds())
|
||||
if d > interval {
|
||||
|
@ -742,6 +766,8 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
|||
outputLabels := &ctx.outputLabels
|
||||
|
||||
dropLabels := a.dropInputLabels
|
||||
ignoreOldSamples := a.ignoreOldSamples
|
||||
minTimestamp := a.minTimestamp.Load()
|
||||
for idx, ts := range tss {
|
||||
if !a.match.Match(ts.Labels) {
|
||||
continue
|
||||
|
@ -775,6 +801,10 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
|
|||
// Skip NaN values
|
||||
continue
|
||||
}
|
||||
if ignoreOldSamples && sample.Timestamp < minTimestamp {
|
||||
// Skip old samples outside the current aggregation interval
|
||||
continue
|
||||
}
|
||||
samples = append(samples, pushSample{
|
||||
key: key,
|
||||
value: sample.Value,
|
||||
|
|
Loading…
Reference in a new issue