lib/streamaggr: ignore out of order samples for last output

This is a follow-up for 6a465f6e29

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5931
This commit is contained in:
Aliaksandr Valialkin 2024-03-18 01:02:28 +02:00
parent f95e9f13ae
commit 4553521f9a
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 18 additions and 12 deletions

View file

@ -19,10 +19,10 @@ The aggregation is applied to all the metrics received via any [supported data i
and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter)
after applying all the configured [relabeling stages](https://docs.victoriametrics.com/vmagent.html#relabeling).
Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
It expects that the ingested samples have timestamps close to the current time.
By default stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples).
Stream aggregation is configured via the following command-line flags:
Stream aggregation can be configured via the following command-line flags:
- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
This flag can be specified individually per each `-remoteWrite.url`.

View file

@ -12,9 +12,10 @@ type lastAggrState struct {
}
type lastStateValue struct {
mu sync.Mutex
last float64
deleted bool
mu sync.Mutex
last float64
timestamp int64
deleted bool
}
func newLastAggrState() *lastAggrState {
@ -31,7 +32,8 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
if !ok {
// The entry is missing in the map. Try creating it.
v = &lastStateValue{
last: s.value,
last: s.value,
timestamp: s.timestamp,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
@ -45,7 +47,10 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.last = s.value
if s.timestamp >= sv.timestamp {
sv.last = s.value
sv.timestamp = s.timestamp
}
}
sv.mu.Unlock()
if deleted {

View file

@ -254,13 +254,14 @@ func TestAggregatorsSuccess(t *testing.T) {
outputs: [count_samples, sum_samples, count_series, last]
`, `
foo{abc="123"} 4
bar 5
bar 5 100
bar 34 10
foo{abc="123"} 8.5
foo{abc="456",de="fg"} 8
`, `bar:1m_count_samples 1
`, `bar:1m_count_samples 2
bar:1m_count_series 1
bar:1m_last 5
bar:1m_sum_samples 5
bar:1m_sum_samples 39
foo:1m_count_samples{abc="123"} 2
foo:1m_count_samples{abc="456",de="fg"} 1
foo:1m_count_series{abc="123"} 1
@ -269,7 +270,7 @@ foo:1m_last{abc="123"} 8.5
foo:1m_last{abc="456",de="fg"} 8
foo:1m_sum_samples{abc="123"} 12.5
foo:1m_sum_samples{abc="456",de="fg"} 8
`, "1111")
`, "11111")
// Special case: __name__ in `by` list - this is the same as empty `by` list
f(`