lib/backup/actions: consistently use atomic.* types instead of atomic.* functions

See ea9e2b19a5
This commit is contained in:
Aliaksandr Valialkin 2024-02-24 01:02:18 +02:00
parent 55f1f24e62
commit ece86cd314
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -17,10 +17,7 @@ import (
"github.com/VictoriaMetrics/metrics"
)
var (
bytesUploadedTotal = uint64(0)
bytesUploadedTotalMetric = metrics.NewCounter(`vm_backups_uploaded_bytes_total`)
)
var bytesUploadedTotal = metrics.NewCounter(`vm_backups_uploaded_bytes_total`)
// Backup performs backup according to the provided settings.
//
@ -150,7 +147,7 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con
uploadSize := getPartsSize(srcCopyParts)
if len(srcCopyParts) > 0 {
logger.Infof("uploading %d parts from %s to %s", len(srcCopyParts), src, dst)
bytesUploaded := uint64(0)
var bytesUploaded atomic.Uint64
err = runParallel(concurrency, srcCopyParts, func(p common.Part) error {
logger.Infof("uploading %s from %s to %s", &p, src, dst)
rc, err := src.NewReadCloser(p)
@ -169,12 +166,11 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con
}
return nil
}, func(elapsed time.Duration) {
n := atomic.LoadUint64(&bytesUploaded)
n := bytesUploaded.Load()
prc := 100 * float64(n) / float64(uploadSize)
logger.Infof("uploaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, uploadSize, prc, src, dst, elapsed)
})
atomic.AddUint64(&bytesUploadedTotal, bytesUploaded)
bytesUploadedTotalMetric.Set(bytesUploadedTotal)
bytesUploadedTotal.Add(int(bytesUploaded.Load()))
if err != nil {
return err
}
@ -189,12 +185,12 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con
type statReader struct {
r io.Reader
bytesRead *uint64
bytesRead *atomic.Uint64
}
func (sr *statReader) Read(p []byte) (int, error) {
n, err := sr.r.Read(p)
atomic.AddUint64(sr.bytesRead, uint64(n))
sr.bytesRead.Add(uint64(n))
return n, err
}
@ -203,16 +199,16 @@ func deleteDstParts(dst common.RemoteFS, partsToDelete []common.Part, concurrenc
return nil
}
logger.Infof("deleting %d parts from %s", len(partsToDelete), dst)
deletedParts := uint64(0)
var deletedParts atomic.Uint64
err := runParallel(concurrency, partsToDelete, func(p common.Part) error {
logger.Infof("deleting %s from %s", &p, dst)
if err := dst.DeletePart(p); err != nil {
return fmt.Errorf("cannot delete %s from %s: %w", &p, dst, err)
}
atomic.AddUint64(&deletedParts, 1)
deletedParts.Add(1)
return nil
}, func(elapsed time.Duration) {
n := atomic.LoadUint64(&deletedParts)
n := deletedParts.Load()
logger.Infof("deleted %d out of %d parts from %s in %s", n, len(partsToDelete), dst, elapsed)
})
if err != nil {
@ -229,16 +225,16 @@ func copySrcParts(src common.OriginFS, dst common.RemoteFS, partsToCopy []common
return nil
}
logger.Infof("server-side copying %d parts from %s to %s", len(partsToCopy), src, dst)
copiedParts := uint64(0)
var copiedParts atomic.Uint64
err := runParallel(concurrency, partsToCopy, func(p common.Part) error {
logger.Infof("server-side copying %s from %s to %s", &p, src, dst)
if err := dst.CopyPart(src, p); err != nil {
return fmt.Errorf("cannot copy %s from %s to %s: %w", &p, src, dst, err)
}
atomic.AddUint64(&copiedParts, 1)
copiedParts.Add(1)
return nil
}, func(elapsed time.Duration) {
n := atomic.LoadUint64(&copiedParts)
n := copiedParts.Load()
logger.Infof("server-side copied %d out of %d parts from %s to %s in %s", n, len(partsToCopy), src, dst, elapsed)
})
return err