VictoriaMetrics/lib/ratelimiter/ratelimiter.go
Aliaksandr Valialkin 7edb5f77f1
app/vmagent: properly shutdown when -maxIngestionRate limit is reached
The remotewrite.Stop() expects that there are no pending calls to TryPush().
This means that the ingestionRateLimiter.Register() must be unblocked inside TryPush() when calling remotewrite.Stop().
Provide remotewrite.StopIngestionRateLimiter() function for unblocking the rate limiter before calling the remotewrite.Stop().

While at it, move the rate limiter into lib/ratelimiter package, since it has two users.
Also move the description of the feature to the correct place at docs/CHANGELOG.md.
Also cross-reference -remoteWrite.rateLimit and -maxIngestionRate command-line flags.

This is a follow-up for 02bccd1eb9
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5900
2024-04-03 02:41:11 +03:00

78 lines
1.8 KiB
Go

package ratelimiter
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
)
// RateLimiter limits per-second rate of arbitrary resources.
//
// Call Register() for registering the given amounts of resources.
type RateLimiter struct {
// perSecondLimit is the per-second limit of resources.
perSecondLimit int64
// stopCh is used for unbloking rate limiting.
stopCh <-chan struct{}
// mu protects budget and deadline from concurrent access.
mu sync.Mutex
// The current budget. It is increased by perSecondLimit every second.
budget int64
// The next deadline for increasing the budget by perSecondLimit.
deadline time.Time
// limitReached is a counter, which is increased every time the limit is reached.
limitReached *metrics.Counter
}
// New creates new rate limiter with the given perSecondLimit.
//
// stopCh is used for unblocking Register() calls when the rate limiter is no longer needed.
func New(perSecondLimit int64, limitReached *metrics.Counter, stopCh <-chan struct{}) *RateLimiter {
return &RateLimiter{
perSecondLimit: perSecondLimit,
stopCh: stopCh,
limitReached: limitReached,
}
}
// Register registers count resources.
//
// Register blocks if the given per-second rate limit is exceeded.
// It may be forcibly unblocked by closing stopCh passed to New().
func (rl *RateLimiter) Register(count int) {
if rl == nil {
return
}
limit := rl.perSecondLimit
if limit <= 0 {
return
}
rl.mu.Lock()
defer rl.mu.Unlock()
for rl.budget <= 0 {
if d := time.Until(rl.deadline); d > 0 {
rl.limitReached.Inc()
t := timerpool.Get(d)
select {
case <-rl.stopCh:
timerpool.Put(t)
return
case <-t.C:
timerpool.Put(t)
}
}
rl.budget += limit
rl.deadline = time.Now().Add(time.Second)
}
rl.budget -= int64(count)
}