lib/{storage,flagutil} - Add option for snapshot autoremoval (#2487)

* lib/{storage,flagutil} - Add option for snapshot autoremoval

- add prometheus-like duration as command flag
- add option to delete stale snapshots
- update duration.go flag to re-use own code

* wip

* lib/flagutil: re-use Duration.Set() call in NewDuration

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Artem Navoiev 2022-05-02 11:00:15 +03:00 committed by GitHub
parent 20bc2a2c44
commit 37cf509c3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 123 additions and 0 deletions

View file

@ -26,6 +26,7 @@ var (
snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages")
forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages")
forceFlushAuthKey = flag.String("forceFlushAuthKey", "", "authKey, which must be passed in query string to /internal/force_flush pages")
snapshotsMaxAge = flag.Duration("snapshotsMaxAge", 0, "Automatically delete snapshots older than -snapshotsMaxAge if it is set to non-zero duration. Make sure that backup process has enough time to finish the backup before the corresponding snapshot is automatically deleted")
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
@ -102,6 +103,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *DataPath, retentionPeriod, err)
}
Storage = strg
initStaleSnapshotsRemover()
var m storage.Metrics
Storage.UpdateMetrics(&m)
@ -255,6 +257,7 @@ func Stop() {
logger.Infof("gracefully closing the storage at %s", *DataPath)
startTime := time.Now()
WG.WaitAndBlock()
stopStaleSnapshotsRemover()
Storage.MustClose()
logger.Infof("successfully closed the storage in %.3f seconds", time.Since(startTime).Seconds())
@ -374,6 +377,40 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
}
}
func initStaleSnapshotsRemover() {
staleSnapshotsRemoverCh = make(chan struct{})
if *snapshotsMaxAge <= 0 {
return
}
staleSnapshotsRemoverWG.Add(1)
go func() {
defer staleSnapshotsRemoverWG.Done()
t := time.NewTicker(11 * time.Second)
defer t.Stop()
for {
select {
case <-staleSnapshotsRemoverCh:
return
case <-t.C:
}
if err := Storage.DeleteStaleSnapshots(*snapshotsMaxAge); err != nil {
// Use logger.Errorf instead of logger.Fatalf in the hope the error is temporary.
logger.Errorf("cannot delete stale snapshots: %s", err)
}
}
}()
}
func stopStaleSnapshotsRemover() {
close(staleSnapshotsRemoverCh)
staleSnapshotsRemoverWG.Wait()
}
var (
staleSnapshotsRemoverCh chan struct{}
staleSnapshotsRemoverWG sync.WaitGroup
)
var activeForceMerges = metrics.NewCounter("vm_active_force_merges")
func registerStorageMetrics() {

View file

@ -24,6 +24,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add support for DNS-based discovery for notifiers in the same way as Prometheus does. See [these docs](https://docs.victoriametrics.com/vmalert.html#notifier-configuration-file) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2460).
* FEATURE: allow specifying TLS cipher suites for incoming https requests via `-tlsCipherSuites` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2404).
* FEATURE: allow specifying TLS cipher suites for mTLS connections between cluster components via `-cluster.tlsCipherSuites` command-line flag. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#mtls-protection).
* FEATURE: vmstorage: add `-snapshotsMaxAge` command-line flag for automatic removal of snapshots older than the given age.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): shown an empty graph on the selected time range when there is no data on it. Previously `No data to show` placeholder was shown instead of the graph in this case. This prevented from zooming and scrolling of such a graph.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show the selected `last N minutes/hours/days` in the top right corner. Previously the `start - end` duration was shown instead, which could be hard to interpret. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2402).
* FEATURE: expose `vm_indexdb_items_added_total` and `vm_indexdb_items_added_size_bytes_total` counters at `/metrics` page, which can be used for monitoring the rate for addition of new entries in `indexdb` (aka `inverted index`) alongside the total size in bytes for the added entries. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2471).

View file

@ -418,6 +418,39 @@ func (s *Storage) DeleteSnapshot(snapshotName string) error {
return nil
}
// DeleteStaleSnapshots deletes snapshot older than given maxAge
func (s *Storage) DeleteStaleSnapshots(maxAge time.Duration) error {
list, err := s.ListSnapshots()
if err != nil {
return err
}
expireDeadline := time.Now().UTC().Add(-maxAge)
for _, snapshotName := range list {
t, err := snapshotTime(snapshotName)
if err != nil {
return fmt.Errorf("cannot parse snapshot date from %q: %w", snapshotName, err)
}
if t.Before(expireDeadline) {
if err := s.DeleteSnapshot(snapshotName); err != nil {
return fmt.Errorf("cannot delete snapshot %q: %w", snapshotName, err)
}
}
}
return nil
}
func snapshotTime(snapshotName string) (time.Time, error) {
if !snapshotNameRegexp.MatchString(snapshotName) {
return time.Time{}, fmt.Errorf("unexpected snapshotName must be in the format `YYYYMMDDhhmmss-idx`; got %q", snapshotName)
}
n := strings.IndexByte(snapshotName, '-')
if n < 0 {
return time.Time{}, fmt.Errorf("cannot find `-` in snapshotName=%q", snapshotName)
}
s := snapshotName[:n]
return time.Parse("20060102150405", s)
}
var snapshotIdx = uint64(time.Now().UnixNano())
func nextSnapshotIdx() uint64 {

View file

@ -1106,6 +1106,58 @@ func testStorageAddMetrics(s *Storage, workerNum int) error {
return nil
}
func TestStorageDeleteStaleSnapshots(t *testing.T) {
path := "TestStorageDeleteStaleSnapshots"
s, err := OpenStorage(path, 0, 1e5, 1e5)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
const rowsPerAdd = 1e3
const addsCount = 10
for i := 0; i < addsCount; i++ {
mrs := testGenerateMetricRows(rowsPerAdd, 0, 1e10)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatalf("unexpected error when adding mrs: %s", err)
}
}
// Try creating a snapshot from the storage.
snapshotName, err := s.CreateSnapshot()
if err != nil {
t.Fatalf("cannot create snapshot from the storage: %s", err)
}
// Delete snapshots older than 1 month
if err := s.DeleteStaleSnapshots(30 * 24 * time.Hour); err != nil {
t.Fatalf("error in DeleteStaleSnapshots(1 month): %s", err)
}
snapshots, err := s.ListSnapshots()
if err != nil {
t.Fatalf("cannot list snapshots: %s", err)
}
if len(snapshots) != 1 {
t.Fatalf("expecting one snapshot; got %q", snapshots)
}
if snapshots[0] != snapshotName {
t.Fatalf("snapshot %q is missing in %q", snapshotName, snapshots)
}
// Delete the snapshot which is older than 1 nanoseconds
time.Sleep(2 * time.Nanosecond)
if err := s.DeleteStaleSnapshots(time.Nanosecond); err != nil {
t.Fatalf("cannot delete snapshot %q: %s", snapshotName, err)
}
snapshots, err = s.ListSnapshots()
if err != nil {
t.Fatalf("cannot list snapshots: %s", err)
}
if len(snapshots) != 0 {
t.Fatalf("expecting zero snapshots; got %q", snapshots)
}
s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func containsString(a []string, s string) bool {
for i := range a {
if a[i] == s {