app/vmagent: add -remoteWrite.rateLimit command-line flag for limiting data rate to remote storage
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1035
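Usage note: the flag takes a value in bytes per second, so for example `-remoteWrite.rateLimit=1000000` (an illustrative value, not one suggested by the commit) caps outgoing traffic at roughly 1MB/s. Since the flag is array-valued, a separate limit may be supplied for each configured `-remoteWrite.url`.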
Commit 29bf531f7d (parent 68a66be811)

2 changed files with 55 additions and 0 deletions
app/vmagent/remotewrite/client.go

@@ -8,6 +8,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"net/url"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -20,6 +21,9 @@ import (
 )
 
 var (
+	rateLimit = flagutil.NewArray("remoteWrite.rateLimit", "Optional rate limit in bytes per second for data sent to -remoteWrite.url. "+
+		"By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data "+
+		"is sent after temporary unavailability of the remote storage")
 	sendTimeout = flagutil.NewArrayDuration("remoteWrite.sendTimeout", "Timeout for sending a single block of data to -remoteWrite.url")
 	proxyURL = flagutil.NewArray("remoteWrite.proxyURL", "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. "+
 		"Example: -remoteWrite.proxyURL=socks5://proxy:1234")
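The flag is created with `flagutil.NewArray`, VictoriaMetrics' helper for repeatable flags that yield one value per `-remoteWrite.url`. As a rough standard-library analogue (purely illustrative: this is not the flagutil code, and the out-of-range fallback in getOptionalArg below is an assumption, not flagutil's documented behavior), such a flag can be built on flag.Value:

package main

import (
	"flag"
	"fmt"
	"strings"
)

// arrayFlag collects every occurrence of a repeated -name=value flag.
type arrayFlag []string

func (a *arrayFlag) String() string { return strings.Join(*a, ",") }

func (a *arrayFlag) Set(v string) error {
	*a = append(*a, v)
	return nil
}

// getOptionalArg returns the idx-th value if present. As an assumed
// fallback rule, a single shared value applies to every index, and a
// missing value yields "".
func (a *arrayFlag) getOptionalArg(idx int) string {
	if idx < len(*a) {
		return (*a)[idx]
	}
	if len(*a) == 1 {
		return (*a)[0]
	}
	return ""
}

func main() {
	var rateLimit arrayFlag
	flag.Var(&rateLimit, "remoteWrite.rateLimit", "optional rate limit in bytes per second, one value per -remoteWrite.url")
	flag.Parse()
	for i := 0; i < 3; i++ {
		fmt.Printf("limit for url #%d: %q\n", i, rateLimit.getOptionalArg(i))
	}
}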
@@ -49,6 +53,8 @@ type client struct {
 	fq *persistentqueue.FastQueue
 	hc *http.Client
 
+	rl rateLimiter
+
 	bytesSent       *metrics.Counter
 	blocksSent      *metrics.Counter
 	requestDuration *metrics.Histogram
@@ -113,6 +119,18 @@ func newClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persistentqu
 		},
 		stopCh: make(chan struct{}),
 	}
+	if bytesPerSec := rateLimit.GetOptionalArg(argIdx); bytesPerSec != "" {
+		limit, err := strconv.ParseInt(bytesPerSec, 10, 64)
+		if err != nil {
+			logger.Fatalf("cannot parse -remoteWrite.rateLimit=%q for -remoteWrite.url=%q: %s", bytesPerSec, sanitizedURL, err)
+		}
+		if limit > 0 {
+			logger.Infof("applying %d bytes per second rate limit for -remoteWrite.url=%q", limit, sanitizedURL)
+			c.rl.perSecondLimit = limit
+		}
+	}
+	c.rl.limitReached = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remote_write_rate_limit_reached_total{url=%q}`, c.sanitizedURL))
+
 	c.bytesSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_bytes_sent_total{url=%q}`, c.sanitizedURL))
 	c.blocksSent = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_blocks_sent_total{url=%q}`, c.sanitizedURL))
 	c.requestDuration = metrics.GetOrCreateHistogram(fmt.Sprintf(`vmagent_remotewrite_duration_seconds{url=%q}`, c.sanitizedURL))
@@ -189,6 +207,7 @@ func (c *client) runWorker() {
 }
 
 func (c *client) sendBlock(block []byte) {
+	c.rl.register(len(block), c.stopCh)
 	retryDuration := time.Second
 	retriesCount := 0
 	c.bytesSent.Add(len(block))
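Because `register` is called before any bytes are written, a worker simply blocks in `sendBlock` once the per-second budget is exhausted; pending data then effectively accumulates in the persistent queue (`fq`) rather than being dropped.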
@@ -271,3 +290,38 @@ again:
 	c.retriesCount.Inc()
 	goto again
 }
+
+type rateLimiter struct {
+	perSecondLimit int64
+
+	// The current budget. It is increased by perSecondLimit every second.
+	budget int64
+
+	// The next deadline for increasing the budget by perSecondLimit
+	deadline time.Time
+
+	limitReached *metrics.Counter
+}
+
+func (rl *rateLimiter) register(dataLen int, stopCh <-chan struct{}) {
+	limit := rl.perSecondLimit
+	if limit <= 0 {
+		return
+	}
+	for rl.budget <= 0 {
+		now := time.Now()
+		if d := rl.deadline.Sub(now); d > 0 {
+			rl.limitReached.Inc()
+			t := time.NewTimer(d)
+			select {
+			case <-stopCh:
+				t.Stop()
+				return
+			case <-t.C:
+			}
+		}
+		rl.budget += limit
+		rl.deadline = now.Add(time.Second)
+	}
+	rl.budget -= int64(dataLen)
+}
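For readers who want to try the scheme in isolation, here is a self-contained sketch of the same budget-per-second rate limiting, with the metrics counter replaced by a plain field and hypothetical names (bpsLimiter, wait) that are not part of vmagent:

package main

import (
	"fmt"
	"time"
)

type bpsLimiter struct {
	perSecondLimit int64 // maximum bytes per second; <= 0 disables limiting

	budget   int64     // remaining bytes for the current one-second window
	deadline time.Time // when the budget is next topped up
	waits    int64     // how many times the limit was reached
}

// wait blocks until dataLen bytes fit into the budget, or stopCh is closed.
// The budget may go negative, allowing a large block through immediately;
// the overshoot is paid back in later windows.
func (rl *bpsLimiter) wait(dataLen int, stopCh <-chan struct{}) {
	limit := rl.perSecondLimit
	if limit <= 0 {
		return
	}
	for rl.budget <= 0 {
		now := time.Now()
		if d := rl.deadline.Sub(now); d > 0 {
			rl.waits++
			t := time.NewTimer(d)
			select {
			case <-stopCh:
				t.Stop()
				return
			case <-t.C:
			}
		}
		// Top up the budget for the next one-second window.
		rl.budget += limit
		rl.deadline = now.Add(time.Second)
	}
	rl.budget -= int64(dataLen)
}

func main() {
	rl := &bpsLimiter{perSecondLimit: 1000} // 1000 bytes/sec, illustrative
	stopCh := make(chan struct{})
	start := time.Now()
	for i := 0; i < 5; i++ {
		rl.wait(600, stopCh) // each "block" is 600 bytes
		fmt.Printf("block %d sent at %v\n", i, time.Since(start).Round(10*time.Millisecond))
	}
	fmt.Printf("limit reached %d times\n", rl.waits)
}

Running it shows the first blocks going out immediately and later ones being delayed until the next one-second window, mirroring how sendBlock is paced above.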
docs/CHANGELOG.md

@@ -5,6 +5,7 @@
 * FEATURE: added `-loggerTimezone` command-line flag for adjusting time zone for timestamps in log messages. By default UTC is used.
 * FEATURE: added `-search.maxStepForPointsAdjustment` command-line flag, which can be used for disabling adjustment for points returned by the `/api/v1/query_range` handler if such points have timestamps closer than `-search.latencyOffset` to the current time. Such points may contain incomplete data, so they are substituted by the previous values for `step` query args smaller than one minute by default.
 * FEATURE: vmalert: added `-datasource.queryStep` command-line flag for passing `step` query arg to `/api/v1/query` endpoint. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1025
+* FEATURE: vmagent: added `-remoteWrite.rateLimit` command-line flag for limiting the data transfer rate to `-remoteWrite.url`. This may be useful when big amounts of buffered data are sent after temporary unavailability of the remote storage. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1035
 
 * BUGFIX: vmagent: reduce the HTTP reconnection rate to scrape targets. Previously vmagent could erroneously close HTTP keep-alive connections more often than needed.
 * BUGFIX: vmagent: retry scrape and service discovery requests when the remote server closes HTTP keep-alive connection. Previously `disable_keepalive: true` option could be used under `scrape_configs` section when working with such servers.