app/{vmbackup,vmrestore}: add -maxBytesPerSecond command-line flag for limiting the used network bandwidth during backup / restore

This commit is contained in:
Aliaksandr Valialkin 2019-11-19 20:31:52 +02:00
parent 9d1ee1e2ae
commit 216a260ced
6 changed files with 176 additions and 13 deletions

View file

@ -112,7 +112,7 @@ These properties allow performing fast and cheap incremental backups and server-
### Troubleshooting
* If the backup is slow, then try setting higher value for `-concurrency` flag. This will increase the number of concurrent workers that upload data to backup storage.
* If `vmbackup` eats all the network bandwidth, then set `-concurrency` to 1. This should reduce network bandwidth usage.
* If `vmbackup` eats all the network bandwidth, then set `-maxBytesPerSecond` to the desired value.
* If `vmbackup` has been interrupted due to temporary error, then just restart it with the same args. It will resume the backup process.
@ -134,6 +134,8 @@ Run `vmbackup -help` in order to see all the available options:
-dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded
-loggerLevel string
Minimum level of errors to log. Possible values: INFO, ERROR, FATAL, PANIC (default "INFO")
-maxBytesPerSecond int
The maximum upload speed. There is no limit if it is set to 0
-memory.allowedPercent float
Allowed percent of system memory VictoriaMetrics caches may occupy (default 60)
-origin string

View file

@ -18,8 +18,9 @@ var (
dst = flag.String("dst", "", "Where to put the backup on the remote storage. "+
"Example: gcs://bucket/path/to/backup/dir, s3://bucket/path/to/backup/dir or fs:///path/to/local/backup/dir\n"+
"-dst can point to the previous backup. In this case incremental backup is performed, i.e. only changed data is uploaded")
origin = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups")
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration")
origin = flag.String("origin", "", "Optional origin directory on the remote storage with old backup for server-side copying when performing full backup. This speeds up full backups")
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce backup duration")
maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum upload speed. There is no limit if it is set to 0")
)
func main() {
@ -84,7 +85,11 @@ func newSrcFS() (*fslocal.FS, error) {
}
fs := &fslocal.FS{
Dir: snapshotPath,
Dir: snapshotPath,
MaxBytesPerSecond: *maxBytesPerSecond,
}
if err := fs.Init(); err != nil {
return nil, fmt.Errorf("cannot initialize fs: %s", err)
}
return fs, nil
}

View file

