diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2e91bada16..81d3dd4a0f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -16,6 +16,8 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* BUGFIX: prevent from possible panic during [background merge process](https://docs.victoriametrics.com/#storage). It may occur in rare case and was introduced at [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337). + ## [v1.88.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.88.1) Released at 2023-02-27 diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 9fa62bc9d6..76c60cac80 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -1164,14 +1164,6 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal if err != nil { return fmt.Errorf("cannot atomically register the created part: %w", err) } - tb.swapSrcWithDstParts(pws, pwNew, dstPartType) - - d := time.Since(startTime) - if d <= 30*time.Second { - return nil - } - - // Log stats for long merges. dstItemsCount := uint64(0) dstBlocksCount := uint64(0) dstSize := uint64(0) @@ -1183,6 +1175,15 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal dstSize = pDst.size dstPartPath = pDst.path } + + tb.swapSrcWithDstParts(pws, pwNew, dstPartType) + + d := time.Since(startTime) + if d <= 30*time.Second { + return nil + } + + // Log stats for long merges. durationSecs := d.Seconds() itemsPerSec := int(float64(srcItemsCount) / durationSecs) logger.Infof("merged (%d parts, %d items, %d blocks, %d bytes) into (1 part, %d items, %d blocks, %d bytes) in %.3f seconds at %d items/sec to %q", diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 793fa6ddf4..570b44c252 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1376,14 +1376,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi if err != nil { return fmt.Errorf("cannot atomically register the created part: %w", err) } - pt.swapSrcWithDstParts(pws, pwNew, dstPartType) - d := time.Since(startTime) - if d <= 30*time.Second { - return nil - } - - // Log stats for long merges. dstRowsCount := uint64(0) dstBlocksCount := uint64(0) dstSize := uint64(0) @@ -1395,6 +1388,15 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi dstSize = pDst.size dstPartPath = pDst.String() } + + pt.swapSrcWithDstParts(pws, pwNew, dstPartType) + + d := time.Since(startTime) + if d <= 30*time.Second { + return nil + } + + // Log stats for long merges. durationSecs := d.Seconds() rowsPerSec := int(float64(srcRowsCount) / durationSecs) logger.Infof("merged (%d parts, %d rows, %d blocks, %d bytes) into (1 part, %d rows, %d blocks, %d bytes) in %.3f seconds at %d rows/sec to %q",