lib/streamaggr: add ability to de-duplicate input samples before aggregation

This commit is contained in:
Aliaksandr Valialkin 2023-01-25 09:14:49 -08:00
parent 29fd95d426
commit 5defa99a2e
6 changed files with 191 additions and 18 deletions


@@ -62,10 +62,12 @@ var (
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
"See also -remoteWrite.streamAggr.keepInput")
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
)
var (
@@ -509,7 +511,8 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
// Initialize sas
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
if sasFile != "" {
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal)
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
}


@@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): add the ability to [de-duplicate](https://docs.victoriametrics.com/#deduplication) input samples before aggregation via `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line options.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add dark mode - it can be selected via the `settings` menu in the top right corner. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3704).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into the binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to the Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696).


@@ -12,7 +12,7 @@ and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics
The stream aggregation is configured via the following command-line flags:
- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each specified `-remoteWrite.url`.
This flag can be specified individually per each `-remoteWrite.url`.
This allows writing different aggregates to different remote storage destinations.
- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
@@ -22,13 +22,23 @@ By default only the aggregated data is written to the storage. If the original i
then the following command-line flags must be specified:
- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each specified `-remoteWrite.url`.
This flag can be specified individually per each `-remoteWrite.url`.
This allows writing both raw and aggregate data to different remote storage destinations.
- `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
It expects that the ingested samples have timestamps close to the current time.
By default all the input samples are aggregated. Sometimes the samples must be de-duplicated before the aggregation,
for example, when they are received from replicated sources.
The following command-line flags enable the [de-duplication](https://docs.victoriametrics.com/#deduplication)
before the aggregation in this case (a short usage sketch follows the list below):
- `-remoteWrite.streamAggr.dedupInterval` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each `-remoteWrite.url`.
This allows setting different de-duplication intervals per each configured remote storage.
- `-streamAggr.dedupInterval` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
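For illustration, here is a minimal Go sketch of the de-duplication semantics using the `lib/streamaggr` API directly; the aggregation config, the one-minute dedup interval and the sample handling below are made up for this example and are not part of this change:

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	// Aggregate the sum of samples over 1m intervals (example config).
	config := []byte(`
- interval: 1m
  outputs: [sum_samples]
`)
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		fmt.Printf("flushed %d aggregated series\n", len(tss))
	}
	// With a non-zero dedupInterval (one minute here), only the last sample
	// received per time series within each dedupInterval reaches the
	// configured aggregations.
	as, err := streamaggr.NewAggregatorsFromData(config, pushFunc, time.Minute)
	if err != nil {
		panic(err)
	}
	defer as.MustStop()
	// Raw samples are pushed via as.Push(tss) from the ingestion path.
}
```

The command-line flags above wire the same dedup interval into the aggregators created by `vmagent` and single-node VictoriaMetrics.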
## Use cases
Stream aggregation can be used in the following cases:


@@ -38,13 +38,16 @@ var supportedOutputs = []string{
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
data, err := fs.ReadFileOrHTTP(path)
if err != nil {
return nil, fmt.Errorf("cannot load aggregators: %w", err)
}
as, err := NewAggregatorsFromData(data, pushFunc)
as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
}
@@ -53,13 +56,16 @@ func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func NewAggregatorsFromData(data []byte, pushFunc PushFunc) (*Aggregators, error) {
func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, err
}
return NewAggregators(cfgs, pushFunc)
return NewAggregators(cfgs, pushFunc, dedupInterval)
}
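A brief sketch of how an external caller might adopt the extended `LoadFromFile` signature; the config path and the 30-second interval are placeholder values, not taken from this change:

```go
package main

import (
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		// Forward the aggregated series to remote storage here.
	}
	// "/path/to/aggr.yaml" is a placeholder path. De-duplicate raw samples
	// over 30-second windows before aggregation; a zero interval disables
	// de-duplication and matches the previous behavior.
	as, err := streamaggr.LoadFromFile("/path/to/aggr.yaml", pushFunc, 30*time.Second)
	if err != nil {
		logger.Fatalf("cannot load stream aggregation config: %s", err)
	}
	defer as.MustStop()
}
```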
// Config is a configuration for a single stream aggregation.
@@ -130,14 +136,17 @@ type Aggregators struct {
//
// pushFunc is called when the aggregated data must be flushed.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// MustStop must be called on the returned Aggregators when they are no longer needed.
func NewAggregators(cfgs []*Config, pushFunc PushFunc) (*Aggregators, error) {
func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
if len(cfgs) == 0 {
return nil, nil
}
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
a, err := newAggregator(cfg, pushFunc)
a, err := newAggregator(cfg, pushFunc, dedupInterval)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
}
@@ -179,6 +188,10 @@ type aggregator struct {
without []string
aggregateOnlyByTime bool
// dedupAggr is set to non-nil if input samples must be de-duplicated according
// to the dedupInterval passed to newAggregator().
dedupAggr *lastAggrState
// aggrStates contains aggregate states for the given outputs
aggrStates []aggrState
@@ -205,8 +218,11 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
//
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
//
// The returned aggregator must be stopped when no longer needed by calling MustStop().
func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) {
// check cfg.Interval
interval, err := time.ParseDuration(cfg.Interval)
if err != nil {
@@ -309,6 +325,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
}
suffix += "_"
var dedupAggr *lastAggrState
if dedupInterval > 0 {
dedupAggr = newLastAggrState()
}
// initialize the aggregator
a := &aggregator{
match: cfg.Match,
@@ -320,6 +341,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
dedupAggr: dedupAggr,
aggrStates: aggrStates,
pushFunc: pushFunc,
@@ -328,15 +350,41 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
stopCh: make(chan struct{}),
}
if dedupAggr != nil {
a.wg.Add(1)
go func() {
a.runDedupFlusher(dedupInterval)
a.wg.Done()
}()
}
a.wg.Add(1)
go func() {
a.runFlusher(interval)
defer a.wg.Done()
a.wg.Done()
}()
return a, nil
}
func (a *aggregator) runDedupFlusher(interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-a.stopCh:
return
case <-t.C:
}
// Globally limit the concurrency for flushing metrics
// in order to limit memory usage when a large number of aggregators
// are flushed at the same time.
flushConcurrencyCh <- struct{}{}
a.dedupFlush()
<-flushConcurrencyCh
}
}
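The `flushConcurrencyCh` writes above use a buffered channel as a counting semaphore. A standalone sketch of the same pattern, with an arbitrary capacity of 4 standing in for `cgroup.AvailableCPUs()`:

```go
package main

import (
	"fmt"
	"sync"
)

// flushSlots is a counting semaphore: at most cap(flushSlots) flushes run
// concurrently, which bounds memory usage when many aggregators fire at once.
var flushSlots = make(chan struct{}, 4)

func withFlushSlot(flush func()) {
	flushSlots <- struct{}{}        // acquire a slot; blocks when all slots are busy
	defer func() { <-flushSlots }() // release the slot when the flush is done
	flush()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			withFlushSlot(func() { fmt.Println("flushing aggregator", i) })
		}(i)
	}
	wg.Wait()
}
```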
func (a *aggregator) runFlusher(interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
@@ -358,6 +406,15 @@ func (a *aggregator) runFlusher(interval time.Duration) {
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
func (a *aggregator) dedupFlush() {
ctx := &flushCtx{
skipAggrSuffix: true,
}
a.dedupAggr.appendSeriesForFlush(ctx)
a.push(ctx.tss)
}
func (a *aggregator) flush() {
ctx := &flushCtx{
suffix: a.suffix,
@@ -395,8 +452,29 @@ func (a *aggregator) MustStop() {
a.wg.Wait()
}
// Push pushes series to a.
// Push pushes tss to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
if a.dedupAggr == nil {
a.push(tss)
return
}
// Deduplication is enabled.
// Push samples to dedupAggr; they are pushed to the configured aggregators later, at the next dedup flush.
pushSample := a.dedupAggr.pushSample
inputKey := ""
bb := bbPool.Get()
for _, ts := range tss {
bb.B = marshalLabelsFast(bb.B[:0], ts.Labels)
outputKey := bytesutil.InternBytes(bb.B)
for _, sample := range ts.Samples {
pushSample(inputKey, outputKey, sample.Value)
}
}
bbPool.Put(bb)
}
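`lastAggrState` itself is not part of this diff. Conceptually it keeps, per series key, only the most recent value seen during the current dedupInterval; the following simplified sketch illustrates that idea and is not the actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// dedupState is an illustrative stand-in for lastAggrState: for every series
// key it keeps only the most recent value pushed within the current interval.
type dedupState struct {
	mu sync.Mutex
	m  map[string]float64 // marshaled labels -> last seen value
}

func newDedupState() *dedupState {
	return &dedupState{m: make(map[string]float64)}
}

// pushSample records value for the series identified by key; a later sample
// for the same key overwrites the earlier one, which is the de-duplication.
func (ds *dedupState) pushSample(key string, value float64) {
	ds.mu.Lock()
	ds.m[key] = value
	ds.mu.Unlock()
}

// flush hands the retained samples downstream and resets the state
// for the next dedupInterval.
func (ds *dedupState) flush(push func(key string, value float64)) {
	ds.mu.Lock()
	m := ds.m
	ds.m = make(map[string]float64)
	ds.mu.Unlock()
	for k, v := range m {
		push(k, v)
	}
}

func main() {
	ds := newDedupState()
	ds.pushSample(`bar{baz="qwe"}`, 1.32)
	ds.pushSample(`bar{baz="qwe"}`, 4.34)
	ds.pushSample(`bar{baz="qwe"}`, 2) // only this value survives the interval
	ds.flush(func(key string, value float64) {
		fmt.Println(key, value) // bar{baz="qwe"} 2
	})
}
```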
func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
labels := promutils.GetLabels()
tmpLabels := promutils.GetLabels()
bb := bbPool.Get()
@@ -545,7 +623,8 @@ func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal
}
type flushCtx struct {
suffix string
skipAggrSuffix bool
suffix string
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
@@ -567,7 +646,9 @@ func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int6
if err != nil {
logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
}
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
if !ctx.skipAggrSuffix {
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
}
ctx.samples = append(ctx.samples, prompbmarshal.Sample{
Timestamp: timestamp,
Value: value,


@@ -6,6 +6,7 @@ import (
"strings"
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
@@ -18,7 +19,7 @@ func TestAggregatorsFailure(t *testing.T) {
pushFunc := func(tss []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("pushFunc shouldn't be called"))
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc)
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@@ -136,7 +137,7 @@ func TestAggregatorsSuccess(t *testing.T) {
}
tssOutputLock.Unlock()
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc)
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
@@ -641,6 +642,83 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
`)
}
func TestAggregatorsWithDedupInterval(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected string) {
t.Helper()
// Initialize Aggregators
var tssOutput []prompbmarshal.TimeSeries
var tssOutputLock sync.Mutex
pushFunc := func(tss []prompbmarshal.TimeSeries) {
tssOutputLock.Lock()
for _, ts := range tss {
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
Labels: labelsCopy,
Samples: samplesCopy,
})
}
tssOutputLock.Unlock()
}
const dedupInterval = time.Hour
a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput)
if a != nil {
for _, aggr := range a.as {
aggr.dedupFlush()
aggr.flush()
}
}
a.MustStop()
// Verify the tssOutput contains the expected metrics
tsStrings := make([]string, len(tssOutput))
for i, ts := range tssOutput {
tsStrings[i] = timeSeriesToString(ts)
}
sort.Strings(tsStrings)
outputMetrics := strings.Join(tsStrings, "")
if outputMetrics != outputMetricsExpected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
}
}
f(`
- interval: 1m
outputs: [sum_samples]
`, `
foo 123
bar 567
`, `bar:1m_sum_samples 567
foo:1m_sum_samples 123
`)
f(`
- interval: 1m
outputs: [sum_samples]
`, `
foo 123
bar{baz="qwe"} 1.32
bar{baz="qwe"} 4.34
bar{baz="qwe"} 2
foo{baz="qwe"} -5
bar{baz="qwer"} 343
bar{baz="qwer"} 344
foo{baz="qwe"} 10
`, `bar:1m_sum_samples{baz="qwe"} 2
bar:1m_sum_samples{baz="qwer"} 344
foo:1m_sum_samples 123
foo:1m_sum_samples{baz="qwe"} 10
`)
}
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
if len(ts.Samples) != 1 {


@@ -40,7 +40,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
pushFunc := func(tss []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("unexpected pushFunc call"))
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc)
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil {
b.Fatalf("unexpected error when initializing aggregators: %s", err)
}