mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
adds restore.lock (#1988)
* adds restore.lock; it must prevent the storage from starting after an incomplete restore process https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1958 * return back flock file deletion * Apply suggestions from code review * wip * docs/CHANGELOG.md: document https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1958 Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
f40b1e7e9f
commit
8ff7da7202
4 changed files with 41 additions and 16 deletions
|
@ -7,6 +7,7 @@ sort: 15
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): log error message when remote storage returns 400 or 409 http errors. This should simplify detection and debugging of this case. See [this issue](vmagent_remotewrite_packets_dropped_total).
|
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): log error message when remote storage returns 400 or 409 http errors. This should simplify detection and debugging of this case. See [this issue](vmagent_remotewrite_packets_dropped_total).
|
||||||
|
* FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): store a `restore-in-progress` file in the `-dst` directory while `vmrestore` is running. This file is automatically deleted when `vmrestore` finishes successfully. This helps detect incompletely restored data on VictoriaMetrics start. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1958).
|
||||||
|
|
||||||
|
|
||||||
## [v1.71.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.71.0)
|
## [v1.71.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.71.0)
|
||||||
|
|
|
@ -3,6 +3,8 @@ package actions
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -51,6 +53,9 @@ func (r *Restore) Run() error {
|
||||||
}
|
}
|
||||||
defer fs.MustClose(flockF)
|
defer fs.MustClose(flockF)
|
||||||
|
|
||||||
|
if err := createRestoreLock(r.Dst.Dir); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
concurrency := r.Concurrency
|
concurrency := r.Concurrency
|
||||||
src := r.Src
|
src := r.Src
|
||||||
dst := r.Dst
|
dst := r.Dst
|
||||||
|
@ -189,7 +194,7 @@ func (r *Restore) Run() error {
|
||||||
logger.Infof("restored %d bytes from backup in %.3f seconds; deleted %d bytes; downloaded %d bytes",
|
logger.Infof("restored %d bytes from backup in %.3f seconds; deleted %d bytes; downloaded %d bytes",
|
||||||
backupSize, time.Since(startTime).Seconds(), deleteSize, downloadSize)
|
backupSize, time.Since(startTime).Seconds(), deleteSize, downloadSize)
|
||||||
|
|
||||||
return nil
|
return removeLockFile(r.Dst.Dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
type statWriter struct {
|
type statWriter struct {
|
||||||
|
@ -202,3 +207,20 @@ func (sw *statWriter) Write(p []byte) (int, error) {
|
||||||
atomic.AddUint64(sw.bytesWritten, uint64(n))
|
atomic.AddUint64(sw.bytesWritten, uint64(n))
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createRestoreLock creates an empty "restore-in-progress" marker file in dstDir.
//
// The file is created with os.Create (an existing file is truncated) and is
// closed right away, since only its presence matters. An error is returned
// if the file cannot be created or closed.
func createRestoreLock(dstDir string) error {
	lockF := path.Join(dstDir, "restore-in-progress")
	f, err := os.Create(lockF)
	if err != nil {
		return fmt.Errorf("cannot create restore lock file %q: %w", lockF, err)
	}
	// Wrap the close error too, so the caller can see which file was affected.
	if err := f.Close(); err != nil {
		return fmt.Errorf("cannot close restore lock file %q: %w", lockF, err)
	}
	return nil
}
|
||||||
|
|
||||||
|
// removeLockFile deletes the "restore-in-progress" marker file from dstDir.
//
// It returns a wrapped os.Remove error when the file cannot be removed,
// which includes the case when the file does not exist.
func removeLockFile(dstDir string) error {
	lockF := path.Join(dstDir, "restore-in-progress")
	if err := os.Remove(lockF); err != nil {
		return fmt.Errorf("cannot remove restore lock file %q: %w", lockF, err)
	}
	return nil
}
|
||||||
|
|
|
@ -72,9 +72,8 @@ func appendFilesInternal(dst []string, d *os.File) ([]string, error) {
|
||||||
if name == "." || name == ".." {
|
if name == "." || name == ".." {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if name == "flock.lock" {
|
if isSpecialFile(name) {
|
||||||
// Do not take into account flock.lock files, since they are used
|
// Do not take into account special files.
|
||||||
// for preventing from concurrent access.
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
path := dir + "/" + name
|
path := dir + "/" + name
|
||||||
|
@ -135,6 +134,10 @@ func appendFilesInternal(dst []string, d *os.File) ([]string, error) {
|
||||||
return dst, nil
|
return dst, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isSpecialFile reports whether name is one of the service file names
// ("flock.lock" or "restore-in-progress") that callers skip when listing
// files and when checking whether a directory is empty.
func isSpecialFile(name string) bool {
	switch name {
	case "flock.lock", "restore-in-progress":
		return true
	default:
		return false
	}
}
|
||||||
|
|
||||||
// RemoveEmptyDirs recursively removes empty directories under the given dir.
|
// RemoveEmptyDirs recursively removes empty directories under the given dir.
|
||||||
func RemoveEmptyDirs(dir string) error {
|
func RemoveEmptyDirs(dir string) error {
|
||||||
_, err := removeEmptyDirs(dir)
|
_, err := removeEmptyDirs(dir)
|
||||||
|
@ -173,7 +176,6 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
|
||||||
return false, fmt.Errorf("cannot read directory contents in %q: %w", dir, err)
|
return false, fmt.Errorf("cannot read directory contents in %q: %w", dir, err)
|
||||||
}
|
}
|
||||||
dirEntries := 0
|
dirEntries := 0
|
||||||
hasFlock := false
|
|
||||||
for _, fi := range fis {
|
for _, fi := range fis {
|
||||||
name := fi.Name()
|
name := fi.Name()
|
||||||
if name == "." || name == ".." {
|
if name == "." || name == ".." {
|
||||||
|
@ -192,11 +194,10 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if fi.Mode()&os.ModeSymlink != os.ModeSymlink {
|
if fi.Mode()&os.ModeSymlink != os.ModeSymlink {
|
||||||
if name == "flock.lock" {
|
if isSpecialFile(name) {
|
||||||
hasFlock = true
|
// Do not take into account special files
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Skip plain files.
|
|
||||||
dirEntries++
|
dirEntries++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -248,14 +249,9 @@ func removeEmptyDirsInternal(d *os.File) (bool, error) {
|
||||||
if dirEntries > 0 {
|
if dirEntries > 0 {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
logger.Infof("removing empty dir %q", dir)
|
// Use os.RemoveAll() instead of os.Remove(), since the dir may contain special files such as flock.lock and restore-in-progress,
|
||||||
if hasFlock {
|
// which must be ignored.
|
||||||
flockFilepath := dir + "/flock.lock"
|
if err := os.RemoveAll(dir); err != nil {
|
||||||
if err := os.Remove(flockFilepath); err != nil {
|
|
||||||
return false, fmt.Errorf("cannot remove %q: %w", flockFilepath, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := os.Remove(dir); err != nil {
|
|
||||||
return false, fmt.Errorf("cannot remove %q: %w", dir, err)
|
return false, fmt.Errorf("cannot remove %q: %w", dir, err)
|
||||||
}
|
}
|
||||||
return true, nil
|
return true, nil
|
||||||
|
|
|
@ -181,6 +181,12 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
|
||||||
}
|
}
|
||||||
s.flockF = flockF
|
s.flockF = flockF
|
||||||
|
|
||||||
|
// Check whether restore process finished successfully
|
||||||
|
restoreLockF := path + "/restore-in-progress"
|
||||||
|
if fs.IsPathExist(restoreLockF) {
|
||||||
|
return nil, fmt.Errorf("restore lock file exists, incomplete vmrestore run. Run vmrestore again or remove lock file %q", restoreLockF)
|
||||||
|
}
|
||||||
|
|
||||||
// Pre-create snapshots directory if it is missing.
|
// Pre-create snapshots directory if it is missing.
|
||||||
snapshotsPath := path + "/snapshots"
|
snapshotsPath := path + "/snapshots"
|
||||||
if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil {
|
if err := fs.MkdirAllIfNotExist(snapshotsPath); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue