app/{vminsert,vmagent}: allow using -streamAggr.dedupInterval without -streamAggr.config

This allows performing online de-duplication of incoming samples
This commit is contained in:
Aliaksandr Valialkin 2024-03-05 00:45:22 +02:00
parent 4352544d61
commit c38c45d71f
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
13 changed files with 400 additions and 67 deletions

View file

@ -89,9 +89,8 @@ var (
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated "+
"by stream aggregation. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication")
disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+
"See also -remoteWrite.dropSamplesOnOverload")
@ -666,7 +665,9 @@ type remoteWriteCtx struct {
fq *persistentqueue.FastQueue
c *client
sas atomic.Pointer[streamaggr.Aggregators]
sas atomic.Pointer[streamaggr.Aggregators]
deduplicator *streamaggr.Deduplicator
streamAggrKeepInput bool
streamAggrDropInput bool
@ -739,9 +740,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
// Initialize sas
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
if sasFile != "" {
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(argIdx),
DedupInterval: dedupInterval,
}
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
if err != nil {
@ -752,17 +754,24 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
} else if dedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval)
}
return rwctx
}
func (rwctx *remoteWriteCtx) MustStop() {
// sas must be stopped before rwctx is closed
// sas and deduplicator must be stopped before rwctx is closed
// because sas can write pending series to rwctx.pss if there are any
sas := rwctx.sas.Swap(nil)
sas.MustStop()
if rwctx.deduplicator != nil {
rwctx.deduplicator.MustStop()
rwctx.deduplicator = nil
}
for _, ps := range rwctx.pss {
ps.MustStop()
}
@ -801,7 +810,7 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
rowsCount := getRowsCount(tss)
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
// Apply stream aggregation if any
// Apply stream aggregation or deduplication if they are configured
sas := rwctx.sas.Load()
if sas != nil {
matchIdxs := matchIdxsPool.Get()
@ -816,6 +825,10 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries) bool {
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
}
matchIdxsPool.Put(matchIdxs)
} else if rwctx.deduplicator != nil {
rwctx.deduplicator.Push(tss)
clear(tss)
tss = tss[:0]
}
// Try pushing the data to remote storage
@ -844,7 +857,7 @@ func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, drop
}
}
tail := src[len(dst):]
_ = prompbmarshal.ResetTimeSeries(tail)
clear(tail)
return dst
}

View file

