lib/backup/actions: expose vm_backups_downloaded_bytes_total metric in order to be consistent with vm_backups_uploaded_bytes_total metric

This commit is contained in:
Aliaksandr Valialkin 2024-02-24 01:14:35 +02:00
parent 906a35bdbb
commit d5ca67e667
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 11 additions and 7 deletions

View file

@@ -17,8 +17,6 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var bytesUploadedTotal = metrics.NewCounter(`vm_backups_uploaded_bytes_total`)
// Backup performs backup according to the provided settings. // Backup performs backup according to the provided settings.
// //
// Note that the backup works only for VictoriaMetrics snapshots // Note that the backup works only for VictoriaMetrics snapshots
@@ -190,10 +188,12 @@ type statReader struct {
func (sr *statReader) Read(p []byte) (int, error) { func (sr *statReader) Read(p []byte) (int, error) {
n, err := sr.r.Read(p) n, err := sr.r.Read(p)
sr.bytesRead.Add(uint64(n)) sr.bytesRead.Add(uint64(n))
bytesUploadedTotal.Add(int(n)) bytesUploadedTotal.Add(n)
return n, err return n, err
} }
var bytesUploadedTotal = metrics.NewCounter(`vm_backups_uploaded_bytes_total`)
func deleteDstParts(dst common.RemoteFS, partsToDelete []common.Part, concurrency int) error { func deleteDstParts(dst common.RemoteFS, partsToDelete []common.Part, concurrency int) error {
if len(partsToDelete) == 0 { if len(partsToDelete) == 0 {
return nil return nil

View file

@@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fslocal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
) )
// Restore restores data according to the provided settings. // Restore restores data according to the provided settings.
@@ -153,7 +154,7 @@ func (r *Restore) Run() error {
perPath[p.Path] = parts perPath[p.Path] = parts
} }
logger.Infof("downloading %d parts from %s to %s", len(partsToCopy), src, dst) logger.Infof("downloading %d parts from %s to %s", len(partsToCopy), src, dst)
bytesDownloaded := uint64(0) var bytesDownloaded atomic.Uint64
err = runParallelPerPath(concurrency, perPath, func(parts []common.Part) error { err = runParallelPerPath(concurrency, perPath, func(parts []common.Part) error {
// Sort partsToCopy in order to properly grow file size during downloading // Sort partsToCopy in order to properly grow file size during downloading
// and to properly resume downloading of incomplete files on the next Restore.Run call. // and to properly resume downloading of incomplete files on the next Restore.Run call.
@@ -177,7 +178,7 @@ func (r *Restore) Run() error {
} }
return nil return nil
}, func(elapsed time.Duration) { }, func(elapsed time.Duration) {
n := atomic.LoadUint64(&bytesDownloaded) n := bytesDownloaded.Load()
prc := 100 * float64(n) / float64(downloadSize) prc := 100 * float64(n) / float64(downloadSize)
logger.Infof("downloaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, downloadSize, prc, src, dst, elapsed) logger.Infof("downloaded %d out of %d bytes (%.2f%%) from %s to %s in %s", n, downloadSize, prc, src, dst, elapsed)
}) })
@@ -194,15 +195,18 @@ func (r *Restore) Run() error {
type statWriter struct { type statWriter struct {
w io.Writer w io.Writer
bytesWritten *uint64 bytesWritten *atomic.Uint64
} }
func (sw *statWriter) Write(p []byte) (int, error) { func (sw *statWriter) Write(p []byte) (int, error) {
n, err := sw.w.Write(p) n, err := sw.w.Write(p)
atomic.AddUint64(sw.bytesWritten, uint64(n)) sw.bytesWritten.Add(uint64(n))
bytesDownloadedTotal.Add(n)
return n, err return n, err
} }
var bytesDownloadedTotal = metrics.NewCounter(`vm_backups_downloaded_bytes_total`)
func createRestoreLock(dstDir string) error { func createRestoreLock(dstDir string) error {
lockF := path.Join(dstDir, backupnames.RestoreInProgressFilename) lockF := path.Join(dstDir, backupnames.RestoreInProgressFilename)
f, err := os.Create(lockF) f, err := os.Create(lockF)