From f358fb72d189112f4ab75f40b6231369752b853d Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Thu, 9 Jan 2020 15:24:26 +0200
Subject: [PATCH] app/{vmbackup,vmrestore}: add `backup complete` file to
 backup when it is complete and check for this file before restoring from
 backup

This should prevent restoring from incomplete backups.

Add `-skipBackupCompleteCheck` command-line flag to `vmrestore` in order to be able to restore from old backups without `backup complete` file.
---
 app/vmrestore/main.go           | 12 +++---
 lib/backup/actions/backup.go    | 21 +++++++++-
 lib/backup/actions/restore.go   | 18 +++++++++
 lib/backup/common/fs.go         |  9 +++++
 lib/backup/fscommon/fscommon.go |  8 ++++
 lib/backup/fsremote/fsremote.go | 43 ++++++++++++++++++++
 lib/backup/gcsremote/gcs.go     | 57 +++++++++++++++++++++++++++
 lib/backup/s3remote/s3.go       | 69 +++++++++++++++++++++++++++++++++
 8 files changed, 230 insertions(+), 7 deletions(-)

diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go
index 457b23c568..865a4f0a45 100644
--- a/app/vmrestore/main.go
+++ b/app/vmrestore/main.go
@@ -16,8 +16,9 @@ var (
 		"Example: gcs://bucket/path/to/backup/dir, s3://bucket/path/to/backup/dir or fs:///path/to/local/backup/dir")
 	storageDataPath = flag.String("storageDataPath", "victoria-metrics-data", "Destination path where backup must be restored. "+
 		"VictoriaMetrics must be stopped when restoring from backup. -storageDataPath dir can be non-empty. In this case only missing data is downloaded from backup")
