From 37cf509c3a692848571901d079a760e49964c72f Mon Sep 17 00:00:00 2001 From: Artem Navoiev Date: Mon, 2 May 2022 11:00:15 +0300 Subject: [PATCH] lib/{storage,flagutil} - Add option for snapshot autoremoval (#2487) * lib/{storage,flagutil} - Add option for snapshot autoremoval - add prometheus-like duration as command flag - add option to delete stale snapshots - update duration.go flag to re-use own code * wip * lib/flagutil: re-use Duration.Set() call in NewDuration * wip Co-authored-by: Aliaksandr Valialkin --- app/vmstorage/main.go | 37 ++++++++++++++++++++++++++ docs/CHANGELOG.md | 1 + lib/storage/storage.go | 33 +++++++++++++++++++++++ lib/storage/storage_test.go | 52 +++++++++++++++++++++++++++++++++++++ 4 files changed, 123 insertions(+) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 4d1ce91b42..3c13dd0b06 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -26,6 +26,7 @@ var ( snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages") forceFlushAuthKey = flag.String("forceFlushAuthKey", "", "authKey, which must be passed in query string to /internal/force_flush pages") + snapshotsMaxAge = flag.Duration("snapshotsMaxAge", 0, "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted") precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss") @@ -102,6 +103,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) { logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *DataPath, retentionPeriod, err) } Storage = strg + initStaleSnapshotsRemover() var m storage.Metrics Storage.UpdateMetrics(&m) @@ -255,6 +257,7 @@ func Stop() { logger.Infof("gracefully closing the storage at %s", *DataPath) startTime := time.Now() WG.WaitAndBlock() + stopStaleSnapshotsRemover() Storage.MustClose() logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds()) @@ -374,6 +377,40 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } } +func initStaleSnapshotsRemover() { + staleSnapshotsRemoverCh = make(chan struct{}) + if *snapshotsMaxAge <= 0 { + return + } + staleSnapshotsRemoverWG.Add(1) + go func() { + defer staleSnapshotsRemoverWG.Done() + t := time.NewTicker(11 * time.Second) + defer t.Stop() + for { + select { + case <-staleSnapshotsRemoverCh: + return + case <-t.C: + } + if err := Storage.DeleteStaleSnapshots(*snapshotsMaxAge); err != nil { + // Use logger.Errorf instead of logger.Fatalf in the hope the error is temporary. + logger.Errorf("cannot delete stale snapshots: %s", err) + } + } + }() +} + +func stopStaleSnapshotsRemover() { + close(staleSnapshotsRemoverCh) + staleSnapshotsRemoverWG.Wait() +} + +var ( + staleSnapshotsRemoverCh chan struct{} + staleSnapshotsRemoverWG sync.WaitGroup +) + var activeForceMerges = metrics.NewCounter("vm_active_force_merges") func registerStorageMetrics() { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8ac8f2beb0..0dbd2b318c 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,6 +24,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add support for DNS-based discovery for notifiers in the same way as Prometheus does. See [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2460). * FEATURE: allow specifying TLS cipher suites for incoming https requests via `-tlsCipherSuites` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2404). * FEATURE: allow specifying TLS cipher suites for mTLS connections between cluster components via `-cluster.tlsCipherSuites` command-line flag. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection). +* FEATURE: vmstorage: add `-snapshotsMaxAge` command-line flag for automatic removal of snapshots older than the given age. * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): shown an empty graph on the selected time range when there is no data on it. Previously `No data to show` placeholder was shown instead of the graph in this case. This prevented from zooming and scrolling of such a graph. * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show the selected `last N minutes/hours/days` in the top right corner. Previously the `start - end` duration was shown instead, which could be hard to interpret. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2402). * FEATURE: expose `vm_indexdb_items_added_total` and `vm_indexdb_items_added_size_bytes_total` counters at `/metrics` page, which can be used for monitoring the rate for addition of new entries in `indexdb` (aka `inverted index`) alongside the total size in bytes for the added entries. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2471). diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 95d4f88a82..1f48d02788 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -418,6 +418,39 @@ func (s *Storage) DeleteSnapshot(snapshotName string) error { return nil } +// DeleteStaleSnapshots deletes snapshot older than given maxAge +func (s *Storage) DeleteStaleSnapshots(maxAge time.Duration) error { + list, err := s.ListSnapshots() + if err != nil { + return err + } + expireDeadline := time.Now().UTC().Add(-maxAge) + for _, snapshotName := range list { + t, err := snapshotTime(snapshotName) + if err != nil { + return fmt.Errorf("cannot parse snapshot date from %q: %w", snapshotName, err) + } + if t.Before(expireDeadline) { + if err := s.DeleteSnapshot(snapshotName); err != nil { + return fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err) + } + } + } + return nil +} + +func snapshotTime(snapshotName string) (time.Time, error) { + if !snapshotNameRegexp.MatchString(snapshotName) { + return time.Time{}, fmt.Errorf("unexpected snapshotName must be in the format `YYYYMMDDhhmmss-idx`; got %q", snapshotName) + } + n := strings.IndexByte(snapshotName, '-') + if n < 0 { + return time.Time{}, fmt.Errorf("cannot find `-` in snapshotName=%q", snapshotName) + } + s := snapshotName[:n] + return time.Parse("20060102150405", s) +} + var snapshotIdx = uint64(time.Now().UnixNano()) func nextSnapshotIdx() uint64 { diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 6ed004ae26..fa6ebb5c9f 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -1106,6 +1106,58 @@ func testStorageAddMetrics(s *Storage, workerNum int) error { return nil } +func TestStorageDeleteStaleSnapshots(t *testing.T) { + path := "TestStorageDeleteStaleSnapshots" + s, err := OpenStorage(path, 0, 1e5, 1e5) + if err != nil { + t.Fatalf("cannot open storage: %s", err) + } + const rowsPerAdd = 1e3 + const addsCount = 10 + for i := 0; i < addsCount; i++ { + mrs := testGenerateMetricRows(rowsPerAdd, 0, 1e10) + if err := s.AddRows(mrs, defaultPrecisionBits); err != nil { + t.Fatalf("unexpected error when adding mrs: %s", err) + } + } + // Try creating a snapshot from the storage. + snapshotName, err := s.CreateSnapshot() + if err != nil { + t.Fatalf("cannot create snapshot from the storage: %s", err) + } + // Delete snapshots older than 1 month + if err := s.DeleteStaleSnapshots(30 * 24 * time.Hour); err != nil { + t.Fatalf("error in DeleteStaleSnapshots(1 month): %s", err) + } + snapshots, err := s.ListSnapshots() + if err != nil { + t.Fatalf("cannot list snapshots: %s", err) + } + if len(snapshots) != 1 { + t.Fatalf("expecting one snapshot; got %q", snapshots) + } + if snapshots[0] != snapshotName { + t.Fatalf("snapshot %q is missing in %q", snapshotName, snapshots) + } + + // Delete the snapshot which is older than 1 nanoseconds + time.Sleep(2 * time.Nanosecond) + if err := s.DeleteStaleSnapshots(time.Nanosecond); err != nil { + t.Fatalf("cannot delete snapshot %q: %s", snapshotName, err) + } + snapshots, err = s.ListSnapshots() + if err != nil { + t.Fatalf("cannot list snapshots: %s", err) + } + if len(snapshots) != 0 { + t.Fatalf("expecting zero snapshots; got %q", snapshots) + } + s.MustClose() + if err := os.RemoveAll(path); err != nil { + t.Fatalf("cannot remove %q: %s", path, err) + } +} + func containsString(a []string, s string) bool { for i := range a { if a[i] == s {