mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/{mergeset,storage}: prevent from long wait time when creating a snapshot under high data ingestion rate
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3873
This commit is contained in:
parent
0627b845be
commit
d2f85816ea
3 changed files with 24 additions and 52 deletions
|
@ -20,6 +20,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
|||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `--kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
|
||||
|
||||
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
|
||||
|
||||
## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)
|
||||
|
||||
Released at 2023-03-12
|
||||
|
|
|
@ -662,35 +662,20 @@ func (tb *Table) flushInmemoryItems() {
|
|||
}
|
||||
|
||||
func (tb *Table) flushInmemoryParts(isFinal bool) {
|
||||
for {
|
||||
currentTime := time.Now()
|
||||
var pws []*partWrapper
|
||||
currentTime := time.Now()
|
||||
var pws []*partWrapper
|
||||
|
||||
tb.partsLock.Lock()
|
||||
for _, pw := range tb.inmemoryParts {
|
||||
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
|
||||
pw.isInMerge = true
|
||||
pws = append(pws, pw)
|
||||
}
|
||||
tb.partsLock.Lock()
|
||||
for _, pw := range tb.inmemoryParts {
|
||||
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
|
||||
pw.isInMerge = true
|
||||
pws = append(pws, pw)
|
||||
}
|
||||
tb.partsLock.Unlock()
|
||||
}
|
||||
tb.partsLock.Unlock()
|
||||
|
||||
if err := tb.mergePartsOptimal(pws); err != nil {
|
||||
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
|
||||
}
|
||||
if !isFinal {
|
||||
return
|
||||
}
|
||||
tb.partsLock.Lock()
|
||||
n := len(tb.inmemoryParts)
|
||||
tb.partsLock.Unlock()
|
||||
if n == 0 {
|
||||
// All the in-memory parts were flushed to disk.
|
||||
return
|
||||
}
|
||||
// Some parts weren't flushed to disk because they were being merged.
|
||||
// Sleep for a while and try flushing them again.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if err := tb.mergePartsOptimal(pws); err != nil {
|
||||
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -871,35 +871,20 @@ func (pt *partition) flushInmemoryRows() {
|
|||
}
|
||||
|
||||
func (pt *partition) flushInmemoryParts(isFinal bool) {
|
||||
for {
|
||||
currentTime := time.Now()
|
||||
var pws []*partWrapper
|
||||
currentTime := time.Now()
|
||||
var pws []*partWrapper
|
||||
|
||||
pt.partsLock.Lock()
|
||||
for _, pw := range pt.inmemoryParts {
|
||||
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
|
||||
pw.isInMerge = true
|
||||
pws = append(pws, pw)
|
||||
}
|
||||
pt.partsLock.Lock()
|
||||
for _, pw := range pt.inmemoryParts {
|
||||
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
|
||||
pw.isInMerge = true
|
||||
pws = append(pws, pw)
|
||||
}
|
||||
pt.partsLock.Unlock()
|
||||
}
|
||||
pt.partsLock.Unlock()
|
||||
|
||||
if err := pt.mergePartsOptimal(pws, nil); err != nil {
|
||||
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
|
||||
}
|
||||
if !isFinal {
|
||||
return
|
||||
}
|
||||
pt.partsLock.Lock()
|
||||
n := len(pt.inmemoryParts)
|
||||
pt.partsLock.Unlock()
|
||||
if n == 0 {
|
||||
// All the in-memory parts were flushed to disk.
|
||||
return
|
||||
}
|
||||
// Some parts weren't flushed to disk because they were being merged.
|
||||
// Sleep for a while and try flushing them again.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if err := pt.mergePartsOptimal(pws, nil); err != nil {
|
||||
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue