diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 2463a6a13..f0063d3a0 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -752,6 +752,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra } rows = rows[:rowsLen+len(mrs)] j := 0 + minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps() for i := range mrs { mr := &mrs[i] if math.IsNaN(mr.Value) { @@ -759,6 +760,10 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // doesn't know how to work with them. continue } + if mr.Timestamp < minTimestamp || mr.Timestamp > maxTimestamp { + // Skip rows with timestamps outside the retention. + continue + } r := &rows[rowsLen+j] j++ r.Timestamp = mr.Timestamp diff --git a/lib/storage/table.go b/lib/storage/table.go index 1cde1a904..4acc27b0c 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -310,22 +310,14 @@ func (tb *table) AddRows(rows []rawRow) error { // The slowest path - there are rows that don't fit any existing partition. // Create new partitions for these rows. // Do this under tb.ptwsLock. - now := timestampFromTime(time.Now()) - minTimestamp := now - tb.retentionMilliseconds - maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :) + minTimestamp, maxTimestamp := tb.getMinMaxTimestamps() tb.ptwsLock.Lock() var errors []error for i := range missingRows { r := &missingRows[i] - if r.Timestamp < minTimestamp { - // Silently skip row with too small timestamp, since it should be deleted anyway. - continue - } - if r.Timestamp > maxTimestamp { - err := fmt.Errorf("cannot add row %+v with too big timestamp to table %q; the timestamp cannot be bigger than %d (+2 days from now)", - r, tb.path, maxTimestamp) - errors = append(errors, err) + if r.Timestamp < minTimestamp || r.Timestamp > maxTimestamp { + // Silently skip row outside retention, since it should be deleted anyway. continue } @@ -359,6 +351,20 @@ func (tb *table) AddRows(rows []rawRow) error { return nil } +func (tb *table) getMinMaxTimestamps() (int64, int64) { + now := timestampFromTime(time.Now()) + minTimestamp := now - tb.retentionMilliseconds + maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :) + if minTimestamp < 0 { + // Negative timestamps aren't supported by the storage. + minTimestamp = 0 + } + if maxTimestamp < 0 { + maxTimestamp = (1 << 63) - 1 + } + return minTimestamp, maxTimestamp +} + func (tb *table) startRetentionWatcher() { tb.retentionWatcherWG.Add(1) go func() {