diff --git a/app/vmagent/main.go b/app/vmagent/main.go index bb50d56f8..107d701b4 100644 --- a/app/vmagent/main.go +++ b/app/vmagent/main.go @@ -125,6 +125,7 @@ func main() { } logger.Infof("starting vmagent at %q...", listenAddrs) startTime := time.Now() + remotewrite.StartIngestionRateLimiter() remotewrite.Init() common.StartUnmarshalWorkers() if len(*influxListenAddr) > 0 { @@ -152,6 +153,7 @@ func main() { pushmetrics.Init() sig := procutil.WaitForSigterm() logger.Infof("received signal %s", sig) + remotewrite.StopIngestionRateLimiter() pushmetrics.Stop() startTime = time.Now() diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index c76c1cab8..14ef10f58 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -17,6 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil" "github.com/VictoriaMetrics/metrics" @@ -30,7 +31,7 @@ var ( rateLimit = flagutil.NewArrayInt("remoteWrite.rateLimit", 0, "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. "+ "By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+ - "is sent after temporary unavailability of the remote storage") + "is sent after temporary unavailability of the remote storage. See also -maxIngestionRate") sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to the corresponding -remoteWrite.url") proxyURL = flagutil.NewArrayString("remoteWrite.proxyURL", "Optional proxy URL for writing data to the corresponding -remoteWrite.url. "+ "Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234") @@ -91,7 +92,7 @@ type client struct { authCfg *promauth.Config awsCfg *awsapi.Config - rl *rateLimiter + rl *ratelimiter.RateLimiter bytesSent *metrics.Counter blocksSent *metrics.Counter @@ -180,7 +181,7 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) { limitReached := metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rate_limit_reached_total{url=%q}`, c.sanitizedURL)) if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec > 0 { logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", bytesPerSec, sanitizedURL) - c.rl = newRateLimiter(time.Second, int64(bytesPerSec), limitReached, c.stopCh) + c.rl = ratelimiter.New(int64(bytesPerSec), limitReached, c.stopCh) } c.bytesSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_bytes_sent_total{url=%q}`, c.sanitizedURL)) c.blocksSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_blocks_sent_total{url=%q}`, c.sanitizedURL)) @@ -395,7 +396,7 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) { // The function returns false only if c.stopCh is closed. // Otherwise it tries sending the block to remote storage indefinitely. func (c *client) sendBlockHTTP(block []byte) bool { - c.rl.register(len(block)) + c.rl.Register(len(block)) maxRetryDuration := timeutil.AddJitterToDuration(time.Minute) retryDuration := timeutil.AddJitterToDuration(time.Second) retriesCount := 0 @@ -478,60 +479,3 @@ again: } var remoteWriteRejectedLogger = logger.WithThrottler("remoteWriteRejected", 5*time.Second) - -type rateLimiter struct { - interval time.Duration - perIntervalLimit int64 - stopCh <-chan struct{} - - // mu protects budget and deadline from concurrent access. - mu sync.Mutex - - // The current budget. It is increased by perIntervalLimit every interval. - budget int64 - - // The next deadline for increasing the budget by perIntervalLimit - deadline time.Time - - limitReached *metrics.Counter -} - -func newRateLimiter(interval time.Duration, perIntervalLimit int64, limitReached *metrics.Counter, stopCh <-chan struct{}) *rateLimiter { - return &rateLimiter{ - interval: interval, - perIntervalLimit: perIntervalLimit, - stopCh: stopCh, - limitReached: limitReached, - } -} - -func (rl *rateLimiter) register(count int) { - if rl == nil { - return - } - - limit := rl.perIntervalLimit - if limit <= 0 { - return - } - - rl.mu.Lock() - defer rl.mu.Unlock() - - for rl.budget <= 0 { - if d := time.Until(rl.deadline); d > 0 { - rl.limitReached.Inc() - t := timerpool.Get(d) - select { - case <-rl.stopCh: - timerpool.Put(t) - return - case <-t.C: - timerpool.Put(t) - } - } - rl.budget += limit - rl.deadline = time.Now().Add(rl.interval) - } - rl.budget -= int64(count) -} diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 6ae13e81c..8e6de0737 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -27,6 +27,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" "github.com/VictoriaMetrics/metrics" @@ -81,8 +82,8 @@ var ( "Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+ "Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter") - maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. "+ - "If the limit is exceeded, the ingestion rate will be throttled.") + maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+ + "By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit") streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ "See https://docs.victoriametrics.com/stream-aggregation.html . "+ @@ -181,12 +182,7 @@ func Init() { return float64(dailySeriesLimiter.CurrentItems()) }) } - if *maxIngestionRate > 0 { - // Start ingestion rate limiter. - ingestionRateLimitReached = metrics.NewCounter(`vmagent_max_ingestion_rate_limit_reached_total`) - ingestionRateLimiterStopCh = make(chan struct{}) - ingestionRateLimiter = newRateLimiter(time.Second, int64(*maxIngestionRate), ingestionRateLimitReached, ingestionRateLimiterStopCh) - } + if *queues > maxQueues { *queues = maxQueues } @@ -351,13 +347,39 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { var configReloaderStopCh = make(chan struct{}) var configReloaderWG sync.WaitGroup +// StartIngestionRateLimiter starts ingestion rate limiter. +// +// Ingestion rate limiter must be started before Init() call. +// +// StopIngestionRateLimiter must be called before Stop() call in order to unblock all the callers +// to ingestion rate limiter. Otherwise deadlock may occur at Stop() call. +func StartIngestionRateLimiter() { + if *maxIngestionRate <= 0 { + return + } + ingestionRateLimitReached := metrics.NewCounter(`vmagent_max_ingestion_rate_limit_reached_total`) + ingestionRateLimiterStopCh = make(chan struct{}) + ingestionRateLimiter = ratelimiter.New(int64(*maxIngestionRate), ingestionRateLimitReached, ingestionRateLimiterStopCh) +} + +// StopIngestionRateLimiter stops ingestion rate limiter. +func StopIngestionRateLimiter() { + if ingestionRateLimiterStopCh == nil { + return + } + close(ingestionRateLimiterStopCh) + ingestionRateLimiterStopCh = nil +} + +var ( + ingestionRateLimiter *ratelimiter.RateLimiter + ingestionRateLimiterStopCh chan struct{} +) + // Stop stops remotewrite. // // It is expected that nobody calls TryPush during and after the call to this func. func Stop() { - if ingestionRateLimiterStopCh != nil { - close(ingestionRateLimiterStopCh) - } close(configReloaderStopCh) configReloaderWG.Wait() @@ -481,7 +503,7 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, dropSamplesOnFailur } } - ingestionRateLimiter.register(samplesCount) + ingestionRateLimiter.Register(samplesCount) tssBlock := tss if i < len(tss) { @@ -633,10 +655,6 @@ func limitSeriesCardinality(tss []prompbmarshal.TimeSeries) []prompbmarshal.Time } var ( - ingestionRateLimiter *rateLimiter - ingestionRateLimiterStopCh chan struct{} - ingestionRateLimitReached *metrics.Counter - hourlySeriesLimiter *bloomfilter.Limiter dailySeriesLimiter *bloomfilter.Limiter diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 13175e04e..cebcc7de9 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -52,6 +52,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus) outputs, which can be used for `increase` and `total` aggregations when the first sample of every new [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) must be ignored. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose `vm_streamaggr_flush_timeouts_total` and `vm_streamaggr_dedup_flush_timeouts_total` [counters](https://docs.victoriametrics.com/keyconcepts/#counter) at [`/metrics` page](https://docs.victoriametrics.com/#monitoring), which can be used for detecting flush timeouts for stream aggregation states. Expose also `vm_streamaggr_flush_duration_seconds` and `vm_streamaggr_dedup_flush_duration_seconds` [histograms](https://docs.victoriametrics.com/keyconcepts/#histogram) for monitoring the real flush durations of stream aggregation states. * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve trace display for better visual separation of branches. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5926). +* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): ability to limit the ingestion rate via `-maxIngestionRate` command-line flag. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5900). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): use the provided `-remoteWrite.tlsServerName` as `Host` header in requests to `-remoteWrite.url`. This allows sending data to https remote storage by IP address instead of hostname. Thanks to @minor-fixes for initial idea and [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5802). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-remoteWrite.shardByURL.inverseLabels` cmd-line flag to enhance sharding logic across configured `-remoteWrite.url` URLs when `-remoteWrite.shardByURL.labels` is set. Thanks to @edma2 for the idea and [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5938). * FEATURE: optimize [`/api/v1/labels`](https://docs.victoriametrics.com/url-examples/#apiv1labels) and [`/api/v1/label/.../values`](https://docs.victoriametrics.com/url-examples/#apiv1labelvalues) when `match[]` filters contains metric name. For example, `/api/v1/label/instance/values?match[]=up` now works much faster than before. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5055). @@ -95,7 +96,6 @@ Released at 2024-03-01 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): allow filling gaps on graphs with interpolated lines as Grafana does. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5152) and [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5862). * FEATURE: [vmalert](https://docs.victoriametrics.com/#vmalert): support filtering by group, rule or labels in [vmalert's UI](https://docs.victoriametrics.com/vmalert/#web) for `/groups` and `/alerts` pages. See [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5791) by @victoramsantos. * FEATURE: [docker-compose](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#docker-compose-environment-for-victoriametrics): create a separate [docker-compose environment](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/docker-compose-victorialogs.yml) for VictoriaLogs installation, including fluentbit and [VictoriaLogs Grafana datasource](https://github.com/VictoriaMetrics/victorialogs-datasource). See [these docs](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#victorialogs-server) for details. -* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add ingestion rate limiting with new flag `-maxIngestionRate`. This flag can be used to limit the number of samples ingested by vmagent per second. * FEATURE: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager/): wait for up 30 seconds before making a [snapshot](https://docs.victoriametrics.com/#how-to-work-with-snapshots) for backup if `vmstorage` is temporarily unavailalbe. This should prevent from `vmbackupmanager` termination in this case. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5859). * BUGFIX: downgrade Go builder from `1.22.0` to `1.21.7`, since `1.22.0` contains [the bug](https://github.com/golang/go/issues/65705), which can lead to deadlocked HTTP connections to remote storage systems, scrape targets and service discovery endpoints at [vmagent](https://docs.victoriametrics.com/vmagent/). This may result in incorrect service discovery, target scraping and failed sending samples to remote storage. diff --git a/docs/vmagent.md b/docs/vmagent.md index c19b09077..00277545a 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -1805,7 +1805,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -maxConcurrentInserts int The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 32) -maxIngestionRate int - The maximum number of samples vmagent can receive per second. If the limit is exceeded, the ingestion rate will be throttled (default 0) + The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit -maxInsertRequestSize size The maximum size in bytes of a single Prometheus remote_write API request Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 33554432) @@ -2091,7 +2091,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -remoteWrite.queues int The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues isn't enough for sending high volume of collected data to remote storage. Default value is 2 * numberOfAvailableCPUs (default 32) -remoteWrite.rateLimit array - Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data is sent after temporary unavailability of the remote storage (default 0) + Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data is sent after temporary unavailability of the remote storage. See also -maxIngestionRate (default 0) Supports array of values separated by comma or specified via multiple flags. Empty values are set to default value. -remoteWrite.relabelConfig string diff --git a/lib/ratelimiter/ratelimiter.go b/lib/ratelimiter/ratelimiter.go new file mode 100644 index 000000000..abe0850d9 --- /dev/null +++ b/lib/ratelimiter/ratelimiter.go @@ -0,0 +1,78 @@ +package ratelimiter + +import ( + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" + "github.com/VictoriaMetrics/metrics" +) + +// RateLimiter limits per-second rate of arbitrary resources. +// +// Call Register() for registering the given amounts of resources. +type RateLimiter struct { + // perSecondLimit is the per-second limit of resources. + perSecondLimit int64 + + // stopCh is used for unbloking rate limiting. + stopCh <-chan struct{} + + // mu protects budget and deadline from concurrent access. + mu sync.Mutex + + // The current budget. It is increased by perSecondLimit every second. + budget int64 + + // The next deadline for increasing the budget by perSecondLimit. + deadline time.Time + + // limitReached is a counter, which is increased every time the limit is reached. + limitReached *metrics.Counter +} + +// New creates new rate limiter with the given perSecondLimit. +// +// stopCh is used for unblocking Register() calls when the rate limiter is no longer needed. +func New(perSecondLimit int64, limitReached *metrics.Counter, stopCh <-chan struct{}) *RateLimiter { + return &RateLimiter{ + perSecondLimit: perSecondLimit, + stopCh: stopCh, + limitReached: limitReached, + } +} + +// Register registers count resources. +// +// Register blocks if the given per-second rate limit is exceeded. +// It may be forcibly unblocked by closing stopCh passed to New(). +func (rl *RateLimiter) Register(count int) { + if rl == nil { + return + } + + limit := rl.perSecondLimit + if limit <= 0 { + return + } + + rl.mu.Lock() + defer rl.mu.Unlock() + + for rl.budget <= 0 { + if d := time.Until(rl.deadline); d > 0 { + rl.limitReached.Inc() + t := timerpool.Get(d) + select { + case <-rl.stopCh: + timerpool.Put(t) + return + case <-t.C: + timerpool.Put(t) + } + } + rl.budget += limit + rl.deadline = time.Now().Add(time.Second) + } + rl.budget -= int64(count) +}