lib/streamaggr: update the minimum allowed timestamp for incoming samples before flushing the samples to the storage

This should prevent from dropping samples with old timestamps during long flushes.

This is a follow-up for 1cedaf61cb
This commit is contained in:
Aliaksandr Valialkin 2024-04-04 02:24:56 +03:00
parent 931dd3f320
commit f8d10a7106
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -713,6 +713,11 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) { func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) {
startTime := time.Now() startTime := time.Now()
// Update minTimestamp before flushing samples to the storage,
// since the flush durtion can be quite long.
// This should prevent from dropping samples with old timestamps when the flush takes long time.
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
var wg sync.WaitGroup var wg sync.WaitGroup
for _, as := range a.aggrStates { for _, as := range a.aggrStates {
flushConcurrencyCh <- struct{}{} flushConcurrencyCh <- struct{}{}
@ -732,8 +737,6 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
} }
wg.Wait() wg.Wait()
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
d := time.Since(startTime) d := time.Since(startTime)
a.flushDuration.Update(d.Seconds()) a.flushDuration.Update(d.Seconds())
if d > interval { if d > interval {