mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
f03e81c693
- Make sure that invalid/missing TLS CA file or TLS client certificate files at vmagent startup don't prevent from processing the corresponding scrape targets after the file becomes correct, without the need to restart vmagent. Previously scrape targets with invalid TLS CA file or TLS client certificate files were permanently dropped after the first attempt to initialize them, and they didn't appear until the next vmagent reload or the next change in other places of the loaded scrape configs. - Make sure that TLS CA is properly re-loaded from file after it changes without the need to restart vmagent. Previously the old TLS CA was used until vmagent restart. - Properly handle errors during http request creation for the second attempt to send data to remote system at vmagent and vmalert. Previously failed request creation could result in nil pointer dereferencing, since the returned request is nil on error. - Add more context to the logged error during AWS sigv4 request signing before sending the data to -remoteWrite.url at vmagent. Previously it could miss details on the source of the request. - Do not create a new HTTP client per second when generating OAuth2 token needed to put in Authorization header of every http request issued by vmagent during service discovery or target scraping. Re-use the HTTP client instead until the corresponding scrape config changes. - Cache error at lib/promauth.Config.GetAuthHeader() in the same way as the auth header is cached, e.g. the error is cached for a second now. This should reduce load on CPU and OAuth2 server when auth header cannot be obtained because of temporary error. - Share tls.Config.GetClientCertificate function among multiple scrape targets with the same tls_config. Cache the loaded certificate and the error for one second. This should significantly reduce CPU load when scraping big number of targets with the same tls_config. 
- Allow loading TLS certificates from HTTP and HTTPS URLs by specifying these URLs at `tls_config->cert_file` and `tls_config->key_file`. - Improve test coverage at lib/promauth. - Skip unreachable or invalid files specified at `scrape_config_files` during vmagent startup, since these files may become valid later. Previously vmagent was exiting in this case. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4959
274 lines
7.9 KiB
Go
274 lines
7.9 KiB
Go
package datasource
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
|
)
|
|
|
|
// datasourceType names the kind of datasource a request is built for.
type datasourceType string

const (
	// datasourcePrometheus is the default datasource type.
	datasourcePrometheus datasourceType = "prometheus"
	// datasourceGraphite is the Graphite datasource type.
	datasourceGraphite datasourceType = "graphite"
)

