From 443189fb0ae795d8ec480b2aeb2e47c22c57f9d1 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 19 Nov 2019 20:31:52 +0200 Subject: [PATCH] app/{vmbackup,vmrestore}: add `-maxBytesPerSecond` command-line flag for limiting the used network bandwidth during backup / restore --- app/vmbackup/README.md | 4 +- app/vmbackup/main.go | 11 ++- app/vmrestore/README.md | 12 ++- app/vmrestore/main.go | 9 +- lib/backup/fslocal/bandwidth_limiter.go | 116 ++++++++++++++++++++++++ lib/backup/fslocal/fslocal.go | 37 +++++++- 6 files changed, 176 insertions(+), 13 deletions(-) create mode 100644 lib/backup/fslocal/bandwidth_limiter.go diff --git a/app/vmbackup/README.md b/app/vmbackup/README.md index c3a3f1adb..d67aeb6db 100644 --- a/app/vmbackup/README.md +++ b/app/vmbackup/README.md @@ -115,7 +115,7 @@ See [this article](https://medium.com/@valyala/speeding-up-backups-for-big-time- ### Troubleshooting * If the backup is slow, then try setting higher value for `-concurrency` flag. This will increase the number of concurrent workers that upload data to backup storage. -* If `vmbackup` eats all the network bandwidth, then set `-concurrency` to 1. This should reduce network bandwidth usage. +* If `vmbackup` eats all the network bandwidth, then set `-maxBytesPerSecond` to the desired value. * If `vmbackup` has been interrupted due to temporary error, then just restart it with the same args. It will resume the backup process. @@ -137,6 +137,8 @@ Run `vmbackup -help` in order to see all the available options: -dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded -loggerLevel string Minimum level of errors to log. Possible values: INFO, ERROR, FATAL, PANIC (default "INFO") + -maxBytesPerSecond int + The maximum upload speed. There is no limit if it is set to 0 -memory.allowedPercent float Allowed percent of system memory VictoriaMetrics caches may occupy (default 60) -origin string diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 45d058978..da623721c 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -18,8 +18,9 @@ var ( dst = flag.String("dst", "", "Where to put the backup on the remote storage. "+ "Example: gcs://bucket/path/to/backup/dir, s3://bucket/path/to/backup/dir or fs:///path/to/local/backup/dir\n"+ "-dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded") - origin = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups") - concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration") + origin = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups") + concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration") + maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum upload speed. There is no limit if it is set to 0") ) func main() { @@ -84,7 +85,11 @@ func newSrcFS() (*fslocal.FS, error) { } fs := &fslocal.FS{ - Dir: snapshotPath, + Dir: snapshotPath, + MaxBytesPerSecond: *maxBytesPerSecond, + } + if err := fs.Init(); err != nil { + return nil, fmt.Errorf("cannot initialize fs: %s", err) } return fs, nil } diff --git a/app/vmrestore/README.md b/app/vmrestore/README.md index 40ce55709..d017806f2 100644 --- a/app/vmrestore/README.md +++ b/app/vmrestore/README.md @@ -24,15 +24,17 @@ vmrestore -src=gcs:/// -storageDataPath= 0 { + quota := blw.bl.GetQuota(len(p)) + n, err := blw.wc.Write(p[:quota]) + nn += n + if err != nil { + return nn, err + } + p = p[quota:] + } + return nn, nil +} + +func (blw *bandwidthLimitedWriter) Close() error { + return blw.wc.Close() +} + +func (bl *bandwidthLimiter) perSecondUpdater() { + tc := time.NewTicker(time.Second) + c := bl.c + for range tc.C { + c.L.Lock() + bl.quota = bl.perSecondLimit + c.Signal() + c.L.Unlock() + } +} + +// GetQuota returns the number in the range [1..n] - the allowed quota for now. +// +// The function blocks until at least 1 can be returned from it. +func (bl *bandwidthLimiter) GetQuota(n int) int { + if n <= 0 { + logger.Panicf("BUG: n must be positive; got %d", n) + } + c := bl.c + c.L.Lock() + for bl.quota <= 0 { + c.Wait() + } + quota := bl.quota + if quota > n { + quota = n + } + bl.quota -= quota + if bl.quota > 0 { + c.Signal() + } + c.L.Unlock() + return quota +} diff --git a/lib/backup/fslocal/fslocal.go b/lib/backup/fslocal/fslocal.go index 1d96dac8e..913c52287 100644 --- a/lib/backup/fslocal/fslocal.go +++ b/lib/backup/fslocal/fslocal.go @@ -20,6 +20,19 @@ import ( type FS struct { // Dir is a path to local directory to work with. Dir string + + // MaxBytesPerSecond is the maximum bandwidth usage during backups or restores. + MaxBytesPerSecond int + + bl *bandwidthLimiter +} + +// Init initializes fs +func (fs *FS) Init() error { + if fs.MaxBytesPerSecond > 0 { + fs.bl = newBandwidthLimiter(fs.MaxBytesPerSecond) + } + return nil } // String returns user-readable representation for the fs. @@ -93,7 +106,11 @@ func (fs *FS) NewReadCloser(p common.Part) (io.ReadCloser, error) { r: r, n: p.Size, } - return lrc, nil + if fs.bl == nil { + return lrc, nil + } + blrc := fs.bl.NewReadCloser(lrc) + return blrc, nil } // NewWriteCloser returns io.WriteCloser for the given part p located in fs. @@ -108,9 +125,14 @@ func (fs *FS) NewWriteCloser(p common.Part) (io.WriteCloser, error) { } wc := &writeCloser{ w: w, + n: p.Size, path: path, } - return wc, nil + if fs.bl == nil { + return wc, nil + } + blwc := fs.bl.NewWriteCloser(wc) + return blwc, nil } // DeletePath deletes the given path from fs and returns the size @@ -188,14 +210,23 @@ func (lrc *limitedReadCloser) Close() error { type writeCloser struct { w *filestream.Writer + n uint64 path string } func (wc *writeCloser) Write(p []byte) (int, error) { - return wc.w.Write(p) + n, err := wc.w.Write(p) + if uint64(n) > wc.n { + return n, fmt.Errorf("too much data written; got %d bytes; want %d bytes", n, wc.n) + } + wc.n -= uint64(n) + return n, err } func (wc *writeCloser) Close() error { wc.w.MustClose() + if wc.n != 0 { + return fmt.Errorf("missing data writes for %d bytes", wc.n) + } return fscommon.FsyncFile(wc.path) }