diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 703828cd0..f6b45d764 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -358,6 +358,13 @@ func registerStorageMetrics() { return float64(idbm().SizeBytes) }) + metrics.NewGauge(`vm_too_big_timestamp_rows_total`, func() float64 { + return float64(m().TooBigTimestampRows) + }) + metrics.NewGauge(`vm_too_small_timestamp_rows_total`, func() float64 { + return float64(m().TooSmallTimestampRows) + }) + metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 { return float64(tm().BigRowsCount) }) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index f0063d3a0..444503d1f 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -65,6 +65,9 @@ type Storage struct { currHourMetricIDsUpdaterWG sync.WaitGroup retentionWatcherWG sync.WaitGroup + + tooSmallTimestampRows uint64 + tooBigTimestampRows uint64 } // OpenStorage opens storage on the given path with the given number of retention months. @@ -271,6 +274,9 @@ func (s *Storage) idb() *indexDB { // Metrics contains essential metrics for the Storage. type Metrics struct { + TooSmallTimestampRows uint64 + TooBigTimestampRows uint64 + TSIDCacheSize uint64 TSIDCacheSizeBytes uint64 TSIDCacheRequests uint64 @@ -308,6 +314,9 @@ func (m *Metrics) Reset() { // UpdateMetrics updates m with metrics from s. func (s *Storage) UpdateMetrics(m *Metrics) { + m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows) + m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows) + var cs fastcache.Stats s.tsidCache.UpdateStats(&cs) m.TSIDCacheSize += cs.EntriesCount @@ -760,8 +769,14 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // doesn't know how to work with them. continue } - if mr.Timestamp < minTimestamp || mr.Timestamp > maxTimestamp { - // Skip rows with timestamps outside the retention. + if mr.Timestamp < minTimestamp { + // Skip rows with too small timestamps outside the retention. + atomic.AddUint64(&s.tooSmallTimestampRows, 1) + continue + } + if mr.Timestamp > maxTimestamp { + // Skip rows with too big timestamps significantly exceeding the current time. + atomic.AddUint64(&s.tooBigTimestampRows, 1) continue } r := &rows[rowsLen+j]