mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: fixes finalDedup for backfilled data (#3737)
previously historical data backfilling may trigger force merge for previous month every hour it consumes cpu, disk io and decrease cluster performance. Following commit fixes it by applying deduplication for InMemoryParts
This commit is contained in:
parent
68985455f1
commit
9254e494f9
3 changed files with 12 additions and 8 deletions
|
@ -736,12 +736,15 @@ func (pt *partition) HasTimestamp(timestamp int64) bool {
|
|||
// GetParts appends parts snapshot to dst and returns it.
|
||||
//
|
||||
// The appended parts must be released with PutParts.
|
||||
func (pt *partition) GetParts(dst []*partWrapper) []*partWrapper {
|
||||
func (pt *partition) GetParts(dst []*partWrapper, addInMemory bool) []*partWrapper {
|
||||
pt.partsLock.Lock()
|
||||
for _, pw := range pt.inmemoryParts {
|
||||
pw.incRef()
|
||||
if addInMemory {
|
||||
for _, pw := range pt.inmemoryParts {
|
||||
pw.incRef()
|
||||
}
|
||||
dst = append(dst, pt.inmemoryParts...)
|
||||
}
|
||||
dst = append(dst, pt.inmemoryParts...)
|
||||
|
||||
for _, pw := range pt.smallParts {
|
||||
pw.incRef()
|
||||
}
|
||||
|
@ -1227,7 +1230,7 @@ func (pt *partition) runFinalDedup() error {
|
|||
}
|
||||
|
||||
func (pt *partition) getRequiredDedupInterval() (int64, int64) {
|
||||
pws := pt.GetParts(nil)
|
||||
pws := pt.GetParts(nil, false)
|
||||
defer pt.PutParts(pws)
|
||||
dedupInterval := GetDedupInterval()
|
||||
minDedupInterval := getMinDedupInterval(pws)
|
||||
|
@ -1275,12 +1278,13 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
}()
|
||||
}
|
||||
|
||||
if isFinal && len(pws) == 1 && pws[0].mp != nil {
|
||||
if !isDedupEnabled() && isFinal && len(pws) == 1 && pws[0].mp != nil {
|
||||
// Fast path: flush a single in-memory part to disk.
|
||||
mp := pws[0].mp
|
||||
if tmpPartPath == "" {
|
||||
logger.Panicf("BUG: tmpPartPath must be non-empty")
|
||||
}
|
||||
|
||||
if err := mp.StoreToDisk(tmpPartPath); err != nil {
|
||||
return fmt.Errorf("cannot store in-memory part to %q: %w", tmpPartPath, err)
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
|
|||
return
|
||||
}
|
||||
|
||||
pts.pws = pt.GetParts(pts.pws[:0])
|
||||
pts.pws = pt.GetParts(pts.pws[:0], true)
|
||||
|
||||
// Initialize psPool.
|
||||
if n := len(pts.pws) - cap(pts.psPool); n > 0 {
|
||||
|
|
|
@ -1055,7 +1055,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
|||
}
|
||||
ptws := s1.tb.GetPartitions(nil)
|
||||
for _, ptw := range ptws {
|
||||
pws := ptw.pt.GetParts(nil)
|
||||
pws := ptw.pt.GetParts(nil, true)
|
||||
numParts := len(pws)
|
||||
ptw.pt.PutParts(pws)
|
||||
if numParts != 1 {
|
||||
|
|
Loading…
Reference in a new issue