lib/streamaggr: allow setting keep_input for each aggregator separately

This commit is contained in:
AndrewChubatiuk 2024-07-18 18:50:23 +03:00
parent efd70b2c52
commit c71a9478f0
No known key found for this signature in database
GPG key ID: 96D776CC99880667
9 changed files with 184 additions and 151 deletions

View file

@ -488,12 +488,7 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
sortLabelsIfNeeded(tssBlock)
tssBlock = limitSeriesCardinality(tssBlock)
if sas.IsEnabled() {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tssBlock, matchIdxs.B)
if !*streamAggrGlobalKeepInput {
tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput)
}
matchIdxsPool.Put(matchIdxs)
tssBlock = sas.Push(tssBlock)
} else if deduplicatorGlobal != nil {
deduplicatorGlobal.Push(tssBlock)
tssBlock = tssBlock[:0]
@ -764,9 +759,6 @@ type remoteWriteCtx struct {
sas atomic.Pointer[streamaggr.Aggregators]
deduplicator *streamaggr.Deduplicator
streamAggrKeepInput bool
streamAggrDropInput bool
pss []*pendingSeries
pssNextIdx atomic.Uint64
@ -910,18 +902,13 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa
// Apply stream aggregation or deduplication if they are configured
sas := rwctx.sas.Load()
if sas.IsEnabled() {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tss, matchIdxs.B)
if !rwctx.streamAggrKeepInput {
if rctx == nil {
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
tss = append(*v, tss...)
}
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
if sas.ExpectModifications() && rctx == nil {
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
tss = append(*v, tss...)
}
matchIdxsPool.Put(matchIdxs)
tss = sas.Push(tss)
} else if rwctx.deduplicator != nil {
rwctx.deduplicator.Push(tss)
return true
@ -942,23 +929,6 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa
return false
}
var matchIdxsPool bytesutil.ByteBufferPool
func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, dropInput bool) []prompbmarshal.TimeSeries {
dst := src[:0]
if !dropInput {
for i, match := range matchIdxs {
if match == 1 {
continue
}
dst = append(dst, src[i])
}
}
tail := src[len(dst):]
clear(tail)
return dst
}
func (rwctx *remoteWriteCtx) pushInternalTrackDropped(tss []prompbmarshal.TimeSeries) {
if rwctx.tryPushInternal(tss) {
return

View file

@ -70,8 +70,6 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
pss[0] = newPendingSeries(nil, true, 0, 100)
rwctx := &remoteWriteCtx{
idx: 0,
streamAggrKeepInput: keepInput,
streamAggrDropInput: dropInput,
pss: pss,
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(`foo`),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`),

View file

@ -50,8 +50,7 @@ var (
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+
"aggregation interval for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start "+
"for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. Increase this value if "+
streamAggrIgnoreFirstIntervals = flagutil.NewArrayInt("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start "+"for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. Increase this value if "+
"you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving bufferred delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("remoteWrite.streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
@ -148,8 +147,6 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
if sas != nil {
filePath := sas.FilePath()
rwctx.sas.Store(sas)
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(idx)
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
} else {
@ -202,6 +199,8 @@ func newStreamAggrConfigGlobal() (*streamaggr.Aggregators, error) {
DropInputLabels: *streamAggrGlobalDropInputLabels,
IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals,
KeepInput: *streamAggrGlobalKeepInput,
DropInput: *streamAggrGlobalDropInput,
}
sas, err := streamaggr.LoadFromFile(path, pushToRemoteStoragesTrackDropped, opts, "global")
@ -229,7 +228,9 @@ func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamag
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
IgnoreFirstIntervals: streamAggrIgnoreFirstIntervals.GetOptionalArg(idx),
KeepInput: streamAggrKeepInput.GetOptionalArg(idx),
DropInput: streamAggrDropInput.GetOptionalArg(idx),
}
sas, err := streamaggr.LoadFromFile(path, pushFunc, opts, alias)

View file

@ -141,13 +141,7 @@ func (ctx *InsertCtx) ApplyRelabeling() {
func (ctx *InsertCtx) FlushBufs() error {
sas := sasGlobal.Load()
if (sas.IsEnabled() || deduplicator != nil) && !ctx.skipStreamAggr {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = ctx.streamAggrCtx.push(ctx.mrs, matchIdxs.B)
if !*streamAggrKeepInput {
// Remove aggregated rows from ctx.mrs
ctx.dropAggregatedRows(matchIdxs.B)
}
matchIdxsPool.Put(matchIdxs)
ctx.streamAggrCtx.push(ctx)
}
// There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here,
// since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter
@ -166,13 +160,11 @@ func (ctx *InsertCtx) FlushBufs() error {
func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) {
dst := ctx.mrs[:0]
src := ctx.mrs
if !*streamAggrDropInput {
for idx, match := range matchIdxs {
if match == 1 {
continue
}
dst = append(dst, src[idx])
for i, match := range matchIdxs {
if match == 1 {
continue
}
dst = append(dst, src[i])
}
tail := src[len(dst):]
for i := range tail {
@ -180,5 +172,3 @@ func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) {
}
ctx.mrs = dst
}
var matchIdxsPool bytesutil.ByteBufferPool

View file

@ -62,6 +62,8 @@ func CheckStreamAggrConfig() error {
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
KeepInput: *streamAggrKeepInput,
DropInput: *streamAggrDropInput,
}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts, "global")
if err != nil {
@ -90,6 +92,8 @@ func InitStreamAggr() {
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
KeepInput: *streamAggrKeepInput,
DropInput: *streamAggrDropInput,
}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts, "global")
if err != nil {
@ -124,6 +128,8 @@ func reloadStreamAggrConfig() {
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
KeepInput: *streamAggrKeepInput,
DropInput: *streamAggrDropInput,
}
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts, "global")
if err != nil {
@ -180,12 +186,13 @@ func (ctx *streamAggrCtx) Reset() {
ctx.buf = ctx.buf[:0]
}
func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte {
func (ctx *streamAggrCtx) push(insertCtx *InsertCtx) {
mn := &ctx.mn
tss := ctx.tss
labels := ctx.labels
samples := ctx.samples
buf := ctx.buf
mrs := insertCtx.mrs
tssLen := len(tss)
for _, mr := range mrs {
@ -237,18 +244,16 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte
sas := sasGlobal.Load()
if sas.IsEnabled() {
matchIdxs = sas.Push(tss, matchIdxs)
_ = sas.PushWithCallback(tss, func(matchIdxs []byte) {
insertCtx.dropAggregatedRows(matchIdxs)
})
} else if deduplicator != nil {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
for i := range matchIdxs {
matchIdxs[i] = 1
}
deduplicator.Push(tss)
mrs = mrs[:0]
}
ctx.Reset()
return matchIdxs
insertCtx.mrs = mrs
}
func pushAggregateSeries(tss []prompbmarshal.TimeSeries) {

View file

@ -44,7 +44,8 @@ This behaviour can be changed via the following command-line flags:
- `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`.
`-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url` and `keep_input` parameter can be defined
for each aggregator separately.
If one of these flags is set, then all the input samples are written to the storage alongside the aggregated samples.
- `-streamAggr.dropInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)

View file

@ -135,6 +135,14 @@ type Options struct {
//
// This option can be overridden individually per each aggregation via ignore_first_intervals option.
IgnoreFirstIntervals int
// KeepInput defines whether to keep all the input samples after the aggregation.
// By default, only aggregates samples are dropped, while the remaining samples are written to remote storages write.
KeepInput bool
// DropInput defines whether to drop all the input samples after the aggregation.
// By default, only aggregates samples are dropped, while the remaining samples are written to remote storages write.
DropInput bool
}
// Config is a configuration for a single stream aggregation.
@ -237,6 +245,10 @@ type Config struct {
// OutputRelabelConfigs is an optional relabeling rules, which are applied
// on the aggregated output before being sent to remote storage.
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
// KeepInput defines whether to keep all the input samples after the aggregation.
// By default, only aggregates samples are dropped, while the remaining samples are written to remote storages write.
KeepInput *bool `yaml:"keep_input,omitempty"`
}
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
@ -252,6 +264,8 @@ type Aggregators struct {
// ms contains metrics associated with the Aggregators.
ms *metrics.Set
dropInput bool
}
// FilePath returns path to file with the configuration used for creating the given Aggregators.
@ -291,12 +305,16 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
}
metrics.RegisterSet(ms)
return &Aggregators{
a := &Aggregators{
as: as,
configData: configData,
filePath: filePath,
ms: ms,
}, nil
}
if opts != nil {
a.dropInput = opts.DropInput
}
return a, nil
}
// IsEnabled returns true if Aggregators has at least one configured aggregator
@ -325,6 +343,19 @@ func (a *Aggregators) MustStop() {
a.as = nil
}
// ExpectModifications returns true if Push modifies original timeseries
func (a *Aggregators) ExpectModifications() bool {
if a == nil {
return false
}
for _, aggr := range a.as {
if aggr.keepInput {
return true
}
}
return false
}
// Equal returns true if a and b are initialized from identical configs.
func (a *Aggregators) Equal(b *Aggregators) bool {
if a == nil || b == nil {
@ -333,28 +364,39 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
return string(a.configData) == string(b.configData)
}
// Push pushes tss to a.
// Push calls PushWithCallback with an empty default callback
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) []prompbmarshal.TimeSeries {
defaultCallback := func(_ []byte) {}
return a.PushWithCallback(tss, defaultCallback)
}
// PushWithCallback pushes tss to a.
//
// Push sets matchIdxs[idx] to 1 if the corresponding tss[idx] was used in aggregations.
// PushWithCallback calls callback with matchIdxs, where matchIdx[idx] is set to 1 if the corresponding tss[idx] was used in aggregations.
// Otherwise matchIdxs[idx] is set to 0.
//
// Push returns matchIdxs with len equal to len(tss).
// It re-uses the matchIdxs if it has enough capacity to hold len(tss) items.
// Otherwise it allocates new matchIdxs.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
for i := range matchIdxs {
matchIdxs[i] = 0
}
// Push returns modified timeseries.
func (a *Aggregators) PushWithCallback(tss []prompbmarshal.TimeSeries, callback func([]byte)) []prompbmarshal.TimeSeries {
if a == nil {
return matchIdxs
return tss
}
matchIdxs := matchIdxsPool.Get()
defer matchIdxsPool.Put(matchIdxs)
for _, aggr := range a.as {
aggr.Push(tss, matchIdxs)
matchIdxs.B = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs.B, len(tss))
for i := range matchIdxs.B {
matchIdxs.B[i] = 0
}
aggr.Push(tss, matchIdxs.B)
if !aggr.keepInput {
callback(matchIdxs.B)
tss = dropAggregatedSeries(tss, matchIdxs.B)
}
}
return matchIdxs
if a.dropInput {
tss = tss[:0]
}
return tss
}
// aggregator aggregates input series according to the config passed to NewAggregator
@ -368,6 +410,7 @@ type aggregator struct {
keepMetricNames bool
ignoreOldSamples bool
keepInput bool
by []string
without []string
@ -546,6 +589,12 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
metricLabels := fmt.Sprintf(`name=%q,path=%q,url=%q,position="%d"`, name, path, alias, aggrID)
// check cfg.KeepInput
keepInput := opts.KeepInput
if v := cfg.KeepInput; v != nil {
keepInput = *v
}
// initialize aggrOutputs
if len(cfg.Outputs) == 0 {
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
@ -585,6 +634,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
keepMetricNames: keepMetricNames,
ignoreOldSamples: ignoreOldSamples,
keepInput: keepInput,
by: by,
without: without,
@ -884,7 +934,7 @@ func (a *aggregator) MustStop() {
a.wg.Wait()
}
// Push pushes tss to a.
// push pushes tss to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
ctx := getPushCtx()
defer putPushCtx(ctx)
@ -1050,6 +1100,8 @@ func putPushCtx(ctx *pushCtx) {
pushCtxPool.Put(ctx)
}
var matchIdxsPool bytesutil.ByteBufferPool
var pushCtxPool sync.Pool
func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by, without []string) ([]prompbmarshal.Label, []prompbmarshal.Label) {
@ -1273,4 +1325,17 @@ func sortAndRemoveDuplicates(a []string) []string {
return dst
}
func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte) []prompbmarshal.TimeSeries {
dst := src[:0]
for i, match := range matchIdxs {
if match == 1 {
continue
}
dst = append(dst, src[i])
}
tail := src[len(dst):]
clear(tail)
return dst
}
var bbPool bytesutil.ByteBufferPool

View file

@ -3,7 +3,6 @@ package streamaggr
import (
"fmt"
"sort"
"strconv"
"strings"
"sync"
"testing"
@ -252,7 +251,7 @@ func TestAggregatorsEqual(t *testing.T) {
}
func TestAggregatorsSuccess(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
f := func(config, inputMetrics, outputMetricsExpected string, matchedIdxsExpected int) {
t.Helper()
// Initialize Aggregators
@ -275,16 +274,18 @@ func TestAggregatorsSuccess(t *testing.T) {
// Push the inputMetrics to Aggregators
offsetMsecs := time.Now().UnixMilli()
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
matchIdxs := a.Push(tssInput, nil)
var matchedIdxs int
_ = a.PushWithCallback(tssInput, func(idxs []byte) {
for _, idx := range idxs {
if idx == 1 {
matchedIdxs++
}
}
})
a.MustStop()
// Verify matchIdxs equals to matchIdxsExpected
matchIdxsStr := ""
for _, v := range matchIdxs {
matchIdxsStr += strconv.Itoa(int(v))
}
if matchIdxsStr != matchIdxsStrExpected {
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
if matchedIdxs != matchedIdxsExpected {
t.Fatalf("unexpected matchIdxs;\ngot\n%d\nwant\n%d", matchedIdxs, matchedIdxsExpected)
}
// Verify the tssOutput contains the expected metrics
@ -295,9 +296,9 @@ func TestAggregatorsSuccess(t *testing.T) {
}
// Empty config
f(``, ``, ``, "")
f(``, `foo{bar="baz"} 1`, ``, "0")
f(``, "foo 1\nbaz 2", ``, "00")
f(``, ``, ``, 0)
f(``, `foo{bar="baz"} 1`, ``, 0)
f(``, "foo 1\nbaz 2", ``, 0)
// Empty by list - aggregate only by time
f(`
@ -321,7 +322,7 @@ foo:1m_last{abc="123"} 8.5
foo:1m_last{abc="456",de="fg"} 8
foo:1m_sum_samples{abc="123"} 12.5
foo:1m_sum_samples{abc="456",de="fg"} 8
`, "11111")
`, 5)
// Special case: __name__ in `by` list - this is the same as empty `by` list
f(`
@ -339,7 +340,7 @@ bar:1m_sum_samples 5
foo:1m_count_samples 3
foo:1m_count_series 2
foo:1m_sum_samples 20.5
`, "1111")
`, 4)
// Non-empty `by` list with non-existing labels
f(`
@ -357,7 +358,7 @@ bar:1m_by_bar_foo_sum_samples 5
foo:1m_by_bar_foo_count_samples 3
foo:1m_by_bar_foo_count_series 2
foo:1m_by_bar_foo_sum_samples 20.5
`, "1111")
`, 4)
// Non-empty `by` list with existing label
f(`
@ -378,7 +379,7 @@ foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`, "1111")
`, 4)
// Non-empty `by` list with duplicate existing label
f(`
@ -399,7 +400,7 @@ foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`, "1111")
`, 4)
// Non-empty `without` list with non-existing labels
f(`
@ -420,7 +421,7 @@ foo:1m_without_foo_count_series{abc="123"} 1
foo:1m_without_foo_count_series{abc="456",de="fg"} 1
foo:1m_without_foo_sum_samples{abc="123"} 12.5
foo:1m_without_foo_sum_samples{abc="456",de="fg"} 8
`, "1111")
`, 4)
// Non-empty `without` list with existing labels
f(`
@ -441,7 +442,7 @@ foo:1m_without_abc_count_series 1
foo:1m_without_abc_count_series{de="fg"} 1
foo:1m_without_abc_sum_samples 12.5
foo:1m_without_abc_sum_samples{de="fg"} 8
`, "1111")
`, 4)
// Special case: __name__ in `without` list
f(`
@ -462,7 +463,7 @@ foo{abc="456",de="fg"} 8
:1m_sum_samples 5
:1m_sum_samples{abc="123"} 12.5
:1m_sum_samples{abc="456",de="fg"} 8
`, "1111")
`, 4)
// drop some input metrics
f(`
@ -480,7 +481,7 @@ foo{abc="456",de="fg"} 8
`, `bar:1m_without_abc_count_samples 1
bar:1m_without_abc_count_series 1
bar:1m_without_abc_sum_samples 5
`, "1111")
`, 4)
// rename output metrics
f(`
@ -507,7 +508,7 @@ bar-1m-without-abc-sum-samples 5
foo-1m-without-abc-count-samples 2
foo-1m-without-abc-count-series 1
foo-1m-without-abc-sum-samples 12.5
`, "1111")
`, 4)
// match doesn't match anything
f(`
@ -521,7 +522,7 @@ foo{abc="123"} 4
bar 5
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, ``, "0000")
`, ``, 0)
// match matches foo series with non-empty abc label
f(`
@ -543,7 +544,7 @@ foo:1m_by_abc_count_series{abc="123"} 1
foo:1m_by_abc_count_series{abc="456"} 1
foo:1m_by_abc_sum_samples{abc="123"} 12.5
foo:1m_by_abc_sum_samples{abc="456"} 8
`, "1011")
`, 3)
// total output for non-repeated series
f(`
@ -554,7 +555,7 @@ foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 0
foo:1m_total 0
`, "11")
`, 2)
// total_prometheus output for non-repeated series
f(`
@ -565,7 +566,7 @@ foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_total_prometheus{baz="qwe"} 0
foo:1m_total_prometheus 0
`, "11")
`, 2)
// total output for repeated series
f(`
@ -584,7 +585,7 @@ foo{baz="qwe"} 10
bar:1m_total{baz="qwer"} 1
foo:1m_total 0
foo:1m_total{baz="qwe"} 15
`, "11111111")
`, 8)
// total_prometheus output for repeated series
f(`
@ -603,7 +604,7 @@ foo{baz="qwe"} 10
bar:1m_total_prometheus{baz="qwer"} 1
foo:1m_total_prometheus 0
foo:1m_total_prometheus{baz="qwe"} 15
`, "11111111")
`, 8)
// total output for repeated series with group by __name__
f(`
@ -621,7 +622,7 @@ bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total 6.02
foo:1m_total 15
`, "11111111")
`, 8)
// total_prometheus output for repeated series with group by __name__
f(`
@ -639,7 +640,7 @@ bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_total_prometheus 6.02
foo:1m_total_prometheus 15
`, "11111111")
`, 8)
// increase output for non-repeated series
f(`
@ -650,7 +651,7 @@ foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 0
`, "11")
`, 2)
// increase_prometheus output for non-repeated series
f(`
@ -661,7 +662,7 @@ foo 123
bar{baz="qwe"} 4.34
`, `bar:1m_increase_prometheus{baz="qwe"} 0
foo:1m_increase_prometheus 0
`, "11")
`, 2)
// increase output for repeated series
f(`
@ -680,7 +681,7 @@ foo{baz="qwe"} 10
bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15
`, "11111111")
`, 8)
// increase_prometheus output for repeated series
f(`
@ -699,12 +700,13 @@ foo{baz="qwe"} 10
bar:1m_increase_prometheus{baz="qwer"} 1
foo:1m_increase_prometheus 0
foo:1m_increase_prometheus{baz="qwe"} 15
`, "11111111")
`, 8)
// multiple aggregate configs
f(`
- interval: 1m
outputs: [count_series, sum_samples]
keep_input: true
- interval: 5m
by: [bar]
outputs: [sum_samples]
@ -718,7 +720,7 @@ foo:1m_sum_samples 4.3
foo:1m_sum_samples{bar="baz"} 2
foo:5m_by_bar_sum_samples 4.3
foo:5m_by_bar_sum_samples{bar="baz"} 2
`, "111")
`, 3)
// min and max outputs
f(`
@ -735,7 +737,7 @@ foo:1m_max{abc="123"} 8.5
foo:1m_max{abc="456",de="fg"} 8
foo:1m_min{abc="123"} 4
foo:1m_min{abc="456",de="fg"} 8
`, "1111")
`, 4)
// avg output
f(`
@ -749,7 +751,7 @@ foo{abc="456",de="fg"} 8
`, `bar:1m_avg 5
foo:1m_avg{abc="123"} 6.25
foo:1m_avg{abc="456",de="fg"} 8
`, "1111")
`, 4)
// stddev output
f(`
@ -763,7 +765,7 @@ foo{abc="456",de="fg"} 8
`, `bar:1m_stddev 0
foo:1m_stddev{abc="123"} 2.25
foo:1m_stddev{abc="456",de="fg"} 0
`, "1111")
`, 4)
// stdvar output
f(`
@ -777,7 +779,7 @@ foo{abc="456",de="fg"} 8
`, `bar:1m_stdvar 0
foo:1m_stdvar{abc="123"} 5.0625
foo:1m_stdvar{abc="456",de="fg"} 0
`, "1111")
`, 4)
// histogram_bucket output
f(`
@ -795,7 +797,7 @@ cpu_usage{cpu="2"} 90
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_histogram_bucket{cpu="1",vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_histogram_bucket{cpu="2",vmrange="8.799e+01...1.000e+02"} 1
`, "1111111")
`, 7)
// histogram_bucket output without cpu
f(`
@ -814,7 +816,7 @@ cpu_usage{cpu="2"} 90
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="1.292e+01...1.468e+01"} 3
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="2.448e+01...2.783e+01"} 1
cpu_usage:1m_without_cpu_histogram_bucket{vmrange="8.799e+01...1.000e+02"} 1
`, "1111111")
`, 7)
// quantiles output
f(`
@ -834,7 +836,7 @@ cpu_usage:1m_quantiles{cpu="1",quantile="1"} 25
cpu_usage:1m_quantiles{cpu="2",quantile="0"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="0.5"} 90
cpu_usage:1m_quantiles{cpu="2",quantile="1"} 90
`, "1111111")
`, 7)
// quantiles output without cpu
f(`
@ -852,7 +854,7 @@ cpu_usage{cpu="2"} 90
`, `cpu_usage:1m_without_cpu_quantiles{quantile="0"} 12
cpu_usage:1m_without_cpu_quantiles{quantile="0.5"} 13.3
cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
`, "1111111")
`, 7)
// append additional label
f(`
@ -881,7 +883,7 @@ bar-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 5
foo-1m-without-abc-count-samples{new_label="must_keep_metric_name"} 2
foo-1m-without-abc-count-series{new_label="must_keep_metric_name"} 1
foo-1m-without-abc-sum-samples{new_label="must_keep_metric_name"} 12.5
`, "1111")
`, 4)
// test rate_sum and rate_avg
f(`
@ -896,7 +898,7 @@ foo{abc="456", cde="1"} 10 10
foo 12 34
`, `foo:1m_by_cde_rate_avg{cde="1"} 0.325
foo:1m_by_cde_rate_sum{cde="1"} 0.65
`, "11111")
`, 5)
// rate_sum and rate_avg with duplicated events
f(`
@ -905,7 +907,7 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.65
`, `
foo{abc="123", cde="1"} 4 10
foo{abc="123", cde="1"} 4 10
`, ``, "11")
`, ``, 2)
// rate_sum and rate_avg for a single sample
f(`
@ -914,7 +916,7 @@ foo{abc="123", cde="1"} 4 10
`, `
foo 4 10
bar 5 10
`, ``, "11")
`, ``, 2)
// unique_samples output
f(`
@ -927,7 +929,7 @@ foo 1 10
foo 2 20
foo 3 20
`, `foo:1m_unique_samples 3
`, "11111")
`, 5)
// keep_metric_names
f(`
@ -943,7 +945,7 @@ foo{abc="456",de="fg"} 8
`, `bar 2
foo{abc="123"} 2
foo{abc="456",de="fg"} 1
`, "11111")
`, 5)
// drop_input_labels
f(`
@ -960,11 +962,11 @@ foo{abc="456",de="fg"} 8
`, `bar 2
foo 2
foo{de="fg"} 1
`, "11111")
`, 5)
}
func TestAggregatorsWithDedupInterval(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected, matchIdxsStrExpected string) {
f := func(config, inputMetrics, outputMetricsExpected string, matchedIdxsExpected int) {
t.Helper()
// Initialize Aggregators
@ -994,16 +996,18 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
// Push the inputMetrics to Aggregators
offsetMsecs := time.Now().UnixMilli()
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
matchIdxs := a.Push(tssInput, nil)
var matchedIdxs int
_ = a.PushWithCallback(tssInput, func(idxs []byte) {
for _, idx := range idxs {
if idx == 1 {
matchedIdxs++
}
}
})
a.MustStop()
// Verify matchIdxs equals to matchIdxsExpected
matchIdxsStr := ""
for _, v := range matchIdxs {
matchIdxsStr += strconv.Itoa(int(v))
}
if matchIdxsStr != matchIdxsStrExpected {
t.Fatalf("unexpected matchIdxs;\ngot\n%s\nwant\n%s", matchIdxsStr, matchIdxsStrExpected)
if matchedIdxs != matchedIdxsExpected {
t.Fatalf("unexpected matchIdxs;\ngot\n%d\nwant\n%d", matchedIdxs, matchedIdxsExpected)
}
// Verify the tssOutput contains the expected metrics
@ -1026,7 +1030,7 @@ foo 123
bar 567
`, `bar:1m_sum_samples 567
foo:1m_sum_samples 123
`, "11")
`, 2)
f(`
- interval: 1m
@ -1044,7 +1048,7 @@ foo{baz="qwe"} 10
bar:1m_sum_samples{baz="qwer"} 344
foo:1m_sum_samples 123
foo:1m_sum_samples{baz="qwe"} 10
`, "11111111")
`, 8)
}
func timeSeriessToString(tss []prompbmarshal.TimeSeries) string {

View file

@ -43,7 +43,7 @@ func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) {
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
a := newBenchAggregators(benchOutputs, pushFunc)
defer a.MustStop()
_ = a.Push(benchSeries, nil)
benchSeries = a.Push(benchSeries)
b.ResetTimer()
b.ReportAllocs()
@ -66,10 +66,9 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []byte
for pb.Next() {
for i := 0; i < loops; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
benchSeries = a.Push(benchSeries)
}
}
})