diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go index 457b23c56..865a4f0a4 100644 --- a/app/vmrestore/main.go +++ b/app/vmrestore/main.go @@ -16,8 +16,9 @@ var ( "Example: gcs://bucket/path/to/backup/dir, s3://bucket/path/to/backup/dir or fs:///path/to/local/backup/dir") storageDataPath = flag.String("storageDataPath", "victoria-metrics-data", "Destination path where backup must be restored. "+ "VictoriaMetrics must be stopped when restoring from backup. -storageDataPath dir can be non-empty. In this case only missing data is downloaded from backup") - concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration") - maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0") + concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration") + maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0") + skipBackupCompleteCheck = flag.Bool("skipBackupCompleteCheck", false, "Whether to skip checking for `backup complete` file in `-src`. 
This may be useful for restoring from old backups, which were created without `backup complete` file") ) func main() { @@ -34,9 +35,10 @@ func main() { logger.Fatalf("%s", err) } a := &actions.Restore{ - Concurrency: *concurrency, - Src: srcFS, - Dst: dstFS, + Concurrency: *concurrency, + Src: srcFS, + Dst: dstFS, + SkipBackupCompleteCheck: *skipBackupCompleteCheck, } if err := a.Run(); err != nil { logger.Fatalf("cannot restore from backup: %s", err) diff --git a/lib/backup/actions/backup.go b/lib/backup/actions/backup.go index fa13861f6..bbc8df4f6 100644 --- a/lib/backup/actions/backup.go +++ b/lib/backup/actions/backup.go @@ -7,6 +7,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -41,17 +42,33 @@ type Backup struct { // Run runs b with the provided settings. 
func (b *Backup) Run() error { - startTime := time.Now() - concurrency := b.Concurrency src := b.Src dst := b.Dst origin := b.Origin + if origin != nil && origin.String() == dst.String() { + origin = nil + } if origin == nil { origin = &fsnil.FS{} } + if err := dst.DeleteFile(fscommon.BackupCompleteFilename); err != nil { + return fmt.Errorf("cannot delete `backup complete` file at %s: %s", dst, err) + } + if err := runBackup(src, dst, origin, concurrency); err != nil { + return err + } + if err := dst.CreateFile(fscommon.BackupCompleteFilename, []byte("ok")); err != nil { + return fmt.Errorf("cannot create `backup complete` file at %s: %s", dst, err) + } + return nil +} + +func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, concurrency int) error { + startTime := time.Now() + logger.Infof("starting backup from %s to %s using origin %s", src, dst, origin) logger.Infof("obtaining list of parts at %s", src) diff --git a/lib/backup/actions/restore.go b/lib/backup/actions/restore.go index e310993b6..5994dea81 100644 --- a/lib/backup/actions/restore.go +++ b/lib/backup/actions/restore.go @@ -7,6 +7,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -29,6 +30,11 @@ type Restore struct { // If dst points to existing directory, then incremental restore is performed, // i.e. only new data is downloaded from src. Dst *fslocal.FS + + // SkipBackupCompleteCheck may be set in order to skip checking for `backup complete` file in Src. + // + // This may be needed for restoring from old backups with missing `backup complete` file. + SkipBackupCompleteCheck bool } // Run runs r with the provided settings. 
@@ -48,6 +54,18 @@ func (r *Restore) Run() error { concurrency := r.Concurrency src := r.Src dst := r.Dst + + if !r.SkipBackupCompleteCheck { + ok, err := src.HasFile(fscommon.BackupCompleteFilename) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("cannot find %s file in %s; this means either incomplete backup or old backup; "+ + "pass `-skipBackupCompleteCheck` command-line flag if you still need restoring from this backup", fscommon.BackupCompleteFilename, src) + } + } + logger.Infof("starting restore from %s to %s", src, dst) logger.Infof("obtaining list of parts at %s", src) diff --git a/lib/backup/common/fs.go b/lib/backup/common/fs.go index 787515c76..94d497d5e 100644 --- a/lib/backup/common/fs.go +++ b/lib/backup/common/fs.go @@ -38,4 +38,13 @@ type RemoteFS interface { // UploadPart must upload part p from r to RemoteFS. UploadPart(p Part, r io.Reader) error + + // DeleteFile deletes filePath at RemoteFS + DeleteFile(filePath string) error + + // CreateFile creates filePath at RemoteFS and puts data into it. + CreateFile(filePath string, data []byte) error + + // HasFile returns true if filePath exists at RemoteFS. + HasFile(filePath string) (bool, error) } diff --git a/lib/backup/fscommon/fscommon.go b/lib/backup/fscommon/fscommon.go index d6680aa62..b6db48ad6 100644 --- a/lib/backup/fscommon/fscommon.go +++ b/lib/backup/fscommon/fscommon.go @@ -260,3 +260,11 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) { } return true, nil } + +// IgnorePath returns true if the given path must be ignored. +func IgnorePath(path string) bool { + return strings.HasSuffix(path, ".ignore") +} + +// BackupCompleteFilename is a filename, which is created in the destination fs when backup is complete. 
+const BackupCompleteFilename = "backup_complete.ignore" diff --git a/lib/backup/fsremote/fsremote.go b/lib/backup/fsremote/fsremote.go index 025a5d899..c3f264001 100644 --- a/lib/backup/fsremote/fsremote.go +++ b/lib/backup/fsremote/fsremote.go @@ -3,6 +3,7 @@ package fsremote import ( "fmt" "io" + "io/ioutil" "os" "path/filepath" "strings" @@ -48,6 +49,9 @@ func (fs *FS) ListParts() ([]common.Part, error) { if !strings.HasPrefix(file, dir) { logger.Panicf("BUG: unexpected prefix for file %q; want %q", file, dir) } + if fscommon.IgnorePath(file) { + continue + } var p common.Part if !p.ParseFromRemotePath(file[len(dir):]) { logger.Infof("skipping unknown file %s", file) @@ -188,3 +192,42 @@ func (fs *FS) mkdirAll(filePath string) error { func (fs *FS) path(p common.Part) string { return p.RemotePath(fs.Dir) } + +// DeleteFile deletes filePath at fs. +// +// The function does nothing if the filePath doesn't exist. +func (fs *FS) DeleteFile(filePath string) error { + path := filepath.Join(fs.Dir, filePath) + err := os.Remove(path) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("cannot remove %q: %s", path, err) + } + return nil +} + +// CreateFile creates filePath at fs and puts data into it. +// +// The file is overwritten if it exists. +func (fs *FS) CreateFile(filePath string, data []byte) error { + path := filepath.Join(fs.Dir, filePath) + if err := ioutil.WriteFile(path, data, 0600); err != nil { + return fmt.Errorf("cannot write %d bytes to %q: %s", len(data), path, err) + } + return nil +} + +// HasFile returns true if filePath exists at fs. 
+func (fs *FS) HasFile(filePath string) (bool, error) { + path := filepath.Join(fs.Dir, filePath) + fi, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, fmt.Errorf("cannot stat %q: %s", path, err) + } + if fi.IsDir() { + return false, fmt.Errorf("%q is directory, while file is needed", path) + } + return true, nil +} diff --git a/lib/backup/gcsremote/gcs.go b/lib/backup/gcsremote/gcs.go index 128940fe8..5a362017f 100644 --- a/lib/backup/gcsremote/gcs.go +++ b/lib/backup/gcsremote/gcs.go @@ -8,6 +8,7 @@ import ( "cloud.google.com/go/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "google.golang.org/api/iterator" "google.golang.org/api/option" @@ -97,6 +98,9 @@ func (fs *FS) ListParts() ([]common.Part, error) { if !strings.HasPrefix(file, dir) { return nil, fmt.Errorf("unexpected prefix for gcs key %q; want %q", file, dir) } + if fscommon.IgnorePath(file) { + continue + } var p common.Part if !p.ParseFromRemotePath(file[len(dir):]) { logger.Infof("skipping unknown object %q", file) @@ -187,3 +191,56 @@ func (fs *FS) object(p common.Part) *storage.ObjectHandle { path := p.RemotePath(fs.Dir) return fs.bkt.Object(path) } + +// DeleteFile deletes filePath at fs if it exists. +// +// The function does nothing if the filePath doesn't exist. +func (fs *FS) DeleteFile(filePath string) error { + path := fs.Dir + filePath + o := fs.bkt.Object(path) + ctx := context.Background() + if err := o.Delete(ctx); err != nil { + if err != storage.ErrObjectNotExist { + return fmt.Errorf("cannot delete %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err) + } + } + return nil +} + +// CreateFile creates filePath at fs and puts data into it. +// +// The file is overwritten if it exists. 
+func (fs *FS) CreateFile(filePath string, data []byte) error { + path := fs.Dir + filePath + o := fs.bkt.Object(path) + ctx := context.Background() + w := o.NewWriter(ctx) + n, err := w.Write(data) + if err != nil { + _ = w.Close() + return fmt.Errorf("cannot upload %d bytes to %q at %s (remote path %q): %s", len(data), filePath, fs, o.ObjectName(), err) + } + if n != len(data) { + _ = w.Close() + return fmt.Errorf("wrong data size uploaded to %q at %s (remote path %q); got %d bytes; want %d bytes", filePath, fs, o.ObjectName(), n, len(data)) + } + if err := w.Close(); err != nil { + return fmt.Errorf("cannot close %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err) + } + return nil +} + +// HasFile returns true if filePath exists at fs. +func (fs *FS) HasFile(filePath string) (bool, error) { + path := fs.Dir + filePath + o := fs.bkt.Object(path) + ctx := context.Background() + _, err := o.Attrs(ctx) + if err != nil { + if err == storage.ErrObjectNotExist { + return false, nil + } + return false, fmt.Errorf("unexpected error when obtaining attributes for %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err) + } + return true, nil +} diff --git a/lib/backup/s3remote/s3.go b/lib/backup/s3remote/s3.go index 48a1d0d17..2b38ca72e 100644 --- a/lib/backup/s3remote/s3.go +++ b/lib/backup/s3remote/s3.go @@ -1,14 +1,17 @@ package s3remote import ( + "bytes" "context" "fmt" "io" "strings" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" ) @@ -113,6 +116,9 @@ func (fs *FS) ListParts() ([]common.Part, error) { errOuter = fmt.Errorf("unexpected prefix for s3 key %q; want %q", file, dir) return false } + if 
fscommon.IgnorePath(file) { + continue + } var p common.Part if !p.ParseFromRemotePath(file[len(dir):]) { logger.Infof("skipping unknown object %q", file) @@ -220,6 +226,69 @@ func (fs *FS) UploadPart(p common.Part, r io.Reader) error { return nil } +// DeleteFile deletes filePath from fs if it exists. +// +// The function does nothing if the file doesn't exist. +func (fs *FS) DeleteFile(filePath string) error { + path := fs.Dir + filePath + input := &s3.DeleteObjectInput{ + Bucket: aws.String(fs.Bucket), + Key: aws.String(path), + } + _, err := fs.s3.DeleteObject(input) + if err != nil { + if ae, ok := err.(awserr.Error); ok && ae.Code() == s3.ErrCodeNoSuchKey { + return nil + } + return fmt.Errorf("cannot delete %q at %s (remote path %q): %s", filePath, fs, path, err) + } + return nil +} + +// CreateFile creates filePath at fs and puts data into it. +// +// The file is overwritten if it already exists. +func (fs *FS) CreateFile(filePath string, data []byte) error { + path := fs.Dir + filePath + sr := &statReader{ + r: bytes.NewReader(data), + } + input := &s3manager.UploadInput{ + Bucket: aws.String(fs.Bucket), + Key: aws.String(path), + Body: sr, + } + _, err := fs.uploader.Upload(input) + if err != nil { + return fmt.Errorf("cannot upload data to %q at %s (remote path %q): %s", filePath, fs, path, err) + } + l := int64(len(data)) + if sr.size != l { + return fmt.Errorf("wrong data size uploaded to %q at %s; got %d bytes; want %d bytes", filePath, fs, sr.size, l) + } + return nil +} + +// HasFile returns true if filePath exists at fs. 
+func (fs *FS) HasFile(filePath string) (bool, error) { + path := fs.Dir + filePath + input := &s3.GetObjectInput{ + Bucket: aws.String(fs.Bucket), + Key: aws.String(path), + } + o, err := fs.s3.GetObject(input) + if err != nil { + if ae, ok := err.(awserr.Error); ok && ae.Code() == s3.ErrCodeNoSuchKey { + return false, nil + } + return false, fmt.Errorf("cannot open %q at %s (remote path %q): %s", filePath, fs, path, err) + } + if err := o.Body.Close(); err != nil { + return false, fmt.Errorf("cannot close %q at %s (remote path %q): %s", filePath, fs, path, err) + } + return true, nil +} + func (fs *FS) path(p common.Part) string { return p.RemotePath(fs.Dir) }