From da611ad628294711e0f7c62f3261d49e79ead214 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 5 Mar 2024 02:13:21 +0200 Subject: [PATCH] app/{vmagent,vminsert}: add `-streamAggr.dropInputLabels` command-line flag for dropping the specified labels from input samples before deduplication and streaming aggregation --- README.md | 6 +- app/vmagent/remotewrite/remotewrite.go | 8 ++- app/vminsert/common/streamaggr.go | 10 +++- docs/CHANGELOG.md | 1 + docs/README.md | 6 +- docs/Single-server-VictoriaMetrics.md | 6 +- docs/stream-aggregation.md | 35 +++++++++++ docs/vmagent.md | 20 +++---- lib/promutils/labels.go | 14 ++--- lib/streamaggr/deduplicator.go | 68 +++++++++++++++++++--- lib/streamaggr/deduplicator_test.go | 12 ++-- lib/streamaggr/deduplicator_timing_test.go | 2 +- lib/streamaggr/streamaggr.go | 26 ++++++++- 13 files changed, 168 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index 402c580c1..11f61b691 100644 --- a/README.md +++ b/README.md @@ -3107,9 +3107,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -streamAggr.config string Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration - Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication + Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication -streamAggr.dropInput Whether to drop all the input samples after the aggregation with -streamAggr.config.
By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html + -streamAggr.dropInputLabels array + An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels + Supports an array of values separated by comma or specified via multiple flags. + Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -streamAggr.keepInput Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls array diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 8c6b12ca1..cf9b25e49 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -91,6 +91,9 @@ var ( "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+ "with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication") + streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ + "before stream de-duplication and aggregation . 
See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels") + disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+ "when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+ "See also -remoteWrite.dropSamplesOnOverload") @@ -743,7 +746,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx) if sasFile != "" { opts := &streamaggr.Options{ - DedupInterval: dedupInterval, + DedupInterval: dedupInterval, + DropInputLabels: *streamAggrDropInputLabels, } sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) if err != nil { @@ -755,7 +759,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) } else if dedupInterval > 0 { - rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval) + rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels) } return rwctx diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index dc5402f12..0a4862340 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -28,7 +29,9 @@ var ( "By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+ "See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html") streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . "+ - "See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication") + "See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication") + streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ + "before stream de-duplication and aggregation . 
See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels") ) var ( @@ -69,7 +72,7 @@ func InitStreamAggr() { if *streamAggrConfig == "" { if *streamAggrDedupInterval > 0 { - deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval) + deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels) } return } @@ -77,7 +80,8 @@ func InitStreamAggr() { sighupCh := procutil.NewSighupChan() opts := &streamaggr.Options{ - DedupInterval: *streamAggrDedupInterval, + DedupInterval: *streamAggrDedupInterval, + DropInputLabels: *streamAggrDropInputLabels, } sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) if err != nil { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 0f369d780..9e6eb8e99 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -32,6 +32,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). The memory usage reduction is most visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow using `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line flags without the need to specify `-streamAggr.config` and `-remoteWrite.streamAggr.config`. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication). 
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `-streamAggr.dropInputLabels` command-line flag, which can be used for dropping the listed labels from input samples before applying stream [de-duplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) and aggregation. This is a faster and easier to use alternative to [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.
diff --git a/docs/README.md b/docs/README.md index e5f89e78e..73f8a7fa7 100644 --- a/docs/README.md +++ b/docs/README.md @@ -3110,9 +3110,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -streamAggr.config string Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration - Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication + Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication -streamAggr.dropInput Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html + -streamAggr.dropInputLabels array + An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels + Supports an array of values separated by comma or specified via multiple flags. + Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -streamAggr.keepInput Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. 
See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls array diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 075e805bb..ee8d39f05 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -3118,9 +3118,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -streamAggr.config string Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval -streamAggr.dedupInterval duration - Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication + Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication -streamAggr.dropInput Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html + -streamAggr.dropInputLabels array + An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels + Supports an array of values separated by comma or specified via multiple flags. + Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -streamAggr.keepInput Whether to keep all the input samples after the aggregation with -streamAggr.config. 
By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html -tls array diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 2962ec1e6..aa70aa00c 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -76,6 +76,8 @@ to the configured `-remoteWrite.url`. The de-duplication can be enabled via the - By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-streamAggr.config`. +It is possible to drop the given labels before applying the de-duplication. See [these docs](#dropping-unneeded-labels). + The online de-duplication doesn't take into account timestamps associated with the de-duplicated samples - it just leaves the last seen sample on the configured deduplication interval. If you need taking into account timestamps during the de-duplication, then use [`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication). @@ -447,6 +449,32 @@ Another option to remove the suffix, which is added by stream aggregation, is to keep_metric_names: true ``` +See also [dropping unneeded labels](#dropping-unneeded-labels). + + +## Dropping unneeded labels + +If you need dropping some labels from input samples before [input relabeling](#relabeling), [de-duplication](#deduplication) +and [stream aggregation](#aggregation-outputs), then the following options exist: + +- To specify comma-separated list of label names to drop in `-streamAggr.dropInputLabels` command-line flag. + For example, `-streamAggr.dropInputLabels=replica,az` instructs to drop `replica` and `az` labels from input samples + before applying de-duplication and stream aggregation. + +- To specify `drop_input_labels` list with the labels to drop in [stream aggregation config](#stream-aggregation-config).
+ For example, the following config drops `replica` label from input samples with the name `process_resident_memory_bytes` + before calculating the average over one minute: + + ```yaml + - match: process_resident_memory_bytes + interval: 1m + drop_input_labels: [replica] + outputs: [avg] + keep_metric_names: true + ``` + +Typical use case is to drop `replica` label from samples, which are received from high availability replicas. + ## Aggregation outputs The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config) @@ -889,6 +917,13 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server- # # keep_metric_names: false + # drop_input_labels instructs dropping the given labels from input samples. + # The labels' dropping is performed before input_relabel_configs are applied. + # This also means that the labels are dropped before de-duplication ( https://docs.victoriametrics.com/stream-aggregation.html#deduplication ) + # and stream aggregation. + # + # drop_input_labels: [replica, availability_zone] + # input_relabel_configs is an optional relabeling rules, # which are applied to the incoming samples after they pass the match filter # and before being aggregated. diff --git a/docs/vmagent.md b/docs/vmagent.md index 08eb403e7..556accca6 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -255,21 +255,15 @@ There is also support for multitenant writes. See [these docs](#multitenancy). [Deduplication at stream aggregation](https://docs.victoriametrics.com/stream-aggregation/#deduplication) allows setting up arbitrary complex de-duplication schemes for the collected samples.
Examples: -- The following command instructs `vmagent` to leave only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 60 seconds: +- The following command instructs `vmagent` to send only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 60 seconds: ``` ./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=60s ``` -- The following [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) instructs `vmagent` to merge - [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) with different `replica` label values and then to leave only the last sample - per each merged series per ever 60 seconds: - ```yml - - input_relabel_configs: - - action: labeldrop - regex: replica - interval: 60s - keep_metric_names: true - outputs: [last] +- The following command instructs `vmagent` to merge [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) with different `replica` label values + and then to send only the last sample per each merged series per every 60 seconds: + ``` + ./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -remoteWrite.streamAggr.dedupInterval=60s ``` ## VictoriaMetrics remote write protocol @@ -2173,6 +2167,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . The compression level for VictoriaMetrics remote write protocol. Higher values reduce network traffic at the cost of higher CPU usage. Negative values reduce CPU usage at the cost of increased network traffic. See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol -sortLabels Whether to sort labels for incoming samples before writing them to all the configured remote storage systems.
This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit + -streamAggr.dropInputLabels array + An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels + Supports an array of values separated by comma or specified via multiple flags. + Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. -tls array Whether to enable TLS for incoming HTTP requests at the given -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set. See also -mtls Supports array of values separated by comma or specified via multiple flags. diff --git a/lib/promutils/labels.go b/lib/promutils/labels.go index 75c223539..567157097 100644 --- a/lib/promutils/labels.go +++ b/lib/promutils/labels.go @@ -117,7 +117,7 @@ func (x *Labels) String() string { // Reset resets x. func (x *Labels) Reset() { - cleanLabels(x.Labels) + clear(x.Labels) x.Labels = x.Labels[:0] } @@ -245,7 +245,7 @@ func (x *Labels) RemoveDuplicates() { prevName = label.Name } } - cleanLabels(labels[len(tmp):]) + clear(labels[len(tmp):]) x.Labels = tmp } @@ -261,7 +261,7 @@ func (x *Labels) RemoveMetaLabels() { } dst = append(dst, label) } - cleanLabels(src[len(dst):]) + clear(src[len(dst):]) x.Labels = dst } @@ -276,16 +276,10 @@ func (x *Labels) RemoveLabelsWithDoubleUnderscorePrefix() { } dst = append(dst, label) } - cleanLabels(src[len(dst):]) + clear(src[len(dst):]) x.Labels = dst } -func cleanLabels(labels []prompbmarshal.Label) { - for i := range labels { - labels[i] = prompbmarshal.Label{} - } -} - // GetLabels returns and empty Labels instance from the pool. 
// // The returned Labels instance must be returned to pool via PutLabels() when no longer needed. diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index 1fd11e7ff..9a97ac60c 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -1,10 +1,12 @@ package streamaggr import ( + "slices" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/metrics" @@ -15,20 +17,31 @@ type Deduplicator struct { da *dedupAggr lc promutils.LabelsCompressor + dropLabels []string + wg sync.WaitGroup stopCh chan struct{} ms *metrics.Set + + dedupFlushDuration *metrics.Histogram + dedupFlushTimeouts *metrics.Counter } // NewDeduplicator returns new deduplicator, which deduplicates samples per each time series. // // The de-duplicated samples are passed to pushFunc once per dedupInterval. // +// An optional dropLabels list may contain label names, which must be dropped before de-duplicating samples. +// Common case is to drop `replica`-like labels from samples received from HA datasources. +// // MustStop must be called on the returned deduplicator in order to free up occupied resources. 
-func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration) *Deduplicator { +func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator { d := &Deduplicator{ - da: newDedupAggr(), + da: newDedupAggr(), + + dropLabels: dropLabels, + stopCh: make(chan struct{}), ms: metrics.NewSet(), } @@ -47,6 +60,10 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration) *Deduplicat _ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 { return float64(d.lc.ItemsCount()) }) + + d.dedupFlushDuration = ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`) + d.dedupFlushTimeouts = ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`) + metrics.RegisterSet(ms) d.wg.Add(1) @@ -71,10 +88,22 @@ func (d *Deduplicator) MustStop() { func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) { ctx := getDeduplicatorPushCtx() pss := ctx.pss + labels := &ctx.labels buf := ctx.buf + dropLabels := d.dropLabels for _, ts := range tss { - buf = d.lc.Compress(buf[:0], ts.Labels) + if len(dropLabels) > 0 { + labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels) + } else { + labels.Labels = append(labels.Labels[:0], ts.Labels...) 
+ } + if len(labels.Labels) == 0 { + continue + } + labels.Sort() + + buf = d.lc.Compress(buf[:0], labels.Labels) key := bytesutil.InternBytes(buf) for _, s := range ts.Samples { pss = append(pss, pushSample{ @@ -91,6 +120,15 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) { putDeduplicatorPushCtx(ctx) } +func dropSeriesLabels(dst, src []prompbmarshal.Label, labelNames []string) []prompbmarshal.Label { + for _, label := range src { + if !slices.Contains(labelNames, label.Name) { + dst = append(dst, label) + } + } + return dst +} + func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration) { t := time.NewTicker(dedupInterval) defer t.Stop() @@ -99,13 +137,15 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration case <-d.stopCh: return case <-t.C: - d.flush(pushFunc) + d.flush(pushFunc, dedupInterval) } } } -func (d *Deduplicator) flush(pushFunc PushFunc) { - timestamp := time.Now().UnixMilli() +func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) { + startTime := time.Now() + + timestamp := startTime.UnixMilli() d.da.flush(func(pss []pushSample) { ctx := getDeduplicatorFlushCtx() @@ -134,17 +174,29 @@ func (d *Deduplicator) flush(pushFunc PushFunc) { ctx.samples = samples putDeduplicatorFlushCtx(ctx) }, true) + + duration := time.Since(startTime) + d.dedupFlushDuration.Update(duration.Seconds()) + if duration > dedupInterval { + d.dedupFlushTimeouts.Inc() + logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+ + "possible solutions: increase dedupInterval; reduce samples' ingestion rate", dedupInterval, duration.Seconds()) + } + } type deduplicatorPushCtx struct { - pss []pushSample - buf []byte + pss []pushSample + labels promutils.Labels + buf []byte } func (ctx *deduplicatorPushCtx) reset() { clear(ctx.pss) ctx.pss = ctx.pss[:0] + ctx.labels.Reset() + ctx.buf = ctx.buf[:0] } diff --git a/lib/streamaggr/deduplicator_test.go 
b/lib/streamaggr/deduplicator_test.go index 7f8abd8e6..ad860bd26 100644 --- a/lib/streamaggr/deduplicator_test.go +++ b/lib/streamaggr/deduplicator_test.go @@ -29,18 +29,18 @@ foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="as baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3 `) - d := NewDeduplicator(pushFunc, time.Hour) + d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}) for i := 0; i < 10; i++ { d.Push(tss) } - d.flush(pushFunc) + d.flush(pushFunc, time.Hour) d.MustStop() result := timeSeriessToString(tssResult) - resultExpected := `asfjkldsf{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 12322 -bar{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 34.54 -baz_aaa_aaa_fdd{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} -2.3 -foo{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 894 + resultExpected := `asfjkldsf{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 12322 +bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54 +baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3 +foo{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 894 x 433 ` if result != resultExpected { diff --git a/lib/streamaggr/deduplicator_timing_test.go b/lib/streamaggr/deduplicator_timing_test.go index c9bf0f3dc..e4e859590 100644 --- a/lib/streamaggr/deduplicator_timing_test.go +++ b/lib/streamaggr/deduplicator_timing_test.go @@ -9,7 +9,7 @@ import ( func BenchmarkDeduplicatorPush(b *testing.B) { pushFunc := func(tss []prompbmarshal.TimeSeries) {} - d := NewDeduplicator(pushFunc, time.Hour) + d := NewDeduplicator(pushFunc, time.Hour, nil) b.ReportAllocs() 
b.SetBytes(int64(len(benchSeries))) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index f2de26a99..f0ea7c06a 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -78,6 +78,9 @@ type Options struct { // The deduplication can be set up individually per each aggregation via dedup_interval option. DedupInterval time.Duration + // DropInputLabels is an optional list of labels to drop from samples before de-duplication and stream aggregation. + DropInputLabels []string + // NoAlignFlushToInterval disables alignment of flushes to the aggregation interval. // // By default flushes are aligned to aggregation interval. @@ -177,6 +180,11 @@ type Config struct { // individually per each input time series. Without []string `yaml:"without,omitempty"` + // DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples. + // + // Labels are dropped before de-duplication and aggregation. + DropInputLabels *[]string `yaml:"drop_input_labels,omitempty"` + // InputRelabelConfigs is an optional relabeling rules, which are applied on the input // before aggregation. 
InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"` @@ -314,6 +322,8 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []b type aggregator struct { match *promrelabel.IfExpression + dropInputLabels []string + inputRelabeling *promrelabel.ParsedConfigs outputRelabeling *promrelabel.ParsedConfigs @@ -407,6 +417,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option } } + // Check cfg.DropInputLabels + dropInputLabels := opts.DropInputLabels + if v := cfg.DropInputLabels; v != nil { + dropInputLabels = *v + } + // initialize input_relabel_configs and output_relabel_configs inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs) if err != nil { @@ -524,6 +540,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option a := &aggregator{ match: cfg.Match, + dropInputLabels: dropInputLabels, inputRelabeling: inputRelabeling, outputRelabeling: outputRelabeling, @@ -719,18 +736,23 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { defer putPushCtx(ctx) samples := ctx.samples + buf := ctx.buf labels := &ctx.labels inputLabels := &ctx.inputLabels outputLabels := &ctx.outputLabels - buf := ctx.buf + dropLabels := a.dropInputLabels for idx, ts := range tss { if !a.match.Match(ts.Labels) { continue } matchIdxs[idx] = 1 - labels.Labels = append(labels.Labels[:0], ts.Labels...) + if len(dropLabels) > 0 { + labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels) + } else { + labels.Labels = append(labels.Labels[:0], ts.Labels...) + } labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0) if len(labels.Labels) == 0 { // The metric has been deleted by the relabeling