lib/{mergeset,storage}: prevent from long wait time when creating a snapshot under high data ingestion rate

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3873
This commit is contained in:
Aliaksandr Valialkin 2023-03-19 00:10:24 -07:00
parent 6d56149b9f
commit 6460475e3b
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 24 additions and 52 deletions

View file

@ -20,6 +20,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `--kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `--kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1) ## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)
Released at 2023-03-12 Released at 2023-03-12

View file

@ -662,35 +662,20 @@ func (tb *Table) flushInmemoryItems() {
} }
func (tb *Table) flushInmemoryParts(isFinal bool) { func (tb *Table) flushInmemoryParts(isFinal bool) {
for { currentTime := time.Now()
currentTime := time.Now() var pws []*partWrapper
var pws []*partWrapper
tb.partsLock.Lock() tb.partsLock.Lock()
for _, pw := range tb.inmemoryParts { for _, pw := range tb.inmemoryParts {
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) { if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
pw.isInMerge = true pw.isInMerge = true
pws = append(pws, pw) pws = append(pws, pw)
}
} }
tb.partsLock.Unlock() }
tb.partsLock.Unlock()
if err := tb.mergePartsOptimal(pws); err != nil { if err := tb.mergePartsOptimal(pws); err != nil {
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
}
if !isFinal {
return
}
tb.partsLock.Lock()
n := len(tb.inmemoryParts)
tb.partsLock.Unlock()
if n == 0 {
// All the in-memory parts were flushed to disk.
return
}
// Some parts weren't flushed to disk because they were being merged.
// Sleep for a while and try flushing them again.
time.Sleep(10 * time.Millisecond)
} }
} }

View file

@ -871,35 +871,20 @@ func (pt *partition) flushInmemoryRows() {
} }
func (pt *partition) flushInmemoryParts(isFinal bool) { func (pt *partition) flushInmemoryParts(isFinal bool) {
for { currentTime := time.Now()
currentTime := time.Now() var pws []*partWrapper
var pws []*partWrapper
pt.partsLock.Lock() pt.partsLock.Lock()
for _, pw := range pt.inmemoryParts { for _, pw := range pt.inmemoryParts {
if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) { if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
pw.isInMerge = true pw.isInMerge = true
pws = append(pws, pw) pws = append(pws, pw)
}
} }
pt.partsLock.Unlock() }
pt.partsLock.Unlock()
if err := pt.mergePartsOptimal(pws, nil); err != nil { if err := pt.mergePartsOptimal(pws, nil); err != nil {
logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
}
if !isFinal {
return
}
pt.partsLock.Lock()
n := len(pt.inmemoryParts)
pt.partsLock.Unlock()
if n == 0 {
// All the in-memory parts were flushed to disk.
return
}
// Some parts weren't flushed to disk because they were being merged.
// Sleep for a while and try flushing them again.
time.Sleep(10 * time.Millisecond)
} }
} }