Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2020-10-05 18:14:51 +03:00
commit 40d2f6fee4
20 changed files with 1456 additions and 22 deletions

View file

@ -294,6 +294,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
* [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config)
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)
* [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config)
In the future other `*_sd_config` types will be supported.

View file

@ -148,6 +148,9 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
See [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) for details.
* `dns_sd_configs` - for scraping targets discovered from DNS records (SRV, A and AAAA).
See [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config) for details.
* `openstack_sd_configs` - for scraping OpenStack targets.
See [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config) for details.
[OpenStack identity API v3](https://docs.openstack.org/api-ref/identity/v3/) is supported only.
File feature requests at [our issue tracker](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need other service discovery mechanisms to be supported by `vmagent`.

View file

@ -294,6 +294,7 @@ Currently the following [scrape_config](https://prometheus.io/docs/prometheus/la
* [gce_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#gce_sd_config)
* [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config)
* [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config)
* [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config)
In the future other `*_sd_config` types will be supported.

View file

@ -148,6 +148,9 @@ The following scrape types in [scrape_config](https://prometheus.io/docs/prometh
See [consul_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config) for details.
* `dns_sd_configs` - for scraping targets discovered from DNS records (SRV, A and AAAA).
See [dns_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#dns_sd_config) for details.
* `openstack_sd_configs` - for scraping OpenStack targets.
See [openstack_sd_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config) for details.
[OpenStack identity API v3](https://docs.openstack.org/api-ref/identity/v3/) is supported only.
File feature requests at [our issue tracker](https://github.com/VictoriaMetrics/VictoriaMetrics/issues) if you need other service discovery mechanisms to be supported by `vmagent`.

View file

@ -36,6 +36,9 @@ func CalibrateScale(a []int64, ae int16, b []int64, be int16) (e int16) {
}
upExp -= downExp
for i, v := range a {
if v == vInfPos || v == vInfNeg {
continue
}
adjExp := upExp
for adjExp > 0 {
v *= 10
@ -45,6 +48,9 @@ func CalibrateScale(a []int64, ae int16, b []int64, be int16) (e int16) {
}
if downExp > 0 {
for i, v := range b {
if v == vInfPos || v == vInfNeg {
continue
}
adjExp := downExp
for adjExp > 0 {
v /= 10
@ -188,9 +194,6 @@ func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {
downExp := int16(0)
_ = ea[len(va)-1]
for i, v := range va {
if v == vInfPos || v == vInfNeg {
continue
}
exp := ea[i]
upExp := exp - minExp
maxUpExp := maxUpExponent(v)
@ -242,7 +245,7 @@ var vaeBufPool sync.Pool
const int64Max = int64(1<<63 - 1)
func maxUpExponent(v int64) int16 {
if v == 0 {
if v == 0 || v == vInfPos || v == vInfNeg {
// Any exponent allowed.
return 1024
}

View file

@ -131,14 +131,22 @@ func TestCalibrateScale(t *testing.T) {
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{500, 100}, 0, 0, []int64{vInfPos, 1200}, []int64{500, 100}, 0)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{500, 100}, 0, 2, []int64{vInfPos, 1200}, []int64{500e2, 100e2}, 0)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{500, 100}, 0, -2, []int64{vInfPos, 1200}, []int64{5, 1}, 0)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{3500, 100}, 0, -3, []int64{vInfPos, 1200}, []int64{3, 0}, 0)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{35, 1}, 0, 40, []int64{0, 0}, []int64{35e17, 1e17}, 23)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{35, 1}, 40, 0, []int64{vInfPos, 1200}, []int64{0, 0}, 40)
testCalibrateScale(t, []int64{vInfNeg, 1200}, []int64{35, 1}, 35, -5, []int64{vInfNeg, 1200}, []int64{0, 0}, 35)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{500, 100}, 0, -2, []int64{vInfPos, 12e4}, []int64{500, 100}, -2)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{3500, 100}, 0, -3, []int64{vInfPos, 12e5}, []int64{3500, 100}, -3)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{35, 1}, 0, 40, []int64{vInfPos, 0}, []int64{35e17, 1e17}, 23)
testCalibrateScale(t, []int64{vInfPos, 1200}, []int64{35, 1}, 40, 0, []int64{vInfPos, 12e17}, []int64{0, 0}, 25)
testCalibrateScale(t, []int64{vInfNeg, 1200}, []int64{35, 1}, 35, -5, []int64{vInfNeg, 12e17}, []int64{0, 0}, 20)
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 0, 3, []int64{vMax, vMin, 123}, []int64{100e3}, 0)
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 3, 0, []int64{vMax, vMin, 123}, []int64{0}, 3)
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 0, 30, []int64{92233, -92233, 0}, []int64{100e16}, 14)
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/805
testCalibrateScale(t, []int64{123}, []int64{vInfPos}, 0, 0, []int64{123}, []int64{vInfPos}, 0)
testCalibrateScale(t, []int64{123, vInfPos}, []int64{vInfNeg}, 0, 0, []int64{123, vInfPos}, []int64{vInfNeg}, 0)
testCalibrateScale(t, []int64{123, vInfPos, vInfNeg}, []int64{456}, 0, 0, []int64{123, vInfPos, vInfNeg}, []int64{456}, 0)
testCalibrateScale(t, []int64{123, vInfPos, vInfNeg, 456}, []int64{}, 0, 0, []int64{123, vInfPos, vInfNeg, 456}, []int64{}, 0)
testCalibrateScale(t, []int64{123, vInfPos}, []int64{vInfNeg, 456}, 0, 0, []int64{123, vInfPos}, []int64{vInfNeg, 456}, 0)
testCalibrateScale(t, []int64{123, vInfPos}, []int64{vInfNeg, 456}, 0, 10, []int64{123, vInfPos}, []int64{vInfNeg, 456e10}, 0)
}
func testCalibrateScale(t *testing.T, a, b []int64, ae, be int16, aExpected, bExpected []int64, eExpected int16) {
@ -175,7 +183,7 @@ func testCalibrateScale(t *testing.T, a, b []int64, ae, be int16, aExpected, bEx
bCopy = append([]int64{}, b...)
e = CalibrateScale(bCopy, be, aCopy, ae)
if e != eExpected {
t.Fatalf("revers: unexpected e for a=%d, b=%d, ae=%d, be=%d; got %d; expecting %d", a, b, ae, be, e, eExpected)
t.Fatalf("reverse: unexpected e for a=%d, b=%d, ae=%d, be=%d; got %d; expecting %d", a, b, ae, be, e, eExpected)
}
if !reflect.DeepEqual(aCopy, aExpected) {
t.Fatalf("reverse: unexpected a for b=%d, ae=%d, be=%d; got\n%d; expecting\n%d", b, ae, be, aCopy, aExpected)
@ -188,23 +196,17 @@ func testCalibrateScale(t *testing.T, a, b []int64, ae, be int16, aExpected, bEx
func TestMaxUpExponent(t *testing.T) {
f := func(v int64, eExpected int16) {
t.Helper()
e := maxUpExponent(v)
if e != eExpected {
t.Fatalf("unexpected e for v=%d; got %d; expecting %d", v, e, eExpected)
}
e = maxUpExponent(-v)
if e != eExpected {
t.Fatalf("unexpected e for v=%d; got %d; expecting %d", -v, e, eExpected)
}
}
f(vInfPos, 0)
f(vInfNeg, 0)
f(vInfPos, 1024)
f(vInfNeg, 1024)
f(vMin, 0)
f(vMax, 0)
f(0, 1024)
f(-1<<63, 0)
f((-1<<63)+1, 0)
f((1<<63)-1, 0)
f(1, 18)
f(12, 17)
f(123, 16)
@ -242,6 +244,44 @@ func TestMaxUpExponent(t *testing.T) {
f(923, 15)
f(92, 17)
f(9, 18)
f(-1, 18)
f(-12, 17)
f(-123, 16)
f(-1234, 15)
f(-12345, 14)
f(-123456, 13)
f(-1234567, 12)
f(-12345678, 11)
f(-123456789, 10)
f(-1234567890, 9)
f(-12345678901, 8)
f(-123456789012, 7)
f(-1234567890123, 6)
f(-12345678901234, 5)
f(-123456789012345, 4)
f(-1234567890123456, 3)
f(-12345678901234567, 2)
f(-123456789012345678, 1)
f(-1234567890123456789, 0)
f(-923456789012345678, 0)
f(-92345678901234567, 1)
f(-9234567890123456, 2)
f(-923456789012345, 3)
f(-92345678901234, 4)
f(-9234567890123, 5)
f(-923456789012, 6)
f(-92345678901, 7)
f(-9234567890, 8)
f(-923456789, 9)
f(-92345678, 10)
f(-9234567, 11)
f(-923456, 12)
f(-92345, 13)
f(-9234, 14)
f(-923, 15)
f(-92, 17)
f(-9, 18)
}
func TestAppendFloatToDecimal(t *testing.T) {

View file

@ -206,8 +206,9 @@ func applyRelabelConfig(labels []prompbmarshal.Label, labelsOffset int, prc *Par
}
value := relabelBufPool.Get()
value.B = prc.Regex.ExpandString(value.B[:0], prc.Replacement, label.Name, match)
label.Name = string(value.B)
labelName := string(value.B)
relabelBufPool.Put(value)
labels = setLabelValue(labels, labelsOffset, labelName, label.Value)
}
return labels
case "labelmap_all":

View file

@ -575,6 +575,10 @@ func TestApplyRelabelConfigs(t *testing.T) {
Name: "foo",
Value: "yyy",
},
{
Name: "foobar",
Value: "aaa",
},
})
})
t.Run("labelmap_all", func(t *testing.T) {

View file

@ -21,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
"gopkg.in/yaml.v2"
)
@ -69,6 +70,7 @@ type ScrapeConfig struct {
StaticConfigs []StaticConfig `yaml:"static_configs"`
FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"`
KubernetesSDConfigs []kubernetes.SDConfig `yaml:"kubernetes_sd_configs"`
OpenStackSDConfigs []openstack.SDConfig `yaml:"openstack_sd_configs"`
ConsulSDConfigs []consul.SDConfig `yaml:"consul_sd_configs"`
DNSSDConfigs []dns.SDConfig `yaml:"dns_sd_configs"`
EC2SDConfigs []ec2.SDConfig `yaml:"ec2_sd_configs"`
@ -201,6 +203,34 @@ func (cfg *Config) getKubernetesSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
return dst
}
// getOpenStackSDScrapeWork returns `openstack_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getOpenStackSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
var dst []ScrapeWork
for i := range cfg.ScrapeConfigs {
sc := &cfg.ScrapeConfigs[i]
dstLen := len(dst)
ok := true
for j := range sc.OpenStackSDConfigs {
sdc := &sc.OpenStackSDConfigs[j]
var okLocal bool
dst, okLocal = appendOpenstackScrapeWork(dst, sdc, cfg.baseDir, sc.swc)
if ok {
ok = okLocal
}
}
if ok {
continue
}
swsPrev := swsPrevByJob[sc.swc.jobName]
if len(swsPrev) > 0 {
logger.Errorf("there were errors when discovering openstack targets for job %q, so preserving the previous targets", sc.swc.jobName)
dst = append(dst[:dstLen], swsPrev...)
}
}
return dst
}
// getConsulSDScrapeWork returns `consul_sd_configs` ScrapeWork from cfg.
func (cfg *Config) getConsulSDScrapeWork(prev []ScrapeWork) []ScrapeWork {
swsPrevByJob := getSWSByJob(prev)
@ -444,6 +474,15 @@ func appendKubernetesScrapeWork(dst []ScrapeWork, sdc *kubernetes.SDConfig, base
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "kubernetes_sd_config"), true
}
func appendOpenstackScrapeWork(dst []ScrapeWork, sdc *openstack.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := openstack.GetLabels(sdc, baseDir)
if err != nil {
logger.Errorf("error when discovering openstack targets for `job_name` %q: %s; skipping it", swc.jobName, err)
return dst, false
}
return appendScrapeWorkForTargetLabels(dst, swc, targetLabels, "openstack_sd_config"), true
}
func appendConsulScrapeWork(dst []ScrapeWork, sdc *consul.SDConfig, baseDir string, swc *scrapeWorkConfig) ([]ScrapeWork, bool) {
targetLabels, err := consul.GetLabels(sdc, baseDir)
if err != nil {

View file

@ -920,6 +920,10 @@ scrape_configs:
Name: "prefix:url",
Value: "http://foo.bar:1234/metrics",
},
{
Name: "url",
Value: "http://foo.bar:1234/metrics",
},
},
AuthConfig: &promauth.Config{},
jobNameOriginal: "foo",

View file

@ -0,0 +1,184 @@
package openstack
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
var configMap = discoveryutils.NewConfigMap()
// apiCredentials can be refreshed
type apiCredentials struct {
computeURL *url.URL
token string
expiration time.Time
}
type apiConfig struct {
client *http.Client
port int
// tokenLock guards creds refresh
tokenLock sync.Mutex
creds *apiCredentials
// authTokenReq contins request body for apiCredentials
authTokenReq []byte
// keystone endpoint
endpoint *url.URL
allTenants bool
region string
// availability public, internal, admin for filtering compute endpoint
availability string
}
func (cfg *apiConfig) getFreshAPICredentials() (*apiCredentials, error) {
cfg.tokenLock.Lock()
defer cfg.tokenLock.Unlock()
if cfg.creds != nil && time.Until(cfg.creds.expiration) > 10*time.Second {
// Credentials aren't expired yet.
return cfg.creds, nil
}
newCreds, err := getCreds(cfg)
if err != nil {
return nil, fmt.Errorf("cannot refresh OpenStack api token: %w", err)
}
cfg.creds = newCreds
logger.Infof("successfully refreshed OpenStack api token; next expiration: %s", newCreds.expiration)
return newCreds, nil
}
func getAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
v, err := configMap.Get(sdc, func() (interface{}, error) { return newAPIConfig(sdc, baseDir) })
if err != nil {
return nil, err
}
return v.(*apiConfig), nil
}
func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) {
cfg := &apiConfig{
client: &http.Client{},
availability: sdc.Availability,
region: sdc.Region,
allTenants: sdc.AllTenants,
port: sdc.Port,
}
if sdc.TLSConfig != nil {
config, err := promauth.NewConfig(baseDir, nil, "", "", sdc.TLSConfig)
if err != nil {
return nil, err
}
cfg.client.Transport = &http.Transport{
TLSClientConfig: config.NewTLSConfig(),
}
}
// use public compute endpoint by default
if len(cfg.availability) == 0 {
cfg.availability = "public"
}
// create new variable to prevent side effects
sdcAuth := *sdc
// special case if identity_endpoint is not defined
if len(sdcAuth.IdentityEndpoint) == 0 {
// override sdc
sdcAuth = readCredentialsFromEnv()
}
parsedURL, err := url.Parse(sdcAuth.IdentityEndpoint)
if err != nil {
return nil, fmt.Errorf("cannot parse identity_endpoint: %s as url, err: %w", sdcAuth.IdentityEndpoint, err)
}
cfg.endpoint = parsedURL
tokenReq, err := buildAuthRequestBody(&sdcAuth)
if err != nil {
return nil, err
}
cfg.authTokenReq = tokenReq
// cfg.creds is populated at getFreshAPICredentials
return cfg, nil
}
// getCreds makes a call to openstack keystone api and retrieves token and computeURL
//
// See https://docs.openstack.org/api-ref/identity/v3/
func getCreds(cfg *apiConfig) (*apiCredentials, error) {
apiURL := *cfg.endpoint
apiURL.Path = path.Join(apiURL.Path, "auth", "tokens")
resp, err := cfg.client.Post(apiURL.String(), "application/json", bytes.NewBuffer(cfg.authTokenReq))
if err != nil {
return nil, fmt.Errorf("failed query openstack identity api, url: %s, err: %w", apiURL.String(), err)
}
r, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("cannot read response from %q: %w", apiURL.String(), err)
}
if resp.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("auth failed, bad status code: %d, want: 201", resp.StatusCode)
}
at := resp.Header.Get("X-Subject-Token")
if len(at) == 0 {
return nil, fmt.Errorf("auth failed, response without X-Subject-Token")
}
var ar authResponse
if err := json.Unmarshal(r, &ar); err != nil {
return nil, fmt.Errorf("cannot parse auth credentials response: %w", err)
}
computeURL, err := getComputeEndpointURL(ar.Token.Catalog, cfg.availability, cfg.region)
if err != nil {
return nil, fmt.Errorf("cannot get computeEndpoint, account doesn't have enough permissions, "+
"availability: %s, region: %s; error: %w", cfg.availability, cfg.region, err)
}
return &apiCredentials{
token: at,
expiration: ar.Token.ExpiresAt,
computeURL: computeURL,
}, nil
}
// readResponseBody reads body from http.Response.
func readResponseBody(resp *http.Response, apiURL string) ([]byte, error) {
data, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("cannot read response from %q: %w", apiURL, err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code for %q; got %d; want %d; response body: %q",
apiURL, resp.StatusCode, http.StatusOK, data)
}
return data, nil
}
// getAPIResponse calls openstack apiURL and returns response body.
func getAPIResponse(apiURL string, cfg *apiConfig) ([]byte, error) {
creds, err := cfg.getFreshAPICredentials()
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", apiURL, nil)
if err != nil {
return nil, fmt.Errorf("cannot create new request for openstack api url %s: %w", apiURL, err)
}
req.Header.Set("X-Auth-Token", creds.token)
resp, err := cfg.client.Do(req)
if err != nil {
return nil, fmt.Errorf("cannot query openstack api url %s: %w", apiURL, err)
}
return readResponseBody(resp, apiURL)
}

View file

@ -0,0 +1,322 @@
package openstack
import (
"encoding/json"
"fmt"
"net/url"
"os"
"time"
)
// authResponse represents identity api response
//
// See https://docs.openstack.org/api-ref/identity/v3/#authentication-and-token-management
type authResponse struct {
Token struct {
ExpiresAt time.Time `json:"expires_at,omitempty"`
Catalog []catalogItem `json:"catalog,omitempty"`
}
}
type catalogItem struct {
Name string `json:"name"`
Type string `json:"type"`
Endpoints []endpoint `json:"endpoints"`
}
// openstack api endpoint
//
// See https://docs.openstack.org/api-ref/identity/v3/#list-endpoints
type endpoint struct {
RegionID string `json:"region_id"`
RegionName string `json:"region_name"`
URL string `json:"url"`
Name string `json:"name"`
Type string `json:"type"`
Interface string `json:"interface"`
}
// getComputeEndpointURL extracts compute endpoint url with given filters from keystone catalog
func getComputeEndpointURL(catalog []catalogItem, availability, region string) (*url.URL, error) {
for _, eps := range catalog {
if eps.Type != "compute" {
continue
}
for _, ep := range eps.Endpoints {
if ep.Interface == availability && (len(region) == 0 || region == ep.RegionID || region == ep.RegionName) {
return url.Parse(ep.URL)
}
}
}
return nil, fmt.Errorf("cannot find compute url for the given availability: %q, region: %q", availability, region)
}
// buildAuthRequestBody builds request for authentication
func buildAuthRequestBody(sdc *SDConfig) ([]byte, error) {
if len(sdc.Password) == 0 && len(sdc.ApplicationCredentialID) == 0 && len(sdc.ApplicationCredentialName) == 0 {
return nil, fmt.Errorf("password and application credentials are missing")
}
type domainReq struct {
ID *string `json:"id,omitempty"`
Name *string `json:"name,omitempty"`
}
type userReq struct {
ID *string `json:"id,omitempty"`
Name *string `json:"name,omitempty"`
Password *string `json:"password,omitempty"`
Passcode *string `json:"passcode,omitempty"`
Domain *domainReq `json:"domain,omitempty"`
}
type passwordReq struct {
User userReq `json:"user"`
}
type tokenReq struct {
ID string `json:"id"`
}
type applicationCredentialReq struct {
ID *string `json:"id,omitempty"`
Name *string `json:"name,omitempty"`
User *userReq `json:"user,omitempty"`
Secret *string `json:"secret,omitempty"`
}
type identityReq struct {
Methods []string `json:"methods"`
Password *passwordReq `json:"password,omitempty"`
Token *tokenReq `json:"token,omitempty"`
ApplicationCredential *applicationCredentialReq `json:"application_credential,omitempty"`
}
type authReq struct {
Identity identityReq `json:"identity"`
Scope map[string]interface{} `json:"scope,omitempty"`
}
type request struct {
Auth authReq `json:"auth"`
}
// Populate the request structure based on the provided arguments. Create and return an error
// if insufficient or incompatible information is present.
var req request
if len(sdc.Password) == 0 {
// There are three kinds of possible application_credential requests
// 1. application_credential id + secret
// 2. application_credential name + secret + user_id
// 3. application_credential name + secret + username + domain_id / domain_name
if len(sdc.ApplicationCredentialID) > 0 {
if len(sdc.ApplicationCredentialSecret) == 0 {
return nil, fmt.Errorf("ApplicationCredentialSecret is empty")
}
req.Auth.Identity.Methods = []string{"application_credential"}
req.Auth.Identity.ApplicationCredential = &applicationCredentialReq{
ID: &sdc.ApplicationCredentialID,
Secret: &sdc.ApplicationCredentialSecret,
}
return json.Marshal(req)
}
if len(sdc.ApplicationCredentialSecret) == 0 {
return nil, fmt.Errorf("missing application_credential_secret when application_credential_name is set")
}
var userRequest *userReq
if len(sdc.UserID) > 0 {
// UserID could be used without the domain information
userRequest = &userReq{
ID: &sdc.UserID,
}
}
if userRequest == nil && len(sdc.Username) == 0 {
return nil, fmt.Errorf("username and userid is empty")
}
if userRequest == nil && len(sdc.DomainID) > 0 {
userRequest = &userReq{
Name: &sdc.Username,
Domain: &domainReq{ID: &sdc.DomainID},
}
}
if userRequest == nil && len(sdc.DomainName) > 0 {
userRequest = &userReq{
Name: &sdc.Username,
Domain: &domainReq{Name: &sdc.DomainName},
}
}
if userRequest == nil {
return nil, fmt.Errorf("domain_id and domain_name cannot be empty for application_credential_name auth")
}
req.Auth.Identity.Methods = []string{"application_credential"}
req.Auth.Identity.ApplicationCredential = &applicationCredentialReq{
Name: &sdc.ApplicationCredentialName,
User: userRequest,
Secret: &sdc.ApplicationCredentialSecret,
}
return json.Marshal(req)
}
// Password authentication.
req.Auth.Identity.Methods = append(req.Auth.Identity.Methods, "password")
if len(sdc.Username) == 0 && len(sdc.UserID) == 0 {
return nil, fmt.Errorf("username and userid is empty for username/password auth")
}
if len(sdc.Username) > 0 {
if len(sdc.UserID) > 0 {
return nil, fmt.Errorf("both username and userid is present")
}
if len(sdc.DomainID) == 0 && len(sdc.DomainName) == 0 {
return nil, fmt.Errorf(" domain_id or domain_name is missing for username/password auth: %s", sdc.Username)
}
if len(sdc.DomainID) > 0 {
if sdc.DomainName != "" {
return nil, fmt.Errorf("both domain_id and domain_name is present")
}
// Configure the request for Username and Password authentication with a DomainID.
if len(sdc.Password) > 0 {
req.Auth.Identity.Password = &passwordReq{
User: userReq{
Name: &sdc.Username,
Password: &sdc.Password,
Domain: &domainReq{ID: &sdc.DomainID},
},
}
}
}
if len(sdc.DomainName) > 0 {
// Configure the request for Username and Password authentication with a DomainName.
if len(sdc.Password) > 0 {
req.Auth.Identity.Password = &passwordReq{
User: userReq{
Name: &sdc.Username,
Password: &sdc.Password,
Domain: &domainReq{Name: &sdc.DomainName},
},
}
}
}
}
if len(sdc.UserID) > 0 {
if len(sdc.DomainID) > 0 {
return nil, fmt.Errorf("both user_id and domain_id is present")
}
if len(sdc.DomainName) > 0 {
return nil, fmt.Errorf("both user_id and domain_name is present")
}
// Configure the request for UserID and Password authentication.
if len(sdc.Password) > 0 {
req.Auth.Identity.Password = &passwordReq{
User: userReq{
ID: &sdc.UserID,
Password: &sdc.Password,
},
}
}
}
// build scope for password auth
scope, err := buildScope(sdc)
if err != nil {
return nil, err
}
if len(scope) > 0 {
req.Auth.Scope = scope
}
return json.Marshal(req)
}
// buildScope adds scope information into auth request
//
// See https://docs.openstack.org/api-ref/identity/v3/#password-authentication-with-unscoped-authorization
func buildScope(sdc *SDConfig) (map[string]interface{}, error) {
if len(sdc.ProjectName) == 0 && len(sdc.ProjectID) == 0 && len(sdc.DomainID) == 0 && len(sdc.DomainName) == 0 {
return nil, nil
}
if len(sdc.ProjectName) > 0 {
// ProjectName provided: either DomainID or DomainName must also be supplied.
// ProjectID may not be supplied.
if len(sdc.DomainID) == 0 && len(sdc.DomainName) == 0 {
return nil, fmt.Errorf("both domain_id and domain_name present")
}
if len(sdc.ProjectID) > 0 {
return nil, fmt.Errorf("both domain_id and domain_name present")
}
if len(sdc.DomainID) > 0 {
return map[string]interface{}{
"project": map[string]interface{}{
"name": &sdc.ProjectName,
"domain": map[string]interface{}{"id": &sdc.DomainID},
},
}, nil
}
if len(sdc.DomainName) > 0 {
return map[string]interface{}{
"project": map[string]interface{}{
"name": &sdc.ProjectName,
"domain": map[string]interface{}{"name": &sdc.DomainName},
},
}, nil
}
} else if len(sdc.ProjectID) > 0 {
// ProjectID provided. ProjectName, DomainID, and DomainName may not be provided.
if len(sdc.DomainID) > 0 {
return nil, fmt.Errorf("both project_id and domain_id present")
}
if len(sdc.DomainName) > 0 {
return nil, fmt.Errorf("both project_id and domain_name present")
}
return map[string]interface{}{
"project": map[string]interface{}{
"id": &sdc.ProjectID,
},
}, nil
} else if len(sdc.DomainID) > 0 {
if len(sdc.DomainName) > 0 {
return nil, fmt.Errorf("both domain_id and domain_name present")
}
return map[string]interface{}{
"domain": map[string]interface{}{
"id": &sdc.DomainID,
},
}, nil
} else if len(sdc.DomainName) > 0 {
return map[string]interface{}{
"domain": map[string]interface{}{
"name": &sdc.DomainName,
},
}, nil
}
return nil, nil
}
// readCredentialsFromEnv obtains serviceDiscoveryConfig from env variables for openstack
func readCredentialsFromEnv() SDConfig {
authURL := os.Getenv("OS_AUTH_URL")
username := os.Getenv("OS_USERNAME")
userID := os.Getenv("OS_USERID")
password := os.Getenv("OS_PASSWORD")
tenantID := os.Getenv("OS_TENANT_ID")
tenantName := os.Getenv("OS_TENANT_NAME")
domainID := os.Getenv("OS_DOMAIN_ID")
domainName := os.Getenv("OS_DOMAIN_NAME")
applicationCredentialID := os.Getenv("OS_APPLICATION_CREDENTIAL_ID")
applicationCredentialName := os.Getenv("OS_APPLICATION_CREDENTIAL_NAME")
applicationCredentialSecret := os.Getenv("OS_APPLICATION_CREDENTIAL_SECRET")
// If OS_PROJECT_ID is set, overwrite tenantID with the value.
if v := os.Getenv("OS_PROJECT_ID"); v != "" {
tenantID = v
}
// If OS_PROJECT_NAME is set, overwrite tenantName with the value.
if v := os.Getenv("OS_PROJECT_NAME"); v != "" {
tenantName = v
}
return SDConfig{
IdentityEndpoint: authURL,
Username: username,
UserID: userID,
Password: password,
ProjectName: tenantName,
ProjectID: tenantID,
DomainName: domainName,
DomainID: domainID,
ApplicationCredentialName: applicationCredentialName,
ApplicationCredentialID: applicationCredentialID,
ApplicationCredentialSecret: applicationCredentialSecret,
}
}

View file

@ -0,0 +1,122 @@
package openstack
import (
"reflect"
"testing"
)
func Test_buildAuthRequestBody1(t *testing.T) {
type args struct {
sdc *SDConfig
}
tests := []struct {
name string
args args
want []byte
wantErr bool
}{
{
name: "empty config",
args: args{
sdc: &SDConfig{},
},
wantErr: true,
},
{
name: "username password auth with domain",
args: args{
sdc: &SDConfig{
Username: "some-user",
Password: "some-password",
DomainName: "some-domain",
},
},
want: []byte(`{"auth":{"identity":{"methods":["password"],"password":{"user":{"name":"some-user","password":"some-password","domain":{"name":"some-domain"}}}},"scope":{"domain":{"name":"some-domain"}}}}`),
},
{
name: "application credentials auth",
args: args{
sdc: &SDConfig{
ApplicationCredentialID: "some-id",
ApplicationCredentialSecret: "some-secret",
},
},
want: []byte(`{"auth":{"identity":{"methods":["application_credential"],"application_credential":{"id":"some-id","secret":"some-secret"}}}}`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := buildAuthRequestBody(tt.args.sdc)
if (err != nil) != tt.wantErr {
t.Errorf("buildAuthRequestBody() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("buildAuthRequestBody() got = %v, want %v", got, tt.want)
}
})
}
}
func Test_getComputeEndpointURL1(t *testing.T) {
type args struct {
catalog []catalogItem
availability string
region string
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "bad catalog data",
args: args{
catalog: []catalogItem{
{
Type: "keystone",
Endpoints: []endpoint{},
},
},
},
wantErr: true,
},
{
name: "good private url",
args: args{
availability: "private",
catalog: []catalogItem{
{
Type: "compute",
Endpoints: []endpoint{
{
Interface: "private",
Type: "compute",
URL: "https://compute.test.local:8083/v2.1",
},
},
},
{
Type: "keystone",
Endpoints: []endpoint{},
},
},
},
want: "https://compute.test.local:8083/v2.1",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getComputeEndpointURL(tt.args.catalog, tt.args.availability, tt.args.region)
if (err != nil) != tt.wantErr {
t.Errorf("getComputeEndpointURL() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !tt.wantErr && !reflect.DeepEqual(got.String(), tt.want) {
t.Errorf("getComputeEndpointURL() got = %v, want %v", got.String(), tt.want)
}
})
}
}

View file

@ -0,0 +1,88 @@
package openstack
import (
"encoding/json"
"fmt"
"path"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// See https://docs.openstack.org/api-ref/compute/#list-hypervisors-details
type hypervisorDetail struct {
Hypervisors []hypervisor `json:"hypervisors"`
Links []struct {
HREF string `json:"href"`
Rel string `json:"rel,omitempty"`
} `json:"hypervisors_links,omitempty"`
}
type hypervisor struct {
HostIP string `json:"host_ip"`
ID int `json:"id"`
Hostname string `json:"hypervisor_hostname"`
Status string `json:"status"`
State string `json:"state"`
Type string `json:"hypervisor_type"`
}
func parseHypervisorDetail(data []byte) (*hypervisorDetail, error) {
var hvsd hypervisorDetail
if err := json.Unmarshal(data, &hvsd); err != nil {
return nil, fmt.Errorf("cannot parse hypervisorDetail: %w", err)
}
return &hvsd, nil
}
func (cfg *apiConfig) getHypervisors() ([]hypervisor, error) {
creds, err := cfg.getFreshAPICredentials()
if err != nil {
return nil, err
}
computeURL := *creds.computeURL
computeURL.Path = path.Join(computeURL.Path, "os-hypervisors", "detail")
nextLink := computeURL.String()
var hvs []hypervisor
for {
resp, err := getAPIResponse(nextLink, cfg)
if err != nil {
return nil, err
}
detail, err := parseHypervisorDetail(resp)
if err != nil {
return nil, err
}
hvs = append(hvs, detail.Hypervisors...)
if len(detail.Links) == 0 {
return hvs, nil
}
nextLink = detail.Links[0].HREF
}
}
func addHypervisorLabels(hvs []hypervisor, port int) []map[string]string {
var ms []map[string]string
for _, hv := range hvs {
addr := discoveryutils.JoinHostPort(hv.HostIP, port)
m := map[string]string{
"__address__": addr,
"__meta_openstack_hypervisor_type": hv.Type,
"__meta_openstack_hypervisor_status": hv.Status,
"__meta_openstack_hypervisor_hostname": hv.Hostname,
"__meta_openstack_hypervisor_state": hv.State,
"__meta_openstack_hypervisor_host_ip": hv.HostIP,
"__meta_openstack_hypervisor_id": strconv.Itoa(hv.ID),
}
ms = append(ms, m)
}
return ms
}
func getHypervisorLabels(cfg *apiConfig) ([]map[string]string, error) {
hvs, err := cfg.getHypervisors()
if err != nil {
return nil, fmt.Errorf("cannot get hypervisors: %w", err)
}
return addHypervisorLabels(hvs, cfg.port), nil
}

View file

@ -0,0 +1,152 @@
package openstack
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func Test_parseHypervisorDetail(t *testing.T) {
type args struct {
data []byte
}
tests := []struct {
name string
args args
want hypervisorDetail
wantErr bool
}{
{
name: "bad data",
args: args{
data: []byte(`{ff}`),
},
wantErr: true,
},
{
name: "1 hypervisor",
args: args{
data: []byte(`{
"hypervisors": [
{
"cpu_info": {
"arch": "x86_64",
"model": "Nehalem",
"vendor": "Intel",
"features": [
"pge",
"clflush"
],
"topology": {
"cores": 1,
"threads": 1,
"sockets": 4
}
},
"current_workload": 0,
"status": "enabled",
"state": "up",
"disk_available_least": 0,
"host_ip": "1.1.1.1",
"free_disk_gb": 1028,
"free_ram_mb": 7680,
"hypervisor_hostname": "host1",
"hypervisor_type": "fake",
"hypervisor_version": 1000,
"id": 2,
"local_gb": 1028,
"local_gb_used": 0,
"memory_mb": 8192,
"memory_mb_used": 512,
"running_vms": 0,
"service": {
"host": "host1",
"id": 6,
"disabled_reason": null
},
"vcpus": 2,
"vcpus_used": 0
}
]}`),
},
want: hypervisorDetail{
Hypervisors: []hypervisor{
{
HostIP: "1.1.1.1",
ID: 2,
Hostname: "host1",
Status: "enabled",
State: "up",
Type: "fake",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseHypervisorDetail(tt.args.data)
if (err != nil) != tt.wantErr {
t.Errorf("parseHypervisorDetail() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !tt.wantErr && !reflect.DeepEqual(*got, tt.want) {
t.Errorf("parseHypervisorDetail() got = %v, want %v", *got, tt.want)
}
})
}
}
func Test_addHypervisorLabels(t *testing.T) {
type args struct {
hvs []hypervisor
port int
}
tests := []struct {
name string
args args
want [][]prompbmarshal.Label
}{
{
name: "",
args: args{
port: 9100,
hvs: []hypervisor{
{
Type: "fake",
ID: 5,
State: "enabled",
Status: "up",
Hostname: "fakehost",
HostIP: "1.2.2.2",
},
},
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "1.2.2.2:9100",
"__meta_openstack_hypervisor_host_ip": "1.2.2.2",
"__meta_openstack_hypervisor_hostname": "fakehost",
"__meta_openstack_hypervisor_id": "5",
"__meta_openstack_hypervisor_state": "enabled",
"__meta_openstack_hypervisor_status": "up",
"__meta_openstack_hypervisor_type": "fake",
}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := addHypervisorLabels(tt.args.hvs, tt.args.port)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range got {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
if !reflect.DeepEqual(sortedLabelss, tt.want) {
t.Errorf("addHypervisorLabels() = %v, want %v", sortedLabelss, tt.want)
}
})
}
}

View file

@ -0,0 +1,144 @@
package openstack
import (
"encoding/json"
"fmt"
"path"
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
// See https://docs.openstack.org/api-ref/compute/#list-servers
type serversDetail struct {
Servers []server `json:"servers"`
Links []struct {
HREF string `json:"href"`
Rel string `json:"rel"`
} `json:"servers_links,omitempty"`
}
type server struct {
ID string `json:"id"`
TenantID string `json:"tenant_id"`
UserID string `json:"user_id"`
Name string `json:"name"`
HostID string `json:"hostid"`
Status string `json:"status"`
Addresses map[string][]struct {
Address string `json:"addr"`
Version int `json:"version"`
Type string `json:"OS-EXT-IPS:type"`
} `json:"addresses"`
Metadata map[string]string `json:"metadata,omitempty"`
Flavor struct {
ID string `json:"id"`
} `json:"flavor"`
}
func parseServersDetail(data []byte) (*serversDetail, error) {
var srvd serversDetail
if err := json.Unmarshal(data, &srvd); err != nil {
return nil, fmt.Errorf("cannot parse serversDetail: %w", err)
}
return &srvd, nil
}
func addInstanceLabels(servers []server, port int) []map[string]string {
var ms []map[string]string
for _, server := range servers {
m := map[string]string{
"__meta_openstack_instance_id": server.ID,
"__meta_openstack_instance_status": server.Status,
"__meta_openstack_instance_name": server.Name,
"__meta_openstack_project_id": server.TenantID,
"__meta_openstack_user_id": server.UserID,
"__meta_openstack_instance_flavor": server.Flavor.ID,
}
for k, v := range server.Metadata {
m["__meta_openstack_tag_"+discoveryutils.SanitizeLabelName(k)] = v
}
// Traverse server.Addresses in alphabetical order of pool name
// in order to return targets in deterministic order.
sortedPools := make([]string, 0, len(server.Addresses))
for pool := range server.Addresses {
sortedPools = append(sortedPools, pool)
}
sort.Strings(sortedPools)
for _, pool := range sortedPools {
addresses := server.Addresses[pool]
if len(addresses) == 0 {
// skip pool with zero addresses
continue
}
var publicIP string
// its possible to have only one floating ip per pool
for _, ip := range addresses {
if ip.Type != "floating" {
continue
}
publicIP = ip.Address
break
}
for _, ip := range addresses {
// fast return
if len(ip.Address) == 0 || ip.Type == "floating" {
continue
}
// copy labels
lbls := make(map[string]string, len(m))
for k, v := range m {
lbls[k] = v
}
lbls["__meta_openstack_address_pool"] = pool
lbls["__meta_openstack_private_ip"] = ip.Address
if len(publicIP) > 0 {
lbls["__meta_openstack_public_ip"] = publicIP
}
lbls["__address__"] = discoveryutils.JoinHostPort(ip.Address, port)
ms = append(ms, lbls)
}
}
}
return ms
}
func (cfg *apiConfig) getServers() ([]server, error) {
creds, err := cfg.getFreshAPICredentials()
if err != nil {
return nil, err
}
computeURL := *creds.computeURL
computeURL.Path = path.Join(computeURL.Path, "servers", "detail")
// by default, query fetches data from all tenants
if !cfg.allTenants {
q := computeURL.Query()
q.Set("all_tenants", "false")
computeURL.RawQuery = q.Encode()
}
nextLink := computeURL.String()
var servers []server
for {
resp, err := getAPIResponse(nextLink, cfg)
if err != nil {
return nil, err
}
serversDetail, err := parseServersDetail(resp)
if err != nil {
return nil, err
}
servers = append(servers, serversDetail.Servers...)
if len(serversDetail.Links) == 0 {
return servers, nil
}
nextLink = serversDetail.Links[0].HREF
}
}
func getInstancesLabels(cfg *apiConfig) ([]map[string]string, error) {
srv, err := cfg.getServers()
if err != nil {
return nil, err
}
return addInstanceLabels(srv, cfg.port), nil
}

View file

@ -0,0 +1,269 @@
package openstack
import (
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils"
)
func Test_addInstanceLabels(t *testing.T) {
type args struct {
servers []server
port int
}
tests := []struct {
name string
args args
want [][]prompbmarshal.Label
}{
{
name: "empty_response",
args: args{
port: 9100,
},
},
{
name: "one_server",
args: args{
port: 9100,
servers: []server{
{
ID: "10",
Status: "enabled",
Name: "server-1",
HostID: "some-host-id",
TenantID: "some-tenant-id",
UserID: "some-user-id",
Flavor: struct {
ID string `json:"id"`
}{ID: "5"},
Addresses: map[string][]struct {
Address string `json:"addr"`
Version int `json:"version"`
Type string `json:"OS-EXT-IPS:type"`
}{
"test": {
{
Address: "192.168.0.1",
Version: 4,
Type: "fixed",
},
},
},
},
},
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "192.168.0.1:9100",
"__meta_openstack_address_pool": "test",
"__meta_openstack_instance_flavor": "5",
"__meta_openstack_instance_id": "10",
"__meta_openstack_instance_name": "server-1",
"__meta_openstack_instance_status": "enabled",
"__meta_openstack_private_ip": "192.168.0.1",
"__meta_openstack_project_id": "some-tenant-id",
"__meta_openstack_user_id": "some-user-id",
}),
},
},
{
name: "with_public_ip",
args: args{
port: 9100,
servers: []server{
{
ID: "10",
Status: "enabled",
Name: "server-2",
HostID: "some-host-id",
TenantID: "some-tenant-id",
UserID: "some-user-id",
Flavor: struct {
ID string `json:"id"`
}{ID: "5"},
Addresses: map[string][]struct {
Address string `json:"addr"`
Version int `json:"version"`
Type string `json:"OS-EXT-IPS:type"`
}{
"test": {
{
Address: "192.168.0.1",
Version: 4,
Type: "fixed",
},
{
Address: "1.5.5.5",
Version: 4,
Type: "floating",
},
},
"internal": {
{
Address: "10.10.0.1",
Version: 4,
Type: "fixed",
},
},
},
},
},
},
want: [][]prompbmarshal.Label{
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "10.10.0.1:9100",
"__meta_openstack_address_pool": "internal",
"__meta_openstack_instance_flavor": "5",
"__meta_openstack_instance_id": "10",
"__meta_openstack_instance_name": "server-2",
"__meta_openstack_instance_status": "enabled",
"__meta_openstack_private_ip": "10.10.0.1",
"__meta_openstack_project_id": "some-tenant-id",
"__meta_openstack_user_id": "some-user-id",
}),
discoveryutils.GetSortedLabels(map[string]string{
"__address__": "192.168.0.1:9100",
"__meta_openstack_address_pool": "test",
"__meta_openstack_instance_flavor": "5",
"__meta_openstack_instance_id": "10",
"__meta_openstack_instance_name": "server-2",
"__meta_openstack_instance_status": "enabled",
"__meta_openstack_private_ip": "192.168.0.1",
"__meta_openstack_public_ip": "1.5.5.5",
"__meta_openstack_project_id": "some-tenant-id",
"__meta_openstack_user_id": "some-user-id",
}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := addInstanceLabels(tt.args.servers, tt.args.port)
var sortedLabelss [][]prompbmarshal.Label
for _, labels := range got {
sortedLabelss = append(sortedLabelss, discoveryutils.GetSortedLabels(labels))
}
if !reflect.DeepEqual(sortedLabelss, tt.want) {
t.Errorf("addInstanceLabels() = \n got: %v,\nwant: %v", sortedLabelss, tt.want)
}
})
}
}
func Test_parseServersDetail(t *testing.T) {
type args struct {
data []byte
}
tests := []struct {
name string
args args
want serversDetail
wantErr bool
}{
{
name: "parse ok",
args: args{
data: []byte(`{
"servers":[
{
"id":"c9f68076-01a3-489a-aebe-8b773c71e7f3",
"name":"test10",
"status":"ACTIVE",
"tenant_id":"d34be4e44f9c444eab9a5ec7b953951f",
"user_id":"e55737f142ac42f18093037760656bd7",
"metadata":{
},
"hostId":"e26db8db23736877aa92ebbbe11743b2a2a3b107aada00a8a0cf474b",
"image":{
"id":"253f7a69-dc79-4fb2-86f8-9ec92c94107a",
"links":[
{
"rel":"bookmark",
"href":"http://10.20.20.1:8774/images/253f7a69-dc79-4fb2-86f8-9ec92c94107a"
}
]
},
"flavor":{
"id":"1"
},
"addresses":{
"test":[
{
"version":4,
"addr":"192.168.222.15",
"OS-EXT-IPS:type":"fixed",
"OS-EXT-IPS-MAC:mac_addr":"fa:16:3e:b0:40:af"
},
{
"version":4,
"addr":"10.20.20.69",
"OS-EXT-IPS:type":"floating",
"OS-EXT-IPS-MAC:mac_addr":"fa:16:3e:b0:40:af"
}
]
},
"accessIPv4":"",
"accessIPv6":"",
"key_name":"microstack",
"security_groups":[
{
"name":"default"
}
]
}
]
}`),
},
want: serversDetail{
Servers: []server{
{
Flavor: struct {
ID string `json:"id"`
}{ID: "1"},
ID: "c9f68076-01a3-489a-aebe-8b773c71e7f3",
TenantID: "d34be4e44f9c444eab9a5ec7b953951f",
UserID: "e55737f142ac42f18093037760656bd7",
Name: "test10",
HostID: "e26db8db23736877aa92ebbbe11743b2a2a3b107aada00a8a0cf474b",
Status: "ACTIVE",
Metadata: map[string]string{},
Addresses: map[string][]struct {
Address string `json:"addr"`
Version int `json:"version"`
Type string `json:"OS-EXT-IPS:type"`
}{
"test": {
{
Address: "192.168.222.15",
Version: 4,
Type: "fixed",
},
{
Address: "10.20.20.69",
Version: 4,
Type: "floating",
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseServersDetail(tt.args.data)
if (err != nil) != tt.wantErr {
t.Errorf("parseServersDetail() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !tt.wantErr && !reflect.DeepEqual(*got, tt.want) {
t.Errorf("parseServersDetail() \ngot = %v,\nwant= %v", *got, tt.want)
}
})
}
}

View file

@ -0,0 +1,48 @@
package openstack
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
// SDConfig is the configuration for OpenStack based service discovery.
//
// See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config
type SDConfig struct {
IdentityEndpoint string `yaml:"identity_endpoint"`
Username string `yaml:"username"`
UserID string `yaml:"userid"`
Password string `yaml:"password"`
ProjectName string `yaml:"project_name"`
ProjectID string `yaml:"project_id"`
DomainName string `yaml:"domain_name"`
DomainID string `yaml:"domain_id"`
ApplicationCredentialName string `yaml:"application_credential_name"`
ApplicationCredentialID string `yaml:"application_credential_id"`
ApplicationCredentialSecret string `yaml:"application_credential_secret"`
Role string `yaml:"role"`
Region string `yaml:"region"`
// RefreshInterval time.Duration `yaml:"refresh_interval"`
// refresh_interval is obtained from `-promscrape.openstackSDCheckInterval` command-line option.
Port int `yaml:"port"`
AllTenants bool `yaml:"all_tenants"`
TLSConfig *promauth.TLSConfig `yaml:"tls_config"`
Availability string `yaml:"availability"`
}
// GetLabels returns gce labels according to sdc.
func GetLabels(sdc *SDConfig, baseDir string) ([]map[string]string, error) {
cfg, err := getAPIConfig(sdc, baseDir)
if err != nil {
return nil, fmt.Errorf("cannot get API config: %w", err)
}
switch sdc.Role {
case "hypervisor":
return getHypervisorLabels(cfg)
case "instance":
return getInstancesLabels(cfg)
default:
return nil, fmt.Errorf("unexpected `role`: %q; must be one of `instance` or `hypervisor`; skipping it", sdc.Role)
}
}

View file

@ -21,6 +21,9 @@ var (
kubernetesSDCheckInterval = flag.Duration("promscrape.kubernetesSDCheckInterval", 30*time.Second, "Interval for checking for changes in Kubernetes API server. "+
"This works only if `kubernetes_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config for details")
openstackSDCheckInterval = flag.Duration("promscrape.openstackSDCheckInterval", 30*time.Second, "Interval for checking for changes in openstack API server. "+
"This works only if `openstack_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#openstack_sd_config for details")
consulSDCheckInterval = flag.Duration("promscrape.consulSDCheckInterval", 30*time.Second, "Interval for checking for changes in consul. "+
"This works only if `consul_sd_configs` is configured in '-promscrape.config' file. "+
"See https://prometheus.io/docs/prometheus/latest/configuration/configuration/#consul_sd_config for details")
@ -85,6 +88,7 @@ func runScraper(configFile string, pushData func(wr *prompbmarshal.WriteRequest)
scs.add("static_configs", 0, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getStaticScrapeWork() })
scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
scs.add("kubernetes_sd_configs", *kubernetesSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
scs.add("openstack_sd_configs", *openstackSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
scs.add("consul_sd_configs", *consulSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
scs.add("dns_sd_configs", *dnsSDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
scs.add("ec2_sd_configs", *ec2SDCheckInterval, func(cfg *Config, swsPrev []ScrapeWork) []ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })

View file

@ -10,12 +10,14 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/metrics"
)
var (
maxLineSize = flagutil.NewBytes("influx.maxLineSize", 256*1024, "The maximum size in bytes for a single Influx line during parsing")
trimTimestamp = flag.Duration("influxTrimTimestamp", time.Millisecond, "Trim timestamps for Influx line protocol data to this duration. "+
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
)
@ -70,7 +72,7 @@ func (ctx *streamContext) Read() bool {
if ctx.err != nil {
return false
}
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(ctx.br, ctx.reqBuf, ctx.tailBuf, maxLineSize.N)
if ctx.err != nil {
if ctx.err != io.EOF {
readErrors.Inc()