lib{mergset,storage}: prevent possible race condition with logging st… (#3900)

lib{mergset,storage}: prevent possible race condition with logging stats for merges

Previously partwrapper could be release by background process and reference for part may be invalid 
during logging stats. It will lead to panic at vmstorage
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3897
This commit is contained in:
Nikolay 2023-03-03 12:33:42 +01:00 committed by GitHub
parent d056be710b
commit 6bfe9cc733
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 15 deletions

View file

@ -17,6 +17,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): `vmctl` `vm-native` mode now splits the migration process on per-metric basis. This allows to migrate metrics one-by-one according to the specified filter. This change allows to retry export/import requests for a specific metric and provides a better understanding of the migration progress. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3600).
* BUGFIX: prevent from possible panic during [background merge process](https://docs.victoriametrics.com/#storage). It may occur in rare case and was introduced at [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3337).
## [v1.88.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.88.1)
Released at 2023-02-27

View file

@ -1164,14 +1164,6 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
if err != nil {
return fmt.Errorf("cannot atomically register the created part: %w", err)
}
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
d := time.Since(startTime)
if d <= 30*time.Second {
return nil
}
// Log stats for long merges.
dstItemsCount := uint64(0)
dstBlocksCount := uint64(0)
dstSize := uint64(0)
@ -1183,6 +1175,15 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFinal
dstSize = pDst.size
dstPartPath = pDst.path
}
tb.swapSrcWithDstParts(pws, pwNew, dstPartType)
d := time.Since(startTime)
if d <= 30*time.Second {
return nil
}
// Log stats for long merges.
durationSecs := d.Seconds()
itemsPerSec := int(float64(srcItemsCount) / durationSecs)
logger.Infof("merged (%d parts, %d items, %d blocks, %d bytes) into (1 part, %d items, %d blocks, %d bytes) in %.3f seconds at %d items/sec to %q",

View file

@ -1376,14 +1376,7 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
if err != nil {
return fmt.Errorf("cannot atomically register the created part: %w", err)
}
pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
d := time.Since(startTime)
if d <= 30*time.Second {
return nil
}
// Log stats for long merges.
dstRowsCount := uint64(0)
dstBlocksCount := uint64(0)
dstSize := uint64(0)
@ -1395,6 +1388,15 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
dstSize = pDst.size
dstPartPath = pDst.String()
}
pt.swapSrcWithDstParts(pws, pwNew, dstPartType)
d := time.Since(startTime)
if d <= 30*time.Second {
return nil
}
// Log stats for long merges.
durationSecs := d.Seconds()
rowsPerSec := int(float64(srcRowsCount) / durationSecs)
logger.Infof("merged (%d parts, %d rows, %d blocks, %d bytes) into (1 part, %d rows, %d blocks, %d bytes) in %.3f seconds at %d rows/sec to %q",