app/vmbackup: add ability to make server-side copying of existing backups

This commit is contained in:
Aliaksandr Valialkin 2023-08-13 17:17:12 -07:00
parent 3d25a82372
commit a0f695f5de
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
11 changed files with 300 additions and 72 deletions

View file

@ -89,6 +89,23 @@ Do not forget to remove old backups when they are no longer needed in order to s
See also [vmbackupmanager tool](https://docs.victoriametrics.com/vmbackupmanager.html) for automating smart backups.
### Server-side copy of the existing backup
Sometimes it is necessary to make a server-side copy of an existing backup. This can be done by specifying the source backup path via the `-origin` command-line flag
and the destination path for the backup copy via the `-dst` command-line flag. For example, the following command copies the backup
from `gs://bucket/foo` to `gs://bucket/bar`:
```console
./vmbackup -origin=gs://bucket/foo -dst=gs://bucket/bar
```
The `-origin` and `-dst` flags must point to the same object storage bucket or to the same filesystem.
A server-side backup copy is usually much faster than a regular backup, since the backup data isn't transferred
between the remote storage and the locally running `vmbackup` tool.
If `-dst` already contains some data, then its contents are synced with the `-origin` data. This allows making incremental server-side copies of backups.
## How does it work?
The backup algorithm is the following:

View file

@ -92,8 +92,6 @@ func main() {
logger.Fatalf("cannot delete snapshot: %s", err)
}
}
} else if len(*snapshotName) == 0 {
logger.Fatalf("`-snapshotName` or `-snapshot.createURL` must be provided")
}
go httpserver.Serve(*httpListenAddr, false, nil)
@ -113,34 +111,48 @@ func main() {
}
func makeBackup() error {
if err := snapshot.Validate(*snapshotName); err != nil {
return fmt.Errorf("invalid -snapshotName=%q: %s", *snapshotName, err)
}
srcFS, err := newSrcFS()
if err != nil {
return err
}
dstFS, err := newDstFS()
if err != nil {
return err
}
originFS, err := newOriginFS()
if err != nil {
return err
if *snapshotName == "" {
// Make server-side copy from -origin to -dst
originFS, err := newRemoteOriginFS()
if err != nil {
return err
}
a := &actions.RemoteBackupCopy{
Concurrency: *concurrency,
Src: originFS,
Dst: dstFS,
}
if err := a.Run(); err != nil {
return err
}
originFS.MustStop()
} else {
// Make backup from srcFS to -dst
srcFS, err := newSrcFS()
if err != nil {
return err
}
originFS, err := newOriginFS()
if err != nil {
return err
}
a := &actions.Backup{
Concurrency: *concurrency,
Src: srcFS,
Dst: dstFS,
Origin: originFS,
}
if err := a.Run(); err != nil {
return err
}
srcFS.MustStop()
originFS.MustStop()
}
a := &actions.Backup{
Concurrency: *concurrency,
Src: srcFS,
Dst: dstFS,
Origin: originFS,
}
if err := a.Run(); err != nil {
return err
}
srcFS.MustStop()
dstFS.MustStop()
originFS.MustStop()
return nil
}
@ -155,6 +167,9 @@ See the docs at https://docs.victoriametrics.com/vmbackup.html .
}
func newSrcFS() (*fslocal.FS, error) {
if err := snapshot.Validate(*snapshotName); err != nil {
return nil, fmt.Errorf("invalid -snapshotName=%q: %s", *snapshotName, err)
}
snapshotPath := filepath.Join(*storageDataPath, "snapshots", *snapshotName)
// Verify the snapshot exists.
@ -218,3 +233,14 @@ func newOriginFS() (common.OriginFS, error) {
}
return fs, nil
}
// newRemoteOriginFS returns the remote filesystem referenced by the -origin flag.
//
// It is used as the source of a server-side backup copy, so an empty -origin
// value is reported as an error instead of being silently accepted.
func newRemoteOriginFS() (common.RemoteFS, error) {
	originPath := *origin
	if originPath == "" {
		return nil, fmt.Errorf("-origin cannot be empty when -snapshotName and -snapshot.createURL aren't set")
	}
	remoteFS, err := actions.NewRemoteFS(originPath)
	if err != nil {
		return nil, fmt.Errorf("cannot parse `-origin`=%q: %w", originPath, err)
	}
	return remoteFS, nil
}

