From a0f695f5deadd518348c037f2edc88068dc56a70 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 13 Aug 2023 17:17:12 -0700 Subject: [PATCH] app/vmbackup: add ability to make server-side copying of existing backups --- app/vmbackup/README.md | 17 +++++ app/vmbackup/main.go | 74 +++++++++++++++------- docs/CHANGELOG.md | 2 + docs/vmbackup.md | 17 +++++ lib/backup/actions/backup.go | 109 ++++++++++++++++++-------------- lib/backup/actions/copy.go | 96 ++++++++++++++++++++++++++++ lib/backup/azremote/azblob.go | 16 +++++ lib/backup/common/fs.go | 3 + lib/backup/fsremote/fsremote.go | 7 ++ lib/backup/gcsremote/gcs.go | 12 ++++ lib/backup/s3remote/s3.go | 19 ++++++ 11 files changed, 300 insertions(+), 72 deletions(-) create mode 100644 lib/backup/actions/copy.go diff --git a/app/vmbackup/README.md b/app/vmbackup/README.md index 62c93c4fb..8117524eb 100644 --- a/app/vmbackup/README.md +++ b/app/vmbackup/README.md @@ -89,6 +89,23 @@ Do not forget to remove old backups when they are no longer needed in order to s See also [vmbackupmanager tool](https://docs.victoriametrics.com/vmbackupmanager.html) for automating smart backups. +### Server-side copy of the existing backup + +Sometimes it is needed to make server-side copy of the existing backup. This can be done by specifying the source backup path via `-origin` command-line flag, +while the destination path for backup copy must be specified via `-dst` command-line flag. For example, the following command copies backup +from `gs://bucket/foo` to `gs://bucket/bar`: + +```console +./vmbackup -origin=gs://bucket/foo -dst=gs://bucket/bar +``` + +The `-origin` and `-dst` must point to the same object storage bucket or to the same filesystem. + +The server-side backup copy is usually performed at much faster speed comparing to the usual backup, since backup data isn't transferred +between the remote storage and locally running `vmbackup` tool. 
+ +If the `-dst` already contains some data, then its contents are synced with the `-origin` data. This allows making incremental server-side copies of backups. + ## How does it work? The backup algorithm is the following: diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 06c9e475d..d0900caa6 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -92,8 +92,6 @@ func main() { logger.Fatalf("cannot delete snapshot: %s", err) } } - } else if len(*snapshotName) == 0 { - logger.Fatalf("`-snapshotName` or `-snapshot.createURL` must be provided") } go httpserver.Serve(*httpListenAddr, false, nil) @@ -113,34 +111,48 @@ func main() { } func makeBackup() error { - if err := snapshot.Validate(*snapshotName); err != nil { - return fmt.Errorf("invalid -snapshotName=%q: %s", *snapshotName, err) - } - - srcFS, err := newSrcFS() - if err != nil { - return err - } dstFS, err := newDstFS() if err != nil { return err } - originFS, err := newOriginFS() - if err != nil { - return err + if *snapshotName == "" { + // Make server-side copy from -origin to -dst + originFS, err := newRemoteOriginFS() + if err != nil { + return err + } + a := &actions.RemoteBackupCopy{ + Concurrency: *concurrency, + Src: originFS, + Dst: dstFS, + } + if err := a.Run(); err != nil { + return err + } + originFS.MustStop() + } else { + // Make backup from srcFS to -dst + srcFS, err := newSrcFS() + if err != nil { + return err + } + originFS, err := newOriginFS() + if err != nil { + return err + } + a := &actions.Backup{ + Concurrency: *concurrency, + Src: srcFS, + Dst: dstFS, + Origin: originFS, + } + if err := a.Run(); err != nil { + return err + } + srcFS.MustStop() + originFS.MustStop() } - a := &actions.Backup{ - Concurrency: *concurrency, - Src: srcFS, - Dst: dstFS, - Origin: originFS, - } - if err := a.Run(); err != nil { - return err - } - srcFS.MustStop() dstFS.MustStop() - originFS.MustStop() return nil } @@ -155,6 +167,9 @@ See the docs at
https://docs.victoriametrics.com/vmbackup.html . } func newSrcFS() (*fslocal.FS, error) { + if err := snapshot.Validate(*snapshotName); err != nil { + return nil, fmt.Errorf("invalid -snapshotName=%q: %s", *snapshotName, err) + } snapshotPath := filepath.Join(*storageDataPath, "snapshots", *snapshotName) // Verify the snapshot exists. @@ -218,3 +233,14 @@ func newOriginFS() (common.OriginFS, error) { } return fs, nil } + +func newRemoteOriginFS() (common.RemoteFS, error) { + if len(*origin) == 0 { + return nil, fmt.Errorf("-origin cannot be empty when -snapshotName and -snapshot.createURL aren't set") + } + fs, err := actions.NewRemoteFS(*origin) + if err != nil { + return nil, fmt.Errorf("cannot parse `-origin`=%q: %w", *origin, err) + } + return fs, nil +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8a3f0dfb2..77cea2fd5 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,6 +24,8 @@ The following `tip` changes can be tested by building VictoriaMetrics components ## tip +* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add support for server-side copy of existing backups. See [these docs](https://docs.victoriametrics.com/vmbackup.html#server-side-copy-of-the-existing-backup) for details. + ## [v1.93.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.0) Released at 2023-08-12 diff --git a/docs/vmbackup.md b/docs/vmbackup.md index f6ef69bb0..80221c294 100644 --- a/docs/vmbackup.md +++ b/docs/vmbackup.md @@ -100,6 +100,23 @@ Do not forget to remove old backups when they are no longer needed in order to s See also [vmbackupmanager tool](https://docs.victoriametrics.com/vmbackupmanager.html) for automating smart backups. +### Server-side copy of the existing backup + +Sometimes it is needed to make server-side copy of the existing backup. 
This can be done by specifying the source backup path via `-origin` command-line flag, +while the destination path for backup copy must be specified via `-dst` command-line flag. For example, the following command copies backup +from `gs://bucket/foo` to `gs://bucket/bar`: + +```console +./vmbackup -origin=gs://bucket/foo -dst=gs://bucket/bar +``` + +The `-origin` and `-dst` must point to the same object storage bucket or to the same filesystem. + +The server-side backup copy is usually performed at much faster speed comparing to the usual backup, since backup data isn't transferred +between the remote storage and locally running `vmbackup` tool. + +If the `-dst` already contains some data, then its contents are synced with the `-origin` data. This allows making incremental server-side copies of backups. + ## How does it work? The backup algorithm is the following: diff --git a/lib/backup/actions/backup.go b/lib/backup/actions/backup.go index 7801d1bce..1fffc51b9 100644 --- a/lib/backup/actions/backup.go +++ b/lib/backup/actions/backup.go @@ -78,7 +78,7 @@ func (b *Backup) Run() error { if err := storeMetadata(src, dst); err != nil { return fmt.Errorf("cannot store backup metadata: %w", err) } - if err := dst.CreateFile(backupnames.BackupCompleteFilename, []byte{}); err != nil { + if err := dst.CreateFile(backupnames.BackupCompleteFilename, nil); err != nil { return fmt.Errorf("cannot create `backup complete` file at %s: %w", dst, err) } @@ -133,79 +133,45 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con logger.Infof("obtained %d parts from origin %s", len(originParts), origin) backupSize := getPartsSize(srcParts) - partsToDelete := common.PartsDifference(dstParts, srcParts) deleteSize := getPartsSize(partsToDelete) - if len(partsToDelete) > 0 { - logger.Infof("deleting %d parts from dst %s", len(partsToDelete), dst) - deletedParts := uint64(0) - err = runParallel(concurrency, partsToDelete, func(p common.Part) error { -
logger.Infof("deleting %s from dst %s", &p, dst) - if err := dst.DeletePart(p); err != nil { - return fmt.Errorf("cannot delete %s from dst %s: %w", &p, dst, err) - } - atomic.AddUint64(&deletedParts, 1) - return nil - }, func(elapsed time.Duration) { - n := atomic.LoadUint64(&deletedParts) - logger.Infof("deleted %d out of %d parts from dst %s in %s", n, len(partsToDelete), dst, elapsed) - }) - if err != nil { - return err - } - if err := dst.RemoveEmptyDirs(); err != nil { - return fmt.Errorf("cannot remove empty directories at dst %s: %w", dst, err) - } + if err := deleteDstParts(dst, partsToDelete, concurrency); err != nil { + return fmt.Errorf("cannot delete unneeded parts at dst: %w", err) } partsToCopy := common.PartsDifference(srcParts, dstParts) - originCopyParts := common.PartsIntersect(originParts, partsToCopy) - copySize := getPartsSize(originCopyParts) - if len(originCopyParts) > 0 { - logger.Infof("server-side copying %d parts from origin %s to dst %s", len(originCopyParts), origin, dst) - copiedParts := uint64(0) - err = runParallel(concurrency, originCopyParts, func(p common.Part) error { - logger.Infof("server-side copying %s from origin %s to dst %s", &p, origin, dst) - if err := dst.CopyPart(origin, p); err != nil { - return fmt.Errorf("cannot copy %s from origin %s to dst %s: %w", &p, origin, dst, err) - } - atomic.AddUint64(&copiedParts, 1) - return nil - }, func(elapsed time.Duration) { - n := atomic.LoadUint64(&copiedParts) - logger.Infof("server-side copied %d out of %d parts from origin %s to dst %s in %s", n, len(originCopyParts), origin, dst, elapsed) - }) - if err != nil { - return err - } + originPartsToCopy := common.PartsIntersect(originParts, partsToCopy) + copySize := getPartsSize(originPartsToCopy) + if err := copySrcParts(origin, dst, originPartsToCopy, concurrency); err != nil { + return fmt.Errorf("cannot server-side copy origin parts to dst: %w", err) } srcCopyParts := common.PartsDifference(partsToCopy, originParts) uploadSize :=
getPartsSize(srcCopyParts) if len(srcCopyParts) > 0 { - logger.Infof("uploading %d parts from src %s to dst %s", len(srcCopyParts), src, dst) + logger.Infof("uploading %d parts from %s to %s", len(srcCopyParts), src, dst) bytesUploaded := uint64(0) err = runParallel(concurrency, srcCopyParts, func(p common.Part) error { - logger.Infof("uploading %s from src %s to dst %s", &p, src, dst) + logger.Infof("uploading %s from %s to %s", &p, src, dst) rc, err := src.NewReadCloser(p) if err != nil { - return fmt.Errorf("cannot create reader for %s from src %s: %w", &p, src, err) + return fmt.Errorf("cannot create reader for %s from %s: %w", &p, src, err) } sr := &statReader{ r: rc, bytesRead: &bytesUploaded, } if err := dst.UploadPart(p, sr); err != nil { - return fmt.Errorf("cannot upload %s to dst %s: %w", &p, dst, err) + return fmt.Errorf("cannot upload %s to %s: %w", &p, dst, err) } if err = rc.Close(); err != nil { - return fmt.Errorf("cannot close reader for %s from src %s: %w", &p, src, err) + return fmt.Errorf("cannot close reader for %s from %s: %w", &p, src, err) } return nil }, func(elapsed time.Duration) { n := atomic.LoadUint64(&bytesUploaded) prc := 100 * float64(n) / float64(uploadSize) - logger.Infof("uploaded %d out of %d bytes (%.2f%%) from src %s to dst %s in %s", n, uploadSize, prc, src, dst, elapsed) + logger.Infof("uploaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, uploadSize, prc, src, dst, elapsed) }) atomic.AddUint64(&bytesUploadedTotal, bytesUploaded) bytesUploadedTotalMetric.Set(bytesUploadedTotal) @@ -214,7 +180,8 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con } } - logger.Infof("backup from src %s to dst %s with origin %s is complete; backed up %d bytes in %.3f seconds; deleted %d bytes; server-side copied %d bytes; uploaded %d bytes", + logger.Infof("backup from %s to %s with origin %s is complete; backed up %d bytes in %.3f seconds; server-side deleted %d bytes; "+ + "server-side copied %d bytes; 
uploaded %d bytes", src, dst, origin, backupSize, time.Since(startTime).Seconds(), deleteSize, copySize, uploadSize) return nil @@ -230,3 +197,49 @@ func (sr *statReader) Read(p []byte) (int, error) { atomic.AddUint64(sr.bytesRead, uint64(n)) return n, err } + +func deleteDstParts(dst common.RemoteFS, partsToDelete []common.Part, concurrency int) error { + if len(partsToDelete) == 0 { + return nil + } + logger.Infof("deleting %d parts from %s", len(partsToDelete), dst) + deletedParts := uint64(0) + err := runParallel(concurrency, partsToDelete, func(p common.Part) error { + logger.Infof("deleting %s from %s", &p, dst) + if err := dst.DeletePart(p); err != nil { + return fmt.Errorf("cannot delete %s from %s: %w", &p, dst, err) + } + atomic.AddUint64(&deletedParts, 1) + return nil + }, func(elapsed time.Duration) { + n := atomic.LoadUint64(&deletedParts) + logger.Infof("deleted %d out of %d parts from %s in %s", n, len(partsToDelete), dst, elapsed) + }) + if err != nil { + return err + } + if err := dst.RemoveEmptyDirs(); err != nil { + return fmt.Errorf("cannot remove empty directories at %s: %w", dst, err) + } + return nil +} + +func copySrcParts(src common.OriginFS, dst common.RemoteFS, partsToCopy []common.Part, concurrency int) error { + if len(partsToCopy) == 0 { + return nil + } + logger.Infof("server-side copying %d parts from %s to %s", len(partsToCopy), src, dst) + copiedParts := uint64(0) + err := runParallel(concurrency, partsToCopy, func(p common.Part) error { + logger.Infof("server-side copying %s from %s to %s", &p, src, dst) + if err := dst.CopyPart(src, p); err != nil { + return fmt.Errorf("cannot copy %s from %s to %s: %w", &p, src, dst, err) + } + atomic.AddUint64(&copiedParts, 1) + return nil + }, func(elapsed time.Duration) { + n := atomic.LoadUint64(&copiedParts) + logger.Infof("server-side copied %d out of %d parts from %s to %s in %s", n, len(partsToCopy), src, dst, elapsed) + }) + return err +} diff --git a/lib/backup/actions/copy.go 
b/lib/backup/actions/copy.go new file mode 100644 index 000000000..c684d2a61 --- /dev/null +++ b/lib/backup/actions/copy.go @@ -0,0 +1,96 @@ +package actions + +import ( + "fmt" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +// RemoteBackupCopy copies backup from Src to Dst. +type RemoteBackupCopy struct { + // Concurrency is the number of concurrent workers during the backup. + Concurrency int + + // Src is the copy source + Src common.RemoteFS + + // Dst is the copy destination. + // + // If dst contains the previous backup data, then dst is updated incrementally, + // i.e. only the changed data is copied. + // + // If dst points to empty dir, then full copy is made. + Dst common.RemoteFS +} + +// Run runs copy with the provided settings. +func (b *RemoteBackupCopy) Run() error { + concurrency := b.Concurrency + src := b.Src + dst := b.Dst + + if err := dst.DeleteFile(backupnames.BackupCompleteFilename); err != nil { + return fmt.Errorf("cannot delete `backup complete` file at %s: %w", dst, err) + } + if err := runCopy(src, dst, concurrency); err != nil { + return err + } + if err := copyMetadata(src, dst); err != nil { + return fmt.Errorf("cannot store backup metadata: %w", err) + } + if err := dst.CreateFile(backupnames.BackupCompleteFilename, nil); err != nil { + return fmt.Errorf("cannot create `backup complete` file at %s: %w", dst, err) + } + + return nil +} + +func copyMetadata(src common.RemoteFS, dst common.RemoteFS) error { + metadata, err := src.ReadFile(backupnames.BackupMetadataFilename) + if err != nil { + return fmt.Errorf("cannot read metadata from %s: %w", src, err) + } + if err := dst.CreateFile(backupnames.BackupMetadataFilename, metadata); err != nil { + return fmt.Errorf("cannot create metadata at %s: %w", dst, err) + } + return nil +} + +func runCopy(src common.OriginFS, dst 
common.RemoteFS, concurrency int) error { + startTime := time.Now() + + logger.Infof("starting remote backup copy from %s to %s", src, dst) + + srcParts, err := src.ListParts() + if err != nil { + return fmt.Errorf("cannot list src parts: %w", err) + } + logger.Infof("obtained %d parts from src %s", len(srcParts), src) + + dstParts, err := dst.ListParts() + if err != nil { + return fmt.Errorf("cannot list dst parts: %w", err) + } + logger.Infof("obtained %d parts from dst %s", len(dstParts), dst) + + backupSize := getPartsSize(srcParts) + partsToDelete := common.PartsDifference(dstParts, srcParts) + deleteSize := getPartsSize(partsToDelete) + if err := deleteDstParts(dst, partsToDelete, concurrency); err != nil { + return fmt.Errorf("cannot delete unneeded parts at dst: %w", err) + } + + partsToCopy := common.PartsDifference(srcParts, dstParts) + copySize := getPartsSize(partsToCopy) + if err := copySrcParts(src, dst, partsToCopy, concurrency); err != nil { + return fmt.Errorf("cannot server-side copy parts from src to dst: %w", err) + } + + logger.Infof("remote backup copy from %s to %s is complete; backed up %d bytes in %.3f seconds; server-side deleted %d bytes; server-side copied %d bytes", + src, dst, backupSize, time.Since(startTime).Seconds(), deleteSize, copySize) + + return nil +} diff --git a/lib/backup/azremote/azblob.go b/lib/backup/azremote/azblob.go index b96df99fb..e8f069a6d 100644 --- a/lib/backup/azremote/azblob.go +++ b/lib/backup/azremote/azblob.go @@ -328,3 +328,19 @@ func (fs *FS) HasFile(filePath string) (bool, error) { return true, nil } + +// ReadFile returns the content of filePath at fs. 
+func (fs *FS) ReadFile(filePath string) ([]byte, error) { + resp, err := fs.clientForPath(fs.Dir+filePath).DownloadStream(context.Background(), &blob.DownloadStreamOptions{}) + if err != nil { + return nil, fmt.Errorf("cannot download %q at %s (remote dir %q): %w", filePath, fs, fs.Dir, err) + } + defer resp.Body.Close() + + b, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("cannot read %q at %s (remote dir %q): %w", filePath, fs, fs.Dir, err) + } + + return b, nil +} diff --git a/lib/backup/common/fs.go b/lib/backup/common/fs.go index 1282fd583..42c051556 100644 --- a/lib/backup/common/fs.go +++ b/lib/backup/common/fs.go @@ -53,4 +53,7 @@ type RemoteFS interface { // HasFile returns true if filePath exists at RemoteFS. HasFile(filePath string) (bool, error) + + // ReadFile returns file contents at the given filePath. + ReadFile(filePath string) ([]byte, error) } diff --git a/lib/backup/fsremote/fsremote.go b/lib/backup/fsremote/fsremote.go index 45d5ff8f8..cd2d81014 100644 --- a/lib/backup/fsremote/fsremote.go +++ b/lib/backup/fsremote/fsremote.go @@ -243,3 +243,10 @@ func (fs *FS) HasFile(filePath string) (bool, error) { } return true, nil } + +// ReadFile returns the content of filePath at fs. +func (fs *FS) ReadFile(filePath string) ([]byte, error) { + path := filepath.Join(fs.Dir, filePath) + + return os.ReadFile(path) +} diff --git a/lib/backup/gcsremote/gcs.go b/lib/backup/gcsremote/gcs.go index 4cb1eee26..f1cd91359 100644 --- a/lib/backup/gcsremote/gcs.go +++ b/lib/backup/gcsremote/gcs.go @@ -262,3 +262,15 @@ func (fs *FS) HasFile(filePath string) (bool, error) { } return true, nil } + +// ReadFile returns the content of filePath at fs. 
+func (fs *FS) ReadFile(filePath string) ([]byte, error) { + o := fs.bkt.Object(fs.Dir + filePath) + ctx := context.Background() + r, err := o.NewReader(ctx) + if err != nil { + return nil, fmt.Errorf("cannot read %q at %s (remote path %q): %w", filePath, fs, o.ObjectName(), err) + } + defer r.Close() + return io.ReadAll(r) +} diff --git a/lib/backup/s3remote/s3.go b/lib/backup/s3remote/s3.go index 2c6a56596..1bd958e95 100644 --- a/lib/backup/s3remote/s3.go +++ b/lib/backup/s3remote/s3.go @@ -356,6 +356,25 @@ func (fs *FS) HasFile(filePath string) (bool, error) { return true, nil } +// ReadFile returns the content of filePath at fs. +func (fs *FS) ReadFile(filePath string) ([]byte, error) { + p := fs.Dir + filePath + input := &s3.GetObjectInput{ + Bucket: aws.String(fs.Bucket), + Key: aws.String(p), + } + o, err := fs.s3.GetObject(context.Background(), input) + if err != nil { + return nil, fmt.Errorf("cannot open %q at %s (remote path %q): %w", filePath, fs, p, err) + } + defer o.Body.Close() + b, err := io.ReadAll(o.Body) + if err != nil { + return nil, fmt.Errorf("cannot read %q at %s (remote path %q): %w", filePath, fs, p, err) + } + return b, nil +} + func (fs *FS) path(p common.Part) string { return p.RemotePath(fs.Dir) }