app/{vmbackup,vmrestore}: add backup complete file to backup when it is complete and check for this file before restoring from backup

This should prevent restoring from incomplete backups.

Add `-skipBackupCompleteCheck` command-line flag to `vmrestore` in order to be able to restore from old backups without the `backup complete` file.
This commit is contained in:
Aliaksandr Valialkin 2020-01-09 15:24:26 +02:00
parent 7edbd930d5
commit 705af61587
8 changed files with 230 additions and 7 deletions

View file

@ -18,6 +18,7 @@ var (
"VictoriaMetrics must be stopped when restoring from backup. -storageDataPath dir can be non-empty. In this case only missing data is downloaded from backup")
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0")
skipBackupCompleteCheck = flag.Bool("skipBackupCompleteCheck", false, "Whether to skip checking for `backup complete` file in `-src`. This may be useful for restoring from old backups, which were created without `backup complete` file")
)
func main() {
@ -37,6 +38,7 @@ func main() {
Concurrency: *concurrency,
Src: srcFS,
Dst: dstFS,
SkipBackupCompleteCheck: *skipBackupCompleteCheck,
}
if err := a.Run(); err != nil {
logger.Fatalf("cannot restore from backup: %s", err)

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsnil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -41,17 +42,33 @@ type Backup struct {
// Run runs b with the provided settings.
//
// The `backup complete` marker file is deleted from dst before the backup
// starts and is re-created only after runBackup succeeds, so an interrupted
// backup never leaves a `backup complete` file behind.
func (b *Backup) Run() error {
// NOTE(review): startTime appears unused in the visible lines - the diff view
// may elide unchanged lines that reference it; confirm against the full file.
startTime := time.Now()
concurrency := b.Concurrency
src := b.Src
dst := b.Dst
origin := b.Origin
// Server-side copying from dst to itself is pointless - drop such an origin.
if origin != nil && origin.String() == dst.String() {
origin = nil
}
// Fall back to a no-op origin fs when no origin is configured.
if origin == nil {
origin = &fsnil.FS{}
}
// Remove the completion marker first: from this point on the backup at dst
// must be treated as incomplete.
if err := dst.DeleteFile(fscommon.BackupCompleteFilename); err != nil {
return fmt.Errorf("cannot delete `backup complete` file at %s: %s", dst, err)
}
if err := runBackup(src, dst, origin, concurrency); err != nil {
return err
}
// Mark the backup as complete only after all the parts were uploaded.
if err := dst.CreateFile(fscommon.BackupCompleteFilename, []byte("ok")); err != nil {
return fmt.Errorf("cannot create `backup complete` file at %s: %s", dst, err)
}
return nil
}
func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, concurrency int) error {
startTime := time.Now()
logger.Infof("starting backup from %s to %s using origin %s", src, dst, origin)
logger.Infof("obtaining list of parts at %s", src)

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -29,6 +30,11 @@ type Restore struct {
// If dst points to existing directory, then incremental restore is performed,
// i.e. only new data is downloaded from src.
Dst *fslocal.FS
// SkipBackupCompleteCheck may be set in order to skip the check for the `backup complete` file in Src.
//
// This may be needed for restoring from old backups with missing `backup complete` file.
SkipBackupCompleteCheck bool
}
// Run runs r with the provided settings.
@ -48,6 +54,18 @@ func (r *Restore) Run() error {
concurrency := r.Concurrency
src := r.Src
dst := r.Dst
if !r.SkipBackupCompleteCheck {
ok, err := src.HasFile(fscommon.BackupCompleteFilename)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("cannot find %s file in %s; this means either incomplete backup or old backup; "+
"pass `-skipBackupCompleteCheck` command-line flag if you still need restoring from this backup", fscommon.BackupCompleteFilename, src)
}
}
logger.Infof("starting restore from %s to %s", src, dst)
logger.Infof("obtaining list of parts at %s", src)

View file

@ -38,4 +38,13 @@ type RemoteFS interface {
// UploadPart must upload part p from r to RemoteFS.
UploadPart(p Part, r io.Reader) error
// DeleteFile deletes filePath at RemoteFS
DeleteFile(filePath string) error
// CreateFile creates filePath at RemoteFS and puts data into it.
CreateFile(filePath string, data []byte) error
// HasFile returns true if filePath exists at RemoteFS.
HasFile(filePath string) (bool, error)
}

View file

@ -260,3 +260,11 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
}
return true, nil
}
// IgnorePath reports whether the given path must be skipped when listing
// backup parts. Paths with the ".ignore" suffix are service files (e.g. the
// `backup complete` marker), not backup data.
func IgnorePath(path string) bool {
	const ignoreSuffix = ".ignore"
	return strings.HasSuffix(path, ignoreSuffix)
}
// BackupCompleteFilename is the name of the marker file created in the
// destination fs when a backup successfully completes.
//
// The ".ignore" suffix makes ListParts implementations skip it via IgnorePath.
const BackupCompleteFilename = "backup_complete.ignore"

View file

@ -3,6 +3,7 @@ package fsremote
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
@ -48,6 +49,9 @@ func (fs *FS) ListParts() ([]common.Part, error) {
if !strings.HasPrefix(file, dir) {
logger.Panicf("BUG: unexpected prefix for file %q; want %q", file, dir)
}
if fscommon.IgnorePath(file) {
continue
}
var p common.Part
if !p.ParseFromRemotePath(file[len(dir):]) {
logger.Infof("skipping unknown file %s", file)
@ -188,3 +192,42 @@ func (fs *FS) mkdirAll(filePath string) error {
// path returns the local filesystem path for the part p under fs.Dir.
func (fs *FS) path(p common.Part) string {
return p.RemotePath(fs.Dir)
}
// DeleteFile removes filePath from fs.
//
// A missing filePath is not an error; the call is a no-op in that case.
func (fs *FS) DeleteFile(filePath string) error {
	fullPath := filepath.Join(fs.Dir, filePath)
	if err := os.Remove(fullPath); err != nil {
		if os.IsNotExist(err) {
			// Nothing to delete - treat as success.
			return nil
		}
		return fmt.Errorf("cannot remove %q: %s", fullPath, err)
	}
	return nil
}
// CreateFile writes data into filePath at fs.
//
// Existing contents of filePath are replaced.
func (fs *FS) CreateFile(filePath string, data []byte) error {
	fullPath := filepath.Join(fs.Dir, filePath)
	err := ioutil.WriteFile(fullPath, data, 0600)
	if err == nil {
		return nil
	}
	return fmt.Errorf("cannot write %d bytes to %q: %s", len(data), fullPath, err)
}
// HasFile reports whether a regular file filePath exists at fs.
//
// A directory at filePath is reported as an error rather than as existence.
func (fs *FS) HasFile(filePath string) (bool, error) {
	fullPath := filepath.Join(fs.Dir, filePath)
	fi, err := os.Stat(fullPath)
	switch {
	case os.IsNotExist(err):
		return false, nil
	case err != nil:
		return false, fmt.Errorf("cannot stat %q: %s", fullPath, err)
	case fi.IsDir():
		return false, fmt.Errorf("%q is directory, while file is needed", fullPath)
	}
	return true, nil
}

View file

@ -8,6 +8,7 @@ import (
"cloud.google.com/go/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
@ -97,6 +98,9 @@ func (fs *FS) ListParts() ([]common.Part, error) {
if !strings.HasPrefix(file, dir) {
return nil, fmt.Errorf("unexpected prefix for gcs key %q; want %q", file, dir)
}
if fscommon.IgnorePath(file) {
continue
}
var p common.Part
if !p.ParseFromRemotePath(file[len(dir):]) {
logger.Infof("skipping unknown object %q", file)
@ -187,3 +191,56 @@ func (fs *FS) object(p common.Part) *storage.ObjectHandle {
path := p.RemotePath(fs.Dir)
return fs.bkt.Object(path)
}
// DeleteFile deletes filePath at fs if it exists.
//
// The function does nothing if the filePath doesn't exist.
func (fs *FS) DeleteFile(filePath string) error {
path := fs.Dir + filePath
o := fs.bkt.Object(path)
ctx := context.Background()
// A missing object isn't an error - deletion is intended to be idempotent.
if err := o.Delete(ctx); err != nil {
if err != storage.ErrObjectNotExist {
return fmt.Errorf("cannot delete %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err)
}
}
return nil
}
// CreateFile uploads data into filePath at fs.
//
// Any existing object at filePath is replaced.
func (fs *FS) CreateFile(filePath string, data []byte) error {
	o := fs.bkt.Object(fs.Dir + filePath)
	wc := o.NewWriter(context.Background())
	written, err := wc.Write(data)
	switch {
	case err != nil:
		_ = wc.Close()
		return fmt.Errorf("cannot upload %d bytes to %q at %s (remote path %q): %s", len(data), filePath, fs, o.ObjectName(), err)
	case written != len(data):
		_ = wc.Close()
		return fmt.Errorf("wrong data size uploaded to %q at %s (remote path %q); got %d bytes; want %d bytes", filePath, fs, o.ObjectName(), written, len(data))
	}
	// The upload only becomes durable once the writer closes successfully.
	if err := wc.Close(); err != nil {
		return fmt.Errorf("cannot close %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err)
	}
	return nil
}
// HasFile returns true if filePath exists at fs.
func (fs *FS) HasFile(filePath string) (bool, error) {
path := fs.Dir + filePath
o := fs.bkt.Object(path)
ctx := context.Background()
// Only object attributes are requested - the object contents aren't fetched.
_, err := o.Attrs(ctx)
if err != nil {
if err == storage.ErrObjectNotExist {
return false, nil
}
return false, fmt.Errorf("unexpected error when obtaining attributes for %q at %s (remote path %q): %s", filePath, fs, o.ObjectName(), err)
}
return true, nil
}

View file

@ -1,14 +1,17 @@
package s3remote
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
@ -113,6 +116,9 @@ func (fs *FS) ListParts() ([]common.Part, error) {
errOuter = fmt.Errorf("unexpected prefix for s3 key %q; want %q", file, dir)
return false
}
if fscommon.IgnorePath(file) {
continue
}
var p common.Part
if !p.ParseFromRemotePath(file[len(dir):]) {
logger.Infof("skipping unknown object %q", file)
@ -220,6 +226,69 @@ func (fs *FS) UploadPart(p common.Part, r io.Reader) error {
return nil
}
// DeleteFile removes filePath from fs.
//
// A missing file is not an error; the call is a no-op in that case.
func (fs *FS) DeleteFile(filePath string) error {
	path := fs.Dir + filePath
	_, err := fs.s3.DeleteObject(&s3.DeleteObjectInput{
		Bucket: aws.String(fs.Bucket),
		Key:    aws.String(path),
	})
	if err == nil {
		return nil
	}
	// NoSuchKey means there was nothing to delete - treat as success.
	if ae, ok := err.(awserr.Error); ok && ae.Code() == s3.ErrCodeNoSuchKey {
		return nil
	}
	return fmt.Errorf("cannot delete %q at %s (remote path %q): %s", filePath, fs, path, err)
}
// CreateFile creates filePath at fs and puts data into it.
//
// The file is overwritten if it already exists.
func (fs *FS) CreateFile(filePath string, data []byte) error {
	path := fs.Dir + filePath
	// Wrap the data into a statReader so we can verify below how many bytes
	// the uploader actually consumed.
	sr := &statReader{
		r: bytes.NewReader(data),
	}
	input := &s3manager.UploadInput{
		Bucket: aws.String(fs.Bucket),
		Key:    aws.String(path),
		Body:   sr,
	}
	if _, err := fs.uploader.Upload(input); err != nil {
		return fmt.Errorf("cannot upload data to %q at %s (remote path %q): %s", filePath, fs, path, err)
	}
	if l := int64(len(data)); sr.size != l {
		return fmt.Errorf("wrong data size uploaded to %q at %s; got %d bytes; want %d bytes", filePath, fs, sr.size, l)
	}
	return nil
}
// HasFile returns true if filePath exists at fs.
//
// Existence is probed by fetching the object; NoSuchKey maps to (false, nil).
func (fs *FS) HasFile(filePath string) (bool, error) {
	path := fs.Dir + filePath
	o, err := fs.s3.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(fs.Bucket),
		Key:    aws.String(path),
	})
	if err != nil {
		if ae, ok := err.(awserr.Error); ok && ae.Code() == s3.ErrCodeNoSuchKey {
			return false, nil
		}
		return false, fmt.Errorf("cannot open %q at %s (remote path %q): %s", filePath, fs, path, err)
	}
	// The contents aren't needed - close the body to release the connection.
	if err := o.Body.Close(); err != nil {
		return false, fmt.Errorf("cannot close %q at %s (remote path %q): %s", filePath, fs, path, err)
	}
	return true, nil
}
// path returns the remote s3 key for the part p under fs.Dir.
func (fs *FS) path(p common.Part) string {
return p.RemotePath(fs.Dir)
}