Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2022-10-26 01:11:17 +03:00
commit 0074539441
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
8 changed files with 13 additions and 14 deletions

View file

@ -433,8 +433,7 @@ func mustLoadRollupResultCacheKeyPrefix(path string) {
func mustSaveRollupResultCacheKeyPrefix(path string) { func mustSaveRollupResultCacheKeyPrefix(path string) {
path = path + ".key.prefix" path = path + ".key.prefix"
data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix) data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix)
fs.MustRemoveAll(path) if err := fs.WriteFileAtomically(path, data, true); err != nil {
if err := fs.WriteFileAtomically(path, data); err != nil {
logger.Fatalf("cannot store rollupResult cache key prefix to %q: %s", path, err) logger.Fatalf("cannot store rollupResult cache key prefix to %q: %s", path, err)
} }
} }

View file

@ -29,11 +29,14 @@ func MustSyncPath(path string) {
// //
// WriteFileAtomically returns only after the file is fully written and synced // WriteFileAtomically returns only after the file is fully written and synced
// to the underlying storage. // to the underlying storage.
func WriteFileAtomically(path string, data []byte) error { //
// If the file at path already exists, then the file is overwritten atomically if canOverwrite is true.
// Otherwise error is returned.
func WriteFileAtomically(path string, data []byte, canOverwrite bool) error {
// Check for the existing file. It is expected that // Check for the existing file. It is expected that
// the WriteFileAtomically function cannot be called concurrently // the WriteFileAtomically function cannot be called concurrently
// with the same `path`. // with the same `path`.
if IsPathExist(path) { if IsPathExist(path) && !canOverwrite {
return fmt.Errorf("cannot create file %q, since it already exists", path) return fmt.Errorf("cannot create file %q, since it already exists", path)
} }

View file

@ -163,7 +163,7 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
return fmt.Errorf("cannot marshal metadata: %w", err) return fmt.Errorf("cannot marshal metadata: %w", err)
} }
metadataPath := partPath + "/metadata.json" metadataPath := partPath + "/metadata.json"
if err := fs.WriteFileAtomically(metadataPath, metadata); err != nil { if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {
return fmt.Errorf("cannot create %q: %w", metadataPath, err) return fmt.Errorf("cannot create %q: %w", metadataPath, err)
} }
return nil return nil

View file

@ -580,7 +580,7 @@ func (tb *Table) convertToV1280() {
logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds()) logger.Infof("finished round 2 of background conversion of %q to v1.28.0 format in %.3f seconds", tb.path, time.Since(startTime).Seconds())
} }
if err := fs.WriteFileAtomically(flagFilePath, []byte("ok")); err != nil { if err := fs.WriteFileAtomically(flagFilePath, []byte("ok"), false); err != nil {
logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err) logger.Panicf("FATAL: cannot create %q: %s", flagFilePath, err)
} }
} }
@ -975,7 +975,7 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
dstPartPath := ph.Path(tb.path, mergeIdx) dstPartPath := ph.Path(tb.path, mergeIdx)
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath) fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx) txnPath := fmt.Sprintf("%s/txn/%016X", tb.path, mergeIdx)
if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil { if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err) return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
} }

View file

@ -211,7 +211,7 @@ func tryOpeningQueue(path, name string, chunkFileSize, maxBlockSize, maxPendingB
// Create initial chunk file. // Create initial chunk file.
filepath := q.chunkFilePath(0) filepath := q.chunkFilePath(0)
if err := fs.WriteFileAtomically(filepath, nil); err != nil { if err := fs.WriteFileAtomically(filepath, nil, false); err != nil {
return nil, fmt.Errorf("cannot create %q: %w", filepath, err) return nil, fmt.Errorf("cannot create %q: %w", filepath, err)
} }
} }

View file

@ -151,7 +151,7 @@ func (ph *partHeader) writeMinDedupInterval(partPath string) error {
filePath := partPath + "/min_dedup_interval" filePath := partPath + "/min_dedup_interval"
dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond dedupInterval := time.Duration(ph.MinDedupInterval) * time.Millisecond
data := dedupInterval.String() data := dedupInterval.String()
if err := fs.WriteFileAtomically(filePath, []byte(data)); err != nil { if err := fs.WriteFileAtomically(filePath, []byte(data), false); err != nil {
return fmt.Errorf("cannot create %q: %w", filePath, err) return fmt.Errorf("cannot create %q: %w", filePath, err)
} }
return nil return nil

View file

@ -1246,7 +1246,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
} }
fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath) fmt.Fprintf(&bb, "%s -> %s\n", tmpPartPath, dstPartPath)
txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx) txnPath := fmt.Sprintf("%s/txn/%016X", ptPath, mergeIdx)
if err := fs.WriteFileAtomically(txnPath, bb.B); err != nil { if err := fs.WriteFileAtomically(txnPath, bb.B, false); err != nil {
return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err) return fmt.Errorf("cannot create transaction file %q: %w", txnPath, err)
} }

View file

@ -1013,10 +1013,7 @@ func mustGetMinTimestampForCompositeIndex(metadataDir string, isEmptyDB bool) in
} }
minTimestamp = date * msecPerDay minTimestamp = date * msecPerDay
dateBuf := encoding.MarshalInt64(nil, minTimestamp) dateBuf := encoding.MarshalInt64(nil, minTimestamp)
if err := os.RemoveAll(path); err != nil { if err := fs.WriteFileAtomically(path, dateBuf, true); err != nil {
logger.Fatalf("cannot remove a file with minTimestampForCompositeIndex: %s", err)
}
if err := fs.WriteFileAtomically(path, dateBuf); err != nil {
logger.Fatalf("cannot store minTimestampForCompositeIndex: %s", err) logger.Fatalf("cannot store minTimestampForCompositeIndex: %s", err)
} }
return minTimestamp return minTimestamp