// toDatasourceType converts s into a datasourceType.
//
// Only the exact string "graphite" maps to datasourceGraphite. Any other
// value, including the empty string, maps to datasourcePrometheus.
func toDatasourceType(s string) datasourceType {
	switch datasourceType(s) {
	case datasourceGraphite:
		return datasourceGraphite
	default:
		return datasourcePrometheus
	}
}
|
|
|
|
// VMStorage represents vmstorage entity with ability to read and write metrics
|
|
// WARN: when adding a new field, remember to update Clone() method.
|
|
type VMStorage struct {
|
|
c *http.Client
|
|
authCfg *promauth.Config
|
|
datasourceURL string
|
|
appendTypePrefix bool
|
|
lookBack time.Duration
|
|
queryStep time.Duration
|
|
dataSourceType datasourceType
|
|
|
|
// evaluationInterval will help setting request's `step` param.
|
|
evaluationInterval time.Duration
|
|
// extraParams contains params to be attached to each HTTP request
|
|
extraParams url.Values
|
|
// extraHeaders are headers to be attached to each HTTP request
|
|
extraHeaders []keyValue
|
|
|
|
// whether to print additional log messages
|
|
// for each sent request
|
|
debug bool
|
|
}
|
|
|
|
// keyValue is a single name/value pair, used for storing
// extra HTTP headers in insertion order.
type keyValue struct {
	key   string // header name
	value string // header value
}
|
|
|
|
// Clone makes clone of VMStorage, shares http client.
|
|
func (s *VMStorage) Clone() *VMStorage {
|
|
ns := &VMStorage{
|
|
c: s.c,
|
|
authCfg: s.authCfg,
|
|
datasourceURL: s.datasourceURL,
|
|
appendTypePrefix: s.appendTypePrefix,
|
|
lookBack: s.lookBack,
|
|
queryStep: s.queryStep,
|
|
|
|
dataSourceType: s.dataSourceType,
|
|
evaluationInterval: s.evaluationInterval,
|
|
|
|
// init map so it can be populated below
|
|
extraParams: url.Values{},
|
|
|
|
debug: s.debug,
|
|
}
|
|
if len(s.extraHeaders) > 0 {
|
|
ns.extraHeaders = make([]keyValue, len(s.extraHeaders))
|
|
copy(ns.extraHeaders, s.extraHeaders)
|
|
}
|
|
for k, v := range s.extraParams {
|
|
ns.extraParams[k] = v
|
|
}
|
|
|
|
return ns
|
|
}
|
|
|
|
// ApplyParams - changes given querier params.
|
|
func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage {
|
|
s.dataSourceType = toDatasourceType(params.DataSourceType)
|
|
s.evaluationInterval = params.EvaluationInterval
|
|
if params.QueryParams != nil {
|
|
if s.extraParams == nil {
|
|
s.extraParams = url.Values{}
|
|
}
|
|
for k, vl := range params.QueryParams {
|
|
// custom query params are prior to default ones
|
|
if s.extraParams.Has(k) {
|
|
s.extraParams.Del(k)
|
|
}
|
|
for _, v := range vl {
|
|
// don't use .Set() instead of Del/Add since it is allowed
|
|
// for GET params to be duplicated
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4908
|
|
s.extraParams.Add(k, v)
|
|
}
|
|
}
|
|
}
|
|
if params.Headers != nil {
|
|
for key, value := range params.Headers {
|
|
kv := keyValue{key: key, value: value}
|
|
s.extraHeaders = append(s.extraHeaders, kv)
|
|
}
|
|
}
|
|
s.debug = params.Debug
|
|
return s
|
|
}
|
|
|
|
// BuildWithParams - implements interface.
|
|
func (s *VMStorage) BuildWithParams(params QuerierParams) Querier {
|
|
return s.Clone().ApplyParams(params)
|
|
}
|
|
|
|
// NewVMStorage is a constructor for VMStorage
|
|
func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Duration, queryStep time.Duration, appendTypePrefix bool, c *http.Client) *VMStorage {
|
|
return &VMStorage{
|
|
c: c,
|
|
authCfg: authCfg,
|
|
datasourceURL: strings.TrimSuffix(baseURL, "/"),
|
|
appendTypePrefix: appendTypePrefix,
|
|
lookBack: lookBack,
|
|
queryStep: queryStep,
|
|
dataSourceType: datasourcePrometheus,
|
|
extraParams: url.Values{},
|
|
}
|
|
}
|
|
|
|
// Query executes the given query and returns parsed response
|
|
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
|
|
req, err := s.newQueryRequest(query, ts)
|
|
if err != nil {
|
|
return Result{}, nil, err
|
|
}
|
|
resp, err := s.do(ctx, req)
|
|
if err != nil {
|
|
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
|
// Return unexpected error to the caller.
|
|
return Result{}, nil, err
|
|
}
|
|
// Something in the middle between client and datasource might be closing
|
|
// the connection. So we do a one more attempt in hope request will succeed.
|
|
req, err = s.newQueryRequest(query, ts)
|
|
if err != nil {
|
|
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
|
}
|
|
resp, err = s.do(ctx, req)
|
|
if err != nil {
|
|
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
|
}
|
|
}
|
|
|
|
// Process the received response.
|
|
parseFn := parsePrometheusResponse
|
|
if s.dataSourceType != datasourcePrometheus {
|
|
parseFn = parseGraphiteResponse
|
|
}
|
|
result, err := parseFn(req, resp)
|
|
_ = resp.Body.Close()
|
|
return result, req, err
|
|
}
|
|
|
|
// QueryRange executes the given query on the given time range.
|
|
// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
|
|
// Graphite type isn't supported.
|
|
func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end time.Time) (res Result, err error) {
|
|
if s.dataSourceType != datasourcePrometheus {
|
|
return res, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType)
|
|
}
|
|
if start.IsZero() {
|
|
return res, fmt.Errorf("start param is missing")
|
|
}
|
|
if end.IsZero() {
|
|
return res, fmt.Errorf("end param is missing")
|
|
}
|
|
req, err := s.newQueryRangeRequest(query, start, end)
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
resp, err := s.do(ctx, req)
|
|
if err != nil {
|
|
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
|
// Return unexpected error to the caller.
|
|
return res, err
|
|
}
|
|
// Something in the middle between client and datasource might be closing
|
|
// the connection. So we do a one more attempt in hope request will succeed.
|
|
req, err = s.newQueryRangeRequest(query, start, end)
|
|
if err != nil {
|
|
return res, fmt.Errorf("second attempt: %w", err)
|
|
}
|
|
resp, err = s.do(ctx, req)
|
|
if err != nil {
|
|
return res, fmt.Errorf("second attempt: %w", err)
|
|
}
|
|
}
|
|
|
|
// Process the received response.
|
|
res, err = parsePrometheusResponse(req, resp)
|
|
_ = resp.Body.Close()
|
|
return res, err
|
|
}
|
|
|
|
func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) {
|
|
ru := req.URL.Redacted()
|
|
if *showDatasourceURL {
|
|
ru = req.URL.String()
|
|
}
|
|
if s.debug {
|
|
logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, ru)
|
|
}
|
|
resp, err := s.c.Do(req.WithContext(ctx))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting response from %s: %w", ru, err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
_ = resp.Body.Close()
|
|
return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, ru, body)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) (*http.Request, error) {
|
|
req, err := s.newRequest()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", s.datasourceURL, err)
|
|
}
|
|
s.setPrometheusRangeReqParams(req, query, start, end)
|
|
return req, nil
|
|
}
|
|
|
|
func (s *VMStorage) newQueryRequest(query string, ts time.Time) (*http.Request, error) {
|
|
req, err := s.newRequest()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot create query request to datasource %q: %w", s.datasourceURL, err)
|
|
}
|
|
switch s.dataSourceType {
|
|
case "", datasourcePrometheus:
|
|
s.setPrometheusInstantReqParams(req, query, ts)
|
|
case datasourceGraphite:
|
|
s.setGraphiteReqParams(req, query, ts)
|
|
default:
|
|
logger.Panicf("BUG: engine not found: %q", s.dataSourceType)
|
|
}
|
|
return req, nil
|
|
}
|
|
|
|
func (s *VMStorage) newRequest() (*http.Request, error) {
|
|
req, err := http.NewRequest(http.MethodPost, s.datasourceURL, nil)
|
|
if err != nil {
|
|
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
if s.authCfg != nil {
|
|
err = s.authCfg.SetHeaders(req, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
for _, h := range s.extraHeaders {
|
|
req.Header.Set(h.key, h.value)
|
|
}
|
|
return req, nil
|
|
}
|