From c71a9478f02320740158e359443ef7b0873af952 Mon Sep 17 00:00:00 2001 From: AndrewChubatiuk Date: Thu, 18 Jul 2024 18:50:23 +0300 Subject: [PATCH] lib/streamaggr: allow setting keep_input for each aggregator separately --- app/vmagent/remotewrite/remotewrite.go | 44 ++----- app/vmagent/remotewrite/remotewrite_test.go | 2 - app/vmagent/remotewrite/streamaggr.go | 11 +- app/vminsert/common/insert_ctx.go | 20 +-- app/vminsert/common/streamaggr.go | 21 ++-- docs/stream-aggregation.md | 3 +- lib/streamaggr/streamaggr.go | 101 ++++++++++++--- lib/streamaggr/streamaggr_test.go | 128 ++++++++++---------- lib/streamaggr/streamaggr_timing_test.go | 5 +- 9 files changed, 184 insertions(+), 151 deletions(-) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index bc6159b43..2629f0338 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -488,12 +488,7 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF sortLabelsIfNeeded(tssBlock) tssBlock = limitSeriesCardinality(tssBlock) if sas.IsEnabled() { - matchIdxs := matchIdxsPool.Get() - matchIdxs.B = sas.Push(tssBlock, matchIdxs.B) - if !*streamAggrGlobalKeepInput { - tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput) - } - matchIdxsPool.Put(matchIdxs) + tssBlock = sas.Push(tssBlock) } else if deduplicatorGlobal != nil { deduplicatorGlobal.Push(tssBlock) tssBlock = tssBlock[:0] @@ -764,9 +759,6 @@ type remoteWriteCtx struct { sas atomic.Pointer[streamaggr.Aggregators] deduplicator *streamaggr.Deduplicator - streamAggrKeepInput bool - streamAggrDropInput bool - pss []*pendingSeries pssNextIdx atomic.Uint64 @@ -910,18 +902,13 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa // Apply stream aggregation or deduplication if they are configured sas := rwctx.sas.Load() if sas.IsEnabled() { - matchIdxs := matchIdxsPool.Get() - matchIdxs.B = sas.Push(tss, matchIdxs.B) - if !rwctx.streamAggrKeepInput { - if rctx == nil { - rctx = getRelabelCtx() - // Make a copy of tss before dropping aggregated series - v = tssPool.Get().(*[]prompbmarshal.TimeSeries) - tss = append(*v, tss...) - } - tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput) + if sas.ExpectModifications() && rctx == nil { + rctx = getRelabelCtx() + // Make a copy of tss before dropping aggregated series + v = tssPool.Get().(*[]prompbmarshal.TimeSeries) + tss = append(*v, tss...) 
 		}
-		matchIdxsPool.Put(matchIdxs)
+		tss = sas.Push(tss)
 	} else if rwctx.deduplicator != nil {
 		rwctx.deduplicator.Push(tss)
 		return true
@@ -942,23 +929,6 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa
 	return false
 }
 
-var matchIdxsPool bytesutil.ByteBufferPool
-
-func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, dropInput bool) []prompbmarshal.TimeSeries {
-	dst := src[:0]
-	if !dropInput {
-		for i, match := range matchIdxs {
-			if match == 1 {
-				continue
-			}
-			dst = append(dst, src[i])
-		}
-	}
-	tail := src[len(dst):]
-	clear(tail)
-	return dst
-}
-
 func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSeries) {
 	if rwctx.tryPushInternal(tss) {
 		return
diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go
index 01a9fc1f2..8bece2c50 100644
--- a/app/vmagent/remotewrite/remotewrite_test.go
+++ b/app/vmagent/remotewrite/remotewrite_test.go
@@ -70,8 +70,6 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
 		pss[0] = newPendingSeries(nil, true, 0, 100)
 		rwctx := &remoteWriteCtx{
 			idx:                    0,
-			streamAggrKeepInput:    keepInput,
-			streamAggrDropInput:    dropInput,
 			pss:                    pss,
 			rowsPushedAfterRelabel: metrics.GetOrCreateCounter(`foo`),
 			rowsDroppedByRelabel:   metrics.GetOrCreateCounter(`bar`),
diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go
index 0ebc2e624..cf2ff40c2 100644
--- a/app/vmagent/remotewrite/streamaggr.go
+++ b/app/vmagent/remotewrite/streamaggr.go
@@ -50,8 +50,7 @@ var (
 	streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+
 		"aggregation interval for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. "+
 		"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
-	streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start "+
-		"for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. Increase this value if "+
+	streamAggrIgnoreFirstIntervals = flagutil.NewArrayInt("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start "+"for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. Increase this value if "+
 		"you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving buffered delayed data from clients pushing data into the vmagent. 
"+ "See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start") streamAggrDropInputLabels = flagutil.NewArrayString("remoteWrite.streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ @@ -148,8 +147,6 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() { if sas != nil { filePath := sas.FilePath() rwctx.sas.Store(sas) - rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(idx) - rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp()) } else { @@ -202,6 +199,8 @@ func newStreamAggrConfigGlobal() (*streamaggr.Aggregators, error) { DropInputLabels: *streamAggrGlobalDropInputLabels, IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals, + KeepInput: *streamAggrGlobalKeepInput, + DropInput: *streamAggrGlobalDropInput, } sas, err := streamaggr.LoadFromFile(path, pushToRemoteStoragesTrackDropped, opts, "global") @@ -229,7 +228,9 @@ func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamag DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx), - IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, + IgnoreFirstIntervals: streamAggrIgnoreFirstIntervals.GetOptionalArg(idx), + KeepInput: streamAggrKeepInput.GetOptionalArg(idx), + DropInput: streamAggrDropInput.GetOptionalArg(idx), } sas, err := streamaggr.LoadFromFile(path, pushFunc, opts, alias) diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index f7a06960b..07c46fac8 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -141,13 +141,7 @@ func (ctx *InsertCtx) ApplyRelabeling() { func (ctx *InsertCtx) FlushBufs() error { sas := sasGlobal.Load() if (sas.IsEnabled() || deduplicator != nil) && !ctx.skipStreamAggr { - matchIdxs := matchIdxsPool.Get() - matchIdxs.B = ctx.streamAggrCtx.push(ctx.mrs, matchIdxs.B) - if !*streamAggrKeepInput { - // Remove aggregated rows from ctx.mrs - ctx.dropAggregatedRows(matchIdxs.B) - } - matchIdxsPool.Put(matchIdxs) + ctx.streamAggrCtx.push(ctx) } // There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here, // since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter @@ -166,13 +160,11 @@ func (ctx *InsertCtx) FlushBufs() error { func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) { dst := ctx.mrs[:0] src := ctx.mrs - if !*streamAggrDropInput { - for idx, match := range matchIdxs { - if match == 1 { - continue - } - dst = append(dst, src[idx]) + for i, match := range matchIdxs { + if match == 1 { + continue } + dst = append(dst, src[i]) } tail := src[len(dst):] for i := range tail { @@ -180,5 +172,3 @@ func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) { } ctx.mrs = dst } - -var matchIdxsPool bytesutil.ByteBufferPool diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 3cc649c52..7475ec5f9 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -62,6 +62,8 @@ func CheckStreamAggrConfig() error { DropInputLabels: *streamAggrDropInputLabels, 
 		IgnoreOldSamples:     *streamAggrIgnoreOldSamples,
 		IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
+		KeepInput:            *streamAggrKeepInput,
+		DropInput:            *streamAggrDropInput,
 	}
 	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts, "global")
 	if err != nil {
@@ -90,6 +92,8 @@ func InitStreamAggr() {
 		DropInputLabels:      *streamAggrDropInputLabels,
 		IgnoreOldSamples:     *streamAggrIgnoreOldSamples,
 		IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
+		KeepInput:            *streamAggrKeepInput,
+		DropInput:            *streamAggrDropInput,
 	}
 	sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts, "global")
 	if err != nil {
@@ -124,6 +128,8 @@ func reloadStreamAggrConfig() {
 		DropInputLabels:      *streamAggrDropInputLabels,
 		IgnoreOldSamples:     *streamAggrIgnoreOldSamples,
 		IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
+		KeepInput:            *streamAggrKeepInput,
+		DropInput:            *streamAggrDropInput,
 	}
 	sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts, "global")
 	if err != nil {
@@ -180,12 +186,13 @@ func (ctx *streamAggrCtx) Reset() {
 	ctx.buf = ctx.buf[:0]
 }
 
-func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte {
+func (ctx *streamAggrCtx) push(insertCtx *InsertCtx) {
 	mn := &ctx.mn
 	tss := ctx.tss
 	labels := ctx.labels
 	samples := ctx.samples
 	buf := ctx.buf
+	mrs := insertCtx.mrs
 
 	tssLen := len(tss)
 	for _, mr := range mrs {
@@ -237,18 +244,18 @@
 
 	sas := sasGlobal.Load()
 	if sas.IsEnabled() {
-		matchIdxs = sas.Push(tss, matchIdxs)
+		_ = sas.PushWithCallback(tss, func(matchIdxs []byte) {
+			insertCtx.dropAggregatedRows(matchIdxs)
+		})
+		// dropAggregatedRows shrinks insertCtx.mrs; pick up the updated slice so the assignment below doesn't restore the dropped rows
+		mrs = insertCtx.mrs
 	} else if deduplicator != nil {
-		matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
-		for i := range matchIdxs {
-			matchIdxs[i] = 1
-		}
 		deduplicator.Push(tss)
+		mrs = mrs[:0]
 	}
 
 	ctx.Reset()
-
-	return matchIdxs
+	insertCtx.mrs = mrs
 }
 
 func pushAggregateSeries(tss []prompbmarshal.TimeSeries) {
diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
index a3d3e1029..f363f0a40 100644
--- a/docs/stream-aggregation.md
+++ b/docs/stream-aggregation.md
@@ -44,7 +44,8 @@ This behaviour can be changed via the following command-line flags:
 
 - `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
   and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
-  `-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`.
+  `-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`. Additionally, the `keep_input`
+  option can be set individually per each aggregator in the stream aggregation config file.
   If one of these flags is set, then all the input samples are written to the storage alongside the aggregated samples.
 - `-streamAggr.dropInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
   and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index c8571b9d7..01245c800 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -135,6 +135,14 @@ type Options struct {
 	//
 	// This option can be overridden individually per each aggregation via ignore_first_intervals option.
 	IgnoreFirstIntervals int
+
+	// KeepInput defines whether to keep all the input samples after the aggregation.
+	// By default, only the input samples used in aggregations are dropped, while the remaining samples are written to remote storage.
+	KeepInput bool
+
+	// DropInput defines whether to drop all the input samples after the aggregation, including those not used in aggregations.
+	// By default, only the input samples used in aggregations are dropped, while the remaining samples are written to remote storage.
+	DropInput bool
 }
 
 // Config is a configuration for a single stream aggregation.
@@ -237,6 +245,10 @@ type Config struct {
 	// OutputRelabelConfigs is an optional list of relabeling rules, which are applied
 	// on the aggregated output before being sent to remote storage.
 	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
+
+	// KeepInput defines whether to keep the input samples matched by the given aggregation.
+	// If set, it overrides the Options.KeepInput value for this aggregation.
+	KeepInput *bool `yaml:"keep_input,omitempty"`
 }
 
 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
@@ -252,6 +264,8 @@ type Aggregators struct {
 
 	// ms contains metrics associated with the Aggregators.
 	ms *metrics.Set
+
+	dropInput bool
 }
 
 // FilePath returns path to file with the configuration used for creating the given Aggregators.
@@ -291,12 +305,16 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
 	}
 	metrics.RegisterSet(ms)
 
-	return &Aggregators{
+	a := &Aggregators{
 		as:         as,
 		configData: configData,
 		filePath:   filePath,
 		ms:         ms,
-	}, nil
+	}
+	if opts != nil {
+		a.dropInput = opts.DropInput
+	}
+	return a, nil
 }
 
 // IsEnabled returns true if Aggregators has at least one configured aggregator
@@ -325,6 +343,19 @@ func (a *Aggregators) MustStop() {
 	a.as = nil
 }
 
+// ExpectModifications returns true if Push may modify the original tss
+func (a *Aggregators) ExpectModifications() bool {
+	if a == nil {
+		return false
+	}
+	for _, aggr := range a.as {
+		if !aggr.keepInput {
+			return true
+		}
+	}
+	return false
+}
+
 // Equal returns true if a and b are initialized from identical configs.
 func (a *Aggregators) Equal(b *Aggregators) bool {
 	if a == nil || b == nil {
@@ -333,28 +364,41 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
 	return string(a.configData) == string(b.configData)
 }
 
-// Push pushes tss to a.
+// Push calls PushWithCallback with a no-op callback.
+func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
+	defaultCallback := func(_ []byte) {}
+	return a.PushWithCallback(tss, defaultCallback)
+}
+
+// PushWithCallback pushes tss to a.
 //
-// Push sets matchIdxs[idx] to 1 if the corresponding tss[idx] was used in aggregations.
+// PushWithCallback calls callback with matchIdxs, where matchIdxs[idx] is set to 1 if the corresponding tss[idx] was used in aggregations.
 // Otherwise matchIdxs[idx] is set to 0.
 //
-// Push returns matchIdxs with len equal to len(tss).
-// It re-uses the matchIdxs if it has enough capacity to hold len(tss) items.
-// Otherwise it allocates new matchIdxs.
-func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte {
-	matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
-	for i := range matchIdxs {
-		matchIdxs[i] = 0
-	}
+// PushWithCallback returns the modified timeseries.
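+// Input series matched by an aggregator whose keep_input option resolves to false are
+// dropped from the returned slice; if Options.DropInput is set, the returned slice is emptied.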
+func (a *Aggregators) PushWithCallback(tss []prompbmarshal.TimeSeries, callback func([]byte)) []prompbmarshal.TimeSeries {
 	if a == nil {
-		return matchIdxs
+		return tss
 	}
-
+	matchIdxs := matchIdxsPool.Get()
+	defer matchIdxsPool.Put(matchIdxs)
 	for _, aggr := range a.as {
-		aggr.Push(tss, matchIdxs)
+		matchIdxs.B = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs.B, len(tss))
+		for i := range matchIdxs.B {
+			matchIdxs.B[i] = 0
+		}
+		aggr.Push(tss, matchIdxs.B)
+		if !aggr.keepInput {
+			callback(matchIdxs.B)
+			tss = dropAggregatedSeries(tss, matchIdxs.B)
+		}
 	}
-
-	return matchIdxs
+	if a.dropInput {
+		tss = tss[:0]
+	}
+	return tss
 }
 
 // aggregator aggregates input series according to the config passed to NewAggregator
@@ -368,6 +412,7 @@ type aggregator struct {
 
 	keepMetricNames  bool
 	ignoreOldSamples bool
+	keepInput        bool
 
 	by               []string
 	without          []string
@@ -546,6 +591,12 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 	}
 	metricLabels := fmt.Sprintf(`name=%q,path=%q,url=%q,position="%d"`, name, path, alias, aggrID)
 
+	// resolve the keep_input option: the per-aggregation cfg.KeepInput value overrides opts.KeepInput
+	keepInput := opts.KeepInput
+	if v := cfg.KeepInput; v != nil {
+		keepInput = *v
+	}
+
 	// initialize aggrOutputs
 	if len(cfg.Outputs) == 0 {
 		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
@@ -585,6 +636,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 
 		keepMetricNames:  keepMetricNames,
 		ignoreOldSamples: ignoreOldSamples,
+		keepInput:        keepInput,
 
 		by:      by,
 		without: without,
@@ -884,7 +936,7 @@ func (a *aggregator) MustStop() {
 	a.wg.Wait()
 }
 
-// Push pushes tss to a.
+// Push pushes tss to a and sets matchIdxs[idx] to 1 for each tss[idx] used in aggregations.
 func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 	ctx := getPushCtx()
 	defer putPushCtx(ctx)
@@ -1050,6 +1102,8 @@ func putPushCtx(ctx *pushCtx) {
 	pushCtxPool.Put(ctx)
 }
 
+var matchIdxsPool bytesutil.ByteBufferPool
+
 var pushCtxPool sync.Pool
 
 func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, without []string) ([]prompbmarshal.Label, []prompbmarshal.Label) {
@@ -1273,4 +1327,17 @@ func sortAndRemoveDuplicates(a []string) []string {
 	return dst
 }
 
+func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte) []prompbmarshal.TimeSeries {
+	dst := src[:0]
+	for i, match := range matchIdxs {
+		if match == 1 {
+			continue
+		}
+		dst = append(dst, src[i])
+	}
+	tail := src[len(dst):]
+	clear(tail)
+	return dst
+}
+
 var bbPool bytesutil.ByteBufferPool
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index ae64a6ac7..9e5215fbb 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -3,7 +3,6 @@ package streamaggr
 import (
 	"fmt"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"testing"
@@ -252,7 +251,7 @@
 }
 
 func TestAggregatorsSuccess(t *testing.T) {
-	f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
+	f := func(config, inputMetrics, outputMetricsExpected string, matchedIdxsExpected int) {
 		t.Helper()
 
 		// Initialize Aggregators
@@ -275,16 +274,18 @@
 		// Push the inputMetrics to Aggregators
 		offsetMsecs := time.Now().UnixMilli()
 		tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
-		matchIdxs := a.Push(tssInput, nil)
+		var matchedIdxs int
+		_ = a.PushWithCallback(tssInput, func(idxs []byte) {
+			for _, idx := range idxs {
+				if idx == 1 {
+					matchedIdxs++
+				}
+			}
+		})
 		a.MustStop()
 
-		// Verify matchIdxs equals to
matchIdxsExpected - matchIdxsStr := "" - for _, v := range matchIdxs { - matchIdxsStr += strconv.Itoa(int(v)) - } - if matchIdxsStr != matchIdxsStrExpected { - t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected) + if matchedIdxs != matchedIdxsExpected { + t.Fatalf("unexpected matchIdxs;\ngot\n%d\nwant\n%d", matchedIdxs, matchedIdxsExpected) } // Verify the tssOutput contains the expected metrics @@ -295,9 +296,9 @@ func TestAggregatorsSuccess(t *testing.T) { } // Empty config - f(``, ``, ``, "") - f(``, `foo{bar="baz"} 1`, ``, "0") - f(``, "foo 1\nbaz 2", ``, "00") + f(``, ``, ``, 0) + f(``, `foo{bar="baz"} 1`, ``, 0) + f(``, "foo 1\nbaz 2", ``, 0) // Empty by list - aggregate only by time f(` @@ -321,7 +322,7 @@ foo:1m_last{abc="123"} 8.5 foo:1m_last{abc="456",de="fg"} 8 foo:1m_sum_samples{abc="123"} 12.5 foo:1m_sum_samples{abc="456",de="fg"} 8 -`, "11111") +`, 5) // Special case: __name__ in `by` list - this is the same as empty `by` list f(` @@ -339,7 +340,7 @@ bar:1m_sum_samples 5 foo:1m_count_samples 3 foo:1m_count_series 2 foo:1m_sum_samples 20.5 -`, "1111") +`, 4) // Non-empty `by` list with non-existing labels f(` @@ -357,7 +358,7 @@ bar:1m_by_bar_foo_sum_samples 5 foo:1m_by_bar_foo_count_samples 3 foo:1m_by_bar_foo_count_series 2 foo:1m_by_bar_foo_sum_samples 20.5 -`, "1111") +`, 4) // Non-empty `by` list with existing label f(` @@ -378,7 +379,7 @@ foo:1m_by_abc_count_series{abc="123"} 1 foo:1m_by_abc_count_series{abc="456"} 1 foo:1m_by_abc_sum_samples{abc="123"} 12.5 foo:1m_by_abc_sum_samples{abc="456"} 8 -`, "1111") +`, 4) // Non-empty `by` list with duplicate existing label f(` @@ -399,7 +400,7 @@ foo:1m_by_abc_count_series{abc="123"} 1 foo:1m_by_abc_count_series{abc="456"} 1 foo:1m_by_abc_sum_samples{abc="123"} 12.5 foo:1m_by_abc_sum_samples{abc="456"} 8 -`, "1111") +`, 4) // Non-empty `without` list with non-existing labels f(` @@ -420,7 +421,7 @@ foo:1m_without_foo_count_series{abc="123"} 1 foo:1m_without_foo_count_series{abc="456",de="fg"} 1 foo:1m_without_foo_sum_samples{abc="123"} 12.5 foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8 -`, "1111") +`, 4) // Non-empty `without` list with existing labels f(` @@ -441,7 +442,7 @@ foo:1m_without_abc_count_series 1 foo:1m_without_abc_count_series{de="fg"} 1 foo:1m_without_abc_sum_samples 12.5 foo:1m_without_abc_sum_samples{de="fg"} 8 -`, "1111") +`, 4) // Special case: __name__ in `without` list f(` @@ -462,7 +463,7 @@ foo{abc="456",de="fg"} 8 :1m_sum_samples 5 :1m_sum_samples{abc="123"} 12.5 :1m_sum_samples{abc="456",de="fg"} 8 -`, "1111") +`, 4) // drop some input metrics f(` @@ -480,7 +481,7 @@ foo{abc="456",de="fg"} 8 `, `bar:1m_without_abc_count_samples 1 bar:1m_without_abc_count_series 1 bar:1m_without_abc_sum_samples 5 -`, "1111") +`, 4) // rename output metrics f(` @@ -507,7 +508,7 @@ bar-1m-without-abc-sum-samples 5 foo-1m-without-abc-count-samples 2 foo-1m-without-abc-count-series 1 foo-1m-without-abc-sum-samples 12.5 -`, "1111") +`, 4) // match doesn't match anything f(` @@ -521,7 +522,7 @@ foo{abc="123"} 4 bar 5 foo{abc="123"} 8.5 foo{abc="456",de="fg"} 8 -`, ``, "0000") +`, ``, 0) // match matches foo series with non-empty abc label f(` @@ -543,7 +544,7 @@ foo:1m_by_abc_count_series{abc="123"} 1 foo:1m_by_abc_count_series{abc="456"} 1 foo:1m_by_abc_sum_samples{abc="123"} 12.5 foo:1m_by_abc_sum_samples{abc="456"} 8 -`, "1011") +`, 3) // total output for non-repeated series f(` @@ -554,7 +555,7 @@ foo 123 bar{baz="qwe"} 4.34 `, `bar:1m_total{baz="qwe"} 0 foo:1m_total 0 -`, "11") 
+`, 2) // total_prometheus output for non-repeated series f(` @@ -565,7 +566,7 @@ foo 123 bar{baz="qwe"} 4.34 `, `bar:1m_total_prometheus{baz="qwe"} 0 foo:1m_total_prometheus 0 -`, "11") +`, 2) // total output for repeated series f(` @@ -584,7 +585,7 @@ foo{baz="qwe"} 10 bar:1m_total{baz="qwer"} 1 foo:1m_total 0 foo:1m_total{baz="qwe"} 15 -`, "11111111") +`, 8) // total_prometheus output for repeated series f(` @@ -603,7 +604,7 @@ foo{baz="qwe"} 10 bar:1m_total_prometheus{baz="qwer"} 1 foo:1m_total_prometheus 0 foo:1m_total_prometheus{baz="qwe"} 15 -`, "11111111") +`, 8) // total output for repeated series with group by __name__ f(` @@ -621,7 +622,7 @@ bar{baz="qwer"} 344 foo{baz="qwe"} 10 `, `bar:1m_total 6.02 foo:1m_total 15 -`, "11111111") +`, 8) // total_prometheus output for repeated series with group by __name__ f(` @@ -639,7 +640,7 @@ bar{baz="qwer"} 344 foo{baz="qwe"} 10 `, `bar:1m_total_prometheus 6.02 foo:1m_total_prometheus 15 -`, "11111111") +`, 8) // increase output for non-repeated series f(` @@ -650,7 +651,7 @@ foo 123 bar{baz="qwe"} 4.34 `, `bar:1m_increase{baz="qwe"} 0 foo:1m_increase 0 -`, "11") +`, 2) // increase_prometheus output for non-repeated series f(` @@ -661,7 +662,7 @@ foo 123 bar{baz="qwe"} 4.34 `, `bar:1m_increase_prometheus{baz="qwe"} 0 foo:1m_increase_prometheus 0 -`, "11") +`, 2) // increase output for repeated series f(` @@ -680,7 +681,7 @@ foo{baz="qwe"} 10 bar:1m_increase{baz="qwer"} 1 foo:1m_increase 0 foo:1m_increase{baz="qwe"} 15 -`, "11111111") +`, 8) // increase_prometheus output for repeated series f(` @@ -699,12 +700,13 @@ foo{baz="qwe"} 10 bar:1m_increase_prometheus{baz="qwer"} 1 foo:1m_increase_prometheus 0 foo:1m_increase_prometheus{baz="qwe"} 15 -`, "11111111") +`, 8) // multiple aggregate configs f(` - interval: 1m outputs: [count_series, sum_samples] + keep_input: true - interval: 5m by: [bar] outputs: [sum_samples] @@ -718,7 +720,7 @@ foo:1m_sum_samples 4.3 foo:1m_sum_samples{bar="baz"} 2 foo:5m_by_bar_sum_samples 4.3 foo:5m_by_bar_sum_samples{bar="baz"} 2 -`, "111") +`, 3) // min and max outputs f(` @@ -735,7 +737,7 @@ foo:1m_max{abc="123"} 8.5 foo:1m_max{abc="456",de="fg"} 8 foo:1m_min{abc="123"} 4 foo:1m_min{abc="456",de="fg"} 8 -`, "1111") +`, 4) // avg output f(` @@ -749,7 +751,7 @@ foo{abc="456",de="fg"} 8 `, `bar:1m_avg 5 foo:1m_avg{abc="123"} 6.25 foo:1m_avg{abc="456",de="fg"} 8 -`, "1111") +`, 4) // stddev output f(` @@ -763,7 +765,7 @@ foo{abc="456",de="fg"} 8 `, `bar:1m_stddev 0 foo:1m_stddev{abc="123"} 2.25 foo:1m_stddev{abc="456",de="fg"} 0 -`, "1111") +`, 4) // stdvar output f(` @@ -777,7 +779,7 @@ foo{abc="456",de="fg"} 8 `, `bar:1m_stdvar 0 foo:1m_stdvar{abc="123"} 5.0625 foo:1m_stdvar{abc="456",de="fg"} 0 -`, "1111") +`, 4) // histogram_bucket output f(` @@ -795,7 +797,7 @@ cpu_usage{cpu="2"} 90 cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3 cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1 cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1 -`, "1111111") +`, 7) // histogram_bucket output without cpu f(` @@ -814,7 +816,7 @@ cpu_usage{cpu="2"} 90 cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3 cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1 cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1 -`, "1111111") +`, 7) // quantiles output f(` @@ -834,7 +836,7 @@ cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25 cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90 
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90 cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90 -`, "1111111") +`, 7) // quantiles output without cpu f(` @@ -852,7 +854,7 @@ cpu_usage{cpu="2"} 90 `, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12 cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3 cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 -`, "1111111") +`, 7) // append additional label f(` @@ -881,7 +883,7 @@ bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5 foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2 foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1 foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5 -`, "1111") +`, 4) // test rate_sum and rate_avg f(` @@ -896,7 +898,7 @@ foo{abc="456", cde="1"} 10 10 foo 12 34 `, `foo:1m_by_cde_rate_avg{cde="1"} 0.325 foo:1m_by_cde_rate_sum{cde="1"} 0.65 -`, "11111") +`, 5) // rate_sum and rate_avg with duplicated events f(` @@ -905,7 +907,7 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.65 `, ` foo{abc="123", cde="1"} 4 10 foo{abc="123", cde="1"} 4 10 -`, ``, "11") +`, ``, 2) // rate_sum and rate_avg for a single sample f(` @@ -914,7 +916,7 @@ foo{abc="123", cde="1"} 4 10 `, ` foo 4 10 bar 5 10 -`, ``, "11") +`, ``, 2) // unique_samples output f(` @@ -927,7 +929,7 @@ foo 1 10 foo 2 20 foo 3 20 `, `foo:1m_unique_samples 3 -`, "11111") +`, 5) // keep_metric_names f(` @@ -943,7 +945,7 @@ foo{abc="456",de="fg"} 8 `, `bar 2 foo{abc="123"} 2 foo{abc="456",de="fg"} 1 -`, "11111") +`, 5) // drop_input_labels f(` @@ -960,11 +962,11 @@ foo{abc="456",de="fg"} 8 `, `bar 2 foo 2 foo{de="fg"} 1 -`, "11111") +`, 5) } func TestAggregatorsWithDedupInterval(t *testing.T) { - f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) { + f := func(config, inputMetrics, outputMetricsExpected string, matchedIdxsExpected int) { t.Helper() // Initialize Aggregators @@ -994,16 +996,18 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { // Push the inputMetrics to Aggregators offsetMsecs := time.Now().UnixMilli() tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs) - matchIdxs := a.Push(tssInput, nil) + var matchedIdxs int + _ = a.PushWithCallback(tssInput, func(idxs []byte) { + for _, idx := range idxs { + if idx == 1 { + matchedIdxs++ + } + } + }) a.MustStop() - // Verify matchIdxs equals to matchIdxsExpected - matchIdxsStr := "" - for _, v := range matchIdxs { - matchIdxsStr += strconv.Itoa(int(v)) - } - if matchIdxsStr != matchIdxsStrExpected { - t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected) + if matchedIdxs != matchedIdxsExpected { + t.Fatalf("unexpected matchIdxs;\ngot\n%d\nwant\n%d", matchedIdxs, matchedIdxsExpected) } // Verify the tssOutput contains the expected metrics @@ -1026,7 +1030,7 @@ foo 123 bar 567 `, `bar:1m_sum_samples 567 foo:1m_sum_samples 123 -`, "11") +`, 2) f(` - interval: 1m @@ -1044,7 +1048,7 @@ foo{baz="qwe"} 10 bar:1m_sum_samples{baz="qwer"} 344 foo:1m_sum_samples 123 foo:1m_sum_samples{baz="qwe"} 10 -`, "11111111") +`, 8) } func timeSeriessToString(tss []prompbmarshal.TimeSeries) string { diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index bb0d98afa..893cc8614 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -43,7 +43,7 @@ func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) { pushFunc := func(_ []prompbmarshal.TimeSeries) {} a := 
newBenchAggregators(benchOutputs, pushFunc)
 	defer a.MustStop()
 
-	_ = a.Push(benchSeries, nil)
+	_ = a.Push(append([]prompbmarshal.TimeSeries{}, benchSeries...))
 
 	b.ResetTimer()
 	b.ReportAllocs()
@@ -66,10 +66,12 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
 	b.ReportAllocs()
 	b.SetBytes(int64(len(benchSeries) * loops))
 	b.RunParallel(func(pb *testing.PB) {
-		var matchIdxs []byte
+		series := make([]prompbmarshal.TimeSeries, len(benchSeries))
 		for pb.Next() {
 			for i := 0; i < loops; i++ {
-				matchIdxs = a.Push(benchSeries, matchIdxs)
+				// push a copy, since Push may drop matched series from its input slice
+				copy(series, benchSeries)
+				_ = a.Push(series)
 			}
 		}
 	})
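With this change, `keep_input` can be set individually per aggregator in the stream aggregation config file, overriding the `-streamAggr.keepInput` / `-remoteWrite.streamAggr.keepInput` flag for that aggregator only. A minimal usage sketch follows; the match expressions and outputs are hypothetical, only the `keep_input` semantics come from this patch:

```yaml
# hypothetical -remoteWrite.streamAggr.config file
- match: 'http_requests_total'   # illustrative match expression
  interval: 1m
  outputs: [total]
  keep_input: true               # matched input samples are kept and written alongside the aggregates
- match: 'process_cpu_seconds_total'
  interval: 1m
  outputs: [increase]
  # keep_input is omitted here, so this aggregator falls back to the
  # -remoteWrite.streamAggr.keepInput flag value; matched input samples
  # are dropped when that flag is not set
```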