lib/netutil: init implimentation of proxy protocol (#3687)

* lib/netutil: init implimentation of proxy protocol
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2023-01-27 08:08:35 +01:00 committed by GitHub
parent 2c1419c687
commit 73256fe438
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 325 additions and 43 deletions

View file

@ -24,7 +24,9 @@ import (
) )
var ( var (
httpListenAddr = flag.String("httpListenAddr", ":8428", "TCP address to listen for http connections") httpListenAddr = flag.String("httpListenAddr", ":8428", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+ minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling") "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+ dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
@ -64,7 +66,7 @@ func main() {
vminsert.Init() vminsert.Init()
startSelfScraper() startSelfScraper()
go httpserver.Serve(*httpListenAddr, requestHandler) go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
logger.Infof("started VictoriaMetrics in %.3f seconds", time.Since(startTime).Seconds()) logger.Infof("started VictoriaMetrics in %.3f seconds", time.Since(startTime).Seconds())
sig := procutil.WaitForSigterm() sig := procutil.WaitForSigterm()

View file

@ -132,7 +132,7 @@ func setUp() {
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded) vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init() vmselect.Init()
vminsert.Init() vminsert.Init()
go httpserver.Serve(*httpListenAddr, requestHandler) go httpserver.Serve(*httpListenAddr, false, requestHandler)
readyStorageCheckFunc := func() bool { readyStorageCheckFunc := func() bool {
resp, err := http.Get(testHealthHTTPPath) resp, err := http.Get(testHealthHTTPPath)
if err != nil { if err != nil {

View file

@ -44,16 +44,29 @@ import (
var ( var (
httpListenAddr = flag.String("httpListenAddr", ":8429", "TCP address to listen for http connections. "+ httpListenAddr = flag.String("httpListenAddr", ":8429", "TCP address to listen for http connections. "+
"Set this flag to empty value in order to disable listening on any port. This mode may be useful for running multiple vmagent instances on the same server. "+ "Set this flag to empty value in order to disable listening on any port. This mode may be useful for running multiple vmagent instances on the same server. "+
"Note that /targets and /metrics pages aren't available if -httpListenAddr=''") "Note that /targets and /metrics pages aren't available if -httpListenAddr=''. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+ influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<vmagent>:8429/write") "This flag isn't needed when ingesting data over HTTP - just send it to http://<vmagent>:8429/write . "+
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") "See also -influxListenAddr.useProxyProtocol")
influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+ opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty") "Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty") opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg") "See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+ opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+ "-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag") "Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
) )
@ -104,26 +117,26 @@ func main() {
remotewrite.Init() remotewrite.Init()
common.StartUnmarshalWorkers() common.StartUnmarshalWorkers()
if len(*influxListenAddr) > 0 { if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error { influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error {
return influx.InsertHandlerForReader(r, false) return influx.InsertHandlerForReader(r, false)
}) })
} }
if len(*graphiteListenAddr) > 0 { if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
} }
if len(*opentsdbListenAddr) > 0 { if len(*opentsdbListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler() httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, httpInsertHandler) opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, httpInsertHandler)
} }
if len(*opentsdbHTTPListenAddr) > 0 { if len(*opentsdbHTTPListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler() httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, httpInsertHandler) opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, httpInsertHandler)
} }
promscrape.Init(remotewrite.Push) promscrape.Init(remotewrite.Push)
if len(*httpListenAddr) > 0 { if len(*httpListenAddr) > 0 {
go httpserver.Serve(*httpListenAddr, requestHandler) go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
} }
logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds()) logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds())

View file

