diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 5148ec1b0..49e2071d6 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/awsapi" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" @@ -119,7 +120,7 @@ func newHTTPClient(argIdx int, remoteWriteURL, sanitizedURL string, fq *persiste logger.Fatalf("cannot initialize AWS Config for -remoteWrite.url=%q: %s", remoteWriteURL, err) } tr := &http.Transport{ - DialContext: statDial, + DialContext: httputils.GetStatDialFunc("vmagent_remotewrite"), TLSHandshakeTimeout: tlsHandshakeTimeout.GetOptionalArg(argIdx), MaxConnsPerHost: 2 * concurrency, MaxIdleConnsPerHost: 2 * concurrency, diff --git a/app/vmagent/remotewrite/statconn.go b/app/vmagent/remotewrite/statconn.go deleted file mode 100644 index 924835496..000000000 --- a/app/vmagent/remotewrite/statconn.go +++ /dev/null @@ -1,73 +0,0 @@ -package remotewrite - -import ( - "context" - "net" - "sync/atomic" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" - "github.com/VictoriaMetrics/metrics" -) - -func statDial(ctx context.Context, _, addr string) (conn net.Conn, err error) { - network := netutil.GetTCPNetwork() - conn, err = netutil.DialMaybeSRV(ctx, network, addr) - dialsTotal.Inc() - if err != nil { - dialErrors.Inc() - return nil, err - } - conns.Inc() - sc := &statConn{ - Conn: conn, - } - return sc, nil -} - -var ( - dialsTotal = metrics.NewCounter(`vmagent_remotewrite_dials_total`) - dialErrors = metrics.NewCounter(`vmagent_remotewrite_dial_errors_total`) - conns = metrics.NewCounter(`vmagent_remotewrite_conns`) -) - -type statConn struct { - closed atomic.Int32 - net.Conn -} - -func (sc *statConn) Read(p []byte) (int, error) { - n, err := sc.Conn.Read(p) - connReadsTotal.Inc() - if err != nil { - connReadErrors.Inc() - } - connBytesRead.Add(n) - return n, err -} - -func (sc *statConn) Write(p []byte) (int, error) { - n, err := sc.Conn.Write(p) - connWritesTotal.Inc() - if err != nil { - connWriteErrors.Inc() - } - connBytesWritten.Add(n) - return n, err -} - -func (sc *statConn) Close() error { - err := sc.Conn.Close() - if sc.closed.Add(1) == 1 { - conns.Dec() - } - return err -} - -var ( - connReadsTotal = metrics.NewCounter(`vmagent_remotewrite_conn_reads_total`) - connWritesTotal = metrics.NewCounter(`vmagent_remotewrite_conn_writes_total`) - connReadErrors = metrics.NewCounter(`vmagent_remotewrite_conn_read_errors_total`) - connWriteErrors = metrics.NewCounter(`vmagent_remotewrite_conn_write_errors_total`) - connBytesRead = metrics.NewCounter(`vmagent_remotewrite_conn_bytes_read_total`) - connBytesWritten = metrics.NewCounter(`vmagent_remotewrite_conn_bytes_written_total`) -) diff --git a/app/vmalert/datasource/init.go b/app/vmalert/datasource/init.go index 140aff936..f6dd4eba5 100644 --- a/app/vmalert/datasource/init.go +++ b/app/vmalert/datasource/init.go @@ -15,8 +15,9 @@ import ( ) var ( - addr = flag.String("datasource.url", "", "Datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect URL. Required parameter. "+ - "E.g. http://127.0.0.1:8428 . See also -remoteRead.disablePathAppend and -datasource.showURL") + addr = flag.String("datasource.url", "", "Datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect endpoint. Required parameter. "+ + "Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. "+ + "See also -remoteRead.disablePathAppend and -datasource.showURL") appendTypePrefix = flag.Bool("datasource.appendTypePrefix", false, "Whether to add type prefix to -datasource.url based on the query type. Set to true if sending different query types to the vmselect URL.") showDatasourceURL = flag.Bool("datasource.showURL", false, "Whether to avoid stripping sensitive information such as auth headers or passwords from URLs in log messages or UI and exported metrics. "+ "It is hidden by default, since it can contain sensitive info such as auth key") @@ -98,6 +99,7 @@ func Init(extraParams url.Values) (QuerierBuilder, error) { if err != nil { return nil, fmt.Errorf("failed to create transport: %w", err) } + tr.DialContext = httputils.GetStatDialFunc("vmalert_datasource") tr.DisableKeepAlives = *disableKeepAlive tr.MaxIdleConnsPerHost = *maxIdleConnections if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost { diff --git a/app/vmalert/remoteread/init.go b/app/vmalert/remoteread/init.go index fc25ce5b9..fe6d29703 100644 --- a/app/vmalert/remoteread/init.go +++ b/app/vmalert/remoteread/init.go @@ -15,7 +15,8 @@ var ( addr = flag.String("remoteRead.url", "", "Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect."+ "Remote read is used to restore alerts state."+ "This configuration makes sense only if `vmalert` was configured with `remoteWrite.url` before and has been successfully persisted its state. "+ - "E.g. http://127.0.0.1:8428. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.") + "Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. "+ + "See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.") showRemoteReadURL = flag.Bool("remoteRead.showURL", false, "Whether to show -remoteRead.url in the exported metrics. "+ "It is hidden by default, since it can contain sensitive info such as auth key") @@ -65,6 +66,7 @@ func Init() (datasource.QuerierBuilder, error) { if err != nil { return nil, fmt.Errorf("failed to create transport: %w", err) } + tr.DialContext = httputils.GetStatDialFunc("vmalert_remoteread") endpointParams, err := flagutil.ParseJSONMap(*oauth2EndpointParams) if err != nil { diff --git a/app/vmalert/remotewrite/init.go b/app/vmalert/remotewrite/init.go index 82161f297..899918df4 100644 --- a/app/vmalert/remotewrite/init.go +++ b/app/vmalert/remotewrite/init.go @@ -13,7 +13,9 @@ import ( var ( addr = flag.String("remoteWrite.url", "", "Optional URL to VictoriaMetrics or vminsert where to persist alerts state "+ - "and recording rules results in form of timeseries. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, "+ + "and recording rules results in form of timeseries. "+ + "Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. "+ + "For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, "+ "then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'.") showRemoteWriteURL = flag.Bool("remoteWrite.showURL", false, "Whether to show -remoteWrite.url in the exported metrics. "+ "It is hidden by default, since it can contain sensitive info such as auth key") @@ -69,6 +71,7 @@ func Init(ctx context.Context) (*Client, error) { if err != nil { return nil, fmt.Errorf("failed to create transport: %w", err) } + t.DialContext = httputils.GetStatDialFunc("vmalert_remotewrite") endpointParams, err := flagutil.ParseJSONMap(*oauth2EndpointParams) if err != nil { diff --git a/dashboards/vm/vmalert.json b/dashboards/vm/vmalert.json index aa38ad560..cc63b9e6a 100644 --- a/dashboards/vm/vmalert.json +++ b/dashboards/vm/vmalert.json @@ -81,7 +81,7 @@ } ] }, - "description": "Overview for VictoriaMetrics vmalert v1.96.0 or higher", + "description": "Overview for VictoriaMetrics vmalert v1.102.0 or higher", "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 1, @@ -3237,6 +3237,213 @@ ], "title": "Datapoints drop rate ($instance)", "type": "timeseries" + }, + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "description": "Shows current number of established connections to remote write endpoints.\n\n", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "links": [], + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 44 + }, + "id": 54, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "editorMode": "code", + "exemplar": true, + "expr": "sum(max_over_time(vmalert_remotewrite_conns{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job)", + "interval": "", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Connections ($instance)", + "type": "timeseries" + }, + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "description": "Shows the global rate for number of written bytes via remote write connections.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "links": [], + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 44 + }, + "id": 55, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "victoriametrics-datasource", + "uid": "$ds" + }, + "editorMode": "code", + "exemplar": true, + "expr": "sum(rate(vmalert_remotewrite_conn_bytes_written_total{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job) > 0", + "interval": "", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Bytes write rate ($instance)", + "type": "timeseries" } ], "title": "Remote write", diff --git a/dashboards/vmalert.json b/dashboards/vmalert.json index c7758e0c6..2e23aed0c 100644 --- a/dashboards/vmalert.json +++ b/dashboards/vmalert.json @@ -80,7 +80,7 @@ } ] }, - "description": "Overview for VictoriaMetrics vmalert v1.96.0 or higher", + "description": "Overview for VictoriaMetrics vmalert v1.102.0 or higher", "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 1, @@ -3236,6 +3236,213 @@ ], "title": "Datapoints drop rate ($instance)", "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "description": "Shows current number of established connections to remote write endpoints.\n\n", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "links": [], + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 44 + }, + "id": 54, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "editorMode": "code", + "exemplar": true, + "expr": "sum(max_over_time(vmalert_remotewrite_conns{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job)", + "interval": "", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Connections ($instance)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "description": "Shows the global rate for number of written bytes via remote write connections.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "links": [], + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 44 + }, + "id": 55, + "options": { + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "$ds" + }, + "editorMode": "code", + "exemplar": true, + "expr": "sum(rate(vmalert_remotewrite_conn_bytes_written_total{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by(job) > 0", + "interval": "", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Bytes write rate ($instance)", + "type": "timeseries" } ], "title": "Remote write", diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ed3910ca9..e7d1a40cd 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -47,6 +47,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): add [rate_sum](https://docs.victoriametrics.com/stream-aggregation/#rate_sum) and [rate_avg](https://docs.victoriametrics.com/stream-aggregation/#rate_avg) aggregation outputs. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): reduce CPU usage when evaluating high number of alerting and recording rules. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): speed up retrieving rules files from object storages by skipping unchanged objects during reloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6210). +* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): support reading [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) records in `-datasource.url`, `-remoteWrite.url` and `-remoteRead.url` command-line option. For example, `-remoteWrite.url=http://srv+victoria-metrics` automatically resolves the `victoria-metrics` DNS SRV to a list of hostnames with TCP ports and then sends data to one of the addresses. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6053). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix bug that prevents the first query trace from expanding on click event. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6186). The issue was introduced in [v1.100.0](https://docs.victoriametrics.com/changelog/#v11000) release. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix calendar display when `UTC+00:00` timezone is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6239). diff --git a/docs/vmalert.md b/docs/vmalert.md index e8c79b264..ad47aaf8d 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -1042,7 +1042,7 @@ The shortlist of configuration flags is the following: -datasource.tlsServerName string Optional TLS server name to use for connections to -datasource.url. By default, the server name from -datasource.url is used -datasource.url string - Datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect URL. Required parameter. E.g. http://127.0.0.1:8428 . See also -remoteRead.disablePathAppend and -datasource.showURL + Datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect URL. Required parameter. Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. See also -remoteRead.disablePathAppend and -datasource.showURL -defaultTenant.graphite string Default tenant for Graphite alerting groups. See https://docs.victoriametrics.com/vmalert/#multitenancy .This flag is available only in Enterprise binaries. See https://docs.victoriametrics.com/enterprise/ -defaultTenant.prometheus string @@ -1306,7 +1306,7 @@ The shortlist of configuration flags is the following: -remoteRead.tlsServerName string Optional TLS server name to use for connections to -remoteRead.url. By default, the server name from -remoteRead.url is used -remoteRead.url vmalert - Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect.Remote read is used to restore alerts state.This configuration makes sense only if vmalert was configured with `remoteWrite.url` before and has been successfully persisted its state. E.g. http://127.0.0.1:8428. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'. + Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect.Remote read is used to restore alerts state.This configuration makes sense only if vmalert was configured with `remoteWrite.url` before and has been successfully persisted its state. Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'. -remoteWrite.basicAuth.password string Optional basic auth password for -remoteWrite.url -remoteWrite.basicAuth.passwordFile string @@ -1360,7 +1360,7 @@ The shortlist of configuration flags is the following: -remoteWrite.tlsServerName string Optional TLS server name to use for connections to -remoteWrite.url. By default, the server name from -remoteWrite.url is used -remoteWrite.url string - Optional URL to VictoriaMetrics or vminsert where to persist alerts state and recording rules results in form of timeseries. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'. + Optional URL to VictoriaMetrics or vminsert where to persist alerts state and recording rules results in form of timeseries. Supports address in the form of IP address with a port (e.g., 127.0.0.1:8428) or DNS SRV record. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'. -replay.disableProgressBar Whether to disable rendering progress bars during the replay. Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode. -replay.maxDatapointsPerQuery /query_range diff --git a/lib/httputils/statconn.go b/lib/httputils/statconn.go new file mode 100644 index 000000000..a7494a87c --- /dev/null +++ b/lib/httputils/statconn.go @@ -0,0 +1,146 @@ +package httputils + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "sync" + "sync/atomic" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/metrics" +) + +var statConnMetricsRegistry sync.Map + +type statConnMetrics struct { + dialsTotal *metrics.Counter + dialErrors *metrics.Counter + conns *metrics.Counter + + connReadsTotal *metrics.Counter + connWritesTotal *metrics.Counter + connReadErrors *metrics.Counter + connWriteErrors *metrics.Counter + connBytesRead *metrics.Counter + connBytesWritten *metrics.Counter +} + +func newStatConnMetrics(metricPrefix string) statConnMetrics { + scm := statConnMetrics{} + + scm.dialsTotal = metrics.NewCounter(fmt.Sprintf(`%s_dials_total`, metricPrefix)) + scm.dialErrors = metrics.NewCounter(fmt.Sprintf(`%s_dial_errors_total`, metricPrefix)) + scm.conns = metrics.NewCounter(fmt.Sprintf(`%s_conns`, metricPrefix)) + + scm.connReadsTotal = metrics.NewCounter(fmt.Sprintf(`%s_conn_reads_total`, metricPrefix)) + scm.connWritesTotal = metrics.NewCounter(fmt.Sprintf(`%s_conn_writes_total`, metricPrefix)) + scm.connReadErrors = metrics.NewCounter(fmt.Sprintf(`%s_conn_read_errors_total`, metricPrefix)) + scm.connWriteErrors = metrics.NewCounter(fmt.Sprintf(`%s_conn_write_errors_total`, metricPrefix)) + scm.connBytesRead = metrics.NewCounter(fmt.Sprintf(`%s_conn_bytes_read_total`, metricPrefix)) + scm.connBytesWritten = metrics.NewCounter(fmt.Sprintf(`%s_conn_bytes_written_total`, metricPrefix)) + + return scm +} + +// GetStatDialFunc returns dial function that supports DNS SRV records, +// and register stats metrics for conns. +func GetStatDialFunc(metricPrefix string) func(ctx context.Context, network, addr string) (net.Conn, error) { + v, ok := statConnMetricsRegistry.Load(metricPrefix) + if !ok { + v = newStatConnMetrics(metricPrefix) + statConnMetricsRegistry.Store(metricPrefix, v) + } + sm := v.(statConnMetrics) + return func(ctx context.Context, _, addr string) (net.Conn, error) { + network := netutil.GetTCPNetwork() + conn, err := netutil.DialMaybeSRV(ctx, network, addr) + sm.dialsTotal.Inc() + if err != nil { + sm.dialErrors.Inc() + if !netutil.TCP6Enabled() && !isTCPv4Addr(addr) { + err = fmt.Errorf("%w; try -enableTCP6 command-line flag for dialing ipv6 addresses", err) + } + return nil, err + } + sm.conns.Inc() + sc := &statConn{ + Conn: conn, + statConnMetrics: sm, + } + return sc, nil + } +} + +type statConn struct { + closed atomic.Int32 + net.Conn + statConnMetrics +} + +func (sc *statConn) Read(p []byte) (int, error) { + n, err := sc.Conn.Read(p) + sc.connReadsTotal.Inc() + if err != nil { + sc.connReadErrors.Inc() + } + sc.connBytesRead.Add(n) + return n, err +} + +func (sc *statConn) Write(p []byte) (int, error) { + n, err := sc.Conn.Write(p) + sc.connWritesTotal.Inc() + if err != nil { + sc.connWriteErrors.Inc() + } + sc.connBytesWritten.Add(n) + return n, err +} + +func (sc *statConn) Close() error { + err := sc.Conn.Close() + if sc.closed.Add(1) == 1 { + sc.conns.Dec() + } + return err +} + +func isTCPv4Addr(addr string) bool { + s := addr + for i := 0; i < 3; i++ { + n := strings.IndexByte(s, '.') + if n < 0 { + return false + } + if !isUint8NumString(s[:n]) { + return false + } + s = s[n+1:] + } + n := strings.IndexByte(s, ':') + if n < 0 { + return false + } + if !isUint8NumString(s[:n]) { + return false + } + s = s[n+1:] + + // Verify TCP port + n, err := strconv.Atoi(s) + if err != nil { + return false + } + return n >= 0 && n < (1<<16) +} + +func isUint8NumString(s string) bool { + n, err := strconv.Atoi(s) + if err != nil { + return false + } + return n >= 0 && n < (1<<8) +} diff --git a/lib/promscrape/statconn_test.go b/lib/httputils/statconn_test.go similarity index 97% rename from lib/promscrape/statconn_test.go rename to lib/httputils/statconn_test.go index 37a2ecf3c..1978b09bf 100644 --- a/lib/promscrape/statconn_test.go +++ b/lib/httputils/statconn_test.go @@ -1,4 +1,4 @@ -package promscrape +package httputils import ( "testing" diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 31457911a..17094cae5 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" ) var ( @@ -71,7 +72,7 @@ func newClient(ctx context.Context, sw *ScrapeWork) (*client, error) { IdleConnTimeout: 2 * sw.ScrapeInterval, DisableCompression: *disableCompression || sw.DisableCompression, DisableKeepAlives: *disableKeepAlive || sw.DisableKeepAlive, - DialContext: statStdDial, + DialContext: httputils.GetStatDialFunc("vm_promscrape"), MaxIdleConnsPerHost: 100, MaxResponseHeaderBytes: int64(maxResponseHeadersSize.N), }), diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go deleted file mode 100644 index d637ffeca..000000000 --- a/lib/promscrape/statconn.go +++ /dev/null @@ -1,116 +0,0 @@ -package promscrape - -import ( - "context" - "fmt" - "net" - "strconv" - "strings" - "sync/atomic" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" - "github.com/VictoriaMetrics/metrics" -) - -func statStdDial(ctx context.Context, _, addr string) (net.Conn, error) { - network := netutil.GetTCPNetwork() - conn, err := netutil.DialMaybeSRV(ctx, network, addr) - dialsTotal.Inc() - if err != nil { - dialErrors.Inc() - if !netutil.TCP6Enabled() && !isTCPv4Addr(addr) { - err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err) - } - return nil, err - } - conns.Inc() - sc := &statConn{ - Conn: conn, - } - return sc, nil -} - -var ( - dialsTotal = metrics.NewCounter(`vm_promscrape_dials_total`) - dialErrors = metrics.NewCounter(`vm_promscrape_dial_errors_total`) - conns = metrics.NewCounter(`vm_promscrape_conns`) -) - -type statConn struct { - closed atomic.Int32 - net.Conn -} - -func (sc *statConn) Read(p []byte) (int, error) { - n, err := sc.Conn.Read(p) - connReadsTotal.Inc() - if err != nil { - connReadErrors.Inc() - } - connBytesRead.Add(n) - return n, err -} - -func (sc *statConn) Write(p []byte) (int, error) { - n, err := sc.Conn.Write(p) - connWritesTotal.Inc() - if err != nil { - connWriteErrors.Inc() - } - connBytesWritten.Add(n) - return n, err -} - -func (sc *statConn) Close() error { - err := sc.Conn.Close() - if sc.closed.Add(1) == 1 { - conns.Dec() - } - return err -} - -var ( - connReadsTotal = metrics.NewCounter(`vm_promscrape_conn_reads_total`) - connWritesTotal = metrics.NewCounter(`vm_promscrape_conn_writes_total`) - connReadErrors = metrics.NewCounter(`vm_promscrape_conn_read_errors_total`) - connWriteErrors = metrics.NewCounter(`vm_promscrape_conn_write_errors_total`) - connBytesRead = metrics.NewCounter(`vm_promscrape_conn_bytes_read_total`) - connBytesWritten = metrics.NewCounter(`vm_promscrape_conn_bytes_written_total`) -) - -func isTCPv4Addr(addr string) bool { - s := addr - for i := 0; i < 3; i++ { - n := strings.IndexByte(s, '.') - if n < 0 { - return false - } - if !isUint8NumString(s[:n]) { - return false - } - s = s[n+1:] - } - n := strings.IndexByte(s, ':') - if n < 0 { - return false - } - if !isUint8NumString(s[:n]) { - return false - } - s = s[n+1:] - - // Verify TCP port - n, err := strconv.Atoi(s) - if err != nil { - return false - } - return n >= 0 && n < (1<<16) -} - -func isUint8NumString(s string) bool { - n, err := strconv.Atoi(s) - if err != nil { - return false - } - return n >= 0 && n < (1<<8) -}