@ -31,9 +31,10 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). The memory usage reduction is most visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow using `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line flags without the need to specify `-streamAggr.config` and `-remoteWrite.streamAggr.config`. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `no_align_flush_to_interval` option for disabling time alignment for aggregated data flush. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [unique_samples](https://docs.victoriametrics.com/stream-aggregation/#unique_samples) output, which can be used for calculating the number of unique sample values over the given `interval`.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs, which can be used for `increase` and `total` aggregations when the first sample of every new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) must be ignored.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_flush_timeouts_total` and `vm_streamaggr_dedup_flush_timeouts_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting flush timeouts for stream aggregation states. Expose also `vm_streamaggr_flush_duration_seconds` and `vm_streamaggr_dedup_flush_duration_seconds` [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the real flush durations of stream aggregation states.

View file

@ -26,7 +26,8 @@ Documentation for the cluster version of VictoriaMetrics is available [here](htt
Learn more about [key concepts](https://docs.victoriametrics.com/keyConcepts.html) of VictoriaMetrics and follow the
[quick start guide](https://docs.victoriametrics.com/Quick-Start.html) for a better experience.
If you have questions about VictoriaMetrics, then feel free asking them in the [VictoriaMetrics community Slack chat](https://slack.victoriametrics.com/).
If you have questions about VictoriaMetrics, then feel free asking them in the [VictoriaMetrics community Slack chat](https://victoriametrics.slack.com/),
you can join it via [Slack Inviter](https://slack.victoriametrics.com/).
[Contact us](mailto:info@victoriametrics.com) if you need enterprise support for VictoriaMetrics.
See [features available in enterprise package](https://docs.victoriametrics.com/enterprise.html).
@ -1812,6 +1813,12 @@ so the de-duplication consistently leaves samples for one `vmagent` instance and
from other `vmagent` instances.
See [these docs](https://docs.victoriametrics.com/vmagent.html#high-availability) for details.
VictoriaMetrics stores all the ingested samples to disk even if `-dedup.minScrapeInterval` command-line flag is set.
The ingested samples are de-duplicated during [background merges](#storage) and during query execution.
VictoriaMetrics also supports de-duplication during data ingestion before the data is stored to disk, via `-streamAggr.dedupInterval` command-line flag -
see [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication).
## Storage
VictoriaMetrics buffers the ingested data in memory for up to a second. Then the buffered data is written to in-memory `parts`,
@ -2551,7 +2558,7 @@ Contact us with any questions regarding VictoriaMetrics at [info@victoriametrics
Feel free asking any questions regarding VictoriaMetrics:
* [Slack](https://slack.victoriametrics.com/)
* [Slack Inviter](https://slack.victoriametrics.com/) and [Slack channel](https://victoriametrics.slack.com/)
* [Twitter](https://twitter.com/VictoriaMetrics/)
* [Linkedin](https://www.linkedin.com/company/victoriametrics/)
* [Reddit](https://www.reddit.com/r/VictoriaMetrics/)
@ -2647,7 +2654,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-datadog.sanitizeMetricName
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)
-dedup.minScrapeInterval duration
Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling
Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See also -streamAggr.dedupInterval and https://docs.victoriametrics.com/#deduplication
-deleteAuthKey value
authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries
Flag value can be read from the given file when using -deleteAuthKey=file:///abs/path/to/file or -deleteAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -deleteAuthKey=http://host/path or -deleteAuthKey=https://host/path
@ -3103,7 +3110,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.keepInput

View file

@ -34,7 +34,8 @@ Documentation for the cluster version of VictoriaMetrics is available [here](htt
Learn more about [key concepts](https://docs.victoriametrics.com/keyConcepts.html) of VictoriaMetrics and follow the
[quick start guide](https://docs.victoriametrics.com/Quick-Start.html) for a better experience.
If you have questions about VictoriaMetrics, then feel free asking them in the [VictoriaMetrics community Slack chat](https://slack.victoriametrics.com/).
If you have questions about VictoriaMetrics, then feel free asking them in the [VictoriaMetrics community Slack chat](https://victoriametrics.slack.com/),
you can join it via [Slack Inviter](https://slack.victoriametrics.com/).
[Contact us](mailto:info@victoriametrics.com) if you need enterprise support for VictoriaMetrics.
See [features available in enterprise package](https://docs.victoriametrics.com/enterprise.html).
@ -1820,6 +1821,12 @@ so the de-duplication consistently leaves samples for one `vmagent` instance and
from other `vmagent` instances.
See [these docs](https://docs.victoriametrics.com/vmagent.html#high-availability) for details.
VictoriaMetrics stores all the ingested samples to disk even if `-dedup.minScrapeInterval` command-line flag is set.
The ingested samples are de-duplicated during [background merges](#storage) and during query execution.
VictoriaMetrics also supports de-duplication during data ingestion before the data is stored to disk, via `-streamAggr.dedupInterval` command-line flag -
see [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication).
## Storage
VictoriaMetrics buffers the ingested data in memory for up to a second. Then the buffered data is written to in-memory `parts`,
@ -2559,7 +2566,7 @@ Contact us with any questions regarding VictoriaMetrics at [info@victoriametrics
Feel free asking any questions regarding VictoriaMetrics:
* [Slack](https://slack.victoriametrics.com/)
* [Slack Inviter](https://slack.victoriametrics.com/) and [Slack channel](https://victoriametrics.slack.com/)
* [Twitter](https://twitter.com/VictoriaMetrics/)
* [Linkedin](https://www.linkedin.com/company/victoriametrics/)
* [Reddit](https://www.reddit.com/r/VictoriaMetrics/)
@ -2655,7 +2662,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-datadog.sanitizeMetricName
Sanitize metric names for the ingested DataDog data to comply with DataDog behaviour described at https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics (default true)
-dedup.minScrapeInterval duration
Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling
Leave only the last sample in every time series per each discrete interval equal to -dedup.minScrapeInterval > 0. See also -streamAggr.dedupInterval and https://docs.victoriametrics.com/#deduplication
-deleteAuthKey value
authKey for metrics' deletion via /api/v1/admin/tsdb/delete_series and /tags/delSeries
Flag value can be read from the given file when using -deleteAuthKey=file:///abs/path/to/file or -deleteAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -deleteAuthKey=http://host/path or -deleteAuthKey=https://host/path
@ -3111,7 +3118,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.keepInput

View file

@ -50,17 +50,35 @@ This behaviour can be changed via the following command-line flags:
## Deduplication
By default, all the input samples are aggregated. Sometimes it is needed to de-duplicate samples for the same [time series](https://docs.victoriametrics.com/keyconcepts/#time-series)
before the aggregation. For example, if the samples are received from replicated sources.
In this case the [de-duplication](https://docs.victoriametrics.com/#deduplication) can be enabled via the following options:
[vmagent](https://docs.victoriametrics.com/vmagent.html) supports de-duplication of samples before sending them
to the configured `-remoteWrite.url`. The de-duplication can be enabled via the following options:
- `-remoteWrite.streamAggr.dedupInterval` command-line flag at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each `-remoteWrite.url`.
This allows setting different de-duplication intervals per each configured remote storage.
- `-streamAggr.dedupInterval` command-line flag at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
- `dedup_interval` option per each [aggregate config](#stream-aggregation-config).
- By specifying the desired de-duplication interval via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url`.
For example, `./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=30s` instructs `vmagent` to leave
only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
The de-duplication is performed after applying `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling).
De-duplicatation is performed after performing the input relabeling with `input_relabel_configs` - see [these docs](#relabeling).
If the `-remoteWrite.streamAggr.config` is set, then the de-duplication is performed individually per each [stream aggregation config](#stream-aggregation-config)
for the matching samples after applying [input_relabel_configs](#relabeling).
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-remoteWrite.streamAggr.config`.
[Single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) supports two types of de-duplication:
- After storing the duplicate samples to local storage. See [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication) command-line option.
- Before storing the duplicate samples to local storage. This type of de-duplication can be enabled via the following options:
- By specifying the desired de-duplication interval via `-streamAggr.dedupInterval` command-line flag.
For example, `./victoria-metrics -streamAggr.dedupInterval=30s` instructs VicotriaMetrics to leave only the last sample per each
seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
The de-duplication is performed after applying `-relabelConfig` [relabeling](https://docs.victoriametrics.com/#relabeling).
If the `-streamAggr.config` is set, then the de-duplication is performed individually per each [stream aggregation config](#stream-aggregation-config)
for the matching samples after applying [input_relabel_configs](#relabeling).
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-streamAggr.config`.
The online de-duplication doesn't take into account timestamps associated with the de-duplicated samples - it just leaves the last seen sample
on the configured deduplication interval. If you need taking into account timestamps during the de-duplication,
then use [`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication).
## Flush time alignment
@ -407,7 +425,7 @@ The `keep_metric_names` option can be used if only a single output is set in [`o
It is possible to apply [arbitrary relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) to input and output metrics
during stream aggregation via `input_relabel_configs` and `output_relabel_configs` options in [stream aggregation config](#stream-aggregation-config).
Relabeling rules inside `input_relabel_configs` are applied to samples matching the `match` filters.
Relabeling rules inside `input_relabel_configs` are applied to samples matching the `match` filters before optional [deduplication](#deduplication).
Relabeling rules inside `output_relabel_configs` are applied to aggregated samples before sending them to the remote storage.
For example, the following config removes the `:1m_sum_samples` suffix added [to the output metric name](#output-metric-names):

View file

@ -235,7 +235,6 @@ And to `http://<prod-url>` it will forward only metrics that have `env=prod` lab
Please note, order of flags is important: 1st mentioned `-remoteWrite.urlRelabelConfig` will be applied to the
1st mentioned `-remoteWrite.url`, and so on.
### Prometheus remote_write proxy
`vmagent` can be used as a proxy for Prometheus data sent via Prometheus `remote_write` protocol. It can accept data via the `remote_write` API
@ -251,6 +250,28 @@ the `-remoteWrite.url` command-line flag should be configured as `<schema>://<vm
according to [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format).
There is also support for multitenant writes. See [these docs](#multitenancy).
### Flexible deduplication
[Deduplication at stream aggregation](https://docs.victoriametrics.com/stream-aggregation/#deduplication) allows setting up arbitrary complex de-duplication schemes
for the collected samples. Examples:
- The following command instructs `vmagent` to leave only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 60 seconds:
```
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=60s
```
- The following [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) instructs `vmagent` to merge
[time series](https://docs.victoriametrics.com/keyconcepts/#time-series) with different `replica` label values and then to leave only the last sample
per each merged series per ever 60 seconds:
```yml
- input_relabel_configs:
- action: labeldrop
regex: replica
interval: 60s
keep_metric_names: true
outputs: [last]
```
## VictoriaMetrics remote write protocol
`vmagent` supports sending data to the configured `-remoteWrite.url` either via Prometheus remote write protocol
@ -909,7 +930,6 @@ to all the metrics scraped by the given `vmagent` instance:
See also [how to shard data among multiple remote storage systems](#sharding-among-remote-storages).
## High availability
It is possible to run multiple **identically configured** `vmagent` instances or `vmagent`
@ -2104,7 +2124,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.streamAggr.dedupInterval array
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero (default 0s)
Input samples are de-duplicated with this interval before optional aggregation with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication (default 0s)
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to default value.
-remoteWrite.streamAggr.dropInput array

View file

@ -717,7 +717,7 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
}
dstSeries = append(dstSeries, ts)
}
prompbmarshal.ResetTimeSeries(wc.writeRequest.Timeseries[len(dstSeries):])
clear(wc.writeRequest.Timeseries[len(dstSeries):])
wc.writeRequest.Timeseries = dstSeries
if samplesDropped > 0 && !sw.seriesLimitExceeded {
sw.seriesLimitExceeded = true

View file

@ -53,7 +53,7 @@ func TestDedupAggrSerial(t *testing.T) {
}
}
func TestDedupAggrConcurrent(t *testing.T) {
func TestDedupAggrConcurrent(_ *testing.T) {
const concurrency = 5
const seriesCount = 10_000
da := newDedupAggr()

View file

@ -0,0 +1,196 @@
package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
)
// Deduplicator deduplicates samples per each time series.
type Deduplicator struct {
da *dedupAggr
lc promutils.LabelsCompressor
wg sync.WaitGroup
stopCh chan struct{}
ms *metrics.Set
}
// NewDeduplicator returns new deduplicator, which deduplicates samples per each time series.
//
// The de-duplicated samples are passed to pushFunc once per dedupInterval.
//
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
stopCh: make(chan struct{}),
ms: metrics.NewSet(),
}
ms := d.ms
_ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 {
return float64(d.da.sizeBytes())
})
_ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 {
return float64(d.da.itemsCount())
})
_ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
return float64(d.lc.SizeBytes())
})
_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
return float64(d.lc.ItemsCount())
})
metrics.RegisterSet(ms)
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.runFlusher(pushFunc, dedupInterval)
}()
return d
}
// MustStop stops d.
func (d *Deduplicator) MustStop() {
metrics.UnregisterSet(d.ms)
d.ms = nil
close(d.stopCh)
d.wg.Wait()
}
// Push pushes tss to d.
func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
ctx := getDeduplicatorPushCtx()
pss := ctx.pss
buf := ctx.buf
for _, ts := range tss {
buf = d.lc.Compress(buf[:0], ts.Labels)
key := bytesutil.InternBytes(buf)
for _, s := range ts.Samples {
pss = append(pss, pushSample{
key: key,
value: s.Value,
})
}
}
d.da.pushSamples(pss)
ctx.pss = pss
ctx.buf = buf
putDeduplicatorPushCtx(ctx)
}
func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration) {
t := time.NewTicker(dedupInterval)
defer t.Stop()
for {
select {
case <-d.stopCh:
return
case <-t.C:
d.flush(pushFunc)
}
}
}
func (d *Deduplicator) flush(pushFunc PushFunc) {
timestamp := time.Now().UnixMilli()
d.da.flush(func(pss []pushSample) {
ctx := getDeduplicatorFlushCtx()
tss := ctx.tss
labels := ctx.labels
samples := ctx.samples
for _, ps := range pss {
labelsLen := len(labels)
labels = decompressLabels(labels, &d.lc, ps.key)
samplesLen := len(samples)
samples = append(samples, prompbmarshal.Sample{
Value: ps.value,
Timestamp: timestamp,
})
tss = append(tss, prompbmarshal.TimeSeries{
Labels: labels[labelsLen:],
Samples: samples[samplesLen:],
})
}
pushFunc(tss)
ctx.tss = tss
ctx.labels = labels
ctx.samples = samples
putDeduplicatorFlushCtx(ctx)
}, true)
}
type deduplicatorPushCtx struct {
pss []pushSample
buf []byte
}
func (ctx *deduplicatorPushCtx) reset() {
clear(ctx.pss)
ctx.pss = ctx.pss[:0]
ctx.buf = ctx.buf[:0]
}
func getDeduplicatorPushCtx() *deduplicatorPushCtx {
v := deduplicatorPushCtxPool.Get()
if v == nil {
return &deduplicatorPushCtx{}
}
return v.(*deduplicatorPushCtx)
}
func putDeduplicatorPushCtx(ctx *deduplicatorPushCtx) {
ctx.reset()
deduplicatorPushCtxPool.Put(ctx)
}
var deduplicatorPushCtxPool sync.Pool
type deduplicatorFlushCtx struct {
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
samples []prompbmarshal.Sample
}
func (ctx *deduplicatorFlushCtx) reset() {
clear(ctx.tss)
ctx.tss = ctx.tss[:0]
clear(ctx.labels)
ctx.labels = ctx.labels[:0]
clear(ctx.samples)
ctx.samples = ctx.samples[:0]
}
func getDeduplicatorFlushCtx() *deduplicatorFlushCtx {
v := deduplicatorFlushCtxPool.Get()
if v == nil {
return &deduplicatorFlushCtx{}
}
return v.(*deduplicatorFlushCtx)
}
func putDeduplicatorFlushCtx(ctx *deduplicatorFlushCtx) {
ctx.reset()
deduplicatorFlushCtxPool.Put(ctx)
}
var deduplicatorFlushCtxPool sync.Pool

View file

@ -0,0 +1,49 @@
package streamaggr
import (
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestDeduplicator(t *testing.T) {
var tssResult []prompbmarshal.TimeSeries
var tssResultLock sync.Mutex
pushFunc := func(tss []prompbmarshal.TimeSeries) {
tssResultLock.Lock()
tssResult = appendClonedTimeseries(tssResult, tss)
tssResultLock.Unlock()
}
tss := mustParsePromMetrics(`
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123
bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54
x 8943
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34
x 90984
x 433
asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3
`)
d := NewDeduplicator(pushFunc, time.Hour)
for i := 0; i < 10; i++ {
d.Push(tss)
}
d.flush(pushFunc)
d.MustStop()
result := timeSeriessToString(tssResult)
resultExpected := `asfjkldsf{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 12322
bar{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 34.54
baz_aaa_aaa_fdd{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} -2.3
foo{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 894
x 433
`
if result != resultExpected {
t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected)
}
}

View file

@ -0,0 +1,21 @@
package streamaggr
import (
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func BenchmarkDeduplicatorPush(b *testing.B) {
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
d := NewDeduplicator(pushFunc, time.Hour)
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries)))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
d.Push(benchSeries)
}
})
}

View file

@ -61,20 +61,12 @@ func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators,
as, err := newAggregatorsFromData(data, pushFunc, opts)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
}
return as, nil
}
func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
}
return NewAggregators(cfgs, pushFunc, opts)
}
// Options contains optional settings for the Aggregators.
type Options struct {
// DedupInterval is deduplication interval for samples received for the same time series.
@ -194,25 +186,23 @@ type Config struct {
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
}
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
type Aggregators struct {
as []*aggregator
// configData contains marshaled configs passed to NewAggregators().
// configData contains marshaled configs.
// It is used in Equal() for comparing Aggregators.
configData []byte
ms *metrics.Set
}
// NewAggregators creates Aggregators from the given cfgs.
//
// pushFunc is called when the aggregated data must be flushed.
//
// opts can contain additional options. If opts is nil, then default options are used.
//
// MustStop must be called on the returned Aggregators when they are no longer needed.
func NewAggregators(cfgs []*Config, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
}
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
@ -306,7 +296,7 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
// Otherwise it allocates new matchIdxs.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
for i := 0; i < len(matchIdxs); i++ {
for i := range matchIdxs {
matchIdxs[i] = 0
}
if a == nil {
@ -378,6 +368,9 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
}
// check cfg.Interval
if cfg.Interval == "" {
return nil, fmt.Errorf("missing `interval` option")
}
interval, err := time.ParseDuration(cfg.Interval)
if err != nil {
return nil, fmt.Errorf("cannot parse `interval: %q`: %w", cfg.Interval, err)
@ -910,7 +903,8 @@ func (ctx *flushCtx) reset() {
}
func (ctx *flushCtx) resetSeries() {
ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss)
clear(ctx.tss)
ctx.tss = ctx.tss[:0]
clear(ctx.labels)
ctx.labels = ctx.labels[:0]

View file

@ -210,14 +210,7 @@ func TestAggregatorsSuccess(t *testing.T) {
var tssOutputLock sync.Mutex
pushFunc := func(tss []prompbmarshal.TimeSeries) {
tssOutputLock.Lock()
for _, ts := range tss {
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
Labels: labelsCopy,
Samples: samplesCopy,
})
}
tssOutput = appendClonedTimeseries(tssOutput, tss)
tssOutputLock.Unlock()
}
opts := &Options{
@ -244,12 +237,7 @@ func TestAggregatorsSuccess(t *testing.T) {
}
// Verify the tssOutput contains the expected metrics
tsStrings := make([]string, len(tssOutput))
for i, ts := range tssOutput {
tsStrings[i] = timeSeriesToString(ts)
}
sort.Strings(tsStrings)
outputMetrics := strings.Join(tsStrings, "")
outputMetrics := timeSeriessToString(tssOutput)
if outputMetrics != outputMetricsExpected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
}
@ -925,6 +913,15 @@ foo:1m_sum_samples{baz="qwe"} 10
`, "11111111")
}
func timeSeriessToString(tss []prompbmarshal.TimeSeries) string {
a := make([]string, len(tss))
for i, ts := range tss {
a[i] = timeSeriesToString(ts)
}
sort.Strings(a)
return strings.Join(a, "")
}
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
if len(ts.Samples) != 1 {
@ -965,3 +962,13 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
}
return tss
}
func appendClonedTimeseries(dst, src []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
for _, ts := range src {
dst = append(dst, prompbmarshal.TimeSeries{
Labels: append(ts.Labels[:0:0], ts.Labels...),
Samples: append(ts.Samples[:0:0], ts.Samples...),
})
}
return dst
}