lib/streamaggr: ignore the first sample in new time series during staleness_interval seconds after the stream aggregation start for total and increase outputs

This commit is contained in:
Aliaksandr Valialkin 2024-03-04 01:49:26 +02:00
parent 5e9cbfd4db
commit d80deaeaf4
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 43 additions and 24 deletions

View file

@ -375,7 +375,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva
return nil, fmt.Errorf("cannot parse `staleness_interval: %q`: %w", cfg.StalenessInterval, err) return nil, fmt.Errorf("cannot parse `staleness_interval: %q`: %w", cfg.StalenessInterval, err)
} }
if stalenessInterval < interval { if stalenessInterval < interval {
return nil, fmt.Errorf("interval=%s cannot exceed staleness_interval=%s", cfg.Interval, cfg.StalenessInterval) return nil, fmt.Errorf("staleness_interval=%s cannot be smaller than interval=%s", cfg.StalenessInterval, cfg.Interval)
} }
} }

View file

@ -511,8 +511,8 @@ foo:1m_by_abc_sum_samples{abc="456"} 8
`, ` `, `
foo 123 foo 123
bar{baz="qwe"} 4.34 bar{baz="qwe"} 4.34
`, `bar:1m_total{baz="qwe"} 4.34 `, `bar:1m_total{baz="qwe"} 0
foo:1m_total 123 foo:1m_total 0
`, "11") `, "11")
// total_prometheus output for non-repeated series // total_prometheus output for non-repeated series
@ -539,10 +539,10 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343 bar{baz="qwer"} 343
bar{baz="qwer"} 344 bar{baz="qwer"} 344
foo{baz="qwe"} 10 foo{baz="qwe"} 10
`, `bar:1m_total{baz="qwe"} 6.34 `, `bar:1m_total{baz="qwe"} 5.02
bar:1m_total{baz="qwer"} 344 bar:1m_total{baz="qwer"} 1
foo:1m_total 123 foo:1m_total 0
foo:1m_total{baz="qwe"} 10 foo:1m_total{baz="qwe"} 15
`, "11111111") `, "11111111")
// total_prometheus output for repeated series // total_prometheus output for repeated series
@ -578,8 +578,8 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343 bar{baz="qwer"} 343
bar{baz="qwer"} 344 bar{baz="qwer"} 344
foo{baz="qwe"} 10 foo{baz="qwe"} 10
`, `bar:1m_total 350.34 `, `bar:1m_total 6.02
foo:1m_total 133 foo:1m_total 15
`, "11111111") `, "11111111")
// total_prometheus output for repeated series with group by __name__ // total_prometheus output for repeated series with group by __name__
@ -607,8 +607,8 @@ foo:1m_total 15
`, ` `, `
foo 123 foo 123
bar{baz="qwe"} 4.34 bar{baz="qwe"} 4.34
`, `bar:1m_increase{baz="qwe"} 4.34 `, `bar:1m_increase{baz="qwe"} 0
foo:1m_increase 123 foo:1m_increase 0
`, "11") `, "11")
// increase_prometheus output for non-repeated series // increase_prometheus output for non-repeated series
@ -635,10 +635,10 @@ foo{baz="qwe"} -5
bar{baz="qwer"} 343 bar{baz="qwer"} 343
bar{baz="qwer"} 344 bar{baz="qwer"} 344
foo{baz="qwe"} 10 foo{baz="qwe"} 10
`, `bar:1m_increase{baz="qwe"} 6.34 `, `bar:1m_increase{baz="qwe"} 5.02
bar:1m_increase{baz="qwer"} 344 bar:1m_increase{baz="qwer"} 1
foo:1m_increase 123 foo:1m_increase 0
foo:1m_increase{baz="qwe"} 10 foo:1m_increase{baz="qwe"} 15
`, "11111111") `, "11111111")
// increase_prometheus output for repeated series // increase_prometheus output for repeated series
@ -659,6 +659,7 @@ bar:1m_increase{baz="qwer"} 1
foo:1m_increase 0 foo:1m_increase 0
foo:1m_increase{baz="qwe"} 15 foo:1m_increase{baz="qwe"} 15
`, "11111111") `, "11111111")
// multiple aggregate configs // multiple aggregate configs
f(` f(`
- interval: 1m - interval: 1m

View file

@ -13,10 +13,24 @@ import (
type totalAggrState struct { type totalAggrState struct {
m sync.Map m sync.Map
suffix string suffix string
// Whether to reset the output value on every flushState call.
resetTotalOnFlush bool resetTotalOnFlush bool
keepFirstSample bool
stalenessSecs uint64 // Whether to take into account the first sample in new time series when calculating the output value.
keepFirstSample bool
// Time series state is dropped if no new samples are received during stalenessSecs.
//
// Aslo, the first sample per each new series is ignored during stalenessSecs even if keepFirstSample is set.
// see ignoreFirstSampleDeadline for more details.
stalenessSecs uint64
// The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set.
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
} }
type totalStateValue struct { type totalStateValue struct {
@ -34,20 +48,24 @@ type lastValueState struct {
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState { func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval) stalenessSecs := roundDurationToSecs(stalenessInterval)
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
suffix := "total" suffix := "total"
if resetTotalOnFlush { if resetTotalOnFlush {
suffix = "increase" suffix = "increase"
} }
return &totalAggrState{ return &totalAggrState{
suffix: suffix, suffix: suffix,
resetTotalOnFlush: resetTotalOnFlush, resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample, keepFirstSample: keepFirstSample,
stalenessSecs: stalenessSecs, stalenessSecs: stalenessSecs,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
} }
} }
func (as *totalAggrState) pushSamples(samples []pushSample) { func (as *totalAggrState) pushSamples(samples []pushSample) {
deleteDeadline := fasttime.UnixTimestamp() + as.stalenessSecs currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline
for i := range samples { for i := range samples {
s := &samples[i] s := &samples[i]
inputKey, outputKey := getInputOutputKey(s.key) inputKey, outputKey := getInputOutputKey(s.key)
@ -76,7 +94,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) {
inputKey = strings.Clone(inputKey) inputKey = strings.Clone(inputKey)
sv.lastValues[inputKey] = lv sv.lastValues[inputKey] = lv
} }
if ok || as.keepFirstSample { if ok || keepFirstSample {
if s.value >= lv.value { if s.value >= lv.value {
sv.total += s.value - lv.value sv.total += s.value - lv.value
} else { } else {