diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 730c9897b..c941073fb 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -93,9 +93,9 @@ type Options struct { // The alignment of flushes can be disabled individually per each aggregation via no_align_flush_to_interval option. NoAlignFlushToInterval bool - // FlushOnShutdown enables flush of incomplete state on start and shutdown. + // FlushOnShutdown enables flush of incomplete aggregation state. // - // By default incomplete state is dropped on shutdown. + // By default incomplete state is dropped. // // The flush of incomplete state can be enabled individually per each aggregation via flush_on_shutdown option. FlushOnShutdown bool @@ -126,8 +126,8 @@ type Config struct { // See also FlushOnShutdown. NoAlignFlushToInterval *bool `yaml:"no_align_flush_to_interval,omitempty"` - // FlushOnShutdown defines whether to flush the aggregation state on process termination - // or config reload. By default the state is dropped on these events. + // FlushOnShutdown defines whether to flush incomplete aggregation state. + // By default incomplete aggregation state is dropped, since it may confuse users. FlushOnShutdown *bool `yaml:"flush_on_shutdown,omitempty"` // DedupInterval is an optional interval for deduplication. 
@@ -576,74 +576,99 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
 }
 
+// runFlusher flushes the aggregation state every interval until a.stopCh is signaled.
+//
+// If dedupInterval > 0, then a.dedupFlush is additionally called every dedupInterval.
+// If alignFlushToInterval is set, then the ticks are started at a time, which is a multiple of the tick period.
+// If skipIncompleteFlush is set, then the incomplete state at the first aligned flush and at stop is not pushed to pushFunc.
 func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration) {
-	flushTickerCh := make(chan *time.Ticker, 1)
-	dedupFlushTickerCh := make(chan *time.Ticker, 1)
-	go func() {
+	// alignedSleep sleeps until the current Unix time becomes a multiple of d or until a.stopCh is signaled.
+	// It returns immediately if alignFlushToInterval isn't set.
+	alignedSleep := func(d time.Duration) {
 		if !alignFlushToInterval {
-			flushTickerCh <- time.NewTicker(interval)
-			if dedupInterval > 0 {
-				dedupFlushTickerCh <- time.NewTicker(dedupInterval)
-			}
 			return
 		}
 
-		sleep := func(d time.Duration) {
-			timer := timerpool.Get(d)
-			defer timerpool.Put(timer)
-			select {
-			case <-a.stopCh:
-			case <-timer.C:
-			}
-		}
-		currentTime := time.Duration(time.Now().UnixNano())
-		if dedupInterval > 0 {
-			d := dedupInterval - (currentTime % dedupInterval)
-			if d < dedupInterval {
-				sleep(d)
-			}
-			dedupFlushTickerCh <- time.NewTicker(dedupInterval)
-			currentTime += d
-		}
-		d := interval - (currentTime % interval)
-		if d < interval {
-			sleep(d)
-		}
-		t := time.NewTicker(interval)
-		if skipIncompleteFlush {
-			a.dedupFlush(dedupInterval)
-			a.flush(nil, interval)
-		}
-		flushTickerCh <- t
-	}()
-
-	var flushTickerC <-chan time.Time
-	var dedupFlushTickerC <-chan time.Time
-	for {
+		ct := time.Duration(time.Now().UnixNano())
+		dSleep := d - (ct % d)
+		timer := timerpool.Get(dSleep)
+		// Return the timer to timerpool for reuse instead of merely stopping it.
+		defer timerpool.Put(timer)
 		select {
 		case <-a.stopCh:
-			if !skipIncompleteFlush {
-				a.dedupFlush(dedupInterval)
-				a.flush(pushFunc, interval)
-			}
-			return
-		case flushTicker := <-flushTickerCh:
-			flushTickerC = flushTicker.C
-			defer flushTicker.Stop()
-		case dedupFlushTicker := <-dedupFlushTickerCh:
-			dedupFlushTickerC = dedupFlushTicker.C
-			defer dedupFlushTicker.Stop()
-		case <-flushTickerC:
-			select {
-			case <-dedupFlushTickerC:
-				// flush deduplicated samples if needed before flushing the aggregated samples
-				a.dedupFlush(dedupInterval)
-			default:
-			}
-			a.flush(pushFunc, interval)
-		case <-dedupFlushTickerC:
-			a.dedupFlush(dedupInterval)
+		case <-timer.C:
 		}
 	}
+
+	// tickerWait blocks until the next tick of t and returns true; it returns false when a.stopCh is signaled while waiting.
+	tickerWait := func(t *time.Ticker) bool {
+		select {
+		case <-a.stopCh:
+			return false
+		case <-t.C:
+			return true
+		}
+	}
+
+	if dedupInterval <= 0 {
+		alignedSleep(interval)
+		t := time.NewTicker(interval)
+		defer t.Stop()
+
+		if alignFlushToInterval && skipIncompleteFlush {
+			a.flush(nil, interval)
+		}
+
+		for tickerWait(t) {
+			a.flush(pushFunc, interval)
+
+			if alignFlushToInterval {
+				select {
+				case <-t.C:
+					if skipIncompleteFlush && tickerWait(t) {
+						logger.Warnf("drop incomplete aggregation state because the previous flush took longer than interval=%s", interval)
+						a.flush(nil, interval)
+					}
+				default:
+				}
+			}
+		}
+	} else {
+		alignedSleep(dedupInterval)
+		t := time.NewTicker(dedupInterval)
+		defer t.Stop()
+
+		flushDeadline := time.Now().Add(interval)
+		isSkippedFirstFlush := false
+		for tickerWait(t) {
+			a.dedupFlush(dedupInterval)
+
+			ct := time.Now()
+			if ct.After(flushDeadline) {
+				// It is time to flush the aggregated state
+				if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
+					a.flush(nil, interval)
+					isSkippedFirstFlush = true
+				} else {
+					a.flush(pushFunc, interval)
+				}
+				for ct.After(flushDeadline) {
+					flushDeadline = flushDeadline.Add(interval)
+				}
+			}
+
+			if alignFlushToInterval {
+				select {
+				case <-t.C:
+				default:
+				}
+			}
+		}
+	}
+
+	if !skipIncompleteFlush {
+		a.dedupFlush(dedupInterval)
+		a.flush(pushFunc, interval)
+	}
 }
 
 func (a *aggregator) dedupFlush(dedupInterval time.Duration) {