From 5defa99a2e218672abb98e3612a439b1315a8eed Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 25 Jan 2023 09:14:49 -0800
Subject: [PATCH] lib/streamaggr: add ability to de-duplicate input samples before aggregation

---
 app/vmagent/remotewrite/remotewrite.go   |   7 +-
 docs/CHANGELOG.md                        |   1 +
 docs/stream-aggregation.md               |  14 ++-
 lib/streamaggr/streamaggr.go             | 102 ++++++++++++++++++++---
 lib/streamaggr/streamaggr_test.go        |  82 +++++++++++++++++-
 lib/streamaggr/streamaggr_timing_test.go |   2 +-
 6 files changed, 190 insertions(+), 18 deletions(-)

diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index 407fd6d13..b92fc904c 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -62,10 +62,12 @@ var (
 	streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
 		"See https://docs.victoriametrics.com/stream-aggregation.html . "+
-		"See also -remoteWrite.streamAggr.keepInput")
+		"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
 	streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
 		"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
 		"See https://docs.victoriametrics.com/stream-aggregation.html")
+	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
+		"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
 )
 
 var (
@@ -509,7 +511,8 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
 	// Initialize sas
 	sasFile := streamAggrConfig.GetOptionalArg(argIdx)
 	if sasFile != "" {
-		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal)
+		dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
+		sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
 		if err != nil {
 			logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
 		}
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 9fb15ec4a..a55b72280 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## tip
 
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): add the ability to [de-duplicate](https://docs.victoriametrics.com/#deduplication) input samples before aggregation via `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line options.
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add dark mode - it can be selected via `settings` menu in the top right corner. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3704).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet.
Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696).
diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
index 11a0327ab..863010e2b 100644
--- a/docs/stream-aggregation.md
+++ b/docs/stream-aggregation.md
@@ -12,7 +12,7 @@ and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics
 The stream aggregation is configured via the following command-line flags:
 
 - `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
-  This flag can be specified individually per each specified `-remoteWrite.url`.
+  This flag can be specified individually per each `-remoteWrite.url`.
   This allows writing different aggregates to different remote storage destinations.
 - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
 
@@ -22,13 +22,23 @@ By default only the aggregated data is written to the storage. If the original i
 then the following command-line flags must be specified:
 
 - `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
-  This flag can be specified individually per each specified `-remoteWrite.url`.
+  This flag can be specified individually per each `-remoteWrite.url`.
   This allows writing both raw and aggregate data to different remote storage destinations.
 - `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
 
 Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
 It expects that the ingested samples have timestamps close to the current time.
 
+By default, all the input samples are aggregated. Sometimes it is needed to de-duplicate input samples before the aggregation -
+for example, when the samples are received from replicated sources.
+The following command-line flags can be used for enabling the [de-duplication](https://docs.victoriametrics.com/#deduplication)
+before aggregation in this case:
+
+- `-remoteWrite.streamAggr.dedupInterval` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
+  This flag can be specified individually per each `-remoteWrite.url`.
+  This allows setting different de-duplication intervals per each configured remote storage.
+- `-streamAggr.dedupInterval` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
+
 ## Use cases
 
 Stream aggregation can be used in the following cases:
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 5fcf4be7e..a31dd0e16 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -38,13 +38,16 @@ var supportedOutputs = []string{
 
 // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
+// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
+// i.e. only the last sample per each time series per each dedupInterval is aggregated.
+//
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
-func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
+func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
 	data, err := fs.ReadFileOrHTTP(path)
 	if err != nil {
 		return nil, fmt.Errorf("cannot load aggregators: %w", err)
 	}
-	as, err := NewAggregatorsFromData(data, pushFunc)
+	as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval)
 	if err != nil {
 		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
 	}
@@ -53,13 +56,16 @@ func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
 
 // NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
 //
+// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
+// i.e. only the last sample per each time series per each dedupInterval is aggregated.
+//
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
-func NewAggregatorsFromData(data []byte, pushFunc PushFunc) (*Aggregators, error) {
+func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
 	var cfgs []*Config
 	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
 		return nil, err
 	}
-	return NewAggregators(cfgs, pushFunc)
+	return NewAggregators(cfgs, pushFunc, dedupInterval)
 }
 
 // Config is a configuration for a single stream aggregation.
@@ -130,14 +136,17 @@ type Aggregators struct {
 //
 // pushFunc is called when the aggregated data must be flushed.
 //
+// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
+// i.e. only the last sample per each time series per each dedupInterval is aggregated.
+//
 // MustStop must be called on the returned Aggregators when they are no longer needed.
-func NewAggregators(cfgs []*Config, pushFunc PushFunc) (*Aggregators, error) {
+func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
 	if len(cfgs) == 0 {
 		return nil, nil
 	}
 	as := make([]*aggregator, len(cfgs))
 	for i, cfg := range cfgs {
-		a, err := newAggregator(cfg, pushFunc)
+		a, err := newAggregator(cfg, pushFunc, dedupInterval)
 		if err != nil {
 			return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
 		}
@@ -179,6 +188,10 @@ type aggregator struct {
 	without             []string
 	aggregateOnlyByTime bool
 
+	// dedupAggr is set to non-nil if input samples must be de-duplicated according
+	// to the dedupInterval passed to newAggregator().
+	dedupAggr *lastAggrState
+
 	// aggrStates contains aggregate states for the given outputs
 	aggrStates []aggrState
 
@@ -205,8 +218,11 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
 
 // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
 //
+// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
+// i.e. only the last sample per each time series per each dedupInterval is aggregated.
+//
 // The returned aggregator must be stopped when no longer needed by calling MustStop().
-func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
+func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) {
 	// check cfg.Interval
 	interval, err := time.ParseDuration(cfg.Interval)
 	if err != nil {
@@ -309,6 +325,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
 	}
 	suffix += "_"
 
+	var dedupAggr *lastAggrState
+	if dedupInterval > 0 {
+		dedupAggr = newLastAggrState()
+	}
+
 	// initialize the aggregator
 	a := &aggregator{
 		match: cfg.Match,
@@ -320,6 +341,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
 		without:             without,
 		aggregateOnlyByTime: aggregateOnlyByTime,
 
+		dedupAggr:  dedupAggr,
 		aggrStates: aggrStates,
 		pushFunc:   pushFunc,
 
@@ -328,15 +350,41 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
 		stopCh: make(chan struct{}),
 	}
 
+	if dedupAggr != nil {
+		a.wg.Add(1)
+		go func() {
+			a.runDedupFlusher(dedupInterval)
+			a.wg.Done()
+		}()
+	}
 	a.wg.Add(1)
 	go func() {
 		a.runFlusher(interval)
-		defer a.wg.Done()
+		a.wg.Done()
 	}()
 
 	return a, nil
 }
 
+func (a *aggregator) runDedupFlusher(interval time.Duration) {
+	t := time.NewTicker(interval)
+	defer t.Stop()
+	for {
+		select {
+		case <-a.stopCh:
+			return
+		case <-t.C:
+		}
+
+		// Globally limit the concurrency for metrics' flush
+		// in order to limit memory usage when a big number of aggregators
+		// are flushed at the same time.
+		flushConcurrencyCh <- struct{}{}
+		a.dedupFlush()
+		<-flushConcurrencyCh
+	}
+}
+
 func (a *aggregator) runFlusher(interval time.Duration) {
 	t := time.NewTicker(interval)
 	defer t.Stop()
@@ -358,6 +406,14 @@ func (a *aggregator) runFlusher(interval time.Duration) {
 
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
 
+func (a *aggregator) dedupFlush() {
+	ctx := &flushCtx{
+		skipAggrSuffix: true,
+	}
+	a.dedupAggr.appendSeriesForFlush(ctx)
+	a.push(ctx.tss)
+}
+
 func (a *aggregator) flush() {
 	ctx := &flushCtx{
 		suffix: a.suffix,
@@ -395,8 +452,29 @@ func (a *aggregator) MustStop() {
 	a.wg.Wait()
 }
 
-// Push pushes series to a.
+// Push pushes tss to a.
 func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
+	if a.dedupAggr == nil {
+		a.push(tss)
+		return
+	}
+
+	// Deduplication is enabled.
+	// Push samples to dedupAggr, so they will be pushed to the configured aggregators later.
+ pushSample := a.dedupAggr.pushSample + inputKey := "" + bb := bbPool.Get() + for _, ts := range tss { + bb.B = marshalLabelsFast(bb.B[:0], ts.Labels) + outputKey := bytesutil.InternBytes(bb.B) + for _, sample := range ts.Samples { + pushSample(inputKey, outputKey, sample.Value) + } + } + bbPool.Put(bb) +} + +func (a *aggregator) push(tss []prompbmarshal.TimeSeries) { labels := promutils.GetLabels() tmpLabels := promutils.GetLabels() bb := bbPool.Get() @@ -545,7 +623,8 @@ func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal } type flushCtx struct { - suffix string + skipAggrSuffix bool + suffix string tss []prompbmarshal.TimeSeries labels []prompbmarshal.Label @@ -567,7 +646,9 @@ func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int6 if err != nil { logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err) } - ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix) + if !ctx.skipAggrSuffix { + ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix) + } ctx.samples = append(ctx.samples, prompbmarshal.Sample{ Timestamp: timestamp, Value: value, diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 672032e45..a3c002c8d 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -6,6 +6,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" @@ -18,7 +19,7 @@ func TestAggregatorsFailure(t *testing.T) { pushFunc := func(tss []prompbmarshal.TimeSeries) { panic(fmt.Errorf("pushFunc shouldn't be called")) } - a, err := NewAggregatorsFromData([]byte(config), pushFunc) + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) if err == nil { t.Fatalf("expecting non-nil error") } @@ -136,7 +137,7 @@ func TestAggregatorsSuccess(t *testing.T) { } tssOutputLock.Unlock() } - a, err := NewAggregatorsFromData([]byte(config), pushFunc) + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -641,6 +642,83 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 `) } +func TestAggregatorsWithDedupInterval(t *testing.T) { + f := func(config, inputMetrics, outputMetricsExpected string) { + t.Helper() + + // Initialize Aggregators + var tssOutput []prompbmarshal.TimeSeries + var tssOutputLock sync.Mutex + pushFunc := func(tss []prompbmarshal.TimeSeries) { + tssOutputLock.Lock() + for _, ts := range tss { + labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...) + samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...) 
+ tssOutput = append(tssOutput, prompbmarshal.TimeSeries{ + Labels: labelsCopy, + Samples: samplesCopy, + }) + } + tssOutputLock.Unlock() + } + const dedupInterval = time.Hour + a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval) + if err != nil { + t.Fatalf("cannot initialize aggregators: %s", err) + } + + // Push the inputMetrics to Aggregators + tssInput := mustParsePromMetrics(inputMetrics) + a.Push(tssInput) + if a != nil { + for _, aggr := range a.as { + aggr.dedupFlush() + aggr.flush() + } + } + a.MustStop() + + // Verify the tssOutput contains the expected metrics + tsStrings := make([]string, len(tssOutput)) + for i, ts := range tssOutput { + tsStrings[i] = timeSeriesToString(ts) + } + sort.Strings(tsStrings) + outputMetrics := strings.Join(tsStrings, "") + if outputMetrics != outputMetricsExpected { + t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected) + } + } + + f(` +- interval: 1m + outputs: [sum_samples] +`, ` +foo 123 +bar 567 +`, `bar:1m_sum_samples 567 +foo:1m_sum_samples 123 +`) + + f(` +- interval: 1m + outputs: [sum_samples] +`, ` +foo 123 +bar{baz="qwe"} 1.32 +bar{baz="qwe"} 4.34 +bar{baz="qwe"} 2 +foo{baz="qwe"} -5 +bar{baz="qwer"} 343 +bar{baz="qwer"} 344 +foo{baz="qwe"} 10 +`, `bar:1m_sum_samples{baz="qwe"} 2 +bar:1m_sum_samples{baz="qwer"} 344 +foo:1m_sum_samples 123 +foo:1m_sum_samples{baz="qwe"} 10 +`) +} + func timeSeriesToString(ts prompbmarshal.TimeSeries) string { labelsString := promrelabel.LabelsToString(ts.Labels) if len(ts.Samples) != 1 { diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 6ff151241..f45dd0b40 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -40,7 +40,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { pushFunc := func(tss []prompbmarshal.TimeSeries) { panic(fmt.Errorf("unexpected pushFunc call")) } - a, err := NewAggregatorsFromData([]byte(config), pushFunc) + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) if err != nil { b.Fatalf("unexpected error when initializing aggregators: %s", err) }
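The following is a minimal usage sketch, not part of the patch itself: it shows how a caller might wire the new dedupInterval argument into the exported lib/streamaggr API, using the same config shape as the new test above. The 30-second interval, the `foo` series, and the printed message are arbitrary illustrations.

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	// Aggregation config in the format used by the tests:
	// sum input samples over 1m windows.
	config := []byte(`
- interval: 1m
  outputs: [sum_samples]
`)

	// pushFunc receives the aggregated series on every flush.
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		fmt.Printf("flushed %d aggregated series\n", len(tss))
	}

	// A non-zero dedupInterval keeps only the last sample per time series
	// within each 30s window before aggregation; passing 0 disables
	// de-duplication and preserves the previous behavior.
	a, err := streamaggr.NewAggregatorsFromData(config, pushFunc, 30*time.Second)
	if err != nil {
		panic(err)
	}
	defer a.MustStop()

	// Two samples for the same series within one dedup window:
	// only the last value (3) reaches the sum_samples aggregation.
	now := time.Now().UnixMilli()
	a.Push([]prompbmarshal.TimeSeries{{
		Labels: []prompbmarshal.Label{{Name: "__name__", Value: "foo"}},
		Samples: []prompbmarshal.Sample{
			{Value: 1, Timestamp: now},
			{Value: 3, Timestamp: now},
		},
	}})
}
```

With vmagent, the equivalent behavior is enabled per `-remoteWrite.url` via the new flag added by this patch, for example `-remoteWrite.streamAggr.dedupInterval=30s` (the 30s value is just an example); omitting the flag keeps de-duplication disabled.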
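Note that `lastAggrState` and `newLastAggrState()` are defined outside this diff. As an informal illustration of the semantics that `dedupFlush` relies on - not of the actual implementation, which keys state by interned marshaled labels as seen in `Push` above - "last sample wins" per series within a dedup window can be pictured as:

```go
// Illustration only: "last sample wins" de-duplication per series key.
// The real lastAggrState lives outside this diff; this sketch just models
// the semantics with a plain map.
package dedupsketch

type dedupWindow struct {
	last map[string]float64 // series key (marshaled labels) -> latest value
}

func newDedupWindow() *dedupWindow {
	return &dedupWindow{last: make(map[string]float64)}
}

// push records the sample, overwriting any earlier value for the same series.
func (w *dedupWindow) push(seriesKey string, value float64) {
	w.last[seriesKey] = value
}

// flush forwards exactly one (the latest) sample per series to sink
// and starts a new de-duplication window.
func (w *dedupWindow) flush(sink func(seriesKey string, value float64)) {
	for k, v := range w.last {
		sink(k, v)
	}
	w.last = make(map[string]float64)
}
```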