lib/{mergeset,storage}: prevent from long wait time when creating a snapshot under high data ingestion rate

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3873
This commit is contained in:
Aliaksandr Valialkin 2023-03-19 00:10:24 -07:00
parent a20c4804a0
commit d6b6cb56e5
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 24 additions and 52 deletions

View file

@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
## v1.87.x long-time support release (LTS) ## v1.87.x long-time support release (LTS)
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3) ## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
Released at 2023-03-12 Released at 2023-03-12

View file

@ -662,35 +662,20 @@ func (tb *Table) flushInmemoryItems() {
} }
func (tb *Table) flushInmemoryParts(isFinal bool) { func (tb *Table) flushInmemoryParts(isFinal bool) {
for { currentTime := time.Now()
currentTime := time.Now() var pws []*partWrapper
var pws []*partWrapper
tb.partsLock.Lock() tb.partsLock.Lock()
for _, pw := range tb.inmemoryParts { for _, pw := range tb.inmemoryParts {
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) { if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
pw.isInMerge = true pw.isInMerge = true
pws = append(pws, pw) pws = append(pws, pw)
}
} }
tb.partsLock.Unlock() }
tb.partsLock.Unlock()
if err := tb.mergePartsOptimal(pws); err != nil { if err := tb.mergePartsOptimal(pws); err != nil {
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
}
if !isFinal {
return
}
tb.partsLock.Lock()
n := len(tb.inmemoryParts)
tb.partsLock.Unlock()
if n == 0 {
// All the in-memory parts were flushed to disk.
return
}
// Some parts weren't flushed to disk because they were being merged.
// Sleep for a while and try flushing them again.
time.Sleep(10 * time.Millisecond)
} }
} }

View file

@ -871,35 +871,20 @@ func (pt *partition) flushInmemoryRows() {
} }
func (pt *partition) flushInmemoryParts(isFinal bool) { func (pt *partition) flushInmemoryParts(isFinal bool) {
for { currentTime := time.Now()
currentTime := time.Now() var pws []*partWrapper
var pws []*partWrapper
pt.partsLock.Lock() pt.partsLock.Lock()
for _, pw := range pt.inmemoryParts { for _, pw := range pt.inmemoryParts {
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) { if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
pw.isInMerge = true pw.isInMerge = true
pws = append(pws, pw) pws = append(pws, pw)
}
} }
pt.partsLock.Unlock() }
pt.partsLock.Unlock()
if err := pt.mergePartsOptimal(pws, nil); err != nil { if err := pt.mergePartsOptimal(pws, nil); err != nil {
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
}
if !isFinal {
return
}
pt.partsLock.Lock()
n := len(pt.inmemoryParts)
pt.partsLock.Unlock()
if n == 0 {
// All the in-memory parts were flushed to disk.
return
}
// Some parts weren't flushed to disk because they were being merged.
// Sleep for a while and try flushing them again.
time.Sleep(10 * time.Millisecond)
} }
} }