diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 03a0e02108..0a693f1ae2 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -13,11 +13,13 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
+	"github.com/VictoriaMetrics/metrics"
 	"gopkg.in/yaml.v2"
 )
 
@@ -210,6 +212,9 @@ type aggregator struct {
 	without             []string
 	aggregateOnlyByTime bool
 
+	// interval is the interval for aggregating input samples
+	interval time.Duration
+
 	// dedupAggr is set to non-nil if input samples must be de-duplicated according
 	// to the dedupInterval passed to newAggregator().
 	dedupAggr *lastAggrState
@@ -228,6 +233,10 @@ type aggregator struct {
 
 	wg     sync.WaitGroup
 	stopCh chan struct{}
+
+	// tooOldSamplesDroppedTotal is the total number of dropped samples due to being too old.
+	// stored in the aggregator in order to avoid creating a metric if aggregation is not used.
+	tooOldSamplesDroppedTotal *metrics.Counter
 }
 
 type aggrState interface {
@@ -362,6 +371,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		by:                  by,
 		without:             without,
 		aggregateOnlyByTime: aggregateOnlyByTime,
+		interval:            interval,
 
 		dedupAggr:  dedupAggr,
 		aggrStates: aggrStates,
@@ -370,6 +380,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
 		suffix: suffix,
 
 		stopCh: make(chan struct{}),
+
+		tooOldSamplesDroppedTotal: metrics.GetOrCreateCounter(`vmagent_streamaggr_samples_dropped_total{reason="too_old"}`),
 	}
 
 	if dedupAggr != nil {
@@ -507,6 +519,7 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 	labels := promutils.GetLabels()
 	tmpLabels := promutils.GetLabels()
 	bb := bbPool.Get()
+	minAllowedTimestamp := int64(fasttime.UnixTimestamp())*1000 - a.interval.Milliseconds()
 	for _, ts := range tss {
 		if !a.match.Match(ts.Labels) {
 			continue
@@ -535,6 +548,11 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 		}
 
 		for _, sample := range ts.Samples {
+			if sample.Timestamp < minAllowedTimestamp {
+				// Skip too old samples.
+				trackDroppedSample(&ts, sample.Timestamp, minAllowedTimestamp, a.tooOldSamplesDroppedTotal)
+				continue
+			}
 			a.pushSample(inputKey, outputKey, sample.Value)
 		}
 	}
@@ -543,6 +561,21 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
 	promutils.PutLabels(labels)
 }
 
+func trackDroppedSample(ts *prompbmarshal.TimeSeries, actualTs, minTs int64, m *metrics.Counter) {
+	select {
+	case <-droppedSamplesLogTicker.C:
+		// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
+		// because LabelsToString() will be called with each trackDroppedSample call.
+		lbs := promrelabel.LabelsToString(ts.Labels)
+		logger.Warnf("skipping a sample for metric %s at streaming aggregation: timestamp too old: %d; minimal accepted timestamp: %d", lbs, actualTs, minTs)
+	default:
+	}
+
+	m.Inc()
+}
+
+var droppedSamplesLogTicker = time.NewTicker(5 * time.Second)
+
 var bbPool bytesutil.ByteBufferPool
 
 func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index 57c74aa8e4..95dce6ff45 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
@@ -676,6 +677,72 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
 `)
 }
 
+func TestDiscardsSamplesWithOldTimestamps(t *testing.T) {
+	f := func(interval string, inputTs int64, mustMatch bool) {
+		t.Helper()
+
+		config := fmt.Sprintf(`
+- interval: %s
+  outputs: ["avg"]
+`, interval)
+		input := `
+cpu_usage{cpu="1"} 1
+`
+		expected := fmt.Sprintf(`cpu_usage:%s_avg{cpu="1"} 1
+`, interval)
+		if !mustMatch {
+			expected = ""
+		}
+
+		// Initialize Aggregators
+		var tssOutput []prompbmarshal.TimeSeries
+		var tssOutputLock sync.Mutex
+		pushFunc := func(tss []prompbmarshal.TimeSeries) {
+			tssOutputLock.Lock()
+			for _, ts := range tss {
+				labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
+				samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
+				tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
+					Labels:  labelsCopy,
+					Samples: samplesCopy,
+				})
+			}
+			tssOutputLock.Unlock()
+		}
+		a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
+		if err != nil {
+			t.Fatalf("cannot initialize aggregators: %s", err)
+		}
+
+		// Push the inputMetrics to Aggregators
+		tssInput := mustParsePromMetricsSetTS(input, inputTs)
+		a.Push(tssInput)
+		a.MustStop()
+
+		// Verify the tssOutput contains the expected metrics
+		tsStrings := make([]string, len(tssOutput))
+		for i, ts := range tssOutput {
+			tsStrings[i] = timeSeriesToString(ts)
+		}
+		sort.Strings(tsStrings)
+		outputMetrics := strings.Join(tsStrings, "")
+		if outputMetrics != expected {
+			t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, expected)
+		}
+	}
+	currentTs := func() int64 {
+		return int64(fasttime.UnixTimestamp() * 1000)
+	}
+
+	f("1m", currentTs(), true)
+	f("1h", currentTs()-120*1000, true)
+	f("24h", currentTs()-60*60*1000, true)
+
+	f("1m", currentTs()-120*1000, false)
+	f("1h", currentTs()-2*60*60*1000, false)
+	f("24h", currentTs()-25*60*60*1000, false)
+}
+
 func TestAggregatorsWithDedupInterval(t *testing.T) {
 	f := func(config, inputMetrics, outputMetricsExpected string) {
 		t.Helper()
@@ -762,6 +829,10 @@ func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
 }
 
 func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
+	return mustParsePromMetricsSetTS(s, int64(fasttime.UnixTimestamp()*1000))
+}
+
+func mustParsePromMetricsSetTS(s string, timestamp int64) []prompbmarshal.TimeSeries {
 	var rows prometheus.Rows
 	errLogger := func(s string) {
 		panic(fmt.Errorf("unexpected error when parsing Prometheus metrics: %s", s))
@@ -770,6 +841,10 @@ func mustParsePromMetrics(s string) []prompbmarshal.TimeSeries {
 	var tss []prompbmarshal.TimeSeries
 	samples := make([]prompbmarshal.Sample, 0, len(rows.Rows))
 	for _, row := range rows.Rows {
+		if row.Timestamp == 0 && timestamp != 0 {
+			row.Timestamp = timestamp
+		}
+
 		labels := make([]prompbmarshal.Label, 0, len(row.Tags)+1)
 		labels = append(labels, prompbmarshal.Label{
 			Name:  "__name__",