VictoriaMetrics/lib/streamaggr/dedup.go
Andrii Chubatiuk a041488786
lib/streamaggr: added aggregation windows (#6314)
### Describe Your Changes

By default, stream aggregation and deduplication store a single state per aggregation output result.
The data for each aggregator is flushed independently once per aggregation interval, but there is no
guarantee that incoming samples with timestamps close to the end of an aggregation interval will make
it into that interval's flush. For example, when aggregating with `interval: 1m`, a data sample with
timestamp 1739473078 (18:57:59) can fall into aggregation round `18:58:00` or `18:59:00`, depending on
network lag, load, clock synchronization, etc. In most scenarios this doesn't impact aggregation or
deduplication results, which stay consistent within a margin of error. But for metrics represented as
a collection of series, like
[histograms](https://docs.victoriametrics.com/keyconcepts/#histogram),
such inaccuracy leads to invalid aggregation results.
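
For illustration, this small sketch (not part of the change; the timestamp is taken from the example above) shows how the round a sample falls into depends on when it is processed rather than on its own timestamp:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	interval := time.Minute
	sampleTs := time.Unix(1739473078, 0).UTC() // sample timestamp from the example above

	processedOnTime := sampleTs.Add(500 * time.Millisecond) // processed before the next flush
	processedLate := sampleTs.Add(3 * time.Second)          // processing delayed past the next flush

	// The aggregation round is the interval the sample is processed in, labeled by its end time.
	fmt.Println(processedOnTime.Truncate(interval).Add(interval).Format("15:04:05")) // 18:58:00
	fmt.Println(processedLate.Truncate(interval).Add(interval).Format("15:04:05"))   // 18:59:00
}
```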

For this case, streaming aggregation and deduplication support a mode with separate aggregation
windows for the current and the previous state. In this mode a flush doesn't happen immediately; it is
shifted by a calculated sample lag, which improves correctness for delayed data, so that, for example,
a sample with timestamp 18:57:59 arriving a couple of seconds late is still accounted for in the
correct round.

Enabling this mode increases resource usage: memory usage is expected to double, since aggregation
stores two states instead of one. However, it significantly improves the accuracy of calculations.
Aggregation windows can be enabled via the following settings:

- `-streamAggr.enableWindows` at [single-node
VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and [vmagent](https://docs.victoriametrics.com/vmagent/). At
[vmagent](https://docs.victoriametrics.com/vmagent/) the
`-remoteWrite.streamAggr.enableWindows` flag can be specified individually per each `-remoteWrite.url`.
If one of these flags is set, all aggregators will use fixed windows. When combined with
`-remoteWrite.streamAggr.dedupInterval` or `-streamAggr.dedupInterval`, fixed aggregation windows are
enabled for the deduplicator as well.
- `enable_windows` option in the [aggregation
config](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
  It allows enabling aggregation windows for a specific aggregator; see the sketch after this list.
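
As an illustration, a minimal stream aggregation config sketch with windows enabled for a single aggregator might look like this (the `match`, `interval` and `outputs` values below are hypothetical):

```yaml
# hypothetical aggregation config entry; only enable_windows is specific to this change
- match: 'http_request_duration_seconds_bucket'
  interval: 1m
  outputs: [total]
  # store two aggregation states (current and previous window) for this aggregator only
  enable_windows: true
```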

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to the [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>

(cherry picked from commit c8fc903669)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2025-02-19 13:31:37 +01:00

package streamaggr

import (
	"sync"
	"sync/atomic"
	"unsafe"

	"github.com/VictoriaMetrics/metrics"
	"github.com/cespare/xxhash/v2"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
const dedupAggrShardsCount = 128

// dedupAggr deduplicates incoming samples before aggregation.
// Samples are spread over shards in order to reduce lock contention,
// and every shard keeps two independent states (blue and green),
// one per aggregation window.
type dedupAggr struct {
	shards        []dedupAggrShard
	flushDuration *metrics.Histogram
	flushTimeouts *metrics.Counter
}

type dedupAggrShard struct {
	dedupAggrShardNopad

	// The padding prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof(dedupAggrShardNopad{})%128]byte
}

// dedupAggrState holds the deduplication state for a single aggregation window.
type dedupAggrState struct {
	m          map[string]*dedupAggrSample
	mu         sync.Mutex
	samplesBuf []dedupAggrSample
	sizeBytes  atomic.Uint64
	itemsCount atomic.Uint64
}

// dedupAggrShardNopad keeps the blue and green states for the two alternating
// aggregation windows.
type dedupAggrShardNopad struct {
	blue  dedupAggrState
	green dedupAggrState
}

// dedupAggrSample is the last stored sample for a series key.
type dedupAggrSample struct {
	value     float64
	timestamp int64
}
func newDedupAggr() *dedupAggr {
	return &dedupAggr{
		shards: make([]dedupAggrShard, dedupAggrShardsCount),
	}
}

func (da *dedupAggr) sizeBytes() uint64 {
	n := uint64(unsafe.Sizeof(*da))
	var shard *dedupAggrShard
	for i := range da.shards {
		shard = &da.shards[i]
		n += shard.blue.sizeBytes.Load()
		n += shard.green.sizeBytes.Load()
	}
	return n
}

func (da *dedupAggr) itemsCount() uint64 {
	n := uint64(0)
	var shard *dedupAggrShard
	for i := range da.shards {
		shard = &da.shards[i]
		n += shard.blue.itemsCount.Load()
		n += shard.green.itemsCount.Load()
	}
	return n
}
// pushSamples distributes samples across shards by key hash and stores them
// in either the green or the blue state of every shard, depending on isGreen.
func (da *dedupAggr) pushSamples(samples []pushSample, _ int64, isGreen bool) {
	pss := getPerShardSamples()
	shards := pss.shards
	for _, sample := range samples {
		h := xxhash.Sum64(bytesutil.ToUnsafeBytes(sample.key))
		idx := h % uint64(len(shards))
		shards[idx] = append(shards[idx], sample)
	}
	for i, shardSamples := range shards {
		if len(shardSamples) == 0 {
			continue
		}
		da.shards[i].pushSamples(shardSamples, isGreen)
	}
	putPerShardSamples(pss)
}
func getDedupFlushCtx(deleteDeadline int64, isGreen bool) *dedupFlushCtx {
	v := dedupFlushCtxPool.Get()
	if v == nil {
		v = &dedupFlushCtx{}
	}
	ctx := v.(*dedupFlushCtx)
	ctx.deleteDeadline = deleteDeadline
	ctx.isGreen = isGreen
	return ctx
}

func putDedupFlushCtx(ctx *dedupFlushCtx) {
	ctx.reset()
	dedupFlushCtxPool.Put(ctx)
}

var dedupFlushCtxPool sync.Pool

type dedupFlushCtx struct {
	samples        []pushSample
	deleteDeadline int64
	isGreen        bool
}

func (ctx *dedupFlushCtx) reset() {
	clear(ctx.samples)
	ctx.samples = ctx.samples[:0]
	ctx.deleteDeadline = 0
}
// flush passes the contents of either the green or the blue state of every
// shard to f, flushing the shards concurrently with bounded parallelism.
func (da *dedupAggr) flush(f aggrPushFunc, deleteDeadline int64, isGreen bool) {
	var wg sync.WaitGroup
	for i := range da.shards {
		flushConcurrencyCh <- struct{}{}
		wg.Add(1)
		go func(shard *dedupAggrShard) {
			defer func() {
				<-flushConcurrencyCh
				wg.Done()
			}()

			ctx := getDedupFlushCtx(deleteDeadline, isGreen)
			shard.flush(ctx, f)
			putDedupFlushCtx(ctx)
		}(&da.shards[i])
	}
	wg.Wait()
}
type perShardSamples struct {
	shards [][]pushSample
}

func (pss *perShardSamples) reset() {
	shards := pss.shards
	for i, shardSamples := range shards {
		if len(shardSamples) > 0 {
			clear(shardSamples)
			shards[i] = shardSamples[:0]
		}
	}
}

func getPerShardSamples() *perShardSamples {
	v := perShardSamplesPool.Get()
	if v == nil {
		return &perShardSamples{
			shards: make([][]pushSample, dedupAggrShardsCount),
		}
	}
	return v.(*perShardSamples)
}

func putPerShardSamples(pss *perShardSamples) {
	pss.reset()
	perShardSamplesPool.Put(pss)
}

var perShardSamplesPool sync.Pool
// pushSamples stores samples in the selected (blue or green) state of the shard,
// keeping only the most relevant sample per series key according to isDuplicate.
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
	var state *dedupAggrState
	if isGreen {
		state = &das.green
	} else {
		state = &das.blue
	}
	state.mu.Lock()
	defer state.mu.Unlock()

	if state.m == nil {
		state.m = make(map[string]*dedupAggrSample, len(samples))
	}
	samplesBuf := state.samplesBuf
	for _, sample := range samples {
		s, ok := state.m[sample.key]
		if !ok {
			samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
			s = &samplesBuf[len(samplesBuf)-1]
			s.value = sample.value
			s.timestamp = sample.timestamp

			key := bytesutil.InternString(sample.key)
			state.m[key] = s

			state.itemsCount.Add(1)
			state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
			continue
		}
		if !isDuplicate(s, sample) {
			s.value = sample.value
			s.timestamp = sample.timestamp
		}
	}
	state.samplesBuf = samplesBuf
}
// isDuplicate returns true if b is a duplicate of a, i.e. if a should be kept:
// the sample with the bigger timestamp wins, and on equal timestamps the bigger
// value wins, while stale NaN markers are never considered duplicates.
//
// See https://docs.victoriametrics.com/#deduplication
func isDuplicate(a *dedupAggrSample, b pushSample) bool {
	if b.timestamp > a.timestamp {
		return false
	}
	if b.timestamp == a.timestamp {
		if decimal.IsStaleNaN(b.value) {
			return false
		}
		if b.value > a.value {
			return false
		}
	}
	return true
}
// flush atomically takes the accumulated state of the selected window (blue or
// green), converts it to pushSamples and passes them to f in chunks.
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
	var m map[string]*dedupAggrSample
	var state *dedupAggrState
	if ctx.isGreen {
		state = &das.green
	} else {
		state = &das.blue
	}
	state.mu.Lock()
	if len(state.m) > 0 {
		m = state.m
		state.m = make(map[string]*dedupAggrSample, len(state.m))
		state.samplesBuf = make([]dedupAggrSample, 0, len(state.samplesBuf))
		state.sizeBytes.Store(0)
		state.itemsCount.Store(0)
	}
	state.mu.Unlock()

	if len(m) == 0 {
		return
	}

	dstSamples := ctx.samples
	for key, s := range m {
		dstSamples = append(dstSamples, pushSample{
			key:       key,
			value:     s.value,
			timestamp: s.timestamp,
		})

		// Limit the number of samples per each flush in order to limit memory usage.
		if len(dstSamples) >= 10_000 {
			f(dstSamples, ctx.deleteDeadline, false)
			clear(dstSamples)
			dstSamples = dstSamples[:0]
		}
	}
	f(dstSamples, ctx.deleteDeadline, false)
	ctx.samples = dstSamples
}