2020-05-05 07:53:42 +00:00
package main
import (
"flag"
2021-09-14 09:17:49 +00:00
"fmt"
2023-01-27 21:38:13 +00:00
"io"
"net"
2020-05-05 07:53:42 +00:00
"net/http"
2023-01-27 21:38:13 +00:00
"net/textproto"
2023-01-27 22:06:42 +00:00
"net/url"
2020-05-16 08:59:30 +00:00
"os"
2022-03-18 16:31:58 +00:00
"strings"
2021-11-09 17:18:27 +00:00
"sync"
2020-05-05 07:53:42 +00:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
2023-01-27 21:38:13 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-05-05 07:53:42 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
2020-12-03 19:40:30 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-05-05 07:53:42 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2023-01-27 21:38:13 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
2020-05-05 07:53:42 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2022-07-21 16:58:22 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
2021-05-17 23:23:53 +00:00
"github.com/VictoriaMetrics/metrics"
2020-05-05 07:53:42 +00:00
)
// Command-line flags of vmauth.
var (
	// httpListenAddr is the address passed to httpserver.Serve and httpserver.Stop.
	httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")

	// useProxyProtocol is passed to httpserver.Serve together with httpListenAddr.
	useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
		"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt . "+
		"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")

	// maxIdleConnsPerBackend becomes MaxIdleConnsPerHost of the shared backend transport.
	maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host. "+
		"See also -maxConcurrentRequests")

	// responseTimeout becomes ResponseHeaderTimeout of the shared backend transport.
	responseTimeout = flag.Duration("responseTimeout", 5*time.Minute, "The timeout for receiving a response from backend")

	// maxConcurrentRequests is the capacity of the global concurrency limiter channel.
	maxConcurrentRequests = flag.Int("maxConcurrentRequests", 1000, "The maximum number of concurrent requests vmauth can process. Other requests are rejected with "+
		"'429 Too Many Requests' http status code. See also -maxConcurrentPerUserRequests and -maxIdleConnsPerBackend command-line options")

	// maxConcurrentPerUserRequests isn't read in this file; presumably it backs the per-user limiter.
	maxConcurrentPerUserRequests = flag.Int("maxConcurrentPerUserRequests", 300, "The maximum number of concurrent requests vmauth can process per each configured user. "+
		"Other requests are rejected with '429 Too Many Requests' http status code. See also -maxConcurrentRequests command-line option and max_concurrent_requests option "+
		"in per-user config")

	// reloadAuthKey protects the /-/reload endpoint.
	reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")

	// logInvalidAuthTokens switches between a detailed (logged) and a terse 401 response
	// for unknown auth tokens.
	logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+
		` Such requests are always counted at vmauth_http_request_errors_total { reason="invalid_auth_token"} metric, which is exposed at /metrics page `)

	// failTimeout isn't read in this file; presumably it is consumed by the backend load balancing code.
	failTimeout = flag.Duration("failTimeout", 3*time.Second, "Sets a delay period for load balancing to skip a malfunctioning backend.")
)
func main ( ) {
2020-05-16 08:59:30 +00:00
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag . CommandLine . SetOutput ( os . Stdout )
2020-06-05 07:39:46 +00:00
flag . Usage = usage
2020-05-05 07:53:42 +00:00
envflag . Parse ( )
buildinfo . Init ( )
logger . Init ( )
2022-07-22 10:35:58 +00:00
pushmetrics . Init ( )
2020-05-05 07:53:42 +00:00
logger . Infof ( "starting vmauth at %q..." , * httpListenAddr )
startTime := time . Now ( )
initAuthConfig ( )
2023-01-27 07:08:35 +00:00
go httpserver . Serve ( * httpListenAddr , * useProxyProtocol , requestHandler )
2020-05-05 07:53:42 +00:00
logger . Infof ( "started vmauth in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
sig := procutil . WaitForSigterm ( )
logger . Infof ( "received signal %s" , sig )
startTime = time . Now ( )
logger . Infof ( "gracefully shutting down webservice at %q" , * httpListenAddr )
if err := httpserver . Stop ( * httpListenAddr ) ; err != nil {
logger . Fatalf ( "cannot stop the webservice: %s" , err )
}
logger . Infof ( "successfully shut down the webservice in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
stopAuthConfig ( )
logger . Infof ( "successfully stopped vmauth in %.3f seconds" , time . Since ( startTime ) . Seconds ( ) )
}
func requestHandler ( w http . ResponseWriter , r * http . Request ) bool {
2021-05-17 23:23:53 +00:00
switch r . URL . Path {
case "/-/reload" :
2023-01-10 23:51:55 +00:00
if ! httpserver . CheckAuthFlag ( w , r , * reloadAuthKey , "reloadAuthKey" ) {
2021-05-20 15:46:12 +00:00
return true
}
2021-05-17 23:23:53 +00:00
configReloadRequests . Inc ( )
procutil . SelfSIGHUP ( )
w . WriteHeader ( http . StatusOK )
return true
}
2021-04-02 19:14:53 +00:00
authToken := r . Header . Get ( "Authorization" )
if authToken == "" {
2023-04-24 12:57:13 +00:00
// Process requests for unauthorized users
ui := authConfig . Load ( ) . UnauthorizedUser
if ui != nil {
processUserRequest ( w , r , ui )
return true
}
2020-08-09 06:38:41 +00:00
w . Header ( ) . Set ( "WWW-Authenticate" , ` Basic realm="Restricted" ` )
2021-04-02 19:14:53 +00:00
http . Error ( w , "missing `Authorization` request header" , http . StatusUnauthorized )
2020-05-05 07:53:42 +00:00
return true
}
2022-03-18 16:31:58 +00:00
if strings . HasPrefix ( authToken , "Token " ) {
// Handle InfluxDB's proprietary token authentication scheme as a bearer token authentication
// See https://docs.influxdata.com/influxdb/v2.0/api/
authToken = strings . Replace ( authToken , "Token" , "Bearer" , 1 )
}
2023-01-27 22:06:42 +00:00
2023-04-20 17:08:27 +00:00
ac := * authUsers . Load ( )
2021-04-02 19:14:53 +00:00
ui := ac [ authToken ]
if ui == nil {
2021-09-14 09:17:49 +00:00
invalidAuthTokenRequests . Inc ( )
if * logInvalidAuthTokens {
2023-05-17 07:09:47 +00:00
err := fmt . Errorf ( "cannot find the provided auth token %q in config" , authToken )
2023-01-27 21:38:13 +00:00
err = & httpserver . ErrorWithStatusCode {
Err : err ,
StatusCode : http . StatusUnauthorized ,
}
httpserver . Errorf ( w , r , "%s" , err )
2021-09-14 09:17:49 +00:00
} else {
2023-05-17 07:09:47 +00:00
http . Error ( w , "Unauthorized" , http . StatusUnauthorized )
2021-09-14 09:17:49 +00:00
}
2020-05-05 07:53:42 +00:00
return true
}
2023-04-24 12:57:13 +00:00
processUserRequest ( w , r , ui )
return true
}
func processUserRequest ( w http . ResponseWriter , r * http . Request , ui * UserInfo ) {
2023-06-27 18:15:17 +00:00
startTime := time . Now ( )
defer ui . requestsDuration . UpdateDuration ( startTime )
2021-02-11 10:40:59 +00:00
ui . requests . Inc ( )
2023-01-27 21:38:13 +00:00
2023-01-27 22:06:42 +00:00
// Limit the concurrency of requests to backends
concurrencyLimitOnce . Do ( concurrencyLimitInit )
select {
case concurrencyLimitCh <- struct { } { } :
2023-02-10 04:03:01 +00:00
if err := ui . beginConcurrencyLimit ( ) ; err != nil {
handleConcurrencyLimitError ( w , r , err )
<- concurrencyLimitCh
2023-04-24 12:57:13 +00:00
return
2023-01-27 22:06:42 +00:00
}
2023-02-10 04:03:01 +00:00
default :
concurrentRequestsLimitReached . Inc ( )
err := fmt . Errorf ( "cannot serve more than -maxConcurrentRequests=%d concurrent requests" , cap ( concurrencyLimitCh ) )
handleConcurrencyLimitError ( w , r , err )
2023-04-24 12:57:13 +00:00
return
2023-01-27 22:06:42 +00:00
}
2023-02-10 05:05:13 +00:00
processRequest ( w , r , ui )
2023-02-10 04:03:01 +00:00
ui . endConcurrencyLimit ( )
2023-01-27 22:06:42 +00:00
<- concurrencyLimitCh
}
2023-02-10 05:05:13 +00:00
func processRequest ( w http . ResponseWriter , r * http . Request , ui * UserInfo ) {
u := normalizeURL ( r . URL )
2023-04-26 09:04:35 +00:00
up , headers := ui . getURLPrefixAndHeaders ( u )
isDefault := false
if up == nil {
if ui . DefaultURL == nil {
2023-11-01 19:59:46 +00:00
// Authorization should be requested for http requests without credentials
// to a route that is not in the configuration for unauthorized user.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5236
if ui . BearerToken == "" && ui . Username == "" && len ( * authUsers . Load ( ) ) > 0 {
w . Header ( ) . Set ( "WWW-Authenticate" , ` Basic realm="Restricted" ` )
http . Error ( w , "missing `Authorization` request header" , http . StatusUnauthorized )
return
}
missingRouteRequests . Inc ( )
2023-04-26 09:04:35 +00:00
httpserver . Errorf ( w , r , "missing route for %q" , u . String ( ) )
return
}
up , headers = ui . DefaultURL , ui . Headers
isDefault = true
2023-02-10 05:05:13 +00:00
}
2023-05-17 07:37:03 +00:00
r . Body = & readTrackingBody {
r : r . Body ,
}
2023-04-26 09:04:35 +00:00
2023-02-10 05:05:13 +00:00
maxAttempts := up . getBackendsCount ( )
for i := 0 ; i < maxAttempts ; i ++ {
2023-02-11 08:27:40 +00:00
bu := up . getLeastLoadedBackendURL ( )
2023-04-26 09:04:35 +00:00
targetURL := bu . url
// Don't change path and add request_path query param for default route.
if isDefault {
query := targetURL . Query ( )
query . Set ( "request_path" , u . Path )
targetURL . RawQuery = query . Encode ( )
} else { // Update path for regular routes.
targetURL = mergeURLs ( targetURL , u )
}
2023-02-11 08:27:40 +00:00
ok := tryProcessingRequest ( w , r , targetURL , headers )
bu . put ( )
if ok {
2023-02-10 05:05:13 +00:00
return
}
2023-02-11 08:27:40 +00:00
bu . setBroken ( )
2023-02-10 05:05:13 +00:00
}
2023-04-26 09:04:35 +00:00
err := & httpserver . ErrorWithStatusCode {
2023-02-10 05:05:13 +00:00
Err : fmt . Errorf ( "all the backends for the user %q are unavailable" , ui . name ( ) ) ,
StatusCode : http . StatusServiceUnavailable ,
}
httpserver . Errorf ( w , r , "%s" , err )
}
func tryProcessingRequest ( w http . ResponseWriter , r * http . Request , targetURL * url . URL , headers [ ] Header ) bool {
2023-01-27 21:38:13 +00:00
// This code has been copied from net/http/httputil/reverseproxy.go
req := sanitizeRequestHeaders ( r )
req . URL = targetURL
2021-10-22 16:08:06 +00:00
for _ , h := range headers {
2023-01-27 21:38:13 +00:00
req . Header . Set ( h . Name , h . Value )
}
transportOnce . Do ( transportInit )
res , err := transport . RoundTrip ( req )
if err != nil {
2023-05-17 07:37:03 +00:00
rtb := req . Body . ( * readTrackingBody )
if rtb . readStarted {
// Request body has been already read, so it is impossible to retry the request.
// Return the error to the client then.
2023-02-10 05:05:13 +00:00
err = & httpserver . ErrorWithStatusCode {
Err : fmt . Errorf ( "cannot proxy the request to %q: %w" , targetURL , err ) ,
StatusCode : http . StatusServiceUnavailable ,
}
httpserver . Errorf ( w , r , "%s" , err )
return true
2023-01-27 21:38:13 +00:00
}
2023-05-17 07:37:03 +00:00
// Retry the request if its body wasn't read yet. This usually means that the backend isn't reachable.
remoteAddr := httpserver . GetQuotedRemoteAddr ( r )
// NOTE: do not use httpserver.GetRequestURI
// it explicitly reads request body and fails retries.
2023-05-17 07:19:33 +00:00
logger . Warnf ( "remoteAddr: %s; requestURI: %s; error when proxying the request to %q: %s" , remoteAddr , req . URL , targetURL , err )
2023-02-10 05:05:13 +00:00
return false
2023-01-27 21:38:13 +00:00
}
removeHopHeaders ( res . Header )
copyHeader ( w . Header ( ) , res . Header )
w . WriteHeader ( res . StatusCode )
copyBuf := copyBufPool . Get ( )
copyBuf . B = bytesutil . ResizeNoCopyNoOverallocate ( copyBuf . B , 16 * 1024 )
_ , err = io . CopyBuffer ( w , res . Body , copyBuf . B )
copyBufPool . Put ( copyBuf )
if err != nil && ! netutil . IsTrivialNetworkError ( err ) {
remoteAddr := httpserver . GetQuotedRemoteAddr ( r )
requestURI := httpserver . GetRequestURI ( r )
logger . Warnf ( "remoteAddr: %s; requestURI: %s; error when proxying response body from %s: %s" , remoteAddr , requestURI , targetURL , err )
2023-02-10 05:05:13 +00:00
return true
2021-10-22 16:08:06 +00:00
}
2023-02-10 05:05:13 +00:00
return true
2020-05-05 07:53:42 +00:00
}
2023-01-27 21:38:13 +00:00
// copyBufPool provides reusable buffers for copying backend response bodies
// to clients in tryProcessingRequest.
var copyBufPool bytesutil . ByteBufferPool
// copyHeader appends every value of every header from src to dst.
//
// Values already present in dst are kept.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		for i := range values {
			dst.Add(name, values[i])
		}
	}
}
// sanitizeRequestHeaders returns a clone of r with hop-by-hop headers removed
// and with the client IP from r.RemoteAddr appended to the `X-Forwarded-For` header.
//
// The `X-Forwarded-For` header is left as is when r.RemoteAddr cannot be split into host and port.
func sanitizeRequestHeaders(r *http.Request) *http.Request {
	// This code has been copied from net/http/httputil/reverseproxy.go
	req := r.Clone(r.Context())
	removeHopHeaders(req.Header)

	clientIP, _, err := net.SplitHostPort(req.RemoteAddr)
	if err != nil {
		return req
	}
	// If we aren't the first proxy retain prior
	// X-Forwarded-For information as a comma+space
	// separated list and fold multiple headers into one.
	if prior := req.Header["X-Forwarded-For"]; len(prior) > 0 {
		clientIP = strings.Join(prior, ", ") + ", " + clientIP
	}
	req.Header.Set("X-Forwarded-For", clientIP)
	return req
}
// removeHopHeaders deletes hop-by-hop headers from h in place.
//
// Headers named in the `Connection` header values are deleted first (RFC 7230, section 6.1),
// then every header listed in hopHeaders is deleted.
func removeHopHeaders(h http.Header) {
	for _, connValue := range h["Connection"] {
		for _, name := range strings.Split(connValue, ",") {
			name = textproto.TrimString(name)
			if name == "" {
				continue
			}
			h.Del(name)
		}
	}

	// "Connection" itself is especially important to drop, because a persistent
	// connection to the backend is wanted regardless of what the client sent.
	for i := range hopHeaders {
		h.Del(hopHeaders[i])
	}
}

