Revert "lib/streamaggr: discard samples with timestamps outside of aggregation interval (#4199)"

This reverts commit 9e99f2f5b3.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4068

Reason for revert: this breaks valid use cases:

- If timestamps aren't specified in the incoming samples on purpose. For example, if stream aggregation is used
  as StatsD replacement. StatsD protocol has no timestamp concept for incoming samples.
  See https://github.com/b/statsd_spec

- If all the samples must be aggregated, even if they contain stale timestamps.
  for example, if the stream aggregation produces some counter of some events,
  it may be better to count all the events even if they were delayed before
  being ingested into VictoriaMetrics.

Is is also unclear how to determine whether the sample becomes stale.
For example, if the aggregation interval equals to 1h, and the previous
aggregation cycle just finished 10 minutes ago, what to do with the newly
incoming sample with the timestamp 30 minutes older than the current time?
The answer highly depends on the context, so it is unsafe to uncoditionally
use a single logic for dropping the old samples here.
This commit is contained in:
Aliaksandr Valialkin 2023-05-08 16:52:27 -07:00
parent 8f1372bd43
commit 7acc54025e
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 0 additions and 108 deletions

View file

@ -13,13 +13,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/metrics"
"gopkg.in/yaml.v2"
)
@ -212,9 +210,6 @@ type aggregator struct {
without []string
aggregateOnlyByTime bool
// interval is the interval for aggregating input samples
interval time.Duration
// dedupAggr is set to non-nil if input samples must be de-duplicated according
// to the dedupInterval passed to newAggregator().
dedupAggr *lastAggrState
@ -233,10 +228,6 @@ type aggregator struct {
wg sync.WaitGroup
stopCh chan struct{}
// tooOldSamplesDroppedTotal is the total number of dropped samples due to being too old.
// stored in the aggregator in order to avoid creating a metric if aggregation is not used.
tooOldSamplesDroppedTotal *metrics.Counter
}
type aggrState interface {
@ -371,7 +362,6 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
by: by,
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
interval: interval,
dedupAggr: dedupAggr,
aggrStates: aggrStates,
@ -380,8 +370,6 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
suffix: suffix,
stopCh: make(chan struct{}),
tooOldSamplesDroppedTotal: metrics.GetOrCreateCounter(`vmagent_streamaggr_samples_dropped_total{reason="too_old"}`),
}
if dedupAggr != nil {
@ -519,7 +507,6 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
labels := promutils.GetLabels()
tmpLabels := promutils.GetLabels()
bb := bbPool.Get()
minAllowedTimestamp := int64(float64(fasttime.UnixTimestamp())-a.interval.Seconds()) * 1000
for _, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
@ -548,11 +535,6 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
}
for _, sample := range ts.Samples {
if sample.Timestamp < minAllowedTimestamp {
// Skip too old samples.
trackDroppedSample(&ts, sample.Timestamp, minAllowedTimestamp, a.tooOldSamplesDroppedTotal)
continue
}
a.pushSample(inputKey, outputKey, sample.Value)
}
}
@ -561,21 +543,6 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
promutils.PutLabels(labels)
}
func trackDroppedSample(ts *prompbmarshal.TimeSeries, actualTs, minTs int64, m *metrics.Counter) {
select {
case <-droppedSamplesLogTicker.C:
// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
// because LabelsToString() will be called with each trackDroppedSample call.
lbs := promrelabel.LabelsToString(ts.Labels)
logger.Warnf("skipping a sample for metric %s at streaming aggregation: timestamp too old: %d; minimal accepted timestamp: %d", lbs, actualTs, minTs)
default:
}
m.Inc()
}
var droppedSamplesLogTicker = time.NewTicker(5 * time.Second)
var bbPool bytesutil.ByteBufferPool
func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {

View file

@ -8,7 +8,6 @@ import (
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
@ -677,72 +676,6 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
`)
}
func TestDiscardsSamplesWithOldTimestamps(t *testing.T) {
f := func(interval string, inputTs int64, mustMatch bool) {
t.Helper()
config := fmt.Sprintf(`
- interval: %s
outputs: ["avg"]
`, interval)
input := `
cpu_usage{cpu="1"} 1
`
expected := fmt.Sprintf(`cpu_usage:%s_avg{cpu="1"} 1
`, interval)
if !mustMatch {
expected = ""
}
// Initialize Aggregators
var tssOutput []prompbmarshal.TimeSeries
var tssOutputLock sync.Mutex
pushFunc := func(tss []prompbmarshal.TimeSeries) {
tssOutputLock.Lock()
for _, ts := range tss {
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
Labels: labelsCopy,
Samples: samplesCopy,
})
}
tssOutputLock.Unlock()
}
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetricsSetTS(input, inputTs)
a.Push(tssInput)
a.MustStop()
// Verify the tssOutput contains the expected metrics
tsStrings := make([]string, len(tssOutput))
for i, ts := range tssOutput {
tsStrings[i] = timeSeriesToString(ts)
}
sort.Strings(tsStrings)
outputMetrics := strings.Join(tsStrings, "")
if outputMetrics != expected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, expected)
}
}
currentTs := func() int64 {
return int64(fasttime.UnixTimestamp() * 1000)
}
f("1m", currentTs(), true)
f("1h", currentTs()-120*1000, true)
f("24h", currentTs()-60*60*1000, true)
f("1m", currentTs()-120*1000, false)
f("1h", currentTs()-2*60*60*1000*1000, false)
f("24h", currentTs()-25*60*60*1000, false)
}
func TestAggregatorsWithDedupInterval(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected string) {
t.Helper()
@ -829,10 +762,6 @@ func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
}
func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
return mustParsePromMetricsSetTS(s, int64(fasttime.UnixTimestamp()*1000))
}
func mustParsePromMetricsSetTS(s string, timestamp int64) []prompbmarshal.TimeSeries {
var rows prometheus.Rows
errLogger := func(s string) {
panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
@ -841,10 +770,6 @@ func mustParsePromMetricsSetTS(s string, timestamp int64) []prompbmarshal.TimeSe
var tss []prompbmarshal.TimeSeries
samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
for _, row := range rows.Rows {
if row.Timestamp == 0 && timestamp != 0 {
row.Timestamp = timestamp
}
labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",