app/vmauth: add support for configuring backends via DNS SRV urls

This commit is contained in:
Aliaksandr Valialkin 2024-04-17 20:45:48 +02:00
parent 07ed958b82
commit b426d10847
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 119 additions and 32 deletions

View file

@ -7,7 +7,6 @@ import (
"flag" "flag"
"fmt" "fmt"
"math" "math"
"net"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -27,6 +26,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
) )
@ -302,7 +302,7 @@ func (up *URLPrefix) getBackendsCount() int {
// //
// backendURL.put() must be called on the returned backendURL after the request is complete. // backendURL.put() must be called on the returned backendURL after the request is complete.
func (up *URLPrefix) getBackendURL() *backendURL { func (up *URLPrefix) getBackendURL() *backendURL {
up.discoverBackendIPsIfNeeded() up.discoverBackendAddrsIfNeeded()
pbus := up.bus.Load() pbus := up.bus.Load()
bus := *pbus bus := *pbus
@ -312,7 +312,7 @@ func (up *URLPrefix) getBackendURL() *backendURL {
return getLeastLoadedBackendURL(bus, &up.n) return getLeastLoadedBackendURL(bus, &up.n)
} }
func (up *URLPrefix) discoverBackendIPsIfNeeded() { func (up *URLPrefix) discoverBackendAddrsIfNeeded() {
if !up.discoverBackendIPs { if !up.discoverBackendIPs {
// The discovery is disabled. // The discovery is disabled.
return return
@ -337,27 +337,42 @@ func (up *URLPrefix) discoverBackendIPsIfNeeded() {
// Discover ips for all the backendURLs // Discover ips for all the backendURLs
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(intervalSec)) ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(intervalSec))
hostToIPs := make(map[string][]string) hostToAddrs := make(map[string][]string)
for _, bu := range up.busOriginal { for _, bu := range up.busOriginal {
host := bu.Hostname() host := bu.Hostname()
if hostToIPs[host] != nil { if hostToAddrs[host] != nil {
// ips for the given host have been already discovered // ips for the given host have been already discovered
continue continue
} }
addrs, err := resolver.LookupIPAddr(ctx, host) var resolvedAddrs []string
var ips []string if strings.HasPrefix(host, "srv+") {
// The host has the format 'srv+realhost'. Strip 'srv+' prefix before performing the lookup.
host = strings.TrimPrefix(host, "srv+")
_, addrs, err := netutil.Resolver.LookupSRV(ctx, "", "", host)
if err != nil {
logger.Warnf("cannot discover backend SRV records for %s: %s; use it literally", bu, err)
resolvedAddrs = []string{host}
} else {
resolvedAddrs := make([]string, len(addrs))
for i, addr := range addrs {
resolvedAddrs[i] = fmt.Sprintf("%s:%d", addr.Target, addr.Port)
}
}
} else {
addrs, err := netutil.Resolver.LookupIPAddr(ctx, host)
if err != nil { if err != nil {
logger.Warnf("cannot discover backend IPs for %s: %s; use it literally", bu, err) logger.Warnf("cannot discover backend IPs for %s: %s; use it literally", bu, err)
ips = []string{host} resolvedAddrs = []string{host}
} else { } else {
ips = make([]string, len(addrs)) resolvedAddrs = make([]string, len(addrs))
for i, addr := range addrs { for i, addr := range addrs {
ips[i] = addr.String() resolvedAddrs[i] = addr.String()
} }
// sort ips, so they could be compared below in areEqualBackendURLs()
sort.Strings(ips)
} }
hostToIPs[host] = ips }
// sort resolvedAddrs, so they could be compared below in areEqualBackendURLs()
sort.Strings(resolvedAddrs)
hostToAddrs[host] = resolvedAddrs
} }
cancel() cancel()
@ -366,10 +381,14 @@ func (up *URLPrefix) discoverBackendIPsIfNeeded() {
for _, bu := range up.busOriginal { for _, bu := range up.busOriginal {
host := bu.Hostname() host := bu.Hostname()
port := bu.Port() port := bu.Port()
for _, ip := range hostToIPs[host] { for _, addr := range hostToAddrs[host] {
buCopy := *bu buCopy := *bu
buCopy.Host = ip buCopy.Host = addr
if port != "" { if port != "" {
if n := strings.IndexByte(buCopy.Host, ':'); n >= 0 {
// Drop the discovered port and substitute it the the port specified in bu.
buCopy.Host = buCopy.Host[:n]
}
buCopy.Host += ":" + port buCopy.Host += ":" + port
} }
busNew = append(busNew, &backendURL{ busNew = append(busNew, &backendURL{
@ -400,11 +419,6 @@ func areEqualBackendURLs(a, b []*backendURL) bool {
return true return true
} }
var resolver = &net.Resolver{
PreferGo: true,
StrictErrors: true,
}
// getFirstAvailableBackendURL returns the first available backendURL, which isn't broken. // getFirstAvailableBackendURL returns the first available backendURL, which isn't broken.
// //
// backendURL.put() must be called on the returned backendURL after the request is complete. // backendURL.put() must be called on the returned backendURL after the request is complete.

View file

@ -316,43 +316,54 @@ users:
f(` f(`
users: users:
- auth_token: foo - auth_token: foo
url_prefix: http://aaa:343/bbb url_prefix: https://aaa:343/bbb
max_concurrent_requests: 5 max_concurrent_requests: 5
tls_insecure_skip_verify: true tls_insecure_skip_verify: true
tls_server_name: "foo.bar"
tls_ca_file: "foo/bar"
tls_cert_file: "foo/baz"
tls_key_file: "foo/foo"
`, map[string]*UserInfo{ `, map[string]*UserInfo{
getHTTPAuthToken("foo"): { getHTTPAuthToken("foo"): {
AuthToken: "foo", AuthToken: "foo",
URLPrefix: mustParseURL("http://aaa:343/bbb"), URLPrefix: mustParseURL("https://aaa:343/bbb"),
MaxConcurrentRequests: 5, MaxConcurrentRequests: 5,
TLSInsecureSkipVerify: &insecureSkipVerifyTrue, TLSInsecureSkipVerify: &insecureSkipVerifyTrue,
TLSServerName: "foo.bar",
TLSCAFile: "foo/bar",
TLSCertFile: "foo/baz",
TLSKeyFile: "foo/foo",
}, },
}) })
// Multiple url_prefix entries // Multiple url_prefix entries
insecureSkipVerifyFalse := false insecureSkipVerifyFalse := false
discoverBackendIPsTrue := true
f(` f(`
users: users:
- username: foo - username: foo
password: bar password: bar
url_prefix: url_prefix:
- http://node1:343/bbb - http://node1:343/bbb
- http://node2:343/bbb - http://srv+node2:343/bbb
tls_insecure_skip_verify: false tls_insecure_skip_verify: false
retry_status_codes: [500, 501] retry_status_codes: [500, 501]
load_balancing_policy: first_available load_balancing_policy: first_available
drop_src_path_prefix_parts: 1 drop_src_path_prefix_parts: 1
discover_backend_ips: true
`, map[string]*UserInfo{ `, map[string]*UserInfo{
getHTTPAuthBasicToken("foo", "bar"): { getHTTPAuthBasicToken("foo", "bar"): {
Username: "foo", Username: "foo",
Password: "bar", Password: "bar",
URLPrefix: mustParseURLs([]string{ URLPrefix: mustParseURLs([]string{
"http://node1:343/bbb", "http://node1:343/bbb",
"http://node2:343/bbb", "http://srv+node2:343/bbb",
}), }),
TLSInsecureSkipVerify: &insecureSkipVerifyFalse, TLSInsecureSkipVerify: &insecureSkipVerifyFalse,
RetryStatusCodes: []int{500, 501}, RetryStatusCodes: []int{500, 501},
LoadBalancingPolicy: "first_available", LoadBalancingPolicy: "first_available",
DropSrcPathPrefixParts: intp(1), DropSrcPathPrefixParts: intp(1),
DiscoverBackendIPs: &discoverBackendIPsTrue,
}, },
}) })

View file

@ -438,6 +438,8 @@ func newRoundTripper(caFileOpt, certFileOpt, keyFileOpt, serverNameOpt string, i
if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost { if tr.MaxIdleConns != 0 && tr.MaxIdleConns < tr.MaxIdleConnsPerHost {
tr.MaxIdleConns = tr.MaxIdleConnsPerHost tr.MaxIdleConns = tr.MaxIdleConnsPerHost
} }
tr.DialContext = netutil.DialMaybeSRV
rt := cfg.NewRoundTripper(tr) rt := cfg.NewRoundTripper(tr)
return rt, nil return rt, nil
} }

View file

@ -125,7 +125,7 @@ See also [authorization](#authorization) and [routing](#routing) docs.
If [vmagent](https://docs.victoriametrics.com/vmagent.html) is used for processing [data push requests](https://docs.victoriametrics.com/vmagent.html#how-to-push-data-to-vmagent), If [vmagent](https://docs.victoriametrics.com/vmagent.html) is used for processing [data push requests](https://docs.victoriametrics.com/vmagent.html#how-to-push-data-to-vmagent),
then it is possible to scale the performance of data processing at `vmagent` by spreading load among multiple identically configured `vmagent` instances. then it is possible to scale the performance of data processing at `vmagent` by spreading load among multiple identically configured `vmagent` instances.
This can be done with the following [config](#auth-config) for `vmagent`: This can be done with the following [config](#auth-config) for `vmauth`:
```yaml ```yaml
unauthorized_user: unauthorized_user:
@ -596,19 +596,41 @@ There are the following solutions for this issue:
This scheme works great, but it needs manual updating of the [`-auth.config`](#auth-config) every time `vmselect` services are restarted, This scheme works great, but it needs manual updating of the [`-auth.config`](#auth-config) every time `vmselect` services are restarted,
downscaled or upscaled. downscaled or upscaled.
- To set `discover_backend_ips: true` option, so `vmagent` automatically discovers IPs behind the given hostname and then spreads load among the discovered IPs: - To set `discover_backend_ips: true` option, so `vmauth` automatically discovers IPs behind the given hostname and then spreads load among the discovered IPs:
```yaml ```yaml
unauthorized_user: unauthorized_user:
url_prefix: http://vmselect-service/select/0/prometheus/ url_prefix: http://vmselect-service:8481/select/0/prometheus/
discover_backend_ips: true discover_backend_ips: true
``` ```
The `discover_backend_ips` can be specified at `user` and `url_map` level in the [`-auth.config`](#auth-config). It can also be enabled globally If the `url_prefix` contains hostname with `srv+` prefix, then the hostname without `srv+` prefix is automatically resolved via [DNS SRV](https://en.wikipedia.org/wiki/SRV_record)
to the list of hostnames with TCP ports, and `vmauth` balances load among the discovered TCP addresses:
```yaml
unauthorized_user:
url_prefix: "http://srv+vmselect/select/0/prometheus"
discover_backend_ips: true
```
This functionality is useful for balancing load among backend instances, which run on different TCP ports, since DNS SRV records contain TCP ports.
The `discover_backend_ips` option can be specified at `user` and `url_map` level in the [`-auth.config`](#auth-config). It can also be enabled globally
via `-discoverBackendIPs` command-line flag. via `-discoverBackendIPs` command-line flag.
See also [load balancing docs](#load-balancing). See also [load balancing docs](#load-balancing).
## SRV urls
If `url_prefix` contains url with the hostname starting with `srv+` prefix, then `vmauth` uses [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) lookup
for the hostname without the `srv+` prefix and selects random TCP address (e.g. hostname plus TCP port) form the resolved results.
For example, if `some-addr` [DNS SRV](https://en.wikipedia.org/wiki/SRV_record) record contains `some-host:12345` TCP address,
then `url_prefix: http://srv+some-addr/some/path` is automatically resolved into `url_prefix: http://some-host:12345/some/path`.
The DNS SRV resolution is performed every time new connection to the `url_prefix` backend is established.
See also [discovering backend addressess](#discovering-backend-ips).
## Modifying HTTP headers ## Modifying HTTP headers
`vmauth` supports the ability to set and remove HTTP request headers before sending the requests to backends. `vmauth` supports the ability to set and remove HTTP request headers before sending the requests to backends.

View file

@ -1,7 +1,12 @@
package netutil package netutil
import ( import (
"context"
"fmt"
"math/rand"
"net"
"strings" "strings"
"time"
) )
// IsTrivialNetworkError returns true if the err can be ignored during logging. // IsTrivialNetworkError returns true if the err can be ignored during logging.
@ -13,3 +18,36 @@ func IsTrivialNetworkError(err error) bool {
} }
return false return false
} }
// DialMaybeSRV dials the given addr.
//
// The addr may be either the usual TCP address or srv+host form, where host is SRV addr.
// If the addr has srv+host form, then the host is resolved with SRV into randomly chosen TCP address for the connection.
func DialMaybeSRV(ctx context.Context, network, addr string) (net.Conn, error) {
if strings.HasPrefix(addr, "srv+") {
addr = strings.TrimPrefix(addr, "srv+")
_, addrs, err := Resolver.LookupSRV(ctx, "", "", addr)
if err != nil {
return nil, fmt.Errorf("cannot resolve SRV addr %s: %w", addr, err)
}
if len(addrs) == 0 {
return nil, fmt.Errorf("missing SRV records for %s", addr)
}
n := rand.Intn(len(addrs))
addr = fmt.Sprintf("%s:%d", addrs[n].Target, addrs[n].Port)
}
return Dialer.DialContext(ctx, network, addr)
}
// Resolver is default DNS resolver.
var Resolver = &net.Resolver{
PreferGo: true,
StrictErrors: true,
}
// Dialer is default network dialer.
var Dialer = &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: TCP6Enabled(),
}