From 13d2350e6ae1bf1b402d0805cee731df4ba34c3a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 13 Apr 2023 21:03:06 -0700 Subject: [PATCH] lib/{mergeset,storage}: explicitly fsync the created part directory listing Previously the created part directory listing was fsynced implicitly when storing metadata.json file in it. Also remove superflouous fsync for part directory listing, which was called at blockStreamWriter.MustClose(). After that the metadata.json file is created, so an additional fsync for the directory contents is needed. --- lib/mergeset/block_stream_writer.go | 8 -------- lib/mergeset/part_header.go | 5 ++++- lib/mergeset/table.go | 3 +++ lib/storage/block_stream_writer.go | 8 -------- lib/storage/part_header.go | 5 ++++- lib/storage/partition.go | 3 +++ 6 files changed, 14 insertions(+), 18 deletions(-) diff --git a/lib/mergeset/block_stream_writer.go b/lib/mergeset/block_stream_writer.go index f778b4a9e..d760e61fe 100644 --- a/lib/mergeset/block_stream_writer.go +++ b/lib/mergeset/block_stream_writer.go @@ -39,7 +39,6 @@ type blockStreamWriter struct { func (bsw *blockStreamWriter) reset() { bsw.compressLevel = 0 - bsw.path = "" bsw.metaindexWriter = nil bsw.indexWriter = nil @@ -124,7 +123,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre bsw.reset() bsw.compressLevel = compressLevel - bsw.path = path bsw.metaindexWriter = metaindexFile bsw.indexWriter = indexFile @@ -151,12 +149,6 @@ func (bsw *blockStreamWriter) MustClose() { bsw.itemsWriter.MustClose() bsw.lensWriter.MustClose() - // Sync bsw.path contents to make sure it doesn't disappear - // after system crash or power loss. - if bsw.path != "" { - fs.MustSyncPath(bsw.path) - } - bsw.reset() } diff --git a/lib/mergeset/part_header.go b/lib/mergeset/part_header.go index 288bd05cf..1f46907ec 100644 --- a/lib/mergeset/part_header.go +++ b/lib/mergeset/part_header.go @@ -125,7 +125,10 @@ func (ph *partHeader) WriteMetadata(partPath string) error { logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err) } metadataPath := filepath.Join(partPath, metadataFilename) - if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil { + // There is no need in calling fs.WriteFileAtomically() here, + // since the file is created only once during part creatinng + // and the part directory is synced aftewards. + if err := fs.WriteFileAndSync(metadataPath, metadata); err != nil { return fmt.Errorf("cannot create %q: %w", metadataPath, err) } return nil diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 6fb107835..fdbe09849 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1143,6 +1143,9 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal if mpNew != nil { // Update partHeader for destination inmemory part after the merge. mpNew.ph = *ph + } else { + // Make sure the created part directory listing is synced. + fs.MustSyncPath(dstPartPath) } // Atomically swap the source parts with the newly created part. diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index 1c5745c76..e55811948 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -56,7 +56,6 @@ func (bsw *blockStreamWriter) assertWriteClosers() { // Init initializes bsw with the given writers. func (bsw *blockStreamWriter) reset() { bsw.compressLevel = 0 - bsw.path = "" bsw.timestampsWriter = nil bsw.valuesWriter = nil @@ -142,7 +141,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre bsw.reset() bsw.compressLevel = compressLevel - bsw.path = path bsw.timestampsWriter = timestampsFile bsw.valuesWriter = valuesFile @@ -171,12 +169,6 @@ func (bsw *blockStreamWriter) MustClose() { bsw.indexWriter.MustClose() bsw.metaindexWriter.MustClose() - // Sync bsw.path contents to make sure it doesn't disappear - // after system crash or power loss. - if bsw.path != "" { - fs.MustSyncPath(bsw.path) - } - bsw.reset() } diff --git a/lib/storage/part_header.go b/lib/storage/part_header.go index df1962ff7..40b17237a 100644 --- a/lib/storage/part_header.go +++ b/lib/storage/part_header.go @@ -171,7 +171,10 @@ func (ph *partHeader) WriteMetadata(partPath string) error { logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err) } metadataPath := filepath.Join(partPath, metadataFilename) - if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil { + // There is no need in calling fs.WriteFileAtomically() here, + // since the file is created only once during part creatinng + // and the part directory is synced aftewards. + if err := fs.WriteFileAndSync(metadataPath, metadata); err != nil { return fmt.Errorf("cannot create %q: %w", metadataPath, err) } return nil diff --git a/lib/storage/partition.go b/lib/storage/partition.go index af61a5956..1f99c901f 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1324,6 +1324,9 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi if mpNew != nil { // Update partHeader for destination inmemory part after the merge. mpNew.ph = *ph + } else { + // Make sure the created part directory listing is synced. + fs.MustSyncPath(dstPartPath) } // Atomically swap the source parts with the newly created part.