VictoriaMetrics/lib/streamaggr/deduplicator.go
Andrii Chubatiuk c8fc903669
lib/streamaggr: added aggregation windows (#6314)
### Describe Your Changes

By default, stream aggregation and deduplication store a single state
per aggregation output.
The data for each aggregator is flushed independently once per
aggregation interval. But there's no guarantee that
incoming samples with timestamps close to the aggregation interval's end
will get into it. For example, when aggregating
with `interval: 1m`, a data sample with timestamp 1739473078 (18:57:58 UTC)
can fall into either the `18:58:00` or the `18:59:00` aggregation round.
It depends on network lag, load, clock synchronization, etc. In most
scenarios it doesn't impact aggregation or
deduplication results, which are consistent within margin of error. But
for metrics represented as a collection of series,
like
[histograms](https://docs.victoriametrics.com/keyconcepts/#histogram),
such inaccuracy leads to invalid aggregation results.

For such cases, stream aggregation and deduplication support a mode with
aggregation windows for the current and previous states. In this mode,
a flush doesn't happen immediately but is shifted by a calculated sample
lag, which improves correctness for delayed data.

Enabling this mode increases resource usage: memory usage is
expected to double, since aggregation stores two states
instead of one. However, it significantly improves the accuracy of
calculations. Aggregation windows can be enabled via
the following settings:

- `-streamAggr.enableWindows` at [single-node
VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and [vmagent](https://docs.victoriametrics.com/vmagent/). At
[vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.enableWindows` flag can be specified
individually per each `-remoteWrite.url`.
If one of these flags is set, all aggregators use fixed
windows. When combined with `-remoteWrite.streamAggr.dedupInterval` or
`-streamAggr.dedupInterval`, fixed aggregation windows are enabled for the
deduplicator as well.
- `enable_windows` option in [aggregation
config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
  It allows enabling aggregation windows for a specific aggregator.



### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 13:19:33 +01:00

308 lines
7.6 KiB
Go

package streamaggr
import (
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/histogram"
)
// Deduplicator deduplicates samples per each time series.
type Deduplicator struct {
	// da holds the deduplication state; Push feeds it and flush drains it.
	da *dedupAggr
	// cs is the current state. Its maxDeadline is the deadline of the current
	// window, and its isGreen flag tells Push which of the two states
	// (green/blue) the current window maps to when windows are enabled.
	cs atomic.Pointer[currentState]
	// enableWindows turns on the two-state (green/blue) aggregation-windows mode
	// with lag-based flush delays.
	enableWindows bool
	// dropLabels lists label names removed from every series before deduplication.
	dropLabels []string
	// interval is the flush interval used by runFlusher.
	interval time.Duration
	// minDeadline is a unix timestamp in milliseconds. When windows are enabled,
	// Push drops samples with timestamps below it. flush advances it.
	minDeadline atomic.Int64
	// wg tracks the background flusher goroutine started by NewDeduplicator.
	wg sync.WaitGroup
	// stopCh is closed by MustStop to make the flusher goroutine exit.
	stopCh chan struct{}
	// ms holds the metrics exposed by this Deduplicator.
	ms *metrics.Set
	// time to wait after interval end before flush: a histogram of the maximum
	// sample lag (in milliseconds) observed per Push call. runFlusher swaps it
	// out on every tick and waits for its flushQuantile. nil unless enableWindows is set.
	flushAfter *histogram.Fast
	// muFlushAfter guards flushAfter. Push updates it while runFlusher replaces it.
	muFlushAfter sync.Mutex
}
// NewDeduplicator returns new deduplicator, which deduplicates samples per each time series.
//
// The de-duplicated samples are passed to pushFunc once per interval.
//
// If enableWindows is set, incoming samples are split between two states
// (green and blue) depending on which side of the current window's deadline
// they fall. Each flush is also postponed by a quantile of the sample lag
// observed by Push, so slightly delayed samples still reach the window they
// belong to.
//
// An optional dropLabels list may contain label names, which must be dropped before de-duplicating samples.
// Common case is to drop `replica`-like labels from samples received from HA datasources.
//
// alias is url label used in metrics exposed by the returned Deduplicator.
//
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
	d := &Deduplicator{
		da:            newDedupAggr(),
		dropLabels:    dropLabels,
		interval:      interval,
		enableWindows: enableWindows,
		stopCh:        make(chan struct{}),
		ms:            metrics.NewSet(),
	}

	startTime := time.Now()
	if enableWindows {
		d.flushAfter = histogram.GetFast()
		// With windows enabled, Push drops samples older than minDeadline.
		d.minDeadline.Store(startTime.UnixMilli())
	}
	// Publish the initial state exactly once. It must be in place before the
	// flusher goroutine starts and before the first Push.
	d.cs.Store(&currentState{
		maxDeadline: startTime.Add(interval).UnixMilli(),
	})

	ms := d.ms
	metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
		return float64(d.da.sizeBytes())
	})
	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
		return float64(d.da.itemsCount())
	})
	d.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
	d.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
	metrics.RegisterSet(ms)

	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		d.runFlusher(pushFunc)
	}()
	return d
}
// MustStop stops d.
//
// It first unregisters the metrics exposed by d. It then signals the
// background flusher to exit and waits until it has returned. MustStop must
// be called only once per Deduplicator.
func (d *Deduplicator) MustStop() {
	ms := d.ms
	d.ms = nil
	metrics.UnregisterSet(ms, true)

	close(d.stopCh)
	d.wg.Wait()
}
// Push pushes tss to d.
//
// Labels listed in d.dropLabels are removed from every series, and series left
// without labels are skipped. The remaining labels are sorted and compressed
// into the key that identifies the series.
//
// Without aggregation windows, every sample is pushed with the isGreen flag
// set to false. With windows enabled:
//   - samples whose timestamp is below d.minDeadline are dropped;
//   - the rest are split between the green and blue states, based on whether
//     their timestamp is within the current window's maxDeadline and on which
//     state is current.
//
// The largest lag between now and a kept sample's timestamp is recorded in
// d.flushAfter, which runFlusher uses to delay the next flush.
func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
	ctx := getDeduplicatorPushCtx()
	lbls := &ctx.labels
	keyBuf := ctx.buf

	state := d.cs.Load()
	now := time.Now().UnixMilli()
	lowerBound := d.minDeadline.Load()
	windows := d.enableWindows
	drop := d.dropLabels
	var maxLag int64

	for _, ts := range tss {
		if len(drop) == 0 {
			lbls.Labels = append(lbls.Labels[:0], ts.Labels...)
		} else {
			lbls.Labels = dropSeriesLabels(lbls.Labels[:0], ts.Labels, drop)
		}
		if len(lbls.Labels) == 0 {
			continue
		}
		lbls.Sort()

		start := len(keyBuf)
		keyBuf = lc.Compress(keyBuf, lbls.Labels)
		key := bytesutil.ToUnsafeString(keyBuf[start:])

		for _, s := range ts.Samples {
			toGreen := false
			if windows {
				if s.Timestamp < lowerBound {
					// Too old for any window that is still open.
					continue
				}
				toGreen = (s.Timestamp <= state.maxDeadline) == state.isGreen
			}

			ps := pushSample{
				key:       key,
				value:     s.Value,
				timestamp: s.Timestamp,
			}
			if toGreen {
				ctx.green = append(ctx.green, ps)
			} else {
				ctx.blue = append(ctx.blue, ps)
			}

			if lag := now - s.Timestamp; lag > maxLag {
				maxLag = lag
			}
		}
	}

	if windows && maxLag > 0 {
		d.muFlushAfter.Lock()
		d.flushAfter.Update(float64(maxLag))
		d.muFlushAfter.Unlock()
	}
	if len(ctx.blue) > 0 {
		d.da.pushSamples(ctx.blue, 0, false)
	}
	if len(ctx.green) > 0 {
		d.da.pushSamples(ctx.green, 0, true)
	}

	ctx.buf = keyBuf
	putDeduplicatorPushCtx(ctx)
}
// dropSeriesLabels appends to dst every label from src whose name is not listed
// in labelNames, and returns the extended slice. Kept labels retain their
// relative order from src.
func dropSeriesLabels(dst, src []prompbmarshal.Label, labelNames []string) []prompbmarshal.Label {
	for i := range src {
		if slices.Contains(labelNames, src[i].Name) {
			continue
		}
		dst = append(dst, src[i])
	}
	return dst
}
// runFlusher flushes d to pushFunc once per d.interval until d.stopCh is closed.
//
// With aggregation windows enabled, each flush is postponed by the
// flushQuantile of the sample lags (in milliseconds) that Push recorded since
// the previous tick. This gives delayed samples a chance to reach their window
// before it is flushed. The wait is abandoned as soon as d.stopCh is closed,
// so MustStop does not have to wait out a potentially long lag-based delay.
// As with a stop that arrives between ticks, no flush happens in that case.
func (d *Deduplicator) runFlusher(pushFunc PushFunc) {
	t := time.NewTicker(d.interval)
	defer t.Stop()
	for {
		select {
		case <-d.stopCh:
			return
		case <-t.C:
		}

		if d.enableWindows {
			// Swap in a fresh histogram so the next interval's lags are tracked separately.
			d.muFlushAfter.Lock()
			fa := d.flushAfter
			d.flushAfter = histogram.GetFast()
			d.muFlushAfter.Unlock()

			lagMsec := fa.Quantile(flushQuantile)
			histogram.PutFast(fa)

			// Quantile may return NaN (e.g. when no lags were recorded). Converting
			// NaN to an integer type is implementation-dependent in Go, so wait only
			// for a strictly positive lag.
			if lagMsec > 0 {
				timer := time.NewTimer(time.Duration(lagMsec) * time.Millisecond)
				select {
				case <-d.stopCh:
					timer.Stop()
					return
				case <-timer.C:
				}
			}
		}
		d.flush(pushFunc)
	}
}
// flush passes the deduplicated samples of the current window to pushFunc,
// then advances d to the next window.
//
// Before flushing, minDeadline is raised to the deadline passed to d.da.flush.
// From then on, concurrent Push calls with windows enabled drop samples with
// older timestamps. The advanced state is published via d.cs only after
// d.da.flush returns.
func (d *Deduplicator) flush(pushFunc PushFunc) {
	// NOTE(review): newState is defined elsewhere. It presumably returns a copy
	// of the current state that is safe to mutate before it is stored back below.
	cs := d.cs.Load().newState()
	d.minDeadline.Store(cs.maxDeadline)
	startTime := time.Now()
	deadlineTime := time.UnixMilli(cs.maxDeadline)
	// The callback turns each deduplicated sample into a single-sample time
	// series and hands the batch to pushFunc. Scratch buffers come from a pool
	// per invocation. NOTE(review): this suggests dedupAggr.flush may call it
	// concurrently - confirm.
	d.da.flush(func(samples []pushSample, _ int64, _ bool) {
		ctx := getDeduplicatorFlushCtx()
		tss := ctx.tss
		labels := ctx.labels
		dstSamples := ctx.samples
		for _, ps := range samples {
			// Labels and Samples of each series are sub-slices of the shared
			// labels/dstSamples buffers, starting where this sample's data begins.
			labelsLen := len(labels)
			labels = decompressLabels(labels, ps.key)
			dstSamplesLen := len(dstSamples)
			dstSamples = append(dstSamples, prompbmarshal.Sample{
				Value:     ps.value,
				Timestamp: ps.timestamp,
			})
			tss = append(tss, prompbmarshal.TimeSeries{
				Labels:  labels[labelsLen:],
				Samples: dstSamples[dstSamplesLen:],
			})
		}
		pushFunc(tss)
		// Keep the possibly grown buffers so the pooled ctx reuses their capacity.
		ctx.tss = tss
		ctx.labels = labels
		ctx.samples = dstSamples
		putDeduplicatorFlushCtx(ctx)
	}, cs.maxDeadline, cs.isGreen)
	duration := time.Since(startTime)
	d.da.flushDuration.Update(duration.Seconds())
	if duration > d.interval {
		d.da.flushTimeouts.Inc()
		logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
			"possible solutions: increase dedupInterval; reduce samples' ingestion rate", d.interval, duration.Seconds())
	}
	// Advance the deadline by whole intervals until it is no longer in the past.
	for time.Now().After(deadlineTime) {
		deadlineTime = deadlineTime.Add(d.interval)
	}
	cs.maxDeadline = deadlineTime.UnixMilli()
	if d.enableWindows {
		// The other state becomes current for the next window.
		cs.isGreen = !cs.isGreen
	}
	d.cs.Store(cs)
}
// deduplicatorPushCtx holds reusable scratch buffers for Deduplicator.Push.
type deduplicatorPushCtx struct {
	// blue collects samples pushed with isGreen=false.
	blue []pushSample
	// green collects samples pushed with isGreen=true.
	green []pushSample
	// labels is scratch space for a series' labels after dropping and sorting.
	labels promutils.Labels
	// buf backs the compressed label keys referenced by the collected samples.
	buf []byte
}
// reset empties ctx for reuse, keeping the capacity of its buffers.
func (ctx *deduplicatorPushCtx) reset() {
	ctx.labels.Reset()
	ctx.buf = ctx.buf[:0]
	ctx.blue, ctx.green = ctx.blue[:0], ctx.green[:0]
}
// getDeduplicatorPushCtx returns an empty deduplicatorPushCtx, reusing a pooled one when available.
func getDeduplicatorPushCtx() *deduplicatorPushCtx {
	return deduplicatorPushCtxPool.Get().(*deduplicatorPushCtx)
}

// putDeduplicatorPushCtx resets ctx and returns it to the pool.
// ctx must not be used after this call.
func putDeduplicatorPushCtx(ctx *deduplicatorPushCtx) {
	ctx.reset()
	deduplicatorPushCtxPool.Put(ctx)
}

// deduplicatorPushCtxPool recycles deduplicatorPushCtx values between Push calls.
var deduplicatorPushCtxPool = sync.Pool{
	New: func() any {
		return &deduplicatorPushCtx{}
	},
}
// deduplicatorFlushCtx holds reusable buffers used to convert flushed samples
// into time series before they are passed to pushFunc.
type deduplicatorFlushCtx struct {
	// tss is the batch of time series handed to pushFunc.
	tss []prompbmarshal.TimeSeries
	// labels backs the Labels sub-slices of the entries in tss.
	labels []prompbmarshal.Label
	// samples backs the Samples sub-slices of the entries in tss.
	samples []prompbmarshal.Sample
}
// reset empties ctx for reuse. It zeroes the used elements first, so stale
// label and sample references are not kept alive by the pooled buffers, and
// then truncates each slice while keeping its capacity.
func (ctx *deduplicatorFlushCtx) reset() {
	clear(ctx.tss)
	clear(ctx.labels)
	clear(ctx.samples)
	ctx.tss, ctx.labels, ctx.samples = ctx.tss[:0], ctx.labels[:0], ctx.samples[:0]
}
// getDeduplicatorFlushCtx returns an empty deduplicatorFlushCtx, reusing a pooled one when available.
func getDeduplicatorFlushCtx() *deduplicatorFlushCtx {
	return deduplicatorFlushCtxPool.Get().(*deduplicatorFlushCtx)
}

// putDeduplicatorFlushCtx resets ctx and returns it to the pool.
// ctx must not be used after this call.
func putDeduplicatorFlushCtx(ctx *deduplicatorFlushCtx) {
	ctx.reset()
	deduplicatorFlushCtxPool.Put(ctx)
}

// deduplicatorFlushCtxPool recycles deduplicatorFlushCtx values between flush callbacks.
var deduplicatorFlushCtxPool = sync.Pool{
	New: func() any {
		return &deduplicatorFlushCtx{}
	},
}