From cf5f2874cd0c0e13dab44210c1f6a3230a29ddc5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 9 Oct 2020 15:11:28 +0300 Subject: [PATCH] lib/backup/fslocal: add FS.MustStop() method for stopping bandwidth limiter --- app/vmbackup/main.go | 1 + app/vmrestore/main.go | 1 + lib/backup/fslocal/bandwidth_limiter.go | 22 ++++++++++++++++++++-- lib/backup/fslocal/fslocal.go | 13 ++++++++++++- 4 files changed, 34 insertions(+), 3 deletions(-) diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index 75d8672365..357e4c5ac2 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -90,6 +90,7 @@ func main() { if err := a.Run(); err != nil { logger.Fatalf("cannot create backup: %s", err) } + srcFS.MustStop() } func usage() { diff --git a/app/vmrestore/main.go b/app/vmrestore/main.go index b4d156b6ac..1c0d225f39 100644 --- a/app/vmrestore/main.go +++ b/app/vmrestore/main.go @@ -52,6 +52,7 @@ func main() { if err := a.Run(); err != nil { logger.Fatalf("cannot restore from backup: %s", err) } + dstFS.MustStop() } func usage() { diff --git a/lib/backup/fslocal/bandwidth_limiter.go b/lib/backup/fslocal/bandwidth_limiter.go index db5a2ff3b2..75c866435e 100644 --- a/lib/backup/fslocal/bandwidth_limiter.go +++ b/lib/backup/fslocal/bandwidth_limiter.go @@ -15,6 +15,9 @@ type bandwidthLimiter struct { // quota for the current second quota int + + stopCh chan struct{} + wg sync.WaitGroup } func newBandwidthLimiter(perSecondLimit int) *bandwidthLimiter { @@ -25,10 +28,20 @@ func newBandwidthLimiter(perSecondLimit int) *bandwidthLimiter { bl.perSecondLimit = perSecondLimit var mu sync.Mutex bl.c = sync.NewCond(&mu) - go bl.perSecondUpdater() + bl.stopCh = make(chan struct{}) + bl.wg.Add(1) + go func() { + defer bl.wg.Done() + bl.perSecondUpdater() + }() return &bl } +func (bl *bandwidthLimiter) MustStop() { + close(bl.stopCh) + bl.wg.Wait() +} + func (bl *bandwidthLimiter) NewReadCloser(rc io.ReadCloser) *bandwidthLimitedReader { return &bandwidthLimitedReader{ rc: rc, @@ -83,7 +96,12 @@ func (blw *bandwidthLimitedWriter) Close() error { func (bl *bandwidthLimiter) perSecondUpdater() { tc := time.NewTicker(time.Second) c := bl.c - for range tc.C { + for { + select { + case <-tc.C: + case <-bl.stopCh: + return + } c.L.Lock() bl.quota = bl.perSecondLimit c.Signal() diff --git a/lib/backup/fslocal/fslocal.go b/lib/backup/fslocal/fslocal.go index ced08c2cfb..051182a624 100644 --- a/lib/backup/fslocal/fslocal.go +++ b/lib/backup/fslocal/fslocal.go @@ -27,7 +27,9 @@ type FS struct { bl *bandwidthLimiter } -// Init initializes fs +// Init initializes fs. +// +// The returned fs must be stopped when no long needed with MustStop call. func (fs *FS) Init() error { if fs.MaxBytesPerSecond > 0 { fs.bl = newBandwidthLimiter(fs.MaxBytesPerSecond) @@ -35,6 +37,15 @@ func (fs *FS) Init() error { return nil } +// MustStop stops fs. +func (fs *FS) MustStop() { + if fs.bl == nil { + return + } + fs.bl.MustStop() + fs.bl = nil +} + // String returns user-readable representation for the fs. func (fs *FS) String() string { return fmt.Sprintf("fslocal %q", fs.Dir)