@ -24,15 +24,17 @@ vmrestore -src=gcs://<bucket>/<path/to/backup> -storageDataPath=<local/path/to/r
The original `-storageDataPath` directory may contain old files. They will be susbstituted by the files from backup.
### Troubleshooting
* If `vmrestore` eats all the network bandwidth, then set `-maxBytesPerSecond` to the desired value.
* If `vmrestore` has been interrupted due to temporary error, then just restart it with the same args. It will resume the restore process.
### Advanced usage
Run `vmrestore -help` in order to see all the available options:
```
vmrestore restores VictoriaMetrics data from backups made by vmbackup.
See the docs at https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmrestore/README.md .
-concurrency int
The number of concurrent workers. Higher concurrency may reduce restore duration (default 10)
-configFilePath string
@ -43,6 +45,8 @@ See the docs at https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/a
See https://cloud.google.com/iam/docs/creating-managing-service-account-keys and https://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html
-loggerLevel string
Minimum level of errors to log. Possible values: INFO, ERROR, FATAL, PANIC (default "INFO")
-maxBytesPerSecond int
The maximum download speed. There is no limit if it is set to 0
-memory.allowedPercent float
Allowed percent of system memory VictoriaMetrics caches may occupy (default 60)
-src string

View file

@ -16,7 +16,8 @@ var (
"Example: gcs://bucket/path/to/backup/dir, s3://bucket/path/to/backup/dir or fs:///path/to/local/backup/dir")
storageDataPath = flag.String("storageDataPath", "victoria-metrics-data", "Destination path where backup must be restored. "+
"VictoriaMetrics must be stopped when restoring from backup. -storageDataPath dir can be non-empty. In this case only missing data is downloaded from backup")
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
concurrency = flag.Int("concurrency", 10, "The number of concurrent workers. Higher concurrency may reduce restore duration")
maxBytesPerSecond = flag.Int("maxBytesPerSecond", 0, "The maximum download speed. There is no limit if it is set to 0")
)
func main() {
@ -59,7 +60,11 @@ func newDstFS() (*fslocal.FS, error) {
return nil, fmt.Errorf("`-storageDataPath` cannot be empty")
}
fs := &fslocal.FS{
Dir: *storageDataPath,
Dir: *storageDataPath,
MaxBytesPerSecond: *maxBytesPerSecond,
}
if err := fs.Init(); err != nil {
return nil, fmt.Errorf("cannot initialize local fs: %s", err)
}
return fs, nil
}

View file

@ -0,0 +1,116 @@
package fslocal
import (
"io"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type bandwidthLimiter struct {
perSecondLimit int
c *sync.Cond
// quota for the current second
quota int
}
func newBandwidthLimiter(perSecondLimit int) *bandwidthLimiter {
if perSecondLimit <= 0 {
logger.Panicf("BUG: perSecondLimit must be positive; got %d", perSecondLimit)
}
var bl bandwidthLimiter
bl.perSecondLimit = perSecondLimit
var mu sync.Mutex
bl.c = sync.NewCond(&mu)
go bl.perSecondUpdater()
return &bl
}
func (bl *bandwidthLimiter) NewReadCloser(rc io.ReadCloser) *bandwidthLimitedReader {
return &bandwidthLimitedReader{
rc: rc,
bl: bl,
}
}
func (bl *bandwidthLimiter) NewWriteCloser(wc io.WriteCloser) *bandwidthLimitedWriter {
return &bandwidthLimitedWriter{
wc: wc,
bl: bl,
}
}
type bandwidthLimitedReader struct {
rc io.ReadCloser
bl *bandwidthLimiter
}
func (blr *bandwidthLimitedReader) Read(p []byte) (int, error) {
quota := blr.bl.GetQuota(len(p))
return blr.rc.Read(p[:quota])
}
func (blr *bandwidthLimitedReader) Close() error {
return blr.rc.Close()
}
type bandwidthLimitedWriter struct {
wc io.WriteCloser
bl *bandwidthLimiter
}
func (blw *bandwidthLimitedWriter) Write(p []byte) (int, error) {
nn := 0
for len(p) > 0 {
quota := blw.bl.GetQuota(len(p))
n, err := blw.wc.Write(p[:quota])
nn += n
if err != nil {
return nn, err
}
p = p[quota:]
}
return nn, nil
}
func (blw *bandwidthLimitedWriter) Close() error {
return blw.wc.Close()
}
func (bl *bandwidthLimiter) perSecondUpdater() {
tc := time.NewTicker(time.Second)
c := bl.c
for range tc.C {
c.L.Lock()
bl.quota = bl.perSecondLimit
c.Signal()
c.L.Unlock()
}
}
// GetQuota returns the number in the range [1..n] - the allowed quota for now.
//
// The function blocks until at least 1 can be returned from it.
func (bl *bandwidthLimiter) GetQuota(n int) int {
if n <= 0 {
logger.Panicf("BUG: n must be positive; got %d", n)
}
c := bl.c
c.L.Lock()
for bl.quota <= 0 {
c.Wait()
}
quota := bl.quota
if quota > n {
quota = n
}
bl.quota -= quota
if bl.quota > 0 {
c.Signal()
}
c.L.Unlock()
return quota
}

View file

@ -20,6 +20,19 @@ import (
type FS struct {
// Dir is a path to local directory to work with.
Dir string
// MaxBytesPerSecond is the maximum bandwidth usage during backups or restores.
MaxBytesPerSecond int
bl *bandwidthLimiter
}
// Init initializes fs
func (fs *FS) Init() error {
if fs.MaxBytesPerSecond > 0 {
fs.bl = newBandwidthLimiter(fs.MaxBytesPerSecond)
}
return nil
}
// String returns user-readable representation for the fs.
@ -93,7 +106,11 @@ func (fs *FS) NewReadCloser(p common.Part) (io.ReadCloser, error) {
r: r,
n: p.Size,
}
return lrc, nil
if fs.bl == nil {
return lrc, nil
}
blrc := fs.bl.NewReadCloser(lrc)
return blrc, nil
}
// NewWriteCloser returns io.WriteCloser for the given part p located in fs.
@ -108,9 +125,14 @@ func (fs *FS) NewWriteCloser(p common.Part) (io.WriteCloser, error) {
}
wc := &writeCloser{
w: w,
n: p.Size,
path: path,
}
return wc, nil
if fs.bl == nil {
return wc, nil
}
blwc := fs.bl.NewWriteCloser(wc)
return blwc, nil
}
// DeletePath deletes the given path from fs and returns the size
@ -188,14 +210,23 @@ func (lrc *limitedReadCloser) Close() error {
type writeCloser struct {
w *filestream.Writer
n uint64
path string
}
func (wc *writeCloser) Write(p []byte) (int, error) {
return wc.w.Write(p)
n, err := wc.w.Write(p)
if uint64(n) > wc.n {
return n, fmt.Errorf("too much data written; got %d bytes; want %d bytes", n, wc.n)
}
wc.n -= uint64(n)
return n, err
}
func (wc *writeCloser) Close() error {
wc.w.MustClose()
if wc.n != 0 {
return fmt.Errorf("missing data writes for %d bytes", wc.n)
}
return fscommon.FsyncFile(wc.path)
}