From 4290b46e8c10e7041c328d4770a2e4e77090cd91 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 8 Oct 2021 12:52:56 +0300 Subject: [PATCH] Adds read-only mode for vmstorage node (#1680) * adds read-only mode for vmstorage https://github.com/VictoriaMetrics/VictoriaMetrics/issues/269 * changes order a bit * moves isFreeDiskLimitReached var to storage struct renames functions to be consistent change protoparser api - with optional storage limit check for given openned storage * renames freeSpaceLimit to ReadOnly --- app/vmstorage/main.go | 15 ++++++++++++ lib/storage/storage.go | 52 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index b80a2d2ea..176370d65 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -45,6 +45,9 @@ var ( "Excess series are logged and dropped. This can be useful for limiting series cardinality. See also -storage.maxDailySeries") maxDailySeries = flag.Int("storage.maxDailySeries", 0, "The maximum number of unique series can be added to the storage during the last 24 hours. "+ "Excess series are logged and dropped. This can be useful for limiting series churn rate. See also -storage.maxHourlySeries") + + minFreeDiskSpaceSizeBytes = flagutil.NewBytes("storage.minFreeDiskSpaceSize", 0, "Defines minimum free disk space size for storageDataPath. "+ + "If limit is reached, storage becomes read-only and tells vminsert to reroute data for other storage nodes.") ) // CheckTimeRange returns true if the given tr is denied for querying. @@ -81,6 +84,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) { storage.SetFinalMergeDelay(*finalMergeDelay) storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) + storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceSizeBytes.N) logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod) startTime := time.Now() @@ -389,6 +393,17 @@ func registerStorageMetrics() { return float64(fs.MustGetFreeSpace(*DataPath)) }) + metrics.NewGauge(fmt.Sprintf(`vm_free_disk_space_limit_bytes{path=%q}`, *storageDataPath), func() float64 { + return float64(minFreeDiskSpaceSizeBytes.N) + }) + + metrics.NewGauge(`vm_storage_read_only`, func() float64 { + if strg.IsReadOnly() { + return 1 + } + return 0 + }) + metrics.NewGauge(`vm_active_merges{type="storage/big"}`, func() float64 { return float64(tm().ActiveBigMerges) }) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index dd4d70ed2..f8654b419 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -57,6 +57,8 @@ type Storage struct { hourlySeriesLimitRowsDropped uint64 dailySeriesLimitRowsDropped uint64 + isReadOnly uint32 + path string cachePath string retentionMsecs int64 @@ -118,6 +120,7 @@ type Storage struct { currHourMetricIDsUpdaterWG sync.WaitGroup nextDayMetricIDsUpdaterWG sync.WaitGroup retentionWatcherWG sync.WaitGroup + freeSpaceWatcherWG sync.WaitGroup // The snapshotLock prevents from concurrent creation of snapshots, // since this may result in snapshots without recently added data, @@ -258,6 +261,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer s.startCurrHourMetricIDsUpdater() s.startNextDayMetricIDsUpdater() s.startRetentionWatcher() + s.startFreeDiskSpaceWatcher() return s, nil } @@ -558,6 +562,53 @@ func (s *Storage) UpdateMetrics(m *Metrics) { s.tb.UpdateMetrics(&m.TableMetrics) } +var ( + storageFreeSpaceLimitBytes uint64 +) + +// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path +// +// The function must be called before opening or creating any storage. +func SetFreeDiskSpaceLimit(bytes int) { + storageFreeSpaceLimitBytes = uint64(bytes) +} + +// IsReadOnly returns information is storage in read only mode +func (s *Storage) IsReadOnly() bool { + return atomic.LoadUint32(&s.isReadOnly) == 1 +} + +func (s *Storage) startFreeDiskSpaceWatcher() { + f := func() { + freeSpaceBytes := fs.MustGetFreeSpace(s.path) + // not enough free space + if freeSpaceBytes < storageFreeSpaceLimitBytes { + atomic.StoreUint32(&s.isReadOnly, 1) + return + } + atomic.StoreUint32(&s.isReadOnly, 0) + } + f() + s.freeSpaceWatcherWG.Add(1) + go func() { + defer s.freeSpaceWatcherWG.Done() + // zero value disables limit. + if storageFreeSpaceLimitBytes == 0 { + return + } + t := time.NewTicker(time.Minute) + defer t.Stop() + for { + select { + case <-s.stop: + return + case <-t.C: + f() + } + } + }() +} + func (s *Storage) startRetentionWatcher() { s.retentionWatcherWG.Add(1) go func() { @@ -672,6 +723,7 @@ func (s *Storage) resetAndSaveTSIDCache() { func (s *Storage) MustClose() { close(s.stop) + s.freeSpaceWatcherWG.Wait() s.retentionWatcherWG.Wait() s.currHourMetricIDsUpdaterWG.Wait() s.nextDayMetricIDsUpdaterWG.Wait()