// hopHeaders lists hop-by-hop headers, which are always removed by removeHopHeaders.
//
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. The headers below are defined by the
// obsoleted RFC 2616 (section 13.5.1) and are kept for backward
// compatibility.
var hopHeaders = []string{
	"Connection",
	"Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
	"Keep-Alive",
	"Proxy-Authenticate",
	"Proxy-Authorization",
	"Te",      // canonicalized version of "TE"
	"Trailer", // not Trailers; see https://www.rfc-editor.org/errata_search.php?eid=4522
	"Transfer-Encoding",
	"Upgrade",
}
2021-09-14 09:17:49 +00:00
var (
2021-10-19 12:29:07 +00:00
configReloadRequests = metrics . NewCounter ( ` vmauth_http_requests_total { path="/-/reload"} ` )
invalidAuthTokenRequests = metrics . NewCounter ( ` vmauth_http_request_errors_total { reason="invalid_auth_token"} ` )
missingRouteRequests = metrics . NewCounter ( ` vmauth_http_request_errors_total { reason="missing_route"} ` )
2021-09-14 09:17:49 +00:00
)
2021-05-17 23:23:53 +00:00
2021-11-09 17:18:27 +00:00
// Shared http transport for proxying requests to backends.
var (
	// transport is assigned by transportInit.
	transport *http.Transport

	// transportOnce guards the one-time call to transportInit.
	transportOnce sync.Once
)
2023-01-27 21:38:13 +00:00
func transportInit ( ) {
tr := http . DefaultTransport . ( * http . Transport ) . Clone ( )
tr . ResponseHeaderTimeout = * responseTimeout
// Automatic compression must be disabled in order to fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/535
tr . DisableCompression = true
// Disable HTTP/2.0, since VictoriaMetrics components don't support HTTP/2.0 (because there is no sense in this).
tr . ForceAttemptHTTP2 = false
tr . MaxIdleConnsPerHost = * maxIdleConnsPerBackend
if tr . MaxIdleConns != 0 && tr . MaxIdleConns < tr . MaxIdleConnsPerHost {
tr . MaxIdleConns = tr . MaxIdleConnsPerHost
2021-11-09 17:18:27 +00:00
}
2023-01-27 21:38:13 +00:00
transport = tr
2020-05-05 07:53:42 +00:00
}
2020-06-05 07:39:46 +00:00
2023-01-27 22:06:42 +00:00
// Global limiter for concurrently processed requests.
var (
	// concurrencyLimitCh holds one item per in-flight request; it is created by concurrencyLimitInit.
	concurrencyLimitCh chan struct{}

	// concurrencyLimitOnce guards the one-time call to concurrencyLimitInit.
	concurrencyLimitOnce sync.Once
)
// concurrencyLimitInit creates concurrencyLimitCh with the capacity taken from -maxConcurrentRequests
// and registers gauges reporting the configured capacity and the number of currently occupied slots.
func concurrencyLimitInit() {
	concurrencyLimitCh = make(chan struct{}, *maxConcurrentRequests)

	capacityGauge := func() float64 {
		return float64(*maxConcurrentRequests)
	}
	currentGauge := func() float64 {
		return float64(len(concurrencyLimitCh))
	}
	_ = metrics.NewGauge("vmauth_concurrent_requests_capacity", capacityGauge)
	_ = metrics.NewGauge("vmauth_concurrent_requests_current", currentGauge)
}
2023-02-10 04:03:01 +00:00
// concurrentRequestsLimitReached counts requests rejected by processUserRequest
// because concurrencyLimitCh had no free slots.
var concurrentRequestsLimitReached = metrics . NewCounter ( "vmauth_concurrent_requests_limit_reached_total" )
2023-01-27 22:06:42 +00:00
2020-06-05 07:39:46 +00:00
func usage ( ) {
const s = `
vmauth authenticates and authorizes incoming requests and proxies them to VictoriaMetrics .
2021-04-20 17:16:17 +00:00
See the docs at https : //docs.victoriametrics.com/vmauth.html .
2020-06-05 07:39:46 +00:00
`
2020-12-03 19:40:30 +00:00
flagutil . Usage ( s )
2020-06-05 07:39:46 +00:00
}
2023-02-10 04:03:01 +00:00
// handleConcurrencyLimitError responds to the client with `429 Too Many Requests`,
// the err text and a `Retry-After: 10` header.
func handleConcurrencyLimitError(w http.ResponseWriter, r *http.Request, err error) {
	w.Header().Add("Retry-After", "10")
	limitErr := &httpserver.ErrorWithStatusCode{
		Err:        err,
		StatusCode: http.StatusTooManyRequests,
	}
	httpserver.Errorf(w, r, "%s", limitErr)
}
2023-05-17 07:19:33 +00:00
// readTrackingBody wraps a request body and remembers whether reading from it has started.
//
// tryProcessingRequest relies on this in order to decide whether the request
// can be retried at another backend.
type readTrackingBody struct {
	// r is the wrapped body.
	r io.ReadCloser

	// readStarted is set on the first Read call with a non-empty buffer.
	readStarted bool
}

// Read implements io.Reader interface.
//
// It marks the body as started to be read on calls with a non-empty p
// and then delegates the call to the wrapped body.
func (rtb *readTrackingBody) Read(p []byte) (int, error) {
	rtb.readStarted = rtb.readStarted || len(p) > 0
	return rtb.r.Read(p)
}

// Close implements io.Closer interface.
//
// The wrapped body is closed only if at least a single non-empty Read call was performed.
// http.Roundtrip performs body.Close call even without any Read calls,
// so this hack allows reusing the request body for retries.
func (rtb *readTrackingBody) Close() error {
	if !rtb.readStarted {
		return nil
	}
	return rtb.r.Close()
}