lib/streamaggr: make aggregator.runFlusher() more robust and clear

This commit is contained in:
Aliaksandr Valialkin 2024-03-04 16:12:41 +02:00
parent aa5e7e268c
commit 925f60841f
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -93,9 +93,9 @@ type Options struct {
// The alignment of flushes can be disabled individually per each aggregation via no_align_flush_to_interval option.
NoAlignFlushToInterval bool
// FlushOnShutdown enables flush of incomplete state on start and shutdown.
// FlushOnShutdown enables flush of incomplete aggregation state.
//
// By default incomplete state is dropped on shutdown.
// By default incomplete state is dropped.
//
// The flush of incomplete state can be enabled individually per each aggregation via flush_on_shutdown option.
FlushOnShutdown bool
@ -126,8 +126,8 @@ type Config struct {
// See also FlushOnShutdown.
NoAlignFlushToInterval *bool `yaml:"no_align_flush_to_interval,omitempty"`
// FlushOnShutdown defines whether to flush the aggregation state on process termination
// or config reload. By default the state is dropped on these events.
// FlushOnShutdown defines whether to flush incomplete aggregation state.
// By default incomplete aggregation state is dropped, since it may confuse users.
FlushOnShutdown *bool `yaml:"flush_on_shutdown,omitempty"`
// DedupInterval is an optional interval for deduplication.
@ -576,74 +576,90 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
}
func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration) {
flushTickerCh := make(chan *time.Ticker, 1)
dedupFlushTickerCh := make(chan *time.Ticker, 1)
go func() {
alignedSleep := func(d time.Duration) {
if !alignFlushToInterval {
flushTickerCh <- time.NewTicker(interval)
if dedupInterval > 0 {
dedupFlushTickerCh <- time.NewTicker(dedupInterval)
}
return
}
sleep := func(d time.Duration) {
timer := timerpool.Get(d)
defer timerpool.Put(timer)
select {
case <-a.stopCh:
case <-timer.C:
}
}
currentTime := time.Duration(time.Now().UnixNano())
if dedupInterval > 0 {
d := dedupInterval - (currentTime % dedupInterval)
if d < dedupInterval {
sleep(d)
}
dedupFlushTickerCh <- time.NewTicker(dedupInterval)
currentTime += d
}
d := interval - (currentTime % interval)
if d < interval {
sleep(d)
}
t := time.NewTicker(interval)
if skipIncompleteFlush {
a.dedupFlush(dedupInterval)
a.flush(nil, interval)
}
flushTickerCh <- t
}()
var flushTickerC <-chan time.Time
var dedupFlushTickerC <-chan time.Time
for {
ct := time.Duration(time.Now().UnixNano())
dSleep := d - (ct % d)
timer := timerpool.Get(dSleep)
defer timer.Stop()
select {
case <-a.stopCh:
if !skipIncompleteFlush {
a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval)
}
return
case flushTicker := <-flushTickerCh:
flushTickerC = flushTicker.C
defer flushTicker.Stop()
case dedupFlushTicker := <-dedupFlushTickerCh:
dedupFlushTickerC = dedupFlushTicker.C
defer dedupFlushTicker.Stop()
case <-flushTickerC:
select {
case <-dedupFlushTickerC:
// flush deduplicated samples if needed before flushing the aggregated samples
a.dedupFlush(dedupInterval)
default:
}
a.flush(pushFunc, interval)
case <-dedupFlushTickerC:
a.dedupFlush(dedupInterval)
case <-timer.C:
}
}
tickerWait := func(t *time.Ticker) bool {
select {
case <-a.stopCh:
return false
case <-t.C:
return true
}
}
if dedupInterval <= 0 {
alignedSleep(interval)
t := time.NewTicker(interval)
defer t.Stop()
if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil, interval)
}
for tickerWait(t) {
a.flush(pushFunc, interval)
if alignFlushToInterval {
select {
case <-t.C:
if skipIncompleteFlush && tickerWait(t) {
logger.Warnf("drop incomplete aggregation state because the previous flush took longer than interval=%s", interval)
a.flush(nil, interval)
}
default:
}
}
}
} else {
alignedSleep(dedupInterval)
t := time.NewTicker(dedupInterval)
defer t.Stop()
flushDeadline := time.Now().Add(interval)
isSkippedFirstFlush := false
for tickerWait(t) {
a.dedupFlush(dedupInterval)
ct := time.Now()
if ct.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, interval)
isSkippedFirstFlush = true
} else {
a.flush(pushFunc, interval)
}
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval)
}
}
if alignFlushToInterval {
select {
case <-t.C:
default:
}
}
}
}
if !skipIncompleteFlush {
a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval)
}
}
func (a *aggregator) dedupFlush(dedupInterval time.Duration) {