2020-02-23 11:35:47 +00:00
package remotewrite
import (
2020-07-20 16:27:25 +00:00
"bytes"
2020-02-23 11:35:47 +00:00
"crypto/tls"
"encoding/base64"
"fmt"
2020-07-20 16:27:25 +00:00
"io/ioutil"
"net/http"
"net/url"
2020-02-23 11:35:47 +00:00
"strings"
"sync"
"time"
2020-05-06 13:51:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-05-12 14:20:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
2021-01-26 22:23:10 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/metrics"
)
var (
2021-02-01 12:27:05 +00:00
rateLimit = flagutil . NewArrayInt ( "remoteWrite.rateLimit" , "Optional rate limit in bytes per second for data sent to -remoteWrite.url. " +
2021-01-26 22:19:35 +00:00
"By default the rate limit is disabled. It can be useful for limiting load on remote storage when big amounts of buffered data " +
"is sent after temporary unavailability of the remote storage" )
2020-12-15 10:51:12 +00:00
sendTimeout = flagutil . NewArrayDuration ( "remoteWrite.sendTimeout" , "Timeout for sending a single block of data to -remoteWrite.url" )
2020-07-20 16:27:25 +00:00
proxyURL = flagutil . NewArray ( "remoteWrite.proxyURL" , "Optional proxy URL for writing data to -remoteWrite.url. Supported proxies: http, https, socks5. " +
"Example: -remoteWrite.proxyURL=socks5://proxy:1234" )
2020-02-23 11:35:47 +00:00
2020-12-15 10:51:12 +00:00
tlsInsecureSkipVerify = flagutil . NewArrayBool ( "remoteWrite.tlsInsecureSkipVerify" , "Whether to skip tls verification when connecting to -remoteWrite.url" )
2020-05-06 13:51:32 +00:00
tlsCertFile = flagutil . NewArray ( "remoteWrite.tlsCertFile" , "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
tlsKeyFile = flagutil . NewArray ( "remoteWrite.tlsKeyFile" , "Optional path to client-side TLS certificate key to use when connecting to -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
tlsCAFile = flagutil . NewArray ( "remoteWrite.tlsCAFile" , "Optional path to TLS CA file to use for verifying connections to -remoteWrite.url. " +
"By default system CA is used. If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
2020-05-12 14:20:55 +00:00
tlsServerName = flagutil . NewArray ( "remoteWrite.tlsServerName" , "Optional TLS server name to use for connections to -remoteWrite.url. " +
"By default the server name from -remoteWrite.url is used. If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
2020-02-23 11:35:47 +00:00
2020-05-06 13:51:32 +00:00
basicAuthUsername = flagutil . NewArray ( "remoteWrite.basicAuth.username" , "Optional basic auth username to use for -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
basicAuthPassword = flagutil . NewArray ( "remoteWrite.basicAuth.password" , "Optional basic auth password to use for -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
bearerToken = flagutil . NewArray ( "remoteWrite.bearerToken" , "Optional bearer auth token to use for -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
2020-02-23 11:35:47 +00:00
)
type client struct {
2020-09-16 19:34:01 +00:00
sanitizedURL string
2020-02-23 11:35:47 +00:00
remoteWriteURL string
authHeader string
fq * persistentqueue . FastQueue
2020-07-20 16:27:25 +00:00
hc * http . Client
2020-02-23 11:35:47 +00:00
2021-01-26 22:19:35 +00:00
rl rateLimiter
2020-12-15 18:39:12 +00:00
bytesSent * metrics . Counter
blocksSent * metrics . Counter
2020-02-23 11:35:47 +00:00
requestDuration * metrics . Histogram
requestsOKCount * metrics . Counter
errorsCount * metrics . Counter
2020-11-01 22:43:51 +00:00
packetsDropped * metrics . Counter
2020-02-23 11:35:47 +00:00
retriesCount * metrics . Counter
wg sync . WaitGroup
stopCh chan struct { }
}
2020-09-16 19:34:01 +00:00
func newClient ( argIdx int , remoteWriteURL , sanitizedURL string , fq * persistentqueue . FastQueue , concurrency int ) * client {
2020-07-20 16:27:25 +00:00
tlsCfg , err := getTLSConfig ( argIdx )
if err != nil {
logger . Panicf ( "FATAL: cannot initialize TLS config: %s" , err )
}
tr := & http . Transport {
Dial : statDial ,
TLSClientConfig : tlsCfg ,
TLSHandshakeTimeout : 5 * time . Second ,
MaxConnsPerHost : 2 * concurrency ,
2020-08-04 17:59:55 +00:00
MaxIdleConnsPerHost : 2 * concurrency ,
IdleConnTimeout : time . Minute ,
WriteBufferSize : 64 * 1024 ,
2020-07-20 16:27:25 +00:00
}
pURL := proxyURL . GetOptionalArg ( argIdx )
if len ( pURL ) > 0 {
if ! strings . Contains ( pURL , "://" ) {
logger . Fatalf ( "cannot parse -remoteWrite.proxyURL=%q: it must start with `http://`, `https://` or `socks5://`" , pURL )
}
urlProxy , err := url . Parse ( pURL )
if err != nil {
logger . Fatalf ( "cannot parse -remoteWrite.proxyURL=%q: %s" , pURL , err )
}
tr . Proxy = http . ProxyURL ( urlProxy )
}
2020-02-23 11:35:47 +00:00
authHeader := ""
2020-05-06 13:51:32 +00:00
username := basicAuthUsername . GetOptionalArg ( argIdx )
password := basicAuthPassword . GetOptionalArg ( argIdx )
if len ( username ) > 0 || len ( password ) > 0 {
2020-02-23 11:35:47 +00:00
// See https://en.wikipedia.org/wiki/Basic_access_authentication
2020-05-06 13:51:32 +00:00
token := username + ":" + password
2020-02-23 11:35:47 +00:00
token64 := base64 . StdEncoding . EncodeToString ( [ ] byte ( token ) )
authHeader = "Basic " + token64
}
2020-05-06 13:51:32 +00:00
token := bearerToken . GetOptionalArg ( argIdx )
if len ( token ) > 0 {
2020-02-23 11:35:47 +00:00
if authHeader != "" {
2020-05-30 11:22:38 +00:00
logger . Fatalf ( "`-remoteWrite.bearerToken`=%q cannot be set when `-remoteWrite.basicAuth.*` flags are set" , token )
2020-02-23 11:35:47 +00:00
}
2020-05-06 13:51:32 +00:00
authHeader = "Bearer " + token
2020-02-23 11:35:47 +00:00
}
c := & client {
2020-09-16 19:34:01 +00:00
sanitizedURL : sanitizedURL ,
2020-02-23 11:35:47 +00:00
remoteWriteURL : remoteWriteURL ,
authHeader : authHeader ,
fq : fq ,
2020-07-20 16:27:25 +00:00
hc : & http . Client {
Transport : tr ,
2020-12-15 10:51:12 +00:00
Timeout : sendTimeout . GetOptionalArgOrDefault ( argIdx , time . Minute ) ,
2020-07-20 16:27:25 +00:00
} ,
stopCh : make ( chan struct { } ) ,
2020-02-23 11:35:47 +00:00
}
2021-02-01 12:27:05 +00:00
if bytesPerSec := rateLimit . GetOptionalArgOrDefault ( argIdx , 0 ) ; bytesPerSec > 0 {
logger . Infof ( "applying %d bytes per second rate limit for -remoteWrite.url=%q" , bytesPerSec , sanitizedURL )
c . rl . perSecondLimit = int64 ( bytesPerSec )
2021-01-26 22:19:35 +00:00
}
c . rl . limitReached = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remote_write_rate_limit_reached_total { url=%q} ` , c . sanitizedURL ) )
2020-12-15 18:39:12 +00:00
c . bytesSent = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_bytes_sent_total { url=%q} ` , c . sanitizedURL ) )
c . blocksSent = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_blocks_sent_total { url=%q} ` , c . sanitizedURL ) )
2020-09-16 19:34:01 +00:00
c . requestDuration = metrics . GetOrCreateHistogram ( fmt . Sprintf ( ` vmagent_remotewrite_duration_seconds { url=%q} ` , c . sanitizedURL ) )
c . requestsOKCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_requests_total { url=%q, status_code="2XX"} ` , c . sanitizedURL ) )
c . errorsCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_errors_total { url=%q} ` , c . sanitizedURL ) )
2020-11-01 22:43:51 +00:00
c . packetsDropped = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_packets_dropped_total { url=%q} ` , c . sanitizedURL ) )
2020-09-16 19:34:01 +00:00
c . retriesCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_retries_count_total { url=%q} ` , c . sanitizedURL ) )
2020-03-03 11:08:17 +00:00
for i := 0 ; i < concurrency ; i ++ {
2020-02-23 11:35:47 +00:00
c . wg . Add ( 1 )
go func ( ) {
defer c . wg . Done ( )
c . runWorker ( )
} ( )
}
2020-09-16 19:34:01 +00:00
logger . Infof ( "initialized client for -remoteWrite.url=%q" , c . sanitizedURL )
2020-02-23 11:35:47 +00:00
return c
}
func ( c * client ) MustStop ( ) {
close ( c . stopCh )
c . wg . Wait ( )
2020-09-16 19:34:01 +00:00
logger . Infof ( "stopped client for -remoteWrite.url=%q" , c . sanitizedURL )
2020-02-23 11:35:47 +00:00
}
2020-05-06 13:51:32 +00:00
func getTLSConfig ( argIdx int ) ( * tls . Config , error ) {
2020-07-20 16:27:25 +00:00
c := & promauth . TLSConfig {
2020-05-12 14:20:55 +00:00
CAFile : tlsCAFile . GetOptionalArg ( argIdx ) ,
CertFile : tlsCertFile . GetOptionalArg ( argIdx ) ,
KeyFile : tlsKeyFile . GetOptionalArg ( argIdx ) ,
ServerName : tlsServerName . GetOptionalArg ( argIdx ) ,
2020-12-15 10:51:12 +00:00
InsecureSkipVerify : tlsInsecureSkipVerify . GetOptionalArg ( argIdx ) ,
2020-02-23 11:35:47 +00:00
}
2020-07-20 16:27:25 +00:00
if c . CAFile == "" && c . CertFile == "" && c . KeyFile == "" && c . ServerName == "" && ! c . InsecureSkipVerify {
return nil , nil
}
cfg , err := promauth . NewConfig ( "." , nil , "" , "" , c )
2020-05-12 14:20:55 +00:00
if err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot populate TLS config: %w" , err )
2020-02-23 11:35:47 +00:00
}
2020-05-12 14:20:55 +00:00
tlsCfg := cfg . NewTLSConfig ( )
2020-02-23 11:35:47 +00:00
return tlsCfg , nil
}
func ( c * client ) runWorker ( ) {
var ok bool
var block [ ] byte
2021-02-17 19:42:45 +00:00
ch := make ( chan bool , 1 )
2020-02-23 11:35:47 +00:00
for {
block , ok = c . fq . MustReadBlock ( block [ : 0 ] )
if ! ok {
return
}
go func ( ) {
2021-02-17 19:42:45 +00:00
ch <- c . sendBlock ( block )
2020-02-23 11:35:47 +00:00
} ( )
select {
2021-02-17 19:42:45 +00:00
case ok := <- ch :
if ok {
// The block has been sent successfully
continue
}
// Return unsent block to the queue.
c . fq . MustWriteBlock ( block )
return
2020-02-23 11:35:47 +00:00
case <- c . stopCh :
// c must be stopped. Wait for a while in the hope the block will be sent.
graceDuration := 5 * time . Second
select {
2021-02-17 19:42:45 +00:00
case ok := <- ch :
if ! ok {
// Return unsent block to the queue.
c . fq . MustWriteBlock ( block )
}
2020-02-23 11:35:47 +00:00
case <- time . After ( graceDuration ) :
2021-02-17 19:42:45 +00:00
// Return unsent block to the queue.
2021-02-17 19:23:38 +00:00
c . fq . MustWriteBlock ( block )
2020-02-23 11:35:47 +00:00
}
return
}
}
}
2021-02-17 19:42:45 +00:00
// sendBlock returns false only if c.stopCh is closed.
// Otherwise it tries sending the block to remote storage indefinitely.
func ( c * client ) sendBlock ( block [ ] byte ) bool {
2021-01-26 22:19:35 +00:00
c . rl . register ( len ( block ) , c . stopCh )
2020-07-27 14:31:36 +00:00
retryDuration := time . Second
2020-08-30 18:39:45 +00:00
retriesCount := 0
2020-12-15 18:39:12 +00:00
c . bytesSent . Add ( len ( block ) )
c . blocksSent . Inc ( )
2020-07-27 14:31:36 +00:00
again :
2020-07-20 16:27:25 +00:00
req , err := http . NewRequest ( "POST" , c . remoteWriteURL , bytes . NewBuffer ( block ) )
if err != nil {
2020-09-16 19:34:01 +00:00
logger . Panicf ( "BUG: unexected error from http.NewRequest(%q): %s" , c . sanitizedURL , err )
2020-07-20 16:27:25 +00:00
}
h := req . Header
h . Set ( "User-Agent" , "vmagent" )
h . Set ( "Content-Type" , "application/x-protobuf" )
h . Set ( "Content-Encoding" , "snappy" )
h . Set ( "X-Prometheus-Remote-Write-Version" , "0.1.0" )
2020-02-23 11:35:47 +00:00
if c . authHeader != "" {
req . Header . Set ( "Authorization" , c . authHeader )
}
startTime := time . Now ( )
2020-07-20 16:27:25 +00:00
resp , err := c . hc . Do ( req )
2020-02-23 11:35:47 +00:00
c . requestDuration . UpdateDuration ( startTime )
if err != nil {
c . errorsCount . Inc ( )
retryDuration *= 2
if retryDuration > time . Minute {
retryDuration = time . Minute
}
logger . Errorf ( "couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds" ,
2020-09-16 19:34:01 +00:00
len ( block ) , c . sanitizedURL , err , retryDuration . Seconds ( ) )
2021-01-26 22:23:10 +00:00
t := timerpool . Get ( retryDuration )
2020-07-20 16:27:25 +00:00
select {
case <- c . stopCh :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2021-02-17 19:23:38 +00:00
return false
2020-07-20 16:27:25 +00:00
case <- t . C :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2020-07-20 16:27:25 +00:00
}
2020-02-23 11:35:47 +00:00
c . retriesCount . Inc ( )
goto again
}
2020-07-20 16:27:25 +00:00
statusCode := resp . StatusCode
2020-07-28 17:52:00 +00:00
if statusCode / 100 == 2 {
_ = resp . Body . Close ( )
c . requestsOKCount . Inc ( )
2021-02-17 19:23:38 +00:00
return true
2020-07-28 17:52:00 +00:00
}
2020-11-01 22:43:51 +00:00
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_requests_total { url=%q, status_code="%d"} ` , c . sanitizedURL , statusCode ) ) . Inc ( )
2020-11-03 12:24:48 +00:00
if statusCode == 409 {
// Just drop block on 409 status code like Prometheus does.
2020-11-01 22:43:51 +00:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
body , _ := ioutil . ReadAll ( resp . Body )
_ = resp . Body . Close ( )
2020-11-03 12:24:48 +00:00
logger . Errorf ( "unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; " +
2020-11-01 22:43:51 +00:00
"response body=%q" , len ( block ) , c . sanitizedURL , statusCode , body )
c . packetsDropped . Inc ( )
2021-02-17 19:23:38 +00:00
return true
2020-11-01 22:43:51 +00:00
}
2020-07-28 17:52:00 +00:00
// Unexpected status code returned
2020-08-30 18:39:45 +00:00
retriesCount ++
2020-07-28 17:52:00 +00:00
retryDuration *= 2
if retryDuration > time . Minute {
retryDuration = time . Minute
}
body , err := ioutil . ReadAll ( resp . Body )
_ = resp . Body . Close ( )
if err != nil {
2020-09-16 19:34:01 +00:00
logger . Errorf ( "cannot read response body from %q during retry #%d: %s" , c . sanitizedURL , retriesCount , err )
2020-07-28 17:52:00 +00:00
} else {
2020-08-30 18:39:45 +00:00
logger . Errorf ( "unexpected status code received after sending a block with size %d bytes to %q during retry #%d: %d; response body=%q; " +
2020-09-16 19:34:01 +00:00
"re-sending the block in %.3f seconds" , len ( block ) , c . sanitizedURL , retriesCount , statusCode , body , retryDuration . Seconds ( ) )
2020-07-28 17:52:00 +00:00
}
2021-01-26 22:23:10 +00:00
t := timerpool . Get ( retryDuration )
2020-07-28 17:52:00 +00:00
select {
case <- c . stopCh :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2021-02-17 19:23:38 +00:00
return false
2020-07-28 17:52:00 +00:00
case <- t . C :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2020-02-23 11:35:47 +00:00
}
2020-07-28 17:52:00 +00:00
c . retriesCount . Inc ( )
goto again
2020-04-17 12:51:29 +00:00
}
2021-01-26 22:19:35 +00:00
type rateLimiter struct {
perSecondLimit int64
// The current budget. It is increased by perSecondLimit every second.
budget int64
// The next deadline for increasing the budget by perSecondLimit
deadline time . Time
limitReached * metrics . Counter
}
func ( rl * rateLimiter ) register ( dataLen int , stopCh <- chan struct { } ) {
limit := rl . perSecondLimit
if limit <= 0 {
return
}
for rl . budget <= 0 {
now := time . Now ( )
if d := rl . deadline . Sub ( now ) ; d > 0 {
rl . limitReached . Inc ( )
2021-01-26 22:23:10 +00:00
t := timerpool . Get ( d )
2021-01-26 22:19:35 +00:00
select {
case <- stopCh :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2021-01-26 22:19:35 +00:00
return
case <- t . C :
2021-01-26 22:23:10 +00:00
timerpool . Put ( t )
2021-01-26 22:19:35 +00:00
}
}
rl . budget += limit
rl . deadline = now . Add ( time . Second )
}
rl . budget -= int64 ( dataLen )
}