vmalert: support rules backfilling (aka `replay`) (#1358)

* vmalert: support rules backfilling (aka `replay`)

  vmalert can `replay` configured rules in the past and backfill the results via the remote write protocol. It supports any MetricsQL/PromQL-compatible storage as a data source and can backfill data to any remote-write-compatible storage. Both recording and alerting rules support `replay`. See more details in README.

  https://github.com/VictoriaMetrics/VictoriaMetrics/issues/836

* vmalert: review fixes

* vmalert: readme fixes
This commit is contained in:
parent 408fc90b40, commit 2a259ef5e7

23 changed files with 1531 additions and 315 deletions
@@ -69,6 +9,15 @@ run-vmalert: vmalert
		-evaluationInterval=3s \
		-rule.configCheckInterval=10s

replay-vmalert: vmalert
	./bin/vmalert -rule=app/vmalert/config/testdata/rules-replay-good.rules \
		-datasource.url=http://localhost:8428 \
		-remoteWrite.url=http://localhost:8428 \
		-external.label=cluster=east-1 \
		-external.label=replica=a \
		-replay.timeFrom=2021-05-11T07:21:43Z \
		-replay.timeTo=2021-05-29T18:40:43Z

vmalert-amd64:
	CGO_ENABLED=1 GOARCH=amd64 $(MAKE) vmalert-local-with-goarch

@@ -12,7 +12,8 @@ rules against configured address.
  support;
* Integration with [Alertmanager](https://github.com/prometheus/alertmanager);
* Keeps the alerts [state on restarts](#alerts-state-on-restarts);
-* Graphite datasource can be used for alerting and recording rules. See [these docs](#graphite) for details.
+* Graphite datasource can be used for alerting and recording rules. See [these docs](#graphite) for details;
+* Recording and Alerting rules backfilling (aka `replay`);
* Lightweight without extra dependencies.

## Limitations

@@ -227,6 +228,93 @@ implements [Graphite Render API](https://graphite.readthedocs.io/en/stable/rende
When using vmalert with both `graphite` and `prometheus` rules configured against the cluster version of VM, do not forget
to set the `-datasource.appendTypePrefix` flag to `true`, so vmalert can adjust the URL prefix automatically based on the query type.

## Rules backfilling

vmalert supports alerting and recording rules backfilling (aka `replay`). In replay mode vmalert
reads the same rules configuration as usual, evaluates the rules over the given time range and backfills
the results via remote write to the configured storage. vmalert supports any PromQL/MetricsQL-compatible
data source for backfilling.

### How it works

In `replay` mode vmalert works as a CLI tool and exits immediately after the work is done.
To run vmalert in `replay` mode:
```
./bin/vmalert -rule=path/to/your.rules \        # path to files with rules you usually use with vmalert
    -datasource.url=http://localhost:8428 \     # PromQL/MetricsQL compatible datasource
    -remoteWrite.url=http://localhost:8428 \    # remote write compatible storage to persist results
    -replay.timeFrom=2021-05-11T07:21:43Z \     # time to start the replay from
    -replay.timeTo=2021-05-29T18:40:43Z         # time to finish the replay at
```

The output of the command will look like the following:
```
Replay mode:
from:   2021-05-11 07:21:43 +0000 UTC   # set by -replay.timeFrom
to:     2021-05-29 18:40:43 +0000 UTC   # set by -replay.timeTo
max data points per request: 1000       # set by -replay.maxDatapointsPerQuery

Group "ReplayGroup"
interval:               1m0s
requests to make:       27
max range per request:  16h40m0s
> Rule "type:vm_cache_entries:rate5m" (ID: 1792509946081842725)
27 / 27 [----------------------------------------------------------------------------------------------------] 100.00% 78 p/s
> Rule "go_cgo_calls_count:rate5m" (ID: 17958425467471411582)
27 / 27 [----------------------------------------------------------------------------------------------------] 100.00% ? p/s

Group "vmsingleReplay"
interval:               30s
requests to make:       54
max range per request:  8h20m0s
> Rule "RequestErrorsToAPI" (ID: 17645863024999990222)
54 / 54 [----------------------------------------------------------------------------------------------------] 100.00% ? p/s
> Rule "TooManyLogs" (ID: 9042195394653477652)
54 / 54 [----------------------------------------------------------------------------------------------------] 100.00% ? p/s
2021-06-07T09:59:12.098Z  info  app/vmalert/replay.go:68  replay finished! Imported 511734 samples
```

In `replay` mode all groups are executed sequentially one-by-one. Rules within a group are
executed sequentially as well (the `concurrency` setting is ignored). vmalert sends the rule's expression
to the [/query_range](https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries) endpoint
of the configured `-datasource.url`. The returned data is then processed according to the rule type and
backfilled to `-remoteWrite.url` via the [Remote Write protocol](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations).
vmalert respects the `evaluationInterval` value set by flag or per-group during the replay.
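
The flow above can be summarized with the following sketch. It is a simplified illustration rather than the actual replay implementation: it assumes it lives in the vmalert package next to `AlertingRule` (with `context`, `fmt`, `time` and `prompbmarshal` imported there), and the `push` callback is a hypothetical stand-in for the remote write client.

```go
// replayRule splits the requested time range into sub-ranges of at most
// maxDataPoints*interval, evaluates each sub-range via /query_range through
// ExecRange, and hands the resulting time series to the remote write client.
func replayRule(ctx context.Context, rule *AlertingRule, start, end time.Time,
	interval time.Duration, maxDataPoints int,
	push func([]prompbmarshal.TimeSeries) error) error {
	step := interval * time.Duration(maxDataPoints) // max range per request
	for s := start; s.Before(end); s = s.Add(step) {
		e := s.Add(step)
		if e.After(end) {
			e = end
		}
		tss, err := rule.ExecRange(ctx, s, e)
		if err != nil {
			return fmt.Errorf("rule %q: %w", rule.Name, err)
		}
		if err := push(tss); err != nil {
			return fmt.Errorf("remote write: %w", err)
		}
	}
	return nil
}
```
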
#### Recording rules

The result of recording rules `replay` should match the results of normal rules evaluation.

#### Alerting rules

The result of alerting rules `replay` is time series reflecting the [alert's state](#alerts-state-on-restarts).
To check whether a `replayed` alert has fired in the past, use the following PromQL/MetricsQL expression:
```
ALERTS{alertname="your_alertname", alertstate="firing"}
```
Execute the query against the storage which was used as `-remoteWrite.url` during the `replay`.
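
As an illustration, the same check can be done programmatically through the standard `/api/v1/query_range` endpoint of the storage that received the replayed data. This is a minimal sketch; the address, alert name and time range below are placeholders.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// Queries the storage used as -remoteWrite.url for firing states of a
// replayed alert within the replayed time range.
func main() {
	base := "http://localhost:8428" // placeholder: storage used as -remoteWrite.url
	params := url.Values{}
	params.Set("query", `ALERTS{alertname="your_alertname", alertstate="firing"}`)
	params.Set("start", "2021-05-11T07:21:43Z") // same range as -replay.timeFrom / -replay.timeTo
	params.Set("end", "2021-05-29T18:40:43Z")
	params.Set("step", "1m")

	resp, err := http.Get(base + "/api/v1/query_range?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // a non-empty "result" list means the alert was firing during the range
}
```
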
### Additional configuration

The following optional `replay` flags are supported:

* `-replay.maxDatapointsPerQuery` - the max number of data points expected to be received in one request.
In short, it limits the max time range of every `/query_range` request: the higher the value,
the fewer requests are issued during `replay` (see the sketch after this list).
* `-replay.ruleRetryAttempts` - when the datasource fails to respond, vmalert will make this number of retries
per rule before giving up.
* `-replay.rulesDelay` - delay between sequential rules execution. Important when there are chained rules
(rules which depend on each other). The remote storage is expected to persist previously accepted data
during the delay, so that it is available for the subsequent queries.
Keep it equal to or bigger than `-remoteWrite.flushInterval`.
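
The "max range per request" and "requests to make" numbers in the sample output above are consistent with a simple rule: a single `/query_range` request covers at most `maxDatapointsPerQuery * interval`, and the replayed time range is split into that many pieces. A minimal sketch, assuming this relationship:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const maxDataPoints = 1000 // -replay.maxDatapointsPerQuery default

	from := time.Date(2021, 5, 11, 7, 21, 43, 0, time.UTC) // -replay.timeFrom
	to := time.Date(2021, 5, 29, 18, 40, 43, 0, time.UTC)  // -replay.timeTo

	for _, interval := range []time.Duration{time.Minute, 30 * time.Second} {
		maxRange := maxDataPoints * interval
		requests := int(math.Ceil(float64(to.Sub(from)) / float64(maxRange)))
		// prints 16h40m0s / 27 for the 1m group and 8h20m0s / 54 for the 30s group
		fmt.Printf("interval=%v max range per request=%v requests to make=%d\n", interval, maxRange, requests)
	}
}
```
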

See the full description of these flags by running `./vmalert --help`.

### Limitations

* Graphite engine isn't supported yet;
* `query` template function is disabled for performance reasons (might be changed in future).

## Configuration

@@ -387,6 +475,16 @@ The shortlist of configuration flags is the following:
  	Optional TLS server name to use for connections to -remoteWrite.url. By default the server name from -remoteWrite.url is used
  -remoteWrite.url string
  	Optional URL to VictoriaMetrics or vminsert where to persist alerts state and recording rules results in form of timeseries. E.g. http://127.0.0.1:8428
  -replay.maxDatapointsPerQuery int
  	Max number of data points expected in one request. The higher the value, the less requests will be made during replay. (default 1000)
  -replay.ruleRetryAttempts int
  	Defines how many retries to make before giving up on rule if request for it returns an error. (default 5)
  -replay.rulesDelay duration
  	Delay between rules evaluation within the group. Could be important if there are chained rules inside of the group and processing needs to wait for previous rule results to be persisted by remote storage before evaluating the next rule. Keep it equal or bigger than -remoteWrite.flushInterval. (default 1s)
  -replay.timeFrom string
  	The time filter in RFC3339 format to select time series with timestamp equal or higher than provided value. E.g. '2020-01-01T20:07:00Z'
  -replay.timeTo string
  	The time filter in RFC3339 format to select time series with timestamp equal or lower than provided value. E.g. '2020-01-01T20:07:00Z'
  -rule array
  	Path to the file with alert rules.
  	Supports patterns. Flag can be specified multiple times.

@@ -19,15 +19,16 @@ import (

 // AlertingRule is basic alert entity
 type AlertingRule struct {
-	Type        datasource.Type
-	RuleID      uint64
-	Name        string
-	Expr        string
-	For         time.Duration
-	Labels      map[string]string
-	Annotations map[string]string
-	GroupID     uint64
-	GroupName   string
+	Type         datasource.Type
+	RuleID       uint64
+	Name         string
+	Expr         string
+	For          time.Duration
+	Labels       map[string]string
+	Annotations  map[string]string
+	GroupID      uint64
+	GroupName    string
+	EvalInterval time.Duration

 	q datasource.Querier
@@ -53,15 +54,16 @@ type alertingRuleMetrics struct {

 func newAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule) *AlertingRule {
 	ar := &AlertingRule{
-		Type:        cfg.Type,
-		RuleID:      cfg.ID,
-		Name:        cfg.Alert,
-		Expr:        cfg.Expr,
-		For:         cfg.For.Duration(),
-		Labels:      cfg.Labels,
-		Annotations: cfg.Annotations,
-		GroupID:     group.ID(),
-		GroupName:   group.Name,
+		Type:         cfg.Type,
+		RuleID:       cfg.ID,
+		Name:         cfg.Alert,
+		Expr:         cfg.Expr,
+		For:          cfg.For.Duration(),
+		Labels:       cfg.Labels,
+		Annotations:  cfg.Annotations,
+		GroupID:      group.ID(),
+		GroupName:    group.Name,
+		EvalInterval: group.Interval,
 		q: qb.BuildWithParams(datasource.QuerierParams{
 			DataSourceType:     &cfg.Type,
 			EvaluationInterval: group.Interval,
@@ -126,9 +128,66 @@ func (ar *AlertingRule) ID() uint64 {
 	return ar.RuleID
 }

+// ExecRange executes alerting rule on the given time range similarly to Exec.
+// It doesn't update internal states of the Rule and meant to be used just
+// to get time series for backfilling.
+// It returns ALERT and ALERT_FOR_STATE time series as result.
+func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
+	series, err := ar.q.QueryRange(ctx, ar.Expr, start, end)
+	if err != nil {
+		return nil, err
+	}
+	var result []prompbmarshal.TimeSeries
+	qFn := func(query string) ([]datasource.Metric, error) {
+		return nil, fmt.Errorf("`query` template isn't supported in replay mode")
+	}
+	for _, s := range series {
+		// extra labels could contain templates, so we expand them first
+		labels, err := expandLabels(s, qFn, ar)
+		if err != nil {
+			return nil, fmt.Errorf("failed to expand labels: %s", err)
+		}
+		for k, v := range labels {
+			// apply extra labels to datasource
+			// so the hash key will be consistent on restore
+			s.SetLabel(k, v)
+		}
+
+		a, err := ar.newAlert(s, time.Time{}, qFn) // initial alert
+		if err != nil {
+			return nil, fmt.Errorf("failed to create alert: %s", err)
+		}
+		if ar.For == 0 { // if alert is instant
+			a.State = notifier.StateFiring
+			for i := range s.Values {
+				result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
+			}
+			continue
+		}
+
+		// if alert with For > 0
+		prevT := time.Time{}
+		//activeAt := time.Time{}
+		for i := range s.Values {
+			at := time.Unix(s.Timestamps[i], 0)
+			if at.Sub(prevT) > ar.EvalInterval {
+				// reset to Pending if there are gaps > EvalInterval between DPs
+				a.State = notifier.StatePending
+				//activeAt = at
+				a.Start = at
+			} else if at.Sub(a.Start) >= ar.For {
+				a.State = notifier.StateFiring
+			}
+			prevT = at
+			result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
+		}
+	}
+	return result, nil
+}
+
 // Exec executes AlertingRule expression via the given Querier.
 // Based on the Querier results AlertingRule maintains notifier.Alerts
-func (ar *AlertingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) {
+func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) {
 	qMetrics, err := ar.q.Query(ctx, ar.Expr)
 	ar.mu.Lock()
 	defer ar.mu.Unlock()
@@ -168,9 +227,9 @@ func (ar *AlertingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.
 		}
 		updated[h] = struct{}{}
 		if a, ok := ar.alerts[h]; ok {
-			if a.Value != m.Value {
+			if a.Value != m.Values[0] {
 				// update Value field with latest value
-				a.Value = m.Value
+				a.Value = m.Values[0]
 				// and re-exec template since Value can be used
 				// in annotations
 				a.Annotations, err = a.ExecTemplate(qFn, ar.Annotations)
@@ -208,10 +267,7 @@ func (ar *AlertingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.
 			alertsFired.Inc()
 		}
 	}
-	if series {
-		return ar.toTimeSeries(ar.lastExecTime), nil
-	}
-	return nil, nil
+	return ar.toTimeSeries(ar.lastExecTime.Unix()), nil
 }

 func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (map[string]string, error) {
@@ -221,13 +277,13 @@ func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (ma
 	}
 	tpl := notifier.AlertTplData{
 		Labels: metricLabels,
-		Value:  m.Value,
+		Value:  m.Values[0],
 		Expr:   ar.Expr,
 	}
 	return notifier.ExecTemplate(q, ar.Labels, tpl)
 }

-func (ar *AlertingRule) toTimeSeries(timestamp time.Time) []prompbmarshal.TimeSeries {
+func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries {
 	var tss []prompbmarshal.TimeSeries
 	for _, a := range ar.alerts {
 		if a.State == notifier.StateInactive {
@@ -251,6 +307,7 @@ func (ar *AlertingRule) UpdateWith(r Rule) error {
 	ar.For = nr.For
 	ar.Labels = nr.Labels
 	ar.Annotations = nr.Annotations
+	ar.EvalInterval = nr.EvalInterval
 	ar.q = nr.q
 	return nil
 }
@@ -279,13 +336,15 @@ func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, qFn notif
 		GroupID:     ar.GroupID,
 		Name:        ar.Name,
 		Labels:      map[string]string{},
-		Value:       m.Value,
+		Value:       m.Values[0],
 		Start:       start,
 		Expr:        ar.Expr,
 	}
 	// label defined here to make override possible by
 	// time series labels.
-	a.Labels[alertGroupNameLabel] = ar.GroupName
+	if ar.GroupName != "" {
+		a.Labels[alertGroupNameLabel] = ar.GroupName
+	}
 	for _, l := range m.Labels {
 		// drop __name__ to be consistent with Prometheus alerting
 		if l.Name == "__name__" {
@@ -374,7 +433,7 @@ const (
 )

 // alertToTimeSeries converts the given alert with the given timestamp to timeseries
-func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries {
+func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp int64) []prompbmarshal.TimeSeries {
 	var tss []prompbmarshal.TimeSeries
 	tss = append(tss, alertToTimeSeries(ar.Name, a, timestamp))
 	if ar.For > 0 {
@@ -383,7 +442,7 @@ func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp time.Time
 	return tss
 }

-func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
+func alertToTimeSeries(name string, a *notifier.Alert, timestamp int64) prompbmarshal.TimeSeries {
 	labels := make(map[string]string)
 	for k, v := range a.Labels {
 		labels[k] = v
@@ -391,19 +450,19 @@ func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prom
 	labels["__name__"] = alertMetricName
 	labels[alertNameLabel] = name
 	labels[alertStateLabel] = a.State.String()
-	return newTimeSeries(1, labels, timestamp)
+	return newTimeSeries([]float64{1}, []int64{timestamp}, labels)
 }

 // alertForToTimeSeries returns a timeseries that represents
 // state of active alerts, where value is time when alert become active
-func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
+func alertForToTimeSeries(name string, a *notifier.Alert, timestamp int64) prompbmarshal.TimeSeries {
 	labels := make(map[string]string)
 	for k, v := range a.Labels {
 		labels[k] = v
 	}
 	labels["__name__"] = alertForStateMetricName
 	labels[alertNameLabel] = name
-	return newTimeSeries(float64(a.Start.Unix()), labels, timestamp)
+	return newTimeSeries([]float64{float64(a.Start.Unix())}, []int64{timestamp}, labels)
 }

 // Restore restores the state of active alerts basing on previously written timeseries.
@@ -445,7 +504,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
 		m.Labels = append(m.Labels, l)
 	}

-	a, err := ar.newAlert(m, time.Unix(int64(m.Value), 0), qFn)
+	a, err := ar.newAlert(m, time.Unix(int64(m.Values[0]), 0), qFn)
 	if err != nil {
 		return fmt.Errorf("failed to create alert: %w", err)
 	}

@ -24,11 +24,11 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
|
|||
newTestAlertingRule("instant", 0),
|
||||
¬ifier.Alert{State: notifier.StateFiring},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant",
|
||||
}, timestamp),
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -38,13 +38,13 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
|
|||
"instance": "bar",
|
||||
}},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant extra labels",
|
||||
"job": "foo",
|
||||
"instance": "bar",
|
||||
}, timestamp),
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -54,48 +54,52 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
|
|||
"__name__": "bar",
|
||||
}},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant labels override",
|
||||
}, timestamp),
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("for", time.Second),
|
||||
¬ifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "for",
|
||||
}, timestamp),
|
||||
newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{
|
||||
"__name__": alertForStateMetricName,
|
||||
alertNameLabel: "for",
|
||||
}, timestamp),
|
||||
}),
|
||||
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
||||
[]int64{timestamp.UnixNano()},
|
||||
map[string]string{
|
||||
"__name__": alertForStateMetricName,
|
||||
alertNameLabel: "for",
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("for pending", 10*time.Second),
|
||||
¬ifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StatePending.String(),
|
||||
alertNameLabel: "for pending",
|
||||
}, timestamp),
|
||||
newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{
|
||||
"__name__": alertForStateMetricName,
|
||||
alertNameLabel: "for pending",
|
||||
}, timestamp),
|
||||
}),
|
||||
newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())},
|
||||
[]int64{timestamp.UnixNano()},
|
||||
map[string]string{
|
||||
"__name__": alertForStateMetricName,
|
||||
alertNameLabel: "for pending",
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.rule.Name, func(t *testing.T) {
|
||||
tc.rule.alerts[tc.alert.ID] = tc.alert
|
||||
tss := tc.rule.toTimeSeries(timestamp)
|
||||
tss := tc.rule.toTimeSeries(timestamp.Unix())
|
||||
if err := compareTimeSeries(t, tc.expTS, tss); err != nil {
|
||||
t.Fatalf("timeseries missmatch: %s", err)
|
||||
}
|
||||
|
@ -118,7 +122,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||
{
|
||||
newTestAlertingRule("empty labels", 0),
|
||||
[][]datasource.Metric{
|
||||
{datasource.Metric{}},
|
||||
{datasource.Metric{Values: []float64{1}, Timestamps: []int64{1}}},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(datasource.Metric{}): {State: notifier.StateFiring},
|
||||
|
@ -299,7 +303,7 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||
for _, step := range tc.steps {
|
||||
fq.reset()
|
||||
fq.add(step...)
|
||||
if _, err := tc.rule.Exec(context.TODO(), false); err != nil {
|
||||
if _, err := tc.rule.Exec(context.TODO()); err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
// artificial delay between applying steps
|
||||
|
@ -321,6 +325,166 @@ func TestAlertingRule_Exec(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAlertingRule_ExecRange(t *testing.T) {
|
||||
testCases := []struct {
|
||||
rule *AlertingRule
|
||||
data []datasource.Metric
|
||||
expAlerts []*notifier.Alert
|
||||
}{
|
||||
{
|
||||
newTestAlertingRule("empty", 0),
|
||||
[]datasource.Metric{},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("empty labels", 0),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1}, Timestamps: []int64{1}},
|
||||
},
|
||||
[]*notifier.Alert{
|
||||
{State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("single-firing", 0),
|
||||
[]datasource.Metric{
|
||||
metricWithLabels(t, "name", "foo"),
|
||||
},
|
||||
[]*notifier.Alert{
|
||||
{
|
||||
Labels: map[string]string{"name": "foo"},
|
||||
State: notifier.StateFiring,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("single-firing-on-range", 0),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1, 1, 1}, Timestamps: []int64{1e3, 2e3, 3e3}},
|
||||
},
|
||||
[]*notifier.Alert{
|
||||
{State: notifier.StateFiring},
|
||||
{State: notifier.StateFiring},
|
||||
{State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("for-pending", time.Second),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
|
||||
},
|
||||
[]*notifier.Alert{
|
||||
{State: notifier.StatePending, Start: time.Unix(1, 0)},
|
||||
{State: notifier.StatePending, Start: time.Unix(3, 0)},
|
||||
{State: notifier.StatePending, Start: time.Unix(5, 0)},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("for-firing", 3*time.Second),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
|
||||
},
|
||||
[]*notifier.Alert{
|
||||
{State: notifier.StatePending, Start: time.Unix(1, 0)},
|
||||
{State: notifier.StatePending, Start: time.Unix(1, 0)},
|
||||
{State: notifier.StateFiring, Start: time.Unix(1, 0)},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("for=>pending=>firing=>pending=>firing=>pending", time.Second),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1, 1, 1, 1, 1}, Timestamps: []int64{1, 2, 5, 6, 20}},
|
||||
},
|
||||
[]*notifier.Alert{
|
||||
{State: notifier.StatePending, Start: time.Unix(1, 0)},
|
||||
{State: notifier.StateFiring, Start: time.Unix(1, 0)},
|
||||
{State: notifier.StatePending, Start: time.Unix(5, 0)},
|
||||
{State: notifier.StateFiring, Start: time.Unix(5, 0)},
|
||||
{State: notifier.StatePending, Start: time.Unix(20, 0)},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestAlertingRule("multi-series-for=>pending=>pending=>firing", 3*time.Second),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
|
||||
{Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
||||
Labels: []datasource.Label{{Name: "foo", Value: "bar"}},
|
||||
},
|
||||
},
|
||||
[]*notifier.Alert{
|
||||
{State: notifier.StatePending, Start: time.Unix(1, 0)},
|
||||
{State: notifier.StatePending, Start: time.Unix(1, 0)},
|
||||
{State: notifier.StateFiring, Start: time.Unix(1, 0)},
|
||||
//
|
||||
{State: notifier.StatePending, Start: time.Unix(1, 0),
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
}},
|
||||
{State: notifier.StatePending, Start: time.Unix(5, 0),
|
||||
Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRuleWithLabels("multi-series-firing", "source", "vm"),
|
||||
[]datasource.Metric{
|
||||
{Values: []float64{1, 1}, Timestamps: []int64{1, 100}},
|
||||
{Values: []float64{1, 1}, Timestamps: []int64{1, 5},
|
||||
Labels: []datasource.Label{{Name: "foo", Value: "bar"}},
|
||||
},
|
||||
},
|
||||
[]*notifier.Alert{
|
||||
{State: notifier.StateFiring, Labels: map[string]string{
|
||||
"source": "vm",
|
||||
}},
|
||||
{State: notifier.StateFiring, Labels: map[string]string{
|
||||
"source": "vm",
|
||||
}},
|
||||
//
|
||||
{State: notifier.StateFiring, Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
"source": "vm",
|
||||
}},
|
||||
{State: notifier.StateFiring, Labels: map[string]string{
|
||||
"foo": "bar",
|
||||
"source": "vm",
|
||||
}},
|
||||
},
|
||||
},
|
||||
}
|
||||
fakeGroup := Group{Name: "TestRule_ExecRange"}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.rule.Name, func(t *testing.T) {
|
||||
fq := &fakeQuerier{}
|
||||
tc.rule.q = fq
|
||||
tc.rule.GroupID = fakeGroup.ID()
|
||||
fq.add(tc.data...)
|
||||
gotTS, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
var expTS []prompbmarshal.TimeSeries
|
||||
var j int
|
||||
for _, series := range tc.data {
|
||||
for _, timestamp := range series.Timestamps {
|
||||
expTS = append(expTS, tc.rule.alertToTimeSeries(tc.expAlerts[j], timestamp)...)
|
||||
j++
|
||||
}
|
||||
}
|
||||
if len(gotTS) != len(expTS) {
|
||||
t.Fatalf("expected %d time series; got %d", len(expTS), len(gotTS))
|
||||
}
|
||||
for i := range expTS {
|
||||
got, exp := gotTS[i], expTS[i]
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("%d: expected \n%v but got \n%v", i, exp, got)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAlertingRule_Restore(t *testing.T) {
|
||||
testCases := []struct {
|
||||
rule *AlertingRule
|
||||
|
@ -443,14 +607,14 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
|
|||
|
||||
// successful attempt
|
||||
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
|
||||
_, err := ar.Exec(context.TODO(), false)
|
||||
_, err := ar.Exec(context.TODO())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// label `job` will collide with rule extra label and will make both time series equal
|
||||
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz"))
|
||||
_, err = ar.Exec(context.TODO(), false)
|
||||
_, err = ar.Exec(context.TODO())
|
||||
if !errors.Is(err, errDuplicate) {
|
||||
t.Fatalf("expected to have %s error; got %s", errDuplicate, err)
|
||||
}
|
||||
|
@ -459,7 +623,7 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
|
|||
|
||||
expErr := "connection reset by peer"
|
||||
fq.setErr(errors.New(expErr))
|
||||
_, err = ar.Exec(context.TODO(), false)
|
||||
_, err = ar.Exec(context.TODO())
|
||||
if err == nil {
|
||||
t.Fatalf("expected to get err; got nil")
|
||||
}
|
||||
|
@ -484,17 +648,15 @@ func TestAlertingRule_Template(t *testing.T) {
|
|||
hash(metricWithLabels(t, "region", "east", "instance", "foo")): {
|
||||
Annotations: map[string]string{},
|
||||
Labels: map[string]string{
|
||||
alertGroupNameLabel: "",
|
||||
"region": "east",
|
||||
"instance": "foo",
|
||||
"region": "east",
|
||||
"instance": "foo",
|
||||
},
|
||||
},
|
||||
hash(metricWithLabels(t, "region", "east", "instance", "bar")): {
|
||||
Annotations: map[string]string{},
|
||||
Labels: map[string]string{
|
||||
alertGroupNameLabel: "",
|
||||
"region": "east",
|
||||
"instance": "bar",
|
||||
"region": "east",
|
||||
"instance": "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -519,9 +681,8 @@ func TestAlertingRule_Template(t *testing.T) {
|
|||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "region", "east", "instance", "foo")): {
|
||||
Labels: map[string]string{
|
||||
alertGroupNameLabel: "",
|
||||
"instance": "foo",
|
||||
"region": "east",
|
||||
"instance": "foo",
|
||||
"region": "east",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": `Too high connection number for "foo" for region east`,
|
||||
|
@ -530,9 +691,8 @@ func TestAlertingRule_Template(t *testing.T) {
|
|||
},
|
||||
hash(metricWithLabels(t, "region", "east", "instance", "bar")): {
|
||||
Labels: map[string]string{
|
||||
alertGroupNameLabel: "",
|
||||
"instance": "bar",
|
||||
"region": "east",
|
||||
"instance": "bar",
|
||||
"region": "east",
|
||||
},
|
||||
Annotations: map[string]string{
|
||||
"summary": `Too high connection number for "bar" for region east`,
|
||||
|
@ -549,7 +709,7 @@ func TestAlertingRule_Template(t *testing.T) {
|
|||
tc.rule.GroupID = fakeGroup.ID()
|
||||
tc.rule.q = fq
|
||||
fq.add(tc.metrics...)
|
||||
if _, err := tc.rule.Exec(context.TODO(), false); err != nil {
|
||||
if _, err := tc.rule.Exec(context.TODO()); err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
for hash, expAlert := range tc.expAlerts {
|
||||
|
@ -579,5 +739,5 @@ func newTestRuleWithLabels(name string, labels ...string) *AlertingRule {
|
|||
}
|
||||
|
||||
func newTestAlertingRule(name string, waitFor time.Duration) *AlertingRule {
|
||||
return &AlertingRule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor}
|
||||
return &AlertingRule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor, EvalInterval: waitFor}
|
||||
}
|
||||
|
|
39  app/vmalert/config/testdata/rules-replay-good.rules  (vendored, new file)
@@ -0,0 +1,39 @@
groups:
  - name: ReplayGroup
    interval: 1m
    concurrency: 1
    rules:
      - record: type:vm_cache_entries:rate5m
        expr: sum(rate(vm_cache_entries[5m])) by (type)
        labels:
          recording: true
      - record: go_cgo_calls_count:rate5m
        expr: rate(go_cgo_calls_count{job="vmdb"}[5m])
        labels:
          recording: true

  - name: vmsingleReplay
    interval: 30s
    concurrency: 2
    rules:
      - alert: RequestErrorsToAPI
        expr: increase(vm_http_request_errors_total[5m]) > 0
        for: 15m
        labels:
          severity: warning
        annotations:
          dashboard: "http://localhost:3000/d/wNf0q_kZk?viewPanel=35&var-instance={{ $labels.instance }}"
          summary: "Too many errors served for path {{ $labels.path }} (instance {{ $labels.instance }})"
          description: "Requests to path {{ $labels.path }} are receiving errors.
            Please verify if clients are sending correct requests."

      - alert: TooManyLogs
        expr: sum(increase(vm_log_messages_total{level!="info"}[5m])) by (job, instance) > 0
        for: 15m
        labels:
          severity: warning
        annotations:
          dashboard: "http://localhost:3000/d/wNf0q_kZk?viewPanel=67&var-instance={{ $labels.instance }}"
          summary: "Too many logs printed for job \"{{ $labels.job }}\" ({{ $labels.instance }})"
          description: "Logging rate for job \"{{ $labels.job }}\" ({{ $labels.instance }}) is {{ $value }} for last 15m.\n
            Worth to check logs for specific error messages."

@@ -2,26 +2,33 @@ package datasource

 import (
 	"context"
 	"time"
 )

+// Querier interface wraps Query and QueryRange methods
+type Querier interface {
+	Query(ctx context.Context, query string) ([]Metric, error)
+	QueryRange(ctx context.Context, query string, from, to time.Time) ([]Metric, error)
+}
+
 // QuerierBuilder builds Querier with given params.
 type QuerierBuilder interface {
 	BuildWithParams(params QuerierParams) Querier
 }

-// Querier interface wraps Query method which
-// executes given query and returns list of Metrics
-// as result
-type Querier interface {
-	Query(ctx context.Context, query string) ([]Metric, error)
+// QuerierParams params for Querier.
+type QuerierParams struct {
+	DataSourceType     *Type
+	EvaluationInterval time.Duration
+	// see https://docs.victoriametrics.com/#prometheus-querying-api-enhancements
+	ExtraLabels map[string]string
 }

 // Metric is the basic entity which should be return by datasource
 // It represents single data point with full list of labels
 type Metric struct {
-	Labels    []Label
-	Timestamp int64
-	Value     float64
+	Labels     []Label
+	Timestamps []int64
+	Values     []float64
 }

 // SetLabel adds or updates existing one label
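
For orientation, a minimal usage sketch of the extended interface follows. It is an editor-added illustration, not part of the diff: it assumes code living in the `datasource` package with `context`, `fmt` and `time` imported; `QueryRange` returns one `Metric` per series, where `Values[i]` pairs with `Timestamps[i]`.

```go
// printRange shows how a caller would consume the extended Querier.
func printRange(ctx context.Context, q Querier, expr string, start, end time.Time) error {
	metrics, err := q.QueryRange(ctx, expr, start, end)
	if err != nil {
		return err
	}
	for _, m := range metrics {
		for i := range m.Values {
			// every metric carries parallel slices of timestamps and values
			fmt.Printf("%v %d %f\n", m.Labels, m.Timestamps[i], m.Values[i])
		}
	}
	return nil
}
```
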
18  app/vmalert/datasource/datasource_test.go  (new file)
@@ -0,0 +1,18 @@
package datasource

import "testing"

func TestMetric_Label(t *testing.T) {
	m := &Metric{}

	m.AddLabel("foo", "bar")
	checkEqualString(t, "bar", m.Label("foo"))

	m.SetLabel("foo", "baz")
	checkEqualString(t, "baz", m.Label("foo"))

	m.SetLabel("qux", "quux")
	checkEqualString(t, "quux", m.Label("qux"))

	checkEqualString(t, "", m.Label("non-existing"))
}

@ -2,76 +2,13 @@ package datasource
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type response struct {
|
||||
Status string `json:"status"`
|
||||
Data struct {
|
||||
ResultType string `json:"resultType"`
|
||||
Result []struct {
|
||||
Labels map[string]string `json:"metric"`
|
||||
TV [2]interface{} `json:"value"`
|
||||
} `json:"result"`
|
||||
} `json:"data"`
|
||||
ErrorType string `json:"errorType"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
func (r response) metrics() ([]Metric, error) {
|
||||
var ms []Metric
|
||||
var m Metric
|
||||
var f float64
|
||||
var err error
|
||||
for i, res := range r.Data.Result {
|
||||
f, err = strconv.ParseFloat(res.TV[1].(string), 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, res.TV[1], err)
|
||||
}
|
||||
m.Labels = nil
|
||||
for k, v := range r.Data.Result[i].Labels {
|
||||
m.AddLabel(k, v)
|
||||
}
|
||||
m.Timestamp = int64(res.TV[0].(float64))
|
||||
m.Value = f
|
||||
ms = append(ms, m)
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
type graphiteResponse []graphiteResponseTarget
|
||||
|
||||
type graphiteResponseTarget struct {
|
||||
Target string `json:"target"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
DataPoints [][2]float64 `json:"datapoints"`
|
||||
}
|
||||
|
||||
func (r graphiteResponse) metrics() []Metric {
|
||||
var ms []Metric
|
||||
for _, res := range r {
|
||||
if len(res.DataPoints) < 1 {
|
||||
continue
|
||||
}
|
||||
var m Metric
|
||||
// add only last value to the result.
|
||||
last := res.DataPoints[len(res.DataPoints)-1]
|
||||
m.Value = last[0]
|
||||
m.Timestamp = int64(last[1])
|
||||
for k, v := range res.Tags {
|
||||
m.AddLabel(k, v)
|
||||
}
|
||||
ms = append(ms, m)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
// VMStorage represents vmstorage entity with ability to read and write metrics
|
||||
type VMStorage struct {
|
||||
c *http.Client
|
||||
|
@ -88,20 +25,6 @@ type VMStorage struct {
|
|||
extraLabels []string
|
||||
}
|
||||
|
||||
const queryPath = "/api/v1/query"
|
||||
const graphitePath = "/render"
|
||||
|
||||
const prometheusPrefix = "/prometheus"
|
||||
const graphitePrefix = "/graphite"
|
||||
|
||||
// QuerierParams params for Querier.
|
||||
type QuerierParams struct {
|
||||
DataSourceType *Type
|
||||
EvaluationInterval time.Duration
|
||||
// see https://docs.victoriametrics.com/#prometheus-querying-api-enhancements
|
||||
ExtraLabels map[string]string
|
||||
}
|
||||
|
||||
// Clone makes clone of VMStorage, shares http client.
|
||||
func (s *VMStorage) Clone() *VMStorage {
|
||||
return &VMStorage{
|
||||
|
@ -149,11 +72,21 @@ func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, lookBack time.Du
|
|||
|
||||
// Query executes the given query and returns parsed response
|
||||
func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) {
|
||||
req, err := s.prepareReq(query, time.Now())
|
||||
req, err := s.newRequestPOST()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ts := time.Now()
|
||||
switch s.dataSourceType.name {
|
||||
case "", prometheusType:
|
||||
s.setPrometheusInstantReqParams(req, query, ts)
|
||||
case graphiteType:
|
||||
s.setGraphiteReqParams(req, query, ts)
|
||||
default:
|
||||
return nil, fmt.Errorf("engine not found: %q", s.dataSourceType.name)
|
||||
}
|
||||
|
||||
resp, err := s.do(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -169,25 +102,32 @@ func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) {
|
|||
return parseFn(req, resp)
|
||||
}
|
||||
|
||||
func (s *VMStorage) prepareReq(query string, timestamp time.Time) (*http.Request, error) {
|
||||
req, err := http.NewRequest("POST", s.datasourceURL, nil)
|
||||
// QueryRange executes the given query on the given time range.
|
||||
// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
|
||||
// Graphite type isn't supported.
|
||||
func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end time.Time) ([]Metric, error) {
|
||||
if s.dataSourceType.name != prometheusType {
|
||||
return nil, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType.name)
|
||||
}
|
||||
req, err := s.newRequestPOST()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json; charset=utf-8")
|
||||
if s.basicAuthPass != "" {
|
||||
req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass)
|
||||
if start.IsZero() {
|
||||
return nil, fmt.Errorf("start param is missing")
|
||||
}
|
||||
|
||||
switch s.dataSourceType.name {
|
||||
case "", prometheusType:
|
||||
s.setPrometheusReqParams(req, query, timestamp)
|
||||
case graphiteType:
|
||||
s.setGraphiteReqParams(req, query, timestamp)
|
||||
default:
|
||||
return nil, fmt.Errorf("engine not found: %q", s.dataSourceType.name)
|
||||
if end.IsZero() {
|
||||
return nil, fmt.Errorf("end param is missing")
|
||||
}
|
||||
return req, nil
|
||||
s.setPrometheusRangeReqParams(req, query, start, end)
|
||||
resp, err := s.do(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
return parsePrometheusResponse(req, resp)
|
||||
}
|
||||
|
||||
func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) {
|
||||
|
@ -203,80 +143,14 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response,
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string, timestamp time.Time) {
|
||||
if s.appendTypePrefix {
|
||||
r.URL.Path += prometheusPrefix
|
||||
func (s *VMStorage) newRequestPOST() (*http.Request, error) {
|
||||
req, err := http.NewRequest("POST", s.datasourceURL, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.URL.Path += queryPath
|
||||
q := r.URL.Query()
|
||||
q.Set("query", query)
|
||||
if s.lookBack > 0 {
|
||||
timestamp = timestamp.Add(-s.lookBack)
|
||||
req.Header.Set("Content-Type", "application/json; charset=utf-8")
|
||||
if s.basicAuthPass != "" {
|
||||
req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass)
|
||||
}
|
||||
if s.evaluationInterval > 0 {
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
|
||||
timestamp = timestamp.Truncate(s.evaluationInterval)
|
||||
// set step as evaluationInterval by default
|
||||
q.Set("step", s.evaluationInterval.String())
|
||||
}
|
||||
q.Set("time", fmt.Sprintf("%d", timestamp.Unix()))
|
||||
|
||||
if s.queryStep > 0 {
|
||||
// override step with user-specified value
|
||||
q.Set("step", s.queryStep.String())
|
||||
}
|
||||
if s.roundDigits != "" {
|
||||
q.Set("round_digits", s.roundDigits)
|
||||
}
|
||||
for _, l := range s.extraLabels {
|
||||
q.Add("extra_label", l)
|
||||
}
|
||||
r.URL.RawQuery = q.Encode()
|
||||
}
|
||||
|
||||
func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string, timestamp time.Time) {
|
||||
if s.appendTypePrefix {
|
||||
r.URL.Path += graphitePrefix
|
||||
}
|
||||
r.URL.Path += graphitePath
|
||||
q := r.URL.Query()
|
||||
q.Set("format", "json")
|
||||
q.Set("target", query)
|
||||
from := "-5min"
|
||||
if s.lookBack > 0 {
|
||||
lookBack := timestamp.Add(-s.lookBack)
|
||||
from = strconv.FormatInt(lookBack.Unix(), 10)
|
||||
}
|
||||
q.Set("from", from)
|
||||
q.Set("until", "now")
|
||||
r.URL.RawQuery = q.Encode()
|
||||
}
|
||||
|
||||
const (
|
||||
statusSuccess, statusError, rtVector = "success", "error", "vector"
|
||||
)
|
||||
|
||||
func parsePrometheusResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
|
||||
r := &response{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
|
||||
return nil, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL, err)
|
||||
}
|
||||
if r.Status == statusError {
|
||||
return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL, r.ErrorType, r.Error)
|
||||
}
|
||||
if r.Status != statusSuccess {
|
||||
return nil, fmt.Errorf("unknown status: %s, Expected success or error ", r.Status)
|
||||
}
|
||||
if r.Data.ResultType != rtVector {
|
||||
return nil, fmt.Errorf("unknown result type:%s. Expected vector", r.Data.ResultType)
|
||||
}
|
||||
return r.metrics()
|
||||
}
|
||||
|
||||
func parseGraphiteResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
|
||||
r := &graphiteResponse{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
|
||||
return nil, fmt.Errorf("error parsing graphite metrics for %s: %w", req.URL, err)
|
||||
}
|
||||
return r.metrics(), nil
|
||||
return req, nil
|
||||
}
|
||||
|
|
67  app/vmalert/datasource/vm_graphite_api.go  (new file)
@ -0,0 +1,67 @@
|
|||
package datasource
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type graphiteResponse []graphiteResponseTarget
|
||||
|
||||
type graphiteResponseTarget struct {
|
||||
Target string `json:"target"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
DataPoints [][2]float64 `json:"datapoints"`
|
||||
}
|
||||
|
||||
func (r graphiteResponse) metrics() []Metric {
|
||||
var ms []Metric
|
||||
for _, res := range r {
|
||||
if len(res.DataPoints) < 1 {
|
||||
continue
|
||||
}
|
||||
var m Metric
|
||||
// add only last value to the result.
|
||||
last := res.DataPoints[len(res.DataPoints)-1]
|
||||
m.Values = append(m.Values, last[0])
|
||||
m.Timestamps = append(m.Timestamps, int64(last[1]))
|
||||
for k, v := range res.Tags {
|
||||
m.AddLabel(k, v)
|
||||
}
|
||||
ms = append(ms, m)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
|
||||
func parseGraphiteResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
|
||||
r := &graphiteResponse{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
|
||||
return nil, fmt.Errorf("error parsing graphite metrics for %s: %w", req.URL, err)
|
||||
}
|
||||
return r.metrics(), nil
|
||||
}
|
||||
|
||||
const (
|
||||
graphitePath = "/render"
|
||||
graphitePrefix = "/graphite"
|
||||
)
|
||||
|
||||
func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string, timestamp time.Time) {
|
||||
if s.appendTypePrefix {
|
||||
r.URL.Path += graphitePrefix
|
||||
}
|
||||
r.URL.Path += graphitePath
|
||||
q := r.URL.Query()
|
||||
q.Set("format", "json")
|
||||
q.Set("target", query)
|
||||
from := "-5min"
|
||||
if s.lookBack > 0 {
|
||||
lookBack := timestamp.Add(-s.lookBack)
|
||||
from = strconv.FormatInt(lookBack.Unix(), 10)
|
||||
}
|
||||
q.Set("from", from)
|
||||
q.Set("until", "now")
|
||||
r.URL.RawQuery = q.Encode()
|
||||
}
|
170  app/vmalert/datasource/vm_prom_api.go  (new file)
@ -0,0 +1,170 @@
|
|||
package datasource
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type promResponse struct {
|
||||
Status string `json:"status"`
|
||||
ErrorType string `json:"errorType"`
|
||||
Error string `json:"error"`
|
||||
Data struct {
|
||||
ResultType string `json:"resultType"`
|
||||
Result json.RawMessage `json:"result"`
|
||||
} `json:"data"`
|
||||
}
|
||||
|
||||
type promInstant struct {
|
||||
Result []struct {
|
||||
Labels map[string]string `json:"metric"`
|
||||
TV [2]interface{} `json:"value"`
|
||||
} `json:"result"`
|
||||
}
|
||||
|
||||
type promRange struct {
|
||||
Result []struct {
|
||||
Labels map[string]string `json:"metric"`
|
||||
TVs [][2]interface{} `json:"values"`
|
||||
} `json:"result"`
|
||||
}
|
||||
|
||||
func (r promInstant) metrics() ([]Metric, error) {
|
||||
var result []Metric
|
||||
var m Metric
|
||||
for i, res := range r.Result {
|
||||
f, err := strconv.ParseFloat(res.TV[1].(string), 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, res.TV[1], err)
|
||||
}
|
||||
m.Labels = nil
|
||||
for k, v := range r.Result[i].Labels {
|
||||
m.AddLabel(k, v)
|
||||
}
|
||||
m.Timestamps = append(m.Timestamps, int64(res.TV[0].(float64)))
|
||||
m.Values = append(m.Values, f)
|
||||
result = append(result, m)
|
||||
|
||||
m.Values = m.Values[:0]
|
||||
m.Labels = m.Labels[:0]
|
||||
m.Timestamps = m.Timestamps[:0]
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r promRange) metrics() ([]Metric, error) {
|
||||
var result []Metric
|
||||
for i, res := range r.Result {
|
||||
var m Metric
|
||||
for _, tv := range res.TVs {
|
||||
f, err := strconv.ParseFloat(tv[1].(string), 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, tv[1], err)
|
||||
}
|
||||
m.Values = append(m.Values, f)
|
||||
m.Timestamps = append(m.Timestamps, int64(tv[0].(float64)))
|
||||
}
|
||||
if len(m.Values) < 1 || len(m.Timestamps) < 1 {
|
||||
return nil, fmt.Errorf("metric %v contains no values", res)
|
||||
}
|
||||
m.Labels = nil
|
||||
for k, v := range r.Result[i].Labels {
|
||||
m.AddLabel(k, v)
|
||||
}
|
||||
result = append(result, m)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
const (
|
||||
statusSuccess, statusError = "success", "error"
|
||||
rtVector, rtMatrix = "vector", "matrix"
|
||||
)
|
||||
|
||||
func parsePrometheusResponse(req *http.Request, resp *http.Response) ([]Metric, error) {
|
||||
r := &promResponse{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
|
||||
return nil, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL, err)
|
||||
}
|
||||
if r.Status == statusError {
|
||||
return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL, r.ErrorType, r.Error)
|
||||
}
|
||||
if r.Status != statusSuccess {
|
||||
return nil, fmt.Errorf("unknown status: %s, Expected success or error ", r.Status)
|
||||
}
|
||||
switch r.Data.ResultType {
|
||||
case rtVector:
|
||||
var pi promInstant
|
||||
if err := json.Unmarshal(r.Data.Result, &pi.Result); err != nil {
|
||||
return nil, fmt.Errorf("umarshal err %s; \n %#v", err, string(r.Data.Result))
|
||||
}
|
||||
return pi.metrics()
|
||||
case rtMatrix:
|
||||
var pr promRange
|
||||
if err := json.Unmarshal(r.Data.Result, &pr.Result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pr.metrics()
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown result type %q", r.Data.ResultType)
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
prometheusInstantPath = "/api/v1/query"
|
||||
prometheusRangePath = "/api/v1/query_range"
|
||||
prometheusPrefix = "/prometheus"
|
||||
)
|
||||
|
||||
func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) {
|
||||
if s.appendTypePrefix {
|
||||
r.URL.Path += prometheusPrefix
|
||||
}
|
||||
r.URL.Path += prometheusInstantPath
|
||||
q := r.URL.Query()
|
||||
if s.lookBack > 0 {
|
||||
timestamp = timestamp.Add(-s.lookBack)
|
||||
}
|
||||
if s.evaluationInterval > 0 {
|
||||
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1232
|
||||
timestamp = timestamp.Truncate(s.evaluationInterval)
|
||||
}
|
||||
q.Set("time", fmt.Sprintf("%d", timestamp.Unix()))
|
||||
r.URL.RawQuery = q.Encode()
|
||||
s.setPrometheusReqParams(r, query)
|
||||
}
|
||||
|
||||
func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) {
|
||||
if s.appendTypePrefix {
|
||||
r.URL.Path += prometheusPrefix
|
||||
}
|
||||
r.URL.Path += prometheusRangePath
|
||||
q := r.URL.Query()
|
||||
q.Add("start", fmt.Sprintf("%d", start.Unix()))
|
||||
q.Add("end", fmt.Sprintf("%d", end.Unix()))
|
||||
r.URL.RawQuery = q.Encode()
|
||||
s.setPrometheusReqParams(r, query)
|
||||
}
|
||||
|
||||
func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
|
||||
q := r.URL.Query()
|
||||
q.Set("query", query)
|
||||
if s.evaluationInterval > 0 {
|
||||
// set step as evaluationInterval by default
|
||||
q.Set("step", s.evaluationInterval.String())
|
||||
}
|
||||
if s.queryStep > 0 {
|
||||
// override step with user-specified value
|
||||
q.Set("step", s.queryStep.String())
|
||||
}
|
||||
if s.roundDigits != "" {
|
||||
q.Set("round_digits", s.roundDigits)
|
||||
}
|
||||
for _, l := range s.extraLabels {
|
||||
q.Add("extra_label", l)
|
||||
}
|
||||
r.URL.RawQuery = q.Encode()
|
||||
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
@ -19,7 +20,7 @@ var (
|
|||
queryRender = "constantLine(10)"
|
||||
)
|
||||
|
||||
func TestVMSelectQuery(t *testing.T) {
|
||||
func TestVMInstantQuery(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) {
|
||||
t.Errorf("should not be called")
|
||||
|
@ -103,9 +104,9 @@ func TestVMSelectQuery(t *testing.T) {
|
|||
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
|
||||
}
|
||||
expected := Metric{
|
||||
Labels: []Label{{Value: "vm_rows", Name: "__name__"}},
|
||||
Timestamp: 1583786142,
|
||||
Value: 13763,
|
||||
Labels: []Label{{Value: "vm_rows", Name: "__name__"}},
|
||||
Timestamps: []int64{1583786142},
|
||||
Values: []float64{13763},
|
||||
}
|
||||
if !reflect.DeepEqual(m[0], expected) {
|
||||
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
|
||||
|
@ -122,44 +123,145 @@ func TestVMSelectQuery(t *testing.T) {
|
|||
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
|
||||
}
|
||||
expected = Metric{
|
||||
Labels: []Label{{Value: "constantLine(10)", Name: "name"}},
|
||||
Timestamp: 1611758403,
|
||||
Value: 10,
|
||||
Labels: []Label{{Value: "constantLine(10)", Name: "name"}},
|
||||
Timestamps: []int64{1611758403},
|
||||
Values: []float64{10},
|
||||
}
|
||||
if !reflect.DeepEqual(m[0], expected) {
|
||||
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrepareReq(t *testing.T) {
|
||||
func TestVMRangeQuery(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) {
|
||||
t.Errorf("should not be called")
|
||||
})
|
||||
c := -1
|
||||
mux.HandleFunc("/api/v1/query_range", func(w http.ResponseWriter, r *http.Request) {
|
||||
c++
|
||||
if r.Method != http.MethodPost {
|
||||
t.Errorf("expected POST method got %s", r.Method)
|
||||
}
|
||||
if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass {
|
||||
t.Errorf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass)
|
||||
}
|
||||
if r.URL.Query().Get("query") != query {
|
||||
t.Errorf("expected %s in query param, got %s", query, r.URL.Query().Get("query"))
|
||||
}
|
||||
startTS := r.URL.Query().Get("start")
|
||||
if startTS == "" {
|
||||
t.Errorf("expected 'start' in query param, got nil instead")
|
||||
}
|
||||
if _, err := strconv.ParseInt(startTS, 10, 64); err != nil {
|
||||
t.Errorf("failed to parse 'start' query param: %s", err)
|
||||
}
|
||||
endTS := r.URL.Query().Get("end")
|
||||
if endTS == "" {
|
||||
t.Errorf("expected 'end' in query param, got nil instead")
|
||||
}
|
||||
if _, err := strconv.ParseInt(endTS, 10, 64); err != nil {
|
||||
t.Errorf("failed to parse 'end' query param: %s", err)
|
||||
}
|
||||
switch c {
|
||||
case 0:
|
||||
w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"vm_rows"},"values":[[1583786142,"13763"]]}]}}`))
|
||||
}
|
||||
})
|
||||
|
||||
srv := httptest.NewServer(mux)
|
||||
defer srv.Close()
|
||||
|
||||
s := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, time.Minute, 0, false, srv.Client())
|
||||
|
||||
p := NewPrometheusType()
|
||||
pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second})
|
||||
|
||||
_, err := pq.QueryRange(ctx, query, time.Now(), time.Time{})
|
||||
expectError(t, err, "is missing")
|
||||
|
||||
_, err = pq.QueryRange(ctx, query, time.Time{}, time.Now())
|
||||
expectError(t, err, "is missing")
|
||||
|
||||
start, end := time.Now().Add(-time.Minute), time.Now()
|
||||
|
||||
m, err := pq.QueryRange(ctx, query, start, end)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected %s", err)
|
||||
}
|
||||
if len(m) != 1 {
|
||||
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
|
||||
}
|
||||
expected := Metric{
|
||||
Labels: []Label{{Value: "vm_rows", Name: "__name__"}},
|
||||
Timestamps: []int64{1583786142},
|
||||
Values: []float64{13763},
|
||||
}
|
||||
if !reflect.DeepEqual(m[0], expected) {
|
||||
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
|
||||
}
|
||||
|
||||
g := NewGraphiteType()
|
||||
gq := s.BuildWithParams(QuerierParams{DataSourceType: &g})
|
||||
|
||||
_, err = gq.QueryRange(ctx, queryRender, start, end)
|
||||
expectError(t, err, "is not supported")
|
||||
}
|
||||
|
||||
func TestRequestParams(t *testing.T) {
|
||||
query := "up"
|
||||
timestamp := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
|
||||
testCases := []struct {
|
||||
name string
|
||||
vm *VMStorage
|
||||
checkFn func(t *testing.T, r *http.Request)
|
||||
name string
|
||||
queryRange bool
|
||||
vm *VMStorage
|
||||
checkFn func(t *testing.T, r *http.Request)
|
||||
}{
|
||||
{
|
||||
"prometheus path",
|
||||
false,
|
||||
&VMStorage{
|
||||
dataSourceType: NewPrometheusType(),
|
||||
},
|
||||
func(t *testing.T, r *http.Request) {
|
||||
checkEqualString(t, queryPath, r.URL.Path)
|
||||
checkEqualString(t, prometheusInstantPath, r.URL.Path)
|
||||
},
|
||||
},
|
||||
{
|
||||
"prometheus prefix",
|
||||
false,
|
||||
&VMStorage{
|
||||
dataSourceType: NewPrometheusType(),
|
||||
appendTypePrefix: true,
|
||||
},
|
||||
func(t *testing.T, r *http.Request) {
|
||||
checkEqualString(t, prometheusPrefix+queryPath, r.URL.Path)
|
||||
checkEqualString(t, prometheusPrefix+prometheusInstantPath, r.URL.Path)
|
||||
},
|
||||
},
|
||||
{
|
||||
"prometheus range path",
|
||||
true,
|
||||
&VMStorage{
|
||||
dataSourceType: NewPrometheusType(),
|
||||
},
|
||||
func(t *testing.T, r *http.Request) {
|
||||
checkEqualString(t, prometheusRangePath, r.URL.Path)
|
||||
},
|
||||
},
|
||||
{
|
||||
"prometheus range prefix",
|
||||
true,
|
||||
&VMStorage{
|
||||
dataSourceType: NewPrometheusType(),
|
||||
appendTypePrefix: true,
|
||||
},
|
||||
func(t *testing.T, r *http.Request) {
|
||||
checkEqualString(t, prometheusPrefix+prometheusRangePath, r.URL.Path)
|
||||
},
|
||||
},
|
||||
{
|
||||
"graphite path",
|
||||
false,
|
||||
&VMStorage{
|
||||
dataSourceType: NewGraphiteType(),
|
||||
},
|
||||
|
@ -169,6 +271,7 @@ func TestPrepareReq(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"graphite prefix",
|
||||
false,
|
||||
&VMStorage{
|
||||
dataSourceType: NewGraphiteType(),
|
||||
appendTypePrefix: true,
|
||||
|
@ -179,14 +282,38 @@ func TestPrepareReq(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"default params",
|
||||
false,
|
||||
&VMStorage{},
|
||||
func(t *testing.T, r *http.Request) {
|
||||
exp := fmt.Sprintf("query=%s&time=%d", query, timestamp.Unix())
|
||||
checkEqualString(t, exp, r.URL.RawQuery)
|
||||
},
|
||||
},
|
||||
{
|
||||
"default range params",
|
||||
true,
|
||||
&VMStorage{},
|
||||
func(t *testing.T, r *http.Request) {
|
||||
exp := fmt.Sprintf("end=%d&query=%s&start=%d", timestamp.Unix(), query, timestamp.Unix())
|
||||
checkEqualString(t, exp, r.URL.RawQuery)
|
||||
},
|
||||
},
|
||||
{
|
||||
"basic auth",
|
||||
false,
|
||||
&VMStorage{
|
||||
basicAuthUser: "foo",
|
||||
basicAuthPass: "bar",
|
||||
},
|
||||
func(t *testing.T, r *http.Request) {
|
||||
u, p, _ := r.BasicAuth()
|
||||
checkEqualString(t, "foo", u)
|
||||
checkEqualString(t, "bar", p)
|
||||
},
|
||||
},
|
||||
{
|
||||
"basic auth range",
|
||||
true,
|
||||
&VMStorage{
|
||||
basicAuthUser: "foo",
|
||||
basicAuthPass: "bar",
|
||||
|
@ -199,6 +326,7 @@ func TestPrepareReq(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"lookback",
|
||||
false,
|
||||
&VMStorage{
|
||||
lookBack: time.Minute,
|
||||
},
|
||||
|
@ -209,6 +337,7 @@ func TestPrepareReq(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"evaluation interval",
|
||||
false,
|
||||
&VMStorage{
|
||||
evaluationInterval: 15 * time.Second,
|
||||
},
|
||||
|
@ -221,6 +350,7 @@ func TestPrepareReq(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"lookback + evaluation interval",
|
||||
false,
|
||||
&VMStorage{
|
||||
lookBack: time.Minute,
|
||||
evaluationInterval: 15 * time.Second,
|
||||
|
@ -235,6 +365,7 @@ func TestPrepareReq(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"step override",
|
||||
false,
|
||||
&VMStorage{
|
||||
queryStep: time.Minute,
|
||||
},
|
||||
|
@ -245,6 +376,7 @@ func TestPrepareReq(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"round digits",
|
||||
false,
|
||||
&VMStorage{
|
||||
roundDigits: "10",
|
||||
},
|
||||
|
@ -255,6 +387,7 @@ func TestPrepareReq(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"extra labels",
|
||||
false,
|
||||
&VMStorage{
|
||||
extraLabels: []string{
|
||||
"env=prod",
|
||||
|
@ -266,14 +399,39 @@ func TestPrepareReq(t *testing.T) {
|
|||
checkEqualString(t, exp, r.URL.RawQuery)
|
||||
},
|
||||
},
|
||||
{
|
||||
"extra labels range",
|
||||
true,
|
||||
&VMStorage{
|
||||
extraLabels: []string{
|
||||
"env=prod",
|
||||
"query=es=cape",
|
||||
},
|
||||
},
|
||||
func(t *testing.T, r *http.Request) {
|
||||
exp := fmt.Sprintf("end=%d&extra_label=env%%3Dprod&extra_label=query%%3Des%%3Dcape&query=%s&start=%d",
|
||||
timestamp.Unix(), query, timestamp.Unix())
|
||||
checkEqualString(t, exp, r.URL.RawQuery)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
req, err := tc.vm.prepareReq(query, timestamp)
|
||||
req, err := tc.vm.newRequestPOST()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
switch tc.vm.dataSourceType.name {
|
||||
case "", prometheusType:
|
||||
if tc.queryRange {
|
||||
tc.vm.setPrometheusRangeReqParams(req, query, timestamp, timestamp)
|
||||
} else {
|
||||
tc.vm.setPrometheusInstantReqParams(req, query, timestamp)
|
||||
}
|
||||
case graphiteType:
|
||||
tc.vm.setGraphiteReqParams(req, query, timestamp)
|
||||
}
|
||||
tc.checkFn(t, req)
|
||||
})
|
||||
}
|
||||
|
@ -285,3 +443,13 @@ func checkEqualString(t *testing.T, exp, got string) {
|
|||
t.Errorf("expected to get %q; got %q", exp, got)
|
||||
}
|
||||
}
|
||||
|
||||
func expectError(t *testing.T, err error, exp string) {
|
||||
t.Helper()
|
||||
if err == nil {
|
||||
t.Errorf("expected non-nil error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), exp) {
|
||||
t.Errorf("expected error %q to contain %q", err, exp)
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -269,15 +269,10 @@ type executor struct {

func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, interval time.Duration) chan error {
	res := make(chan error, len(rules))
	var returnSeries bool
	if e.rw != nil {
		returnSeries = true
	}

	if concurrency == 1 {
		// fast path
		for _, rule := range rules {
			res <- e.exec(ctx, rule, returnSeries, interval)
			res <- e.exec(ctx, rule, interval)
		}
		close(res)
		return res
@@ -290,7 +285,7 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurren
		sem <- struct{}{}
		wg.Add(1)
		go func(r Rule) {
			res <- e.exec(ctx, r, returnSeries, interval)
			res <- e.exec(ctx, r, interval)
			<-sem
			wg.Done()
		}(rule)
@@ -309,14 +304,14 @@ var (
	remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
)

func (e *executor) exec(ctx context.Context, rule Rule, returnSeries bool, interval time.Duration) error {
func (e *executor) exec(ctx context.Context, rule Rule, interval time.Duration) error {
	execTotal.Inc()
	execStart := time.Now()
	defer func() {
		execDuration.UpdateDuration(execStart)
	}()

	tss, err := rule.Exec(ctx, returnSeries)
	tss, err := rule.Exec(ctx)
	if err != nil {
		execErrors.Inc()
		return fmt.Errorf("rule %q: failed to execute: %w", rule, err)

@@ -7,6 +7,7 @@ import (
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
@@ -42,6 +43,10 @@ func (fq *fakeQuerier) BuildWithParams(_ datasource.QuerierParams) datasource.Qu
	return fq
}

func (fq *fakeQuerier) QueryRange(ctx context.Context, q string, _, _ time.Time) ([]datasource.Metric, error) {
	return fq.Query(ctx, q)
}

func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) {
	fq.Lock()
	defer fq.Unlock()
@@ -72,9 +77,16 @@ func (fn *fakeNotifier) getAlerts() []notifier.Alert {
}

func metricWithValueAndLabels(t *testing.T, value float64, labels ...string) datasource.Metric {
	return metricWithValuesAndLabels(t, []float64{value}, labels...)
}

func metricWithValuesAndLabels(t *testing.T, values []float64, labels ...string) datasource.Metric {
	t.Helper()
	m := metricWithLabels(t, labels...)
	m.Value = value
	m.Values = values
	for i := range values {
		m.Timestamps = append(m.Timestamps, int64(i))
	}
	return m
}

@@ -83,7 +95,7 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
	if len(labels) == 0 || len(labels)%2 != 0 {
		t.Fatalf("expected to get even number of labels")
	}
	m := datasource.Metric{}
	m := datasource.Metric{Values: []float64{1}, Timestamps: []int64{1}}
	for i := 0; i < len(labels); i += 2 {
		m.Labels = append(m.Labels, datasource.Label{
			Name: labels[i],

@@ -68,13 +68,37 @@ func main() {
		notifier.InitTemplateFunc(u)
		groups, err := config.Parse(*rulePath, true, true)
		if err != nil {
			logger.Fatalf(err.Error())
			logger.Fatalf("failed to parse %q: %s", *rulePath, err)
		}
		if len(groups) == 0 {
			logger.Fatalf("No rules for validation. Please specify path to file(s) with alerting and/or recording rules using `-rule` flag")
		}
		return
	}
	if *replayFrom != "" || *replayTo != "" {
		rw, err := remotewrite.Init(context.Background())
		if err != nil {
			logger.Fatalf("failed to init remoteWrite: %s", err)
		}
		eu, err := getExternalURL(*externalURL, *httpListenAddr, httpserver.IsTLS())
		if err != nil {
			logger.Fatalf("failed to init `external.url`: %s", err)
		}
		notifier.InitTemplateFunc(eu)
		groupsCfg, err := config.Parse(*rulePath, *validateTemplates, *validateExpressions)
		if err != nil {
			logger.Fatalf("cannot parse configuration file: %s", err)
		}
		q, err := datasource.Init()
		if err != nil {
			logger.Fatalf("failed to init datasource: %s", err)
		}
		if err := replay(groupsCfg, q, rw); err != nil {
			logger.Fatalf("replay failed: %s", err)
		}
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	manager, err := newManager(ctx)
	if err != nil {

@@ -49,6 +49,8 @@ func TestManagerUpdateConcurrent(t *testing.T) {
		"config/testdata/rules1-good.rules",
		"config/testdata/rules2-good.rules",
	}
	evalInterval := *evaluationInterval
	defer func() { *evaluationInterval = evalInterval }()
	*evaluationInterval = time.Millisecond
	cfg := loadCfg(t, []string{paths[0]}, true, true)
	if err := m.start(context.Background(), cfg); err != nil {

@@ -83,14 +83,16 @@ func TestAlert_ExecTemplate(t *testing.T) {
				{Name: "foo", Value: "bar"},
				{Name: "baz", Value: "qux"},
			},
			Value: 1,
			Values: []float64{1},
			Timestamps: []int64{1},
		},
		{
			Labels: []datasource.Label{
				{Name: "foo", Value: "garply"},
				{Name: "baz", Value: "fred"},
			},
			Value: 2,
			Values: []float64{2},
			Timestamps: []int64{1},
		},
	}, nil
}

@@ -47,8 +47,8 @@ func datasourceMetricsToTemplateMetrics(ms []datasource.Metric) []metric {
		}
		mss = append(mss, metric{
			Labels: labelsMap,
			Timestamp: m.Timestamp,
			Value: m.Value})
			Timestamp: m.Timestamps[0],
			Value: m.Values[0]})
	}
	return mss
}

@@ -88,12 +88,30 @@ func (rr *RecordingRule) Close() {
	metrics.UnregisterMetric(rr.metrics.errors.name)
}

// Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal.TimeSeries, error) {
	if !series {
		return nil, nil
// ExecRange executes recording rule on the given time range similarly to Exec.
// It doesn't update internal states of the Rule and meant to be used just
// to get time series for backfilling.
func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) {
	series, err := rr.q.QueryRange(ctx, rr.Expr, start, end)
	if err != nil {
		return nil, err
	}
	duplicates := make(map[string]struct{}, len(series))
	var tss []prompbmarshal.TimeSeries
	for _, s := range series {
		ts := rr.toTimeSeries(s)
		key := stringifyLabels(ts)
		if _, ok := duplicates[key]; ok {
			return nil, fmt.Errorf("original metric %v; resulting labels %q: %w", s.Labels, key, errDuplicate)
		}
		duplicates[key] = struct{}{}
		tss = append(tss, ts)
	}
	return tss, nil
}

// Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) {
	qMetrics, err := rr.q.Query(ctx, rr.Expr)
	rr.mu.Lock()
	defer rr.mu.Unlock()
@@ -107,7 +125,7 @@ func (rr *RecordingRule) Exec(ctx context.Context, series bool) ([]prompbmarshal
	duplicates := make(map[string]struct{}, len(qMetrics))
	var tss []prompbmarshal.TimeSeries
	for _, r := range qMetrics {
		ts := rr.toTimeSeries(r, time.Unix(r.Timestamp, 0))
		ts := rr.toTimeSeries(r)
		key := stringifyLabels(ts)
		if _, ok := duplicates[key]; ok {
			rr.lastExecError = errDuplicate
@@ -138,7 +156,7 @@ func stringifyLabels(ts prompbmarshal.TimeSeries) string {
	return b.String()
}

func (rr *RecordingRule) toTimeSeries(m datasource.Metric, timestamp time.Time) prompbmarshal.TimeSeries {
func (rr *RecordingRule) toTimeSeries(m datasource.Metric) prompbmarshal.TimeSeries {
	labels := make(map[string]string)
	for _, l := range m.Labels {
		labels[l.Name] = l.Value
@@ -148,7 +166,7 @@ func (rr *RecordingRule) toTimeSeries(m datasource.Metric, timestamp time.Time)
	for k, v := range rr.Labels {
		labels[k] = v
	}
	return newTimeSeries(m.Value, labels, timestamp)
	return newTimeSeries(m.Values, m.Timestamps, labels)
}

// UpdateWith copies all significant fields.

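The duplicate check in `ExecRange` above rejects result sets where two series collapse to the same label set once the rule's labels are applied. A minimal standalone sketch of that idea, using a plain joined-string key instead of vmalert's `stringifyLabels` helper (all names and values below are illustrative only):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// labelKey builds a deterministic key from a label map by sorting label names,
// mirroring the idea behind keying series by their stringified labels.
func labelKey(labels map[string]string) string {
	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)
	parts := make([]string, 0, len(names))
	for _, name := range names {
		parts = append(parts, name+"="+labels[name])
	}
	return strings.Join(parts, ",")
}

func main() {
	series := []map[string]string{
		{"__name__": "foo", "job": "a"},
		{"__name__": "foo", "job": "a"}, // same label set -> duplicate
	}
	seen := map[string]struct{}{}
	for _, s := range series {
		k := labelKey(s)
		if _, ok := seen[k]; ok {
			fmt.Printf("duplicate series: %s\n", k)
			continue
		}
		seen[k] = struct{}{}
	}
}
```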
@ -11,7 +11,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
func TestRecoridngRule_ToTimeSeries(t *testing.T) {
|
||||
func TestRecoridngRule_Exec(t *testing.T) {
|
||||
timestamp := time.Now()
|
||||
testCases := []struct {
|
||||
rule *RecordingRule
|
||||
|
@ -24,9 +24,9 @@ func TestRecoridngRule_ToTimeSeries(t *testing.T) {
|
|||
"__name__", "bar",
|
||||
)},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(10, map[string]string{
|
||||
newTimeSeries([]float64{10}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "foo",
|
||||
}, timestamp),
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -37,18 +37,18 @@ func TestRecoridngRule_ToTimeSeries(t *testing.T) {
|
|||
metricWithValueAndLabels(t, 3, "__name__", "baz", "job", "baz"),
|
||||
},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "foobarbaz",
|
||||
"job": "foo",
|
||||
}, timestamp),
|
||||
newTimeSeries(2, map[string]string{
|
||||
}),
|
||||
newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "foobarbaz",
|
||||
"job": "bar",
|
||||
}, timestamp),
|
||||
newTimeSeries(3, map[string]string{
|
||||
}),
|
||||
newTimeSeries([]float64{3}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "foobarbaz",
|
||||
"job": "baz",
|
||||
}, timestamp),
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -59,16 +59,16 @@ func TestRecoridngRule_ToTimeSeries(t *testing.T) {
|
|||
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
|
||||
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar")},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(2, map[string]string{
|
||||
newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "job:foo",
|
||||
"job": "foo",
|
||||
"source": "test",
|
||||
}, timestamp),
|
||||
newTimeSeries(1, map[string]string{
|
||||
}),
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "job:foo",
|
||||
"job": "bar",
|
||||
"source": "test",
|
||||
}, timestamp),
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ func TestRecoridngRule_ToTimeSeries(t *testing.T) {
|
|||
fq := &fakeQuerier{}
|
||||
fq.add(tc.metrics...)
|
||||
tc.rule.q = fq
|
||||
tss, err := tc.rule.Exec(context.TODO(), true)
|
||||
tss, err := tc.rule.Exec(context.TODO())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected Exec err: %s", err)
|
||||
}
|
||||
|
@ -88,7 +88,88 @@ func TestRecoridngRule_ToTimeSeries(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestRecoridngRule_ToTimeSeriesNegative(t *testing.T) {
|
||||
func TestRecoridngRule_ExecRange(t *testing.T) {
|
||||
timestamp := time.Now()
|
||||
testCases := []struct {
|
||||
rule *RecordingRule
|
||||
metrics []datasource.Metric
|
||||
expTS []prompbmarshal.TimeSeries
|
||||
}{
|
||||
{
|
||||
&RecordingRule{Name: "foo"},
|
||||
[]datasource.Metric{metricWithValuesAndLabels(t, []float64{10, 20, 30},
|
||||
"__name__", "bar",
|
||||
)},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries([]float64{10, 20, 30},
|
||||
[]int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()},
|
||||
map[string]string{
|
||||
"__name__": "foo",
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
&RecordingRule{Name: "foobarbaz"},
|
||||
[]datasource.Metric{
|
||||
metricWithValuesAndLabels(t, []float64{1}, "__name__", "foo", "job", "foo"),
|
||||
metricWithValuesAndLabels(t, []float64{2, 3}, "__name__", "bar", "job", "bar"),
|
||||
metricWithValuesAndLabels(t, []float64{4, 5, 6}, "__name__", "baz", "job", "baz"),
|
||||
},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "foobarbaz",
|
||||
"job": "foo",
|
||||
}),
|
||||
newTimeSeries([]float64{2, 3}, []int64{timestamp.UnixNano(), timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "foobarbaz",
|
||||
"job": "bar",
|
||||
}),
|
||||
newTimeSeries([]float64{4, 5, 6},
|
||||
[]int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()},
|
||||
map[string]string{
|
||||
"__name__": "foobarbaz",
|
||||
"job": "baz",
|
||||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
&RecordingRule{Name: "job:foo", Labels: map[string]string{
|
||||
"source": "test",
|
||||
}},
|
||||
[]datasource.Metric{
|
||||
metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"),
|
||||
metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar")},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "job:foo",
|
||||
"job": "foo",
|
||||
"source": "test",
|
||||
}),
|
||||
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
|
||||
"__name__": "job:foo",
|
||||
"job": "bar",
|
||||
"source": "test",
|
||||
}),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.rule.Name, func(t *testing.T) {
|
||||
fq := &fakeQuerier{}
|
||||
fq.add(tc.metrics...)
|
||||
tc.rule.q = fq
|
||||
tss, err := tc.rule.ExecRange(context.TODO(), time.Now(), time.Now())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected Exec err: %s", err)
|
||||
}
|
||||
if err := compareTimeSeries(t, tc.expTS, tss); err != nil {
|
||||
t.Fatalf("timeseries missmatch: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecoridngRule_ExecNegative(t *testing.T) {
|
||||
rr := &RecordingRule{Name: "job:foo", Labels: map[string]string{
|
||||
"job": "test",
|
||||
}}
|
||||
|
@ -97,7 +178,7 @@ func TestRecoridngRule_ToTimeSeriesNegative(t *testing.T) {
|
|||
expErr := "connection reset by peer"
|
||||
fq.setErr(errors.New(expErr))
|
||||
rr.q = fq
|
||||
_, err := rr.Exec(context.TODO(), true)
|
||||
_, err := rr.Exec(context.TODO())
|
||||
if err == nil {
|
||||
t.Fatalf("expected to get err; got nil")
|
||||
}
|
||||
|
@ -112,7 +193,7 @@ func TestRecoridngRule_ToTimeSeriesNegative(t *testing.T) {
|
|||
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"))
|
||||
fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar"))
|
||||
|
||||
_, err = rr.Exec(context.TODO(), true)
|
||||
_, err = rr.Exec(context.TODO())
|
||||
if err == nil {
|
||||
t.Fatalf("expected to get err; got nil")
|
||||
}
|
||||
|
|
app/vmalert/replay.go (new file, 160 lines)

@@ -0,0 +1,160 @@
package main

import (
	"context"
	"flag"
	"fmt"
	"strings"
	"time"

	"github.com/cheggaaa/pb/v3"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

var (
	replayFrom = flag.String("replay.timeFrom", "",
		"The time filter in RFC3339 format to select time series with timestamp equal or higher than provided value. E.g. '2020-01-01T20:07:00Z'")
	replayTo = flag.String("replay.timeTo", "",
		"The time filter in RFC3339 format to select timeseries with timestamp equal or lower than provided value. E.g. '2020-01-01T20:07:00Z'")
	replayRulesDelay = flag.Duration("replay.rulesDelay", time.Second,
		"Delay between rules evaluation within the group. Could be important if there are chained rules inside of the group"+
			"and processing need to wait for previous rule results to be persisted by remote storage before evaluating the next rule."+
			"Keep it equal or bigger than -remoteWrite.flushInterval.")
	replayMaxDatapoints = flag.Int("replay.maxDatapointsPerQuery", 1e3,
		"Max number of data points expected in one request. The higher the value, the less requests will be made during replay.")
	replayRuleRetryAttempts = flag.Int("replay.ruleRetryAttempts", 5,
		"Defines how many retries to make before giving up on rule if request for it returns an error.")
)

func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw *remotewrite.Client) error {
	if *replayMaxDatapoints < 1 {
		return fmt.Errorf("replay.maxDatapointsPerQuery can't be lower than 1")
	}
	tFrom, err := time.Parse(time.RFC3339, *replayFrom)
	if err != nil {
		return fmt.Errorf("failed to parse %q: %s", *replayFrom, err)
	}
	tTo, err := time.Parse(time.RFC3339, *replayTo)
	if err != nil {
		return fmt.Errorf("failed to parse %q: %s", *replayTo, err)
	}
	if !tTo.After(tFrom) {
		return fmt.Errorf("replay.timeTo must be bigger than replay.timeFrom")
	}
	labels := make(map[string]string)
	for _, s := range *externalLabels {
		if len(s) == 0 {
			continue
		}
		n := strings.IndexByte(s, '=')
		if n < 0 {
			return fmt.Errorf("missing '=' in `-label`. It must contain label in the form `name=value`; got %q", s)
		}
		labels[s[:n]] = s[n+1:]
	}

	fmt.Printf("Replay mode:"+
		"\nfrom: \t%v "+
		"\nto: \t%v "+
		"\nmax data points per request: %d\n",
		tFrom, tTo, *replayMaxDatapoints)

	var total int
	for _, cfg := range groupsCfg {
		ng := newGroup(cfg, qb, *evaluationInterval, labels)
		total += ng.replay(tFrom, tTo, rw)
	}
	logger.Infof("replay finished! Imported %d samples", total)
	if rw != nil {
		return rw.Close()
	}
	return nil
}

func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
	var total int
	step := g.Interval * time.Duration(*replayMaxDatapoints)
	ri := rangeIterator{start: start, end: end, step: step}
	iterations := int(end.Sub(start)/step) + 1
	fmt.Printf("\nGroup %q"+
		"\ninterval: \t%v"+
		"\nrequests to make: \t%d"+
		"\nmax range per request: \t%v\n",
		g.Name, g.Interval, iterations, step)
	for _, rule := range g.Rules {
		fmt.Printf("> Rule %q (ID: %d)\n", rule, rule.ID())
		bar := pb.StartNew(iterations)
		ri.reset()
		for ri.next() {
			n, err := replayRule(rule, ri.s, ri.e, rw)
			if err != nil {
				logger.Fatalf("rule %q: %s", rule, err)
			}
			total += n
			bar.Increment()
		}
		bar.Finish()
		// sleep to let remote storage to flush data on-disk
		// so chained rules could be calculated correctly
		time.Sleep(*replayRulesDelay)
	}
	return total
}

func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
	var err error
	var tss []prompbmarshal.TimeSeries
	for i := 0; i < *replayRuleRetryAttempts; i++ {
		tss, err = rule.ExecRange(context.Background(), start, end)
		if err == nil {
			break
		}
		logger.Errorf("attempt %d to execute rule %q failed: %s", i+1, rule, err)
		time.Sleep(time.Second)
	}
	if err != nil { // means all attempts failed
		return 0, err
	}
	if len(tss) < 1 {
		return 0, nil
	}
	var n int
	for _, ts := range tss {
		if err := rw.Push(ts); err != nil {
			return n, fmt.Errorf("remote write failure: %s", err)
		}
		n += len(ts.Samples)
	}
	return n, nil
}

type rangeIterator struct {
	step time.Duration
	start, end time.Time

	iter int
	s, e time.Time
}

func (ri *rangeIterator) reset() {
	ri.iter = 0
	ri.s, ri.e = time.Time{}, time.Time{}
}

func (ri *rangeIterator) next() bool {
	ri.s = ri.start.Add(ri.step * time.Duration(ri.iter))
	if !ri.end.After(ri.s) {
		return false
	}
	ri.e = ri.s.Add(ri.step)
	if ri.e.After(ri.end) {
		ri.e = ri.end
	}
	ri.iter++
	return true
}

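The request sizing above follows directly from `step := g.Interval * time.Duration(*replayMaxDatapoints)`: each rule is evaluated over sub-ranges of at most `step`, with a shorter final range when the replay window does not divide evenly. A standalone sketch of that arithmetic (the interval, limit, and time window below are example values only, chosen to match the "datapoints per step" test case):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Example values only: a 1m group interval and 60 datapoints per query
	// give one-hour sub-ranges.
	interval := time.Minute
	maxDatapoints := 60
	start := time.Date(2021, 1, 1, 12, 0, 0, 0, time.UTC)
	end := time.Date(2021, 1, 1, 15, 2, 30, 0, time.UTC)

	step := interval * time.Duration(maxDatapoints)
	for s := start; end.After(s); s = s.Add(step) {
		e := s.Add(step)
		if e.After(end) {
			e = end // final range is clipped to the replay window
		}
		fmt.Printf("query range: %s .. %s\n", s.Format("15:04:05"), e.Format("15:04:05"))
	}
	// Prints 12:00..13:00, 13:00..14:00, 14:00..15:00 and 15:00..15:02:30.
}
```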
app/vmalert/replay_test.go (new file, 249 lines)

@ -0,0 +1,249 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
)
|
||||
|
||||
type fakeReplayQuerier struct {
|
||||
fakeQuerier
|
||||
registry map[string]map[string]struct{}
|
||||
}
|
||||
|
||||
func (fr *fakeReplayQuerier) BuildWithParams(_ datasource.QuerierParams) datasource.Querier {
|
||||
return fr
|
||||
}
|
||||
|
||||
func (fr *fakeReplayQuerier) QueryRange(_ context.Context, q string, from, to time.Time) ([]datasource.Metric, error) {
|
||||
key := fmt.Sprintf("%s+%s", from.Format("15:04:05"), to.Format("15:04:05"))
|
||||
dps, ok := fr.registry[q]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected query received: %q", q)
|
||||
}
|
||||
_, ok = dps[key]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected time range received: %q", key)
|
||||
}
|
||||
delete(dps, key)
|
||||
if len(fr.registry[q]) < 1 {
|
||||
delete(fr.registry, q)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestReplay(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
from, to string
|
||||
maxDP int
|
||||
cfg []config.Group
|
||||
qb *fakeReplayQuerier
|
||||
}{
|
||||
{
|
||||
name: "one rule + one response",
|
||||
from: "2021-01-01T12:00:00.000Z",
|
||||
to: "2021-01-01T12:02:00.000Z",
|
||||
maxDP: 10,
|
||||
cfg: []config.Group{
|
||||
{Rules: []config.Rule{{Record: "foo", Expr: "sum(up)"}}},
|
||||
},
|
||||
qb: &fakeReplayQuerier{
|
||||
registry: map[string]map[string]struct{}{
|
||||
"sum(up)": {"12:00:00+12:02:00": {}},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "one rule + multiple responses",
|
||||
from: "2021-01-01T12:00:00.000Z",
|
||||
to: "2021-01-01T12:02:30.000Z",
|
||||
maxDP: 1,
|
||||
cfg: []config.Group{
|
||||
{Rules: []config.Rule{{Record: "foo", Expr: "sum(up)"}}},
|
||||
},
|
||||
qb: &fakeReplayQuerier{
|
||||
registry: map[string]map[string]struct{}{
|
||||
"sum(up)": {
|
||||
"12:00:00+12:01:00": {},
|
||||
"12:01:00+12:02:00": {},
|
||||
"12:02:00+12:02:30": {},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "datapoints per step",
|
||||
from: "2021-01-01T12:00:00.000Z",
|
||||
to: "2021-01-01T15:02:30.000Z",
|
||||
maxDP: 60,
|
||||
cfg: []config.Group{
|
||||
{Interval: time.Minute, Rules: []config.Rule{{Record: "foo", Expr: "sum(up)"}}},
|
||||
},
|
||||
qb: &fakeReplayQuerier{
|
||||
registry: map[string]map[string]struct{}{
|
||||
"sum(up)": {
|
||||
"12:00:00+13:00:00": {},
|
||||
"13:00:00+14:00:00": {},
|
||||
"14:00:00+15:00:00": {},
|
||||
"15:00:00+15:02:30": {},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multiple recording rules + multiple responses",
|
||||
from: "2021-01-01T12:00:00.000Z",
|
||||
to: "2021-01-01T12:02:30.000Z",
|
||||
maxDP: 1,
|
||||
cfg: []config.Group{
|
||||
{Rules: []config.Rule{{Record: "foo", Expr: "sum(up)"}}},
|
||||
{Rules: []config.Rule{{Record: "bar", Expr: "max(up)"}}},
|
||||
},
|
||||
qb: &fakeReplayQuerier{
|
||||
registry: map[string]map[string]struct{}{
|
||||
"sum(up)": {
|
||||
"12:00:00+12:01:00": {},
|
||||
"12:01:00+12:02:00": {},
|
||||
"12:02:00+12:02:30": {},
|
||||
},
|
||||
"max(up)": {
|
||||
"12:00:00+12:01:00": {},
|
||||
"12:01:00+12:02:00": {},
|
||||
"12:02:00+12:02:30": {},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "multiple alerting rules + multiple responses",
|
||||
from: "2021-01-01T12:00:00.000Z",
|
||||
to: "2021-01-01T12:02:30.000Z",
|
||||
maxDP: 1,
|
||||
cfg: []config.Group{
|
||||
{Rules: []config.Rule{{Alert: "foo", Expr: "sum(up) > 1"}}},
|
||||
{Rules: []config.Rule{{Alert: "bar", Expr: "max(up) < 1"}}},
|
||||
},
|
||||
qb: &fakeReplayQuerier{
|
||||
registry: map[string]map[string]struct{}{
|
||||
"sum(up) > 1": {
|
||||
"12:00:00+12:01:00": {},
|
||||
"12:01:00+12:02:00": {},
|
||||
"12:02:00+12:02:30": {},
|
||||
},
|
||||
"max(up) < 1": {
|
||||
"12:00:00+12:01:00": {},
|
||||
"12:01:00+12:02:00": {},
|
||||
"12:02:00+12:02:30": {},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
from, to, maxDP := *replayFrom, *replayTo, *replayMaxDatapoints
|
||||
retries, delay := *replayRuleRetryAttempts, *replayRulesDelay
|
||||
defer func() {
|
||||
*replayFrom, *replayTo = from, to
|
||||
*replayMaxDatapoints, *replayRuleRetryAttempts = maxDP, retries
|
||||
*replayRulesDelay = delay
|
||||
}()
|
||||
|
||||
*replayRuleRetryAttempts = 1
|
||||
*replayRulesDelay = time.Millisecond
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
*replayFrom = tc.from
|
||||
*replayTo = tc.to
|
||||
*replayMaxDatapoints = tc.maxDP
|
||||
if err := replay(tc.cfg, tc.qb, nil); err != nil {
|
||||
t.Fatalf("replay failed: %s", err)
|
||||
}
|
||||
if len(tc.qb.registry) > 0 {
|
||||
t.Fatalf("not all requests were sent: %#v", tc.qb.registry)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRangeIterator(t *testing.T) {
|
||||
testCases := []struct {
|
||||
ri rangeIterator
|
||||
result [][2]time.Time
|
||||
}{
|
||||
{
|
||||
ri: rangeIterator{
|
||||
start: parseTime(t, "2021-01-01T12:00:00.000Z"),
|
||||
end: parseTime(t, "2021-01-01T12:30:00.000Z"),
|
||||
step: 5 * time.Minute,
|
||||
},
|
||||
result: [][2]time.Time{
|
||||
{parseTime(t, "2021-01-01T12:00:00.000Z"), parseTime(t, "2021-01-01T12:05:00.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:05:00.000Z"), parseTime(t, "2021-01-01T12:10:00.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:10:00.000Z"), parseTime(t, "2021-01-01T12:15:00.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:15:00.000Z"), parseTime(t, "2021-01-01T12:20:00.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:20:00.000Z"), parseTime(t, "2021-01-01T12:25:00.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:25:00.000Z"), parseTime(t, "2021-01-01T12:30:00.000Z")},
|
||||
},
|
||||
},
|
||||
{
|
||||
ri: rangeIterator{
|
||||
start: parseTime(t, "2021-01-01T12:00:00.000Z"),
|
||||
end: parseTime(t, "2021-01-01T12:30:00.000Z"),
|
||||
step: 45 * time.Minute,
|
||||
},
|
||||
result: [][2]time.Time{
|
||||
{parseTime(t, "2021-01-01T12:00:00.000Z"), parseTime(t, "2021-01-01T12:30:00.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:30:00.000Z"), parseTime(t, "2021-01-01T12:30:00.000Z")},
|
||||
},
|
||||
},
|
||||
{
|
||||
ri: rangeIterator{
|
||||
start: parseTime(t, "2021-01-01T12:00:12.000Z"),
|
||||
end: parseTime(t, "2021-01-01T12:00:17.000Z"),
|
||||
step: time.Second,
|
||||
},
|
||||
result: [][2]time.Time{
|
||||
{parseTime(t, "2021-01-01T12:00:12.000Z"), parseTime(t, "2021-01-01T12:00:13.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:00:13.000Z"), parseTime(t, "2021-01-01T12:00:14.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:00:14.000Z"), parseTime(t, "2021-01-01T12:00:15.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:00:15.000Z"), parseTime(t, "2021-01-01T12:00:16.000Z")},
|
||||
{parseTime(t, "2021-01-01T12:00:16.000Z"), parseTime(t, "2021-01-01T12:00:17.000Z")},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
|
||||
var j int
|
||||
for tc.ri.next() {
|
||||
if len(tc.result) < j+1 {
|
||||
t.Fatalf("unexpected result for iterator on step %d: %v - %v",
|
||||
j, tc.ri.s, tc.ri.e)
|
||||
}
|
||||
s, e := tc.ri.s, tc.ri.e
|
||||
expS, expE := tc.result[j][0], tc.result[j][1]
|
||||
if s != expS {
|
||||
t.Fatalf("expected to get start=%v; got %v", expS, s)
|
||||
}
|
||||
if e != expE {
|
||||
t.Fatalf("expected to get end=%v; got %v", expE, e)
|
||||
}
|
||||
j++
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func parseTime(t *testing.T, s string) time.Time {
|
||||
t.Helper()
|
||||
tt, err := time.Parse("2006-01-02T15:04:05.000Z", s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return tt
|
||||
}
|
|
@@ -3,21 +3,21 @@ package main

import (
	"context"
	"errors"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"time"
)

// Rule represents alerting or recording rule
// that has unique ID, can be Executed and
// updated with other Rule.
type Rule interface {
	// Returns unique ID that may be used for
	// ID returns unique ID that may be used for
	// identifying this Rule among others.
	ID() uint64
	// Exec executes the rule with given context
	// and Querier. If returnSeries is true, Exec
	// may return TimeSeries as result of execution
	Exec(ctx context.Context, returnSeries bool) ([]prompbmarshal.TimeSeries, error)
	Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error)
	// ExecRange executes the rule on the given time range
	ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
	// UpdateWith performs modification of current Rule
	// with fields of the given Rule.
	UpdateWith(Rule) error

@@ -7,17 +7,21 @@ import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries {
	ts := prompbmarshal.TimeSeries{}
	ts.Samples = append(ts.Samples, prompbmarshal.Sample{
		Value: value,
		Timestamp: timestamp.UnixNano() / 1e6,
	})
func newTimeSeries(values []float64, timestamps []int64, labels map[string]string) prompbmarshal.TimeSeries {
	ts := prompbmarshal.TimeSeries{
		Samples: make([]prompbmarshal.Sample, len(values)),
	}
	for i := range values {
		ts.Samples[i] = prompbmarshal.Sample{
			Value: values[i],
			Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6,
		}
	}
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	sort.Strings(keys) // make order deterministic
	for _, key := range keys {
		ts.Labels = append(ts.Labels, prompbmarshal.Label{
			Name: key,
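A simplified standalone analogue of the new `newTimeSeries` signature, assuming the `prompbmarshal` package from this repository: parallel value/timestamp slices (unix seconds) become remote-write samples with millisecond timestamps. The `buildSeries` name and the sample values are illustrative only, not part of the commit:

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)

// buildSeries converts parallel value/timestamp slices (timestamps in unix
// seconds) into a remote-write time series with millisecond timestamps,
// mirroring the conversion performed above.
func buildSeries(values []float64, timestamps []int64) prompbmarshal.TimeSeries {
	ts := prompbmarshal.TimeSeries{Samples: make([]prompbmarshal.Sample, len(values))}
	for i := range values {
		ts.Samples[i] = prompbmarshal.Sample{
			Value:     values[i],
			Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6,
		}
	}
	return ts
}

func main() {
	ts := buildSeries([]float64{13763, 13790}, []int64{1583786142, 1583786202})
	fmt.Printf("%d samples, first timestamp %d ms\n", len(ts.Samples), ts.Samples[0].Timestamp)
}
```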