mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/fs: add canOverwrite arg to WriteFileAtomically when it is allowed to overwrite the file atomically if it already exists
This commit is contained in:
parent
db8abd000e
commit
c4265322f4
8 changed files with 13 additions and 14 deletions
|
@ -433,8 +433,7 @@ func mustLoadRollupResultCacheKeyPrefix(path string) {
|
|||
func mustSaveRollupResultCacheKeyPrefix(path string) {
|
||||
path = path + ".key.prefix"
|
||||
data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix)
|
||||
fs.MustRemoveAll(path)
|
||||
if err := fs.WriteFileAtomically(path, data); err != nil {
|
||||
if err := fs.WriteFileAtomically(path, data, true); err != nil {
|
||||
logger.Fatalf("cannot store rollupResult cache key prefix to %q: %s", path, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,11 +29,14 @@ func MustSyncPath(path string) {
|
|||
//
|
||||
// WriteFileAtomically returns only after the file is fully written and synced
|
||||
// to the underlying storage.
|
||||
func WriteFileAtomically(path string, data []byte) error {
|
||||
//
|
||||
// If the file at path already exists, then the file is overwritten atomically if canOverwrite is true.
|
||||
// Otherwise error is returned.
|
||||
func WriteFileAtomically(path string, data []byte, canOverwrite bool) error {
|
||||
// Check for the existing file. It is expected that
|
||||
// the WriteFileAtomically function cannot be called concurrently
|
||||
// with the same `path`.
|
||||
if IsPathExist(path) {
|
||||
if IsPathExist(path) && !canOverwrite {
|
||||
return fmt.Errorf("cannot create file %q, since it already exists", path)
|
||||
}
|
||||
|
||||
|
|
|
@ -163,7 +163,7 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
|
|||
return fmt.Errorf("cannot marshal metadata: %w", err)
|
||||
}
|
||||
metadataPath := partPath + "/metadata.json"
|
||||
if err := fs.WriteFileAtomically(metadataPath, metadata); err != nil {
|
||||
if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {
|
||||
return fmt.Errorf("cannot create %q: %w", metadataPath, err)
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -580,7 +580,7 @@ func (tb *Table) convertToV1280() {
|
|||
logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds())
|
||||
}
|
||||
|
||||
if err := fs.WriteFileAtomically(flagFilePath, []byte("ok")); err != nil {
|
||||
if err := fs.WriteFileAtomically(flagFilePath, []byte("ok"), false); err != nil {
|
||||
logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err)
|
||||
}
|
||||
}
|
||||
|
@ -975,7 +975,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
|||
dstPartPath := ph.Path(tb.path, mergeIdx)
|
||||
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
|
||||
txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx)
|
||||
if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil {
|
||||
if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
|
||||
return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -211,7 +211,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
|||
|
||||
// Create initial chunk file.
|
||||
filepath := q.chunkFilePath(0)
|
||||
if err := fs.WriteFileAtomically(filepath, nil); err != nil {
|
||||
if err := fs.WriteFileAtomically(filepath, nil, false); err != nil {
|
||||
return nil, fmt.Errorf("cannot create %q: %w", filepath, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -151,7 +151,7 @@ func (ph *partHeader) writeMinDedupInterval(partPath string) error {
|
|||
filePath := partPath + "/min_dedup_interval"
|
||||
dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond
|
||||
data := dedupInterval.String()
|
||||
if err := fs.WriteFileAtomically(filePath, []byte(data)); err != nil {
|
||||
if err := fs.WriteFileAtomically(filePath, []byte(data), false); err != nil {
|
||||
return fmt.Errorf("cannot create %q: %w", filePath, err)
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -1246,7 +1246,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||
}
|
||||
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
|
||||
txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx)
|
||||
if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil {
|
||||
if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
|
||||
return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1013,10 +1013,7 @@ func mustGetMinTimestampForCompositeIndex(metadataDir string, isEmptyDB bool) in
|
|||
}
|
||||
minTimestamp = date * msecPerDay
|
||||
dateBuf := encoding.MarshalInt64(nil, minTimestamp)
|
||||
if err := os.RemoveAll(path); err != nil {
|
||||
logger.Fatalf("cannot remove a file with minTimestampForCompositeIndex: %s", err)
|
||||
}
|
||||
if err := fs.WriteFileAtomically(path, dateBuf); err != nil {
|
||||
if err := fs.WriteFileAtomically(path, dateBuf, true); err != nil {
|
||||
logger.Fatalf("cannot store minTimestampForCompositeIndex: %s", err)
|
||||
}
|
||||
return minTimestamp
|
||||
|
|
Loading…
Reference in a new issue