lib/{mergeset,storage}: explicitly fsync the created part directory listing

Previously the created part directory listing was fsynced implicitly
when storing metadata.json file in it.

Also remove superflouous fsync for part directory listing,
which was called at blockStreamWriter.MustClose().
After that the metadata.json file is created, so an additional fsync
for the directory contents is needed.
This commit is contained in:
Aliaksandr Valialkin 2023-04-13 21:03:06 -07:00
parent cf53ce83a0
commit 13d2350e6a
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
6 changed files with 14 additions and 18 deletions

View file

@ -39,7 +39,6 @@ type blockStreamWriter struct {
func (bsw *blockStreamWriter) reset() {
bsw.compressLevel = 0
bsw.path = ""
bsw.metaindexWriter = nil
bsw.indexWriter = nil
@ -124,7 +123,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
bsw.reset()
bsw.compressLevel = compressLevel
bsw.path = path
bsw.metaindexWriter = metaindexFile
bsw.indexWriter = indexFile
@ -151,12 +149,6 @@ func (bsw *blockStreamWriter) MustClose() {
bsw.itemsWriter.MustClose()
bsw.lensWriter.MustClose()
// Sync bsw.path contents to make sure it doesn't disappear
// after system crash or power loss.
if bsw.path != "" {
fs.MustSyncPath(bsw.path)
}
bsw.reset()
}

View file

@ -125,7 +125,10 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err)
}
metadataPath := filepath.Join(partPath, metadataFilename)
if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {
// There is no need in calling fs.WriteFileAtomically() here,
// since the file is created only once during part creatinng
// and the part directory is synced aftewards.
if err := fs.WriteFileAndSync(metadataPath, metadata); err != nil {
return fmt.Errorf("cannot create %q: %w", metadataPath, err)
}
return nil

View file

@ -1143,6 +1143,9 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
if mpNew != nil {
// Update partHeader for destination inmemory part after the merge.
mpNew.ph = *ph
} else {
// Make sure the created part directory listing is synced.
fs.MustSyncPath(dstPartPath)
}
// Atomically swap the source parts with the newly created part.

View file

@ -56,7 +56,6 @@ func (bsw *blockStreamWriter) assertWriteClosers() {
// Init initializes bsw with the given writers.
func (bsw *blockStreamWriter) reset() {
bsw.compressLevel = 0
bsw.path = ""
bsw.timestampsWriter = nil
bsw.valuesWriter = nil
@ -142,7 +141,6 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
bsw.reset()
bsw.compressLevel = compressLevel
bsw.path = path
bsw.timestampsWriter = timestampsFile
bsw.valuesWriter = valuesFile
@ -171,12 +169,6 @@ func (bsw *blockStreamWriter) MustClose() {
bsw.indexWriter.MustClose()
bsw.metaindexWriter.MustClose()
// Sync bsw.path contents to make sure it doesn't disappear
// after system crash or power loss.
if bsw.path != "" {
fs.MustSyncPath(bsw.path)
}
bsw.reset()
}

View file

@ -171,7 +171,10 @@ func (ph *partHeader) WriteMetadata(partPath string) error {
logger.Panicf("BUG: cannot marshal partHeader metadata: %s", err)
}
metadataPath := filepath.Join(partPath, metadataFilename)
if err := fs.WriteFileAtomically(metadataPath, metadata, false); err != nil {
// There is no need in calling fs.WriteFileAtomically() here,
// since the file is created only once during part creatinng
// and the part directory is synced aftewards.
if err := fs.WriteFileAndSync(metadataPath, metadata); err != nil {
return fmt.Errorf("cannot create %q: %w", metadataPath, err)
}
return nil

View file

@ -1324,6 +1324,9 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
if mpNew != nil {
// Update partHeader for destination inmemory part after the merge.
mpNew.ph = *ph
} else {
// Make sure the created part directory listing is synced.
fs.MustSyncPath(dstPartPath)
}
// Atomically swap the source parts with the newly created part.