mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/pacelimiter: increase scalability for multi-CPU system
This commit is contained in:
parent
14ddb8a34e
commit
6c0a92a1ee
1 changed files with 10 additions and 9 deletions
|
@ -2,6 +2,7 @@ package pacelimiter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PaceLimiter throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls.
|
// PaceLimiter throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls.
|
||||||
|
@ -14,7 +15,7 @@ type PaceLimiter struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
delaysTotal uint64
|
delaysTotal uint64
|
||||||
n int
|
n int32
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns pace limiter that throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls.
|
// New returns pace limiter that throttles WaitIfNeeded callers while the number of Inc calls is bigger than the number of Dec calls.
|
||||||
|
@ -26,27 +27,27 @@ func New() *PaceLimiter {
|
||||||
|
|
||||||
// Inc increments pl.
|
// Inc increments pl.
|
||||||
func (pl *PaceLimiter) Inc() {
|
func (pl *PaceLimiter) Inc() {
|
||||||
pl.mu.Lock()
|
atomic.AddInt32(&pl.n, 1)
|
||||||
pl.n++
|
|
||||||
pl.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dec decrements pl.
|
// Dec decrements pl.
|
||||||
func (pl *PaceLimiter) Dec() {
|
func (pl *PaceLimiter) Dec() {
|
||||||
pl.mu.Lock()
|
if atomic.AddInt32(&pl.n, -1) == 0 {
|
||||||
pl.n--
|
|
||||||
if pl.n == 0 {
|
|
||||||
// Wake up all the goroutines blocked in WaitIfNeeded,
|
// Wake up all the goroutines blocked in WaitIfNeeded,
|
||||||
// since the number of Dec calls equals the number of Inc calls.
|
// since the number of Dec calls equals the number of Inc calls.
|
||||||
pl.cond.Broadcast()
|
pl.cond.Broadcast()
|
||||||
}
|
}
|
||||||
pl.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitIfNeeded blocks while the number of Inc calls is bigger than the number of Dec calls.
|
// WaitIfNeeded blocks while the number of Inc calls is bigger than the number of Dec calls.
|
||||||
func (pl *PaceLimiter) WaitIfNeeded() {
|
func (pl *PaceLimiter) WaitIfNeeded() {
|
||||||
|
if atomic.LoadInt32(&pl.n) <= 0 {
|
||||||
|
// Fast path - there is no need in lock.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Slow path - wait until Dec is called.
|
||||||
pl.mu.Lock()
|
pl.mu.Lock()
|
||||||
for pl.n > 0 {
|
for atomic.LoadInt32(&pl.n) > 0 {
|
||||||
pl.delaysTotal++
|
pl.delaysTotal++
|
||||||
pl.cond.Wait()
|
pl.cond.Wait()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue