diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index f9b4ee65b..84a99fd9a 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -553,10 +553,6 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(m().TooSmallTimestampRows) }) - metrics.NewGauge(`vm_search_delays_total`, func() float64 { - return float64(m().SearchDelays) - }) - metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 { return float64(m().SlowRowInserts) }) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 8fadc6930..156969b58 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -18,6 +18,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to show custom dashboards at vmui by specifying a path to a directory with dashboard config files via `-vmui.customDashboardsPath` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3322) and [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmui/packages/vmui/public/dashboards). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): apply the `step` globally to all the displayed graphs. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3574). +* BUGFIX: do not slow down concurrently executed queries during assisted merges, since assisted merges already prioritize data ingestion over queries. The probability of assisted merges has been increased starting from [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) because of internal refactoring. This could result in slowed down queries when there are plenty of free CPU resources. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641) issues. 
* BUGFIX: reduce the increased CPU usage at `vmselect` to v1.85.3 level when processing heavy queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set. diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go index cb875c026..158b96cd4 100644 --- a/lib/mergeset/table.go +++ b/lib/mergeset/table.go @@ -18,7 +18,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" ) @@ -782,11 +781,8 @@ func (tb *Table) assistedMergeForInmemoryParts() { return } - // Prioritize assisted merges over searches. - storagepacelimiter.Search.Inc() atomic.AddUint64(&tb.inmemoryAssistedMerges, 1) err := tb.mergeInmemoryParts() - storagepacelimiter.Search.Dec() if err == nil { continue } @@ -806,11 +802,8 @@ func (tb *Table) assistedMergeForFileParts() { return } - // Prioritize assisted merges over searches. 
- storagepacelimiter.Search.Inc() atomic.AddUint64(&tb.fileAssistedMerges, 1) err := tb.mergeExistingParts(false) - storagepacelimiter.Search.Dec() if err == nil { continue } diff --git a/lib/pacelimiter/pacelimiter.go b/lib/pacelimiter/pacelimiter.go deleted file mode 100644 index 462a0ef6c..000000000 --- a/lib/pacelimiter/pacelimiter.go +++ /dev/null @@ -1,63 +0,0 @@ -package pacelimiter - -import ( - "sync" - "sync/atomic" -) - -// PaceLimiter throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls. -// -// It is expected that Inc is called before performing high-priority work, -// while Dec is called when the work is done. -// WaitIfNeeded must be called inside the work which must be throttled (i.e. lower-priority work). -// It may be called in the loop before performing a part of low-priority work. -type PaceLimiter struct { - mu sync.Mutex - cond *sync.Cond - delaysTotal uint64 - n int32 -} - -// New returns pace limiter that throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls. -func New() *PaceLimiter { - var pl PaceLimiter - pl.cond = sync.NewCond(&pl.mu) - return &pl -} - -// Inc increments pl. -func (pl *PaceLimiter) Inc() { - atomic.AddInt32(&pl.n, 1) -} - -// Dec decrements pl. -func (pl *PaceLimiter) Dec() { - if atomic.AddInt32(&pl.n, -1) == 0 { - // Wake up all the goroutines blocked in WaitIfNeeded, - // since the number of Dec calls equals the number of Inc calls. - pl.cond.Broadcast() - } -} - -// WaitIfNeeded blocks while the number of Inc calls is bigger than the number of Dec calls. -func (pl *PaceLimiter) WaitIfNeeded() { - if atomic.LoadInt32(&pl.n) <= 0 { - // Fast path - there is no need in lock. - return - } - // Slow path - wait until Dec is called. - pl.mu.Lock() - for atomic.LoadInt32(&pl.n) > 0 { - pl.delaysTotal++ - pl.cond.Wait() - } - pl.mu.Unlock() -} - -// DelaysTotal returns the number of delays inside WaitIfNeeded. 
-func (pl *PaceLimiter) DelaysTotal() uint64 { - pl.mu.Lock() - n := pl.delaysTotal - pl.mu.Unlock() - return n -} diff --git a/lib/pacelimiter/pacelimiter_test.go b/lib/pacelimiter/pacelimiter_test.go deleted file mode 100644 index 4c5bbf252..000000000 --- a/lib/pacelimiter/pacelimiter_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package pacelimiter - -import ( - "fmt" - "runtime" - "sync" - "testing" - "time" -) - -func TestPacelimiter(t *testing.T) { - t.Run("nonblocking", func(t *testing.T) { - pl := New() - ch := make(chan struct{}, 10) - for i := 0; i < cap(ch); i++ { - go func() { - for j := 0; j < 10; j++ { - pl.WaitIfNeeded() - runtime.Gosched() - } - ch <- struct{}{} - }() - } - - // Check that all the goroutines are finished. - timeoutCh := time.After(5 * time.Second) - for i := 0; i < cap(ch); i++ { - select { - case <-ch: - case <-timeoutCh: - t.Fatalf("timeout") - } - } - if n := pl.DelaysTotal(); n > 0 { - t.Fatalf("unexpected non-zero number of delays: %d", n) - } - }) - t.Run("blocking", func(t *testing.T) { - pl := New() - pl.Inc() - ch := make(chan struct{}, 10) - var wg sync.WaitGroup - for i := 0; i < cap(ch); i++ { - wg.Add(1) - go func() { - wg.Done() - for j := 0; j < 10; j++ { - pl.WaitIfNeeded() - } - ch <- struct{}{} - }() - } - - // Check that all the goroutines created above are started and blocked in WaitIfNeeded - wg.Wait() - select { - case <-ch: - t.Fatalf("the pl must be blocked") - default: - } - - // Unblock goroutines and check that they are unblocked. - pl.Dec() - timeoutCh := time.After(5 * time.Second) - for i := 0; i < cap(ch); i++ { - select { - case <-ch: - case <-timeoutCh: - t.Fatalf("timeout") - } - } - if n := pl.DelaysTotal(); n == 0 { - t.Fatalf("expecting non-zero number of delays") - } - // Verify that the pl is unblocked now. - pl.WaitIfNeeded() - - // Verify that negative count doesn't block pl. 
- pl.Dec() - pl.WaitIfNeeded() - if n := pl.DelaysTotal(); n == 0 { - t.Fatalf("expecting non-zero number of delays after subsequent pl.Dec()") - } - }) - t.Run("negative_count", func(t *testing.T) { - n := 10 - pl := New() - for i := 0; i < n; i++ { - pl.Dec() - } - - doneCh := make(chan error) - go func() { - defer close(doneCh) - for i := 0; i < n; i++ { - pl.Inc() - pl.WaitIfNeeded() - if n := pl.DelaysTotal(); n != 0 { - doneCh <- fmt.Errorf("expecting zero number of delays") - return - } - } - doneCh <- nil - }() - - select { - case err := <-doneCh: - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - case <-time.After(5 * time.Second): - t.Fatalf("timeout") - } - }) - t.Run("concurrent_inc_dec", func(t *testing.T) { - pl := New() - ch := make(chan struct{}, 10) - for i := 0; i < cap(ch); i++ { - go func() { - for j := 0; j < 10; j++ { - pl.Inc() - runtime.Gosched() - pl.Dec() - } - ch <- struct{}{} - }() - } - - // Verify that all the goroutines are finished - timeoutCh := time.After(5 * time.Second) - for i := 0; i < cap(ch); i++ { - select { - case <-ch: - case <-timeoutCh: - t.Fatalf("timeout") - } - } - // Verify that the pl is unblocked. - pl.WaitIfNeeded() - if n := pl.DelaysTotal(); n > 0 { - t.Fatalf("expecting zer number of delays; got %d", n) - } - }) -} diff --git a/lib/storage/partition.go b/lib/storage/partition.go index cd0358337..8fcc8bc13 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -20,7 +20,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg" ) @@ -610,14 +609,8 @@ func (pt *partition) assistedMergeForInmemoryParts() { return } - // There are too many unmerged inmemory parts. 
- // This usually means that the app cannot keep up with the data ingestion rate. - // Assist with mering inmemory parts. - // Prioritize assisted merges over searches. - storagepacelimiter.Search.Inc() atomic.AddUint64(&pt.inmemoryAssistedMerges, 1) err := pt.mergeInmemoryParts() - storagepacelimiter.Search.Dec() if err == nil { continue } @@ -637,14 +630,8 @@ func (pt *partition) assistedMergeForSmallParts() { return } - // There are too many unmerged small parts. - // This usually means that the app cannot keep up with the data ingestion rate. - // Assist with mering small parts. - // Prioritize assisted merges over searches. - storagepacelimiter.Search.Inc() atomic.AddUint64(&pt.smallAssistedMerges, 1) err := pt.mergeExistingParts(false) - storagepacelimiter.Search.Dec() if err == nil { continue } diff --git a/lib/storage/search.go b/lib/storage/search.go index 1a3cdefe6..b5d780ad5 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -10,7 +10,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" ) // BlockRef references a Block. 
@@ -512,7 +511,6 @@ func checkSearchDeadlineAndPace(deadline uint64) error { if fasttime.UnixTimestamp() > deadline { return ErrDeadlineExceeded } - storagepacelimiter.Search.WaitIfNeeded() return nil } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 6dcb696a3..5774ffe37 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -26,7 +26,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" "github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" "github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache" "github.com/VictoriaMetrics/fastcache" @@ -461,8 +460,6 @@ type Metrics struct { TooSmallTimestampRows uint64 TooBigTimestampRows uint64 - SearchDelays uint64 - SlowRowInserts uint64 SlowPerDayIndexInserts uint64 SlowMetricNameLoads uint64 @@ -532,8 +529,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows) m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows) - m.SearchDelays = storagepacelimiter.Search.DelaysTotal() - m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads) diff --git a/lib/storagepacelimiter/storagepacelimiter.go b/lib/storagepacelimiter/storagepacelimiter.go deleted file mode 100644 index e309e6c52..000000000 --- a/lib/storagepacelimiter/storagepacelimiter.go +++ /dev/null @@ -1,10 +0,0 @@ -package storagepacelimiter - -import ( - "github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter" -) - -// Search limits the pace of search calls when there is at least a single in-flight assisted merge. 
-// -// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291 -var Search = pacelimiter.New()