diff --git a/lib/streamaggr/histogram_bucket.go b/lib/streamaggr/histogram_bucket.go index 7361b62382..4e27e3cd25 100644 --- a/lib/streamaggr/histogram_bucket.go +++ b/lib/streamaggr/histogram_bucket.go @@ -1,6 +1,7 @@ package streamaggr import ( + "math" "sync" "time" @@ -12,7 +13,7 @@ import ( type histogramBucketAggrState struct { m sync.Map - stalenessInterval uint64 + stalenessSecs uint64 } type histogramBucketStateValue struct { @@ -23,14 +24,15 @@ type histogramBucketStateValue struct { } func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState { + stalenessSecs := roundDurationToSecs(stalenessInterval) return &histogramBucketAggrState{ - stalenessInterval: uint64(stalenessInterval.Seconds()), + stalenessSecs: stalenessSecs, } } func (as *histogramBucketAggrState) pushSample(inputKey, outputKey string, value float64) { currentTime := fasttime.UnixTimestamp() - deleteDeadline := currentTime + as.stalenessInterval + deleteDeadline := currentTime + as.stalenessSecs again: v, ok := as.m.Load(outputKey) @@ -98,3 +100,11 @@ func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) { return true }) } + +func roundDurationToSecs(d time.Duration) uint64 { + if d < 0 { + return 0 + } + secs := d.Seconds() + return uint64(math.Ceil(secs)) +} diff --git a/lib/streamaggr/increase.go b/lib/streamaggr/increase.go index 8a43e64430..7e76c0a4cb 100644 --- a/lib/streamaggr/increase.go +++ b/lib/streamaggr/increase.go @@ -12,7 +12,7 @@ type increaseAggrState struct { m sync.Map ignoreInputDeadline uint64 - stalenessInterval uint64 + stalenessSecs uint64 } type increaseStateValue struct { @@ -25,16 +25,17 @@ type increaseStateValue struct { func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duration) *increaseAggrState { currentTime := fasttime.UnixTimestamp() - intervalSecs := uint64(interval.Seconds() + 1) + intervalSecs := roundDurationToSecs(interval) + stalenessSecs := roundDurationToSecs(stalenessInterval) return &increaseAggrState{ ignoreInputDeadline: currentTime + intervalSecs, - stalenessInterval: uint64(stalenessInterval.Seconds()), + stalenessSecs: stalenessSecs, } } func (as *increaseAggrState) pushSample(inputKey, outputKey string, value float64) { currentTime := fasttime.UnixTimestamp() - deleteDeadline := currentTime + as.stalenessInterval + deleteDeadline := currentTime + as.stalenessSecs again: v, ok := as.m.Load(outputKey) diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index 92b1cd2bd4..b611966f42 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -13,7 +13,7 @@ type totalAggrState struct { m sync.Map ignoreInputDeadline uint64 - stalenessInterval uint64 + stalenessSecs uint64 } type totalStateValue struct { @@ -31,16 +31,17 @@ type lastValueState struct { func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration) *totalAggrState { currentTime := fasttime.UnixTimestamp() - intervalSecs := uint64(interval.Seconds() + 1) + intervalSecs := roundDurationToSecs(interval) + stalenessSecs := roundDurationToSecs(stalenessInterval) return &totalAggrState{ ignoreInputDeadline: currentTime + intervalSecs, - stalenessInterval: uint64(stalenessInterval.Seconds()), + stalenessSecs: stalenessSecs, } } func (as *totalAggrState) pushSample(inputKey, outputKey string, value float64) { currentTime := fasttime.UnixTimestamp() - deleteDeadline := currentTime + as.stalenessInterval + deleteDeadline := currentTime + as.stalenessSecs again: v, ok := as.m.Load(outputKey)