Adds read-only mode for vmstorage node (#1680)

* adds read-only mode for vmstorage
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/269

* changes order a bit

* moves isFreeDiskLimitReached var to storage struct
renames functions to be consistent
change protoparser api - with optional storage limit check for given openned storage

* renames freeSpaceLimit to ReadOnly
This commit is contained in:
Nikolay 2021-10-08 12:52:56 +03:00 committed by Aliaksandr Valialkin
parent 2748255c8b
commit 4290b46e8c
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 67 additions and 0 deletions

View file

@ -45,6 +45,9 @@ var (
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries") "Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries")
maxDailySeries = flag.Int("storage.maxDailySeries", 0, "The maximum number of unique series can be added to the storage during the last 24 hours. "+ maxDailySeries = flag.Int("storage.maxDailySeries", 0, "The maximum number of unique series can be added to the storage during the last 24 hours. "+
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries") "Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries")
minFreeDiskSpaceSizeBytes = flagutil.NewBytes("storage.minFreeDiskSpaceSize", 0, "Defines minimum free disk space size for storageDataPath. "+
"If limit is reached, storage becomes read-only and tells vminsert to reroute data for other storage nodes.")
) )
// CheckTimeRange returns true if the given tr is denied for querying. // CheckTimeRange returns true if the given tr is denied for querying.
@ -81,6 +84,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetFinalMergeDelay(*finalMergeDelay) storage.SetFinalMergeDelay(*finalMergeDelay)
storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceSizeBytes.N)
logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod) logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod)
startTime := time.Now() startTime := time.Now()
@ -389,6 +393,17 @@ func registerStorageMetrics() {
return float64(fs.MustGetFreeSpace(*DataPath)) return float64(fs.MustGetFreeSpace(*DataPath))
}) })
metrics.NewGauge(fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *storageDataPath), func() float64 {
return float64(minFreeDiskSpaceSizeBytes.N)
})
metrics.NewGauge(`vm_storage_read_only`, func() float64 {
if strg.IsReadOnly() {
return 1
}
return 0
})
metrics.NewGauge(`vm_active_merges{type="storage/big"}`, func() float64 { metrics.NewGauge(`vm_active_merges{type="storage/big"}`, func() float64 {
return float64(tm().ActiveBigMerges) return float64(tm().ActiveBigMerges)
}) })

View file

@ -57,6 +57,8 @@ type Storage struct {
hourlySeriesLimitRowsDropped uint64 hourlySeriesLimitRowsDropped uint64
dailySeriesLimitRowsDropped uint64 dailySeriesLimitRowsDropped uint64
isReadOnly uint32
path string path string
cachePath string cachePath string
retentionMsecs int64 retentionMsecs int64
@ -118,6 +120,7 @@ type Storage struct {
currHourMetricIDsUpdaterWG sync.WaitGroup currHourMetricIDsUpdaterWG sync.WaitGroup
nextDayMetricIDsUpdaterWG sync.WaitGroup nextDayMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup retentionWatcherWG sync.WaitGroup
freeSpaceWatcherWG sync.WaitGroup
// The snapshotLock prevents from concurrent creation of snapshots, // The snapshotLock prevents from concurrent creation of snapshots,
// since this may result in snapshots without recently added data, // since this may result in snapshots without recently added data,
@ -258,6 +261,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
s.startCurrHourMetricIDsUpdater() s.startCurrHourMetricIDsUpdater()
s.startNextDayMetricIDsUpdater() s.startNextDayMetricIDsUpdater()
s.startRetentionWatcher() s.startRetentionWatcher()
s.startFreeDiskSpaceWatcher()
return s, nil return s, nil
} }
@ -558,6 +562,53 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
s.tb.UpdateMetrics(&m.TableMetrics) s.tb.UpdateMetrics(&m.TableMetrics)
} }
var (
storageFreeSpaceLimitBytes uint64
)
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
//
// The function must be called before opening or creating any storage.
func SetFreeDiskSpaceLimit(bytes int) {
storageFreeSpaceLimitBytes = uint64(bytes)
}
// IsReadOnly returns information is storage in read only mode
func (s *Storage) IsReadOnly() bool {
return atomic.LoadUint32(&s.isReadOnly) == 1
}
func (s *Storage) startFreeDiskSpaceWatcher() {
f := func() {
freeSpaceBytes := fs.MustGetFreeSpace(s.path)
// not enough free space
if freeSpaceBytes < storageFreeSpaceLimitBytes {
atomic.StoreUint32(&s.isReadOnly, 1)
return
}
atomic.StoreUint32(&s.isReadOnly, 0)
}
f()
s.freeSpaceWatcherWG.Add(1)
go func() {
defer s.freeSpaceWatcherWG.Done()
// zero value disables limit.
if storageFreeSpaceLimitBytes == 0 {
return
}
t := time.NewTicker(time.Minute)
defer t.Stop()
for {
select {
case <-s.stop:
return
case <-t.C:
f()
}
}
}()
}
func (s *Storage) startRetentionWatcher() { func (s *Storage) startRetentionWatcher() {
s.retentionWatcherWG.Add(1) s.retentionWatcherWG.Add(1)
go func() { go func() {
@ -672,6 +723,7 @@ func (s *Storage) resetAndSaveTSIDCache() {
func (s *Storage) MustClose() { func (s *Storage) MustClose() {
close(s.stop) close(s.stop)
s.freeSpaceWatcherWG.Wait()
s.retentionWatcherWG.Wait() s.retentionWatcherWG.Wait()
s.currHourMetricIDsUpdaterWG.Wait() s.currHourMetricIDsUpdaterWG.Wait()
s.nextDayMetricIDsUpdaterWG.Wait() s.nextDayMetricIDsUpdaterWG.Wait()