lib/storage: remove ForceMergeAllParts internal loop (#4999)

Signed-off-by: faceair <git@faceair.me>
This commit is contained in:
faceair 2023-09-16 01:04:54 +08:00 committed by Aliaksandr Valialkin
parent 77756c7acc
commit 3045ba01f5
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -936,29 +936,25 @@ func (pt *partition) ForceMergeAllParts() error {
// Nothing to merge.
return nil
}
for {
// Check whether there is enough disk space for merging pws.
newPartSize := getPartsSize(pws)
maxOutBytes := fs.MustGetFreeSpace(pt.bigPartsPath)
if newPartSize > maxOutBytes {
freeSpaceNeededBytes := newPartSize - maxOutBytes
forceMergeLogger.Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", pt.name, freeSpaceNeededBytes)
pt.releasePartsToMerge(pws)
return nil
}
// If len(pws) == 1, then the merge must run anyway.
// This allows applying the configured retention, removing the deleted series
// and performing de-duplication if needed.
if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil {
return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
}
pws = pt.getAllPartsForMerge()
if len(pws) <= 1 {
pt.releasePartsToMerge(pws)
return nil
}
// Check whether there is enough disk space for merging pws.
newPartSize := getPartsSize(pws)
maxOutBytes := fs.MustGetFreeSpace(pt.bigPartsPath)
if newPartSize > maxOutBytes {
freeSpaceNeededBytes := newPartSize - maxOutBytes
forceMergeLogger.Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", pt.name, freeSpaceNeededBytes)
pt.releasePartsToMerge(pws)
return nil
}
// If len(pws) == 1, then the merge must run anyway.
// This allows applying the configured retention, removing the deleted series
// and performing de-duplication if needed.
if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil {
return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
}
return nil
}
var forceMergeLogger = logger.WithThrottler("forceMerge", time.Minute)