VictoriaMetrics/lib/fs/fs.go
rtm0 602bedf362
testing: allow disabling fsync to make tests run faster (#6871)
### Describe Your Changes

fsync() ensures that the data is written to disk. In production this is
needed for data durability. However, during development, when the
unit tests are run, this level of durability is not needed. Therefore
fsync() can be disabled, which makes test runs two times faster.

The disabling is done by setting the `DISABLE_FSYNC_FOR_TESTING`
environment variable. The valid values for this variable are the same as
the values of the arg of `go doc strconv.ParseBool`:

```
1, t, T, TRUE, true, True, 0, f, F, FALSE, false, False.
```

Any other value means `false`.

The variable is set for all test build targets. Compare running times:

Build Target | DISABLE_FSYNC_FOR_TESTING=0 | DISABLE_FSYNC_FOR_TESTING=1
----------------- | ------------------------------------------------ | -------------------------------------------------
make test | 1m5s  | 0m22s
make test-race | 3m1s | 1m42s
make test-pure | 1m7s | 0m20s
make test-full | 1m21s | 0m32s
make test-full-386 | 1m42s | 0m36s

When running tests for a given package, fsync can be disabled as
follows:

```shell
DISABLE_FSYNC_FOR_TESTING=1 go test ./lib/storage
```

Disabling fsync() is intended for testing purposes only and the name of
the variable reflects that.

What could also have been done but hasn't been:

- lib/filestream/filestream.go: `Writer.MustFlush()` also uses f.Sync()
but nothing has been done to it, because the Writer.MustFlush() is not
used anywhere in the VM codebase. A side question: what is the general
policy for the unused code?
- lib/filestream/filestream.go: Writer.Write() calls `adviceDontNeed()`
which calls unix.Fdatasync(). Disabling it could potentially improve
running time, but running tests with this code disabled has shown
otherwise.

### Checklist

The following checks are **mandatory**:

- [x] My change adheres to the [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: Artem Fetishev <wwctrsrx@gmail.com>

(cherry picked from commit 334cd92a6c)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
2024-08-30 11:18:21 +02:00

394 lines
11 KiB
Go

package fs
import (
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// disableFSyncForTesting holds the value of the DISABLE_FSYNC_FOR_TESTING
// environment variable, which is read once during package initialization.
//
// NOTE(review): the variable isn't referenced in this part of the file;
// presumably the platform-specific sync code consults it in order to skip
// fsync calls during tests - confirm.
var disableFSyncForTesting = envutil.GetenvBool("DISABLE_FSYNC_FOR_TESTING")
// tmpFileNum is the counter MustWriteAtomic increments in order to build
// unique names for its temporary files.
var tmpFileNum atomic.Uint64
// MustSyncPath syncs contents of the given path.
//
// It is a thin exported wrapper around mustSyncPath, which is defined
// outside this part of the file.
func MustSyncPath(path string) {
mustSyncPath(path)
}
// MustWriteSync stores data in the file at path and fsyncs the created file before returning.
//
// The fsync guarantees that the written data survives hardware reset
// after the successful call.
//
// The write isn't atomic: the file at path may be left in inconsistent state
// if the app crashes in the middle of the write.
// Use MustWriteAtomic when the file at path must be either written in full
// or not written at all on app crash.
func MustWriteSync(path string, data []byte) {
	w := filestream.MustCreate(path, false)
	_, err := w.Write(data)

	// Sync and close the file regardless of the write outcome.
	w.MustClose()

	if err != nil {
		// The file at path is intentionally left in place, so the user could
		// inspect its contents during investigation of the issue.
		logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(data), path, err)
	}
}
// MustWriteAtomic atomically writes data to the file at path.
//
// The function returns only after the file is fully written and synced
// to the underlying storage.
//
// On app crash in the middle of the call the file at path is either
// fully written or not written at all.
//
// If the file at path already exists, it is atomically overwritten when canOverwrite is true.
// Otherwise logger.Panicf is called.
func MustWriteAtomic(path string, data []byte, canOverwrite bool) {
	// It is expected that MustWriteAtomic isn't called concurrently
	// with the same `path`, so the existence check below is sufficient.
	alreadyExists := IsPathExist(path)
	if alreadyExists && !canOverwrite {
		logger.Panicf("FATAL: cannot create file %q, since it already exists", path)
	}

	// Put the data into a uniquely named temporary file at first.
	tmpPath := fmt.Sprintf("%s.tmp.%d", path, tmpFileNum.Add(1))
	MustWriteSync(tmpPath, data)

	// Atomically move the temporary file to its final location.
	err := os.Rename(tmpPath, path)
	if err != nil {
		// The temporary file is intentionally left in place, so the user could
		// inspect its contents during investigation of the issue.
		logger.Panicf("FATAL: cannot move temporary file %q to %q: %s", tmpPath, path, err)
	}

	// Sync the containing directory, so the file is guaranteed to appear in the directory.
	// See https://www.quora.com/When-should-you-fsync-the-containing-directory-in-addition-to-the-file-itself
	absPath, err := filepath.Abs(path)
	if err != nil {
		logger.Panicf("FATAL: cannot obtain absolute path to %q: %s", path, err)
	}
	MustSyncPath(filepath.Dir(absPath))
}
// tmpFileNameRe matches names ending with the ".tmp.<number>" suffix,
// which MustWriteAtomic appends to the names of its temporary files.
var tmpFileNameRe = regexp.MustCompile(`\.tmp\.\d+$`)

// IsTemporaryFileName reports whether fn ends with the temporary file suffix
// generated by MustWriteAtomic.
func IsTemporaryFileName(fn string) bool {
	loc := tmpFileNameRe.FindStringIndex(fn)
	return loc != nil
}
// MustMkdirIfNotExist creates the directory at the given path if it doesn't exist yet.
//
// Nothing is done if the path already exists.
func MustMkdirIfNotExist(path string) {
	if !IsPathExist(path) {
		mustMkdirSync(path)
	}
}
// MustMkdirFailIfExist creates the directory at the given path,
// which must not exist before the call.
//
// If the given path already exists, then logger.Panicf is called
// instead of creating the directory.
func MustMkdirFailIfExist(path string) {
	if alreadyExists := IsPathExist(path); alreadyExists {
		logger.Panicf("FATAL: the %q already exists", path)
	}
	mustMkdirSync(path)
}
// mustMkdirSync creates the directory at path together with all the missing
// parent directories and then syncs the immediate parent directory of path.
func mustMkdirSync(path string) {
	err := os.MkdirAll(path, 0755)
	if err != nil {
		logger.Panicf("FATAL: cannot create directory: %s", err)
	}

	// The parent directory must be synced, so the created directory
	// becomes visible in the fs after power loss.
	MustSyncPath(filepath.Dir(path))
}
// RemoveDirContents removes everything inside the given dir if the dir exists.
//
// The dir itself is left in place, so it may be mounted
// to a separate partition.
func RemoveDirContents(dir string) {
	if !IsPathExist(dir) {
		// Nothing to remove, since the dir doesn't exist.
		return
	}

	dirFile, err := os.Open(dir)
	if err != nil {
		logger.Panicf("FATAL: cannot open dir: %s", err)
	}
	defer MustClose(dirFile)

	entryNames, err := dirFile.Readdirnames(-1)
	if err != nil {
		logger.Panicf("FATAL: cannot read contents of the dir %q: %s", dir, err)
	}

	for i := range entryNames {
		switch entryNames[i] {
		case ".", "..", "lost+found":
			// Special dirs must be left untouched.
		default:
			MustRemoveAll(filepath.Join(dir, entryNames[i]))
		}
	}

	// Sync the dir after its entries have been removed.
	MustSyncPath(dir)
}
// MustClose closes the given file f.
//
// logger.Panicf is called if the file cannot be closed.
func MustClose(f *os.File) {
	// Obtain the file name before closing it; it is needed for the error message.
	name := f.Name()
	err := f.Close()
	if err == nil {
		return
	}
	logger.Panicf("FATAL: cannot close %q: %s", name, err)
}
// MustFileSize returns the size of the file at the given path.
//
// logger.Panicf is called if the path cannot be stat'ed or if it points to a directory.
func MustFileSize(path string) uint64 {
	info, statErr := os.Stat(path)
	if statErr != nil {
		logger.Panicf("FATAL: cannot stat %q: %s", path, statErr)
	}
	if isDir := info.IsDir(); isDir {
		logger.Panicf("FATAL: %q must be a file, not a directory", path)
	}
	size := info.Size()
	return uint64(size)
}
// IsPathExist reports whether the given path exists.
//
// Any os.Stat error other than "not exist" is reported via logger.Panicf.
func IsPathExist(path string) bool {
	_, err := os.Stat(path)
	if err == nil {
		return true
	}
	if os.IsNotExist(err) {
		return false
	}
	logger.Panicf("FATAL: cannot stat %q: %s", path, err)
	return true
}
// mustSyncParentDirIfExists syncs the parent directory of the given path.
//
// Nothing is done if the parent directory doesn't exist.
func mustSyncParentDirIfExists(path string) {
	if parentDir := filepath.Dir(path); IsPathExist(parentDir) {
		MustSyncPath(parentDir)
	}
}
// MustRemoveDirAtomic atomically removes the given dir.
//
// The removal consists of the following steps:
//
//  1. The "<dir>" is atomically renamed to "<dir>.must-remove.<XYZ>",
//     where <XYZ> is an unique number.
//  2. The "<dir>.must-remove.XYZ" is removed in background.
//
// If the process crashes after the step 1, then the leftover directory must be
// removed on the next process start by calling MustRemoveTemporaryDirs
// on the parent directory.
//
// Nothing is done if the dir doesn't exist.
func MustRemoveDirAtomic(dir string) {
	if IsPathExist(dir) {
		mustRemoveDirAtomic(dir)
		// Sync the parent directory after the dir has been removed from it.
		MustSyncPath(filepath.Dir(dir))
	}
}
// atomicDirRemoveCounter is a counter seeded with the current time
// in nanoseconds at package initialization.
//
// NOTE(review): it isn't referenced in this part of the file; presumably
// it supplies the unique <XYZ> suffix for MustRemoveDirAtomic - confirm.
var atomicDirRemoveCounter = func() *atomic.Uint64 {
	counter := new(atomic.Uint64)
	seed := time.Now().UnixNano()
	counter.Store(uint64(seed))
	return counter
}()
// MustReadDir returns directory entries for the given dir.
//
// logger.Panicf is called if the dir cannot be read.
func MustReadDir(dir string) []os.DirEntry {
	entries, err := os.ReadDir(dir)
	if err == nil {
		return entries
	}
	logger.Panicf("FATAL: cannot read directory contents: %s", err)
	return entries
}
// MustRemoveTemporaryDirs removes subdirectories of dir, whose names carry
// the ".must-remove.<XYZ>" suffix, and then syncs dir.
//
// Such directories may be left on unclean shutdown during MustRemoveDirAtomic call.
func MustRemoveTemporaryDirs(dir string) {
	for _, entry := range MustReadDir(dir) {
		if !IsDirOrSymlink(entry) {
			// Only directories and symlinks are considered.
			continue
		}
		if name := entry.Name(); IsScheduledForRemoval(name) {
			MustRemoveAll(filepath.Join(dir, name))
		}
	}
	MustSyncPath(dir)
}
// MustHardLinkFiles creates dstDir and fills it with hard links
// to all the files from srcDir.
//
// Directories and symlinks inside srcDir are skipped.
func MustHardLinkFiles(srcDir, dstDir string) {
	mustMkdirSync(dstDir)

	for _, entry := range MustReadDir(srcDir) {
		if IsDirOrSymlink(entry) {
			// Only files are hard-linked.
			continue
		}
		name := entry.Name()
		err := os.Link(filepath.Join(srcDir, name), filepath.Join(dstDir, name))
		if err != nil {
			logger.Panicf("FATAL: cannot link files: %s", err)
		}
	}

	// Sync dstDir after the links have been created in it.
	MustSyncPath(dstDir)
}
// MustSymlinkRelative creates a symlink at dstPath, which points to srcPath
// via a path relative to the directory containing dstPath.
func MustSymlinkRelative(srcPath, dstPath string) {
	relTarget, err := filepath.Rel(filepath.Dir(dstPath), srcPath)
	if err != nil {
		logger.Panicf("FATAL: cannot make relative path for srcPath=%q: %s", srcPath, err)
	}

	err = os.Symlink(relTarget, dstPath)
	if err != nil {
		logger.Panicf("FATAL: cannot make a symlink: %s", err)
	}
}
// MustCopyDirectory copies all the regular files from srcPath to dstPath.
//
// The dstPath directory is created if it doesn't exist.
// Entries other than regular files are skipped, so the copy isn't recursive.
func MustCopyDirectory(srcPath, dstPath string) {
	entries := MustReadDir(srcPath)
	MustMkdirIfNotExist(dstPath)

	for _, entry := range entries {
		if entry.Type().IsRegular() {
			name := entry.Name()
			MustCopyFile(filepath.Join(srcPath, name), filepath.Join(dstPath, name))
		}
	}

	// Sync dstPath after the files have been copied into it.
	MustSyncPath(dstPath)
}
// MustCopyFile copies contents of the file at srcPath into the file at dstPath
// and then syncs dstPath.
//
// The file at dstPath is created via os.Create.
func MustCopyFile(srcPath, dstPath string) {
	in, err := os.Open(srcPath)
	if err != nil {
		logger.Panicf("FATAL: cannot open srcPath: %s", err)
	}
	defer MustClose(in)

	out, err := os.Create(dstPath)
	if err != nil {
		logger.Panicf("FATAL: cannot create dstPath: %s", err)
	}
	defer MustClose(out)

	_, err = io.Copy(out, in)
	if err != nil {
		logger.Panicf("FATAL: cannot copy %q to %q: %s", srcPath, dstPath, err)
	}

	// The sync is performed before the deferred MustClose calls are executed.
	MustSyncPath(dstPath)
}
// MustReadData fills data with len(data) bytes read from r.
//
// The function silently returns if io.ReadFull reports io.EOF,
// e.g. when r is exhausted before a single byte is read.
// logger.Panicf is called on any other read error.
func MustReadData(r filestream.ReadCloser, data []byte) {
	readBytes, err := io.ReadFull(r, data)
	if err == io.EOF {
		return
	}
	if err != nil {
		logger.Panicf("FATAL: cannot read %d bytes from %s; read only %d bytes; error: %s", len(data), r.Path(), readBytes, err)
	}
	if want := len(data); readBytes != want {
		logger.Panicf("BUG: io.ReadFull read only %d bytes from %s; must read %d bytes", readBytes, r.Path(), want)
	}
}
// MustWriteData writes the whole data to w.
//
// Nothing is written for empty data.
// logger.Panicf is called on write error or on short write.
func MustWriteData(w filestream.WriteCloser, data []byte) {
	want := len(data)
	if want == 0 {
		return
	}

	written, err := w.Write(data)
	if err != nil {
		logger.Panicf("FATAL: cannot write %d bytes to %s: %s", want, w.Path(), err)
	}
	if written != want {
		logger.Panicf("BUG: writer wrote %d bytes instead of %d bytes to %s", written, want, w.Path())
	}
}
// MustCreateFlockFile creates the FlockFilename file inside the directory dir
// and returns the handle to the created file.
//
// logger.Panicf is called if the lock file cannot be created.
func MustCreateFlockFile(dir string) *os.File {
	lockPath := filepath.Join(dir, FlockFilename)
	lockFile, err := createFlockFile(lockPath)
	if err == nil {
		return lockFile
	}
	logger.Panicf("FATAL: cannot create lock file: %s; make sure a single process has exclusive access to %q", err, dir)
	return lockFile
}

// FlockFilename is the name of the file created by MustCreateFlockFile().
const FlockFilename = "flock.lock"
// MustGetFreeSpace returns free space for the given directory path.
//
// The result is cached per path: the cached value is returned while it is less
// than 2 fasttime.UnixTimestamp() units old (presumably seconds - confirm).
// The whole call, including the refresh of the cached value, is performed
// under freeSpaceMapLock.
func MustGetFreeSpace(path string) uint64 {
	freeSpaceMapLock.Lock()
	defer freeSpaceMapLock.Unlock()

	entry, cached := freeSpaceMap[path]
	if cached {
		if age := fasttime.UnixTimestamp() - entry.updateTime; age < 2 {
			// Fast path - the cached entry is still fresh.
			return entry.freeSpace
		}
	}

	// Slow path - determine the amount of free space at path and cache it.
	entry.freeSpace = mustGetFreeSpace(path)
	entry.updateTime = fasttime.UnixTimestamp()
	freeSpaceMap[path] = entry
	return entry.freeSpace
}
var (
// freeSpaceMap caches the results of MustGetFreeSpace; it is keyed by
// the path passed to MustGetFreeSpace.
freeSpaceMap = make(map[string]freeSpaceEntry)
// freeSpaceMapLock is held by MustGetFreeSpace while it accesses freeSpaceMap.
freeSpaceMapLock sync.Mutex
)
// freeSpaceEntry is a single cached MustGetFreeSpace result.
type freeSpaceEntry struct {
// updateTime is the fasttime.UnixTimestamp() value obtained when freeSpace was refreshed.
updateTime uint64
// freeSpace is the value returned by mustGetFreeSpace for the path.
freeSpace uint64
}
// IsScheduledForRemoval reports whether the filename contains
// the ".must-remove." marker anywhere in it.
func IsScheduledForRemoval(filename string) bool {
	const removalMarker = ".must-remove."
	scheduled := strings.Contains(filename, removalMarker)
	return scheduled
}
// IsDirOrSymlink reports whether de is a directory or a symlink.
func IsDirOrSymlink(de os.DirEntry) bool {
	if de.IsDir() {
		return true
	}
	// os.ModeSymlink is a single bit, so a non-zero result means the bit is set.
	return de.Type()&os.ModeSymlink != 0
}