diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 0c2d080a5..e3860c183 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -375,7 +375,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, dedupInterva return nil, fmt.Errorf("cannot parse `staleness_interval: %q`: %w", cfg.StalenessInterval, err) } if stalenessInterval < interval { - return nil, fmt.Errorf("interval=%s cannot exceed staleness_interval=%s", cfg.Interval, cfg.StalenessInterval) + return nil, fmt.Errorf("staleness_interval=%s cannot be smaller than interval=%s", cfg.StalenessInterval, cfg.Interval) } } diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 0bac5bee0..5e10ade64 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -511,8 +511,8 @@ foo:1m_by_abc_sum_samples{abc="456"} 8 `, ` foo 123 bar{baz="qwe"} 4.34 -`, `bar:1m_total{baz="qwe"} 4.34 -foo:1m_total 123 +`, `bar:1m_total{baz="qwe"} 0 +foo:1m_total 0 `, "11") // total_prometheus output for non-repeated series @@ -539,10 +539,10 @@ foo{baz="qwe"} -5 bar{baz="qwer"} 343 bar{baz="qwer"} 344 foo{baz="qwe"} 10 -`, `bar:1m_total{baz="qwe"} 6.34 -bar:1m_total{baz="qwer"} 344 -foo:1m_total 123 -foo:1m_total{baz="qwe"} 10 +`, `bar:1m_total{baz="qwe"} 5.02 +bar:1m_total{baz="qwer"} 1 +foo:1m_total 0 +foo:1m_total{baz="qwe"} 15 `, "11111111") // total_prometheus output for repeated series @@ -578,8 +578,8 @@ foo{baz="qwe"} -5 bar{baz="qwer"} 343 bar{baz="qwer"} 344 foo{baz="qwe"} 10 -`, `bar:1m_total 350.34 -foo:1m_total 133 +`, `bar:1m_total 6.02 +foo:1m_total 15 `, "11111111") // total_prometheus output for repeated series with group by __name__ @@ -607,8 +607,8 @@ foo:1m_total 15 `, ` foo 123 bar{baz="qwe"} 4.34 -`, `bar:1m_increase{baz="qwe"} 4.34 -foo:1m_increase 123 +`, `bar:1m_increase{baz="qwe"} 0 +foo:1m_increase 0 `, "11") // increase_prometheus output for non-repeated series @@ -635,10 +635,10 
@@ foo{baz="qwe"} -5 bar{baz="qwer"} 343 bar{baz="qwer"} 344 foo{baz="qwe"} 10 -`, `bar:1m_increase{baz="qwe"} 6.34 -bar:1m_increase{baz="qwer"} 344 -foo:1m_increase 123 -foo:1m_increase{baz="qwe"} 10 +`, `bar:1m_increase{baz="qwe"} 5.02 +bar:1m_increase{baz="qwer"} 1 +foo:1m_increase 0 +foo:1m_increase{baz="qwe"} 15 `, "11111111") // increase_prometheus output for repeated series @@ -659,6 +659,7 @@ bar:1m_increase{baz="qwer"} 1 foo:1m_increase 0 foo:1m_increase{baz="qwe"} 15 `, "11111111") + // multiple aggregate configs f(` - interval: 1m diff --git a/lib/streamaggr/total.go b/lib/streamaggr/total.go index bfce7e66f..516694ea6 100644 --- a/lib/streamaggr/total.go +++ b/lib/streamaggr/total.go @@ -13,10 +13,24 @@ import ( type totalAggrState struct { m sync.Map - suffix string + suffix string + + // Whether to reset the output value on every flushState call. resetTotalOnFlush bool - keepFirstSample bool - stalenessSecs uint64 + + // Whether to take into account the first sample in new time series when calculating the output value. + keepFirstSample bool + + // Time series state is dropped if no new samples are received during stalenessSecs. + // + // Also, the first sample per each new series is ignored during stalenessSecs even if keepFirstSample is set. + // See ignoreFirstSampleDeadline for more details. + stalenessSecs uint64 + + // The first sample per each new series is ignored until this unix timestamp deadline in seconds even if keepFirstSample is set. + // This allows avoiding an initial spike of the output values at startup when new time series + // cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline. 
+ ignoreFirstSampleDeadline uint64 } type totalStateValue struct { @@ -34,20 +48,24 @@ type lastValueState struct { func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState { stalenessSecs := roundDurationToSecs(stalenessInterval) + ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs suffix := "total" if resetTotalOnFlush { suffix = "increase" } return &totalAggrState{ - suffix: suffix, - resetTotalOnFlush: resetTotalOnFlush, - keepFirstSample: keepFirstSample, - stalenessSecs: stalenessSecs, + suffix: suffix, + resetTotalOnFlush: resetTotalOnFlush, + keepFirstSample: keepFirstSample, + stalenessSecs: stalenessSecs, + ignoreFirstSampleDeadline: ignoreFirstSampleDeadline, } } func (as *totalAggrState) pushSamples(samples []pushSample) { - deleteDeadline := fasttime.UnixTimestamp() + as.stalenessSecs + currentTime := fasttime.UnixTimestamp() + deleteDeadline := currentTime + as.stalenessSecs + keepFirstSample := as.keepFirstSample && currentTime > as.ignoreFirstSampleDeadline for i := range samples { s := &samples[i] inputKey, outputKey := getInputOutputKey(s.key) @@ -76,7 +94,7 @@ func (as *totalAggrState) pushSamples(samples []pushSample) { inputKey = strings.Clone(inputKey) sv.lastValues[inputKey] = lv } - if ok || as.keepFirstSample { + if ok || keepFirstSample { if s.value >= lv.value { sv.total += s.value - lv.value } else {