mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
lib/storage: fixes finalDedup for backfilled data (#3737)
Previously, historical data backfilling could trigger a force merge for the previous month every hour; this consumed CPU and disk IO and decreased cluster performance. The following commit fixes it by applying deduplication to in-memory parts (InMemoryParts).
This commit is contained in:
parent
326a702559
commit
554876cc38
3 changed files with 12 additions and 8 deletions
|
@ -736,12 +736,15 @@ func (pt *partition) HasTimestamp(timestamp int64) bool {
|
|||
// GetParts appends parts snapshot to dst and returns it.
|
||||
//
|
||||
// The appended parts must be released with PutParts.
|
||||
func (pt *partition) GetParts(dst []*partWrapper) []*partWrapper {
|
||||
func (pt *partition) GetParts(dst []*partWrapper, addInMemory bool) []*partWrapper {
|
||||
pt.partsLock.Lock()
|
||||
for _, pw := range pt.inmemoryParts {
|
||||
pw.incRef()
|
||||
if addInMemory {
|
||||
for _, pw := range pt.inmemoryParts {
|
||||
pw.incRef()
|
||||
}
|
||||
dst = append(dst, pt.inmemoryParts...)
|
||||
}
|
||||
dst = append(dst, pt.inmemoryParts...)
|
||||
|
||||
for _, pw := range pt.smallParts {
|
||||
pw.incRef()
|
||||
}
|
||||
|
@ -1227,7 +1230,7 @@ func (pt *partition) runFinalDedup() error {
|
|||
}
|
||||
|
||||
func (pt *partition) getRequiredDedupInterval() (int64, int64) {
|
||||
pws := pt.GetParts(nil)
|
||||
pws := pt.GetParts(nil, false)
|
||||
defer pt.PutParts(pws)
|
||||
dedupInterval := GetDedupInterval()
|
||||
minDedupInterval := getMinDedupInterval(pws)
|
||||
|
@ -1275,12 +1278,13 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isFi
|
|||
}()
|
||||
}
|
||||
|
||||
if isFinal && len(pws) == 1 && pws[0].mp != nil {
|
||||
if !isDedupEnabled() && isFinal && len(pws) == 1 && pws[0].mp != nil {
|
||||
// Fast path: flush a single in-memory part to disk.
|
||||
mp := pws[0].mp
|
||||
if tmpPartPath == "" {
|
||||
logger.Panicf("BUG: tmpPartPath must be non-empty")
|
||||
}
|
||||
|
||||
if err := mp.StoreToDisk(tmpPartPath); err != nil {
|
||||
return fmt.Errorf("cannot store in-memory part to %q: %w", tmpPartPath, err)
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange) {
|
|||
return
|
||||
}
|
||||
|
||||
pts.pws = pt.GetParts(pts.pws[:0])
|
||||
pts.pws = pt.GetParts(pts.pws[:0], true)
|
||||
|
||||
// Initialize psPool.
|
||||
if n := len(pts.pws) - cap(pts.psPool); n > 0 {
|
||||
|
|
|
@ -1199,7 +1199,7 @@ func testStorageAddRows(rng *rand.Rand, s *Storage) error {
|
|||
}
|
||||
ptws := s1.tb.GetPartitions(nil)
|
||||
for _, ptw := range ptws {
|
||||
pws := ptw.pt.GetParts(nil)
|
||||
pws := ptw.pt.GetParts(nil, true)
|
||||
numParts := len(pws)
|
||||
ptw.pt.PutParts(pws)
|
||||
if numParts != 1 {
|
||||
|
|
Loading…
Reference in a new issue