-	concurrency       = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
-	maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0")
+	concurrency             = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
+	maxBytesPerSecond       = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0")
+	skipBackupCompleteCheck = flag.Bool("skipBackupCompleteCheck", false, "Whether to skip checking for `backup complete` file in `-src`. This may be useful for restoring from old backups, which were created without `backup complete` file")
 )
 
 func main() {
@@ -34,9 +35,10 @@ func main() {
 		logger.Fatalf("%s", err)
 	}
 	a := &actions.Restore{
-		Concurrency: *concurrency,
-		Src:         srcFS,
-		Dst:         dstFS,
+		Concurrency:             *concurrency,
+		Src:                     srcFS,
+		Dst:                     dstFS,
+		SkipBackupCompleteCheck: *skipBackupCompleteCheck,
 	}
 	if err := a.Run(); err != nil {
 		logger.Fatalf("cannot restore from backup: %s", err)
diff --git a/lib/backup/actions/backup.go b/lib/backup/actions/backup.go
index fa13861f63..bbc8df4f69 100644
--- a/lib/backup/actions/backup.go
+++ b/lib/backup/actions/backup.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -41,17 +42,33 @@ type Backup struct {
 
 // Run runs b with the provided settings.
 func (b *Backup) Run() error {
-	startTime := time.Now()
-
 	concurrency := b.Concurrency
 	src := b.Src
 	dst := b.Dst
 	origin := b.Origin
 
+	if origin != nil && origin.String() == dst.String() {
+		origin = nil
+	}
 	if origin == nil {
 		origin = &fsnil.FS{}
 	}
 
+	if err := dst.DeleteFile(fscommon.BackupCompleteFilename); err != nil {
+		return fmt.Errorf("cannot delete `backup complete` file at %s: %s", dst, err)
+	}
+	if err := runBackup(src, dst, origin, concurrency); err != nil {
+		return err
+	}
+	if err := dst.CreateFile(fscommon.BackupCompleteFilename, []byte("ok")); err != nil {
+		return fmt.Errorf("cannot create `backup complete` file at %s: %s", dst, err)
+	}
+	return nil
+}
+
+func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, concurrency int) error {
+	startTime := time.Now()
+
 	logger.Infof("starting backup from %s to %s using origin %s", src, dst, origin)
 
 	logger.Infof("obtaining list of parts at %s", src)
diff --git a/lib/backup/actions/restore.go b/lib/backup/actions/restore.go
index e310993b64..5994dea813 100644
--- a/lib/backup/actions/restore.go
+++ b/lib/backup/actions/restore.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -29,6 +30,11 @@ type Restore struct {
 	// If dst points to existing directory, then incremental restore is performed,
 	// i.e. only new data is downloaded from src.
 	Dst *fslocal.FS
+
+	// SkipBackupCompleteCheck may be set in order to skip the check for `backup complete` file in Src.
+	//
+	// This may be needed for restoring from old backups with missing `backup complete` file.
+	SkipBackupCompleteCheck bool
 }
 
 // Run runs r with the provided settings.
@@ -48,6 +54,18 @@ func (r *Restore) Run() error {
 	concurrency := r.Concurrency
 	src := r.Src
 	dst := r.Dst
+
+	if !r.SkipBackupCompleteCheck {
+		ok, err := src.HasFile(fscommon.BackupCompleteFilename)
+		if err != nil {
+			return err
+		}
+		if !ok {
+			return fmt.Errorf("cannot find %s file in %s; this means either incomplete backup or old backup; "+
+				"pass `-skipBackupCompleteCheck` command-line flag if you still need restoring from this backup", fscommon.BackupCompleteFilename, src)
+		}
+	}
+
 	logger.Infof("starting restore from %s to %s", src, dst)
 
 	logger.Infof("obtaining list of parts at %s", src)
diff --git a/lib/backup/common/fs.go b/lib/backup/common/fs.go
index 787515c767..94d497d5ec 100644
--- a/lib/backup/common/fs.go
+++ b/lib/backup/common/fs.go
@@ -38,4 +38,13 @@ type RemoteFS interface {
 
 	// UploadPart must upload part p from r to RemoteFS.
 	UploadPart(p Part, r io.Reader) error
+
+	// DeleteFile deletes filePath at RemoteFS
+	DeleteFile(filePath string) error
+
+	// CreateFile creates filePath at RemoteFS and puts data into it.
+	CreateFile(filePath string, data []byte) error
+
+	// HasFile returns true if filePath exists at RemoteFS.
+	HasFile(filePath string) (bool, error)
 }
diff --git a/lib/backup/fscommon/fscommon.go b/lib/backup/fscommon/fscommon.go
index d6680aa62b..b6db48ad65 100644
--- a/lib/backup/fscommon/fscommon.go
+++ b/lib/backup/fscommon/fscommon.go
@@ -260,3 +260,11 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
 	}
 	return true, nil
 }
+
+// IgnorePath returns true if the given path must be ignored.
+func IgnorePath(path string) bool {
+	return strings.HasSuffix(path, ".ignore")
+}
+
+// BackupCompleteFilename is a filename, which is created in the destination fs when backup is complete.
+const BackupCompleteFilename = "backup_complete.ignore"
diff --git a/lib/backup/fsremote/fsremote.go b/lib/backup/fsremote/fsremote.go
index 025a5d8992..c3f2640016 100644
--- a/lib/backup/fsremote/fsremote.go
+++ b/lib/backup/fsremote/fsremote.go
@@ -3,6 +3,7 @@ package fsremote
 import (
 	"fmt"
 	"io"
+	"io/ioutil"
 	"os"
 	"path/filepath"
 	"strings"
@@ -48,6 +49,9 @@ func (fs *FS) ListParts() ([]common.Part, error) {
 		if !strings.HasPrefix(file, dir) {
 			logger.Panicf("BUG: unexpected prefix for file %q; want %q", file, dir)
 		}
+		if fscommon.IgnorePath(file) {
+			continue
+		}
 		var p common.Part
 		if !p.ParseFromRemotePath(file[len(dir):]) {
 			logger.Infof("skipping unknown file %s", file)
@@ -188,3 +192,42 @@ func (fs *FS) mkdirAll(filePath string) error {
 func (fs *FS) path(p common.Part) string {
 	return p.RemotePath(fs.Dir)
 }
+
+// DeleteFile deletes filePath at fs.
+//
+// The function does nothing if the filePath doesn't exist.
+func (fs *FS) DeleteFile(filePath string) error {
+	path := filepath.Join(fs.Dir, filePath)
+	err := os.Remove(path)
+	if err != nil && !os.IsNotExist(err) {
+		return fmt.Errorf("cannot remove %q: %s", path, err)
+	}
+	return nil
+}
+
+// CreateFile creates filePath at fs and puts data into it.
+//
+// The file is overwritten if it exists.
+func (fs *FS) CreateFile(filePath string, data []byte) error {
+	path := filepath.Join(fs.Dir, filePath)
+	if err := ioutil.WriteFile(path, data, 0600); err != nil {
+		return fmt.Errorf("cannot write %d bytes to %q: %s", len(data), path, err)
+	}
+	return nil
+}
+
+// HasFile returns true if filePath exists at fs.
+func (fs *FS) HasFile(filePath string) (bool, error) {
+	path := filepath.Join(fs.Dir, filePath)
+	fi, err := os.Stat(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false, nil
+		}
+		return false, fmt.Errorf("cannot stat %q: %s", path, err)
+	}
+	if fi.IsDir() {
+		return false, fmt.Errorf("%q is directory, while file is needed", path)
+	}
+	return true, nil
+}
diff --git a/lib/backup/gcsremote/gcs.go b/lib/backup/gcsremote/gcs.go
index 128940fe8e..5a362017f2 100644
--- a/lib/backup/gcsremote/gcs.go
+++ b/lib/backup/gcsremote/gcs.go
@@ -8,6 +8,7 @@ import (
 
 	"cloud.google.com/go/storage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
@@ -97,6 +98,9 @@ func (fs *FS) ListParts() ([]common.Part, error) {
 		if !strings.HasPrefix(file, dir) {
 			return nil, fmt.Errorf("unexpected prefix for gcs key %q; want %q", file, dir)
 		}
+		if fscommon.IgnorePath(file) {
+			continue
+		}
 		var p common.Part
 		if !p.ParseFromRemotePath(file[len(dir):]) {
 			logger.Infof("skipping unknown object %q", file)
@@ -187,3 +191,56 @@ func (fs *FS) object(p common.Part) *storage.ObjectHandle {
 	path := p.RemotePath(fs.Dir)
 	return fs.bkt.Object(path)
 }
+
+// DeleteFile deletes filePath at fs if it exists.
+//
+// The function does nothing if the filePath doesn't exist.
+func (fs *FS) DeleteFile(filePath string) error {
+	path := fs.Dir + filePath
+	o := fs.bkt.Object(path)
+	ctx := context.Background()
+	if err := o.Delete(ctx); err != nil {
+		if err != storage.ErrObjectNotExist {
+			return fmt.Errorf("cannot delete %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err)
+		}
+	}
+	return nil
+}
+
+// CreateFile creates filePath at fs and puts data into it.
+//
+// The file is overwritten if it exists.
+func (fs *FS) CreateFile(filePath string, data []byte) error {
+	path := fs.Dir + filePath
+	o := fs.bkt.Object(path)
+	ctx := context.Background()
+	w := o.NewWriter(ctx)
+	n, err := w.Write(data)
+	if err != nil {
+		_ = w.Close()
+		return fmt.Errorf("cannot upload %d bytes to %q at %s (remote path %q): %s", len(data), filePath, fs, o.ObjectName(), err)
+	}
+	if n != len(data) {
+		_ = w.Close()
+		return fmt.Errorf("wrong data size uploaded to %q at %s (remote path %q); got %d bytes; want %d bytes", filePath, fs, o.ObjectName(), n, len(data))
+	}
+	if err := w.Close(); err != nil {
+		return fmt.Errorf("cannot close %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err)
+	}
+	return nil
+}
+
+// HasFile returns true if filePath exists at fs.
+func (fs *FS) HasFile(filePath string) (bool, error) {
+	path := fs.Dir + filePath
+	o := fs.bkt.Object(path)
+	ctx := context.Background()
+	_, err := o.Attrs(ctx)
+	if err != nil {
+		if err == storage.ErrObjectNotExist {
+			return false, nil
+		}
+		return false, fmt.Errorf("unexpected error when obtaining attributes for %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err)
+	}
+	return true, nil
+}
diff --git a/lib/backup/s3remote/s3.go b/lib/backup/s3remote/s3.go
index 48a1d0d173..2b38ca72ed 100644
--- a/lib/backup/s3remote/s3.go
+++ b/lib/backup/s3remote/s3.go
@@ -1,14 +1,17 @@
 package s3remote
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
 	"strings"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -113,6 +116,9 @@ func (fs *FS) ListParts() ([]common.Part, error) {
 				errOuter = fmt.Errorf("unexpected prefix for s3 key %q; want %q", file, dir)
 				return false
 			}
+			if fscommon.IgnorePath(file) {
+				continue
+			}
 			var p common.Part
 			if !p.ParseFromRemotePath(file[len(dir):]) {
 				logger.Infof("skipping unknown object %q", file)
@@ -220,6 +226,69 @@ func (fs *FS) UploadPart(p common.Part, r io.Reader) error {
 	return nil
 }
 
+// DeleteFile deletes filePath from fs if it exists.
+//
+// The function does nothing if the file doesn't exist.
+func (fs *FS) DeleteFile(filePath string) error {
+	path := fs.Dir + filePath
+	input := &s3.DeleteObjectInput{
+		Bucket: aws.String(fs.Bucket),
+		Key:    aws.String(path),
+	}
+	_, err := fs.s3.DeleteObject(input)
+	if err != nil {
+		if ae, ok := err.(awserr.Error); ok && ae.Code() == s3.ErrCodeNoSuchKey {
+			return nil
+		}
+		return fmt.Errorf("cannot delete %q at %s (remote path %q): %s", filePath, fs, path, err)
+	}
+	return nil
+}
+
+// CreateFile creates filePath at fs and puts data into it.
+//
+// The file is overwritten if it already exists.
+func (fs *FS) CreateFile(filePath string, data []byte) error {
+	path := fs.Dir + filePath
+	sr := &statReader{
+		r: bytes.NewReader(data),
+	}
+	input := &s3manager.UploadInput{
+		Bucket: aws.String(fs.Bucket),
+		Key:    aws.String(path),
+		Body:   sr,
+	}
+	_, err := fs.uploader.Upload(input)
+	if err != nil {
+		return fmt.Errorf("cannot upoad data to %q at %s (remote path %q): %s", filePath, fs, path, err)
+	}
+	l := int64(len(data))
+	if sr.size != l {
+		return fmt.Errorf("wrong data size uploaded to %q at %s; got %d bytes; want %d bytes", filePath, fs, sr.size, l)
+	}
+	return nil
+}
+
+// HasFile returns true if filePath exists at fs.
+func (fs *FS) HasFile(filePath string) (bool, error) {
+	path := fs.Dir + filePath
+	input := &s3.GetObjectInput{
+		Bucket: aws.String(fs.Bucket),
+		Key:    aws.String(path),
+	}
+	o, err := fs.s3.GetObject(input)
+	if err != nil {
+		if ae, ok := err.(awserr.Error); ok && ae.Code() == s3.ErrCodeNoSuchKey {
+			return false, nil
+		}
+		return false, fmt.Errorf("cannot open %q at %s (remote path %q): %s", filePath, fs, path, err)
+	}
+	if err := o.Body.Close(); err != nil {
+		return false, fmt.Errorf("cannot close %q at %s (remote path %q): %s", filePath, fs, path, err)
+	}
+	return true, nil
+}
+
 func (fs *FS) path(p common.Part) string {
 	return p.RemotePath(fs.Dir)
 }