app/vmalert: support DNS SRV record in -remoteWrite.url (#6299)

part of https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053,
supports [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) address in
`-remoteWrite.url` command-line option.
This commit is contained in:
Hui Wang 2024-05-22 16:52:51 +08:00 committed by GitHub
parent 974b7783ee
commit d7b5062917
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 582 additions and 201 deletions

View file

@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
@ -119,7 +120,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste
logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err)
}
tr := &http.Transport{
DialContext: statDial,
DialContext: httputils.GetStatDialFunc("vmagent_remotewrite"),
TLSHandshakeTimeout: tlsHandshakeTimeout.GetOptionalArg(argIdx),
MaxConnsPerHost: 2 * concurrency,
MaxIdleConnsPerHost: 2 * concurrency,

View file

@ -1,73 +0,0 @@
package remotewrite
import (
"context"
"net"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
)
func statDial(ctx context.Context, _, addr string) (conn net.Conn, err error) {
network := netutil.GetTCPNetwork()
conn, err = netutil.DialMaybeSRV(ctx, network, addr)
dialsTotal.Inc()
if err != nil {
dialErrors.Inc()
return nil, err
}
conns.Inc()
sc := &statConn{
Conn: conn,
}
return sc, nil
}
var (
dialsTotal = metrics.NewCounter(`vmagent_remotewrite_dials_total`)
dialErrors = metrics.NewCounter(`vmagent_remotewrite_dial_errors_total`)
conns = metrics.NewCounter(`vmagent_remotewrite_conns`)
)
type statConn struct {
closed atomic.Int32
net.Conn
}
func (sc *statConn) Read(p []byte) (int, error) {
n, err := sc.Conn.Read(p)
connReadsTotal.Inc()
if err != nil {
connReadErrors.Inc()
}
connBytesRead.Add(n)
return n, err
}
func (sc *statConn) Write(p []byte) (int, error) {
n, err := sc.Conn.Write(p)
connWritesTotal.Inc()
if err != nil {
connWriteErrors.Inc()
}
connBytesWritten.Add(n)
return n, err
}
func (sc *statConn) Close() error {
err := sc.Conn.Close()
if sc.closed.Add(1) == 1 {
conns.Dec()
}
return err
}
var (
connReadsTotal = metrics.NewCounter(`vmagent_remotewrite_conn_reads_total`)
connWritesTotal = metrics.NewCounter(`vmagent_remotewrite_conn_writes_total`)
connReadErrors = metrics.NewCounter(`vmagent_remotewrite_conn_read_errors_total`)
connWriteErrors = metrics.NewCounter(`vmagent_remotewrite_conn_write_errors_total`)
connBytesRead = metrics.NewCounter(`vmagent_remotewrite_conn_bytes_read_total`)
connBytesWritten = metrics.NewCounter(`vmagent_remotewrite_conn_bytes_written_total`)
)

View file

@ -15,8 +15,9 @@ import (
)
var (
addr = flag.String("datasource.url", "", "Datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect URL. Required parameter. "+
"E.g. http://127.0.0.1:8428 . See also -remoteRead.disablePathAppend and -datasource.showURL")
addr = flag.String("datasource.url", "", "Datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect endpoint. Required parameter. "+
"Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. "+
"See also -remoteRead.disablePathAppend and -datasource.showURL")
appendTypePrefix = flag.Bool("datasource.appendTypePrefix", false, "Whether to add type prefix to -datasource.url based on the query type. Set to true if sending different query types to the vmselect URL.")
showDatasourceURL = flag.Bool("datasource.showURL", false, "Whether to avoid stripping sensitive information such as auth headers or passwords from URLs in log messages or UI and exported metrics. "+
"It is hidden by default, since it can contain sensitive info such as auth key")
@ -98,6 +99,7 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
}
tr.DialContext = httputils.GetStatDialFunc("vmalert_datasource")
tr.DisableKeepAlives = *disableKeepAlive
tr.MaxIdleConnsPerHost = *maxIdleConnections
if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost {

View file

@ -15,7 +15,8 @@ var (
addr = flag.String("remoteRead.url", "", "Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect."+
"Remote read is used to restore alerts state."+
"This configuration makes sense only if `vmalert` was configured with `remoteWrite.url` before and has been successfully persisted its state. "+
"E.g. http://127.0.0.1:8428. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.")
"Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. "+
"See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.")
showRemoteReadURL = flag.Bool("remoteRead.showURL", false, "Whether to show -remoteRead.url in the exported metrics. "+
"It is hidden by default, since it can contain sensitive info such as auth key")
@ -65,6 +66,7 @@ func Init() (datasource.QuerierBuilder, error) {
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
}
tr.DialContext = httputils.GetStatDialFunc("vmalert_remoteread")
endpointParams, err := flagutil.ParseJSONMap(*oauth2EndpointParams)
if err != nil {

View file

@ -13,7 +13,9 @@ import (
var (
addr = flag.String("remoteWrite.url", "", "Optional URL to VictoriaMetrics or vminsert where to persist alerts state "+
"and recording rules results in form of timeseries. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, "+
"and recording rules results in form of timeseries. "+
"Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. "+
"For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, "+
"then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'.")
showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+
"It is hidden by default, since it can contain sensitive info such as auth key")
@ -69,6 +71,7 @@ func Init(ctx context.Context) (*Client, error) {
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
}
t.DialContext = httputils.GetStatDialFunc("vmalert_remotewrite")
endpointParams, err := flagutil.ParseJSONMap(*oauth2EndpointParams)
if err != nil {

View file

@ -81,7 +81,7 @@
}
]
},
"description": "Overview for VictoriaMetrics vmalert v1.96.0 or higher",
"description": "Overview for VictoriaMetrics vmalert v1.102.0 or higher",
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 1,
@ -3237,6 +3237,213 @@
],
"title": "Datapoints drop rate ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"description": "Shows current number of established connections to remote write endpoints.\n\n",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 44
},
"id": 54,
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"targets": [
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "sum(max_over_time(vmalert_remotewrite_conns{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job)",
"interval": "",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Connections ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"description": "Shows the global rate for number of written bytes via remote write connections.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "decbytes"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 44
},
"id": 55,
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"targets": [
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "sum(rate(vmalert_remotewrite_conn_bytes_written_total{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job) > 0",
"interval": "",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Bytes write rate ($instance)",
"type": "timeseries"
}
],
"title": "Remote write",

