
### Describe Your Changes

By default, stream aggregation and deduplication store a single state per each aggregation output result. The data for each aggregator is flushed independently once per aggregation interval. But there is no guarantee that incoming samples with timestamps close to the aggregation interval's end will get into it. For example, when aggregating with `interval: 1m`, a data sample with timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`, depending on network lag, load, clock synchronization, etc. In most scenarios this doesn't impact aggregation or deduplication results, which stay consistent within the margin of error. But for metrics represented as a collection of series, like [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram), such inaccuracy leads to invalid aggregation results.

For this case, streaming aggregation and deduplication support a mode with aggregation windows for the current and previous state. In this mode, the flush doesn't happen immediately but is shifted by a calculated samples' lag, which improves correctness for delayed data. Enabling this mode increases resource usage: memory usage is expected to double, since aggregation stores two states instead of one. However, it significantly improves the accuracy of calculations. Aggregation windows can be enabled via the following settings:

- `-streamAggr.enableWindows` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/) the `-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`. If one of these flags is set, then all aggregators will use fixed windows. In conjunction with `-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval`, fixed aggregation windows are enabled on the deduplicator as well.
- `enable_windows` option in the [aggregation config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config). It allows enabling aggregation windows for a specific aggregator; see the config sketch below.

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
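For reference, a minimal sketch of how `enable_windows` might look in a stream aggregation config (the `match`, `interval` and `outputs` fields follow the documented config format; the concrete matcher and values are illustrative assumptions only):

```yaml
# stream aggregation config, e.g. passed via -streamAggr.config
- match: '{__name__=~".+_bucket"}'  # illustrative: histogram bucket series
  interval: 1m
  outputs: [total]
  enable_windows: true  # keep current and previous window state for this aggregator
```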
package streamaggr

import (
	"fmt"
	"slices"
	"sync"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
	"github.com/VictoriaMetrics/metrics"
	"github.com/valyala/histogram"
)

// Deduplicator deduplicates samples per each time series.
type Deduplicator struct {
	da *dedupAggr

	cs atomic.Pointer[currentState]

	enableWindows bool
	dropLabels    []string
	interval      time.Duration
	minDeadline   atomic.Int64

	wg     sync.WaitGroup
	stopCh chan struct{}

	ms *metrics.Set

	// time to wait after interval end before flush
	flushAfter   *histogram.Fast
	muFlushAfter sync.Mutex
}
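
// When enableWindows is true, the deduplicator keeps two sample states at once
// (called "blue" and "green" below): samples with timestamps up to the current
// state's maxDeadline go into the active window, later samples go into the
// other one, and flush() swaps the active color once per interval. This way
// samples arriving slightly late still land in the window they belong to.
// (This comment summarizes the behavior of the code below; currentState is
// defined elsewhere in this package.)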

// NewDeduplicator returns a new deduplicator, which deduplicates samples per each time series.
//
// The de-duplicated samples are passed to pushFunc once per interval.
//
// An optional dropLabels list may contain label names, which must be dropped before de-duplicating samples.
// The common case is to drop `replica`-like labels from samples received from HA datasources.
//
// alias is used as the `url` label in metrics exposed by the returned Deduplicator.
//
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Duration, dropLabels []string, alias string) *Deduplicator {
	d := &Deduplicator{
		da:            newDedupAggr(),
		dropLabels:    dropLabels,
		interval:      interval,
		enableWindows: enableWindows,
		stopCh:        make(chan struct{}),
		ms:            metrics.NewSet(),
	}
	startTime := time.Now()
	cs := &currentState{
		maxDeadline: startTime.Add(interval).UnixMilli(),
	}
	if enableWindows {
		d.flushAfter = histogram.GetFast()
		d.minDeadline.Store(startTime.UnixMilli())
	}
	d.cs.Store(cs)

	ms := d.ms

	metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)

	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
		return float64(d.da.sizeBytes())
	})
	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
		return float64(d.da.itemsCount())
	})

	d.da.flushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
	d.da.flushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))

	metrics.RegisterSet(ms)

	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		d.runFlusher(pushFunc)
	}()

	return d
}
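
// A minimal usage sketch (hedged: the pushFunc body, interval, dropped label
// and alias below are illustrative assumptions, not taken from this file):
//
//	pushFunc := func(tss []prompbmarshal.TimeSeries) {
//		// forward de-duplicated series, e.g. to remote storage
//	}
//	d := NewDeduplicator(pushFunc, true, time.Minute, []string{"replica"}, "dedup-example")
//	defer d.MustStop()
//	// call d.Push(series) for each incoming batch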

// MustStop stops d.
func (d *Deduplicator) MustStop() {
	metrics.UnregisterSet(d.ms, true)
	d.ms = nil

	close(d.stopCh)
	d.wg.Wait()
}

// Push pushes tss to d.
func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
	ctx := getDeduplicatorPushCtx()
	labels := &ctx.labels
	buf := ctx.buf
	cs := d.cs.Load()
	nowMsec := time.Now().UnixMilli()
	minDeadline := d.minDeadline.Load()
	var maxLagMsec int64

	dropLabels := d.dropLabels
	for _, ts := range tss {
		if len(dropLabels) > 0 {
			labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)
		} else {
			labels.Labels = append(labels.Labels[:0], ts.Labels...)
		}
		if len(labels.Labels) == 0 {
			continue
		}
		labels.Sort()

		// Compress sorted labels into a compact key for the dedup state.
		bufLen := len(buf)
		buf = lc.Compress(buf, labels.Labels)
		key := bytesutil.ToUnsafeString(buf[bufLen:])
		for _, s := range ts.Samples {
			if d.enableWindows && minDeadline > s.Timestamp {
				// Skip samples that are too old for the previous window.
				continue
			} else if d.enableWindows && s.Timestamp <= cs.maxDeadline == cs.isGreen {
				// (s.Timestamp <= cs.maxDeadline) == cs.isGreen: the sample goes
				// to the green state when its window matches the active color.
				ctx.green = append(ctx.green, pushSample{
					key:       key,
					value:     s.Value,
					timestamp: s.Timestamp,
				})
			} else {
				ctx.blue = append(ctx.blue, pushSample{
					key:       key,
					value:     s.Value,
					timestamp: s.Timestamp,
				})
			}
			lagMsec := nowMsec - s.Timestamp
			if lagMsec > maxLagMsec {
				maxLagMsec = lagMsec
			}
		}
	}

	if d.enableWindows && maxLagMsec > 0 {
		d.muFlushAfter.Lock()
		d.flushAfter.Update(float64(maxLagMsec))
		d.muFlushAfter.Unlock()
	}

	if len(ctx.blue) > 0 {
		d.da.pushSamples(ctx.blue, 0, false)
	}
	if len(ctx.green) > 0 {
		d.da.pushSamples(ctx.green, 0, true)
	}

	ctx.buf = buf
	putDeduplicatorPushCtx(ctx)
}

func dropSeriesLabels(dst, src []prompbmarshal.Label, labelNames []string) []prompbmarshal.Label {
	for _, label := range src {
		if !slices.Contains(labelNames, label.Name) {
			dst = append(dst, label)
		}
	}
	return dst
}

func (d *Deduplicator) runFlusher(pushFunc PushFunc) {
	t := time.NewTicker(d.interval)
	var fa *histogram.Fast
	defer t.Stop()
	for {
		select {
		case <-d.stopCh:
			return
		case <-t.C:
			if d.enableWindows {
				// Calculate delay and wait
				d.muFlushAfter.Lock()
				fa, d.flushAfter = d.flushAfter, histogram.GetFast()
				d.muFlushAfter.Unlock()
				delay := time.Duration(fa.Quantile(flushQuantile)) * time.Millisecond
				histogram.PutFast(fa)
				time.Sleep(delay)
			}
			d.flush(pushFunc)
		}
	}
}

func (d *Deduplicator) flush(pushFunc PushFunc) {
	cs := d.cs.Load().newState()
	d.minDeadline.Store(cs.maxDeadline)
	startTime := time.Now()
	deadlineTime := time.UnixMilli(cs.maxDeadline)
	d.da.flush(func(samples []pushSample, _ int64, _ bool) {
		ctx := getDeduplicatorFlushCtx()

		tss := ctx.tss
		labels := ctx.labels
		dstSamples := ctx.samples
		for _, ps := range samples {
			labelsLen := len(labels)
			labels = decompressLabels(labels, ps.key)

			dstSamplesLen := len(dstSamples)
			dstSamples = append(dstSamples, prompbmarshal.Sample{
				Value:     ps.value,
				Timestamp: ps.timestamp,
			})

			tss = append(tss, prompbmarshal.TimeSeries{
				Labels:  labels[labelsLen:],
				Samples: dstSamples[dstSamplesLen:],
			})
		}
		pushFunc(tss)

		ctx.tss = tss
		ctx.labels = labels
		ctx.samples = dstSamples
		putDeduplicatorFlushCtx(ctx)
	}, cs.maxDeadline, cs.isGreen)

	duration := time.Since(startTime)
	d.da.flushDuration.Update(duration.Seconds())
	if duration > d.interval {
		d.da.flushTimeouts.Inc()
		logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+
			"possible solutions: increase dedupInterval; reduce samples' ingestion rate", d.interval, duration.Seconds())
	}
	// Advance the deadline to the next interval boundary in the future.
	for time.Now().After(deadlineTime) {
		deadlineTime = deadlineTime.Add(d.interval)
	}
	cs.maxDeadline = deadlineTime.UnixMilli()
	if d.enableWindows {
		// Swap the active window color for the next aggregation round.
		cs.isGreen = !cs.isGreen
	}
	d.cs.Store(cs)
}

type deduplicatorPushCtx struct {
	blue   []pushSample
	green  []pushSample
	labels promutils.Labels
	buf    []byte
}

func (ctx *deduplicatorPushCtx) reset() {
	ctx.blue = ctx.blue[:0]
	ctx.green = ctx.green[:0]
	ctx.buf = ctx.buf[:0]
	ctx.labels.Reset()
}

func getDeduplicatorPushCtx() *deduplicatorPushCtx {
	v := deduplicatorPushCtxPool.Get()
	if v == nil {
		return &deduplicatorPushCtx{}
	}
	return v.(*deduplicatorPushCtx)
}

func putDeduplicatorPushCtx(ctx *deduplicatorPushCtx) {
	ctx.reset()
	deduplicatorPushCtxPool.Put(ctx)
}

var deduplicatorPushCtxPool sync.Pool

type deduplicatorFlushCtx struct {
	tss     []prompbmarshal.TimeSeries
	labels  []prompbmarshal.Label
	samples []prompbmarshal.Sample
}

func (ctx *deduplicatorFlushCtx) reset() {
	clear(ctx.tss)
	ctx.tss = ctx.tss[:0]

	clear(ctx.labels)
	ctx.labels = ctx.labels[:0]

	clear(ctx.samples)
	ctx.samples = ctx.samples[:0]
}

func getDeduplicatorFlushCtx() *deduplicatorFlushCtx {
	v := deduplicatorFlushCtxPool.Get()
	if v == nil {
		return &deduplicatorFlushCtx{}
	}
	return v.(*deduplicatorFlushCtx)
}

func putDeduplicatorFlushCtx(ctx *deduplicatorFlushCtx) {
	ctx.reset()
	deduplicatorFlushCtxPool.Put(ctx)
}

var deduplicatorFlushCtxPool sync.Pool