2020-02-23 11:35:47 +00:00
package remotewrite
import (
2020-07-20 16:27:25 +00:00
"bytes"
2023-08-23 22:08:04 +00:00
"errors"
2020-02-23 11:35:47 +00:00
"fmt"
2022-08-21 21:13:44 +00:00
"io"
2020-07-20 16:27:25 +00:00
"net/http"
"net/url"
2024-09-24 10:44:03 +00:00
"strconv"
2020-02-23 11:35:47 +00:00
"strings"
"sync"
"time"
2022-05-04 17:24:19 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
2020-05-06 13:51:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2024-07-15 21:00:14 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-05-12 14:20:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2023-02-26 20:07:30 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
2024-03-30 04:38:29 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
2021-01-26 22:23:10 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2024-01-22 16:12:37 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
2024-09-24 10:44:03 +00:00
"github.com/VictoriaMetrics/metrics"
2020-02-23 11:35:47 +00:00
)
var (
2023-02-26 20:07:30 +00:00
forcePromProto = flagutil . NewArrayBool ( "remoteWrite.forcePromProto" , "Whether to force Prometheus remote write protocol for sending data " +
2024-04-17 23:31:37 +00:00
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol" )
2023-02-26 20:07:30 +00:00
forceVMProto = flagutil . NewArrayBool ( "remoteWrite.forceVMProto" , "Whether to force VictoriaMetrics remote write protocol for sending data " +
2024-04-17 23:31:37 +00:00
"to the corresponding -remoteWrite.url . See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol" )
2023-02-26 20:07:30 +00:00
2023-08-12 11:17:55 +00:00
rateLimit = flagutil . NewArrayInt ( "remoteWrite.rateLimit" , 0 , "Optional rate limit in bytes per second for data sent to the corresponding -remoteWrite.url. " +
2023-05-10 07:50:41 +00:00
"By default, the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data " +
2024-03-30 04:38:29 +00:00
"is sent after temporary unavailability of the remote storage. See also -maxIngestionRate" )
2024-08-23 12:05:51 +00:00
sendTimeout = flagutil . NewArrayDuration ( "remoteWrite.sendTimeout" , time . Minute , "Timeout for sending a single block of data to the corresponding -remoteWrite.url" )
2024-10-08 11:14:38 +00:00
retryMinInterval = flagutil . NewArrayDuration ( "remoteWrite.retryMinInterval" , time . Second , "The minimum delay between retry attempts to send a block of data to the corresponding -remoteWrite.url. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxTime" )
2024-08-23 12:05:51 +00:00
retryMaxTime = flagutil . NewArrayDuration ( "remoteWrite.retryMaxTime" , time . Minute , "The max time spent on retry attempts to send a block of data to the corresponding -remoteWrite.url. Change this value if it is expected for -remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval" )
proxyURL = flagutil . NewArrayString ( "remoteWrite.proxyURL" , "Optional proxy URL for writing data to the corresponding -remoteWrite.url. " +
2022-06-30 17:15:56 +00:00
"Supported proxies: http, https, socks5. Example: -remoteWrite.proxyURL=socks5://proxy:1234" )
2020-02-23 11:35:47 +00:00
2024-06-03 08:26:57 +00:00
tlsHandshakeTimeout = flagutil . NewArrayDuration ( "remoteWrite.tlsHandshakeTimeout" , 20 * time . Second , "The timeout for establishing tls connections to the corresponding -remoteWrite.url" )
2022-06-30 17:15:56 +00:00
tlsInsecureSkipVerify = flagutil . NewArrayBool ( "remoteWrite.tlsInsecureSkipVerify" , "Whether to skip tls verification when connecting to the corresponding -remoteWrite.url" )
2022-10-01 15:26:05 +00:00
tlsCertFile = flagutil . NewArrayString ( "remoteWrite.tlsCertFile" , "Optional path to client-side TLS certificate file to use when connecting " +
2022-06-30 17:15:56 +00:00
"to the corresponding -remoteWrite.url" )
2022-10-01 15:26:05 +00:00
tlsKeyFile = flagutil . NewArrayString ( "remoteWrite.tlsKeyFile" , "Optional path to client-side TLS certificate key to use when connecting to the corresponding -remoteWrite.url" )
tlsCAFile = flagutil . NewArrayString ( "remoteWrite.tlsCAFile" , "Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. " +
2023-05-10 07:50:41 +00:00
"By default, system CA is used" )
2022-10-01 15:26:05 +00:00
tlsServerName = flagutil . NewArrayString ( "remoteWrite.tlsServerName" , "Optional TLS server name to use for connections to the corresponding -remoteWrite.url. " +
2023-05-10 07:50:41 +00:00
"By default, the server name from -remoteWrite.url is used" )
2020-02-23 11:35:47 +00:00
2022-10-01 15:26:05 +00:00
headers = flagutil . NewArrayString ( "remoteWrite.headers" , "Optional HTTP headers to send with each request to the corresponding -remoteWrite.url. " +
2022-06-30 17:15:56 +00:00
"For example, -remoteWrite.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteWrite.url. " +
2022-06-30 17:00:03 +00:00
"Multiple headers must be delimited by '^^': -remoteWrite.headers='header1:value1^^header2:value2'" )
2022-10-01 15:26:05 +00:00
basicAuthUsername = flagutil . NewArrayString ( "remoteWrite.basicAuth.username" , "Optional basic auth username to use for the corresponding -remoteWrite.url" )
basicAuthPassword = flagutil . NewArrayString ( "remoteWrite.basicAuth.password" , "Optional basic auth password to use for the corresponding -remoteWrite.url" )
basicAuthPasswordFile = flagutil . NewArrayString ( "remoteWrite.basicAuth.passwordFile" , "Optional path to basic auth password to use for the corresponding -remoteWrite.url. " +
2022-06-30 17:15:56 +00:00
"The file is re-read every second" )
2022-10-01 15:26:05 +00:00
bearerToken = flagutil . NewArrayString ( "remoteWrite.bearerToken" , "Optional bearer auth token to use for the corresponding -remoteWrite.url" )
bearerTokenFile = flagutil . NewArrayString ( "remoteWrite.bearerTokenFile" , "Optional path to bearer token file to use for the corresponding -remoteWrite.url. " +
2022-06-30 17:15:56 +00:00
"The token is re-read from the file every second" )
2021-05-22 13:20:18 +00:00
2022-10-01 15:26:05 +00:00
oauth2ClientID = flagutil . NewArrayString ( "remoteWrite.oauth2.clientID" , "Optional OAuth2 clientID to use for the corresponding -remoteWrite.url" )
oauth2ClientSecret = flagutil . NewArrayString ( "remoteWrite.oauth2.clientSecret" , "Optional OAuth2 clientSecret to use for the corresponding -remoteWrite.url" )
oauth2ClientSecretFile = flagutil . NewArrayString ( "remoteWrite.oauth2.clientSecretFile" , "Optional OAuth2 clientSecretFile to use for the corresponding -remoteWrite.url" )
2023-12-20 19:35:16 +00:00
oauth2EndpointParams = flagutil . NewArrayString ( "remoteWrite.oauth2.endpointParams" , "Optional OAuth2 endpoint parameters to use for the corresponding -remoteWrite.url . " +
` The endpoint parameters must be set in JSON format: { "param1":"value1",...,"paramN":"valueN"} ` )
oauth2TokenURL = flagutil . NewArrayString ( "remoteWrite.oauth2.tokenUrl" , "Optional OAuth2 tokenURL to use for the corresponding -remoteWrite.url" )
oauth2Scopes = flagutil . NewArrayString ( "remoteWrite.oauth2.scopes" , "Optional OAuth2 scopes to use for the corresponding -remoteWrite.url. Scopes must be delimited by ';'" )
2022-05-04 17:24:19 +00:00
2022-06-30 17:15:56 +00:00
awsUseSigv4 = flagutil . NewArrayBool ( "remoteWrite.aws.useSigv4" , "Enables SigV4 request signing for the corresponding -remoteWrite.url. " +
"It is expected that other -remoteWrite.aws.* command-line flags are set if sigv4 request signing is enabled" )
2022-10-01 15:26:05 +00:00
awsEC2Endpoint = flagutil . NewArrayString ( "remoteWrite.aws.ec2Endpoint" , "Optional AWS EC2 API endpoint to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsSTSEndpoint = flagutil . NewArrayString ( "remoteWrite.aws.stsEndpoint" , "Optional AWS STS API endpoint to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsRegion = flagutil . NewArrayString ( "remoteWrite.aws.region" , "Optional AWS region to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsRoleARN = flagutil . NewArrayString ( "remoteWrite.aws.roleARN" , "Optional AWS roleARN to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsAccessKey = flagutil . NewArrayString ( "remoteWrite.aws.accessKey" , "Optional AWS AccessKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
awsService = flagutil . NewArrayString ( "remoteWrite.aws.service" , "Optional AWS Service to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set. " +
2022-06-30 17:15:56 +00:00
"Defaults to \"aps\"" )
2022-10-01 15:26:05 +00:00
awsSecretKey = flagutil . NewArrayString ( "remoteWrite.aws.secretKey" , "Optional AWS SecretKey to use for the corresponding -remoteWrite.url if -remoteWrite.aws.useSigv4 is set" )
2020-02-23 11:35:47 +00:00
)
type client struct {
2023-02-26 20:07:30 +00:00
sanitizedURL string
remoteWriteURL string
// Whether to use VictoriaMetrics remote write protocol for sending the data to remoteWriteURL
useVMProto bool
fq * persistentqueue . FastQueue
hc * http . Client
2020-02-23 11:35:47 +00:00
2024-08-23 12:05:51 +00:00
retryMinInterval time . Duration
retryMaxTime time . Duration
2021-09-28 21:52:07 +00:00
sendBlock func ( block [ ] byte ) bool
authCfg * promauth . Config
2022-05-04 17:24:19 +00:00
awsCfg * awsapi . Config
2021-05-22 13:20:18 +00:00
2024-03-30 04:38:29 +00:00
rl * ratelimiter . RateLimiter
2021-01-26 22:19:35 +00:00
2020-12-15 18:39:12 +00:00
bytesSent * metrics . Counter
blocksSent * metrics . Counter
2020-02-23 11:35:47 +00:00
requestDuration * metrics . Histogram
requestsOKCount * metrics . Counter
errorsCount * metrics . Counter
2020-11-01 22:43:51 +00:00
packetsDropped * metrics . Counter
2022-05-02 19:20:05 +00:00
rateLimit * metrics . Gauge
2020-02-23 11:35:47 +00:00
retriesCount * metrics . Counter
2021-08-15 10:32:40 +00:00
sendDuration * metrics . FloatCounter
2020-02-23 11:35:47 +00:00
wg sync . WaitGroup
stopCh chan struct { }
}
2023-02-26 20:07:30 +00:00
func newHTTPClient ( argIdx int , remoteWriteURL , sanitizedURL string , fq * persistentqueue . FastQueue , concurrency int ) * client {
2021-05-22 14:59:23 +00:00
authCfg , err := getAuthConfig ( argIdx )
2020-07-20 16:27:25 +00:00
if err != nil {
2023-10-25 21:19:33 +00:00
logger . Fatalf ( "cannot initialize auth config for -remoteWrite.url=%q: %s" , remoteWriteURL , err )
}
2022-05-04 17:24:19 +00:00
awsCfg , err := getAWSAPIConfig ( argIdx )
if err != nil {
2023-10-25 21:19:33 +00:00
logger . Fatalf ( "cannot initialize AWS Config for -remoteWrite.url=%q: %s" , remoteWriteURL , err )
2022-05-04 17:24:19 +00:00
}
2020-07-20 16:27:25 +00:00
tr := & http . Transport {
2024-07-15 21:00:14 +00:00
DialContext : netutil . NewStatDialFunc ( "vmagent_remotewrite" ) ,
2024-02-13 00:36:35 +00:00
TLSHandshakeTimeout : tlsHandshakeTimeout . GetOptionalArg ( argIdx ) ,
2020-07-20 16:27:25 +00:00
MaxConnsPerHost : 2 * concurrency ,
2020-08-04 17:59:55 +00:00
MaxIdleConnsPerHost : 2 * concurrency ,
IdleConnTimeout : time . Minute ,
WriteBufferSize : 64 * 1024 ,
2020-07-20 16:27:25 +00:00
}
pURL := proxyURL . GetOptionalArg ( argIdx )
if len ( pURL ) > 0 {
if ! strings . Contains ( pURL , "://" ) {
logger . Fatalf ( "cannot parse -remoteWrite.proxyURL=%q: it must start with `http://`, `https://` or `socks5://`" , pURL )
}
2021-10-26 18:21:08 +00:00
pu , err := url . Parse ( pURL )
2020-07-20 16:27:25 +00:00
if err != nil {
logger . Fatalf ( "cannot parse -remoteWrite.proxyURL=%q: %s" , pURL , err )
}
2021-10-26 18:21:08 +00:00
tr . Proxy = http . ProxyURL ( pu )
2020-07-20 16:27:25 +00:00
}
2023-02-24 01:36:52 +00:00
hc := & http . Client {
2024-04-03 21:46:40 +00:00
Transport : authCfg . NewRoundTripper ( tr ) ,
2023-08-12 11:17:55 +00:00
Timeout : sendTimeout . GetOptionalArg ( argIdx ) ,
2023-02-24 01:36:52 +00:00
}
2020-02-23 11:35:47 +00:00
c := & client {
2024-08-23 12:05:51 +00:00
sanitizedURL : sanitizedURL ,
remoteWriteURL : remoteWriteURL ,
authCfg : authCfg ,
awsCfg : awsCfg ,
fq : fq ,
hc : hc ,
retryMinInterval : retryMinInterval . GetOptionalArg ( argIdx ) ,
retryMaxTime : retryMaxTime . GetOptionalArg ( argIdx ) ,
stopCh : make ( chan struct { } ) ,
2020-02-23 11:35:47 +00:00
}
2021-09-28 21:52:07 +00:00
c . sendBlock = c . sendBlockHTTP
2023-02-26 20:07:30 +00:00
useVMProto := forceVMProto . GetOptionalArg ( argIdx )
usePromProto := forcePromProto . GetOptionalArg ( argIdx )
if useVMProto && usePromProto {
logger . Fatalf ( "-remoteWrite.useVMProto and -remoteWrite.usePromProto cannot be set simultaneously for -remoteWrite.url=%s" , sanitizedURL )
}
app/vmagent/remotewrite: follow-up for e3a756d82869f8c357b072f6e635ebfc7d65dd2c
- Document the fix
- Move the detection of VictoriaMetrics remoteWrite protocol from client.init() to newHTTPClient()
This simplifies the fix to the following diff:
diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 099899c19..70b904af4 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -151,10 +151,6 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
c.sendBlock = c.sendBlockHTTP
- return c
-}
-
-func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
useVMProto := forceVMProto.GetOptionalArg(argIdx)
usePromProto := forcePromProto.GetOptionalArg(argIdx)
if useVMProto && usePromProto {
@@ -173,6 +169,10 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
}
c.useVMProto = useVMProto
+ return c
+}
+
+func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
2023-03-08 07:50:06 +00:00
if ! useVMProto && ! usePromProto {
2023-02-26 20:07:30 +00:00
// Auto-detect whether the remote storage supports VictoriaMetrics remote write protocol.
doRequest := func ( url string ) ( * http . Response , error ) {
return c . doRequest ( url , nil )
}
useVMProto = common . HandleVMProtoClientHandshake ( c . remoteWriteURL , doRequest )
if ! useVMProto {
logger . Infof ( "the remote storage at %q doesn't support VictoriaMetrics remote write protocol. Switching to Prometheus remote write protocol. " +
2024-04-17 23:31:37 +00:00
"See https://docs.victoriametrics.com/vmagent/#victoriametrics-remote-write-protocol" , sanitizedURL )
2023-02-26 20:07:30 +00:00
}
}
app/vmagent/remotewrite: follow-up for e3a756d82869f8c357b072f6e635ebfc7d65dd2c
- Document the fix
- Move the detection of VictoriaMetrics remoteWrite protocol from client.init() to newHTTPClient()
This simplifies the fix to the following diff:
diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 099899c19..70b904af4 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -151,10 +151,6 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
c.sendBlock = c.sendBlockHTTP
- return c
-}
-
-func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
useVMProto := forceVMProto.GetOptionalArg(argIdx)
usePromProto := forcePromProto.GetOptionalArg(argIdx)
if useVMProto && usePromProto {
@@ -173,6 +169,10 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
}
c.useVMProto = useVMProto
+ return c
+}
+
+func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
2023-03-08 07:50:06 +00:00
c . useVMProto = useVMProto
2023-02-26 20:07:30 +00:00
app/vmagent/remotewrite: follow-up for e3a756d82869f8c357b072f6e635ebfc7d65dd2c
- Document the fix
- Move the detection of VictoriaMetrics remoteWrite protocol from client.init() to newHTTPClient()
This simplifies the fix to the following diff:
diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go
index 099899c19..70b904af4 100644
--- a/app/vmagent/remotewrite/client.go
+++ b/app/vmagent/remotewrite/client.go
@@ -151,10 +151,6 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
}
c.sendBlock = c.sendBlockHTTP
- return c
-}
-
-func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
useVMProto := forceVMProto.GetOptionalArg(argIdx)
usePromProto := forcePromProto.GetOptionalArg(argIdx)
if useVMProto && usePromProto {
@@ -173,6 +169,10 @@ func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
}
c.useVMProto = useVMProto
+ return c
+}
+
+func (c *client) init(argIdx, concurrency int, sanitizedURL string) {
2023-03-08 07:50:06 +00:00
return c
}
func ( c * client ) init ( argIdx , concurrency int , sanitizedURL string ) {
2024-03-21 16:14:49 +00:00
limitReached := metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_rate_limit_reached_total { url=%q} ` , c . sanitizedURL ) )
2023-08-12 11:17:55 +00:00
if bytesPerSec := rateLimit . GetOptionalArg ( argIdx ) ; bytesPerSec > 0 {
2021-02-01 12:27:05 +00:00
logger . Infof ( "applying %d bytes per second rate limit for -remoteWrite.url=%q" , bytesPerSec , sanitizedURL )
2024-03-30 04:38:29 +00:00
c . rl = ratelimiter . New ( int64 ( bytesPerSec ) , limitReached , c . stopCh )
2021-01-26 22:19:35 +00:00
}
2020-12-15 18:39:12 +00:00
c . bytesSent = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_bytes_sent_total { url=%q} ` , c . sanitizedURL ) )
c . blocksSent = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_blocks_sent_total { url=%q} ` , c . sanitizedURL ) )
2022-05-02 19:20:05 +00:00
c . rateLimit = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_rate_limit { url=%q} ` , c . sanitizedURL ) , func ( ) float64 {
2023-08-12 11:17:55 +00:00
return float64 ( rateLimit . GetOptionalArg ( argIdx ) )
2022-05-02 19:20:05 +00:00
} )
2020-09-16 19:34:01 +00:00
c . requestDuration = metrics . GetOrCreateHistogram ( fmt . Sprintf ( ` vmagent_remotewrite_duration_seconds { url=%q} ` , c . sanitizedURL ) )
c . requestsOKCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_requests_total { url=%q, status_code="2XX"} ` , c . sanitizedURL ) )
c . errorsCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_errors_total { url=%q} ` , c . sanitizedURL ) )
2020-11-01 22:43:51 +00:00
c . packetsDropped = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_packets_dropped_total { url=%q} ` , c . sanitizedURL ) )
2020-09-16 19:34:01 +00:00
c . retriesCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_retries_count_total { url=%q} ` , c . sanitizedURL ) )
2021-08-15 10:32:40 +00:00
c . sendDuration = metrics . GetOrCreateFloatCounter ( fmt . Sprintf ( ` vmagent_remotewrite_send_duration_seconds_total { url=%q} ` , c . sanitizedURL ) )
2022-07-18 11:31:35 +00:00
metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_queues { url=%q} ` , c . sanitizedURL ) , func ( ) float64 {
return float64 ( * queues )
} )
2020-03-03 11:08:17 +00:00
for i := 0 ; i < concurrency ; i ++ {
2020-02-23 11:35:47 +00:00
c . wg . Add ( 1 )
go func ( ) {
defer c . wg . Done ( )
c . runWorker ( )
} ( )
}
2020-09-16 19:34:01 +00:00
logger . Infof ( "initialized client for -remoteWrite.url=%q" , c . sanitizedURL )
2020-02-23 11:35:47 +00:00
}
func ( c * client ) MustStop ( ) {
close ( c . stopCh )
c . wg . Wait ( )
2020-09-16 19:34:01 +00:00
logger . Infof ( "stopped client for -remoteWrite.url=%q" , c . sanitizedURL )
2020-02-23 11:35:47 +00:00
}
2021-05-22 14:59:23 +00:00
func getAuthConfig ( argIdx int ) ( * promauth . Config , error ) {
2022-06-30 17:00:03 +00:00
headersValue := headers . GetOptionalArg ( argIdx )
2022-06-30 17:17:30 +00:00
var hdrs [ ] string
2022-06-30 17:00:03 +00:00
if headersValue != "" {
2022-06-30 17:17:30 +00:00
hdrs = strings . Split ( headersValue , "^^" )
2022-06-30 17:00:03 +00:00
}
2021-05-22 14:59:23 +00:00
username := basicAuthUsername . GetOptionalArg ( argIdx )
password := basicAuthPassword . GetOptionalArg ( argIdx )
passwordFile := basicAuthPasswordFile . GetOptionalArg ( argIdx )
var basicAuthCfg * promauth . BasicAuthConfig
if username != "" || password != "" || passwordFile != "" {
basicAuthCfg = & promauth . BasicAuthConfig {
Username : username ,
2021-11-05 12:41:14 +00:00
Password : promauth . NewSecret ( password ) ,
2021-05-22 14:59:23 +00:00
PasswordFile : passwordFile ,
}
}
token := bearerToken . GetOptionalArg ( argIdx )
tokenFile := bearerTokenFile . GetOptionalArg ( argIdx )
var oauth2Cfg * promauth . OAuth2Config
clientSecret := oauth2ClientSecret . GetOptionalArg ( argIdx )
clientSecretFile := oauth2ClientSecretFile . GetOptionalArg ( argIdx )
if clientSecretFile != "" || clientSecret != "" {
2023-12-20 19:35:16 +00:00
endpointParamsJSON := oauth2EndpointParams . GetOptionalArg ( argIdx )
endpointParams , err := flagutil . ParseJSONMap ( endpointParamsJSON )
if err != nil {
return nil , fmt . Errorf ( "cannot parse JSON for -remoteWrite.oauth2.endpointParams=%s: %w" , endpointParamsJSON , err )
}
2021-05-22 14:59:23 +00:00
oauth2Cfg = & promauth . OAuth2Config {
ClientID : oauth2ClientID . GetOptionalArg ( argIdx ) ,
2021-11-05 12:41:14 +00:00
ClientSecret : promauth . NewSecret ( clientSecret ) ,
2021-05-22 14:59:23 +00:00
ClientSecretFile : clientSecretFile ,
2023-12-20 19:35:16 +00:00
EndpointParams : endpointParams ,
2021-05-22 14:59:23 +00:00
TokenURL : oauth2TokenURL . GetOptionalArg ( argIdx ) ,
Scopes : strings . Split ( oauth2Scopes . GetOptionalArg ( argIdx ) , ";" ) ,
}
}
tlsCfg := & promauth . TLSConfig {
2020-05-12 14:20:55 +00:00
CAFile : tlsCAFile . GetOptionalArg ( argIdx ) ,
CertFile : tlsCertFile . GetOptionalArg ( argIdx ) ,
KeyFile : tlsKeyFile . GetOptionalArg ( argIdx ) ,
ServerName : tlsServerName . GetOptionalArg ( argIdx ) ,
2020-12-15 10:51:12 +00:00
InsecureSkipVerify : tlsInsecureSkipVerify . GetOptionalArg ( argIdx ) ,
2020-02-23 11:35:47 +00:00
}
2021-05-22 13:20:18 +00:00
2022-07-04 11:27:48 +00:00
opts := & promauth . Options {
BasicAuth : basicAuthCfg ,
BearerToken : token ,
BearerTokenFile : tokenFile ,
OAuth2 : oauth2Cfg ,
TLSConfig : tlsCfg ,
Headers : hdrs ,
}
authCfg , err := opts . NewConfig ( )
2021-05-22 13:20:18 +00:00
if err != nil {
2023-07-03 11:12:40 +00:00
return nil , fmt . Errorf ( "cannot populate auth config for remoteWrite idx: %d, err: %w" , argIdx , err )
2021-05-22 13:20:18 +00:00
}
return authCfg , nil
}
2022-05-04 17:24:19 +00:00
func getAWSAPIConfig ( argIdx int ) ( * awsapi . Config , error ) {
2022-05-04 17:39:38 +00:00
if ! awsUseSigv4 . GetOptionalArg ( argIdx ) {
2022-05-04 17:24:19 +00:00
return nil , nil
}
2022-08-05 15:50:00 +00:00
ec2Endpoint := awsEC2Endpoint . GetOptionalArg ( argIdx )
stsEndpoint := awsSTSEndpoint . GetOptionalArg ( argIdx )
2022-05-04 17:24:19 +00:00
region := awsRegion . GetOptionalArg ( argIdx )
roleARN := awsRoleARN . GetOptionalArg ( argIdx )
accessKey := awsAccessKey . GetOptionalArg ( argIdx )
secretKey := awsSecretKey . GetOptionalArg ( argIdx )
2022-05-18 12:58:31 +00:00
service := awsService . GetOptionalArg ( argIdx )
2022-08-05 15:50:00 +00:00
cfg , err := awsapi . NewConfig ( ec2Endpoint , stsEndpoint , region , roleARN , accessKey , secretKey , service )
2022-05-04 17:24:19 +00:00
if err != nil {
return nil , err
}
return cfg , nil
}
2020-02-23 11:35:47 +00:00
func ( c * client ) runWorker ( ) {
var ok bool
var block [ ] byte
2021-02-17 19:42:45 +00:00
ch := make ( chan bool , 1 )
2020-02-23 11:35:47 +00:00
for {
block , ok = c . fq . MustReadBlock ( block [ : 0 ] )
if ! ok {
return
}
2024-05-17 12:55:17 +00:00
if len ( block ) == 0 {
// skip empty data blocks from sending
// see https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6241
continue
}
2020-02-23 11:35:47 +00:00
go func ( ) {
2021-08-15 10:32:40 +00:00
startTime := time . Now ( )
2021-02-17 19:42:45 +00:00
ch <- c . sendBlock ( block )
2021-08-15 10:32:40 +00:00
c . sendDuration . Add ( time . Since ( startTime ) . Seconds ( ) )
2020-02-23 11:35:47 +00:00
} ( )
select {
2021-02-17 19:42:45 +00:00
case ok := <- ch :
if ok {
// The block has been sent successfully
continue
}
// Return unsent block to the queue.
2023-11-24 12:42:11 +00:00
c . fq . MustWriteBlockIgnoreDisabledPQ ( block )
2021-02-17 19:42:45 +00:00
return
2020-02-23 11:35:47 +00:00
case <- c . stopCh :
// c must be stopped. Wait for a while in the hope the block will be sent.
graceDuration := 5 * time . Second
select {
2021-02-17 19:42:45 +00:00
case ok := <- ch :
if ! ok {
// Return unsent block to the queue.
2023-11-24 12:42:11 +00:00
c . fq . MustWriteBlockIgnoreDisabledPQ ( block )
2021-02-17 19:42:45 +00:00
}
2020-02-23 11:35:47 +00:00
case <- time . After ( graceDuration ) :
2021-02-17 19:42:45 +00:00
// Return unsent block to the queue.
2023-11-24 12:42:11 +00:00
c . fq . MustWriteBlockIgnoreDisabledPQ ( block )
2020-02-23 11:35:47 +00:00
}
return
}
}
}
2023-02-26 20:07:30 +00:00
func ( c * client ) doRequest ( url string , body [ ] byte ) ( * http . Response , error ) {
2023-10-17 09:58:19 +00:00
req , err := c . newRequest ( url , body )
if err != nil {
return nil , err
}
2023-08-23 22:08:04 +00:00
resp , err := c . hc . Do ( req )
2023-10-25 21:19:33 +00:00
if err == nil {
return resp , nil
}
if ! errors . Is ( err , io . EOF ) && ! errors . Is ( err , io . ErrUnexpectedEOF ) {
return nil , err
}
// It is likely connection became stale or timed out during the first request.
// Make another attempt in hope request will succeed.
// If not, the error should be handled by the caller as usual.
// This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4139
req , err = c . newRequest ( url , body )
if err != nil {
return nil , fmt . Errorf ( "second attempt: %w" , err )
}
resp , err = c . hc . Do ( req )
if err != nil {
return nil , fmt . Errorf ( "second attempt: %w" , err )
2023-08-23 22:08:04 +00:00
}
2023-10-25 21:19:33 +00:00
return resp , nil
2023-08-23 22:08:04 +00:00
}
2023-10-17 09:58:19 +00:00
func ( c * client ) newRequest ( url string , body [ ] byte ) ( * http . Request , error ) {
2023-02-26 20:07:30 +00:00
reqBody := bytes . NewBuffer ( body )
req , err := http . NewRequest ( http . MethodPost , url , reqBody )
2020-07-20 16:27:25 +00:00
if err != nil {
2023-02-26 20:07:30 +00:00
logger . Panicf ( "BUG: unexpected error from http.NewRequest(%q): %s" , url , err )
2020-07-20 16:27:25 +00:00
}
2023-10-17 09:58:19 +00:00
err = c . authCfg . SetHeaders ( req , true )
if err != nil {
return nil , err
}
2020-07-20 16:27:25 +00:00
h := req . Header
h . Set ( "User-Agent" , "vmagent" )
h . Set ( "Content-Type" , "application/x-protobuf" )
2023-02-26 20:07:30 +00:00
if c . useVMProto {
2023-02-21 02:38:49 +00:00
h . Set ( "Content-Encoding" , "zstd" )
h . Set ( "X-VictoriaMetrics-Remote-Write-Version" , "1" )
} else {
h . Set ( "Content-Encoding" , "snappy" )
h . Set ( "X-Prometheus-Remote-Write-Version" , "0.1.0" )
}
2022-05-04 17:24:19 +00:00
if c . awsCfg != nil {
2023-02-26 20:07:30 +00:00
sigv4Hash := awsapi . HashHex ( body )
2022-05-18 12:58:31 +00:00
if err := c . awsCfg . SignRequest ( req , sigv4Hash ) ; err != nil {
2023-10-25 21:19:33 +00:00
return nil , fmt . Errorf ( "cannot sign remoteWrite request with AWS sigv4: %w" , err )
2022-05-04 17:24:19 +00:00
}
}
2023-10-17 09:58:19 +00:00
return req , nil
2023-02-26 20:07:30 +00:00
}
// sendBlockHTTP sends the given block to c.remoteWriteURL.
//
// The function returns false only if c.stopCh is closed.
2024-08-23 12:05:51 +00:00
// Otherwise, it tries sending the block to remote storage indefinitely.
2023-02-26 20:07:30 +00:00
func ( c * client ) sendBlockHTTP ( block [ ] byte ) bool {
2024-03-30 04:38:29 +00:00
c . rl . Register ( len ( block ) )
2024-08-23 12:05:51 +00:00
maxRetryDuration := timeutil . AddJitterToDuration ( c . retryMaxTime )
retryDuration := timeutil . AddJitterToDuration ( c . retryMinInterval )
2023-02-26 20:07:30 +00:00
retriesCount := 0
again :
2020-02-23 11:35:47 +00:00
startTime := time . Now ( )
2023-02-26 20:07:30 +00:00
resp , err := c . doRequest ( c . remoteWriteURL , block )
2020-02-23 11:35:47 +00:00
c . requestDuration . UpdateDuration ( startTime )
if err != nil {
c . errorsCount . Inc ( )
retryDuration *= 2
2024-01-22 16:12:37 +00:00
if retryDuration > maxRetryDuration {
retryDuration = maxRetryDuration
2020-02-23 11:35:47 +00:00
}
2021-05-24 12:42:43 +00:00
logger . Warnf ( "couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds" ,
2020-09-16 19:34:01 +00:00
len ( block ) , c . sanitizedURL , err , retryDuration . Seconds ( ) )
2021-01-26 22:23:10 +00:00
t := timerpool . Get ( retryDuration )
2020-07-20 16:27:25 +00:00
select {
case <- c . stopCh :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2021-02-17 19:23:38 +00:00
return false
2020-07-20 16:27:25 +00:00
case <- t . C :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2020-07-20 16:27:25 +00:00
}
2020-02-23 11:35:47 +00:00
c . retriesCount . Inc ( )
goto again
}
2020-07-20 16:27:25 +00:00
statusCode := resp . StatusCode
2020-07-28 17:52:00 +00:00
if statusCode / 100 == 2 {
_ = resp . Body . Close ( )
c . requestsOKCount . Inc ( )
2023-02-26 20:07:30 +00:00
c . bytesSent . Add ( len ( block ) )
c . blocksSent . Inc ( )
2021-02-17 19:23:38 +00:00
return true
2020-07-28 17:52:00 +00:00
}
2020-11-01 22:43:51 +00:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_requests_total { url=%q, status_code="%d"} ` , c . sanitizedURL , statusCode ) ) . Inc ( )
2021-03-26 11:17:59 +00:00
if statusCode == 409 || statusCode == 400 {
2022-08-21 21:13:44 +00:00
body , err := io . ReadAll ( resp . Body )
2021-12-21 14:36:09 +00:00
_ = resp . Body . Close ( )
if err != nil {
2022-06-27 09:31:16 +00:00
remoteWriteRejectedLogger . Errorf ( "sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; " +
2021-12-21 14:36:09 +00:00
"failed to read response body: %s" ,
len ( block ) , c . sanitizedURL , statusCode , err )
} else {
2022-06-27 09:31:16 +00:00
remoteWriteRejectedLogger . Errorf ( "sending a block with size %d bytes to %q was rejected (skipping the block): status code %d; response body: %s" ,
2021-12-21 14:36:09 +00:00
len ( block ) , c . sanitizedURL , statusCode , string ( body ) )
}
2021-05-13 13:16:16 +00:00
// Just drop block on 409 and 400 status codes like Prometheus does.
2020-11-01 22:43:51 +00:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
2021-05-13 13:16:16 +00:00
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149
2020-11-01 22:43:51 +00:00
_ = resp . Body . Close ( )
c . packetsDropped . Inc ( )
2021-02-17 19:23:38 +00:00
return true
2020-11-01 22:43:51 +00:00
}
2020-07-28 17:52:00 +00:00
// Unexpected status code returned
2020-08-30 18:39:45 +00:00
retriesCount ++
2024-09-24 10:44:03 +00:00
retryAfterHeader := parseRetryAfterHeader ( resp . Header . Get ( "Retry-After" ) )
retryDuration = getRetryDuration ( retryAfterHeader , retryDuration , maxRetryDuration )
// Handle response
2022-08-21 21:13:44 +00:00
body , err := io . ReadAll ( resp . Body )
2020-07-28 17:52:00 +00:00
_ = resp . Body . Close ( )
if err != nil {
2020-09-16 19:34:01 +00:00
logger . Errorf ( "cannot read response body from %q during retry #%d: %s" , c . sanitizedURL , retriesCount , err )
2020-07-28 17:52:00 +00:00
} else {
2020-08-30 18:39:45 +00:00
logger . Errorf ( "unexpected status code received after sending a block with size %d bytes to %q during retry #%d: %d; response body=%q; " +
2020-09-16 19:34:01 +00:00
"re-sending the block in %.3f seconds" , len ( block ) , c . sanitizedURL , retriesCount , statusCode , body , retryDuration . Seconds ( ) )
2020-07-28 17:52:00 +00:00
}
2021-01-26 22:23:10 +00:00
t := timerpool . Get ( retryDuration )
2020-07-28 17:52:00 +00:00
select {
case <- c . stopCh :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2021-02-17 19:23:38 +00:00
return false
2020-07-28 17:52:00 +00:00
case <- t . C :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2020-02-23 11:35:47 +00:00
}
2020-07-28 17:52:00 +00:00
c . retriesCount . Inc ( )
goto again
2020-04-17 12:51:29 +00:00
}
2021-01-26 22:19:35 +00:00
2022-06-27 09:31:16 +00:00
// remoteWriteRejectedLogger is used by sendBlockHTTP for logging blocks rejected by the remote storage.
// It is created via logger.WithThrottler with 5 seconds interval.
var remoteWriteRejectedLogger = logger . WithThrottler ( "remoteWriteRejected" , 5 * time . Second )
2024-09-24 10:44:03 +00:00
// getRetryDuration returns the delay to wait before the next retry attempt.
//
// A positive retryAfterDuration takes precedence: it is returned with jitter added
// and it isn't limited by maxRetryDuration.
// Otherwise retryDuration is doubled and capped by maxRetryDuration.
//
// Also see: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6097
func getRetryDuration(retryAfterDuration, retryDuration, maxRetryDuration time.Duration) time.Duration {
	if retryAfterDuration <= 0 {
		// No usable hint from the remote side; fall back to exponential backoff.
		if doubled := 2 * retryDuration; doubled <= maxRetryDuration {
			return doubled
		}
		return maxRetryDuration
	}
	// The hint from the remote side has the highest priority.
	return timeutil.AddJitterToDuration(retryAfterDuration)
}
// parseRetryAfterHeader parses `Retry-After` value retrieved from HTTP response header.
//
// retryAfterString should be in either HTTP-date or a number of seconds.
// The returned duration is never negative: time.Duration(0) is returned
// if retryAfterString is empty, does not follow RFC 7231, contains a negative number of seconds
// or contains a date in the past.
//
// The parsed result is logged for every non-empty retryAfterString.
func parseRetryAfterHeader(retryAfterString string) (retryAfterDuration time.Duration) {
	if retryAfterString == "" {
		return 0
	}

	// The named result lets the deferred function log the value returned below.
	defer func() {
		v := retryAfterDuration.Seconds()
		logger.Infof("'Retry-After: %s' parsed into %.2f second(s)", retryAfterString, v)
	}()

	// Retry-After could be in "Mon, 02 Jan 2006 15:04:05 GMT" format.
	if parsedTime, err := time.Parse(http.TimeFormat, retryAfterString); err == nil {
		// Drop sub-second precision, since HTTP-date has one second resolution.
		d := time.Until(parsedTime).Truncate(time.Second)
		if d < 0 {
			// The date is in the past.
			return 0
		}
		return d
	}

	// Retry-After could be in seconds.
	if seconds, err := strconv.Atoi(retryAfterString); err == nil {
		if seconds < 0 {
			// RFC 7231 defines delay-seconds as a non-negative integer.
			return 0
		}
		return time.Duration(seconds) * time.Second
	}

	return 0
}