View file

@ -80,7 +80,7 @@
}
]
},
"description": "Overview for VictoriaMetrics vmalert v1.96.0 or higher",
"description": "Overview for VictoriaMetrics vmalert v1.102.0 or higher",
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 1,
@ -3236,6 +3236,213 @@
],
"title": "Datapoints drop rate ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"description": "Shows current number of established connections to remote write endpoints.\n\n",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
},
{
"color": "red",
"value": 80
}
]
},
"unit": "short"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 44
},
"id": 54,
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "sum(max_over_time(vmalert_remotewrite_conns{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job)",
"interval": "",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Connections ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"description": "Shows the global rate for number of written bytes via remote write connections.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "never",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"min": 0,
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "decbytes"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 44
},
"id": 55,
"options": {
"legend": {
"calcs": [
"mean",
"lastNotNull",
"max"
],
"displayMode": "table",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "multi",
"sort": "desc"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"editorMode": "code",
"exemplar": true,
"expr": "sum(rate(vmalert_remotewrite_conn_bytes_written_total{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job) > 0",
"interval": "",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Bytes write rate ($instance)",
"type": "timeseries"
}
],
"title": "Remote write",

View file

@ -47,6 +47,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): speed up retrieving rules files from object storages by skipping unchanged objects during reloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6210).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): support reading [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) records in `-datasource.url`, `-remoteWrite.url` and `-remoteRead.url` command-line option. For example, `-remoteWrite.url=http://srv+victoria-metrics` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends data to one of the addresses. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix calendar display when `UTC+00:00` timezone is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6239).

View file

