From 402dc14ec0285bf93c0c6329d637578f2d058fc3 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Mon, 4 Mar 2024 16:12:41 +0200
Subject: [PATCH] lib/streamaggr: make aggregate.runFlusher() more roubst and
 clear

---
 lib/streamaggr/streamaggr.go | 144 +++++++++++++++++++----------------
 1 file changed, 80 insertions(+), 64 deletions(-)

diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 730c9897bb..c941073fb1 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -93,9 +93,9 @@ type Options struct {
 	// The alignment of flushes can be disabled individually per each aggregation via no_align_flush_to_interval option.
 	NoAlignFlushToInterval bool
 
-	// FlushOnShutdown enables flush of incomplete state on start and shutdown.
+	// FlushOnShutdown enables flush of incomplete aggregation state.
 	//
-	// By default incomplete state is dropped on shutdown.
+	// By default incomplete state is dropped.
 	//
 	// The flush of incomplete state can be enabled individually per each aggregation via flush_on_shutdown option.
 	FlushOnShutdown bool
@@ -126,8 +126,8 @@ type Config struct {
 	// See also FlushOnShutdown.
 	NoAlignFlushToInterval *bool `yaml:"no_align_flush_to_interval,omitempty"`
 
-	// FlushOnShutdown defines whether to flush the aggregation state on process termination
-	// or config reload. By default the state is dropped on these events.
+	// FlushOnShutdown defines whether to flush incomplete aggregation state.
+	// By default incomplete aggregation state is dropped, since it may confuse users.
 	FlushOnShutdown *bool `yaml:"flush_on_shutdown,omitempty"`
 
 	// DedupInterval is an optional interval for deduplication.
@@ -576,74 +576,90 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
 }
 
 func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration) {
-	flushTickerCh := make(chan *time.Ticker, 1)
-	dedupFlushTickerCh := make(chan *time.Ticker, 1)
-	go func() {
+	alignedSleep := func(d time.Duration) {
 		if !alignFlushToInterval {
-			flushTickerCh <- time.NewTicker(interval)
-			if dedupInterval > 0 {
-				dedupFlushTickerCh <- time.NewTicker(dedupInterval)
-			}
 			return
 		}
 
-		sleep := func(d time.Duration) {
-			timer := timerpool.Get(d)
-			defer timerpool.Put(timer)
-			select {
-			case <-a.stopCh:
-			case <-timer.C:
-			}
-		}
-		currentTime := time.Duration(time.Now().UnixNano())
-		if dedupInterval > 0 {
-			d := dedupInterval - (currentTime % dedupInterval)
-			if d < dedupInterval {
-				sleep(d)
-			}
-			dedupFlushTickerCh <- time.NewTicker(dedupInterval)
-			currentTime += d
-		}
-		d := interval - (currentTime % interval)
-		if d < interval {
-			sleep(d)
-		}
-		t := time.NewTicker(interval)
-		if skipIncompleteFlush {
-			a.dedupFlush(dedupInterval)
-			a.flush(nil, interval)
-		}
-		flushTickerCh <- t
-	}()
-
-	var flushTickerC <-chan time.Time
-	var dedupFlushTickerC <-chan time.Time
-	for {
+		ct := time.Duration(time.Now().UnixNano())
+		dSleep := d - (ct % d)
+		timer := timerpool.Get(dSleep)
+		defer timer.Stop()
 		select {
 		case <-a.stopCh:
-			if !skipIncompleteFlush {
-				a.dedupFlush(dedupInterval)
-				a.flush(pushFunc, interval)
-			}
-			return
-		case flushTicker := <-flushTickerCh:
-			flushTickerC = flushTicker.C
-			defer flushTicker.Stop()
-		case dedupFlushTicker := <-dedupFlushTickerCh:
-			dedupFlushTickerC = dedupFlushTicker.C
-			defer dedupFlushTicker.Stop()
-		case <-flushTickerC:
-			select {
-			case <-dedupFlushTickerC:
-				// flush deduplicated samples if needed before flushing the aggregated samples
-				a.dedupFlush(dedupInterval)
-			default:
-			}
-			a.flush(pushFunc, interval)
-		case <-dedupFlushTickerC:
-			a.dedupFlush(dedupInterval)
+		case <-timer.C:
 		}
 	}
+
+	tickerWait := func(t *time.Ticker) bool {
+		select {
+		case <-a.stopCh:
+			return false
+		case <-t.C:
+			return true
+		}
+	}
+
+	if dedupInterval <= 0 {
+		alignedSleep(interval)
+		t := time.NewTicker(interval)
+		defer t.Stop()
+
+		if alignFlushToInterval && skipIncompleteFlush {
+			a.flush(nil, interval)
+		}
+
+		for tickerWait(t) {
+			a.flush(pushFunc, interval)
+
+			if alignFlushToInterval {
+				select {
+				case <-t.C:
+					if skipIncompleteFlush && tickerWait(t) {
+						logger.Warnf("drop incomplete aggregation state because the previous flush took longer than interval=%s", interval)
+						a.flush(nil, interval)
+					}
+				default:
+				}
+			}
+		}
+	} else {
+		alignedSleep(dedupInterval)
+		t := time.NewTicker(dedupInterval)
+		defer t.Stop()
+
+		flushDeadline := time.Now().Add(interval)
+		isSkippedFirstFlush := false
+		for tickerWait(t) {
+			a.dedupFlush(dedupInterval)
+
+			ct := time.Now()
+			if ct.After(flushDeadline) {
+				// It is time to flush the aggregated state
+				if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
+					a.flush(nil, interval)
+					isSkippedFirstFlush = true
+				} else {
+					a.flush(pushFunc, interval)
+				}
+				for ct.After(flushDeadline) {
+					flushDeadline = flushDeadline.Add(interval)
+				}
+			}
+
+			if alignFlushToInterval {
+				select {
+				case <-t.C:
+				default:
+				}
+			}
+		}
+	}
+
+	if !skipIncompleteFlush {
+		a.dedupFlush(dedupInterval)
+		a.flush(pushFunc, interval)
+	}
 }
 
 func (a *aggregator) dedupFlush(dedupInterval time.Duration) {