lib/storage: add metrics for calculating skipped rows outside the retention

The metrics are:

    - vm_too_big_timestamp_rows_total
    - vm_too_small_timestamp_rows_total
This commit is contained in:
Aliaksandr Valialkin 2019-07-26 14:10:25 +03:00
parent d1132bb188
commit f586e1f83c
2 changed files with 24 additions and 2 deletions

View file

@ -358,6 +358,13 @@ func registerStorageMetrics() {
return float64(idbm().SizeBytes) return float64(idbm().SizeBytes)
}) })
metrics.NewGauge(`vm_too_big_timestamp_rows_total`, func() float64 {
return float64(m().TooBigTimestampRows)
})
metrics.NewGauge(`vm_too_small_timestamp_rows_total`, func() float64 {
return float64(m().TooSmallTimestampRows)
})
metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 { metrics.NewGauge(`vm_rows{type="storage/big"}`, func() float64 {
return float64(tm().BigRowsCount) return float64(tm().BigRowsCount)
}) })

View file

@ -65,6 +65,9 @@ type Storage struct {
currHourMetricIDsUpdaterWG sync.WaitGroup currHourMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup retentionWatcherWG sync.WaitGroup
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
} }
// OpenStorage opens storage on the given path with the given number of retention months. // OpenStorage opens storage on the given path with the given number of retention months.
@ -271,6 +274,9 @@ func (s *Storage) idb() *indexDB {
// Metrics contains essential metrics for the Storage. // Metrics contains essential metrics for the Storage.
type Metrics struct { type Metrics struct {
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
TSIDCacheSize uint64 TSIDCacheSize uint64
TSIDCacheSizeBytes uint64 TSIDCacheSizeBytes uint64
TSIDCacheRequests uint64 TSIDCacheRequests uint64
@ -308,6 +314,9 @@ func (m *Metrics) Reset() {
// UpdateMetrics updates m with metrics from s. // UpdateMetrics updates m with metrics from s.
func (s *Storage) UpdateMetrics(m *Metrics) { func (s *Storage) UpdateMetrics(m *Metrics) {
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
var cs fastcache.Stats var cs fastcache.Stats
s.tsidCache.UpdateStats(&cs) s.tsidCache.UpdateStats(&cs)
m.TSIDCacheSize += cs.EntriesCount m.TSIDCacheSize += cs.EntriesCount
@ -760,8 +769,14 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
// doesn't know how to work with them. // doesn't know how to work with them.
continue continue
} }
if mr.Timestamp < minTimestamp || mr.Timestamp > maxTimestamp { if mr.Timestamp < minTimestamp {
// Skip rows with timestamps outside the retention. // Skip rows with too small timestamps outside the retention.
atomic.AddUint64(&s.tooSmallTimestampRows, 1)
continue
}
if mr.Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time.
atomic.AddUint64(&s.tooBigTimestampRows, 1)
continue continue
} }
r := &rows[rowsLen+j] r := &rows[rowsLen+j]