@ -1042,7 +1042,7 @@ The shortlist of configuration flags is the following:
-datasource.tlsServerName string
Optional TLS server name to use for connections to -datasource.url. By default, the server name from -datasource.url is used
-datasource.url string
Datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect URL. Required parameter. E.g. http://127.0.0.1:8428 . See also -remoteRead.disablePathAppend and -datasource.showURL
Datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect URL. Required parameter. Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. See also -remoteRead.disablePathAppend and -datasource.showURL
-defaultTenant.graphite string
Default tenant for Graphite alerting groups. See https://docs.victoriametrics.com/vmalert/#multitenancy .This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise/
-defaultTenant.prometheus string
@ -1306,7 +1306,7 @@ The shortlist of configuration flags is the following:
-remoteRead.tlsServerName string
Optional TLS server name to use for connections to -remoteRead.url. By default, the server name from -remoteRead.url is used
-remoteRead.url vmalert
Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect.Remote read is used to restore alerts state.This configuration makes sense only if vmalert was configured with `remoteWrite.url` before and has been successfully persisted its state. E.g. http://127.0.0.1:8428. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.
Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect.Remote read is used to restore alerts state.This configuration makes sense only if vmalert was configured with `remoteWrite.url` before and has been successfully persisted its state. Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.
-remoteWrite.basicAuth.password string
Optional basic auth password for -remoteWrite.url
-remoteWrite.basicAuth.passwordFile string
@ -1360,7 +1360,7 @@ The shortlist of configuration flags is the following:
-remoteWrite.tlsServerName string
Optional TLS server name to use for connections to -remoteWrite.url. By default, the server name from -remoteWrite.url is used
-remoteWrite.url string
Optional URL to VictoriaMetrics or vminsert where to persist alerts state and recording rules results in form of timeseries. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'.
Optional URL to VictoriaMetrics or vminsert where to persist alerts state and recording rules results in form of timeseries. Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'.
-replay.disableProgressBar
Whether to disable rendering progress bars during the replay. Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.
-replay.maxDatapointsPerQuery /query_range

146
lib/httputils/statconn.go Normal file
View file

@ -0,0 +1,146 @@
package httputils
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
)
var statConnMetricsRegistry sync.Map
type statConnMetrics struct {
dialsTotal *metrics.Counter
dialErrors *metrics.Counter
conns *metrics.Counter
connReadsTotal *metrics.Counter
connWritesTotal *metrics.Counter
connReadErrors *metrics.Counter
connWriteErrors *metrics.Counter
connBytesRead *metrics.Counter
connBytesWritten *metrics.Counter
}
func newStatConnMetrics(metricPrefix string) statConnMetrics {
scm := statConnMetrics{}
scm.dialsTotal = metrics.NewCounter(fmt.Sprintf(`%s_dials_total`, metricPrefix))
scm.dialErrors = metrics.NewCounter(fmt.Sprintf(`%s_dial_errors_total`, metricPrefix))
scm.conns = metrics.NewCounter(fmt.Sprintf(`%s_conns`, metricPrefix))
scm.connReadsTotal = metrics.NewCounter(fmt.Sprintf(`%s_conn_reads_total`, metricPrefix))
scm.connWritesTotal = metrics.NewCounter(fmt.Sprintf(`%s_conn_writes_total`, metricPrefix))
scm.connReadErrors = metrics.NewCounter(fmt.Sprintf(`%s_conn_read_errors_total`, metricPrefix))
scm.connWriteErrors = metrics.NewCounter(fmt.Sprintf(`%s_conn_write_errors_total`, metricPrefix))
scm.connBytesRead = metrics.NewCounter(fmt.Sprintf(`%s_conn_bytes_read_total`, metricPrefix))
scm.connBytesWritten = metrics.NewCounter(fmt.Sprintf(`%s_conn_bytes_written_total`, metricPrefix))
return scm
}
// GetStatDialFunc returns dial function that supports DNS SRV records,
// and register stats metrics for conns.
func GetStatDialFunc(metricPrefix string) func(ctx context.Context, network, addr string) (net.Conn, error) {
v, ok := statConnMetricsRegistry.Load(metricPrefix)
if !ok {
v = newStatConnMetrics(metricPrefix)
statConnMetricsRegistry.Store(metricPrefix, v)
}
sm := v.(statConnMetrics)
return func(ctx context.Context, _, addr string) (net.Conn, error) {
network := netutil.GetTCPNetwork()
conn, err := netutil.DialMaybeSRV(ctx, network, addr)
sm.dialsTotal.Inc()
if err != nil {
sm.dialErrors.Inc()
if !netutil.TCP6Enabled() && !isTCPv4Addr(addr) {
err = fmt.Errorf("%w; try -enableTCP6 command-line flag for dialing ipv6 addresses", err)
}
return nil, err
}
sm.conns.Inc()
sc := &statConn{
Conn: conn,
statConnMetrics: sm,
}
return sc, nil
}
}
type statConn struct {
closed atomic.Int32
net.Conn
statConnMetrics
}
func (sc *statConn) Read(p []byte) (int, error) {
n, err := sc.Conn.Read(p)
sc.connReadsTotal.Inc()
if err != nil {
sc.connReadErrors.Inc()
}
sc.connBytesRead.Add(n)
return n, err
}
func (sc *statConn) Write(p []byte) (int, error) {
n, err := sc.Conn.Write(p)
sc.connWritesTotal.Inc()
if err != nil {
sc.connWriteErrors.Inc()
}
sc.connBytesWritten.Add(n)
return n, err
}
func (sc *statConn) Close() error {
err := sc.Conn.Close()
if sc.closed.Add(1) == 1 {
sc.conns.Dec()
}
return err
}
func isTCPv4Addr(addr string) bool {
s := addr
for i := 0; i < 3; i++ {
n := strings.IndexByte(s, '.')
if n < 0 {
return false
}
if !isUint8NumString(s[:n]) {
return false
}
s = s[n+1:]
}
n := strings.IndexByte(s, ':')
if n < 0 {
return false
}
if !isUint8NumString(s[:n]) {
return false
}
s = s[n+1:]
// Verify TCP port
n, err := strconv.Atoi(s)
if err != nil {
return false
}
return n >= 0 && n < (1<<16)
}
func isUint8NumString(s string) bool {
n, err := strconv.Atoi(s)
if err != nil {
return false
}
return n >= 0 && n < (1<<8)
}

