2020-03-13 10:19:31 +00:00
|
|
|
package datasource
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2023-04-19 08:18:32 +00:00
|
|
|
"errors"
|
2020-03-13 10:19:31 +00:00
|
|
|
"fmt"
|
2022-08-21 21:13:44 +00:00
|
|
|
"io"
|
2020-03-13 10:19:31 +00:00
|
|
|
"net/http"
|
2021-12-02 12:45:08 +00:00
|
|
|
"net/url"
|
2020-03-13 10:19:31 +00:00
|
|
|
"strings"
|
2020-09-21 12:53:49 +00:00
|
|
|
"time"
|
2021-09-14 11:32:06 +00:00
|
|
|
|
2022-09-13 13:25:43 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
2024-05-30 15:54:42 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
|
2021-09-14 11:32:06 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
2020-03-13 10:19:31 +00:00
|
|
|
)
|
|
|
|
|
2022-07-22 08:44:55 +00:00
|
|
|
type datasourceType string
|
|
|
|
|
|
|
|
const (
|
|
|
|
datasourcePrometheus datasourceType = "prometheus"
|
2024-09-23 07:38:30 +00:00
|
|
|
datasourceVlogs datasourceType = "vlogs"
|
2022-07-22 08:44:55 +00:00
|
|
|
datasourceGraphite datasourceType = "graphite"
|
|
|
|
)
|
|
|
|
|
|
|
|
func toDatasourceType(s string) datasourceType {
|
2024-09-23 07:38:30 +00:00
|
|
|
if s == "" {
|
|
|
|
return datasourcePrometheus
|
2022-07-22 08:44:55 +00:00
|
|
|
}
|
2024-09-23 07:38:30 +00:00
|
|
|
return datasourceType(s)
|
2022-07-22 08:44:55 +00:00
|
|
|
}
|
|
|
|
|
2020-03-13 10:19:31 +00:00
|
|
|
// VMStorage represents vmstorage entity with ability to read and write metrics
|
2023-06-01 09:38:48 +00:00
|
|
|
// WARN: when adding a new field, remember to update Clone() method.
|
2020-03-13 10:19:31 +00:00
|
|
|
type VMStorage struct {
|
2021-02-03 21:26:30 +00:00
|
|
|
c *http.Client
|
2021-09-14 11:32:06 +00:00
|
|
|
authCfg *promauth.Config
|
2021-02-03 21:26:30 +00:00
|
|
|
datasourceURL string
|
|
|
|
appendTypePrefix bool
|
|
|
|
queryStep time.Duration
|
2023-09-06 14:29:59 +00:00
|
|
|
dataSourceType datasourceType
|
2021-04-30 06:46:03 +00:00
|
|
|
|
2023-10-10 10:41:19 +00:00
|
|
|
// evaluationInterval will help setting request's `step` param.
|
2021-04-30 06:46:03 +00:00
|
|
|
evaluationInterval time.Duration
|
2023-09-06 14:29:59 +00:00
|
|
|
// extraParams contains params to be attached to each HTTP request
|
|
|
|
extraParams url.Values
|
|
|
|
// extraHeaders are headers to be attached to each HTTP request
|
|
|
|
extraHeaders []keyValue
|
2022-09-13 13:25:43 +00:00
|
|
|
|
|
|
|
// whether to print additional log messages
|
|
|
|
// for each sent request
|
|
|
|
debug bool
|
2022-07-22 08:44:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type keyValue struct {
|
|
|
|
key string
|
|
|
|
value string
|
2020-03-13 10:19:31 +00:00
|
|
|
}
|
|
|
|
|
2021-04-28 20:41:15 +00:00
|
|
|
// Clone makes clone of VMStorage, shares http client.
|
|
|
|
func (s *VMStorage) Clone() *VMStorage {
|
2023-06-02 09:38:55 +00:00
|
|
|
ns := &VMStorage{
|
2022-05-13 13:19:32 +00:00
|
|
|
c: s.c,
|
|
|
|
authCfg: s.authCfg,
|
|
|
|
datasourceURL: s.datasourceURL,
|
2023-06-01 09:38:48 +00:00
|
|
|
appendTypePrefix: s.appendTypePrefix,
|
2022-05-13 13:19:32 +00:00
|
|
|
queryStep: s.queryStep,
|
2023-06-01 09:38:48 +00:00
|
|
|
|
|
|
|
dataSourceType: s.dataSourceType,
|
|
|
|
evaluationInterval: s.evaluationInterval,
|
2023-06-02 09:38:55 +00:00
|
|
|
|
|
|
|
// init map so it can be populated below
|
|
|
|
extraParams: url.Values{},
|
2023-06-01 09:38:48 +00:00
|
|
|
|
|
|
|
debug: s.debug,
|
2021-04-28 20:41:15 +00:00
|
|
|
}
|
2023-06-02 09:38:55 +00:00
|
|
|
if len(s.extraHeaders) > 0 {
|
|
|
|
ns.extraHeaders = make([]keyValue, len(s.extraHeaders))
|
|
|
|
copy(ns.extraHeaders, s.extraHeaders)
|
|
|
|
}
|
|
|
|
for k, v := range s.extraParams {
|
|
|
|
ns.extraParams[k] = v
|
|
|
|
}
|
|
|
|
|
|
|
|
return ns
|
2021-04-28 20:41:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ApplyParams - changes given querier params.
|
|
|
|
func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage {
|
2024-09-23 07:38:30 +00:00
|
|
|
if params.DataSourceType != "" {
|
|
|
|
s.dataSourceType = toDatasourceType(params.DataSourceType)
|
|
|
|
}
|
2021-04-30 07:31:45 +00:00
|
|
|
s.evaluationInterval = params.EvaluationInterval
|
2023-06-02 09:38:55 +00:00
|
|
|
if params.QueryParams != nil {
|
|
|
|
if s.extraParams == nil {
|
|
|
|
s.extraParams = url.Values{}
|
|
|
|
}
|
|
|
|
for k, vl := range params.QueryParams {
|
2023-09-08 07:32:48 +00:00
|
|
|
// custom query params are prior to default ones
|
|
|
|
if s.extraParams.Has(k) {
|
|
|
|
s.extraParams.Del(k)
|
|
|
|
}
|
|
|
|
for _, v := range vl {
|
|
|
|
// don't use .Set() instead of Del/Add since it is allowed
|
|
|
|
// for GET params to be duplicated
|
|
|
|
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4908
|
|
|
|
s.extraParams.Add(k, v)
|
2023-06-02 09:38:55 +00:00
|
|
|
}
|
2023-06-01 07:44:11 +00:00
|
|
|
}
|
|
|
|
}
|
2022-07-22 08:44:55 +00:00
|
|
|
if params.Headers != nil {
|
|
|
|
for key, value := range params.Headers {
|
|
|
|
kv := keyValue{key: key, value: value}
|
|
|
|
s.extraHeaders = append(s.extraHeaders, kv)
|
|
|
|
}
|
|
|
|
}
|
2023-06-01 09:38:48 +00:00
|
|
|
s.debug = params.Debug
|
2021-04-28 20:41:15 +00:00
|
|
|
return s
|
|
|
|
}
|
|
|
|
|
|
|
|
// BuildWithParams - implements interface.
|
|
|
|
func (s *VMStorage) BuildWithParams(params QuerierParams) Querier {
|
|
|
|
return s.Clone().ApplyParams(params)
|
|
|
|
}
|
|
|
|
|
2020-03-13 10:19:31 +00:00
|
|
|
// NewVMStorage is a constructor for VMStorage
|
2024-03-12 15:16:50 +00:00
|
|
|
func NewVMStorage(baseURL string, authCfg *promauth.Config, queryStep time.Duration, appendTypePrefix bool, c *http.Client) *VMStorage {
|
2020-03-13 10:19:31 +00:00
|
|
|
return &VMStorage{
|
2022-05-13 13:19:32 +00:00
|
|
|
c: c,
|
|
|
|
authCfg: authCfg,
|
|
|
|
datasourceURL: strings.TrimSuffix(baseURL, "/"),
|
|
|
|
appendTypePrefix: appendTypePrefix,
|
|
|
|
queryStep: queryStep,
|
2022-07-22 08:44:55 +00:00
|
|
|
dataSourceType: datasourcePrometheus,
|
2023-06-02 09:38:55 +00:00
|
|
|
extraParams: url.Values{},
|
2020-03-13 10:19:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-30 06:46:03 +00:00
|
|
|
// Query executes the given query and returns parsed response
|
2023-05-08 07:36:39 +00:00
|
|
|
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
|
2024-02-29 13:25:36 +00:00
|
|
|
req, err := s.newQueryRequest(ctx, query, ts)
|
2023-10-17 09:58:19 +00:00
|
|
|
if err != nil {
|
|
|
|
return Result{}, nil, err
|
|
|
|
}
|
2024-02-29 13:25:36 +00:00
|
|
|
resp, err := s.do(req)
|
2023-10-25 21:19:33 +00:00
|
|
|
if err != nil {
|
2024-05-30 15:54:42 +00:00
|
|
|
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) {
|
2023-10-25 21:19:33 +00:00
|
|
|
// Return unexpected error to the caller.
|
|
|
|
return Result{}, nil, err
|
|
|
|
}
|
|
|
|
// Something in the middle between client and datasource might be closing
|
2023-08-23 22:04:05 +00:00
|
|
|
// the connection. So we do a one more attempt in hope request will succeed.
|
2024-02-29 13:25:36 +00:00
|
|
|
req, err = s.newQueryRequest(ctx, query, ts)
|
2023-10-25 21:19:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
|
|
|
}
|
2024-02-29 13:25:36 +00:00
|
|
|
resp, err = s.do(req)
|
2023-10-25 21:19:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
|
|
|
}
|
2023-08-23 22:04:05 +00:00
|
|
|
}
|
2021-04-30 06:46:03 +00:00
|
|
|
|
2023-10-25 21:19:33 +00:00
|
|
|
// Process the received response.
|
2024-09-23 07:38:30 +00:00
|
|
|
var parseFn func(req *http.Request, resp *http.Response) (Result, error)
|
|
|
|
switch s.dataSourceType {
|
|
|
|
case datasourcePrometheus:
|
|
|
|
parseFn = parsePrometheusResponse
|
|
|
|
case datasourceGraphite:
|
|
|
|
parseFn = parseGraphiteResponse
|
|
|
|
case datasourceVlogs:
|
|
|
|
parseFn = parseVlogsResponse
|
2021-04-30 06:46:03 +00:00
|
|
|
}
|
2022-09-15 10:40:22 +00:00
|
|
|
result, err := parseFn(req, resp)
|
2023-10-25 21:19:33 +00:00
|
|
|
_ = resp.Body.Close()
|
2022-09-15 10:40:22 +00:00
|
|
|
return result, req, err
|
2021-02-01 13:02:44 +00:00
|
|
|
}
|
|
|
|
|
2021-06-09 09:20:38 +00:00
|
|
|
// QueryRange executes the given query on the given time range.
|
|
|
|
// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
|
|
|
|
// Graphite type isn't supported.
|
2023-05-08 07:36:39 +00:00
|
|
|
func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end time.Time) (res Result, err error) {
|
2024-09-23 07:38:30 +00:00
|
|
|
if s.dataSourceType == datasourceGraphite {
|
2023-05-08 07:36:39 +00:00
|
|
|
return res, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType)
|
2021-06-09 09:20:38 +00:00
|
|
|
}
|
|
|
|
if start.IsZero() {
|
2023-05-08 07:36:39 +00:00
|
|
|
return res, fmt.Errorf("start param is missing")
|
2020-03-13 10:19:31 +00:00
|
|
|
}
|
2021-06-09 09:20:38 +00:00
|
|
|
if end.IsZero() {
|
2023-05-08 07:36:39 +00:00
|
|
|
return res, fmt.Errorf("end param is missing")
|
2021-04-30 06:46:03 +00:00
|
|
|
}
|
2024-02-29 13:25:36 +00:00
|
|
|
req, err := s.newQueryRangeRequest(ctx, query, start, end)
|
2023-10-17 09:58:19 +00:00
|
|
|
if err != nil {
|
2023-10-25 21:19:33 +00:00
|
|
|
return res, err
|
2023-10-17 09:58:19 +00:00
|
|
|
}
|
2024-02-29 13:25:36 +00:00
|
|
|
resp, err := s.do(req)
|
2023-10-25 21:19:33 +00:00
|
|
|
if err != nil {
|
2024-05-30 15:54:42 +00:00
|
|
|
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) {
|
2023-10-25 21:19:33 +00:00
|
|
|
// Return unexpected error to the caller.
|
|
|
|
return res, err
|
|
|
|
}
|
|
|
|
// Something in the middle between client and datasource might be closing
|
2023-08-23 22:04:05 +00:00
|
|
|
// the connection. So we do a one more attempt in hope request will succeed.
|
2024-02-29 13:25:36 +00:00
|
|
|
req, err = s.newQueryRangeRequest(ctx, query, start, end)
|
2023-10-25 21:19:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return res, fmt.Errorf("second attempt: %w", err)
|
|
|
|
}
|
2024-02-29 13:25:36 +00:00
|
|
|
resp, err = s.do(req)
|
2023-10-25 21:19:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return res, fmt.Errorf("second attempt: %w", err)
|
|
|
|
}
|
2023-08-23 22:04:05 +00:00
|
|
|
}
|
2023-10-25 21:19:33 +00:00
|
|
|
|
|
|
|
// Process the received response.
|
|
|
|
res, err = parsePrometheusResponse(req, resp)
|
|
|
|
_ = resp.Body.Close()
|
|
|
|
return res, err
|
2021-04-30 06:46:03 +00:00
|
|
|
}
|
|
|
|
|
2024-02-29 13:25:36 +00:00
|
|
|
func (s *VMStorage) do(req *http.Request) (*http.Response, error) {
|
2023-10-10 09:40:27 +00:00
|
|
|
ru := req.URL.Redacted()
|
|
|
|
if *showDatasourceURL {
|
|
|
|
ru = req.URL.String()
|
|
|
|
}
|
2022-09-13 13:25:43 +00:00
|
|
|
if s.debug {
|
2023-10-10 09:40:27 +00:00
|
|
|
logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, ru)
|
2022-09-13 13:25:43 +00:00
|
|
|
}
|
2024-02-29 13:25:36 +00:00
|
|
|
resp, err := s.c.Do(req)
|
2020-03-13 10:19:31 +00:00
|
|
|
if err != nil {
|
2023-10-10 09:40:27 +00:00
|
|
|
return nil, fmt.Errorf("error getting response from %s: %w", ru, err)
|
2020-03-13 10:19:31 +00:00
|
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
2022-08-21 21:13:44 +00:00
|
|
|
body, _ := io.ReadAll(resp.Body)
|
2021-04-30 06:46:03 +00:00
|
|
|
_ = resp.Body.Close()
|
2023-10-10 09:40:27 +00:00
|
|
|
return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, ru, body)
|
2020-03-13 10:19:31 +00:00
|
|
|
}
|
2021-04-30 06:46:03 +00:00
|
|
|
return resp, nil
|
2021-02-01 13:02:44 +00:00
|
|
|
}
|
|
|
|
|
2024-02-29 13:25:36 +00:00
|
|
|
func (s *VMStorage) newQueryRangeRequest(ctx context.Context, query string, start, end time.Time) (*http.Request, error) {
|
|
|
|
req, err := s.newRequest(ctx)
|
2023-10-17 09:58:19 +00:00
|
|
|
if err != nil {
|
2023-10-25 19:24:01 +00:00
|
|
|
return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", s.datasourceURL, err)
|
2023-10-17 09:58:19 +00:00
|
|
|
}
|
2023-08-23 22:04:05 +00:00
|
|
|
s.setPrometheusRangeReqParams(req, query, start, end)
|
2023-10-17 09:58:19 +00:00
|
|
|
return req, nil
|
2023-08-23 22:04:05 +00:00
|
|
|
}
|
|
|
|
|
2024-02-29 13:25:36 +00:00
|
|
|
func (s *VMStorage) newQueryRequest(ctx context.Context, query string, ts time.Time) (*http.Request, error) {
|
|
|
|
req, err := s.newRequest(ctx)
|
2023-10-17 09:58:19 +00:00
|
|
|
if err != nil {
|
2023-10-25 19:24:01 +00:00
|
|
|
return nil, fmt.Errorf("cannot create query request to datasource %q: %w", s.datasourceURL, err)
|
2023-10-17 09:58:19 +00:00
|
|
|
}
|
2023-08-23 22:04:05 +00:00
|
|
|
switch s.dataSourceType {
|
|
|
|
case "", datasourcePrometheus:
|
|
|
|
s.setPrometheusInstantReqParams(req, query, ts)
|
|
|
|
case datasourceGraphite:
|
2024-03-12 15:16:50 +00:00
|
|
|
s.setGraphiteReqParams(req, query)
|
2024-09-23 07:38:30 +00:00
|
|
|
case datasourceVlogs:
|
|
|
|
s.setVlogsInstantReqParams(req, query, ts)
|
2023-08-23 22:04:05 +00:00
|
|
|
default:
|
|
|
|
logger.Panicf("BUG: engine not found: %q", s.dataSourceType)
|
|
|
|
}
|
2023-10-17 09:58:19 +00:00
|
|
|
return req, nil
|
2023-08-23 22:04:05 +00:00
|
|
|
}
|
|
|
|
|
2024-02-29 13:25:36 +00:00
|
|
|
func (s *VMStorage) newRequest(ctx context.Context) (*http.Request, error) {
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.datasourceURL, nil)
|
2021-06-09 09:20:38 +00:00
|
|
|
if err != nil {
|
2023-08-23 22:04:05 +00:00
|
|
|
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err)
|
2020-03-13 10:19:31 +00:00
|
|
|
}
|
2021-11-09 16:03:50 +00:00
|
|
|
req.Header.Set("Content-Type", "application/json")
|
2021-09-14 11:32:06 +00:00
|
|
|
if s.authCfg != nil {
|
2023-10-17 09:58:19 +00:00
|
|
|
err = s.authCfg.SetHeaders(req, true)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-02-01 13:02:44 +00:00
|
|
|
}
|
2022-07-21 13:59:55 +00:00
|
|
|
for _, h := range s.extraHeaders {
|
2022-07-22 08:44:55 +00:00
|
|
|
req.Header.Set(h.key, h.value)
|
2022-07-21 13:59:55 +00:00
|
|
|
}
|
2023-10-17 09:58:19 +00:00
|
|
|
return req, nil
|
2021-02-01 13:02:44 +00:00
|
|
|
}
|