2020-02-23 11:35:47 +00:00
package remotewrite
import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"flag"
"fmt"
"io/ioutil"
"strings"
"sync"
"time"
2020-05-06 13:51:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-04-29 13:20:23 +00:00
"github.com/VictoriaMetrics/fasthttp"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/metrics"
)
var (
sendTimeout = flag . Duration ( "remoteWrite.sendTimeout" , time . Minute , "Timeout for sending a single block of data to -remoteWrite.url" )
tlsInsecureSkipVerify = flag . Bool ( "remoteWrite.tlsInsecureSkipVerify" , false , "Whether to skip tls verification when connecting to -remoteWrite.url" )
2020-05-06 13:51:32 +00:00
tlsCertFile = flagutil . NewArray ( "remoteWrite.tlsCertFile" , "Optional path to client-side TLS certificate file to use when connecting to -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
tlsKeyFile = flagutil . NewArray ( "remoteWrite.tlsKeyFile" , "Optional path to client-side TLS certificate key to use when connecting to -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
tlsCAFile = flagutil . NewArray ( "remoteWrite.tlsCAFile" , "Optional path to TLS CA file to use for verifying connections to -remoteWrite.url. " +
"By default system CA is used. If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
2020-02-23 11:35:47 +00:00
2020-05-06 13:51:32 +00:00
basicAuthUsername = flagutil . NewArray ( "remoteWrite.basicAuth.username" , "Optional basic auth username to use for -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
basicAuthPassword = flagutil . NewArray ( "remoteWrite.basicAuth.password" , "Optional basic auth password to use for -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
bearerToken = flagutil . NewArray ( "remoteWrite.bearerToken" , "Optional bearer auth token to use for -remoteWrite.url. " +
"If multiple args are set, then they are applied independently for the corresponding -remoteWrite.url" )
2020-02-23 11:35:47 +00:00
)
// client sends blocks read from fq to a single -remoteWrite.url.
type client struct {
	// urlLabelValue is the value put into the `url` label of the client's metrics.
	// remoteWriteURL is the url the client was created for; it is used in log messages.
	// host and requestURI are the address and the request path extracted from remoteWriteURL.
	// authHeader is the value for the `Authorization` request header; empty means no header is sent.
	urlLabelValue, remoteWriteURL, host, requestURI, authHeader string

	// fq is the queue the workers read blocks from.
	fq *persistentqueue.FastQueue

	// hc is the http client used for sending requests to host.
	hc *fasthttp.HostClient

	// Metrics updated by sendBlock.
	requestDuration                            *metrics.Histogram
	requestsOKCount, errorsCount, retriesCount *metrics.Counter

	// wg tracks the running workers, while stopCh is closed in order to stop them.
	wg     sync.WaitGroup
	stopCh chan struct{}
}
2020-05-06 13:51:32 +00:00
func newClient ( argIdx int , remoteWriteURL , urlLabelValue string , fq * persistentqueue . FastQueue , concurrency int ) * client {
2020-02-23 11:35:47 +00:00
authHeader := ""
2020-05-06 13:51:32 +00:00
username := basicAuthUsername . GetOptionalArg ( argIdx )
password := basicAuthPassword . GetOptionalArg ( argIdx )
if len ( username ) > 0 || len ( password ) > 0 {
2020-02-23 11:35:47 +00:00
// See https://en.wikipedia.org/wiki/Basic_access_authentication
2020-05-06 13:51:32 +00:00
token := username + ":" + password
2020-02-23 11:35:47 +00:00
token64 := base64 . StdEncoding . EncodeToString ( [ ] byte ( token ) )
authHeader = "Basic " + token64
}
2020-05-06 13:51:32 +00:00
token := bearerToken . GetOptionalArg ( argIdx )
if len ( token ) > 0 {
2020-02-23 11:35:47 +00:00
if authHeader != "" {
2020-05-06 13:51:32 +00:00
logger . Panicf ( "FATAL: `-remoteWrite.bearerToken`=%q cannot be set when `-remoteWrite.basicAuth.*` flags are set" , token )
2020-02-23 11:35:47 +00:00
}
2020-05-06 13:51:32 +00:00
authHeader = "Bearer " + token
2020-02-23 11:35:47 +00:00
}
readTimeout := * sendTimeout
if readTimeout <= 0 {
readTimeout = time . Minute
}
2020-03-09 11:30:53 +00:00
writeTimeout := readTimeout
2020-02-23 11:35:47 +00:00
var u fasthttp . URI
u . Update ( remoteWriteURL )
scheme := string ( u . Scheme ( ) )
switch scheme {
case "http" , "https" :
default :
logger . Panicf ( "FATAL: unsupported scheme in -remoteWrite.url=%q: %q. It must be http or https" , remoteWriteURL , scheme )
}
host := string ( u . Host ( ) )
if len ( host ) == 0 {
logger . Panicf ( "FATAL: invalid -remoteWrite.url=%q: host cannot be empty. Make sure the url looks like `http://host:port/path`" , remoteWriteURL )
}
requestURI := string ( u . RequestURI ( ) )
isTLS := scheme == "https"
var tlsCfg * tls . Config
if isTLS {
var err error
2020-05-06 13:51:32 +00:00
tlsCfg , err = getTLSConfig ( argIdx )
2020-02-23 11:35:47 +00:00
if err != nil {
logger . Panicf ( "FATAL: cannot initialize TLS config: %s" , err )
}
}
if ! strings . Contains ( host , ":" ) {
if isTLS {
host += ":443"
} else {
host += ":80"
}
}
2020-03-03 11:08:17 +00:00
maxConns := 2 * concurrency
2020-02-23 11:35:47 +00:00
hc := & fasthttp . HostClient {
Addr : host ,
Name : "vmagent" ,
Dial : statDial ,
IsTLS : isTLS ,
TLSConfig : tlsCfg ,
MaxConns : maxConns ,
MaxIdleConnDuration : 10 * readTimeout ,
ReadTimeout : readTimeout ,
2020-03-09 11:30:53 +00:00
WriteTimeout : writeTimeout ,
2020-02-23 11:35:47 +00:00
MaxResponseBodySize : 1024 * 1024 ,
}
c := & client {
urlLabelValue : urlLabelValue ,
remoteWriteURL : remoteWriteURL ,
host : host ,
requestURI : requestURI ,
authHeader : authHeader ,
fq : fq ,
hc : hc ,
stopCh : make ( chan struct { } ) ,
}
2020-03-03 17:48:46 +00:00
c . requestDuration = metrics . GetOrCreateHistogram ( fmt . Sprintf ( ` vmagent_remotewrite_duration_seconds { url=%q} ` , c . urlLabelValue ) )
c . requestsOKCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_requests_total { url=%q, status_code="2XX"} ` , c . urlLabelValue ) )
c . errorsCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_errors_total { url=%q} ` , c . urlLabelValue ) )
c . retriesCount = metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_retries_count_total { url=%q} ` , c . urlLabelValue ) )
2020-03-03 11:08:17 +00:00
for i := 0 ; i < concurrency ; i ++ {
2020-02-23 11:35:47 +00:00
c . wg . Add ( 1 )
go func ( ) {
defer c . wg . Done ( )
c . runWorker ( )
} ( )
}
logger . Infof ( "initialized client for -remoteWrite.url=%q" , c . remoteWriteURL )
return c
}
// MustStop signals all the workers of c to stop and waits until they are finished.
func (c *client) MustStop() {
	// Closing stopCh is observed by every worker selecting on it.
	close(c.stopCh)
	c.wg.Wait()

	url := c.remoteWriteURL
	logger.Infof("stopped client for -remoteWrite.url=%q", url)
}
2020-05-06 13:51:32 +00:00
func getTLSConfig ( argIdx int ) ( * tls . Config , error ) {
2020-02-23 11:35:47 +00:00
var tlsRootCA * x509 . CertPool
var tlsCertificate * tls . Certificate
2020-05-06 13:51:32 +00:00
certFile := tlsCertFile . GetOptionalArg ( argIdx )
keyFile := tlsKeyFile . GetOptionalArg ( argIdx )
if certFile != "" || keyFile != "" {
cert , err := tls . LoadX509KeyPair ( certFile , keyFile )
2020-02-23 11:35:47 +00:00
if err != nil {
2020-05-06 13:51:32 +00:00
return nil , fmt . Errorf ( "cannot load TLS certificate for -remoteWrite.tlsCertFile=%q and -remoteWrite.tlsKeyFile=%q: %s" , certFile , keyFile , err )
2020-02-23 11:35:47 +00:00
}
tlsCertificate = & cert
}
2020-05-06 13:51:32 +00:00
if caFile := tlsCAFile . GetOptionalArg ( argIdx ) ; caFile != "" {
data , err := ioutil . ReadFile ( caFile )
2020-02-23 11:35:47 +00:00
if err != nil {
2020-05-06 13:51:32 +00:00
return nil , fmt . Errorf ( "cannot read -remoteWrite.tlsCAFile=%q: %s" , caFile , err )
2020-02-23 11:35:47 +00:00
}
tlsRootCA = x509 . NewCertPool ( )
if ! tlsRootCA . AppendCertsFromPEM ( data ) {
2020-05-06 13:51:32 +00:00
return nil , fmt . Errorf ( "cannot parse data -remoteWrite.tlsCAFile=%q" , caFile )
2020-02-23 11:35:47 +00:00
}
}
tlsCfg := & tls . Config {
RootCAs : tlsRootCA ,
ClientSessionCache : tls . NewLRUClientSessionCache ( 0 ) ,
}
if tlsCertificate != nil {
2020-05-06 13:50:22 +00:00
tlsCfg . GetClientCertificate = func ( * tls . CertificateRequestInfo ) ( * tls . Certificate , error ) {
return tlsCertificate , nil
}
2020-02-23 11:35:47 +00:00
}
tlsCfg . InsecureSkipVerify = * tlsInsecureSkipVerify
return tlsCfg , nil
}
// runWorker reads blocks from c.fq and passes them to c.sendBlock one at a time.
//
// It returns when c.fq.MustReadBlock reports that there are no more blocks to read
// or when c.stopCh is closed. In the latter case the in-flight sendBlock call is given
// a grace period of 5 seconds to finish; after that the block is dropped.
func (c *client) runWorker() {
	var ok bool
	var block []byte

	// ch must be buffered. Otherwise the sender goroutine below would block forever on `ch <- ...`
	// (i.e. would leak) when runWorker returns after the grace period without reading from ch.
	ch := make(chan struct{}, 1)
	for {
		block, ok = c.fq.MustReadBlock(block[:0])
		if !ok {
			return
		}
		// sendBlock runs in a separate goroutine, so the worker can react on stopCh while the block is being sent.
		// The block buffer isn't reused until the goroutine reports completion via ch.
		go func() {
			c.sendBlock(block)
			ch <- struct{}{}
		}()
		select {
		case <-ch:
			// sendBlock has returned. Proceed to the next block.
			continue
		case <-c.stopCh:
			// c must be stopped. Wait for a while in the hope the block will be sent.
			graceDuration := 5 * time.Second
			select {
			case <-ch:
				// sendBlock has returned during the grace period.
			case <-time.After(graceDuration):
				logger.Errorf("couldn't sent block with size %d bytes to %q in %.3f seconds during shutdown; dropping it",
					len(block), c.remoteWriteURL, graceDuration.Seconds())
			}
			return
		}
	}
}
func ( c * client ) sendBlock ( block [ ] byte ) {
req := fasthttp . AcquireRequest ( )
req . SetRequestURI ( c . requestURI )
req . SetHost ( c . host )
req . Header . SetMethod ( "POST" )
req . Header . Add ( "Content-Type" , "application/x-protobuf" )
req . Header . Add ( "Content-Encoding" , "snappy" )
2020-04-04 13:22:58 +00:00
req . Header . Add ( "X-Prometheus-Remote-Write-Version" , "0.1.0" )
2020-02-23 11:35:47 +00:00
if c . authHeader != "" {
req . Header . Set ( "Authorization" , c . authHeader )
}
req . SetBody ( block )
retryDuration := time . Second
resp := fasthttp . AcquireResponse ( )
again :
select {
case <- c . stopCh :
fasthttp . ReleaseRequest ( req )
fasthttp . ReleaseResponse ( resp )
return
default :
}
startTime := time . Now ( )
2020-04-17 12:51:29 +00:00
err := doRequestWithPossibleRetry ( c . hc , req , resp )
2020-02-23 11:35:47 +00:00
c . requestDuration . UpdateDuration ( startTime )
if err != nil {
c . errorsCount . Inc ( )
retryDuration *= 2
if retryDuration > time . Minute {
retryDuration = time . Minute
}
logger . Errorf ( "couldn't send a block with size %d bytes to %q: %s; re-sending the block in %.3f seconds" ,
len ( block ) , c . remoteWriteURL , err , retryDuration . Seconds ( ) )
time . Sleep ( retryDuration )
c . retriesCount . Inc ( )
goto again
}
statusCode := resp . StatusCode ( )
if statusCode / 100 != 2 {
metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_requests_total { url=%q, status_code="%d"} ` , c . urlLabelValue , statusCode ) ) . Inc ( )
retryDuration *= 2
if retryDuration > time . Minute {
retryDuration = time . Minute
}
logger . Errorf ( "unexpected status code received after sending a block with size %d bytes to %q: %d; response body=%q; re-sending the block in %.3f seconds" ,
len ( block ) , c . remoteWriteURL , statusCode , resp . Body ( ) , retryDuration . Seconds ( ) )
time . Sleep ( retryDuration )
c . retriesCount . Inc ( )
goto again
}
c . requestsOKCount . Inc ( )
// The block has been successfully sent to the remote storage.
fasthttp . ReleaseResponse ( resp )
fasthttp . ReleaseRequest ( req )
}
2020-04-17 12:51:29 +00:00
// doRequestWithPossibleRetry performs req via hc and stores the response in resp.
//
// The request is repeated exactly once if the first attempt fails with fasthttp.ErrConnectionClosed,
// i.e. when the server closed the keep-alive connection. Any other outcome of the first attempt
// (success or a different error) is returned as is.
func doRequestWithPossibleRetry(hc *fasthttp.HostClient, req *fasthttp.Request, resp *fasthttp.Response) error {
	// hc.Do is used instead of hc.DoTimeout, since the timeout must be already set in hc.ReadTimeout.
	firstErr := hc.Do(req, resp)
	if firstErr != fasthttp.ErrConnectionClosed {
		// Either the request succeeded (nil error) or it failed with non-retriable error.
		return firstErr
	}
	// The server closed the keep-alive connection during the first attempt, so try again.
	return hc.Do(req, resp)
}