From ece86cd31400ac73f80e3988d1ab48f1be7d2f5e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 24 Feb 2024 01:02:18 +0200 Subject: [PATCH] lib/backup/actions: consistently use atomic.* types instead of atomic.* functions See ea9e2b19a5fa8aecc25c9da10fad8f4c1c58df38 --- lib/backup/actions/backup.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/lib/backup/actions/backup.go b/lib/backup/actions/backup.go index b06d873e9..4ea974412 100644 --- a/lib/backup/actions/backup.go +++ b/lib/backup/actions/backup.go @@ -17,10 +17,7 @@ import ( "github.com/VictoriaMetrics/metrics" ) -var ( - bytesUploadedTotal = uint64(0) - bytesUploadedTotalMetric = metrics.NewCounter(`vm_backups_uploaded_bytes_total`) -) +var bytesUploadedTotal = metrics.NewCounter(`vm_backups_uploaded_bytes_total`) // Backup performs backup according to the provided settings. // @@ -150,7 +147,7 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con uploadSize := getPartsSize(srcCopyParts) if len(srcCopyParts) > 0 { logger.Infof("uploading %d parts from %s to %s", len(srcCopyParts), src, dst) - bytesUploaded := uint64(0) + var bytesUploaded atomic.Uint64 err = runParallel(concurrency, srcCopyParts, func(p common.Part) error { logger.Infof("uploading %s from %s to %s", &p, src, dst) rc, err := src.NewReadCloser(p) @@ -169,12 +166,11 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con } return nil }, func(elapsed time.Duration) { - n := atomic.LoadUint64(&bytesUploaded) + n := bytesUploaded.Load() prc := 100 * float64(n) / float64(uploadSize) logger.Infof("uploaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, uploadSize, prc, src, dst, elapsed) }) - atomic.AddUint64(&bytesUploadedTotal, bytesUploaded) - bytesUploadedTotalMetric.Set(bytesUploadedTotal) + bytesUploadedTotal.Add(int(bytesUploaded.Load())) if err != nil { return err } @@ -189,12 +185,12 @@ func runBackup(src *fslocal.FS, dst common.RemoteFS, origin common.OriginFS, con type statReader struct { r io.Reader - bytesRead *uint64 + bytesRead *atomic.Uint64 } func (sr *statReader) Read(p []byte) (int, error) { n, err := sr.r.Read(p) - atomic.AddUint64(sr.bytesRead, uint64(n)) + sr.bytesRead.Add(uint64(n)) return n, err } @@ -203,16 +199,16 @@ func deleteDstParts(dst common.RemoteFS, partsToDelete []common.Part, concurrenc return nil } logger.Infof("deleting %d parts from %s", len(partsToDelete), dst) - deletedParts := uint64(0) + var deletedParts atomic.Uint64 err := runParallel(concurrency, partsToDelete, func(p common.Part) error { logger.Infof("deleting %s from %s", &p, dst) if err := dst.DeletePart(p); err != nil { return fmt.Errorf("cannot delete %s from %s: %w", &p, dst, err) } - atomic.AddUint64(&deletedParts, 1) + deletedParts.Add(1) return nil }, func(elapsed time.Duration) { - n := atomic.LoadUint64(&deletedParts) + n := deletedParts.Load() logger.Infof("deleted %d out of %d parts from %s in %s", n, len(partsToDelete), dst, elapsed) }) if err != nil { @@ -229,16 +225,16 @@ func copySrcParts(src common.OriginFS, dst common.RemoteFS, partsToCopy []common return nil } logger.Infof("server-side copying %d parts from %s to %s", len(partsToCopy), src, dst) - copiedParts := uint64(0) + var copiedParts atomic.Uint64 err := runParallel(concurrency, partsToCopy, func(p common.Part) error { logger.Infof("server-side copying %s from %s to %s", &p, src, dst) if err := dst.CopyPart(src, p); err != nil { return fmt.Errorf("cannot copy %s from %s to %s: %w", &p, src, dst, err) } - atomic.AddUint64(&copiedParts, 1) + copiedParts.Add(1) return nil }, func(elapsed time.Duration) { - n := atomic.LoadUint64(&copiedParts) + n := copiedParts.Load() logger.Infof("server-side copied %d out of %d parts from %s to %s in %s", n, len(partsToCopy), src, dst, elapsed) }) return err