lib/streamaggr: properly drop samples on the first incomplete interval

Previously samples were dropped on the first incomplete interval and the next complete interval.
Also make sure that the de-duplication is performed just before flushing the aggregate state.
This should help the case then dedup_interval = interval.
This commit is contained in:
Aliaksandr Valialkin 2024-03-04 14:50:46 +02:00
parent 51745ec5ff
commit 2ffef39bb3
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 54 additions and 36 deletions

View file

@ -82,16 +82,22 @@ type Options struct {
// The last sample per each series is left per each DedupInterval if DedupInterval > 0.
//
// By default deduplication is disabled.
//
// The deduplication can be set up individually per each aggregation via dedup_interval option.
DedupInterval time.Duration
// NoAlignFlushToInterval disables alignment of flushes to the aggregation interval.
//
// By default flushes are aligned to aggregation interval.
//
// The alignment of flushes can be disabled individually per each aggregation via no_align_flush_to_interval option.
NoAlignFlushToInterval bool
// FlushOnShutdown enables flush of incomplete state on shutdown.
// FlushOnShutdown enables flush of incomplete state on start and shutdown.
//
// By default incomplete state is dropped on shutdown.
//
// The flush of incomplete state can be enabled individually per each aggregation via flush_on_shutdown option.
FlushOnShutdown bool
// KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix.
@ -100,6 +106,7 @@ type Options struct {
//
// input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
//
// This option can be overriden individually per each aggregation via keep_metric_names option.
KeepMetricNames bool
}
@ -601,20 +608,22 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
if d < interval {
sleep(d)
}
flushTickerCh <- time.NewTicker(interval)
t := time.NewTicker(interval)
if skipIncompleteFlush {
a.dedupFlush(dedupInterval)
a.flush(nil, interval)
}
flushTickerCh <- t
}()
var flushTickerC <-chan time.Time
var dedupFlushTickerC <-chan time.Time
isFirstFlush := true
for {
select {
case <-a.stopCh:
if !skipIncompleteFlush {
if dedupInterval > 0 {
a.dedupFlush()
}
a.flush(pushFunc)
a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval)
}
return
case flushTicker := <-flushTickerCh:
@ -624,43 +633,42 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
dedupFlushTickerC = dedupFlushTicker.C
defer dedupFlushTicker.Stop()
case <-flushTickerC:
if isFirstFlush {
isFirstFlush = false
if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil)
continue
}
}
startTime := time.Now()
a.flush(pushFunc)
d := time.Since(startTime)
a.flushDuration.Update(d.Seconds())
if d > interval {
a.flushTimeouts.Inc()
logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %s; "+
"possible solutions: increase interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", interval, d)
select {
case <-dedupFlushTickerC:
// flush deduplicated samples if needed before flushing the aggregated samples
a.dedupFlush(dedupInterval)
default:
}
a.flush(pushFunc, interval)
case <-dedupFlushTickerC:
startTime := time.Now()
a.dedupFlush()
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
if d > dedupInterval {
a.dedupFlushTimeouts.Inc()
logger.Warnf("stream aggregation deduplication couldn't be finished in the configured dedup_interval=%s; it took %s; "+
"possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", dedupInterval, d)
}
a.dedupFlush(dedupInterval)
}
}
}
func (a *aggregator) dedupFlush() {
func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
if dedupInterval <= 0 {
// The de-duplication is disabled.
return
}
startTime := time.Now()
a.da.flush(a.pushSamples)
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
if d > dedupInterval {
a.dedupFlushTimeouts.Inc()
logger.Warnf("deduplication couldn't be finished in the configured dedup_interval=%s; it took %.03f; "+
"possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", dedupInterval, d.Seconds())
}
}
func (a *aggregator) flush(pushFunc PushFunc) {
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration) {
startTime := time.Now()
var wg sync.WaitGroup
for _, as := range a.aggrStates {
flushConcurrencyCh <- struct{}{}
@ -679,6 +687,15 @@ func (a *aggregator) flush(pushFunc PushFunc) {
}(as)
}
wg.Wait()
d := time.Since(startTime)
a.flushDuration.Update(d.Seconds())
if d > interval {
a.flushTimeouts.Inc()
logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %.03f; "+
"possible solutions: increase interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", interval, d.Seconds())
}
}
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())

View file

@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
@ -55,7 +56,7 @@ func benchmarkAggregatorsFlushSerial(b *testing.B, output string) {
for i := 0; i < b.N; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
for _, aggr := range a.as {
aggr.flush(pushFunc)
aggr.flush(pushFunc, time.Hour)
}
}
}