@ -49,7 +49,9 @@ absolute path to all .tpl files in root.`)
configCheckInterval = flag.Duration("configCheckInterval", 0, "Interval for checking for changes in '-rule' or '-notifier.config' files. "+ configCheckInterval = flag.Duration("configCheckInterval", 0, "Interval for checking for changes in '-rule' or '-notifier.config' files. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes.") "By default the checking is disabled. Send SIGHUP signal in order to force config check for changes.")
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections") httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules") evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules")
validateTemplates = flag.Bool("rule.validateTemplates", true, "Whether to validate annotation and label templates") validateTemplates = flag.Bool("rule.validateTemplates", true, "Whether to validate annotation and label templates")
@ -170,7 +172,7 @@ func main() {
go configReload(ctx, manager, groupsCfg, sighupCh) go configReload(ctx, manager, groupsCfg, sighupCh)
rh := &requestHandler{m: manager} rh := &requestHandler{m: manager}
go httpserver.Serve(*httpListenAddr, rh.handler) go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler)
sig := procutil.WaitForSigterm() sig := procutil.WaitForSigterm()
logger.Infof("service received signal %s", sig) logger.Infof("service received signal %s", sig)

View file

@ -22,7 +22,9 @@ import (
) )
var ( var (
httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections") httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host") maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host")
reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...") reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+ logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+
@ -41,7 +43,7 @@ func main() {
logger.Infof("starting vmauth at %q...", *httpListenAddr) logger.Infof("starting vmauth at %q...", *httpListenAddr)
startTime := time.Now() startTime := time.Now()
initAuthConfig() initAuthConfig()
go httpserver.Serve(*httpListenAddr, requestHandler) go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds()) logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds())
sig := procutil.WaitForSigterm() sig := procutil.WaitForSigterm()

View file

@ -93,7 +93,7 @@ func main() {
logger.Fatalf("invalid -snapshotName=%q: %s", *snapshotName, err) logger.Fatalf("invalid -snapshotName=%q: %s", *snapshotName, err)
} }
go httpserver.Serve(*httpListenAddr, nil) go httpserver.Serve(*httpListenAddr, false, nil)
srcFS, err := newSrcFS() srcFS, err := newSrcFS()
if err != nil { if err != nil {

View file

@ -39,13 +39,25 @@ import (
) )
var ( var (
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty") graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+ "See also -graphiteListenAddr.useProxyProtocol")
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write") graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write . "+
"See also -influxListenAddr.useProxyProtocol")
influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+ opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+ "Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty") "Usually :4242 must be set. Doesn't work if empty. "+
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty") "See also -opentsdbListenAddr.useProxyProtocol")
opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg") configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped. In this case the vm_metrics_with_dropped_labels_total metric at /metrics page is incremented") maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped. In this case the vm_metrics_with_dropped_labels_total metric at /metrics page is incremented")
maxLabelValueLen = flag.Int("maxLabelValueLen", 16*1024, "The maximum length of label values in the accepted time series. Longer label values are truncated. In this case the vm_too_long_label_values_total metric at /metrics page is incremented") maxLabelValueLen = flag.Int("maxLabelValueLen", 16*1024, "The maximum length of label values in the accepted time series. Longer label values are truncated. In this case the vm_too_long_label_values_total metric at /metrics page is incremented")
@ -71,16 +83,16 @@ func Init() {
storage.SetMaxLabelValueLen(*maxLabelValueLen) storage.SetMaxLabelValueLen(*maxLabelValueLen)
common.StartUnmarshalWorkers() common.StartUnmarshalWorkers()
if len(*graphiteListenAddr) > 0 { if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler) graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
} }
if len(*influxListenAddr) > 0 { if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader) influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, influx.InsertHandlerForReader)
} }
if len(*opentsdbListenAddr) > 0 { if len(*opentsdbListenAddr) > 0 {
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, opentsdbhttp.InsertHandler) opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, opentsdbhttp.InsertHandler)
} }
if len(*opentsdbHTTPListenAddr) > 0 { if len(*opentsdbHTTPListenAddr) > 0 {
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler) opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler)
} }
promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) { promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
prompush.Push(wr) prompush.Push(wr)

View file

@ -38,7 +38,7 @@ func main() {
logger.Init() logger.Init()
pushmetrics.Init() pushmetrics.Init()
go httpserver.Serve(*httpListenAddr, nil) go httpserver.Serve(*httpListenAddr, false, nil)
srcFS, err := newSrcFS() srcFS, err := newSrcFS()
if err != nil { if err != nil {

View file

@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues.
* FEATURE: allow using VictoriaMetrics components behind proxies, which communicate with the backend via [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335). For example, [vmauth](https://docs.victoriametrics.com/vmauth.html) accepts proxy protocol connections when it starts with `-httpListenAddr.useProxyProtocol` command-line flag.
* FEATURE: add `-internStringMaxLen` command-line flag, which can be used for fine-tuning RAM vs CPU usage in certain workloads. For example, if the stored time series contain long labels, then it may be useful reducing the `-internStringMaxLen` in order to reduce memory usage at the cost of increased CPU usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692). * FEATURE: add `-internStringMaxLen` command-line flag, which can be used for fine-tuning RAM vs CPU usage in certain workloads. For example, if the stored time series contain long labels, then it may be useful reducing the `-internStringMaxLen` in order to reduce memory usage at the cost of increased CPU usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692).
* BUGFIX: fix a bug, which could prevent background merges for the previous partitions until restart if the storage didn't have enough disk space for final deduplication and down-sampling. * BUGFIX: fix a bug, which could prevent background merges for the previous partitions until restart if the storage didn't have enough disk space for final deduplication and down-sampling.

View file

@ -79,7 +79,10 @@ type RequestHandler func(w http.ResponseWriter, r *http.Request) bool
// by calling DisableResponseCompression before writing the first byte to w. // by calling DisableResponseCompression before writing the first byte to w.
// //
// The compression is also disabled if -http.disableResponseCompression flag is set. // The compression is also disabled if -http.disableResponseCompression flag is set.
func Serve(addr string, rh RequestHandler) { //
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
func Serve(addr string, useProxyProtocol bool, rh RequestHandler) {
if rh == nil { if rh == nil {
rh = func(w http.ResponseWriter, r *http.Request) bool { rh = func(w http.ResponseWriter, r *http.Request) bool {
return false return false
@ -103,7 +106,7 @@ func Serve(addr string, rh RequestHandler) {
} }
tlsConfig = tc tlsConfig = tc
} }
ln, err := netutil.NewTCPListener(scheme, addr, tlsConfig) ln, err := netutil.NewTCPListener(scheme, addr, useProxyProtocol, tlsConfig)
if err != nil { if err != nil {
logger.Fatalf("cannot start http server at %s: %s", addr, err) logger.Fatalf("cannot start http server at %s: %s", addr, err)
} }

View file

@ -37,10 +37,13 @@ type Server struct {
// //
// The incoming connections are processed with insertHandler. // The incoming connections are processed with insertHandler.
// //
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed. // MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
logger.Infof("starting TCP Graphite server at %q", addr) logger.Infof("starting TCP Graphite server at %q", addr)
lnTCP, err := netutil.NewTCPListener("graphite", addr, nil) lnTCP, err := netutil.NewTCPListener("graphite", addr, useProxyProtocol, nil)
if err != nil { if err != nil {
logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err) logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err)
} }

View file

@ -37,10 +37,13 @@ type Server struct {
// //
// The incoming connections are processed with insertHandler. // The incoming connections are processed with insertHandler.
// //
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed. // MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r io.Reader) error) *Server { func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
logger.Infof("starting TCP InfluxDB server at %q", addr) logger.Infof("starting TCP InfluxDB server at %q", addr)
lnTCP, err := netutil.NewTCPListener("influx", addr, nil) lnTCP, err := netutil.NewTCPListener("influx", addr, useProxyProtocol, nil)
if err != nil { if err != nil {
logger.Fatalf("cannot start TCP InfluxDB server at %q: %s", addr, err) logger.Fatalf("cannot start TCP InfluxDB server at %q: %s", addr, err)
} }

View file

@ -40,10 +40,13 @@ type Server struct {
// MustStart starts OpenTSDB collector on the given addr. // MustStart starts OpenTSDB collector on the given addr.
// //
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed. // MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, telnetInsertHandler func(r io.Reader) error, httpInsertHandler func(req *http.Request) error) *Server { func MustStart(addr string, useProxyProtocol bool, telnetInsertHandler func(r io.Reader) error, httpInsertHandler func(req *http.Request) error) *Server {
logger.Infof("starting TCP OpenTSDB collector at %q", addr) logger.Infof("starting TCP OpenTSDB collector at %q", addr)
lnTCP, err := netutil.NewTCPListener("opentsdb", addr, nil) lnTCP, err := netutil.NewTCPListener("opentsdb", addr, useProxyProtocol, nil)
if err != nil { if err != nil {
logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err) logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err)
} }

View file

@ -27,10 +27,13 @@ type Server struct {
// MustStart starts HTTP OpenTSDB server on the given addr. // MustStart starts HTTP OpenTSDB server on the given addr.
// //
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed. // MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r *http.Request) error) *Server { func MustStart(addr string, useProxyProtocol bool, insertHandler func(r *http.Request) error) *Server {
logger.Infof("starting HTTP OpenTSDB server at %q", addr) logger.Infof("starting HTTP OpenTSDB server at %q", addr)
lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr, nil) lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr, useProxyProtocol, nil)
if err != nil { if err != nil {
logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err) logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err)
} }

View file

@ -0,0 +1,116 @@
package netutil
import (
"encoding/binary"
"fmt"
"io"
"net"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
type proxyProtocolConn struct {
net.Conn
remoteAddr net.Addr
}
func newProxyProtocolConn(c net.Conn) (net.Conn, error) {
remoteAddr, err := readProxyProto(c)
if err != nil {
return nil, fmt.Errorf("proxy protocol error: %w", err)
}
if remoteAddr == nil {
remoteAddr = c.RemoteAddr()
}
return &proxyProtocolConn{
Conn: c,
remoteAddr: remoteAddr,
}, nil
}
func (ppc *proxyProtocolConn) RemoteAddr() net.Addr {
return ppc.remoteAddr
}
func readProxyProto(r io.Reader) (net.Addr, error) {
bb := bbPool.Get()
defer bbPool.Put(bb)
// Read the first 16 bytes of proxy protocol header:
// - bytes 0-11: v2Identifier
// - byte 12: version and command
// - byte 13: family and protocol
// - bytes 14-15: payload length
//
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, 16)
if _, err := io.ReadFull(r, bb.B); err != nil {
return nil, fmt.Errorf("cannot read proxy protocol header: %w", err)
}
ident := bb.B[:12]
if string(ident) != v2Identifier {
return nil, fmt.Errorf("unexpected proxy protocol header: %q; want %q", ident, v2Identifier)
}
version := bb.B[12] >> 4
command := bb.B[12] & 0x0f
family := bb.B[13] >> 4
proto := bb.B[13] & 0x0f
if version != 2 {
return nil, fmt.Errorf("unsupported proxy protocol version, only v2 protocol version is supported, got: %d", version)
}
if proto != 1 {
// Only TCP is supported (aka STREAM).
return nil, fmt.Errorf("the proxy protocol implementation doesn't support proto %d; expecting 1", proto)
}
// The length of the remainder of the header including any TLVs in network byte order
// 0, 1, 2
blockLen := int(binary.BigEndian.Uint16(bb.B[14:16]))
// in general RFC doesn't limit block length, but for sanity check lets limit it to 2kb
// in theory TLVs may occupy some space
if blockLen > 2048 {
return nil, fmt.Errorf("too big proxy protocol block length: %d; it mustn't exceed 2048 bytes", blockLen)
}
// Read the protocol block itself
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, blockLen)
if _, err := io.ReadFull(r, bb.B); err != nil {
return nil, fmt.Errorf("cannot read proxy protocol block with the lehgth %d bytes: %w", blockLen, err)
}
switch command {
case 0:
// Proxy LOCAL command. Ignore the protocol block. The real sender address should be used.
return nil, nil
case 1:
// Parse the protocol block according to the family.
switch family {
case 1:
// ipv4 (aka AF_INET)
if len(bb.B) < 12 {
return nil, fmt.Errorf("cannot ipv4 address from proxy protocol block with the length %d bytes; expected at least 12 bytes", len(bb.B))
}
remoteAddr := &net.TCPAddr{
IP: net.IPv4(bb.B[0], bb.B[1], bb.B[2], bb.B[3]),
Port: int(binary.BigEndian.Uint16(bb.B[8:10])),
}
return remoteAddr, nil
case 2:
// ipv6 (aka AF_INET6)
if len(bb.B) < 36 {
return nil, fmt.Errorf("cannot read ipv6 address from proxy protocol block with the length %d bytes; expected at least 36 bytes", len(bb.B))
}
remoteAddr := &net.TCPAddr{
IP: bb.B[0:16],
Port: int(binary.BigEndian.Uint16(bb.B[32:34])),
}
return remoteAddr, nil
default:
return nil, fmt.Errorf("the proxy protocol implementation doesn't support protocol family %d; supported values: 1, 2", family)
}
default:
return nil, fmt.Errorf("the proxy protocol implementation doesn't support command %d; suppoted values: 0, 1", command)
}
}
const v2Identifier = "\r\n\r\n\x00\r\nQUIT\n"
var bbPool bytesutil.ByteBufferPool

View file

@ -0,0 +1,107 @@
package netutil
import (
"bytes"
"io"
"net"
"reflect"
"testing"
)
func TestParseProxyProtocolSuccess(t *testing.T) {
f := func(body, wantTail []byte, wantAddr net.Addr) {
t.Helper()
r := bytes.NewBuffer(body)
gotAddr, err := readProxyProto(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !reflect.DeepEqual(gotAddr, wantAddr) {
t.Fatalf("ip not match, got: %v, want: %v", gotAddr, wantAddr)
}
gotTail, err := io.ReadAll(r)
if err != nil {
t.Fatalf("cannot read tail: %s", err)
}
if !bytes.Equal(gotTail, wantTail) {
t.Fatalf("unexpected tail after parsing proxy protocol\ngot:\n%q\nwant:\n%q", gotTail, wantTail)
}
}
// LOCAL addr
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x20, 0x11, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, nil,
nil)
// ipv4
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data srcid,dstip,srcport,dstport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, nil,
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 80})
// ipv4 with payload
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0,
// some payload
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0,
}, []byte{0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0},
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 80})
// ipv6
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x24,
// src and dst ipv6
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
// ports
0, 80, 0, 0}, nil,
&net.TCPAddr{IP: net.ParseIP("::1"), Port: 80})
}
func TestParseProxyProtocolFail(t *testing.T) {
f := func(body []byte) {
t.Helper()
r := bytes.NewBuffer(body)
gotAddr, err := readProxyProto(r)
if err == nil {
t.Fatalf("expected error at input %v", body)
}
if gotAddr != nil {
t.Fatalf("expected ip to be nil, got: %v", gotAddr)
}
}
// too short protocol prefix
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A})
// broken protocol prefix
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21})
// invalid header
f([]byte{0x0D, 0x1A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C})
// invalid version
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x31, 0x11, 0x00, 0x0C})
// too long block
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0xff, 0x0C})
// missing bytes in address
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data srcid,dstip,srcport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80})
// too short address length
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x08,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0})
// unsupported family
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x31, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// unsupported command
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x22, 0x11, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// mimatch ipv6 and ipv4
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x0C,
// ip data srcid,dstip,srcport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// ipv4 udp isn't supported
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x12, 0x00, 0x0C,
// ip data srcid,dstip,srcport,dstport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// ipv6 udp isn't supported
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x22, 0x00, 0x24,
// src and dst ipv6
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
// ports
0, 80, 0, 0})
}

View file

@ -16,8 +16,11 @@ var enableTCP6 = flag.Bool("enableTCP6", false, "Whether to enable IPv6 for list
// NewTCPListener returns new TCP listener for the given addr and optional tlsConfig. // NewTCPListener returns new TCP listener for the given addr and optional tlsConfig.
// //
// name is used for metrics registered in ms. Each listener in the program must have distinct name. // name is used for metrics. Each listener in the program must have a distinct name.
func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, error) { //
// If useProxyProtocol is set to true, then the returned listener accepts TCP connections via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
func NewTCPListener(name, addr string, useProxyProtocol bool, tlsConfig *tls.Config) (*TCPListener, error) {
network := GetTCPNetwork() network := GetTCPNetwork()
ln, err := net.Listen(network, addr) ln, err := net.Listen(network, addr)
if err != nil { if err != nil {
@ -28,7 +31,8 @@ func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, err
} }
ms := metrics.GetDefaultSet() ms := metrics.GetDefaultSet()
tln := &TCPListener{ tln := &TCPListener{
Listener: ln, Listener: ln,
useProxyProtocol: useProxyProtocol,
accepts: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)), accepts: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)),
acceptErrors: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)), acceptErrors: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)),
@ -69,6 +73,8 @@ type TCPListener struct {
accepts *metrics.Counter accepts *metrics.Counter
acceptErrors *metrics.Counter acceptErrors *metrics.Counter
useProxyProtocol bool
connMetrics connMetrics
} }
@ -87,6 +93,12 @@ func (ln *TCPListener) Accept() (net.Conn, error) {
ln.acceptErrors.Inc() ln.acceptErrors.Inc()
return nil, err return nil, err
} }
if ln.useProxyProtocol {
conn, err = newProxyProtocolConn(conn)
if err != nil {
return nil, err
}
}
ln.conns.Inc() ln.conns.Inc()
sc := &statConn{ sc := &statConn{
Conn: conn, Conn: conn,