diff --git a/app/vmbackup/main.go b/app/vmbackup/main.go index c4facc8e6..06c9e475d 100644 --- a/app/vmbackup/main.go +++ b/app/vmbackup/main.go @@ -155,7 +155,7 @@ See the docs at https://docs.victoriametrics.com/vmbackup.html . } func newSrcFS() (*fslocal.FS, error) { - snapshotPath := *storageDataPath + "/snapshots/" + *snapshotName + snapshotPath := filepath.Join(*storageDataPath, "snapshots", *snapshotName) // Verify the snapshot exists. f, err := os.Open(snapshotPath) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 0e5125cc8..eb88fcff6 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -42,6 +42,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: reduce the probability of sudden increase in the number of small parts on systems with small number of CPU cores. * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix performance issue when migrating data from VictoriaMetrics according to [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-from-victoriametrics). Add the ability to speed up the data migration via `--vm-native-disable-retries` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4092). +* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fixes compatibility with Windows OS. For details see [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70) * BUGFIX: do not panic at windows during snapshot deletion. Note, at windows process restart required for complete snapshot delete. See [this comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70#issuecomment-1491529183) for details. * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): fix a panic when the duration in the query contains uppercase `M` suffix. Such a suffix isn't allowed to use in durations, since it clashes with `a million` suffix, e.g. it isn't clear whether `rate(metric[5M])` means rate over 5 minutes, 5 months or 5 million seconds. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4120) issues. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): retry failed read request on the closed connection one more time. This improves rules execution reliability when connection between vmalert and datasource closes unexpectedly. diff --git a/lib/backup/actions/backup.go b/lib/backup/actions/backup.go index f01d5b127..5c6c80df9 100644 --- a/lib/backup/actions/backup.go +++ b/lib/backup/actions/backup.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" "io" - "strings" + "path/filepath" "sync/atomic" "time" @@ -83,12 +83,7 @@ func (b *Backup) Run() error { } func storeMetadata(src *fslocal.FS, dst common.RemoteFS) error { - last := strings.LastIndex(src.Dir, "/") - if last < 0 { - return fmt.Errorf("cannot decode snapshot location %q", src.Dir) - } - - snapshotName := src.Dir[last+1:] + snapshotName := filepath.Base(src.Dir) snapshotTime, err := snapshot.Time(snapshotName) if err != nil { return fmt.Errorf("cannot decode snapshot name %q: %w", snapshotName, err) diff --git a/lib/backup/actions/util.go b/lib/backup/actions/util.go index a49cd9c53..b911b4f74 100644 --- a/lib/backup/actions/util.go +++ b/lib/backup/actions/util.go @@ -3,6 +3,7 @@ package actions import ( "flag" "fmt" + "path/filepath" "strings" "sync" "time" @@ -190,7 +191,7 @@ func NewRemoteFS(path string) (common.RemoteFS, error) { dir := path[n+len("://"):] switch scheme { case "fs": - if !strings.HasPrefix(dir, "/") { + if !filepath.IsAbs(dir) { return nil, fmt.Errorf("dir must be absolute; got %q", dir) } fs := &fsremote.FS{ diff --git a/lib/backup/common/part.go b/lib/backup/common/part.go index 35fc6cfe6..1b58b66b1 100644 --- a/lib/backup/common/part.go +++ b/lib/backup/common/part.go @@ -51,7 +51,7 @@ func (p *Part) RemotePath(prefix string) string { return fmt.Sprintf("%s/%s/%016X_%016X_%016X", prefix, p.Path, p.FileSize, p.Offset, p.Size) } -var partNameRegexp = regexp.MustCompile(`^(.+)/([0-9A-F]{16})_([0-9A-F]{16})_([0-9A-F]{16})$`) +var partNameRegexp = regexp.MustCompile(`^(.+)[/\\]([0-9A-F]{16})_([0-9A-F]{16})_([0-9A-F]{16})$`) // ParseFromRemotePath parses p from remotePath. // diff --git a/lib/backup/fscommon/fscommon.go b/lib/backup/fscommon/fscommon.go index 0221229e9..8897cc6b8 100644 --- a/lib/backup/fscommon/fscommon.go +++ b/lib/backup/fscommon/fscommon.go @@ -10,36 +10,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -// FsyncFile fsyncs path contents and the parent directory contents. -func FsyncFile(path string) error { - if err := fsync(path); err != nil { - _ = os.RemoveAll(path) - return fmt.Errorf("cannot fsync file %q: %w", path, err) - } - dir := filepath.Dir(path) - if err := fsync(dir); err != nil { - return fmt.Errorf("cannot fsync dir %q: %w", dir, err) - } - return nil -} - -// FsyncDir fsyncs dir contents. -func FsyncDir(dir string) error { - return fsync(dir) -} - -func fsync(path string) error { - f, err := os.Open(path) - if err != nil { - return err - } - if err := f.Sync(); err != nil { - _ = f.Close() - return err - } - return f.Close() -} - // AppendFiles appends all the files from dir to dst. // // All the appended files will have dir prefix. @@ -77,7 +47,7 @@ func appendFilesInternal(dst []string, d *os.File) ([]string, error) { // Do not take into account special files. continue } - path := dir + "/" + name + path := filepath.Join(dir, name) if fi.IsDir() { // Process directory dst, err = AppendFiles(dst, path) @@ -112,13 +82,13 @@ func appendFilesInternal(dst []string, d *os.File) ([]string, error) { if err != nil { return nil, fmt.Errorf("cannot list files at %q from symlink %q: %w", pathReal, path, err) } - pathReal += "/" + pathReal += string(filepath.Separator) for i := len(dst); i < len(dstNew); i++ { x := dstNew[i] if !strings.HasPrefix(x, pathReal) { return nil, fmt.Errorf("unexpected prefix for path %q; want %q", x, pathReal) } - dstNew[i] = path + "/" + x[len(pathReal):] + dstNew[i] = filepath.Join(path, x[len(pathReal):]) } dst = dstNew continue @@ -182,7 +152,7 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) { if name == "." || name == ".." { continue } - path := dir + "/" + name + path := filepath.Join(dir, name) if fi.IsDir() { // Process directory ok, err := removeEmptyDirs(path) diff --git a/lib/backup/fslocal/fslocal.go b/lib/backup/fslocal/fslocal.go index 055c38123..1136cee74 100644 --- a/lib/backup/fslocal/fslocal.go +++ b/lib/backup/fslocal/fslocal.go @@ -68,7 +68,7 @@ func (fs *FS) ListParts() ([]common.Part, error) { } var parts []common.Part - dir += "/" + dir += string(filepath.Separator) for _, file := range files { if !strings.HasPrefix(file, dir) { logger.Panicf("BUG: unexpected prefix for file %q; want %q", file, dir) @@ -187,11 +187,7 @@ func (fs *FS) mkdirAll(filePath string) error { } func (fs *FS) path(p common.Part) string { - dir := fs.Dir - for strings.HasSuffix(dir, "/") { - dir = dir[:len(dir)-1] - } - return fs.Dir + "/" + p.Path + return filepath.Join(fs.Dir, p.Path) } type limitedReadCloser struct { @@ -235,9 +231,11 @@ func (wc *writeCloser) Write(p []byte) (int, error) { } func (wc *writeCloser) Close() error { + wc.w.MustFlush(true) wc.w.MustClose() if wc.n != 0 { return fmt.Errorf("missing data writes for %d bytes", wc.n) } - return fscommon.FsyncFile(wc.path) + + return nil } diff --git a/lib/backup/fsremote/fsremote.go b/lib/backup/fsremote/fsremote.go index d2a7ce851..116d8d999 100644 --- a/lib/backup/fsremote/fsremote.go +++ b/lib/backup/fsremote/fsremote.go @@ -9,6 +9,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon" + libfs "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -48,7 +49,7 @@ func (fs *FS) ListParts() ([]common.Part, error) { } var parts []common.Part - dir += "/" + dir += string(filepath.Separator) for _, file := range files { if !strings.HasPrefix(file, dir) { logger.Panicf("BUG: unexpected prefix for file %q; want %q", file, dir) @@ -101,7 +102,8 @@ func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error { } // Attempt to create hardlink from srcPath to dstPath. if err := os.Link(srcPath, dstPath); err == nil { - return fscommon.FsyncFile(dstPath) + libfs.MustSyncPath(dstPath) + return nil } // Cannot create hardlink. Just copy file contents @@ -115,6 +117,9 @@ func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error { return fmt.Errorf("cannot create destination file: %w", err) } n, err := io.Copy(dstFile, srcFile) + if err := dstFile.Sync(); err != nil { + return fmt.Errorf("cannot fsync dstFile: %q: %w", dstFile.Name(), err) + } if err1 := dstFile.Close(); err1 != nil { err = err1 } @@ -129,10 +134,6 @@ func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error { _ = os.RemoveAll(dstPath) return fmt.Errorf("unexpected number of bytes copied from %q to %q; got %d bytes; want %d bytes", srcPath, dstPath, n, p.Size) } - if err := fscommon.FsyncFile(dstPath); err != nil { - _ = os.RemoveAll(dstPath) - return err - } return nil } @@ -167,6 +168,9 @@ func (fs *FS) UploadPart(p common.Part, r io.Reader) error { return fmt.Errorf("cannot create file %q: %w", path, err) } n, err := io.Copy(w, r) + if err := w.Sync(); err != nil { + return fmt.Errorf("cannot fsync file: %q: %w", w.Name(), err) + } if err1 := w.Close(); err1 != nil && err == nil { err = err1 } @@ -178,10 +182,6 @@ func (fs *FS) UploadPart(p common.Part, r io.Reader) error { _ = os.RemoveAll(path) return fmt.Errorf("wrong data size uploaded to %q; got %d bytes; want %d bytes", path, n, p.Size) } - if err := fscommon.FsyncFile(path); err != nil { - _ = os.RemoveAll(path) - return err - } return nil } @@ -194,7 +194,7 @@ func (fs *FS) mkdirAll(filePath string) error { } func (fs *FS) path(p common.Part) string { - return p.RemotePath(fs.Dir) + return filepath.Join(fs.Dir, p.Path, fmt.Sprintf("%016X_%016X_%016X", p.FileSize, p.Offset, p.Size)) } // DeleteFile deletes filePath at fs.