2019-05-22 21:16:55 +00:00
package httpserver
import (
"bufio"
"context"
"crypto/tls"
2020-06-30 21:02:02 +00:00
"errors"
2019-05-24 09:18:40 +00:00
"flag"
2019-05-22 21:16:55 +00:00
"fmt"
"io"
"net"
"net/http"
"net/http/pprof"
"runtime"
"strconv"
"strings"
"sync"
2020-05-07 12:20:06 +00:00
"sync/atomic"
2019-05-22 21:16:55 +00:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
2020-01-17 21:57:18 +00:00
"github.com/klauspost/compress/gzip"
2019-05-22 21:16:55 +00:00
)
var (
2020-05-02 10:07:30 +00:00
pathPrefix = flag . String ( "http.pathPrefix" , "" , "An optional prefix to add to all the paths handled by http server. For example, if '-http.pathPrefix=/foo/bar' is set, " +
"then all the http requests will be handled on '/foo/bar/*' paths. This may be useful for proxied requests. " +
"See https://www.robustperception.io/using-external-urls-and-proxies-with-prometheus" )
2020-03-27 18:08:42 +00:00
disableResponseCompression = flag . Bool ( "http.disableResponseCompression" , false , "Disable compression of HTTP responses for saving CPU resources. By default compression is enabled to save network bandwidth" )
maxGracefulShutdownDuration = flag . Duration ( "http.maxGracefulShutdownDuration" , 7 * time . Second , "The maximum duration for graceful shutdown of HTTP server. " +
"Highly loaded server may require increased value for graceful shutdown" )
2020-05-07 12:20:06 +00:00
shutdownDelay = flag . Duration ( "http.shutdownDelay" , 0 , "Optional delay before http server shutdown. During this dealy the servier returns non-OK responses " +
"from /health page, so load balancers can route new requests to other servers" )
)
2019-05-24 09:18:40 +00:00
2020-05-07 12:20:06 +00:00
var (
servers = make ( map [ string ] * server )
2019-05-22 21:16:55 +00:00
serversLock sync . Mutex
)
2020-05-07 12:20:06 +00:00
type server struct {
shutdownDelayDeadline int64
s * http . Server
}
2019-05-22 21:16:55 +00:00
// RequestHandler must serve the given request r and write response to w.
//
// RequestHandler must return true if the request has been served (successfully or not).
//
// RequestHandler must return false if it cannot serve the given request.
// In such cases the caller must serve the request.
type RequestHandler func ( w http . ResponseWriter , r * http . Request ) bool
// Serve starts an http server on the given addr with the given rh.
//
// By default all the responses are transparently compressed, since Google
// charges a lot for the egress traffic. The compression may be disabled
// by calling DisableResponseCompression before writing the first byte to w.
2019-05-24 09:18:40 +00:00
//
// The compression is also disabled if -http.disableResponseCompression flag is set.
2019-05-22 21:16:55 +00:00
func Serve ( addr string , rh RequestHandler ) {
2019-05-22 21:23:23 +00:00
logger . Infof ( "starting http server at http://%s/" , addr )
logger . Infof ( "pprof handlers are exposed at http://%s/debug/pprof/" , addr )
ln , err := netutil . NewTCPListener ( "http" , addr )
2019-05-22 21:16:55 +00:00
if err != nil {
2019-05-22 21:23:23 +00:00
logger . Panicf ( "FATAL: cannot start http server at %s: %s" , addr , err )
2019-05-22 21:16:55 +00:00
}
serveWithListener ( addr , ln , rh )
}
func serveWithListener ( addr string , ln net . Listener , rh RequestHandler ) {
2020-05-07 12:20:06 +00:00
var s server
s . s = & http . Server {
Handler : gzipHandler ( & s , rh ) ,
2019-05-22 21:16:55 +00:00
2020-05-07 11:10:40 +00:00
// Disable http/2, since it doesn't give any advantages for VictoriaMetrics services.
2019-05-22 21:16:55 +00:00
TLSNextProto : make ( map [ string ] func ( * http . Server , * tls . Conn , http . Handler ) ) ,
2020-05-07 11:10:40 +00:00
ReadHeaderTimeout : 5 * time . Second ,
IdleTimeout : time . Minute ,
2019-05-22 21:16:55 +00:00
2020-05-07 11:10:40 +00:00
// Do not set ReadTimeout and WriteTimeout here,
// since these timeouts must be controlled by request handlers.
2019-05-22 21:16:55 +00:00
ErrorLog : logger . StdErrorLogger ( ) ,
}
serversLock . Lock ( )
2020-05-07 12:20:06 +00:00
servers [ addr ] = & s
2019-05-22 21:16:55 +00:00
serversLock . Unlock ( )
2020-05-07 12:20:06 +00:00
if err := s . s . Serve ( ln ) ; err != nil {
2019-05-22 21:16:55 +00:00
if err == http . ErrServerClosed {
// The server gracefully closed.
return
}
logger . Panicf ( "FATAL: cannot serve http at %s: %s" , addr , err )
}
}
// Stop stops the http server on the given addr, which has been started
// via Serve func.
func Stop ( addr string ) error {
serversLock . Lock ( )
s := servers [ addr ]
delete ( servers , addr )
serversLock . Unlock ( )
if s == nil {
logger . Panicf ( "BUG: there is no http server at %q" , addr )
}
2020-05-07 12:20:06 +00:00
deadline := time . Now ( ) . Add ( * shutdownDelay ) . UnixNano ( )
atomic . StoreInt64 ( & s . shutdownDelayDeadline , deadline )
if * shutdownDelay > 0 {
// Sleep for a while until load balancer in front of the server
// notifies that "/health" endpoint returns non-OK responses.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/463 .
logger . Infof ( "Waiting for %.3fs before shutdown of http server %q, so load balancers could re-route requests to other servers" , shutdownDelay . Seconds ( ) , addr )
time . Sleep ( * shutdownDelay )
logger . Infof ( "Starting shutdown for http server %q" , addr )
}
ctx , cancel := context . WithTimeout ( context . Background ( ) , * maxGracefulShutdownDuration )
defer cancel ( )
if err := s . s . Shutdown ( ctx ) ; err != nil {
2020-04-29 22:36:56 +00:00
return fmt . Errorf ( "cannot gracefully shutdown http server at %q in %.3fs; " +
"probably, `-http.maxGracefulShutdownDuration` command-line flag value must be increased; error: %s" , addr , maxGracefulShutdownDuration . Seconds ( ) , err )
2019-05-22 21:16:55 +00:00
}
return nil
}
2020-05-07 12:20:06 +00:00
func gzipHandler ( s * server , rh RequestHandler ) http . HandlerFunc {
2020-04-01 15:14:17 +00:00
return func ( w http . ResponseWriter , r * http . Request ) {
2019-05-22 21:16:55 +00:00
w = maybeGzipResponseWriter ( w , r )
2020-05-07 12:20:06 +00:00
handlerWrapper ( s , w , r , rh )
2019-05-22 21:16:55 +00:00
if zrw , ok := w . ( * gzipResponseWriter ) ; ok {
if err := zrw . Close ( ) ; err != nil && ! isTrivialNetworkError ( err ) {
2020-04-15 17:50:12 +00:00
logger . Warnf ( "gzipResponseWriter.Close: %s" , err )
2019-05-22 21:16:55 +00:00
}
}
}
}
2019-11-22 22:19:44 +00:00
var metricsHandlerDuration = metrics . NewHistogram ( ` vm_http_request_duration_seconds { path="/metrics"} ` )
2019-05-22 21:16:55 +00:00
2020-05-07 12:20:06 +00:00
func handlerWrapper ( s * server , w http . ResponseWriter , r * http . Request , rh RequestHandler ) {
2019-05-22 21:16:55 +00:00
requestsTotal . Inc ( )
2020-05-02 10:07:30 +00:00
path , err := getCanonicalPath ( r . URL . Path )
if err != nil {
Errorf ( w , "cannot get canonical path: %s" , err )
unsupportedRequestErrors . Inc ( )
return
}
r . URL . Path = path
2019-05-22 21:16:55 +00:00
switch r . URL . Path {
case "/health" :
w . Header ( ) . Set ( "Content-Type" , "text/plain" )
2020-05-07 12:20:06 +00:00
deadline := atomic . LoadInt64 ( & s . shutdownDelayDeadline )
if deadline <= 0 {
w . Write ( [ ] byte ( "OK" ) )
return
}
// Return non-OK response during grace period before shutting down the server.
// Load balancers must notify these responses and re-route new requests to other servers.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/463 .
d := time . Until ( time . Unix ( 0 , deadline ) )
if d < 0 {
d = 0
}
errMsg := fmt . Sprintf ( "The server is in delayed shutdown mode, which will end in %.3fs" , d . Seconds ( ) )
http . Error ( w , errMsg , http . StatusServiceUnavailable )
2019-05-22 21:16:55 +00:00
return
2019-12-04 17:15:49 +00:00
case "/ping" :
// This is needed for compatibility with Influx agents.
// See https://docs.influxdata.com/influxdb/v1.7/tools/api/#ping-http-endpoint
status := http . StatusNoContent
if verbose := r . FormValue ( "verbose" ) ; verbose == "true" {
status = http . StatusOK
}
w . WriteHeader ( status )
return
2019-12-18 21:08:22 +00:00
case "/favicon.ico" :
faviconRequests . Inc ( )
w . WriteHeader ( http . StatusNoContent )
return
2019-05-22 21:16:55 +00:00
case "/metrics" :
metricsRequests . Inc ( )
2019-12-18 21:08:22 +00:00
startTime := time . Now ( )
2019-05-22 21:16:55 +00:00
w . Header ( ) . Set ( "Content-Type" , "text/plain" )
writePrometheusMetrics ( w )
metricsHandlerDuration . UpdateDuration ( startTime )
return
default :
if strings . HasPrefix ( r . URL . Path , "/debug/pprof/" ) {
pprofRequests . Inc ( )
DisableResponseCompression ( w )
pprofHandler ( r . URL . Path [ len ( "/debug/pprof/" ) : ] , w , r )
return
}
if rh ( w , r ) {
return
}
Errorf ( w , "unsupported path requested: %q" , r . URL . Path )
unsupportedRequestErrors . Inc ( )
return
}
}
2020-05-02 10:07:30 +00:00
func getCanonicalPath ( path string ) ( string , error ) {
if len ( * pathPrefix ) == 0 {
return path , nil
}
prefix := * pathPrefix
if ! strings . HasSuffix ( prefix , "/" ) {
prefix = prefix + "/"
}
if ! strings . HasPrefix ( path , prefix ) {
return "" , fmt . Errorf ( "missing `-pathPrefix=%q` in the requested path: %q" , * pathPrefix , path )
}
path = path [ len ( prefix ) - 1 : ]
return path , nil
}
2019-05-22 21:16:55 +00:00
func maybeGzipResponseWriter ( w http . ResponseWriter , r * http . Request ) http . ResponseWriter {
2019-05-24 09:18:40 +00:00
if * disableResponseCompression {
return w
}
2019-05-22 21:16:55 +00:00
ae := r . Header . Get ( "Accept-Encoding" )
if ae == "" {
return w
}
ae = strings . ToLower ( ae )
n := strings . Index ( ae , "gzip" )
if n < 0 {
2020-05-22 13:44:09 +00:00
// Do not apply gzip encoding to the response.
2019-05-22 21:16:55 +00:00
return w
}
2020-05-22 13:44:09 +00:00
// Apply gzip encoding to the response.
2019-05-22 21:16:55 +00:00
zw := getGzipWriter ( w )
bw := getBufioWriter ( zw )
zrw := & gzipResponseWriter {
ResponseWriter : w ,
zw : zw ,
bw : bw ,
}
return zrw
}
// DisableResponseCompression disables response compression on w.
//
// The function must be called before the first w.Write* call.
func DisableResponseCompression ( w http . ResponseWriter ) {
2020-05-22 13:44:09 +00:00
zrw , ok := w . ( * gzipResponseWriter )
if ! ok {
return
}
if zrw . firstWriteDone {
logger . Panicf ( "BUG: DisableResponseCompression must be called before sending the response" )
}
zrw . disableCompression = true
2019-05-22 21:16:55 +00:00
}
// EnableCORS enables https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS
// on the response.
func EnableCORS ( w http . ResponseWriter , _ * http . Request ) {
w . Header ( ) . Set ( "Access-Control-Allow-Origin" , "*" )
}
func getGzipWriter ( w io . Writer ) * gzip . Writer {
v := gzipWriterPool . Get ( )
if v == nil {
zw , err := gzip . NewWriterLevel ( w , 1 )
if err != nil {
logger . Panicf ( "BUG: cannot create gzip writer: %s" , err )
}
return zw
}
zw := v . ( * gzip . Writer )
zw . Reset ( w )
return zw
}
func putGzipWriter ( zw * gzip . Writer ) {
gzipWriterPool . Put ( zw )
}
var gzipWriterPool sync . Pool
type gzipResponseWriter struct {
http . ResponseWriter
zw * gzip . Writer
bw * bufio . Writer
statusCode int
firstWriteDone bool
disableCompression bool
}
func ( zrw * gzipResponseWriter ) Write ( p [ ] byte ) ( int , error ) {
if ! zrw . firstWriteDone {
h := zrw . Header ( )
2020-05-24 19:12:20 +00:00
if zrw . statusCode == http . StatusNoContent {
zrw . disableCompression = true
}
2020-05-22 13:44:09 +00:00
if h . Get ( "Content-Encoding" ) != "" {
2019-05-22 21:16:55 +00:00
zrw . disableCompression = true
2020-05-22 13:44:09 +00:00
}
if ! zrw . disableCompression {
h . Set ( "Content-Encoding" , "gzip" )
2020-05-24 19:12:20 +00:00
h . Del ( "Content-Length" )
2020-05-22 13:44:09 +00:00
if h . Get ( "Content-Type" ) == "" {
// Disable auto-detection of content-type, since it
// is incorrectly detected after the compression.
h . Set ( "Content-Type" , "text/html" )
}
2019-05-22 21:16:55 +00:00
}
2020-05-24 20:55:28 +00:00
zrw . writeHeader ( )
2019-05-22 21:16:55 +00:00
zrw . firstWriteDone = true
}
if zrw . disableCompression {
return zrw . ResponseWriter . Write ( p )
}
return zrw . bw . Write ( p )
}
func ( zrw * gzipResponseWriter ) WriteHeader ( statusCode int ) {
zrw . statusCode = statusCode
}
2020-05-24 20:55:28 +00:00
func ( zrw * gzipResponseWriter ) writeHeader ( ) {
if zrw . statusCode == 0 {
zrw . statusCode = http . StatusOK
}
zrw . ResponseWriter . WriteHeader ( zrw . statusCode )
}
2019-05-22 21:16:55 +00:00
// Implements http.Flusher
func ( zrw * gzipResponseWriter ) Flush ( ) {
2020-06-05 18:37:10 +00:00
if ! zrw . disableCompression {
if err := zrw . bw . Flush ( ) ; err != nil && ! isTrivialNetworkError ( err ) {
logger . Warnf ( "gzipResponseWriter.Flush (buffer): %s" , err )
}
if err := zrw . zw . Flush ( ) ; err != nil && ! isTrivialNetworkError ( err ) {
logger . Warnf ( "gzipResponseWriter.Flush (gzip): %s" , err )
}
2019-05-22 21:16:55 +00:00
}
if fw , ok := zrw . ResponseWriter . ( http . Flusher ) ; ok {
fw . Flush ( )
}
}
func ( zrw * gzipResponseWriter ) Close ( ) error {
if ! zrw . firstWriteDone {
2020-05-24 20:55:28 +00:00
zrw . writeHeader ( )
2019-05-22 21:16:55 +00:00
return nil
}
zrw . Flush ( )
2020-06-05 18:37:10 +00:00
var err error
if ! zrw . disableCompression {
err = zrw . zw . Close ( )
}
2019-05-22 21:16:55 +00:00
putGzipWriter ( zrw . zw )
zrw . zw = nil
putBufioWriter ( zrw . bw )
zrw . bw = nil
return err
}
func getBufioWriter ( w io . Writer ) * bufio . Writer {
v := bufioWriterPool . Get ( )
if v == nil {
return bufio . NewWriterSize ( w , 16 * 1024 )
}
bw := v . ( * bufio . Writer )
bw . Reset ( w )
return bw
}
func putBufioWriter ( bw * bufio . Writer ) {
bufioWriterPool . Put ( bw )
}
var bufioWriterPool sync . Pool
func pprofHandler ( profileName string , w http . ResponseWriter , r * http . Request ) {
// This switch has been stolen from init func at https://golang.org/src/net/http/pprof/pprof.go
switch profileName {
case "cmdline" :
pprofCmdlineRequests . Inc ( )
pprof . Cmdline ( w , r )
case "profile" :
pprofProfileRequests . Inc ( )
pprof . Profile ( w , r )
case "symbol" :
pprofSymbolRequests . Inc ( )
pprof . Symbol ( w , r )
case "trace" :
pprofTraceRequests . Inc ( )
pprof . Trace ( w , r )
case "mutex" :
pprofMutexRequests . Inc ( )
seconds , _ := strconv . Atoi ( r . FormValue ( "seconds" ) )
if seconds <= 0 {
seconds = 10
}
prev := runtime . SetMutexProfileFraction ( 10 )
time . Sleep ( time . Duration ( seconds ) * time . Second )
pprof . Index ( w , r )
runtime . SetMutexProfileFraction ( prev )
default :
pprofDefaultRequests . Inc ( )
pprof . Index ( w , r )
}
}
var (
metricsRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/metrics"} ` )
pprofRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/debug/pprof/"} ` )
pprofCmdlineRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/debug/pprof/cmdline"} ` )
pprofProfileRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/debug/pprof/profile"} ` )
pprofSymbolRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/debug/pprof/symbol"} ` )
pprofTraceRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/debug/pprof/trace"} ` )
pprofMutexRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/debug/pprof/mutex"} ` )
pprofDefaultRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/debug/pprof/default"} ` )
faviconRequests = metrics . NewCounter ( ` vm_http_requests_total { path="/favicon.ico"} ` )
unsupportedRequestErrors = metrics . NewCounter ( ` vm_http_request_errors_total { path="*", reason="unsupported"} ` )
requestsTotal = metrics . NewCounter ( ` vm_http_requests_all_total ` )
)
// Errorf writes formatted error message to w and to logger.
func Errorf ( w http . ResponseWriter , format string , args ... interface { } ) {
errStr := fmt . Sprintf ( format , args ... )
2020-04-15 17:50:12 +00:00
logger . WarnfSkipframes ( 1 , "%s" , errStr )
2019-08-23 06:46:45 +00:00
// Extract statusCode from args
statusCode := http . StatusBadRequest
2020-06-30 21:02:02 +00:00
var esc * ErrorWithStatusCode
2019-08-23 06:46:45 +00:00
for _ , arg := range args {
2020-06-30 21:02:02 +00:00
if err , ok := arg . ( error ) ; ok && errors . As ( err , & esc ) {
2019-08-23 06:46:45 +00:00
statusCode = esc . StatusCode
break
}
}
http . Error ( w , errStr , statusCode )
}
// ErrorWithStatusCode is error with HTTP status code.
//
// The given StatusCode is sent to client when the error is passed to Errorf.
type ErrorWithStatusCode struct {
Err error
StatusCode int
}
2020-06-30 21:53:43 +00:00
// Unwrap returns e.Err.
//
// This is used by standard errors package. See https://golang.org/pkg/errors
func ( e * ErrorWithStatusCode ) Unwrap ( ) error {
return e . Err
}
2019-08-23 06:46:45 +00:00
// Error implements error interface.
func ( e * ErrorWithStatusCode ) Error ( ) string {
return e . Err . Error ( )
2019-05-22 21:16:55 +00:00
}
func isTrivialNetworkError ( err error ) bool {
s := err . Error ( )
if strings . Contains ( s , "broken pipe" ) || strings . Contains ( s , "reset by peer" ) {
return true
}
return false
}