View file

@ -24,6 +24,8 @@ The following `tip` changes can be tested by building VictoriaMetrics components
## tip
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): add support for server-side copy of existing backups. See [these docs](https://docs.victoriametrics.com/vmbackup.html#server-side-copy-of-the-existing-backup) for details.
## [v1.93.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.0)
Released at 2023-08-12

View file

@ -100,6 +100,23 @@ Do not forget to remove old backups when they are no longer needed in order to s
See also [vmbackupmanager tool](https://docs.victoriametrics.com/vmbackupmanager.html) for automating smart backups.
### Server-side copy of the existing backup
Sometimes it is necessary to make a server-side copy of an existing backup. This can be done by specifying the source backup path via the `-origin` command-line flag
and the destination path for the backup copy via the `-dst` command-line flag. For example, the following command copies the backup
from `gs://bucket/foo` to `gs://bucket/bar`:
```console
./vmbackup -origin=gs://bucket/foo -dst=gs://bucket/bar
```
The `-origin` and `-dst` flags must point to the same object storage bucket or to the same filesystem.
A server-side backup copy is usually much faster than a regular backup, since the backup data isn't transferred
between the remote storage and the locally running `vmbackup` tool.
If `-dst` already contains some data, then its contents are synced with the `-origin` data. This allows making incremental server-side copies of backups.
## How does it work?
The backup algorithm is the following:

View file

@ -78,7 +78,7 @@ func (b *Backup) Run() error {
if err := storeMetadata(src, dst); err != nil {
return fmt.Errorf("cannot store backup metadata: %w", err)
}
if err := dst.CreateFile(backupnames.BackupCompleteFilename, []byte{}); err != nil {
if err := dst.CreateFile(backupnames.BackupCompleteFilename, nil); err != nil {
return fmt.Errorf("cannot create `backup complete` file at %s: %w", dst, err)
}
@ -133,79 +133,45 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con
logger.Infof("obtained %d parts from origin %s", len(originParts), origin)
backupSize := getPartsSize(srcParts)
partsToDelete := common.PartsDifference(dstParts, srcParts)
deleteSize := getPartsSize(partsToDelete)
if len(partsToDelete) > 0 {
logger.Infof("deleting %d parts from dst %s", len(partsToDelete), dst)
deletedParts := uint64(0)
err = runParallel(concurrency, partsToDelete, func(p common.Part) error {
logger.Infof("deleting %s from dst %s", &p, dst)
if err := dst.DeletePart(p); err != nil {
return fmt.Errorf("cannot delete %s from dst %s: %w", &p, dst, err)
}
atomic.AddUint64(&deletedParts, 1)
return nil
}, func(elapsed time.Duration) {
n := atomic.LoadUint64(&deletedParts)
logger.Infof("deleted %d out of %d parts from dst %s in %s", n, len(partsToDelete), dst, elapsed)
})
if err != nil {
return err
}
if err := dst.RemoveEmptyDirs(); err != nil {
return fmt.Errorf("cannot remove empty directories at dst %s: %w", dst, err)
}
if err := deleteDstParts(dst, partsToDelete, concurrency); err != nil {
return fmt.Errorf("cannot delete unneeded parts at dst: %w", err)
}
partsToCopy := common.PartsDifference(srcParts, dstParts)
originCopyParts := common.PartsIntersect(originParts, partsToCopy)
copySize := getPartsSize(originCopyParts)
if len(originCopyParts) > 0 {
logger.Infof("server-side copying %d parts from origin %s to dst %s", len(originCopyParts), origin, dst)
copiedParts := uint64(0)
err = runParallel(concurrency, originCopyParts, func(p common.Part) error {
logger.Infof("server-side copying %s from origin %s to dst %s", &p, origin, dst)
if err := dst.CopyPart(origin, p); err != nil {
return fmt.Errorf("cannot copy %s from origin %s to dst %s: %w", &p, origin, dst, err)
}
atomic.AddUint64(&copiedParts, 1)
return nil
}, func(elapsed time.Duration) {
n := atomic.LoadUint64(&copiedParts)
logger.Infof("server-side copied %d out of %d parts from origin %s to dst %s in %s", n, len(originCopyParts), origin, dst, elapsed)
})
if err != nil {
return err
}
originPartsToCopy := common.PartsIntersect(originParts, partsToCopy)
copySize := getPartsSize(originPartsToCopy)
if err := copySrcParts(src, dst, originPartsToCopy, concurrency); err != nil {
return fmt.Errorf("cannot server-side copy origin parts to dst: %w", err)
}
srcCopyParts := common.PartsDifference(partsToCopy, originParts)
uploadSize := getPartsSize(srcCopyParts)
if len(srcCopyParts) > 0 {
logger.Infof("uploading %d parts from src %s to dst %s", len(srcCopyParts), src, dst)
logger.Infof("uploading %d parts from %s to %s", len(srcCopyParts), src, dst)
bytesUploaded := uint64(0)
err = runParallel(concurrency, srcCopyParts, func(p common.Part) error {
logger.Infof("uploading %s from src %s to dst %s", &p, src, dst)
logger.Infof("uploading %s from %s to %s", &p, src, dst)
rc, err := src.NewReadCloser(p)
if err != nil {
return fmt.Errorf("cannot create reader for %s from src %s: %w", &p, src, err)
return fmt.Errorf("cannot create reader for %s from %s: %w", &p, src, err)
}
sr := &statReader{
r: rc,
bytesRead: &bytesUploaded,
}
if err := dst.UploadPart(p, sr); err != nil {
return fmt.Errorf("cannot upload %s to dst %s: %w", &p, dst, err)
return fmt.Errorf("cannot upload %s to %s: %w", &p, dst, err)
}
if err = rc.Close(); err != nil {
return fmt.Errorf("cannot close reader for %s from src %s: %w", &p, src, err)
return fmt.Errorf("cannot close reader for %s from %s: %w", &p, src, err)
}
return nil
}, func(elapsed time.Duration) {
n := atomic.LoadUint64(&bytesUploaded)
prc := 100 * float64(n) / float64(uploadSize)
logger.Infof("uploaded %d out of %d bytes (%.2f%%) from src %s to dst %s in %s", n, uploadSize, prc, src, dst, elapsed)
logger.Infof("uploaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, uploadSize, prc, src, dst, elapsed)
})
atomic.AddUint64(&bytesUploadedTotal, bytesUploaded)
bytesUploadedTotalMetric.Set(bytesUploadedTotal)
@ -214,7 +180,8 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con
}
}
logger.Infof("backup from src %s to dst %s with origin %s is complete; backed up %d bytes in %.3f seconds; deleted %d bytes; server-side copied %d bytes; uploaded %d bytes",
logger.Infof("backup from %s to %s with origin %s is complete; backed up %d bytes in %.3f seconds; server-side deleted %d bytes; "+
"server-side copied %d bytes; uploaded %d bytes",
src, dst, origin, backupSize, time.Since(startTime).Seconds(), deleteSize, copySize, uploadSize)
return nil
@ -230,3 +197,49 @@ func (sr *statReader) Read(p []byte) (int, error) {
atomic.AddUint64(sr.bytesRead, uint64(n))
return n, err
}
// deleteDstParts deletes partsToDelete from dst and then asks dst to remove
// directories which became empty.
//
// The per-part work is dispatched via runParallel with the given concurrency.
// The function is a no-op when partsToDelete is empty.
func deleteDstParts(dst common.RemoteFS, partsToDelete []common.Part, concurrency int) error {
	total := len(partsToDelete)
	if total == 0 {
		return nil
	}
	logger.Infof("deleting %d parts from %s", total, dst)

	// deleted is updated from the per-part callback and read from the status callback.
	var deleted uint64
	deletePart := func(p common.Part) error {
		logger.Infof("deleting %s from %s", &p, dst)
		err := dst.DeletePart(p)
		if err != nil {
			return fmt.Errorf("cannot delete %s from %s: %w", &p, dst, err)
		}
		atomic.AddUint64(&deleted, 1)
		return nil
	}
	logStatus := func(elapsed time.Duration) {
		done := atomic.LoadUint64(&deleted)
		logger.Infof("deleted %d out of %d parts from %s in %s", done, total, dst, elapsed)
	}
	if err := runParallel(concurrency, partsToDelete, deletePart, logStatus); err != nil {
		return err
	}

	err := dst.RemoveEmptyDirs()
	if err != nil {
		return fmt.Errorf("cannot remove empty directories at %s: %w", dst, err)
	}
	return nil
}
// copySrcParts performs server-side copy of partsToCopy from src to dst
// by calling dst.CopyPart for every part.
//
// The per-part work is dispatched via runParallel with the given concurrency.
// The function is a no-op when partsToCopy is empty.
func copySrcParts(src common.OriginFS, dst common.RemoteFS, partsToCopy []common.Part, concurrency int) error {
	total := len(partsToCopy)
	if total == 0 {
		return nil
	}
	logger.Infof("server-side copying %d parts from %s to %s", total, src, dst)

	// copied is updated from the per-part callback and read from the status callback.
	var copied uint64
	copyPart := func(p common.Part) error {
		logger.Infof("server-side copying %s from %s to %s", &p, src, dst)
		err := dst.CopyPart(src, p)
		if err != nil {
			return fmt.Errorf("cannot copy %s from %s to %s: %w", &p, src, dst, err)
		}
		atomic.AddUint64(&copied, 1)
		return nil
	}
	logStatus := func(elapsed time.Duration) {
		done := atomic.LoadUint64(&copied)
		logger.Infof("server-side copied %d out of %d parts from %s to %s in %s", done, total, src, dst, elapsed)
	}
	return runParallel(concurrency, partsToCopy, copyPart, logStatus)
}

View file

@ -0,0 +1,96 @@
package actions
import (
"fmt"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/backupnames"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// RemoteBackupCopy describes a server-side copy of an existing backup from Src to Dst.
//
// Call Run in order to perform the copy.
type RemoteBackupCopy struct {
// Concurrency is the number of concurrent workers to use during the copy.
Concurrency int
// Src is the copy source, i.e. the location of the existing backup.
Src common.RemoteFS
// Dst is the copy destination.
//
// If Dst contains the previous backup data, then Dst is updated incrementally,
// i.e. only the changed data is copied and the data missing in Src is deleted.
//
// If Dst points to an empty dir, then a full copy is made.
Dst common.RemoteFS
}
// Run performs the server-side copy from b.Src to b.Dst.
//
// The `backup complete` marker is removed from Dst before any data is touched
// and is re-created only after both the parts and the metadata are copied,
// so the marker is absent at Dst if the copy fails in the middle.
func (b *RemoteBackupCopy) Run() error {
	from, to, workers := b.Src, b.Dst, b.Concurrency

	err := to.DeleteFile(backupnames.BackupCompleteFilename)
	if err != nil {
		return fmt.Errorf("cannot delete `backup complete` file at %s: %w", to, err)
	}

	err = runCopy(from, to, workers)
	if err != nil {
		return err
	}

	err = copyMetadata(from, to)
	if err != nil {
		return fmt.Errorf("cannot store backup metadata: %w", err)
	}

	err = to.CreateFile(backupnames.BackupCompleteFilename, nil)
	if err != nil {
		return fmt.Errorf("cannot create `backup complete` file at %s: %w", to, err)
	}
	return nil
}
// copyMetadata reads the backup metadata file from src and writes its contents
// to the file with the same name at dst.
func copyMetadata(src common.RemoteFS, dst common.RemoteFS) error {
	name := backupnames.BackupMetadataFilename

	data, err := src.ReadFile(name)
	if err != nil {
		return fmt.Errorf("cannot read metadata from %s: %w", src, err)
	}

	err = dst.CreateFile(name, data)
	if err != nil {
		return fmt.Errorf("cannot create metadata at %s: %w", dst, err)
	}
	return nil
}
// runCopy makes the set of parts at dst identical to the set of parts at src.
//
// Parts which exist at dst but not at src are deleted first,
// then parts which exist at src but not at dst are server-side copied.
func runCopy(src common.OriginFS, dst common.RemoteFS, concurrency int) error {
	begin := time.Now()
	logger.Infof("starting remote backup copy from %s to %s", src, dst)

	srcParts, err := src.ListParts()
	if err != nil {
		return fmt.Errorf("cannot list src parts: %w", err)
	}
	logger.Infof("obtained %d parts from src %s", len(srcParts), src)

	dstParts, err := dst.ListParts()
	if err != nil {
		return fmt.Errorf("cannot list dst parts: %w", err)
	}
	logger.Infof("obtained %d parts from dst %s", len(dstParts), dst)

	totalSize := getPartsSize(srcParts)

	// Drop the parts which are present at dst only.
	staleParts := common.PartsDifference(dstParts, srcParts)
	staleSize := getPartsSize(staleParts)
	err = deleteDstParts(dst, staleParts, concurrency)
	if err != nil {
		return fmt.Errorf("cannot delete unneeded parts at dst: %w", err)
	}

	// Copy the parts which are present at src only.
	missingParts := common.PartsDifference(srcParts, dstParts)
	missingSize := getPartsSize(missingParts)
	err = copySrcParts(src, dst, missingParts, concurrency)
	if err != nil {
		return fmt.Errorf("cannot server-side copy parts from src to dst: %w", err)
	}

	logger.Infof("remote backup copy from %s to %s is complete; backed up %d bytes in %.3f seconds; server-side deleted %d bytes; server-side copied %d bytes",
		src, dst, totalSize, time.Since(begin).Seconds(), staleSize, missingSize)
	return nil
}

View file

@ -328,3 +328,19 @@ func (fs *FS) HasFile(filePath string) (bool, error) {
return true, nil
}
// ReadFile downloads the blob stored under fs.Dir+filePath and returns its full contents.
func (fs *FS) ReadFile(filePath string) ([]byte, error) {
	bc := fs.clientForPath(fs.Dir + filePath)
	stream, err := bc.DownloadStream(context.Background(), &blob.DownloadStreamOptions{})
	if err != nil {
		return nil, fmt.Errorf("cannot download %q at %s (remote dir %q): %w", filePath, fs, fs.Dir, err)
	}
	defer stream.Body.Close()

	data, err := io.ReadAll(stream.Body)
	if err != nil {
		return nil, fmt.Errorf("cannot read %q at %s (remote dir %q): %w", filePath, fs, fs.Dir, err)
	}
	return data, nil
}

View file

@ -53,4 +53,7 @@ type RemoteFS interface {
// HasFile returns true if filePath exists at RemoteFS.
HasFile(filePath string) (bool, error)
// ReadFile returns file contents at the given filePath.
ReadFile(filePath string) ([]byte, error)
}

View file

@ -243,3 +243,10 @@ func (fs *FS) HasFile(filePath string) (bool, error) {
}
return true, nil
}
// ReadFile returns the contents of the file located at filePath relative to fs.Dir.
//
// The error from os.ReadFile is returned as is.
func (fs *FS) ReadFile(filePath string) ([]byte, error) {
	return os.ReadFile(filepath.Join(fs.Dir, filePath))
}

View file

@ -262,3 +262,15 @@ func (fs *FS) HasFile(filePath string) (bool, error) {
}
return true, nil
}
// ReadFile returns the content of filePath at fs.
//
// The object is looked up under fs.Dir+filePath in fs.bkt and is read in full.
// On any failure nil data is returned together with an error
// which identifies the file and the remote object name.
func (fs *FS) ReadFile(filePath string) ([]byte, error) {
	o := fs.bkt.Object(fs.Dir + filePath)
	ctx := context.Background()
	r, err := o.NewReader(ctx)
	if err != nil {
		return nil, fmt.Errorf("cannot read %q at %s (remote path %q): %w", filePath, fs, o.ObjectName(), err)
	}
	defer r.Close()

	b, err := io.ReadAll(r)
	if err != nil {
		// Do not return partially read data and do not lose the file context,
		// so a failed download is reported the same way as in other remote FS implementations.
		return nil, fmt.Errorf("cannot read contents of %q at %s (remote path %q): %w", filePath, fs, o.ObjectName(), err)
	}
	return b, nil
}

View file

@ -356,6 +356,25 @@ func (fs *FS) HasFile(filePath string) (bool, error) {
return true, nil
}
// ReadFile fetches the object stored under fs.Dir+filePath in fs.Bucket
// and returns its full contents.
func (fs *FS) ReadFile(filePath string) ([]byte, error) {
	key := fs.Dir + filePath
	resp, err := fs.s3.GetObject(context.Background(), &s3.GetObjectInput{
		Bucket: aws.String(fs.Bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, fmt.Errorf("cannot open %q at %s (remote path %q): %w", filePath, fs, key, err)
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("cannot read %q at %s (remote path %q): %w", filePath, fs, key, err)
	}
	return data, nil
}
// path returns the remote path of p, resolved against fs.Dir.
func (fs *FS) path(p common.Part) string {
	remotePath := p.RemotePath(fs.Dir)
	return remotePath
}