From d0f5a9d77a2221c2f7115e3d940679d46126659d Mon Sep 17 00:00:00 2001 From: Yury Akudovich Date: Fri, 23 Aug 2024 14:05:51 +0200 Subject: [PATCH] app/vmagent: add `remoteWrite.retryMinInterval` and `remoteWrite.retryMaxTime` flags (#6289) ## Describe Your Changes Add RemoteWrite Retry Controls This PR introduces two new flags to the remote write functionality: - remoteWrite.retryMinInterval - remoteWrite.retryMaxTime These flags provide finer control over the retry behavior for remoteWrite operations, allowing users to customize the minimum interval between retries and the maximum duration for retry attempts. Fixes #5486. ## Checklist - [x] The following checks are mandatory: My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/contributing/). --------- Signed-off-by: Yury Akudovich Co-authored-by: hagen1778 --- app/vmagent/remotewrite/client.go | 31 +++++++++++++++++++------------ docs/changelog/CHANGELOG.md | 1 + docs/vmagent.md | 8 ++++++++ 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 7ffb50a2f..49b92b7bd 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -34,8 +34,10 @@ var ( rateLimit = flagutil.NewArrayInt("remoteWrite.rateLimit", 0, "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. "+ "By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+ "is sent after temporary unavailability of the remote storage. See also -maxIngestionRate") - sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to the corresponding -remoteWrite.url") - proxyURL = flagutil.NewArrayString("remoteWrite.proxyURL", "Optional proxy URL for writing data to the corresponding -remoteWrite.url. 
"+ + sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", time.Minute, "Timeout for sending a single block of data to the corresponding -remoteWrite.url") + retryMinInterval = flagutil.NewArrayDuration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxTime") + retryMaxTime = flagutil.NewArrayDuration("remoteWrite.retryMaxTime", time.Minute, "The max time spent on retry attempts to send a block of data to the corresponding -remoteWrite.url. Change this value if it is expected for -remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval") + proxyURL = flagutil.NewArrayString("remoteWrite.proxyURL", "Optional proxy URL for writing data to the corresponding -remoteWrite.url. "+ "Supported proxies: http, https, socks5. 
Example: -remoteWrite.proxyURL=socks5://proxy:1234") tlsHandshakeTimeout = flagutil.NewArrayDuration("remoteWrite.tlsHandshakeTimeout", 20*time.Second, "The timeout for establishing tls connections to the corresponding -remoteWrite.url") @@ -90,6 +92,9 @@ type client struct { fq *persistentqueue.FastQueue hc *http.Client + retryMinInterval time.Duration + retryMaxTime time.Duration + sendBlock func(block []byte) bool authCfg *promauth.Config awsCfg *awsapi.Config @@ -143,13 +148,15 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste Timeout: sendTimeout.GetOptionalArg(argIdx), } c := &client{ - sanitizedURL: sanitizedURL, - remoteWriteURL: remoteWriteURL, - authCfg: authCfg, - awsCfg: awsCfg, - fq: fq, - hc: hc, - stopCh: make(chan struct{}), + sanitizedURL: sanitizedURL, + remoteWriteURL: remoteWriteURL, + authCfg: authCfg, + awsCfg: awsCfg, + fq: fq, + hc: hc, + retryMinInterval: retryMinInterval.GetOptionalArg(argIdx), + retryMaxTime: retryMaxTime.GetOptionalArg(argIdx), + stopCh: make(chan struct{}), } c.sendBlock = c.sendBlockHTTP @@ -396,11 +403,11 @@ func (c *client) newRequest(url string, body []byte) (*http.Request, error) { // sendBlockHTTP sends the given block to c.remoteWriteURL. // // The function returns false only if c.stopCh is closed. -// Otherwise it tries sending the block to remote storage indefinitely. +// Otherwise, it tries sending the block to remote storage indefinitely. 
func (c *client) sendBlockHTTP(block []byte) bool { c.rl.Register(len(block)) - maxRetryDuration := timeutil.AddJitterToDuration(time.Minute) - retryDuration := timeutil.AddJitterToDuration(time.Second) + maxRetryDuration := timeutil.AddJitterToDuration(c.retryMaxTime) + retryDuration := timeutil.AddJitterToDuration(c.retryMinInterval) retriesCount := 0 again: diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index 8d974ffec..e3333b4f4 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -28,6 +28,7 @@ The value of `instance` label for those scrape targets will be changed from `