mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
stream aggregation: perform deduplication for all received data when … (#6711)
…specifying `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval` command-line flag [The documentation](https://docs.victoriametrics.com/stream-aggregation/) contains conflicting descriptions regarding deduplication for non-matched series when `-remoteWrite.streamAggr.config` and / or `-streamAggr.config` are set: 1. Statement below says **all the received data** is deduplicated: >[vmagent](https://docs.victoriametrics.com/vmagent/) supports relabeling, deduplication and stream aggregation for all the received data, scraped or pushed. Then, the collected data will be forwarded to specified -remoteWrite.url destinations. The data processing order is the following: >1. all the received data is relabeled according to the specified [-remoteWrite.relabelConfig](https://docs.victoriametrics.com/vmagent/#relabeling) (if it is set) >2. all the received data is deduplicated according to specified [-streamAggr.dedupInterval](https://docs.victoriametrics.com/stream-aggregation/#deduplication) (if it is set to duration bigger than 0) 2. Another statement says the deduplication is performed individually for the **matching samples** >The de-deduplication is performed after applying [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling) and before performing the aggregation. If the -remoteWrite.streamAggr.config and / or -streamAggr.config is set, then the de-duplication is performed individually per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) for the matching samples after applying [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). Considering the following deduplication use cases: 1. To apply deduplication(globally or for specific remoteWrite destination) for all the received data, scraped or pushed --- using `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval`. 2. To deduplicate and aggregate metrics that match the rule `match` filters --- using `-remoteWrite.streamAggr.config` and specifiying `dedup_interval` option in [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). 3. To deduplicate all the received data while having `streamAggr.config` for some metrics --- no way for a single vmagent now, need to set up two level vmagents This PR implements case3. --------- Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
This commit is contained in:
parent
4df243d530
commit
d523015f27
4 changed files with 17 additions and 18 deletions
|
@ -494,7 +494,8 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
|
||||||
tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput)
|
tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput)
|
||||||
}
|
}
|
||||||
matchIdxsPool.Put(matchIdxs)
|
matchIdxsPool.Put(matchIdxs)
|
||||||
} else if deduplicatorGlobal != nil {
|
}
|
||||||
|
if deduplicatorGlobal != nil {
|
||||||
deduplicatorGlobal.Push(tssBlock)
|
deduplicatorGlobal.Push(tssBlock)
|
||||||
tssBlock = tssBlock[:0]
|
tssBlock = tssBlock[:0]
|
||||||
}
|
}
|
||||||
|
@ -922,7 +923,8 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa
|
||||||
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
|
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
|
||||||
}
|
}
|
||||||
matchIdxsPool.Put(matchIdxs)
|
matchIdxsPool.Put(matchIdxs)
|
||||||
} else if rwctx.deduplicator != nil {
|
}
|
||||||
|
if rwctx.deduplicator != nil {
|
||||||
rwctx.deduplicator.Push(tss)
|
rwctx.deduplicator.Push(tss)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,11 +130,10 @@ func initStreamAggrConfigGlobal() {
|
||||||
sasGlobal.Store(sas)
|
sasGlobal.Store(sas)
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1)
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1)
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
|
||||||
} else {
|
}
|
||||||
dedupInterval := streamAggrGlobalDedupInterval.Duration()
|
dedupInterval := streamAggrGlobalDedupInterval.Duration()
|
||||||
if dedupInterval > 0 {
|
if dedupInterval > 0 {
|
||||||
deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global")
|
deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global")
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,12 +151,11 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
|
||||||
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx)
|
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx)
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1)
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1)
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
|
||||||
} else {
|
}
|
||||||
dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx)
|
dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx)
|
||||||
if dedupInterval > 0 {
|
if dedupInterval > 0 {
|
||||||
alias := fmt.Sprintf("dedup-%d", idx+1)
|
alias := fmt.Sprintf("dedup-%d", idx+1)
|
||||||
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels, alias)
|
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels, alias)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,10 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
**Update note 1: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): perform deduplication for all received data when specifying `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval` command-line flag. Previously, if the `-remoteWrite.streamAggr.config` or `-streamAggr.config` is set, only series that matched aggregation config were deduplicated. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6711#issuecomment-2288361213) for details.**
|
||||||
|
|
||||||
|
* FEATURE [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): perform deduplication for all received data when specifying `-streamAggr.dedupInterval` or `-remoteWrite.streamAggr.dedupInterval` command-line flags are set. Previously, if the `-remoteWrite.streamAggr.config` or `-streamAggr.config` is set, only series that matched aggregation config were deduplicated. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6711#issuecomment-2288361213) for details.
|
||||||
|
|
||||||
## [v1.103.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.103.0)
|
## [v1.103.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.103.0)
|
||||||
|
|
||||||
Released at 2024-08-28
|
Released at 2024-08-28
|
||||||
|
|
|
@ -86,8 +86,6 @@ before sending them to the configured `-remoteWrite.url`. The de-duplication can
|
||||||
only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
|
only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
|
||||||
The de-deduplication is performed after applying [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling) and
|
The de-deduplication is performed after applying [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling) and
|
||||||
before performing the aggregation.
|
before performing the aggregation.
|
||||||
If the `-remoteWrite.streamAggr.config` and / or `-streamAggr.config` is set, then the de-duplication is performed individually per each
|
|
||||||
[stream aggregation config](#stream-aggregation-config) for the matching samples after applying [input_relabel_configs](#relabeling).
|
|
||||||
|
|
||||||
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config)
|
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config)
|
||||||
in `-remoteWrite.streamAggr.config` or `-streamAggr.config` configs.
|
in `-remoteWrite.streamAggr.config` or `-streamAggr.config` configs.
|
||||||
|
@ -100,9 +98,6 @@ before sending them to the configured `-remoteWrite.url`. The de-duplication can
|
||||||
seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
|
seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
|
||||||
The de-duplication is performed after applying `-relabelConfig` [relabeling](https://docs.victoriametrics.com/#relabeling).
|
The de-duplication is performed after applying `-relabelConfig` [relabeling](https://docs.victoriametrics.com/#relabeling).
|
||||||
|
|
||||||
If the `-streamAggr.config` is set, then the de-duplication is performed individually per each [stream aggregation config](#stream-aggregation-config)
|
|
||||||
for the matching samples after applying [input_relabel_configs](#relabeling).
|
|
||||||
|
|
||||||
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-streamAggr.config`.
|
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-streamAggr.config`.
|
||||||
|
|
||||||
It is possible to drop the given labels before applying the de-duplication. See [these docs](#dropping-unneeded-labels).
|
It is possible to drop the given labels before applying the de-duplication. See [these docs](#dropping-unneeded-labels).
|
||||||
|
|
Loading…
Reference in a new issue