Aliaksandr Valialkin 2023-01-16 14:31:50 -08:00
parent 4fc8cacd1a
commit 103dfd0525
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
9 changed files with 1 additions and 248 deletions

View file

@ -553,10 +553,6 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().TooSmallTimestampRows)
})
metrics.NewGauge(`vm_search_delays_total`, func() float64 {
return float64(m().SearchDelays)
})
metrics.NewGauge(`vm_slow_row_inserts_total`, func() float64 {
return float64(m().SlowRowInserts)
})

View file

@ -18,6 +18,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to show custom dashboards at vmui by specifying a path to a directory with dashboard config files via `-vmui.customDashboardsPath` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3322) and [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmui/packages/vmui/public/dashboards).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): apply the `step` globally to all the displayed graphs. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3574).
* BUGFIX: do not slow down concurrently executed queries during assisted merges, since assisted merges already prioritize data ingestion over queries. The probability of assisted merges has been increased starting from [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850) because of internal refactoring. This could result in slowed down queries when there is a plenty of free CPU resources. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3647) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641) issues.
* BUGFIX: reduce the increased CPU usage at `vmselect` to v1.85.3 level when processing heavy queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3641).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): [dockerswarm_sd_configs](https://docs.victoriametrics.com/sd_configs.html#dockerswarm_sd_configs): apply `filters` only to objects of the specified `role`. Previously filters were applied to all the objects, which could cause errors when different types of objects were used with filters that were not compatible with them. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3579).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): suppress all the scrape errors when `-promscrape.suppressScrapeErrors` is enabled. Previously some scrape errors were logged even if `-promscrape.suppressScrapeErrors` flag was set.

View file

@ -18,7 +18,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
)
@ -782,11 +781,8 @@ func (tb *Table) assistedMergeForInmemoryParts() {
return
}
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
atomic.AddUint64(&tb.inmemoryAssistedMerges, 1)
err := tb.mergeInmemoryParts()
storagepacelimiter.Search.Dec()
if err == nil {
continue
}
@ -806,11 +802,8 @@ func (tb *Table) assistedMergeForFileParts() {
return
}
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
atomic.AddUint64(&tb.fileAssistedMerges, 1)
err := tb.mergeExistingParts(false)
storagepacelimiter.Search.Dec()
if err == nil {
continue
}

View file

@ -1,63 +0,0 @@
package pacelimiter
import (
"sync"
"sync/atomic"
)
// PaceLimiter throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls.
//
// It is expected that Inc is called before performing high-priority work,
// while Dec is called when the work is done.
// WaitIfNeeded must be called inside the work which must be throttled (i.e. lower-priority work).
// It may be called in the loop before performing a part of low-priority work.
type PaceLimiter struct {
mu sync.Mutex
cond *sync.Cond
delaysTotal uint64
n int32
}
// New returns pace limiter that throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls.
func New() *PaceLimiter {
var pl PaceLimiter
pl.cond = sync.NewCond(&pl.mu)
return &pl
}
// Inc increments pl.
func (pl *PaceLimiter) Inc() {
atomic.AddInt32(&pl.n, 1)
}
// Dec decrements pl.
func (pl *PaceLimiter) Dec() {
if atomic.AddInt32(&pl.n, -1) == 0 {
// Wake up all the goroutines blocked in WaitIfNeeded,
// since the number of Dec calls equals the number of Inc calls.
pl.cond.Broadcast()
}
}
// WaitIfNeeded blocks while the number of Inc calls is bigger than the number of Dec calls.
func (pl *PaceLimiter) WaitIfNeeded() {
if atomic.LoadInt32(&pl.n) <= 0 {
// Fast path - there is no need in lock.
return
}
// Slow path - wait until Dec is called.
pl.mu.Lock()
for atomic.LoadInt32(&pl.n) > 0 {
pl.delaysTotal++
pl.cond.Wait()
}
pl.mu.Unlock()
}
// DelaysTotal returns the number of delays inside WaitIfNeeded.
func (pl *PaceLimiter) DelaysTotal() uint64 {
pl.mu.Lock()
n := pl.delaysTotal
pl.mu.Unlock()
return n
}

View file

