lib/{mergeset,storage}: do not slow down concurrently executed queries during assisted merges
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
Parent: 094ae82df5
Commit: 09d7fa2737

9 changed files with 1 addition and 248 deletions
@@ -646,10 +646,6 @@ func registerStorageMetrics(strg *storage.Storage) {
 		return float64(m().TooSmallTimestampRows)
 	})
-
-	metrics.NewGauge(`vm_search_delays_total`, func() float64 {
-		return float64(m().SearchDelays)
-	})
 
 	metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
 		return float64(m().SlowRowInserts)
 	})

@@ -18,6 +18,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to show custom dashboards at vmui by specifying a path to a directory with dashboard config files via `-vmui.customDashboardsPath` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3322) and [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmui/packages/vmui/public/dashboards).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): apply the `step` globally to all the displayed graphs. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3574).
 
+* BUGFIX: do not slow down concurrently executed queries during assisted merges, since assisted merges already prioritize data ingestion over queries. The probability of assisted merges has been increased starting from [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) because of internal refactoring. This could result in slowed down queries when there is plenty of free CPU resources. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641) issues.
 * BUGFIX: reduce the increased CPU usage at `vmselect` to v1.85.3 level when processing heavy queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set.

@@ -18,7 +18,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
 )
 
@@ -782,11 +781,8 @@ func (tb *Table) assistedMergeForInmemoryParts() {
 			return
 		}
 
-		// Prioritize assisted merges over searches.
-		storagepacelimiter.Search.Inc()
 		atomic.AddUint64(&tb.inmemoryAssistedMerges, 1)
 		err := tb.mergeInmemoryParts()
-		storagepacelimiter.Search.Dec()
 		if err == nil {
 			continue
 		}

@@ -806,11 +802,8 @@ func (tb *Table) assistedMergeForFileParts() {
 			return
 		}
 
-		// Prioritize assisted merges over searches.
-		storagepacelimiter.Search.Inc()
 		atomic.AddUint64(&tb.fileAssistedMerges, 1)
 		err := tb.mergeExistingParts(false)
-		storagepacelimiter.Search.Dec()
 		if err == nil {
 			continue
 		}

@@ -1,63 +0,0 @@
-package pacelimiter
-
-import (
-	"sync"
-	"sync/atomic"
-)
-
-// PaceLimiter throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls.
-//
-// It is expected that Inc is called before performing high-priority work,
-// while Dec is called when the work is done.
-// WaitIfNeeded must be called inside the work which must be throttled (i.e. lower-priority work).
-// It may be called in the loop before performing a part of low-priority work.
-type PaceLimiter struct {
-	mu          sync.Mutex
-	cond        *sync.Cond
-	delaysTotal uint64
-	n           int32
-}
-
-// New returns pace limiter that throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls.
-func New() *PaceLimiter {
-	var pl PaceLimiter
-	pl.cond = sync.NewCond(&pl.mu)
-	return &pl
-}
-
-// Inc increments pl.
-func (pl *PaceLimiter) Inc() {
-	atomic.AddInt32(&pl.n, 1)
-}
-
-// Dec decrements pl.
-func (pl *PaceLimiter) Dec() {
-	if atomic.AddInt32(&pl.n, -1) == 0 {
-		// Wake up all the goroutines blocked in WaitIfNeeded,
-		// since the number of Dec calls equals the number of Inc calls.
-		pl.cond.Broadcast()
-	}
-}
-
-// WaitIfNeeded blocks while the number of Inc calls is bigger than the number of Dec calls.
-func (pl *PaceLimiter) WaitIfNeeded() {
-	if atomic.LoadInt32(&pl.n) <= 0 {
-		// Fast path - there is no need in lock.
-		return
-	}
-	// Slow path - wait until Dec is called.
-	pl.mu.Lock()
-	for atomic.LoadInt32(&pl.n) > 0 {
-		pl.delaysTotal++
-		pl.cond.Wait()
-	}
-	pl.mu.Unlock()
-}
-
-// DelaysTotal returns the number of delays inside WaitIfNeeded.
-func (pl *PaceLimiter) DelaysTotal() uint64 {
-	pl.mu.Lock()
-	n := pl.delaysTotal
-	pl.mu.Unlock()
-	return n
-}
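For readers not familiar with the package deleted above: a minimal usage sketch follows. It relies only on the New/Inc/Dec/WaitIfNeeded/DelaysTotal API quoted in the hunk, is not part of this commit, and only compiles against a source tree that still contains lib/pacelimiter (i.e. before this change), e.g. as an example test placed next to the deleted file.

package pacelimiter

import "fmt"

// ExamplePaceLimiter sketches the intended call pattern: high-priority work
// brackets itself with Inc/Dec, while low-priority work calls WaitIfNeeded,
// which blocks for as long as the number of Inc calls exceeds the number of
// Dec calls.
func ExamplePaceLimiter() {
	pl := New()

	pl.WaitIfNeeded() // no high-priority work in flight: returns immediately

	pl.Inc() // high-priority work (e.g. an assisted merge) starts;
	// from here until Dec, any goroutine calling pl.WaitIfNeeded() blocks.
	pl.Dec() // high-priority work finished; blocked callers are woken up.

	pl.WaitIfNeeded() // returns immediately again
	fmt.Println(pl.DelaysTotal())
	// Output: 0
}

With this pattern a single in-flight high-priority task pauses all low-priority callers, which is exactly the behavior this commit removes for assisted merges.
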
@@ -1,144 +0,0 @@
-package pacelimiter
-
-import (
-	"fmt"
-	"runtime"
-	"sync"
-	"testing"
-	"time"
-)
-
-func TestPacelimiter(t *testing.T) {
-	t.Run("nonblocking", func(t *testing.T) {
-		pl := New()
-		ch := make(chan struct{}, 10)
-		for i := 0; i < cap(ch); i++ {
-			go func() {
-				for j := 0; j < 10; j++ {
-					pl.WaitIfNeeded()
-					runtime.Gosched()
-				}
-				ch <- struct{}{}
-			}()
-		}
-
-		// Check that all the goroutines are finished.
-		timeoutCh := time.After(5 * time.Second)
-		for i := 0; i < cap(ch); i++ {
-			select {
-			case <-ch:
-			case <-timeoutCh:
-				t.Fatalf("timeout")
-			}
-		}
-		if n := pl.DelaysTotal(); n > 0 {
-			t.Fatalf("unexpected non-zero number of delays: %d", n)
-		}
-	})
-	t.Run("blocking", func(t *testing.T) {
-		pl := New()
-		pl.Inc()
-		ch := make(chan struct{}, 10)
-		var wg sync.WaitGroup
-		for i := 0; i < cap(ch); i++ {
-			wg.Add(1)
-			go func() {
-				wg.Done()
-				for j := 0; j < 10; j++ {
-					pl.WaitIfNeeded()
-				}
-				ch <- struct{}{}
-			}()
-		}
-
-		// Check that all the goroutines created above are started and blocked in WaitIfNeeded
-		wg.Wait()
-		select {
-		case <-ch:
-			t.Fatalf("the pl must be blocked")
-		default:
-		}
-
-		// Unblock goroutines and check that they are unblocked.
-		pl.Dec()
-		timeoutCh := time.After(5 * time.Second)
-		for i := 0; i < cap(ch); i++ {
-			select {
-			case <-ch:
-			case <-timeoutCh:
-				t.Fatalf("timeout")
-			}
-		}
-		if n := pl.DelaysTotal(); n == 0 {
-			t.Fatalf("expecting non-zero number of delays")
-		}
-		// Verify that the pl is unblocked now.
-		pl.WaitIfNeeded()
-
-		// Verify that negative count doesn't block pl.
-		pl.Dec()
-		pl.WaitIfNeeded()
-		if n := pl.DelaysTotal(); n == 0 {
-			t.Fatalf("expecting non-zero number of delays after subsequent pl.Dec()")
-		}
-	})
-	t.Run("negative_count", func(t *testing.T) {
-		n := 10
-		pl := New()
-		for i := 0; i < n; i++ {
-			pl.Dec()
-		}
-
-		doneCh := make(chan error)
-		go func() {
-			defer close(doneCh)
-			for i := 0; i < n; i++ {
-				pl.Inc()
-				pl.WaitIfNeeded()
-				if n := pl.DelaysTotal(); n != 0 {
-					doneCh <- fmt.Errorf("expecting zero number of delays")
-					return
-				}
-			}
-			doneCh <- nil
-		}()
-
-		select {
-		case err := <-doneCh:
-			if err != nil {
-				t.Fatalf("unexpected error: %s", err)
-			}
-		case <-time.After(5 * time.Second):
-			t.Fatalf("timeout")
-		}
-	})
-	t.Run("concurrent_inc_dec", func(t *testing.T) {
-		pl := New()
-		ch := make(chan struct{}, 10)
-		for i := 0; i < cap(ch); i++ {
-			go func() {
-				for j := 0; j < 10; j++ {
-					pl.Inc()
-					runtime.Gosched()
-					pl.Dec()
-				}
-				ch <- struct{}{}
-			}()
-		}
-
-		// Verify that all the goroutines are finished
-		timeoutCh := time.After(5 * time.Second)
-		for i := 0; i < cap(ch); i++ {
-			select {
-			case <-ch:
-			case <-timeoutCh:
-				t.Fatalf("timeout")
-			}
-		}
-		// Verify that the pl is unblocked.
-		pl.WaitIfNeeded()
-		if n := pl.DelaysTotal(); n > 0 {
-			t.Fatalf("expecting zer number of delays; got %d", n)
-		}
-	})
-}

@@ -20,7 +20,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
 )
 
@@ -610,14 +609,8 @@ func (pt *partition) assistedMergeForInmemoryParts() {
 			return
 		}
 
-		// There are too many unmerged inmemory parts.
-		// This usually means that the app cannot keep up with the data ingestion rate.
-		// Assist with mering inmemory parts.
-		// Prioritize assisted merges over searches.
-		storagepacelimiter.Search.Inc()
 		atomic.AddUint64(&pt.inmemoryAssistedMerges, 1)
 		err := pt.mergeInmemoryParts()
-		storagepacelimiter.Search.Dec()
 		if err == nil {
 			continue
 		}

@@ -637,14 +630,8 @@ func (pt *partition) assistedMergeForSmallParts() {
 			return
 		}
 
-		// There are too many unmerged small parts.
-		// This usually means that the app cannot keep up with the data ingestion rate.
-		// Assist with mering small parts.
-		// Prioritize assisted merges over searches.
-		storagepacelimiter.Search.Inc()
 		atomic.AddUint64(&pt.smallAssistedMerges, 1)
 		err := pt.mergeExistingParts(false)
-		storagepacelimiter.Search.Dec()
 		if err == nil {
 			continue
 		}

@@ -10,7 +10,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 )
 
 // BlockRef references a Block.

@@ -448,7 +447,6 @@ func checkSearchDeadlineAndPace(deadline uint64) error {
 	if fasttime.UnixTimestamp() > deadline {
 		return ErrDeadlineExceeded
 	}
-	storagepacelimiter.Search.WaitIfNeeded()
 	return nil
 }
 
@@ -26,7 +26,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
 	"github.com/VictoriaMetrics/fastcache"

@@ -446,8 +445,6 @@ type Metrics struct {
 	TooSmallTimestampRows uint64
 	TooBigTimestampRows   uint64
 
-	SearchDelays uint64
-
 	SlowRowInserts         uint64
 	SlowPerDayIndexInserts uint64
 	SlowMetricNameLoads    uint64

@@ -517,8 +514,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
 	m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
 
-	m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
-
 	m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
 	m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
 	m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)

@@ -1,10 +0,0 @@
-package storagepacelimiter
-
-import (
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
-)
-
-// Search limits the pace of search calls when there is at least a single in-flight assisted merge.
-//
-// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
-var Search = pacelimiter.New()
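To tie the deleted pieces together: the global Search limiter defined above was the bridge between assisted merges, which called Search.Inc()/Search.Dec() (see the Table and partition hunks earlier), and queries, which called Search.WaitIfNeeded() in checkSearchDeadlineAndPace. The hypothetical standalone program below sketches that pre-commit coupling sequentially; it is illustrative only and compiles only against a tree that still ships lib/storagepacelimiter (i.e. before this commit).

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
)

func main() {
	// An assisted merge used to announce itself on the global limiter...
	storagepacelimiter.Search.Inc()

	// ...and from this point every search path calling
	// storagepacelimiter.Search.WaitIfNeeded() (see checkSearchDeadlineAndPace
	// in the hunk above) blocked until the merge finished, even when plenty of
	// CPU was available. That pause is what this commit removes.

	storagepacelimiter.Search.Dec() // merge done: searches may proceed again

	storagepacelimiter.Search.WaitIfNeeded() // no in-flight merges: returns immediately
	fmt.Println("search delays so far:", storagepacelimiter.Search.DelaysTotal())
}

Because a single Inc was enough to pause every concurrent search regardless of available CPU, removing this coupling is what the CHANGELOG entry above describes.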