From d6b6cb56e53eadfba7cd765738c1d9e85e8e1aea Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 19 Mar 2023 00:10:24 -0700 Subject: [PATCH] lib/{mergeset,storage}: prevent long wait times when creating a snapshot under high data ingestion rate Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3873 --- docs/CHANGELOG.md | 2 ++ lib/mergeset/table.go | 37 +++++++++++-------------------------- lib/storage/partition.go | 37 +++++++++++-------------------------- 3 files changed, 24 insertions(+), 52 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index d15981654..5bb854fcb 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f ## v1.87.x long-time support release (LTS) +* BUGFIX: prevent slow [snapshot creation](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551). 
+ ## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3) Released at 2023-03-12 diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index 486eb4742..5ce62829b 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -662,35 +662,20 @@ func (tb *Table) flushInmemoryItems() { } func (tb *Table) flushInmemoryParts(isFinal bool) { - for { - currentTime := time.Now() - var pws []*partWrapper + currentTime := time.Now() + var pws []*partWrapper - tb.partsLock.Lock() - for _, pw := range tb.inmemoryParts { - if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) { - pw.isInMerge = true - pws = append(pws, pw) - } + tb.partsLock.Lock() + for _, pw := range tb.inmemoryParts { + if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) { + pw.isInMerge = true + pws = append(pws, pw) } - tb.partsLock.Unlock() + } + tb.partsLock.Unlock() - if err := tb.mergePartsOptimal(pws); err != nil { - logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) - } - if !isFinal { - return - } - tb.partsLock.Lock() - n := len(tb.inmemoryParts) - tb.partsLock.Unlock() - if n == 0 { - // All the in-memory parts were flushed to disk. - return - } - // Some parts weren't flushed to disk because they were being merged. - // Sleep for a while and try flushing them again. 
- time.Sleep(10 * time.Millisecond) + if err := tb.mergePartsOptimal(pws); err != nil { + logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) } } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index d2dbc63b4..8127943df 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -871,35 +871,20 @@ func (pt *partition) flushInmemoryRows() { } func (pt *partition) flushInmemoryParts(isFinal bool) { - for { - currentTime := time.Now() - var pws []*partWrapper + currentTime := time.Now() + var pws []*partWrapper - pt.partsLock.Lock() - for _, pw := range pt.inmemoryParts { - if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) { - pw.isInMerge = true - pws = append(pws, pw) - } + pt.partsLock.Lock() + for _, pw := range pt.inmemoryParts { + if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) { + pw.isInMerge = true + pws = append(pws, pw) } - pt.partsLock.Unlock() + } + pt.partsLock.Unlock() - if err := pt.mergePartsOptimal(pws, nil); err != nil { - logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) - } - if !isFinal { - return - } - pt.partsLock.Lock() - n := len(pt.inmemoryParts) - pt.partsLock.Unlock() - if n == 0 { - // All the in-memory parts were flushed to disk. - return - } - // Some parts weren't flushed to disk because they were being merged. - // Sleep for a while and try flushing them again. - time.Sleep(10 * time.Millisecond) + if err := pt.mergePartsOptimal(pws, nil); err != nil { + logger.Panicf("FATAL: cannot merge in-memory parts: %s", err) } }