From f8d10a7106e997be65a7b325e81bdd7878eb2d62 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 4 Apr 2024 02:24:56 +0300 Subject: [PATCH] lib/streamaggr: update the minimum allowed timestamp for incoming samples before flushing the samples to the storage This should prevent from dropping samples with old timestamps during long flushes. This is a follow-up for 1cedaf61cb0e4893200edd7e84696df1f6418fd3 --- lib/streamaggr/streamaggr.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 3aaee6ac82..b1abaadfb0 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -713,6 +713,11 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) { func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) { startTime := time.Now() + // Update minTimestamp before flushing samples to the storage, + // since the flush durtion can be quite long. + // This should prevent from dropping samples with old timestamps when the flush takes long time. + a.minTimestamp.Store(startTime.UnixMilli() - 5_000) + var wg sync.WaitGroup for _, as := range a.aggrStates { flushConcurrencyCh <- struct{}{} @@ -732,8 +737,6 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState } wg.Wait() - a.minTimestamp.Store(startTime.UnixMilli() - 5_000) - d := time.Since(startTime) a.flushDuration.Update(d.Seconds()) if d > interval {