diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 2a0814656..387fd4847 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -20,6 +20,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `--kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
+* BUGFIX: prevent slow [snapshot creation](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
+
 ## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)
 
 Released at 2023-03-12
diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index b123da213..05396cdbc 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -662,35 +662,20 @@ func (tb *Table) flushInmemoryItems() {
 }
 
 func (tb *Table) flushInmemoryParts(isFinal bool) {
-	for {
-		currentTime := time.Now()
-		var pws []*partWrapper
+	currentTime := time.Now()
+	var pws []*partWrapper
 
-		tb.partsLock.Lock()
-		for _, pw := range tb.inmemoryParts {
-			if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
-				pw.isInMerge = true
-				pws = append(pws, pw)
-			}
+	tb.partsLock.Lock()
+	for _, pw := range tb.inmemoryParts {
+		if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
+			pw.isInMerge = true
+			pws = append(pws, pw)
 		}
-		tb.partsLock.Unlock()
+	}
+	tb.partsLock.Unlock()
 
-		if err := tb.mergePartsOptimal(pws); err != nil {
-			logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
-		}
-		if !isFinal {
-			return
-		}
-		tb.partsLock.Lock()
-		n := len(tb.inmemoryParts)
-		tb.partsLock.Unlock()
-		if n == 0 {
-			// All the in-memory parts were flushed to disk.
-			return
-		}
-		// Some parts weren't flushed to disk because they were being merged.
-		// Sleep for a while and try flushing them again.
-		time.Sleep(10 * time.Millisecond)
+	if err := tb.mergePartsOptimal(pws); err != nil {
+		logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
 	}
 }
 
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index ece31a9c4..f41381300 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -871,35 +871,20 @@ func (pt *partition) flushInmemoryRows() {
 }
 
 func (pt *partition) flushInmemoryParts(isFinal bool) {
-	for {
-		currentTime := time.Now()
-		var pws []*partWrapper
+	currentTime := time.Now()
+	var pws []*partWrapper
 
-		pt.partsLock.Lock()
-		for _, pw := range pt.inmemoryParts {
-			if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
-				pw.isInMerge = true
-				pws = append(pws, pw)
-			}
+	pt.partsLock.Lock()
+	for _, pw := range pt.inmemoryParts {
+		if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
+			pw.isInMerge = true
+			pws = append(pws, pw)
 		}
-		pt.partsLock.Unlock()
+	}
+	pt.partsLock.Unlock()
 
-		if err := pt.mergePartsOptimal(pws, nil); err != nil {
-			logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
-		}
-		if !isFinal {
-			return
-		}
-		pt.partsLock.Lock()
-		n := len(pt.inmemoryParts)
-		pt.partsLock.Unlock()
-		if n == 0 {
-			// All the in-memory parts were flushed to disk.
-			return
-		}
-		// Some parts weren't flushed to disk because they were being merged.
-		// Sleep for a while and try flushing them again.
-		time.Sleep(10 * time.Millisecond)
+	if err := pt.mergePartsOptimal(pws, nil); err != nil {
+		logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
 	}
 }
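
The sketch below is a minimal, self-contained Go illustration of the single-pass flush pattern this patch switches to; the table, partWrapper and mergeToDisk names are simplified stand-ins, not the actual VictoriaMetrics types. The removed code looped with a 10ms sleep until inmemoryParts became empty when isFinal was true, which under a high ingestion rate could take a long time because background merges keep parts busy, and that appears to be the slow snapshot creation described in issue 3551. The new code instead selects the parts that are due once under the lock, merges only that batch and returns; busy parts are simply skipped rather than waited for.

// flushsketch.go: illustration only; all names are simplified stand-ins.
package main

import (
	"fmt"
	"sync"
	"time"
)

// partWrapper mimics an in-memory part with a flush deadline and a merge flag.
type partWrapper struct {
	id                  int
	isInMerge           bool
	flushToDiskDeadline time.Time
}

// table mimics the structure that owns the in-memory parts.
type table struct {
	partsLock     sync.Mutex
	inmemoryParts []*partWrapper
}

// flushInmemoryParts performs a single pass: it selects the parts that are due
// (or all idle parts when isFinal is true) under the lock, then merges only that
// batch. Parts currently busy in a merge are skipped instead of being waited for
// in a sleep/retry loop.
func (tb *table) flushInmemoryParts(isFinal bool) {
	currentTime := time.Now()
	var pws []*partWrapper

	tb.partsLock.Lock()
	for _, pw := range tb.inmemoryParts {
		if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
			pw.isInMerge = true
			pws = append(pws, pw)
		}
	}
	tb.partsLock.Unlock()

	tb.mergeToDisk(pws)
}

// mergeToDisk is a stand-in for the real merge; it just reports what would be flushed.
func (tb *table) mergeToDisk(pws []*partWrapper) {
	for _, pw := range pws {
		fmt.Printf("flushing in-memory part %d to disk\n", pw.id)
	}
}

func main() {
	now := time.Now()
	tb := &table{
		inmemoryParts: []*partWrapper{
			{id: 1, flushToDiskDeadline: now.Add(-time.Second)}, // overdue: gets flushed
			{id: 2, isInMerge: true},                            // busy in a merge: skipped
			{id: 3, flushToDiskDeadline: now.Add(time.Minute)},  // not due yet: skipped
		},
	}
	tb.flushInmemoryParts(false) // prints: flushing in-memory part 1 to disk
}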