mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/storage: do not pollute inverted index with data for samples outside the retention period
This commit is contained in:
parent
cd4833d3d0
commit
2bd1a01d1a
2 changed files with 22 additions and 11 deletions
|
@ -752,6 +752,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
||||||
}
|
}
|
||||||
rows = rows[:rowsLen+len(mrs)]
|
rows = rows[:rowsLen+len(mrs)]
|
||||||
j := 0
|
j := 0
|
||||||
|
minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
|
||||||
for i := range mrs {
|
for i := range mrs {
|
||||||
mr := &mrs[i]
|
mr := &mrs[i]
|
||||||
if math.IsNaN(mr.Value) {
|
if math.IsNaN(mr.Value) {
|
||||||
|
@ -759,6 +760,10 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
|
||||||
// doesn't know how to work with them.
|
// doesn't know how to work with them.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if mr.Timestamp < minTimestamp || mr.Timestamp > maxTimestamp {
|
||||||
|
// Skip rows with timestamps outside the retention.
|
||||||
|
continue
|
||||||
|
}
|
||||||
r := &rows[rowsLen+j]
|
r := &rows[rowsLen+j]
|
||||||
j++
|
j++
|
||||||
r.Timestamp = mr.Timestamp
|
r.Timestamp = mr.Timestamp
|
||||||
|
|
|
@ -310,22 +310,14 @@ func (tb *table) AddRows(rows []rawRow) error {
|
||||||
// The slowest path - there are rows that don't fit any existing partition.
|
// The slowest path - there are rows that don't fit any existing partition.
|
||||||
// Create new partitions for these rows.
|
// Create new partitions for these rows.
|
||||||
// Do this under tb.ptwsLock.
|
// Do this under tb.ptwsLock.
|
||||||
now := timestampFromTime(time.Now())
|
minTimestamp, maxTimestamp := tb.getMinMaxTimestamps()
|
||||||
minTimestamp := now - tb.retentionMilliseconds
|
|
||||||
maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
|
|
||||||
tb.ptwsLock.Lock()
|
tb.ptwsLock.Lock()
|
||||||
var errors []error
|
var errors []error
|
||||||
for i := range missingRows {
|
for i := range missingRows {
|
||||||
r := &missingRows[i]
|
r := &missingRows[i]
|
||||||
|
|
||||||
if r.Timestamp < minTimestamp {
|
if r.Timestamp < minTimestamp || r.Timestamp > maxTimestamp {
|
||||||
// Silently skip row with too small timestamp, since it should be deleted anyway.
|
// Silently skip row outside retention, since it should be deleted anyway.
|
||||||
continue
|
|
||||||
}
|
|
||||||
if r.Timestamp > maxTimestamp {
|
|
||||||
err := fmt.Errorf("cannot add row %+v with too big timestamp to table %q; the timestamp cannot be bigger than %d (+2 days from now)",
|
|
||||||
r, tb.path, maxTimestamp)
|
|
||||||
errors = append(errors, err)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -359,6 +351,20 @@ func (tb *table) AddRows(rows []rawRow) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tb *table) getMinMaxTimestamps() (int64, int64) {
|
||||||
|
now := timestampFromTime(time.Now())
|
||||||
|
minTimestamp := now - tb.retentionMilliseconds
|
||||||
|
maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
|
||||||
|
if minTimestamp < 0 {
|
||||||
|
// Negative timestamps aren't supported by the storage.
|
||||||
|
minTimestamp = 0
|
||||||
|
}
|
||||||
|
if maxTimestamp < 0 {
|
||||||
|
maxTimestamp = (1 << 63) - 1
|
||||||
|
}
|
||||||
|
return minTimestamp, maxTimestamp
|
||||||
|
}
|
||||||
|
|
||||||
func (tb *table) startRetentionWatcher() {
|
func (tb *table) startRetentionWatcher() {
|
||||||
tb.retentionWatcherWG.Add(1)
|
tb.retentionWatcherWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
|
Loading…
Reference in a new issue