app/{vmagent,vminsert}: add -streamAggr.dropInputSamples command-line flag for dropping the specified labels from input samples before deduplication and streaming aggregation

This commit is contained in:
Aliaksandr Valialkin 2024-03-05 02:13:21 +02:00
parent ed523b5bbc
commit da611ad628
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
13 changed files with 168 additions and 46 deletions

View file

@ -3107,9 +3107,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.keepInput
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
-tls array

View file

@ -91,6 +91,9 @@ var (
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels")
disableOnDiskQueue = flag.Bool("remoteWrite.disableOnDiskQueue", false, "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent.html#disabling-on-disk-persistence ."+
"See also -remoteWrite.dropSamplesOnOverload")
@ -743,7 +746,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
if sasFile != "" {
opts := &streamaggr.Options{
DedupInterval: dedupInterval,
DedupInterval: dedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
}
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
if err != nil {
@ -755,7 +759,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
} else if dedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval)
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels)
}
return rwctx

View file

@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -28,7 +29,9 @@ var (
"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
"See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . "+
"See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication")
"See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels")
)
var (
@ -69,7 +72,7 @@ func InitStreamAggr() {
if *streamAggrConfig == "" {
if *streamAggrDedupInterval > 0 {
deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval)
deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels)
}
return
}
@ -77,7 +80,8 @@ func InitStreamAggr() {
sighupCh := procutil.NewSighupChan()
opts := &streamaggr.Options{
DedupInterval: *streamAggrDedupInterval,
DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
if err != nil {

View file

@ -32,6 +32,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): reduce memory usage by up to 5x when aggregating over big number of unique [time series](https://docs.victoriametrics.com/keyconcepts/#time-series). The memory usage reduction is most visible when [stream deduplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) is enabled. The downside is increased CPU usage by up to 30%.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): allow using `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line flags without the need to specify `-streamAggr.config` and `-remoteWrite.streamAggr.config`. See [these docs](https://docs.victoriametrics.com/stream-aggregation/#deduplication).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `-streamAggr.dropInputLabels` command-line flag, which can be used for dropping the listed labels from input samples before applying stream [de-duplication](https://docs.victoriametrics.com/stream-aggregation/#deduplication) and aggregation. This is faster and easier to use alternative to [input_relabel_configs](https://docs.victoriametrics.com/stream-aggregation/#relabeling). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `dedup_interval` option, which allows configuring individual [deduplication intervals](https://docs.victoriametrics.com/stream-aggregation/#deduplication) per each [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add `keep_metric_names` option, which can be set at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) in order to keep the original metric names in the output aggregated samples instead of using [the default output metric naming scheme](https://docs.victoriametrics.com/stream-aggregation/#output-metric-names).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): align the time of aggregated data flush to the specified aggregation `interval`. For example, if `interval` is set to `1m`, then the aggregated data will be flushed at the end of every minute. The alginment can be disabled by setting `no_align_flush_to_interval: true` option at [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). See [these docs](https://docs.victoriametrics.com/stream-aggregation/#flush-time-alignment) for details.

View file

@ -3110,9 +3110,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.keepInput
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
-tls array

View file

@ -3118,9 +3118,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.keepInput
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
-tls array

View file

@ -76,6 +76,8 @@ to the configured `-remoteWrite.url`. The de-duplication can be enabled via the
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-streamAggr.config`.
It is possible to drop the given labels before applying the de-duplication. See [these docs](#dropping-unneeded-labels).
The online de-duplication doesn't take into account timestamps associated with the de-duplicated samples - it just leaves the last seen sample
on the configured deduplication interval. If you need taking into account timestamps during the de-duplication,
then use [`-dedup.minScrapeInterval` command-line flag](https://docs.victoriametrics.com/#deduplication).
@ -447,6 +449,32 @@ Another option to remove the suffix, which is added by stream aggregation, is to
keep_metric_names: true
```
See also [dropping unneded labels](#dropping-unneeded-labels).
## Dropping unneeded labels
If you need dropping some labels from input samples before [input relabeling](#relabeling), [de-duplication](#deduplication)
and [stream aggregation](#aggregation-outputs), then the following options exist:
- To specify comma-separated list of label names to drop in `-streamAggr.dropInputLabels` command-line flag.
For example, `-streamAggr.dropInputLabels=replica,az` instructs to drop `replica` and `az` labels from input samples
before applying de-duplication and stream aggregation.
- To specify `drop_input_labels` list with the labels to drop in [stream aggregation config](#stream-aggregation-config).
For example, the following config drops `replica` label from input samples with the name `process_resident_memory_bytes`
before calculating the average over one minute:
```yaml
- match: process_resident_memory_bytes
interval: 1m
drop_input_labels: [replica]
outputs: [avg]
keep_metric_names: true
```
Typical use case is to drop `replica` label from samples, which are recevied from high availability replicas.
## Aggregation outputs
The aggregations are calculated during the `interval` specified in the [config](#stream-aggregation-config)
@ -889,6 +917,13 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
#
# keep_metric_names: false
# drop_input_labels instructs dropping the given labels from input samples.
# The labels' dropping is performed before input_relabel_configs are applied.
# This also means that the labels are dropped before de-duplication ( https://docs.victoriametrics.com/stream-aggregation.html#deduplication )
# and stream aggregation.
#
# drop_input_labels: [replica, availability_zone]
# input_relabel_configs is an optional relabeling rules,
# which are applied to the incoming samples after they pass the match filter
# and before being aggregated.

View file

@ -255,21 +255,15 @@ There is also support for multitenant writes. See [these docs](#multitenancy).
[Deduplication at stream aggregation](https://docs.victoriametrics.com/stream-aggregation/#deduplication) allows setting up arbitrary complex de-duplication schemes
for the collected samples. Examples:
- The following command instructs `vmagent` to leave only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 60 seconds:
- The following command instructs `vmagent` to send only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 60 seconds:
```
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=60s
```
- The following [stream aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config) instructs `vmagent` to merge
[time series](https://docs.victoriametrics.com/keyconcepts/#time-series) with different `replica` label values and then to leave only the last sample
per each merged series per ever 60 seconds:
```yml
- input_relabel_configs:
- action: labeldrop
regex: replica
interval: 60s
keep_metric_names: true
outputs: [last]
- The following command instructs `vmagent` to merge [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) with different `replica` label values
and then to send only the last sample per each merged series per ever 60 seconds:
```
./vmagent -remoteWrite=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -remoteWrite.streamAggr.dedupInterval=60s
```
## VictoriaMetrics remote write protocol
@ -2173,6 +2167,10 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
The compression level for VictoriaMetrics remote write protocol. Higher values reduce network traffic at the cost of higher CPU usage. Negative values reduce CPU usage at the cost of increased network traffic. See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol
-sortLabels
Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit
-streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-tls array
Whether to enable TLS for incoming HTTP requests at the given -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set. See also -mtls
Supports array of values separated by comma or specified via multiple flags.

View file

@ -117,7 +117,7 @@ func (x *Labels) String() string {
// Reset resets x.
func (x *Labels) Reset() {
cleanLabels(x.Labels)
clear(x.Labels)
x.Labels = x.Labels[:0]
}
@ -245,7 +245,7 @@ func (x *Labels) RemoveDuplicates() {
prevName = label.Name
}
}
cleanLabels(labels[len(tmp):])
clear(labels[len(tmp):])
x.Labels = tmp
}
@ -261,7 +261,7 @@ func (x *Labels) RemoveMetaLabels() {
}
dst = append(dst, label)
}
cleanLabels(src[len(dst):])
clear(src[len(dst):])
x.Labels = dst
}
@ -276,16 +276,10 @@ func (x *Labels) RemoveLabelsWithDoubleUnderscorePrefix() {
}
dst = append(dst, label)
}
cleanLabels(src[len(dst):])
clear(src[len(dst):])
x.Labels = dst
}
func cleanLabels(labels []prompbmarshal.Label) {
for i := range labels {
labels[i] = prompbmarshal.Label{}
}
}
// GetLabels returns and empty Labels instance from the pool.
//
// The returned Labels instance must be returned to pool via PutLabels() when no longer needed.

View file

@ -1,10 +1,12 @@
package streamaggr
import (
"slices"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
@ -15,20 +17,31 @@ type Deduplicator struct {
da *dedupAggr
lc promutils.LabelsCompressor
dropLabels []string
wg sync.WaitGroup
stopCh chan struct{}
ms *metrics.Set
dedupFlushDuration *metrics.Histogram
dedupFlushTimeouts *metrics.Counter
}
// NewDeduplicator returns new deduplicator, which deduplicates samples per each time series.
//
// The de-duplicated samples are passed to pushFunc once per dedupInterval.
//
// An optional dropLabels list may contain label names, which must be dropped before de-duplicating samples.
// Common case is to drop `replica`-like labels from samples received from HA datasources.
//
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration) *Deduplicator {
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator {
d := &Deduplicator{
da: newDedupAggr(),
da: newDedupAggr(),
dropLabels: dropLabels,
stopCh: make(chan struct{}),
ms: metrics.NewSet(),
}
@ -47,6 +60,10 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration) *Deduplicat
_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
return float64(d.lc.ItemsCount())
})
d.dedupFlushDuration = ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`)
d.dedupFlushTimeouts = ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`)
metrics.RegisterSet(ms)
d.wg.Add(1)
@ -71,10 +88,22 @@ func (d *Deduplicator) MustStop() {
func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
ctx := getDeduplicatorPushCtx()
pss := ctx.pss
labels := &ctx.labels
buf := ctx.buf
dropLabels := d.dropLabels
for _, ts := range tss {
buf = d.lc.Compress(buf[:0], ts.Labels)
if len(dropLabels) > 0 {
labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
} else {
labels.Labels = append(labels.Labels[:0], ts.Labels...)
}
if len(labels.Labels) == 0 {
continue
}
labels.Sort()
buf = d.lc.Compress(buf[:0], labels.Labels)
key := bytesutil.InternBytes(buf)
for _, s := range ts.Samples {
pss = append(pss, pushSample{
@ -91,6 +120,15 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
putDeduplicatorPushCtx(ctx)
}
func dropSeriesLabels(dst, src []prompbmarshal.Label, labelNames []string) []prompbmarshal.Label {
for _, label := range src {
if !slices.Contains(labelNames, label.Name) {
dst = append(dst, label)
}
}
return dst
}
func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration) {
t := time.NewTicker(dedupInterval)
defer t.Stop()
@ -99,13 +137,15 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc, dedupInterval time.Duration
case <-d.stopCh:
return
case <-t.C:
d.flush(pushFunc)
d.flush(pushFunc, dedupInterval)
}
}
}
func (d *Deduplicator) flush(pushFunc PushFunc) {
timestamp := time.Now().UnixMilli()
func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
startTime := time.Now()
timestamp := startTime.UnixMilli()
d.da.flush(func(pss []pushSample) {
ctx := getDeduplicatorFlushCtx()
@ -134,17 +174,29 @@ func (d *Deduplicator) flush(pushFunc PushFunc) {
ctx.samples = samples
putDeduplicatorFlushCtx(ctx)
}, true)
duration := time.Since(startTime)
d.dedupFlushDuration.Update(duration.Seconds())
if duration > dedupInterval {
d.dedupFlushTimeouts.Inc()
logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
"possible solutions: increase dedupInterval; reduce samples' ingestion rate", dedupInterval, duration.Seconds())
}
}
type deduplicatorPushCtx struct {
pss []pushSample
buf []byte
pss []pushSample
labels promutils.Labels
buf []byte
}
func (ctx *deduplicatorPushCtx) reset() {
clear(ctx.pss)
ctx.pss = ctx.pss[:0]
ctx.labels.Reset()
ctx.buf = ctx.buf[:0]
}

View file

@ -29,18 +29,18 @@ foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="as
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3
`)
d := NewDeduplicator(pushFunc, time.Hour)
d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"})
for i := 0; i < 10; i++ {
d.Push(tss)
}
d.flush(pushFunc)
d.flush(pushFunc, time.Hour)
d.MustStop()
result := timeSeriessToString(tssResult)
resultExpected := `asfjkldsf{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 12322
bar{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 34.54
baz_aaa_aaa_fdd{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} -2.3
foo{container="ohohffd",instance="x",job="aaa",namespace="asdff",node="aosijjewrerfd",pod="sdfd-dfdfdfs"} 894
resultExpected := `asfjkldsf{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 12322
bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54
baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3
foo{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 894
x 433
`
if result != resultExpected {

View file

@ -9,7 +9,7 @@ import (
func BenchmarkDeduplicatorPush(b *testing.B) {
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
d := NewDeduplicator(pushFunc, time.Hour)
d := NewDeduplicator(pushFunc, time.Hour, nil)
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries)))

View file

@ -78,6 +78,9 @@ type Options struct {
// The deduplication can be set up individually per each aggregation via dedup_interval option.
DedupInterval time.Duration
// DropInputLabels is an optional list of labels to drop from samples before de-duplication and stream aggregation.
DropInputLabels []string
// NoAlignFlushToInterval disables alignment of flushes to the aggregation interval.
//
// By default flushes are aligned to aggregation interval.
@ -177,6 +180,11 @@ type Config struct {
// individually per each input time series.
Without []string `yaml:"without,omitempty"`
// DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples.
//
// Labels are dropped before de-duplication and aggregation.
DropInputLabels *[]string `yaml:"drop_input_labels,omitempty"`
// InputRelabelConfigs is an optional relabeling rules, which are applied on the input
// before aggregation.
InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"`
@ -314,6 +322,8 @@ func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []b
type aggregator struct {
match *promrelabel.IfExpression
dropInputLabels []string
inputRelabeling *promrelabel.ParsedConfigs
outputRelabeling *promrelabel.ParsedConfigs
@ -407,6 +417,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
}
}
// Check cfg.DropInputLabels
dropInputLabels := opts.DropInputLabels
if v := cfg.DropInputLabels; v != nil {
dropInputLabels = *v
}
// initialize input_relabel_configs and output_relabel_configs
inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs)
if err != nil {
@ -524,6 +540,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
a := &aggregator{
match: cfg.Match,
dropInputLabels: dropInputLabels,
inputRelabeling: inputRelabeling,
outputRelabeling: outputRelabeling,
@ -719,18 +736,23 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
defer putPushCtx(ctx)
samples := ctx.samples
buf := ctx.buf
labels := &ctx.labels
inputLabels := &ctx.inputLabels
outputLabels := &ctx.outputLabels
buf := ctx.buf
dropLabels := a.dropInputLabels
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
}
matchIdxs[idx] = 1
labels.Labels = append(labels.Labels[:0], ts.Labels...)
if len(dropLabels) > 0 {
labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
} else {
labels.Labels = append(labels.Labels[:0], ts.Labels...)
}
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
if len(labels.Labels) == 0 {
// The metric has been deleted by the relabeling