mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/fs: add canOverwrite arg to WriteFileAtomically when it is allowed to overwrite the file atomically if it already exists
This commit is contained in:
parent
9d0652fc6e
commit
ecb71a7221
8 changed files with 13 additions and 14 deletions
|
@ -387,8 +387,7 @@ func mustLoadRollupResultCacheKeyPrefix(path string) {
|
||||||
func mustSaveRollupResultCacheKeyPrefix(path string) {
|
func mustSaveRollupResultCacheKeyPrefix(path string) {
|
||||||
path = path + ".key.prefix"
|
path = path + ".key.prefix"
|
||||||
data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix)
|
data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix)
|
||||||
fs.MustRemoveAll(path)
|
if err := fs.WriteFileAtomically(path, data, true); err != nil {
|
||||||
if err := fs.WriteFileAtomically(path, data); err != nil {
|
|
||||||
logger.Fatalf("cannot store rollupResult cache key prefix to %q: %s", path, err)
|
logger.Fatalf("cannot store rollupResult cache key prefix to %q: %s", path, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,11 +29,14 @@ func MustSyncPath(path string) {
|
||||||
//
|
//
|
||||||
// WriteFileAtomically returns only after the file is fully written and synced
|
// WriteFileAtomically returns only after the file is fully written and synced
|
||||||
// to the underlying storage.
|
// to the underlying storage.
|
||||||
func WriteFileAtomically(path string, data []byte) error {
|
//
|
||||||
|
// If the file at path already exists, then the file is overwritten atomically if canOverwrite is true.
|
||||||
|
// Otherwise error is returned.
|
||||||
|
func WriteFileAtomically(path string, data []byte, canOverwrite bool) error {
|
||||||
// Check for the existing file. It is expected that
|
// Check for the existing file. It is expected that
|
||||||
// the WriteFileAtomically function cannot be called concurrently
|
// the WriteFileAtomically function cannot be called concurrently
|
||||||
// with the same `path`.
|
// with the same `path`.
|
||||||
if IsPathExist(path) {
|
if IsPathExist(path) && !canOverwrite {
|
||||||
return fmt.Errorf("cannot create file %q, since it already exists", path)
|
return fmt.Errorf("cannot create file %q, since it already exists", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -163,7 +163,7 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
|
||||||
return fmt.Errorf("cannot marshal metadata: %w", err)
|
return fmt.Errorf("cannot marshal metadata: %w", err)
|
||||||
}
|
}
|
||||||
metadataPath := partPath + "/metadata.json"
|
metadataPath := partPath + "/metadata.json"
|
||||||
if err := fs.WriteFileAtomically(metadataPath, metadata); err != nil {
|
if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {
|
||||||
return fmt.Errorf("cannot create %q: %w", metadataPath, err)
|
return fmt.Errorf("cannot create %q: %w", metadataPath, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -580,7 +580,7 @@ func (tb *Table) convertToV1280() {
|
||||||
logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds())
|
logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := fs.WriteFileAtomically(flagFilePath, []byte("ok")); err != nil {
|
if err := fs.WriteFileAtomically(flagFilePath, []byte("ok"), false); err != nil {
|
||||||
logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err)
|
logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -975,7 +975,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
||||||
dstPartPath := ph.Path(tb.path, mergeIdx)
|
dstPartPath := ph.Path(tb.path, mergeIdx)
|
||||||
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
|
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
|
||||||
txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx)
|
txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx)
|
||||||
if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil {
|
if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
|
||||||
return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
|
return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -211,7 +211,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
|
||||||
|
|
||||||
// Create initial chunk file.
|
// Create initial chunk file.
|
||||||
filepath := q.chunkFilePath(0)
|
filepath := q.chunkFilePath(0)
|
||||||
if err := fs.WriteFileAtomically(filepath, nil); err != nil {
|
if err := fs.WriteFileAtomically(filepath, nil, false); err != nil {
|
||||||
return nil, fmt.Errorf("cannot create %q: %w", filepath, err)
|
return nil, fmt.Errorf("cannot create %q: %w", filepath, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -151,7 +151,7 @@ func (ph *partHeader) writeMinDedupInterval(partPath string) error {
|
||||||
filePath := partPath + "/min_dedup_interval"
|
filePath := partPath + "/min_dedup_interval"
|
||||||
dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond
|
dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond
|
||||||
data := dedupInterval.String()
|
data := dedupInterval.String()
|
||||||
if err := fs.WriteFileAtomically(filePath, []byte(data)); err != nil {
|
if err := fs.WriteFileAtomically(filePath, []byte(data), false); err != nil {
|
||||||
return fmt.Errorf("cannot create %q: %w", filePath, err)
|
return fmt.Errorf("cannot create %q: %w", filePath, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1246,7 +1246,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
||||||
}
|
}
|
||||||
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
|
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
|
||||||
txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx)
|
txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx)
|
||||||
if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil {
|
if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
|
||||||
return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
|
return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1076,10 +1076,7 @@ func mustGetMinTimestampForCompositeIndex(metadataDir string, isEmptyDB bool) in
|
||||||
}
|
}
|
||||||
minTimestamp = date * msecPerDay
|
minTimestamp = date * msecPerDay
|
||||||
dateBuf := encoding.MarshalInt64(nil, minTimestamp)
|
dateBuf := encoding.MarshalInt64(nil, minTimestamp)
|
||||||
if err := os.RemoveAll(path); err != nil {
|
if err := fs.WriteFileAtomically(path, dateBuf, true); err != nil {
|
||||||
logger.Fatalf("cannot remove a file with minTimestampForCompositeIndex: %s", err)
|
|
||||||
}
|
|
||||||
if err := fs.WriteFileAtomically(path, dateBuf); err != nil {
|
|
||||||
logger.Fatalf("cannot store minTimestampForCompositeIndex: %s", err)
|
logger.Fatalf("cannot store minTimestampForCompositeIndex: %s", err)
|
||||||
}
|
}
|
||||||
return minTimestamp
|
return minTimestamp
|
||||||
|
|
Loading…
Reference in a new issue