@ -1,144 +0,0 @@
package pacelimiter
import (
"fmt"
"runtime"
"sync"
"testing"
"time"
)
func TestPacelimiter(t *testing.T) {
t.Run("nonblocking", func(t *testing.T) {
pl := New()
ch := make(chan struct{}, 10)
for i := 0; i < cap(ch); i++ {
go func() {
for j := 0; j < 10; j++ {
pl.WaitIfNeeded()
runtime.Gosched()
}
ch <- struct{}{}
}()
}
// Check that all the goroutines are finished.
timeoutCh := time.After(5 * time.Second)
for i := 0; i < cap(ch); i++ {
select {
case <-ch:
case <-timeoutCh:
t.Fatalf("timeout")
}
}
if n := pl.DelaysTotal(); n > 0 {
t.Fatalf("unexpected non-zero number of delays: %d", n)
}
})
t.Run("blocking", func(t *testing.T) {
pl := New()
pl.Inc()
ch := make(chan struct{}, 10)
var wg sync.WaitGroup
for i := 0; i < cap(ch); i++ {
wg.Add(1)
go func() {
wg.Done()
for j := 0; j < 10; j++ {
pl.WaitIfNeeded()
}
ch <- struct{}{}
}()
}
// Check that all the goroutines created above are started and blocked in WaitIfNeeded
wg.Wait()
select {
case <-ch:
t.Fatalf("the pl must be blocked")
default:
}
// Unblock goroutines and check that they are unblocked.
pl.Dec()
timeoutCh := time.After(5 * time.Second)
for i := 0; i < cap(ch); i++ {
select {
case <-ch:
case <-timeoutCh:
t.Fatalf("timeout")
}
}
if n := pl.DelaysTotal(); n == 0 {
t.Fatalf("expecting non-zero number of delays")
}
// Verify that the pl is unblocked now.
pl.WaitIfNeeded()
// Verify that negative count doesn't block pl.
pl.Dec()
pl.WaitIfNeeded()
if n := pl.DelaysTotal(); n == 0 {
t.Fatalf("expecting non-zero number of delays after subsequent pl.Dec()")
}
})
t.Run("negative_count", func(t *testing.T) {
n := 10
pl := New()
for i := 0; i < n; i++ {
pl.Dec()
}
doneCh := make(chan error)
go func() {
defer close(doneCh)
for i := 0; i < n; i++ {
pl.Inc()
pl.WaitIfNeeded()
if n := pl.DelaysTotal(); n != 0 {
doneCh <- fmt.Errorf("expecting zero number of delays")
return
}
}
doneCh <- nil
}()
select {
case err := <-doneCh:
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
})
t.Run("concurrent_inc_dec", func(t *testing.T) {
pl := New()
ch := make(chan struct{}, 10)
for i := 0; i < cap(ch); i++ {
go func() {
for j := 0; j < 10; j++ {
pl.Inc()
runtime.Gosched()
pl.Dec()
}
ch <- struct{}{}
}()
}
// Verify that all the goroutines are finished
timeoutCh := time.After(5 * time.Second)
for i := 0; i < cap(ch); i++ {
select {
case <-ch:
case <-timeoutCh:
t.Fatalf("timeout")
}
}
// Verify that the pl is unblocked.
pl.WaitIfNeeded()
if n := pl.DelaysTotal(); n > 0 {
t.Fatalf("expecting zer number of delays; got %d", n)
}
})
}

View file

@ -20,7 +20,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
)
@ -610,14 +609,8 @@ func (pt *partition) assistedMergeForInmemoryParts() {
return
}
// There are too many unmerged inmemory parts.
// This usually means that the app cannot keep up with the data ingestion rate.
// Assist with mering inmemory parts.
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
atomic.AddUint64(&pt.inmemoryAssistedMerges, 1)
err := pt.mergeInmemoryParts()
storagepacelimiter.Search.Dec()
if err == nil {
continue
}
@ -637,14 +630,8 @@ func (pt *partition) assistedMergeForSmallParts() {
return
}
// There are too many unmerged small parts.
// This usually means that the app cannot keep up with the data ingestion rate.
// Assist with mering small parts.
// Prioritize assisted merges over searches.
storagepacelimiter.Search.Inc()
atomic.AddUint64(&pt.smallAssistedMerges, 1)
err := pt.mergeExistingParts(false)
storagepacelimiter.Search.Dec()
if err == nil {
continue
}

View file

@ -10,7 +10,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
)
// BlockRef references a Block.
@ -512,7 +511,6 @@ func checkSearchDeadlineAndPace(deadline uint64) error {
if fasttime.UnixTimestamp() > deadline {
return ErrDeadlineExceeded
}
storagepacelimiter.Search.WaitIfNeeded()
return nil
}

View file

@ -26,7 +26,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/snapshot"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storagepacelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
"github.com/VictoriaMetrics/fastcache"
@ -461,8 +460,6 @@ type Metrics struct {
TooSmallTimestampRows uint64
TooBigTimestampRows uint64
SearchDelays uint64
SlowRowInserts uint64
SlowPerDayIndexInserts uint64
SlowMetricNameLoads uint64
@ -532,8 +529,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)

View file

@ -1,10 +0,0 @@
package storagepacelimiter
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pacelimiter"
)
// Search limits the pace of search calls when there is at least a single in-flight assisted merge.
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/291
var Search = pacelimiter.New()