From 4ca66344ee55454ab35cb1875ae20f2a2aebd3f3 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Thu, 11 Jul 2019 17:04:56 +0300
Subject: [PATCH] lib/storage: do not pollute inverted index with data for
 samples outside the retention period

---
 lib/storage/storage.go |  5 +++++
 lib/storage/table.go   | 28 +++++++++++++++++-----------
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 1b294c28c2..4048835029 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -756,6 +756,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
 	}
 	rows = rows[:rowsLen+len(mrs)]
 	j := 0
+	minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
 	for i := range mrs {
 		mr := &mrs[i]
 		if math.IsNaN(mr.Value) {
@@ -763,6 +764,10 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
 			// doesn't know how to work with them.
 			continue
 		}
+		if mr.Timestamp < minTimestamp || mr.Timestamp > maxTimestamp {
+			// Skip rows with timestamps outside the retention.
+			continue
+		}
 		r := &rows[rowsLen+j]
 		j++
 		r.Timestamp = mr.Timestamp
diff --git a/lib/storage/table.go b/lib/storage/table.go
index 1cde1a9043..4acc27b0c9 100644
--- a/lib/storage/table.go
+++ b/lib/storage/table.go
@@ -310,22 +310,14 @@ func (tb *table) AddRows(rows []rawRow) error {
 	// The slowest path - there are rows that don't fit any existing partition.
 	// Create new partitions for these rows.
 	// Do this under tb.ptwsLock.
-	now := timestampFromTime(time.Now())
-	minTimestamp := now - tb.retentionMilliseconds
-	maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
+	minTimestamp, maxTimestamp := tb.getMinMaxTimestamps()
 	tb.ptwsLock.Lock()
 	var errors []error
 	for i := range missingRows {
 		r := &missingRows[i]
 
-		if r.Timestamp < minTimestamp {
-			// Silently skip row with too small timestamp, since it should be deleted anyway.
-			continue
-		}
-		if r.Timestamp > maxTimestamp {
-			err := fmt.Errorf("cannot add row %+v with too big timestamp to table %q; the timestamp cannot be bigger than %d (+2 days from now)",
-				r, tb.path, maxTimestamp)
-			errors = append(errors, err)
+		if r.Timestamp < minTimestamp || r.Timestamp > maxTimestamp {
+			// Silently skip row outside retention, since it should be deleted anyway.
 			continue
 		}
 
@@ -359,6 +351,20 @@ func (tb *table) AddRows(rows []rawRow) error {
 	return nil
 }
 
+func (tb *table) getMinMaxTimestamps() (int64, int64) {
+	now := timestampFromTime(time.Now())
+	minTimestamp := now - tb.retentionMilliseconds
+	maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
+	if minTimestamp < 0 {
+		// Negative timestamps aren't supported by the storage.
+		minTimestamp = 0
+	}
+	if maxTimestamp < 0 {
+		maxTimestamp = (1 << 63) - 1
+	}
+	return minTimestamp, maxTimestamp
+}
+
 func (tb *table) startRetentionWatcher() {
 	tb.retentionWatcherWG.Add(1)
 	go func() {