Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2023-01-26 23:54:43 -08:00
commit 3a21fde0f3
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
22 changed files with 356 additions and 56 deletions

View file

@ -24,7 +24,9 @@ import (
)
var (
httpListenAddr = flag.String("httpListenAddr", ":8428", "TCP address to listen for http connections")
httpListenAddr = flag.String("httpListenAddr", ":8428", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
@ -75,7 +77,7 @@ func main() {
vminsert.Init()
startSelfScraper()
go httpserver.Serve(*httpListenAddr, requestHandler)
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
logger.Infof("started VictoriaMetrics in %.3f seconds", time.Since(startTime).Seconds())
sig := procutil.WaitForSigterm()

View file

@ -132,7 +132,7 @@ func setUp() {
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init()
vminsert.Init()
go httpserver.Serve(*httpListenAddr, requestHandler)
go httpserver.Serve(*httpListenAddr, false, requestHandler)
readyStorageCheckFunc := func() bool {
resp, err := http.Get(testHealthHTTPPath)
if err != nil {

View file

@ -763,7 +763,8 @@ scrape_configs:
## Cardinality limiter
By default `vmagent` doesn't limit the number of time series each scrape target can expose. The limit can be enforced in the following places:
By default `vmagent` doesn't limit the number of time series each scrape target can expose.
The limit can be enforced in the following places:
* Via `-promscrape.seriesLimitPerTarget` command-line option. This limit is applied individually
to all the scrape targets defined in the file pointed by `-promscrape.config`.
@ -774,10 +775,7 @@ By default `vmagent` doesn't limit the number of time series each scrape target
via [Kubernetes annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) for targets,
which may expose too high number of time series.
See also `sample_limit` option at [scrape_config section](https://docs.victoriametrics.com/sd_configs.html#scrape_configs).
Scraped metrics are dropped for time series exceeding the given limit.
Scraped metrics are dropped for time series exceeding the given limit on the time window of 24h.
`vmagent` creates the following additional per-target metrics for targets with non-zero series limit:
- `scrape_series_limit_samples_dropped` - the number of dropped samples during the scrape when the unique series limit is exceeded.
@ -791,6 +789,7 @@ These metrics allow building the following alerting rules:
- `scrape_series_current / scrape_series_limit > 0.9` - alerts when the number of series exposed by the target reaches 90% of the limit.
- `sum_over_time(scrape_series_limit_samples_dropped[1h]) > 0` - alerts when some samples are dropped because the series limit on a particular target is reached.
See also `sample_limit` option at [scrape_config section](https://docs.victoriametrics.com/sd_configs.html#scrape_configs).
By default `vmagent` doesn't limit the number of time series written to remote storage systems specified at `-remoteWrite.url`.
The limit can be enforced by setting the following command-line flags:

View file

@ -44,16 +44,29 @@ import (
var (
httpListenAddr = flag.String("httpListenAddr", ":8429", "TCP address to listen for http connections. "+
"Set this flag to empty value in order to disable listening on any port. This mode may be useful for running multiple vmagent instances on the same server. "+
"Note that /targets and /metrics pages aren't available if -httpListenAddr=''")
"Note that /targets and /metrics pages aren't available if -httpListenAddr=''. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<vmagent>:8429/write")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")
"This flag isn't needed when ingesting data over HTTP - just send it to http://<vmagent>:8429/write . "+
"See also -influxListenAddr.useProxyProtocol")
influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
"Usually :4242 must be set. Doesn't work if empty. See also -opentsdbListenAddr.useProxyProtocol")
opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
)
@ -104,26 +117,26 @@ func main() {
remotewrite.Init()
common.StartUnmarshalWorkers()
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, func(r io.Reader) error {
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, func(r io.Reader) error {
return influx.InsertHandlerForReader(r, false)
})
}
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler)
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
}
if len(*opentsdbListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, httpInsertHandler)
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, httpInsertHandler)
}
if len(*opentsdbHTTPListenAddr) > 0 {
httpInsertHandler := getOpenTSDBHTTPInsertHandler()
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, httpInsertHandler)
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, httpInsertHandler)
}
promscrape.Init(remotewrite.Push)
if len(*httpListenAddr) > 0 {
go httpserver.Serve(*httpListenAddr, requestHandler)
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
}
logger.Infof("started vmagent in %.3f seconds", time.Since(startTime).Seconds())

