mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib{mergset,storage}: prevent possible race condition with logging stats for merges Previously partwrapper could be release by background process and reference for part may be invalid during logging stats. It will lead to panic at vmstorage https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3897
This commit is contained in:
parent
57f50ace96
commit
361e1b1165
3 changed files with 20 additions and 15 deletions
|
@ -16,6 +16,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
## tip
|
||||
|
||||
|
||||
* BUGFIX: prevent from possible panic during [background merge process](https://docs.victoriametrics.com/#storage). It may occur in rare case and was introduced at [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337).
|
||||
|
||||
## [v1.88.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.88.1)
|
||||
|
||||
Released at 2023-02-27
|
||||
|
|
|
@ -1164,14 +1164,6 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
|
|||
if err != nil {
|
||||
return fmt.Errorf("cannot atomically register the created part: %w", err)
|
||||
}
|
||||
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
||||
|
||||
d := time.Since(startTime)
|
||||
if d <= 30*time.Second {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Log stats for long merges.
|
||||
dstItemsCount := uint64(0)
|
||||
dstBlocksCount := uint64(0)
|
||||
dstSize := uint64(0)
|
||||
|
@ -1183,6 +1175,15 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
|
|||
dstSize = pDst.size
|
||||
dstPartPath = pDst.path
|
||||
}
|
||||
|
||||
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
||||
|
||||
d := time.Since(startTime)
|
||||
if d <= 30*time.Second {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Log stats for long merges.
|
||||
durationSecs := d.Seconds()
|
||||
itemsPerSec := int(float64(srcItemsCount) / durationSecs)
|
||||
logger.Infof("merged (%d parts, %d items, %d blocks, %d bytes) into (1 part, %d items, %d blocks, %d bytes) in %.3f seconds at %d items/sec to %q",
|
||||
|
|
|
@ -1376,14 +1376,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
if err != nil {
|
||||
return fmt.Errorf("cannot atomically register the created part: %w", err)
|
||||
}
|
||||
pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
||||
|
||||
d := time.Since(startTime)
|
||||
if d <= 30*time.Second {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Log stats for long merges.
|
||||
dstRowsCount := uint64(0)
|
||||
dstBlocksCount := uint64(0)
|
||||
dstSize := uint64(0)
|
||||
|
@ -1395,6 +1388,15 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
dstSize = pDst.size
|
||||
dstPartPath = pDst.String()
|
||||
}
|
||||
|
||||
pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
|
||||
|
||||
d := time.Since(startTime)
|
||||
if d <= 30*time.Second {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Log stats for long merges.
|
||||
durationSecs := d.Seconds()
|
||||
rowsPerSec := int(float64(srcRowsCount) / durationSecs)
|
||||
logger.Infof("merged (%d parts, %d rows, %d blocks, %d bytes) into (1 part, %d rows, %d blocks, %d bytes) in %.3f seconds at %d rows/sec to %q",
|
||||
|
|
Loading…
Reference in a new issue