View file

@ -1,4 +1,4 @@
package promscrape
package httputils
import (
"testing"

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
)
var (
@ -71,7 +72,7 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) {
IdleConnTimeout: 2 * sw.ScrapeInterval,
DisableCompression: *disableCompression || sw.DisableCompression,
DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive,
DialContext: statStdDial,
DialContext: httputils.GetStatDialFunc("vm_promscrape"),
MaxIdleConnsPerHost: 100,
MaxResponseHeaderBytes: int64(maxResponseHeadersSize.N),
}),

View file

@ -1,116 +0,0 @@
package promscrape
import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/metrics"
)
func statStdDial(ctx context.Context, _, addr string) (net.Conn, error) {
network := netutil.GetTCPNetwork()
conn, err := netutil.DialMaybeSRV(ctx, network, addr)
dialsTotal.Inc()
if err != nil {
dialErrors.Inc()
if !netutil.TCP6Enabled() && !isTCPv4Addr(addr) {
err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err)
}
return nil, err
}
conns.Inc()
sc := &statConn{
Conn: conn,
}
return sc, nil
}
var (
dialsTotal = metrics.NewCounter(`vm_promscrape_dials_total`)
dialErrors = metrics.NewCounter(`vm_promscrape_dial_errors_total`)
conns = metrics.NewCounter(`vm_promscrape_conns`)
)
type statConn struct {
closed atomic.Int32
net.Conn
}
func (sc *statConn) Read(p []byte) (int, error) {
n, err := sc.Conn.Read(p)
connReadsTotal.Inc()
if err != nil {
connReadErrors.Inc()
}
connBytesRead.Add(n)
return n, err
}
func (sc *statConn) Write(p []byte) (int, error) {
n, err := sc.Conn.Write(p)
connWritesTotal.Inc()
if err != nil {
connWriteErrors.Inc()
}
connBytesWritten.Add(n)
return n, err
}
func (sc *statConn) Close() error {
err := sc.Conn.Close()
if sc.closed.Add(1) == 1 {
conns.Dec()
}
return err
}
var (
connReadsTotal = metrics.NewCounter(`vm_promscrape_conn_reads_total`)
connWritesTotal = metrics.NewCounter(`vm_promscrape_conn_writes_total`)
connReadErrors = metrics.NewCounter(`vm_promscrape_conn_read_errors_total`)
connWriteErrors = metrics.NewCounter(`vm_promscrape_conn_write_errors_total`)
connBytesRead = metrics.NewCounter(`vm_promscrape_conn_bytes_read_total`)
connBytesWritten = metrics.NewCounter(`vm_promscrape_conn_bytes_written_total`)
)
func isTCPv4Addr(addr string) bool {
s := addr
for i := 0; i < 3; i++ {
n := strings.IndexByte(s, '.')
if n < 0 {
return false
}
if !isUint8NumString(s[:n]) {
return false
}
s = s[n+1:]
}
n := strings.IndexByte(s, ':')
if n < 0 {
return false
}
if !isUint8NumString(s[:n]) {
return false
}
s = s[n+1:]
// Verify TCP port
n, err := strconv.Atoi(s)
if err != nil {
return false
}
return n >= 0 && n < (1<<16)
}
func isUint8NumString(s string) bool {
n, err := strconv.Atoi(s)
if err != nil {
return false
}
return n >= 0 && n < (1<<8)
}