View file

@ -10,6 +10,7 @@ import (
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmagent/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
)
@ -53,4 +54,7 @@ func tearDown() {
common.StopUnmarshalWorkers()
srv.Close()
logger.ResetOutputForTest()
tmpDataDir := flag.Lookup("remoteWrite.tmpDataPath").Value.String()
fs.MustRemoveAll(tmpDataDir)
}

View file

@ -49,7 +49,9 @@ absolute path to all .tpl files in root.`)
configCheckInterval = flag.Duration("configCheckInterval", 0, "Interval for checking for changes in '-rule' or '-notifier.config' files. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes.")
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules")
validateTemplates = flag.Bool("rule.validateTemplates", true, "Whether to validate annotation and label templates")
@ -170,7 +172,7 @@ func main() {
go configReload(ctx, manager, groupsCfg, sighupCh)
rh := &requestHandler{m: manager}
go httpserver.Serve(*httpListenAddr, rh.handler)
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, rh.handler)
sig := procutil.WaitForSigterm()
logger.Infof("service received signal %s", sig)

View file

@ -22,7 +22,9 @@ import (
)
var (
httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections")
httpListenAddr = flag.String("httpListenAddr", ":8427", "TCP address to listen for http connections. See also -httpListenAddr.useProxyProtocol")
useProxyProtocol = flag.Bool("httpListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -httpListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
maxIdleConnsPerBackend = flag.Int("maxIdleConnsPerBackend", 100, "The maximum number of idle connections vmauth can open per each backend host")
reloadAuthKey = flag.String("reloadAuthKey", "", "Auth key for /-/reload http endpoint. It must be passed as authKey=...")
logInvalidAuthTokens = flag.Bool("logInvalidAuthTokens", false, "Whether to log requests with invalid auth tokens. "+
@ -41,7 +43,7 @@ func main() {
logger.Infof("starting vmauth at %q...", *httpListenAddr)
startTime := time.Now()
initAuthConfig()
go httpserver.Serve(*httpListenAddr, requestHandler)
go httpserver.Serve(*httpListenAddr, *useProxyProtocol, requestHandler)
logger.Infof("started vmauth in %.3f seconds", time.Since(startTime).Seconds())
sig := procutil.WaitForSigterm()

View file

@ -93,7 +93,7 @@ func main() {
logger.Fatalf("invalid -snapshotName=%q: %s", *snapshotName, err)
}
go httpserver.Serve(*httpListenAddr, nil)
go httpserver.Serve(*httpListenAddr, false, nil)
srcFS, err := newSrcFS()
if err != nil {

View file

@ -39,13 +39,25 @@ import (
)
var (
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write")
graphiteListenAddr = flag.String("graphiteListenAddr", "", "TCP and UDP address to listen for Graphite plaintext data. Usually :2003 must be set. Doesn't work if empty. "+
"See also -graphiteListenAddr.useProxyProtocol")
graphiteUseProxyProtocol = flag.Bool("graphiteListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -graphiteListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
influxListenAddr = flag.String("influxListenAddr", "", "TCP and UDP address to listen for InfluxDB line protocol data. Usually :8089 must be set. Doesn't work if empty. "+
"This flag isn't needed when ingesting data over HTTP - just send it to http://<victoriametrics>:8428/write . "+
"See also -influxListenAddr.useProxyProtocol")
influxUseProxyProtocol = flag.Bool("influxListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -influxListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbListenAddr = flag.String("opentsdbListenAddr", "", "TCP and UDP address to listen for OpentTSDB metrics. "+
"Telnet put messages and HTTP /api/put messages are simultaneously served on TCP port. "+
"Usually :4242 must be set. Doesn't work if empty")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty")
"Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbListenAddr.useProxyProtocol")
opentsdbUseProxyProtocol = flag.Bool("opentsdbListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted at -opentsdbListenAddr . "+
"See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
opentsdbHTTPListenAddr = flag.String("opentsdbHTTPListenAddr", "", "TCP address to listen for OpentTSDB HTTP put requests. Usually :4242 must be set. Doesn't work if empty. "+
"See also -opentsdbHTTPListenAddr.useProxyProtocol")
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
maxLabelsPerTimeseries = flag.Int("maxLabelsPerTimeseries", 30, "The maximum number of labels accepted per time series. Superfluous labels are dropped. In this case the vm_metrics_with_dropped_labels_total metric at /metrics page is incremented")
maxLabelValueLen = flag.Int("maxLabelValueLen", 16*1024, "The maximum length of label values in the accepted time series. Longer label values are truncated. In this case the vm_too_long_label_values_total metric at /metrics page is incremented")
@ -71,16 +83,16 @@ func Init() {
storage.SetMaxLabelValueLen(*maxLabelValueLen)
common.StartUnmarshalWorkers()
if len(*graphiteListenAddr) > 0 {
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, graphite.InsertHandler)
graphiteServer = graphiteserver.MustStart(*graphiteListenAddr, *graphiteUseProxyProtocol, graphite.InsertHandler)
}
if len(*influxListenAddr) > 0 {
influxServer = influxserver.MustStart(*influxListenAddr, influx.InsertHandlerForReader)
influxServer = influxserver.MustStart(*influxListenAddr, *influxUseProxyProtocol, influx.InsertHandlerForReader)
}
if len(*opentsdbListenAddr) > 0 {
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, opentsdb.InsertHandler, opentsdbhttp.InsertHandler)
opentsdbServer = opentsdbserver.MustStart(*opentsdbListenAddr, *opentsdbUseProxyProtocol, opentsdb.InsertHandler, opentsdbhttp.InsertHandler)
}
if len(*opentsdbHTTPListenAddr) > 0 {
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, opentsdbhttp.InsertHandler)
opentsdbhttpServer = opentsdbhttpserver.MustStart(*opentsdbHTTPListenAddr, *opentsdbHTTPUseProxyProtocol, opentsdbhttp.InsertHandler)
}
promscrape.Init(func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
prompush.Push(wr)

View file

@ -38,7 +38,7 @@ func main() {
logger.Init()
pushmetrics.Init()
go httpserver.Serve(*httpListenAddr, nil)
go httpserver.Serve(*httpListenAddr, false, nil)
srcFS, err := newSrcFS()
if err != nil {

View file

@ -20,8 +20,10 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when sending stale markers for targets, which expose big number of metrics. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675) issues.
* FEATURE: allow using VictoriaMetrics components behind proxies, which communicate with the backend via [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3335). For example, [vmauth](https://docs.victoriametrics.com/vmauth.html) accepts proxy protocol connections when it starts with `-httpListenAddr.useProxyProtocol` command-line flag.
* FEATURE: add `-internStringMaxLen` command-line flag, which can be used for fine-tuning RAM vs CPU usage in certain workloads. For example, if the stored time series contain long labels, then it may be useful reducing the `-internStringMaxLen` in order to reduce memory usage at the cost of increased CPU usage. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692).
* BUGFIX: fix a bug, which could prevent background merges for the previous partitions until restart if the storage didn't have enough disk space for final deduplication and down-sampling.
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): propagate all the timeout-related errors from `vmstorage` to `vmselect` when `vmstorage`. Previously some timeout errors weren't returned from `vmselect` to `vmstorage`. Instead, `vmstorage` could log the error and close the connection to `vmselect`, so `vmselect` was logging cryptic errors such as `cannot execute funcName="..." on vmstorage "...": EOF`.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add support for time zone selection for older versions of browsers. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3680).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): update API version for [ec2_sd_configs](https://docs.victoriametrics.com/sd_configs.html#ec2_sd_configs) to fix [the issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3700) with missing `__meta_ec2_availability_zone_id` attribute.

View file

@ -1274,8 +1274,8 @@ scrape_configs:
# scrape_offset: <duration>
# series_limit is an optional limit on the number of unique time series
# a single target can expose during all the scrapes.
# By default there is no limit on the number of exposed series.
# a single target can expose during all the scrapes on the time window of 24h.
# By default, there is no limit on the number of exposed series.
# See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter .
# The series_limit can be set on a per-target basis by specifying `__series_limit__`
# label during target relabeling phase.
@ -1283,7 +1283,7 @@ scrape_configs:
# series_limit: ...
# no_stale_markers allows disabling staleness tracking.
# By default staleness tracking is enabled for all the discovered scrape targets.
# By default, staleness tracking is enabled for all the discovered scrape targets.
# See https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers
# no_stale_markers: <boolean>

View file

@ -767,7 +767,8 @@ scrape_configs:
## Cardinality limiter
By default `vmagent` doesn't limit the number of time series each scrape target can expose. The limit can be enforced in the following places:
By default `vmagent` doesn't limit the number of time series each scrape target can expose.
The limit can be enforced in the following places:
* Via `-promscrape.seriesLimitPerTarget` command-line option. This limit is applied individually
to all the scrape targets defined in the file pointed by `-promscrape.config`.
@ -778,10 +779,7 @@ By default `vmagent` doesn't limit the number of time series each scrape target
via [Kubernetes annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) for targets,
which may expose too high number of time series.
See also `sample_limit` option at [scrape_config section](https://docs.victoriametrics.com/sd_configs.html#scrape_configs).
Scraped metrics are dropped for time series exceeding the given limit.
Scraped metrics are dropped for time series exceeding the given limit on the time window of 24h.
`vmagent` creates the following additional per-target metrics for targets with non-zero series limit:
- `scrape_series_limit_samples_dropped` - the number of dropped samples during the scrape when the unique series limit is exceeded.
@ -795,6 +793,7 @@ These metrics allow building the following alerting rules:
- `scrape_series_current / scrape_series_limit > 0.9` - alerts when the number of series exposed by the target reaches 90% of the limit.
- `sum_over_time(scrape_series_limit_samples_dropped[1h]) > 0` - alerts when some samples are dropped because the series limit on a particular target is reached.
See also `sample_limit` option at [scrape_config section](https://docs.victoriametrics.com/sd_configs.html#scrape_configs).
By default `vmagent` doesn't limit the number of time series written to remote storage systems specified at `-remoteWrite.url`.
The limit can be enforced by setting the following command-line flags:

View file

@ -79,7 +79,10 @@ type RequestHandler func(w http.ResponseWriter, r *http.Request) bool
// by calling DisableResponseCompression before writing the first byte to w.
//
// The compression is also disabled if -http.disableResponseCompression flag is set.
func Serve(addr string, rh RequestHandler) {
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
func Serve(addr string, useProxyProtocol bool, rh RequestHandler) {
if rh == nil {
rh = func(w http.ResponseWriter, r *http.Request) bool {
return false
@ -103,7 +106,7 @@ func Serve(addr string, rh RequestHandler) {
}
tlsConfig = tc
}
ln, err := netutil.NewTCPListener(scheme, addr, tlsConfig)
ln, err := netutil.NewTCPListener(scheme, addr, useProxyProtocol, tlsConfig)
if err != nil {
logger.Fatalf("cannot start http server at %s: %s", addr, err)
}

View file

@ -37,10 +37,13 @@ type Server struct {
//
// The incoming connections are processed with insertHandler.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r io.Reader) error) *Server {
func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
logger.Infof("starting TCP Graphite server at %q", addr)
lnTCP, err := netutil.NewTCPListener("graphite", addr, nil)
lnTCP, err := netutil.NewTCPListener("graphite", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start TCP Graphite server at %q: %s", addr, err)
}

View file

@ -37,10 +37,13 @@ type Server struct {
//
// The incoming connections are processed with insertHandler.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r io.Reader) error) *Server {
func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
logger.Infof("starting TCP InfluxDB server at %q", addr)
lnTCP, err := netutil.NewTCPListener("influx", addr, nil)
lnTCP, err := netutil.NewTCPListener("influx", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start TCP InfluxDB server at %q: %s", addr, err)
}

View file

@ -40,10 +40,13 @@ type Server struct {
// MustStart starts OpenTSDB collector on the given addr.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, telnetInsertHandler func(r io.Reader) error, httpInsertHandler func(req *http.Request) error) *Server {
func MustStart(addr string, useProxyProtocol bool, telnetInsertHandler func(r io.Reader) error, httpInsertHandler func(req *http.Request) error) *Server {
logger.Infof("starting TCP OpenTSDB collector at %q", addr)
lnTCP, err := netutil.NewTCPListener("opentsdb", addr, nil)
lnTCP, err := netutil.NewTCPListener("opentsdb", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start TCP OpenTSDB collector at %q: %s", addr, err)
}

View file

@ -27,10 +27,13 @@ type Server struct {
// MustStart starts HTTP OpenTSDB server on the given addr.
//
// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
//
// MustStop must be called on the returned server when it is no longer needed.
func MustStart(addr string, insertHandler func(r *http.Request) error) *Server {
func MustStart(addr string, useProxyProtocol bool, insertHandler func(r *http.Request) error) *Server {
logger.Infof("starting HTTP OpenTSDB server at %q", addr)
lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr, nil)
lnTCP, err := netutil.NewTCPListener("opentsdbhttp", addr, useProxyProtocol, nil)
if err != nil {
logger.Fatalf("cannot start HTTP OpenTSDB collector at %q: %s", addr, err)
}

View file

@ -0,0 +1,129 @@
package netutil
import (
"encoding/binary"
"fmt"
"io"
"net"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
type proxyProtocolConn struct {
net.Conn
remoteAddr net.Addr
}
func newProxyProtocolConn(c net.Conn) (net.Conn, error) {
// Limit the time needed for reading the proxy protocol header.
d := time.Now().Add(5 * time.Second)
if err := c.SetReadDeadline(d); err != nil {
return nil, fmt.Errorf("cannot set deadline for reading proxy protocol header: %s", err)
}
remoteAddr, err := readProxyProto(c)
if err != nil {
return nil, fmt.Errorf("proxy protocol error: %w", err)
}
if remoteAddr == nil {
remoteAddr = c.RemoteAddr()
}
// Reset the read deadline.
if err := c.SetReadDeadline(time.Time{}); err != nil {
return nil, fmt.Errorf("cannot reset deadline after reading proxy protocol header: %s", err)
}
return &proxyProtocolConn{
Conn: c,
remoteAddr: remoteAddr,
}, nil
}
func (ppc *proxyProtocolConn) RemoteAddr() net.Addr {
return ppc.remoteAddr
}
func readProxyProto(r io.Reader) (net.Addr, error) {
bb := bbPool.Get()
defer bbPool.Put(bb)
// Read the first 16 bytes of proxy protocol header:
// - bytes 0-11: v2Identifier
// - byte 12: version and command
// - byte 13: family and protocol
// - bytes 14-15: payload length
//
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, 16)
if _, err := io.ReadFull(r, bb.B); err != nil {
return nil, fmt.Errorf("cannot read proxy protocol header: %w", err)
}
ident := bb.B[:12]
if string(ident) != v2Identifier {
return nil, fmt.Errorf("unexpected proxy protocol header: %q; want %q", ident, v2Identifier)
}
version := bb.B[12] >> 4
command := bb.B[12] & 0x0f
family := bb.B[13] >> 4
proto := bb.B[13] & 0x0f
if version != 2 {
return nil, fmt.Errorf("unsupported proxy protocol version, only v2 protocol version is supported, got: %d", version)
}
if proto != 1 {
// Only TCP is supported (aka STREAM).
return nil, fmt.Errorf("the proxy protocol implementation doesn't support proto %d; expecting 1", proto)
}
// The length of the remainder of the header including any TLVs in network byte order
// 0, 1, 2
blockLen := int(binary.BigEndian.Uint16(bb.B[14:16]))
// in general RFC doesn't limit block length, but for sanity check lets limit it to 2kb
// in theory TLVs may occupy some space
if blockLen > 2048 {
return nil, fmt.Errorf("too big proxy protocol block length: %d; it mustn't exceed 2048 bytes", blockLen)
}
// Read the protocol block itself
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, blockLen)
if _, err := io.ReadFull(r, bb.B); err != nil {
return nil, fmt.Errorf("cannot read proxy protocol block with the lehgth %d bytes: %w", blockLen, err)
}
switch command {
case 0:
// Proxy LOCAL command. Ignore the protocol block. The real sender address should be used.
return nil, nil
case 1:
// Parse the protocol block according to the family.
switch family {
case 1:
// ipv4 (aka AF_INET)
if len(bb.B) < 12 {
return nil, fmt.Errorf("cannot ipv4 address from proxy protocol block with the length %d bytes; expected at least 12 bytes", len(bb.B))
}
remoteAddr := &net.TCPAddr{
IP: net.IPv4(bb.B[0], bb.B[1], bb.B[2], bb.B[3]),
Port: int(binary.BigEndian.Uint16(bb.B[8:10])),
}
return remoteAddr, nil
case 2:
// ipv6 (aka AF_INET6)
if len(bb.B) < 36 {
return nil, fmt.Errorf("cannot read ipv6 address from proxy protocol block with the length %d bytes; expected at least 36 bytes", len(bb.B))
}
remoteAddr := &net.TCPAddr{
IP: bb.B[0:16],
Port: int(binary.BigEndian.Uint16(bb.B[32:34])),
}
return remoteAddr, nil
default:
return nil, fmt.Errorf("the proxy protocol implementation doesn't support protocol family %d; supported values: 1, 2", family)
}
default:
return nil, fmt.Errorf("the proxy protocol implementation doesn't support command %d; suppoted values: 0, 1", command)
}
}
const v2Identifier = "\r\n\r\n\x00\r\nQUIT\n"
var bbPool bytesutil.ByteBufferPool

View file

@ -0,0 +1,107 @@
package netutil
import (
"bytes"
"io"
"net"
"reflect"
"testing"
)
func TestParseProxyProtocolSuccess(t *testing.T) {
f := func(body, wantTail []byte, wantAddr net.Addr) {
t.Helper()
r := bytes.NewBuffer(body)
gotAddr, err := readProxyProto(r)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !reflect.DeepEqual(gotAddr, wantAddr) {
t.Fatalf("ip not match, got: %v, want: %v", gotAddr, wantAddr)
}
gotTail, err := io.ReadAll(r)
if err != nil {
t.Fatalf("cannot read tail: %s", err)
}
if !bytes.Equal(gotTail, wantTail) {
t.Fatalf("unexpected tail after parsing proxy protocol\ngot:\n%q\nwant:\n%q", gotTail, wantTail)
}
}
// LOCAL addr
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x20, 0x11, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, nil,
nil)
// ipv4
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data srcid,dstip,srcport,dstport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0}, nil,
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 80})
// ipv4 with payload
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0,
// some payload
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0,
}, []byte{0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0},
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 80})
// ipv6
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x24,
// src and dst ipv6
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
// ports
0, 80, 0, 0}, nil,
&net.TCPAddr{IP: net.ParseIP("::1"), Port: 80})
}
func TestParseProxyProtocolFail(t *testing.T) {
f := func(body []byte) {
t.Helper()
r := bytes.NewBuffer(body)
gotAddr, err := readProxyProto(r)
if err == nil {
t.Fatalf("expected error at input %v", body)
}
if gotAddr != nil {
t.Fatalf("expected ip to be nil, got: %v", gotAddr)
}
}
// too short protocol prefix
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A})
// broken protocol prefix
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21})
// invalid header
f([]byte{0x0D, 0x1A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C})
// invalid version
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x31, 0x11, 0x00, 0x0C})
// too long block
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0xff, 0x0C})
// missing bytes in address
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x0C,
// ip data srcid,dstip,srcport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80})
// too short address length
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x11, 0x00, 0x08,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0})
// unsupported family
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x31, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// unsupported command
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x22, 0x11, 0x00, 0x0C,
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// mimatch ipv6 and ipv4
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x21, 0x00, 0x0C,
// ip data srcid,dstip,srcport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// ipv4 udp isn't supported
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x12, 0x00, 0x0C,
// ip data srcid,dstip,srcport,dstport
0x7F, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 80, 0, 0})
// ipv6 udp isn't supported
f([]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x21, 0x22, 0x00, 0x24,
// src and dst ipv6
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
// ports
0, 80, 0, 0})
}

View file

@ -16,8 +16,11 @@ var enableTCP6 = flag.Bool("enableTCP6", false, "Whether to enable IPv6 for list
// NewTCPListener returns new TCP listener for the given addr and optional tlsConfig.
//
// name is used for metrics registered in ms. Each listener in the program must have distinct name.
func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, error) {
// name is used for metrics. Each listener in the program must have a distinct name.
//
// If useProxyProtocol is set to true, then the returned listener accepts TCP connections via proxy protocol.
// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
func NewTCPListener(name, addr string, useProxyProtocol bool, tlsConfig *tls.Config) (*TCPListener, error) {
network := GetTCPNetwork()
ln, err := net.Listen(network, addr)
if err != nil {
@ -28,7 +31,8 @@ func NewTCPListener(name, addr string, tlsConfig *tls.Config) (*TCPListener, err
}
ms := metrics.GetDefaultSet()
tln := &TCPListener{
Listener: ln,
Listener: ln,
useProxyProtocol: useProxyProtocol,
accepts: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_accepts_total{name=%q, addr=%q}`, name, addr)),
acceptErrors: ms.NewCounter(fmt.Sprintf(`vm_tcplistener_errors_total{name=%q, addr=%q, type="accept"}`, name, addr)),
@ -69,6 +73,8 @@ type TCPListener struct {
accepts *metrics.Counter
acceptErrors *metrics.Counter
useProxyProtocol bool
connMetrics
}
@ -87,6 +93,12 @@ func (ln *TCPListener) Accept() (net.Conn, error) {
ln.acceptErrors.Inc()
return nil, err
}
if ln.useProxyProtocol {
conn, err = newProxyProtocolConn(conn)
if err != nil {
return nil, err
}
}
ln.conns.Inc()
sc := &statConn{
Conn: conn,

View file

@ -952,6 +952,7 @@ func (pt *partition) ForceMergeAllParts() error {
if newPartSize > maxOutBytes {
freeSpaceNeededBytes := newPartSize - maxOutBytes
forceMergeLogger.Warnf("cannot initiate force merge for the partition %s; additional space needed: %d bytes", pt.name, freeSpaceNeededBytes)
pt.releasePartsToMerge(pws)
return nil
}
@ -963,6 +964,7 @@ func (pt *partition) ForceMergeAllParts() error {
}
pws = pt.getAllPartsForMerge()
if len(pws) <= 1 {
pt.releasePartsToMerge(pws)
return nil
}
}