vmalert: remove dependency on datasource pkg from config (#2905)

* vmalert: remove dependency on datasource pkg from config

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2022-07-22 10:44:55 +02:00 committed by GitHub
parent 89890eab5d
commit 2914ce5ca5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 111 additions and 83 deletions

View file

@ -20,7 +20,7 @@ import (
// AlertingRule is basic alert entity
type AlertingRule struct {
Type datasource.Type
Type config.Type
RuleID uint64
Name string
Expr string
@ -72,7 +72,7 @@ func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
GroupName: group.Name,
EvalInterval: group.Interval,
q: qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: &group.Type,
DataSourceType: group.Type.String(),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,

View file

@ -12,7 +12,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
@ -23,7 +22,7 @@ import (
// Group contains list of Rules grouped into
// entity with one name and evaluation interval
type Group struct {
Type datasource.Type `yaml:"type,omitempty"`
Type Type `yaml:"type,omitempty"`
File string
Name string `yaml:"name"`
Interval *promutils.Duration `yaml:"interval,omitempty"`
@ -39,7 +38,7 @@ type Group struct {
// Optional HTTP URL parameters added to each rule request
Params url.Values `yaml:"params"`
// Headers contains optional HTTP headers added to each rule request
Headers []datasource.Header `yaml:"headers,omitempty"`
Headers []Header `yaml:"headers,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -57,7 +56,7 @@ func (g *Group) UnmarshalYAML(unmarshal func(interface{}) error) error {
}
// change default value to prometheus datasource.
if g.Type.Get() == "" {
g.Type.Set(datasource.NewPrometheusType())
g.Type.Set(NewPrometheusType())
}
h := md5.New()

View file

@ -9,7 +9,6 @@ import (
"gopkg.in/yaml.v2"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/templates"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
@ -224,7 +223,7 @@ func TestGroup_Validate(t *testing.T) {
},
{
group: &Group{Name: "test thanos",
Type: datasource.NewRawType("thanos"),
Type: NewRawType("thanos"),
Rules: []Rule{
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"description": "{{ value|query }}",
@ -236,7 +235,7 @@ func TestGroup_Validate(t *testing.T) {
},
{
group: &Group{Name: "test graphite",
Type: datasource.NewGraphiteType(),
Type: NewGraphiteType(),
Rules: []Rule{
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"description": "some-description",
@ -248,7 +247,7 @@ func TestGroup_Validate(t *testing.T) {
},
{
group: &Group{Name: "test prometheus",
Type: datasource.NewPrometheusType(),
Type: NewPrometheusType(),
Rules: []Rule{
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"description": "{{ value|query }}",
@ -261,7 +260,7 @@ func TestGroup_Validate(t *testing.T) {
{
group: &Group{
Name: "test graphite inherit",
Type: datasource.NewGraphiteType(),
Type: NewGraphiteType(),
Rules: []Rule{
{
Expr: "sumSeries(time('foo.bar',10))",
@ -276,7 +275,7 @@ func TestGroup_Validate(t *testing.T) {
{
group: &Group{
Name: "test graphite prometheus bad expr",
Type: datasource.NewGraphiteType(),
Type: NewGraphiteType(),
Rules: []Rule{
{
Expr: "sum(up == 0 ) by (host)",

View file

@ -1,4 +1,4 @@
package datasource
package config
import (
"fmt"
@ -10,45 +10,45 @@ import (
// Type represents data source type
type Type struct {
name string
Name string
}
// NewPrometheusType returns prometheus datasource type
func NewPrometheusType() Type {
return Type{
name: "prometheus",
Name: "prometheus",
}
}
// NewGraphiteType returns graphite datasource type
func NewGraphiteType() Type {
return Type{
name: "graphite",
Name: "graphite",
}
}
// NewRawType returns datasource type from raw string
// without validation.
func NewRawType(d string) Type {
return Type{name: d}
return Type{Name: d}
}
// Get returns datasource type
func (t *Type) Get() string {
return t.name
return t.Name
}
// Set changes datasource type
func (t *Type) Set(d Type) {
t.name = d.name
t.Name = d.Name
}
// String implements String interface with default value.
func (t Type) String() string {
if t.name == "" {
if t.Name == "" {
return "prometheus"
}
return t.name
return t.Name
}
// ValidateExpr validates query expression with datasource ql.
@ -63,7 +63,7 @@ func (t *Type) ValidateExpr(expr string) error {
return fmt.Errorf("bad prometheus expr: %q, err: %w", expr, err)
}
default:
return fmt.Errorf("unknown datasource type=%q", t.name)
return fmt.Errorf("unknown datasource type=%q", t.Name)
}
return nil
}
@ -82,13 +82,13 @@ func (t *Type) UnmarshalYAML(unmarshal func(interface{}) error) error {
default:
return fmt.Errorf("unknown datasource type=%q, want %q or %q", s, "prometheus", "graphite")
}
t.name = s
t.Name = s
return nil
}
// MarshalYAML implements the yaml.Unmarshaler interface.
func (t Type) MarshalYAML() (interface{}, error) {
return t.name, nil
return t.Name, nil
}
// Header is a Key - Value struct for holding an HTTP header.

View file

@ -19,10 +19,10 @@ type QuerierBuilder interface {
// QuerierParams params for Querier.
type QuerierParams struct {
DataSourceType *Type
DataSourceType string
EvaluationInterval time.Duration
QueryParams url.Values
Headers []Header
Headers map[string]string
}
// Metric is the basic entity which should be return by datasource

View file

@ -97,7 +97,7 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
appendTypePrefix: *appendTypePrefix,
lookBack: *lookBack,
queryStep: *queryStep,
dataSourceType: NewPrometheusType(),
dataSourceType: datasourcePrometheus,
extraParams: extraParams,
}, nil
}

View file

@ -12,6 +12,20 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
type datasourceType string
const (
datasourcePrometheus datasourceType = "prometheus"
datasourceGraphite datasourceType = "graphite"
)
func toDatasourceType(s string) datasourceType {
if s == string(datasourceGraphite) {
return datasourceGraphite
}
return datasourcePrometheus
}
// VMStorage represents vmstorage entity with ability to read and write metrics
type VMStorage struct {
c *http.Client
@ -21,10 +35,15 @@ type VMStorage struct {
lookBack time.Duration
queryStep time.Duration
dataSourceType Type
dataSourceType datasourceType
evaluationInterval time.Duration
extraParams url.Values
extraHeaders []Header
extraHeaders []keyValue
}
type keyValue struct {
key string
value string
}
// Clone makes clone of VMStorage, shares http client.
@ -42,12 +61,15 @@ func (s *VMStorage) Clone() *VMStorage {
// ApplyParams - changes given querier params.
func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage {
if params.DataSourceType != nil {
s.dataSourceType = *params.DataSourceType
}
s.dataSourceType = toDatasourceType(params.DataSourceType)
s.evaluationInterval = params.EvaluationInterval
s.extraParams = params.QueryParams
s.extraHeaders = params.Headers
if params.Headers != nil {
for key, value := range params.Headers {
kv := keyValue{key: key, value: value}
s.extraHeaders = append(s.extraHeaders, kv)
}
}
return s
}
@ -65,7 +87,7 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati
appendTypePrefix: appendTypePrefix,
lookBack: lookBack,
queryStep: queryStep,
dataSourceType: NewPrometheusType(),
dataSourceType: datasourcePrometheus,
}
}
@ -76,13 +98,13 @@ func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Me
return nil, err
}
switch s.dataSourceType.String() {
case "prometheus":
switch s.dataSourceType {
case "", datasourcePrometheus:
s.setPrometheusInstantReqParams(req, query, ts)
case "graphite":
case datasourceGraphite:
s.setGraphiteReqParams(req, query, ts)
default:
return nil, fmt.Errorf("engine not found: %q", s.dataSourceType.name)
return nil, fmt.Errorf("engine not found: %q", s.dataSourceType)
}
resp, err := s.do(ctx, req)
@ -94,7 +116,7 @@ func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Me
}()
parseFn := parsePrometheusResponse
if s.dataSourceType.name != "prometheus" {
if s.dataSourceType != datasourcePrometheus {
parseFn = parseGraphiteResponse
}
return parseFn(req, resp)
@ -104,8 +126,8 @@ func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Me
// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
// Graphite type isn't supported.
func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end time.Time) ([]Metric, error) {
if s.dataSourceType.name != "prometheus" {
return nil, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType.name)
if s.dataSourceType != datasourcePrometheus {
return nil, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType)
}
req, err := s.newRequestPOST()
if err != nil {
@ -151,7 +173,7 @@ func (s *VMStorage) newRequestPOST() (*http.Request, error) {
s.authCfg.SetHeaders(req, true)
}
for _, h := range s.extraHeaders {
req.Header.Set(h.Key, h.Value)
req.Header.Set(h.key, h.value)
}
return req, nil
}

View file

@ -89,8 +89,8 @@ func TestVMInstantQuery(t *testing.T) {
}
s := NewVMStorage(srv.URL, authCfg, time.Minute, 0, false, srv.Client())
p := NewPrometheusType()
pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second})
p := datasourcePrometheus
pq := s.BuildWithParams(QuerierParams{DataSourceType: string(p), EvaluationInterval: 15 * time.Second})
ts := time.Now()
expErr := func(err string) {
@ -146,8 +146,7 @@ func TestVMInstantQuery(t *testing.T) {
t.Fatalf("unexpected metric %+v want %+v", m, expected)
}
g := NewGraphiteType()
gq := s.BuildWithParams(QuerierParams{DataSourceType: &g})
gq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceGraphite)})
m, err = gq.Query(ctx, queryRender, ts) // 8 - graphite
if err != nil {
@ -212,8 +211,7 @@ func TestVMRangeQuery(t *testing.T) {
}
s := NewVMStorage(srv.URL, authCfg, time.Minute, 0, false, srv.Client())
p := NewPrometheusType()
pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second})
pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus), EvaluationInterval: 15 * time.Second})
_, err = pq.QueryRange(ctx, query, time.Now(), time.Time{})
expectError(t, err, "is missing")
@ -239,8 +237,7 @@ func TestVMRangeQuery(t *testing.T) {
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
}
g := NewGraphiteType()
gq := s.BuildWithParams(QuerierParams{DataSourceType: &g})
gq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceGraphite)})
_, err = gq.QueryRange(ctx, queryRender, start, end)
expectError(t, err, "is not supported")
@ -263,7 +260,7 @@ func TestRequestParams(t *testing.T) {
"prometheus path",
false,
&VMStorage{
dataSourceType: NewPrometheusType(),
dataSourceType: datasourcePrometheus,
},
func(t *testing.T, r *http.Request) {
checkEqualString(t, "/api/v1/query", r.URL.Path)
@ -273,7 +270,7 @@ func TestRequestParams(t *testing.T) {
"prometheus prefix",
false,
&VMStorage{
dataSourceType: NewPrometheusType(),
dataSourceType: datasourcePrometheus,
appendTypePrefix: true,
},
func(t *testing.T, r *http.Request) {
@ -284,7 +281,7 @@ func TestRequestParams(t *testing.T) {
"prometheus range path",
true,
&VMStorage{
dataSourceType: NewPrometheusType(),
dataSourceType: datasourcePrometheus,
},
func(t *testing.T, r *http.Request) {
checkEqualString(t, "/api/v1/query_range", r.URL.Path)
@ -294,7 +291,7 @@ func TestRequestParams(t *testing.T) {
"prometheus range prefix",
true,
&VMStorage{
dataSourceType: NewPrometheusType(),
dataSourceType: datasourcePrometheus,
appendTypePrefix: true,
},
func(t *testing.T, r *http.Request) {
@ -305,7 +302,7 @@ func TestRequestParams(t *testing.T) {
"graphite path",
false,
&VMStorage{
dataSourceType: NewGraphiteType(),
dataSourceType: datasourceGraphite,
},
func(t *testing.T, r *http.Request) {
checkEqualString(t, graphitePath, r.URL.Path)
@ -315,7 +312,7 @@ func TestRequestParams(t *testing.T) {
"graphite prefix",
false,
&VMStorage{
dataSourceType: NewGraphiteType(),
dataSourceType: datasourceGraphite,
appendTypePrefix: true,
},
func(t *testing.T, r *http.Request) {
@ -453,7 +450,7 @@ func TestRequestParams(t *testing.T) {
"graphite extra params",
false,
&VMStorage{
dataSourceType: NewGraphiteType(),
dataSourceType: datasourceGraphite,
extraParams: url.Values{
"nocache": {"1"},
"max_lookback": {"1h"},
@ -472,14 +469,14 @@ func TestRequestParams(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
switch tc.vm.dataSourceType.String() {
case "prometheus":
switch tc.vm.dataSourceType {
case "", datasourcePrometheus:
if tc.queryRange {
tc.vm.setPrometheusRangeReqParams(req, query, timestamp, timestamp)
} else {
tc.vm.setPrometheusInstantReqParams(req, query, timestamp)
}
case "graphite":
case datasourceGraphite:
tc.vm.setGraphiteReqParams(req, query, timestamp)
}
tc.checkFn(t, req)
@ -530,9 +527,9 @@ func TestHeaders(t *testing.T) {
{
name: "custom extraHeaders",
vmFn: func() *VMStorage {
return &VMStorage{extraHeaders: []Header{
{Key: "Foo", Value: "bar"},
{Key: "Baz", Value: "qux"},
return &VMStorage{extraHeaders: []keyValue{
{key: "Foo", value: "bar"},
{key: "Baz", value: "qux"},
}}
},
checkFn: func(t *testing.T, r *http.Request) {
@ -551,8 +548,8 @@ func TestHeaders(t *testing.T) {
}
return &VMStorage{
authCfg: cfg,
extraHeaders: []Header{
{Key: "Authorization", Value: "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="},
extraHeaders: []keyValue{
{key: "Authorization", value: "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="},
}}
},
checkFn: func(t *testing.T, r *http.Request) {

View file

@ -28,7 +28,7 @@ type Group struct {
Name string
File string
Rules []Rule
Type datasource.Type
Type config.Type
Interval time.Duration
Limit int
Concurrency int
@ -37,7 +37,7 @@ type Group struct {
Labels map[string]string
Params url.Values
Headers []datasource.Header
Headers map[string]string
doneCh chan struct{}
finishedCh chan struct{}
@ -97,7 +97,7 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
Concurrency: cfg.Concurrency,
Checksum: cfg.Checksum,
Params: cfg.Params,
Headers: cfg.Headers,
Headers: make(map[string]string),
Labels: cfg.Labels,
doneCh: make(chan struct{}),
@ -110,6 +110,9 @@ func newGroup(cfg config.Group, qb datasource.QuerierBuilder, defaultInterval ti
if g.Concurrency < 1 {
g.Concurrency = 1
}
for _, h := range cfg.Headers {
g.Headers[h.Key] = h.Value
}
g.metrics = newGroupMetrics(g)
rules := make([]Rule, len(cfg.Rules))
for i, r := range cfg.Rules {

View file

@ -200,13 +200,22 @@ func urlValuesToStrings(values url.Values) []string {
return res
}
func headersToStrings(headers []datasource.Header) []string {
func headersToStrings(headers map[string]string) []string {
if len(headers) < 1 {
return nil
}
var res []string
for _, h := range headers {
res = append(res, fmt.Sprintf("%s: %s", h.Key, h.Value))
keys := make([]string, 0, len(headers))
for k := range headers {
keys = append(keys, k)
}
sort.Strings(keys)
var res []string
for _, k := range keys {
v := headers[k]
res = append(res, fmt.Sprintf("%s: %s", k, v))
}
return res
}

View file

@ -10,7 +10,6 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/templates"
@ -132,7 +131,7 @@ func TestManagerUpdate(t *testing.T) {
{
File: "config/testdata/dir/rules1-good.rules",
Name: "duplicatedGroupDiffFiles",
Type: datasource.NewPrometheusType(),
Type: config.NewPrometheusType(),
Interval: defaultEvalInterval,
Rules: []Rule{
&AlertingRule{
@ -157,14 +156,14 @@ func TestManagerUpdate(t *testing.T) {
{
File: "config/testdata/rules/rules0-good.rules",
Name: "groupGorSingleAlert",
Type: datasource.NewPrometheusType(),
Type: config.NewPrometheusType(),
Rules: []Rule{VMRows},
Interval: defaultEvalInterval,
},
{
File: "config/testdata/rules/rules0-good.rules",
Interval: defaultEvalInterval,
Type: datasource.NewPrometheusType(),
Type: config.NewPrometheusType(),
Name: "TestGroup", Rules: []Rule{
Conns,
ExampleAlertAlwaysFiring,
@ -179,7 +178,7 @@ func TestManagerUpdate(t *testing.T) {
{
File: "config/testdata/rules/rules0-good.rules",
Name: "groupGorSingleAlert",
Type: datasource.NewPrometheusType(),
Type: config.NewPrometheusType(),
Interval: defaultEvalInterval,
Rules: []Rule{VMRows},
},
@ -187,7 +186,7 @@ func TestManagerUpdate(t *testing.T) {
File: "config/testdata/rules/rules0-good.rules",
Interval: defaultEvalInterval,
Name: "TestGroup",
Type: datasource.NewPrometheusType(),
Type: config.NewPrometheusType(),
Rules: []Rule{
Conns,
ExampleAlertAlwaysFiring,
@ -202,14 +201,14 @@ func TestManagerUpdate(t *testing.T) {
{
File: "config/testdata/rules/rules0-good.rules",
Name: "groupGorSingleAlert",
Type: datasource.NewPrometheusType(),
Type: config.NewPrometheusType(),
Interval: defaultEvalInterval,
Rules: []Rule{VMRows},
},
{
File: "config/testdata/rules/rules0-good.rules",
Interval: defaultEvalInterval,
Type: datasource.NewPrometheusType(),
Type: config.NewPrometheusType(),
Name: "TestGroup", Rules: []Rule{
Conns,
ExampleAlertAlwaysFiring,

View file

@ -18,7 +18,7 @@ import (
// to evaluate configured Expression and
// return TimeSeries as result.
type RecordingRule struct {
Type datasource.Type
Type config.Type
RuleID uint64
Name string
Expr string
@ -70,7 +70,7 @@ func newRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
GroupID: group.ID(),
metrics: &recordingRuleMetrics{},
q: qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: &group.Type,
DataSourceType: group.Type.String(),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,

View file

@ -27,6 +27,6 @@ func init() {
func Init() {
extraLabels := strings.Join(*pushExtraLabels, ",")
for _, pu := range *pushURL {
metrics.InitPushExt(pu, *pushInterval, extraLabels, appmetrics.WritePrometheusMetrics)
_ = metrics.InitPushExt(pu, *pushInterval, extraLabels, appmetrics.WritePrometheusMetrics)
}
}