Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

Aliaksandr Valialkin 2022-04-01 12:38:34 +03:00
commit 2c22e168f5
21 changed files with 399 additions and 148 deletions

View file

@ -381,7 +381,7 @@ See also [downsampling docs](https://docs.victoriametrics.com/#downsampling).
`vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints: `vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
* `http://<vmalert-addr>` - UI; * `http://<vmalert-addr>` - UI;
* `http://<vmalert-addr>/api/v1/groups` - list of all loaded groups and rules; * `http://<vmalert-addr>/api/v1/rules` - list of all loaded groups and rules;
* `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts; * `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts;
* `http://<vmalert-addr>/api/v1/<groupID>/<alertID>/status"` - get alert status by ID. * `http://<vmalert-addr>/api/v1/<groupID>/<alertID>/status"` - get alert status by ID.
Used as alert source in AlertManager. Used as alert source in AlertManager.

View file

@ -191,9 +191,10 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
if at.Sub(prevT) > ar.EvalInterval { if at.Sub(prevT) > ar.EvalInterval {
// reset to Pending if there are gaps > EvalInterval between DPs // reset to Pending if there are gaps > EvalInterval between DPs
a.State = notifier.StatePending a.State = notifier.StatePending
a.Start = at a.ActiveAt = at
} else if at.Sub(a.Start) >= ar.For { } else if at.Sub(a.ActiveAt) >= ar.For {
a.State = notifier.StateFiring a.State = notifier.StateFiring
a.Start = at
} }
prevT = at prevT = at
result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...) result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...)
@ -202,11 +203,15 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([]
return result, nil return result, nil
} }
// resolvedRetention is the duration for which a resolved alert instance
// is kept in memory state and consequently repeatedly sent to the AlertManager.
const resolvedRetention = 15 * time.Minute
// Exec executes AlertingRule expression via the given Querier. // Exec executes AlertingRule expression via the given Querier.
// Based on the Querier results AlertingRule maintains notifier.Alerts // Based on the Querier results AlertingRule maintains notifier.Alerts
func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) { func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) {
start := time.Now() start := time.Now()
qMetrics, err := ar.q.Query(ctx, ar.Expr) qMetrics, err := ar.q.Query(ctx, ar.Expr, ts)
ar.mu.Lock() ar.mu.Lock()
defer ar.mu.Unlock() defer ar.mu.Unlock()
@ -220,12 +225,12 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e
for h, a := range ar.alerts { for h, a := range ar.alerts {
// cleanup inactive alerts from previous Exec // cleanup inactive alerts from previous Exec
if a.State == notifier.StateInactive { if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention {
delete(ar.alerts, h) delete(ar.alerts, h)
} }
} }
qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) } qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query, ts) }
updated := make(map[uint64]struct{}) updated := make(map[uint64]struct{})
// update list of active alerts // update list of active alerts
for _, m := range qMetrics { for _, m := range qMetrics {
@ -250,10 +255,18 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e
if _, ok := updated[h]; ok { if _, ok := updated[h]; ok {
// duplicate may be caused by extra labels // duplicate may be caused by extra labels
// conflicting with the metric labels // conflicting with the metric labels
return nil, fmt.Errorf("labels %v: %w", m.Labels, errDuplicate) ar.lastExecError = fmt.Errorf("labels %v: %w", m.Labels, errDuplicate)
return nil, ar.lastExecError
} }
updated[h] = struct{}{} updated[h] = struct{}{}
if a, ok := ar.alerts[h]; ok { if a, ok := ar.alerts[h]; ok {
if a.State == notifier.StateInactive {
// alert could be in inactive state for resolvedRetention
// so when we again receive metrics for it - we switch it
// back to notifier.StatePending
a.State = notifier.StatePending
a.ActiveAt = ts
}
if a.Value != m.Values[0] { if a.Value != m.Values[0] {
// update Value field with latest value // update Value field with latest value
a.Value = m.Values[0] a.Value = m.Values[0]
@ -273,6 +286,7 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e
} }
a.ID = h a.ID = h
a.State = notifier.StatePending a.State = notifier.StatePending
a.ActiveAt = ts
ar.alerts[h] = a ar.alerts[h] = a
} }
@ -286,15 +300,19 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e
delete(ar.alerts, h) delete(ar.alerts, h)
continue continue
} }
a.State = notifier.StateInactive if a.State == notifier.StateFiring {
a.State = notifier.StateInactive
a.ResolvedAt = ts
}
continue continue
} }
if a.State == notifier.StatePending && time.Since(a.Start) >= ar.For { if a.State == notifier.StatePending && time.Since(a.ActiveAt) >= ar.For {
a.State = notifier.StateFiring a.State = notifier.StateFiring
a.Start = ts
alertsFired.Inc() alertsFired.Inc()
} }
} }
return ar.toTimeSeries(ar.lastExecTime.Unix()), nil return ar.toTimeSeries(ts.Unix()), nil
} }
func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (map[string]string, error) { func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (map[string]string, error) {
@ -360,12 +378,12 @@ func hash(m datasource.Metric) uint64 {
func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) { func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) {
a := &notifier.Alert{ a := &notifier.Alert{
GroupID: ar.GroupID, GroupID: ar.GroupID,
Name: ar.Name, Name: ar.Name,
Labels: map[string]string{}, Labels: map[string]string{},
Value: m.Values[0], Value: m.Values[0],
Start: start, ActiveAt: start,
Expr: ar.Expr, Expr: ar.Expr,
} }
for _, l := range m.Labels { for _, l := range m.Labels {
// drop __name__ to be consistent with Prometheus alerting // drop __name__ to be consistent with Prometheus alerting
@ -435,6 +453,9 @@ func (ar *AlertingRule) AlertsToAPI() []*APIAlert {
var alerts []*APIAlert var alerts []*APIAlert
ar.mu.RLock() ar.mu.RLock()
for _, a := range ar.alerts { for _, a := range ar.alerts {
if a.State == notifier.StateInactive {
continue
}
alerts = append(alerts, ar.newAlertAPI(*a)) alerts = append(alerts, ar.newAlertAPI(*a))
} }
ar.mu.RUnlock() ar.mu.RUnlock()
@ -453,7 +474,7 @@ func (ar *AlertingRule) newAlertAPI(a notifier.Alert) *APIAlert {
Labels: a.Labels, Labels: a.Labels,
Annotations: a.Annotations, Annotations: a.Annotations,
State: a.State.String(), State: a.State.String(),
ActiveAt: a.Start, ActiveAt: a.ActiveAt,
Restored: a.Restored, Restored: a.Restored,
Value: strconv.FormatFloat(a.Value, 'f', -1, 32), Value: strconv.FormatFloat(a.Value, 'f', -1, 32),
} }
@ -479,7 +500,7 @@ const (
alertGroupNameLabel = "alertgroup" alertGroupNameLabel = "alertgroup"
) )
// alertToTimeSeries converts the given alert with the given timestamp to timeseries // alertToTimeSeries converts the given alert with the given timestamp to time series
func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp int64) []prompbmarshal.TimeSeries { func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp int64) []prompbmarshal.TimeSeries {
var tss []prompbmarshal.TimeSeries var tss []prompbmarshal.TimeSeries
tss = append(tss, alertToTimeSeries(a, timestamp)) tss = append(tss, alertToTimeSeries(a, timestamp))
@ -507,11 +528,11 @@ func alertForToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.Time
labels[k] = v labels[k] = v
} }
labels["__name__"] = alertForStateMetricName labels["__name__"] = alertForStateMetricName
return newTimeSeries([]float64{float64(a.Start.Unix())}, []int64{timestamp}, labels) return newTimeSeries([]float64{float64(a.ActiveAt.Unix())}, []int64{timestamp}, labels)
} }
// Restore restores the state of active alerts basing on previously written time series. // Restore restores the state of active alerts basing on previously written time series.
// Restore restores only Start field. Field State will be always Pending and supposed // Restore restores only ActiveAt field. Field State will be always Pending and supposed
// to be updated on next Exec, as well as Value field. // to be updated on next Exec, as well as Value field.
// Only rules with For > 0 will be restored. // Only rules with For > 0 will be restored.
func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration, labels map[string]string) error { func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration, labels map[string]string) error {
@ -519,7 +540,8 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
return fmt.Errorf("querier is nil") return fmt.Errorf("querier is nil")
} }
qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) } ts := time.Now()
qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query, ts) }
// account for external labels in filter // account for external labels in filter
var labelsFilter string var labelsFilter string
@ -532,7 +554,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
// remote write protocol which is used for state persistence in vmalert. // remote write protocol which is used for state persistence in vmalert.
expr := fmt.Sprintf("last_over_time(%s{alertname=%q%s}[%ds])", expr := fmt.Sprintf("last_over_time(%s{alertname=%q%s}[%ds])",
alertForStateMetricName, ar.Name, labelsFilter, int(lookback.Seconds())) alertForStateMetricName, ar.Name, labelsFilter, int(lookback.Seconds()))
qMetrics, err := q.Query(ctx, expr) qMetrics, err := q.Query(ctx, expr, ts)
if err != nil { if err != nil {
return err return err
} }
@ -555,21 +577,27 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb
// and returns only those which should be sent to notifier. // and returns only those which should be sent to notifier.
// Isn't concurrent safe. // Isn't concurrent safe.
func (ar *AlertingRule) alertsToSend(ts time.Time, resolveDuration, resendDelay time.Duration) []notifier.Alert { func (ar *AlertingRule) alertsToSend(ts time.Time, resolveDuration, resendDelay time.Duration) []notifier.Alert {
needsSending := func(a *notifier.Alert) bool {
if a.State == notifier.StatePending {
return false
}
if a.ResolvedAt.After(a.LastSent) {
return true
}
return a.LastSent.Add(resendDelay).Before(ts)
}
var alerts []notifier.Alert var alerts []notifier.Alert
for _, a := range ar.alerts { for _, a := range ar.alerts {
switch a.State { if !needsSending(a) {
case notifier.StateFiring: continue
if time.Since(a.LastSent) < resendDelay {
continue
}
a.End = ts.Add(resolveDuration)
a.LastSent = ts
alerts = append(alerts, *a)
case notifier.StateInactive:
a.End = ts
a.LastSent = ts
alerts = append(alerts, *a)
} }
a.End = ts.Add(resolveDuration)
if a.State == notifier.StateInactive {
a.End = a.ResolvedAt
}
a.LastSent = ts
alerts = append(alerts, *a)
} }
return alerts return alerts
} }
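The reworked `alertsToSend` above, together with the new `resolvedRetention` constant, changes when notifications go out: pending alerts are never sent, a resolved (inactive) alert keeps being re-sent while it is retained and its `ResolvedAt` is newer than `LastSent`, and firing alerts honour the resend delay. A minimal, self-contained sketch of that `needsSending` decision, using simplified stand-in types rather than the real `notifier.Alert`:

```go
package main

import (
	"fmt"
	"time"
)

// alertState mirrors the three states used above (simplified enum for the sketch).
type alertState int

const (
	statePending alertState = iota
	stateFiring
	stateInactive
)

// alert is a trimmed-down stand-in for notifier.Alert.
type alert struct {
	State      alertState
	LastSent   time.Time
	ResolvedAt time.Time
}

// needsSending reproduces the decision rule from alertsToSend: skip pending
// alerts, always notify about a resolution that happened after the last send,
// otherwise respect the resend delay.
func needsSending(a alert, ts time.Time, resendDelay time.Duration) bool {
	if a.State == statePending {
		return false
	}
	if a.ResolvedAt.After(a.LastSent) {
		return true
	}
	return a.LastSent.Add(resendDelay).Before(ts)
}

func main() {
	ts := time.Now()
	alreadySent := alert{State: stateInactive, LastSent: ts.Add(-time.Second), ResolvedAt: ts.Add(-2 * time.Second)}
	freshlyResolved := alert{State: stateInactive, LastSent: ts.Add(-time.Second), ResolvedAt: ts}

	fmt.Println(needsSending(alreadySent, ts, time.Minute))     // false: resolution was already delivered
	fmt.Println(needsSending(freshlyResolved, ts, time.Minute)) // true: resolved after the last send
}
```

The two calls mirror the new `TestAlertsToSend` cases further down for an inactive alert that was already delivered versus one resolved after the last send.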

View file

@ -61,7 +61,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
}, },
{ {
newTestAlertingRule("for", time.Second), newTestAlertingRule("for", time.Second),
&notifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)}, &notifier.Alert{State: notifier.StateFiring, ActiveAt: timestamp.Add(time.Second)},
[]prompbmarshal.TimeSeries{ []prompbmarshal.TimeSeries{
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
"__name__": alertMetricName, "__name__": alertMetricName,
@ -76,7 +76,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) {
}, },
{ {
newTestAlertingRule("for pending", 10*time.Second), newTestAlertingRule("for pending", 10*time.Second),
&notifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)}, &notifier.Alert{State: notifier.StatePending, ActiveAt: timestamp.Add(time.Second)},
[]prompbmarshal.TimeSeries{ []prompbmarshal.TimeSeries{
newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{
"__name__": alertMetricName, "__name__": alertMetricName,
@ -169,7 +169,7 @@ func TestAlertingRule_Exec(t *testing.T) {
}, },
}, },
{ {
newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty", 0), newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive", 0),
[][]datasource.Metric{ [][]datasource.Metric{
{metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo")},
{}, {},
@ -177,7 +177,9 @@ func TestAlertingRule_Exec(t *testing.T) {
{}, {},
{}, {},
}, },
nil, []testAlert{
{labels: []string{"name", "foo"}, alert: &notifier.Alert{State: notifier.StateInactive}},
},
}, },
{ {
newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0), newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0),
@ -217,8 +219,9 @@ func TestAlertingRule_Exec(t *testing.T) {
}, },
// 1: fire first alert // 1: fire first alert
// 2: fire second alert, set first inactive // 2: fire second alert, set first inactive
// 3: fire third alert, set second inactive, delete first one // 3: fire third alert, set second inactive
[]testAlert{ []testAlert{
{labels: []string{"name", "foo"}, alert: &notifier.Alert{State: notifier.StateInactive}},
{labels: []string{"name", "foo1"}, alert: &notifier.Alert{State: notifier.StateInactive}}, {labels: []string{"name", "foo1"}, alert: &notifier.Alert{State: notifier.StateInactive}},
{labels: []string{"name", "foo2"}, alert: &notifier.Alert{State: notifier.StateFiring}}, {labels: []string{"name", "foo2"}, alert: &notifier.Alert{State: notifier.StateFiring}},
}, },
@ -301,7 +304,7 @@ func TestAlertingRule_Exec(t *testing.T) {
for _, step := range tc.steps { for _, step := range tc.steps {
fq.reset() fq.reset()
fq.add(step...) fq.add(step...)
if _, err := tc.rule.Exec(context.TODO()); err != nil { if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil {
t.Fatalf("unexpected err: %s", err) t.Fatalf("unexpected err: %s", err)
} }
// artificial delay between applying steps // artificial delay between applying steps
@ -380,9 +383,9 @@ func TestAlertingRule_ExecRange(t *testing.T) {
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}}, {Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
}, },
[]*notifier.Alert{ []*notifier.Alert{
{State: notifier.StatePending, Start: time.Unix(1, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StatePending, Start: time.Unix(3, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(3, 0)},
{State: notifier.StatePending, Start: time.Unix(5, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(5, 0)},
}, },
}, },
{ {
@ -391,9 +394,9 @@ func TestAlertingRule_ExecRange(t *testing.T) {
{Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}}, {Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}},
}, },
[]*notifier.Alert{ []*notifier.Alert{
{State: notifier.StatePending, Start: time.Unix(1, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StatePending, Start: time.Unix(1, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StateFiring, Start: time.Unix(1, 0)}, {State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)},
}, },
}, },
{ {
@ -402,11 +405,11 @@ func TestAlertingRule_ExecRange(t *testing.T) {
{Values: []float64{1, 1, 1, 1, 1}, Timestamps: []int64{1, 2, 5, 6, 20}}, {Values: []float64{1, 1, 1, 1, 1}, Timestamps: []int64{1, 2, 5, 6, 20}},
}, },
[]*notifier.Alert{ []*notifier.Alert{
{State: notifier.StatePending, Start: time.Unix(1, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StateFiring, Start: time.Unix(1, 0)}, {State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)},
{State: notifier.StatePending, Start: time.Unix(5, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(5, 0)},
{State: notifier.StateFiring, Start: time.Unix(5, 0)}, {State: notifier.StateFiring, ActiveAt: time.Unix(5, 0)},
{State: notifier.StatePending, Start: time.Unix(20, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(20, 0)},
}, },
}, },
{ {
@ -418,15 +421,15 @@ func TestAlertingRule_ExecRange(t *testing.T) {
}, },
}, },
[]*notifier.Alert{ []*notifier.Alert{
{State: notifier.StatePending, Start: time.Unix(1, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StatePending, Start: time.Unix(1, 0)}, {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)},
{State: notifier.StateFiring, Start: time.Unix(1, 0)}, {State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)},
// //
{State: notifier.StatePending, Start: time.Unix(1, 0), {State: notifier.StatePending, ActiveAt: time.Unix(1, 0),
Labels: map[string]string{ Labels: map[string]string{
"foo": "bar", "foo": "bar",
}}, }},
{State: notifier.StatePending, Start: time.Unix(5, 0), {State: notifier.StatePending, ActiveAt: time.Unix(5, 0),
Labels: map[string]string{ Labels: map[string]string{
"foo": "bar", "foo": "bar",
}}, }},
@ -479,7 +482,7 @@ func TestAlertingRule_ExecRange(t *testing.T) {
a.Labels = make(map[string]string) a.Labels = make(map[string]string)
} }
a.Labels[alertNameLabel] = tc.rule.Name a.Labels[alertNameLabel] = tc.rule.Name
expTS = append(expTS, tc.rule.alertToTimeSeries(tc.expAlerts[j], timestamp)...) expTS = append(expTS, tc.rule.alertToTimeSeries(a, timestamp)...)
j++ j++
} }
} }
@ -511,7 +514,7 @@ func TestAlertingRule_Restore(t *testing.T) {
}, },
map[uint64]*notifier.Alert{ map[uint64]*notifier.Alert{
hash(datasource.Metric{}): {State: notifier.StatePending, hash(datasource.Metric{}): {State: notifier.StatePending,
Start: time.Now().Truncate(time.Hour)}, ActiveAt: time.Now().Truncate(time.Hour)},
}, },
}, },
{ {
@ -532,7 +535,7 @@ func TestAlertingRule_Restore(t *testing.T) {
"foo", "bar", "foo", "bar",
"namespace", "baz", "namespace", "baz",
)): {State: notifier.StatePending, )): {State: notifier.StatePending,
Start: time.Now().Truncate(time.Hour)}, ActiveAt: time.Now().Truncate(time.Hour)},
}, },
}, },
{ {
@ -552,7 +555,7 @@ func TestAlertingRule_Restore(t *testing.T) {
"namespace", "baz", "namespace", "baz",
"source", "vm", "source", "vm",
)): {State: notifier.StatePending, )): {State: notifier.StatePending,
Start: time.Now().Truncate(time.Hour)}, ActiveAt: time.Now().Truncate(time.Hour)},
}, },
}, },
{ {
@ -573,11 +576,11 @@ func TestAlertingRule_Restore(t *testing.T) {
}, },
map[uint64]*notifier.Alert{ map[uint64]*notifier.Alert{
hash(metricWithLabels(t, "host", "localhost-1")): {State: notifier.StatePending, hash(metricWithLabels(t, "host", "localhost-1")): {State: notifier.StatePending,
Start: time.Now().Truncate(time.Hour)}, ActiveAt: time.Now().Truncate(time.Hour)},
hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending, hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending,
Start: time.Now().Truncate(2 * time.Hour)}, ActiveAt: time.Now().Truncate(2 * time.Hour)},
hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending, hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending,
Start: time.Now().Truncate(3 * time.Hour)}, ActiveAt: time.Now().Truncate(3 * time.Hour)},
}, },
}, },
} }
@ -602,8 +605,8 @@ func TestAlertingRule_Restore(t *testing.T) {
if got.State != exp.State { if got.State != exp.State {
t.Fatalf("expected state %d; got %d", exp.State, got.State) t.Fatalf("expected state %d; got %d", exp.State, got.State)
} }
if got.Start != exp.Start { if got.ActiveAt != exp.ActiveAt {
t.Fatalf("expected Start %v; got %v", exp.Start, got.Start) t.Fatalf("expected ActiveAt %v; got %v", exp.ActiveAt, got.ActiveAt)
} }
} }
}) })
@ -618,14 +621,14 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
// successful attempt // successful attempt
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar")) fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar"))
_, err := ar.Exec(context.TODO()) _, err := ar.Exec(context.TODO(), time.Now())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// label `job` will collide with rule extra label and will make both time series equal // label `job` will collide with rule extra label and will make both time series equal
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz")) fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz"))
_, err = ar.Exec(context.TODO()) _, err = ar.Exec(context.TODO(), time.Now())
if !errors.Is(err, errDuplicate) { if !errors.Is(err, errDuplicate) {
t.Fatalf("expected to have %s error; got %s", errDuplicate, err) t.Fatalf("expected to have %s error; got %s", errDuplicate, err)
} }
@ -634,7 +637,7 @@ func TestAlertingRule_Exec_Negative(t *testing.T) {
expErr := "connection reset by peer" expErr := "connection reset by peer"
fq.setErr(errors.New(expErr)) fq.setErr(errors.New(expErr))
_, err = ar.Exec(context.TODO()) _, err = ar.Exec(context.TODO(), time.Now())
if err == nil { if err == nil {
t.Fatalf("expected to get err; got nil") t.Fatalf("expected to get err; got nil")
} }
@ -688,8 +691,8 @@ func TestAlertingRule_Template(t *testing.T) {
alerts: make(map[uint64]*notifier.Alert), alerts: make(map[uint64]*notifier.Alert),
}, },
[]datasource.Metric{ []datasource.Metric{
metricWithValueAndLabels(t, 2, "instance", "foo"), metricWithValueAndLabels(t, 2, "instance", "foo", alertNameLabel, "override"),
metricWithValueAndLabels(t, 10, "instance", "bar"), metricWithValueAndLabels(t, 10, "instance", "bar", alertNameLabel, "override"),
}, },
map[uint64]*notifier.Alert{ map[uint64]*notifier.Alert{
hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "foo")): { hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "foo")): {
@ -762,7 +765,7 @@ func TestAlertingRule_Template(t *testing.T) {
tc.rule.GroupID = fakeGroup.ID() tc.rule.GroupID = fakeGroup.ID()
tc.rule.q = fq tc.rule.q = fq
fq.add(tc.metrics...) fq.add(tc.metrics...)
if _, err := tc.rule.Exec(context.TODO()); err != nil { if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil {
t.Fatalf("unexpected err: %s", err) t.Fatalf("unexpected err: %s", err)
} }
for hash, expAlert := range tc.expAlerts { for hash, expAlert := range tc.expAlerts {
@ -821,17 +824,17 @@ func TestAlertsToSend(t *testing.T) {
5*time.Minute, time.Minute, 5*time.Minute, time.Minute,
) )
f( // resolve inactive alert at the current timestamp f( // resolve inactive alert at the current timestamp
[]*notifier.Alert{{State: notifier.StateInactive}}, []*notifier.Alert{{State: notifier.StateInactive, ResolvedAt: ts}},
[]*notifier.Alert{{LastSent: ts, End: ts}}, []*notifier.Alert{{LastSent: ts, End: ts}},
time.Minute, time.Minute, time.Minute, time.Minute,
) )
f( // mixed case of firing and resolved alerts. Names are added for deterministic sorting f( // mixed case of firing and resolved alerts. Names are added for deterministic sorting
[]*notifier.Alert{{Name: "a", State: notifier.StateFiring}, {Name: "b", State: notifier.StateInactive}}, []*notifier.Alert{{Name: "a", State: notifier.StateFiring}, {Name: "b", State: notifier.StateInactive, ResolvedAt: ts}},
[]*notifier.Alert{{Name: "a", LastSent: ts, End: ts.Add(5 * time.Minute)}, {Name: "b", LastSent: ts, End: ts}}, []*notifier.Alert{{Name: "a", LastSent: ts, End: ts.Add(5 * time.Minute)}, {Name: "b", LastSent: ts, End: ts}},
5*time.Minute, time.Minute, 5*time.Minute, time.Minute,
) )
f( // mixed case of pending and resolved alerts. Names are added for deterministic sorting f( // mixed case of pending and resolved alerts. Names are added for deterministic sorting
[]*notifier.Alert{{Name: "a", State: notifier.StatePending}, {Name: "b", State: notifier.StateInactive}}, []*notifier.Alert{{Name: "a", State: notifier.StatePending}, {Name: "b", State: notifier.StateInactive, ResolvedAt: ts}},
[]*notifier.Alert{{Name: "b", LastSent: ts, End: ts}}, []*notifier.Alert{{Name: "b", LastSent: ts, End: ts}},
5*time.Minute, time.Minute, 5*time.Minute, time.Minute,
) )
@ -850,6 +853,16 @@ func TestAlertsToSend(t *testing.T) {
[]*notifier.Alert{{LastSent: ts, End: ts.Add(time.Minute)}}, []*notifier.Alert{{LastSent: ts, End: ts.Add(time.Minute)}},
time.Minute, 0, time.Minute, 0,
) )
f( // inactive alert which has been sent already
[]*notifier.Alert{{State: notifier.StateInactive, LastSent: ts.Add(-time.Second), ResolvedAt: ts.Add(-2 * time.Second)}},
nil,
time.Minute, time.Minute,
)
f( // inactive alert which has been resolved after last send
[]*notifier.Alert{{State: notifier.StateInactive, LastSent: ts.Add(-time.Second), ResolvedAt: ts}},
[]*notifier.Alert{{LastSent: ts, End: ts}},
time.Minute, time.Minute,
)
} }
func newTestRuleWithLabels(name string, labels ...string) *AlertingRule { func newTestRuleWithLabels(name string, labels ...string) *AlertingRule {

View file

@ -8,7 +8,7 @@ import (
// Querier interface wraps Query and QueryRange methods // Querier interface wraps Query and QueryRange methods
type Querier interface { type Querier interface {
Query(ctx context.Context, query string) ([]Metric, error) Query(ctx context.Context, query string, ts time.Time) ([]Metric, error)
QueryRange(ctx context.Context, query string, from, to time.Time) ([]Metric, error) QueryRange(ctx context.Context, query string, from, to time.Time) ([]Metric, error)
} }
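The `Querier` interface change above threads an explicit evaluation timestamp through every instant query instead of letting the datasource call `time.Now()` itself, which is what allows a whole group to evaluate against one consistent `ts`. A hedged sketch of what an implementation of the new signature could look like; `Metric` and `staticQuerier` here are simplified placeholders, not the real `datasource` types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Metric is a simplified placeholder for datasource.Metric.
type Metric struct {
	Labels    map[string]string
	Value     float64
	Timestamp int64
}

// Querier mirrors the updated interface: instant queries now receive
// the evaluation time explicitly.
type Querier interface {
	Query(ctx context.Context, query string, ts time.Time) ([]Metric, error)
	QueryRange(ctx context.Context, query string, from, to time.Time) ([]Metric, error)
}

// staticQuerier is a toy implementation returning one constant sample
// stamped with the caller-provided evaluation time.
type staticQuerier struct{}

func (sq staticQuerier) Query(_ context.Context, query string, ts time.Time) ([]Metric, error) {
	return []Metric{{
		Labels:    map[string]string{"query": query},
		Value:     1,
		Timestamp: ts.Unix(),
	}}, nil
}

func (sq staticQuerier) QueryRange(ctx context.Context, query string, from, _ time.Time) ([]Metric, error) {
	// For the sketch a range query simply delegates to an instant query at `from`.
	return sq.Query(ctx, query, from)
}

func main() {
	var q Querier = staticQuerier{}
	m, _ := q.Query(context.Background(), "up == 0", time.Now())
	fmt.Println(m[0].Timestamp)
}
```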

View file

@ -71,13 +71,12 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati
} }
// Query executes the given query and returns parsed response // Query executes the given query and returns parsed response
func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) { func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Metric, error) {
req, err := s.newRequestPOST() req, err := s.newRequestPOST()
if err != nil { if err != nil {
return nil, err return nil, err
} }
ts := time.Now()
switch s.dataSourceType.String() { switch s.dataSourceType.String() {
case "prometheus": case "prometheus":
s.setPrometheusInstantReqParams(req, query, ts) s.setPrometheusInstantReqParams(req, query, ts)

View file

@ -89,26 +89,27 @@ func TestVMInstantQuery(t *testing.T) {
p := NewPrometheusType() p := NewPrometheusType()
pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second}) pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second})
ts := time.Now()
if _, err := pq.Query(ctx, query); err == nil { if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected connection error got nil") t.Fatalf("expected connection error got nil")
} }
if _, err := pq.Query(ctx, query); err == nil { if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected invalid response status error got nil") t.Fatalf("expected invalid response status error got nil")
} }
if _, err := pq.Query(ctx, query); err == nil { if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected response body error got nil") t.Fatalf("expected response body error got nil")
} }
if _, err := pq.Query(ctx, query); err == nil { if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected error status got nil") t.Fatalf("expected error status got nil")
} }
if _, err := pq.Query(ctx, query); err == nil { if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected unknown status got nil") t.Fatalf("expected unknown status got nil")
} }
if _, err := pq.Query(ctx, query); err == nil { if _, err := pq.Query(ctx, query, ts); err == nil {
t.Fatalf("expected non-vector resultType error got nil") t.Fatalf("expected non-vector resultType error got nil")
} }
m, err := pq.Query(ctx, query) m, err := pq.Query(ctx, query, ts)
if err != nil { if err != nil {
t.Fatalf("unexpected %s", err) t.Fatalf("unexpected %s", err)
} }
@ -134,7 +135,7 @@ func TestVMInstantQuery(t *testing.T) {
g := NewGraphiteType() g := NewGraphiteType()
gq := s.BuildWithParams(QuerierParams{DataSourceType: &g}) gq := s.BuildWithParams(QuerierParams{DataSourceType: &g})
m, err = gq.Query(ctx, queryRender) m, err = gq.Query(ctx, queryRender, ts)
if err != nil { if err != nil {
t.Fatalf("unexpected %s", err) t.Fatalf("unexpected %s", err)
} }

View file

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"net/url" "net/url"
"strconv"
"strings"
"sync" "sync"
"time" "time"
@ -13,7 +15,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
@ -44,6 +48,7 @@ type Group struct {
type groupMetrics struct { type groupMetrics struct {
iterationTotal *utils.Counter iterationTotal *utils.Counter
iterationDuration *utils.Summary iterationDuration *utils.Summary
iterationMissed *utils.Counter
} }
func newGroupMetrics(name, file string) *groupMetrics { func newGroupMetrics(name, file string) *groupMetrics {
@ -51,6 +56,7 @@ func newGroupMetrics(name, file string) *groupMetrics {
labels := fmt.Sprintf(`group=%q, file=%q`, name, file) labels := fmt.Sprintf(`group=%q, file=%q`, name, file)
m.iterationTotal = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels)) m.iterationTotal = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels))
m.iterationDuration = utils.GetOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels)) m.iterationDuration = utils.GetOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels))
m.iterationMissed = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels))
return m return m
} }
@ -226,6 +232,13 @@ var skipRandSleepOnGroupStart bool
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client) { func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client) {
defer func() { close(g.finishedCh) }() defer func() { close(g.finishedCh) }()
e := &executor{
rw: rw,
notifiers: nts,
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label)}
evalTS := time.Now()
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics. // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
if !skipRandSleepOnGroupStart { if !skipRandSleepOnGroupStart {
randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64))) randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64)))
@ -247,7 +260,31 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
} }
logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
e := &executor{rw: rw, notifiers: nts}
eval := func(ts time.Time) {
g.metrics.iterationTotal.Inc()
start := time.Now()
if len(g.Rules) < 1 {
g.metrics.iterationDuration.UpdateDuration(start)
g.LastEvaluation = start
return
}
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration)
for err := range errs {
if err != nil {
logger.Errorf("group %q: %s", g.Name, err)
}
}
g.metrics.iterationDuration.UpdateDuration(start)
g.LastEvaluation = start
}
eval(evalTS)
t := time.NewTicker(g.Interval) t := time.NewTicker(g.Interval)
defer t.Stop() defer t.Stop()
for { for {
@ -274,32 +311,26 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
g.mu.Unlock() g.mu.Unlock()
logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
case <-t.C: case <-t.C:
g.metrics.iterationTotal.Inc() missed := (time.Since(evalTS) / g.Interval) - 1
iterationStart := time.Now() if missed > 0 {
if len(g.Rules) > 0 { g.metrics.iterationMissed.Inc()
errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, getResolveDuration(g.Interval))
for err := range errs {
if err != nil {
logger.Errorf("group %q: %s", g.Name, err)
}
}
g.LastEvaluation = iterationStart
} }
g.metrics.iterationDuration.UpdateDuration(iterationStart) evalTS = evalTS.Add((missed + 1) * g.Interval)
eval(evalTS)
} }
} }
} }
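The ticker branch above now detects missed evaluations: when an iteration overruns the group interval, `vmalert_iteration_missed_total` is bumped and `evalTS` jumps forward by whole intervals so the next evaluation stays aligned rather than drifting. A small standalone sketch of that arithmetic (the helper name `nextEvalTS` is invented for illustration):

```go
package main

import (
	"fmt"
	"time"
)

// nextEvalTS reproduces the catch-up arithmetic from Group.start above:
// it reports how many whole intervals were skipped since the previous
// evaluation timestamp and returns the aligned timestamp for the next run.
func nextEvalTS(evalTS, now time.Time, interval time.Duration) (missed int64, next time.Time) {
	missed = int64(now.Sub(evalTS)/interval) - 1
	if missed < 0 {
		missed = 0
	}
	next = evalTS.Add(time.Duration(missed+1) * interval)
	return missed, next
}

func main() {
	prev := time.Date(2022, 4, 1, 12, 0, 0, 0, time.UTC)
	// The previous evaluation ran at 12:00:00 with a 1m interval, but the
	// ticker only fires again at 12:02:30: one tick was missed and the next
	// evaluation is pinned to 12:02:00 instead of drifting to 12:02:30.
	missed, next := nextEvalTS(prev, prev.Add(150*time.Second), time.Minute)
	fmt.Println(missed, next.Format("15:04:05")) // 1 12:02:00
}
```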
// getResolveDuration returns the duration after which firing alert // getResolveDuration returns the duration after which firing alert
// can be considered as resolved. // can be considered as resolved.
func getResolveDuration(groupInterval time.Duration) time.Duration { func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Duration {
delta := *resendDelay
if groupInterval > delta { if groupInterval > delta {
delta = groupInterval delta = groupInterval
} }
resolveDuration := delta * 4 resolveDuration := delta * 4
if *maxResolveDuration > 0 && resolveDuration > *maxResolveDuration { if maxDuration > 0 && resolveDuration > maxDuration {
resolveDuration = *maxResolveDuration resolveDuration = maxDuration
} }
return resolveDuration return resolveDuration
} }
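`getResolveDuration` is now a pure function of its arguments, which makes the rule easy to restate: take the larger of the group interval and the resend delay, multiply by four, and cap the result when `maxDuration` is set. A quick self-check under those semantics:

```go
package main

import (
	"fmt"
	"time"
)

// getResolveDuration mirrors the function above: the resolve window is
// four times the larger of groupInterval and delta, optionally capped.
func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Duration {
	if groupInterval > delta {
		delta = groupInterval
	}
	resolveDuration := delta * 4
	if maxDuration > 0 && resolveDuration > maxDuration {
		resolveDuration = maxDuration
	}
	return resolveDuration
}

func main() {
	fmt.Println(getResolveDuration(30*time.Second, time.Minute, 0))            // 4m0s: resend delay dominates
	fmt.Println(getResolveDuration(2*time.Minute, time.Minute, 0))             // 8m0s: group interval dominates
	fmt.Println(getResolveDuration(2*time.Minute, time.Minute, 5*time.Minute)) // 5m0s: capped by maxDuration
}
```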
@ -307,14 +338,21 @@ func getResolveDuration(groupInterval time.Duration) time.Duration {
type executor struct { type executor struct {
notifiers func() []notifier.Notifier notifiers func() []notifier.Notifier
rw *remotewrite.Client rw *remotewrite.Client
previouslySentSeriesToRWMu sync.Mutex
// previouslySentSeriesToRW stores series sent to RW on previous iteration
// map[ruleID]map[ruleLabels][]prompb.Label
// where `ruleID` is ID of the Rule within a Group
// and `ruleLabels` is []prompb.Label marshalled to a string
previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label
} }
func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, resolveDuration time.Duration) chan error { func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration) chan error {
res := make(chan error, len(rules)) res := make(chan error, len(rules))
if concurrency == 1 { if concurrency == 1 {
// fast path // fast path
for _, rule := range rules { for _, rule := range rules {
res <- e.exec(ctx, rule, resolveDuration) res <- e.exec(ctx, rule, ts, resolveDuration)
} }
close(res) close(res)
return res return res
@ -327,7 +365,7 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurren
sem <- struct{}{} sem <- struct{}{}
wg.Add(1) wg.Add(1)
go func(r Rule) { go func(r Rule) {
res <- e.exec(ctx, r, resolveDuration) res <- e.exec(ctx, r, ts, resolveDuration)
<-sem <-sem
wg.Done() wg.Done()
}(rule) }(rule)
@ -348,24 +386,29 @@ var (
remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`) remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`)
) )
func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Duration) error { func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration) error {
execTotal.Inc() execTotal.Inc()
now := time.Now() tss, err := rule.Exec(ctx, ts)
tss, err := rule.Exec(ctx)
if err != nil { if err != nil {
execErrors.Inc() execErrors.Inc()
return fmt.Errorf("rule %q: failed to execute: %w", rule, err) return fmt.Errorf("rule %q: failed to execute: %w", rule, err)
} }
if len(tss) > 0 && e.rw != nil { errGr := new(utils.ErrGroup)
for _, ts := range tss { if e.rw != nil {
remoteWriteTotal.Inc() pushToRW := func(tss []prompbmarshal.TimeSeries) {
if err := e.rw.Push(ts); err != nil { for _, ts := range tss {
remoteWriteErrors.Inc() remoteWriteTotal.Inc()
return fmt.Errorf("rule %q: remote write failure: %w", rule, err) if err := e.rw.Push(ts); err != nil {
remoteWriteErrors.Inc()
errGr.Add(fmt.Errorf("rule %q: remote write failure: %w", rule, err))
}
} }
} }
pushToRW(tss)
staleSeries := e.getStaleSeries(rule, tss, ts)
pushToRW(staleSeries)
} }
ar, ok := rule.(*AlertingRule) ar, ok := rule.(*AlertingRule)
@ -373,12 +416,11 @@ func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Dur
return nil return nil
} }
alerts := ar.alertsToSend(now, resolveDuration, *resendDelay) alerts := ar.alertsToSend(ts, resolveDuration, *resendDelay)
if len(alerts) < 1 { if len(alerts) < 1 {
return nil return nil
} }
errGr := new(utils.ErrGroup)
for _, nt := range e.notifiers() { for _, nt := range e.notifiers() {
if err := nt.Send(ctx, alerts); err != nil { if err := nt.Send(ctx, alerts); err != nil {
errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err)) errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err))
@ -386,3 +428,50 @@ func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Dur
} }
return errGr.Err() return errGr.Err()
} }
// getStaleSeries checks whether there are stale series from previously sent ones.
func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries {
ruleLabels := make(map[string][]prompbmarshal.Label, len(tss))
for _, ts := range tss {
// convert labels to strings so we can compare with previously sent series
key := labelsToString(ts.Labels)
ruleLabels[key] = ts.Labels
}
rID := rule.ID()
var staleS []prompbmarshal.TimeSeries
// check whether there are series which disappeared and need to be marked as stale
e.previouslySentSeriesToRWMu.Lock()
for key, labels := range e.previouslySentSeriesToRW[rID] {
if _, ok := ruleLabels[key]; ok {
continue
}
// previously sent series are missing in current series, so we mark them as stale
ss := newTimeSeriesPB([]float64{decimal.StaleNaN}, []int64{timestamp.Unix()}, labels)
staleS = append(staleS, ss)
}
// set previous series to current
e.previouslySentSeriesToRW[rID] = ruleLabels
e.previouslySentSeriesToRWMu.Unlock()
return staleS
}
func labelsToString(labels []prompbmarshal.Label) string {
var b strings.Builder
b.WriteRune('{')
for i, label := range labels {
if len(label.Name) == 0 {
b.WriteString("__name__")
} else {
b.WriteString(label.Name)
}
b.WriteRune('=')
b.WriteString(strconv.Quote(label.Value))
if i < len(labels)-1 {
b.WriteRune(',')
}
}
b.WriteRune('}')
return b.String()
}
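To make the stale-series bookkeeping concrete: per rule ID the executor remembers the label sets it pushed on the previous iteration, keyed by a canonical string form (`labelsToString`), and any set that disappears is pushed once more with a Prometheus staleness marker (`decimal.StaleNaN`) so downstream storage can terminate the series. The sketch below shows only the keying-and-diffing idea with plain maps; it is not the real `prompbmarshal` code, and it sorts label names itself for determinism:

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// key builds a canonical string for a label set, similar in spirit to
// labelsToString above; names are sorted here to keep the sketch deterministic.
func key(labels map[string]string) string {
	names := make([]string, 0, len(labels))
	for n := range labels {
		names = append(names, n)
	}
	sort.Strings(names)
	var b strings.Builder
	b.WriteByte('{')
	for i, n := range names {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(n)
		b.WriteByte('=')
		b.WriteString(strconv.Quote(labels[n]))
	}
	b.WriteByte('}')
	return b.String()
}

// staleKeys returns the keys present on the previous iteration but missing now;
// in vmalert each such series gets one final sample whose value is decimal.StaleNaN.
func staleKeys(prev, curr map[string]struct{}) []string {
	var out []string
	for k := range prev {
		if _, ok := curr[k]; !ok {
			out = append(out, k)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	prev := map[string]struct{}{
		key(map[string]string{"__name__": "job:foo", "job": "foo"}): {},
		key(map[string]string{"__name__": "job:foo", "job": "bar"}): {},
	}
	curr := map[string]struct{}{
		key(map[string]string{"__name__": "job:foo", "job": "bar"}): {},
	}
	// Only the series with job="foo" disappeared, so only it is marked stale.
	fmt.Println(staleKeys(prev, curr))
}
```

The `TestGetStaleSeries` cases in the next file exercise exactly this behaviour against the real implementation.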

View file

@ -3,6 +3,9 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"reflect"
"sort" "sort"
"testing" "testing"
"time" "time"
@ -239,7 +242,8 @@ func TestGroupStart(t *testing.T) {
time.Sleep(20 * evalInterval) time.Sleep(20 * evalInterval)
gotAlerts = fn.getAlerts() gotAlerts = fn.getAlerts()
expectedAlerts = []notifier.Alert{*alert1} alert2.State = notifier.StateInactive
expectedAlerts = []notifier.Alert{*alert1, *alert2}
compareAlerts(t, expectedAlerts, gotAlerts) compareAlerts(t, expectedAlerts, gotAlerts)
g.close() g.close()
@ -262,21 +266,100 @@ func TestResolveDuration(t *testing.T) {
{0, 0, 0, 0}, {0, 0, 0, 0},
} }
defaultResolveDuration := *maxResolveDuration
defaultResendDelay := *resendDelay
defer func() {
*maxResolveDuration = defaultResolveDuration
*resendDelay = defaultResendDelay
}()
for _, tc := range testCases { for _, tc := range testCases {
t.Run(fmt.Sprintf("%v-%v-%v", tc.groupInterval, tc.expected, tc.maxDuration), func(t *testing.T) { t.Run(fmt.Sprintf("%v-%v-%v", tc.groupInterval, tc.expected, tc.maxDuration), func(t *testing.T) {
*maxResolveDuration = tc.maxDuration got := getResolveDuration(tc.groupInterval, tc.resendDelay, tc.maxDuration)
*resendDelay = tc.resendDelay
got := getResolveDuration(tc.groupInterval)
if got != tc.expected { if got != tc.expected {
t.Errorf("expected to have %v; got %v", tc.expected, got) t.Errorf("expected to have %v; got %v", tc.expected, got)
} }
}) })
} }
} }
func TestGetStaleSeries(t *testing.T) {
ts := time.Now()
e := &executor{
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
}
f := func(rule Rule, labels, expLabels [][]prompbmarshal.Label) {
t.Helper()
var tss []prompbmarshal.TimeSeries
for _, l := range labels {
tss = append(tss, newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, l))
}
staleS := e.getStaleSeries(rule, tss, ts)
if staleS == nil && expLabels == nil {
return
}
if len(staleS) != len(expLabels) {
t.Fatalf("expected to get %d stale series, got %d",
len(expLabels), len(staleS))
}
for i, exp := range expLabels {
got := staleS[i]
if !reflect.DeepEqual(exp, got.Labels) {
t.Fatalf("expected to get labels: \n%v;\ngot instead: \n%v",
exp, got.Labels)
}
if len(got.Samples) != 1 {
t.Fatalf("expected to have 1 sample; got %d", len(got.Samples))
}
if !decimal.IsStaleNaN(got.Samples[0].Value) {
t.Fatalf("expected sample value to be %v; got %v", decimal.StaleNaN, got.Samples[0].Value)
}
}
}
// warn: keep in mind, that executor holds the state, so sequence of f calls matters
// single series
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")},
nil)
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")},
nil)
f(&AlertingRule{RuleID: 1},
nil,
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
f(&AlertingRule{RuleID: 1},
nil,
nil)
// multiple series
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
},
nil)
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
nil)
f(&AlertingRule{RuleID: 1},
nil,
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")})
// multiple rules and series
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
},
nil)
f(&AlertingRule{RuleID: 2},
[][]prompbmarshal.Label{
toPromLabels(t, "__name__", "job:foo", "job", "foo"),
toPromLabels(t, "__name__", "job:foo", "job", "bar"),
},
nil)
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")})
f(&AlertingRule{RuleID: 1},
[][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")},
nil)
}

View file

@ -44,10 +44,10 @@ func (fq *fakeQuerier) BuildWithParams(_ datasource.QuerierParams) datasource.Qu
} }
func (fq *fakeQuerier) QueryRange(ctx context.Context, q string, _, _ time.Time) ([]datasource.Metric, error) { func (fq *fakeQuerier) QueryRange(ctx context.Context, q string, _, _ time.Time) ([]datasource.Metric, error) {
return fq.Query(ctx, q) return fq.Query(ctx, q, time.Now())
} }
func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) { func (fq *fakeQuerier) Query(_ context.Context, _ string, _ time.Time) ([]datasource.Metric, error) {
fq.Lock() fq.Lock()
defer fq.Unlock() defer fq.Unlock()
if fq.err != nil { if fq.err != nil {
@ -116,6 +116,21 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
return m return m
} }
func toPromLabels(t *testing.T, labels ...string) []prompbmarshal.Label {
t.Helper()
if len(labels) == 0 || len(labels)%2 != 0 {
t.Fatalf("expected to get even number of labels")
}
var ls []prompbmarshal.Label
for i := 0; i < len(labels); i += 2 {
ls = append(ls, prompbmarshal.Label{
Name: labels[i],
Value: labels[i+1],
})
}
return ls
}
func compareGroups(t *testing.T, a, b *Group) { func compareGroups(t *testing.T, a, b *Group) {
t.Helper() t.Helper()
if a.Name != b.Name { if a.Name != b.Name {

View file

@ -26,10 +26,14 @@ type Alert struct {
State AlertState State AlertState
// Expr contains expression that was executed to generate the Alert // Expr contains expression that was executed to generate the Alert
Expr string Expr string
// Start defines the moment of time when Alert has triggered // ActiveAt defines the moment of time when Alert has become active
ActiveAt time.Time
// Start defines the moment of time when Alert has become firing
Start time.Time Start time.Time
// End defines the moment of time when Alert supposed to expire // End defines the moment of time when Alert supposed to expire
End time.Time End time.Time
// ResolvedAt defines the moment when Alert was switched from Firing to Inactive
ResolvedAt time.Time
// LastSent defines the moment when Alert was sent last time // LastSent defines the moment when Alert was sent last time
LastSent time.Time LastSent time.Time
// Value stores the value returned from evaluating expression from Expr field // Value stores the value returned from evaluating expression from Expr field
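Read alongside the `AlertingRule` changes earlier in this commit, the struct now carries a fuller timeline: `ActiveAt` is set when the alert first becomes Pending, `Start` when it crosses the `for` threshold and begins Firing, `ResolvedAt` when it returns to Inactive, and `LastSent`/`End` drive re-notification and auto-resolution. A simplified, illustrative transition sketch (not the real `notifier` package; state is a plain string here):

```go
package main

import (
	"fmt"
	"time"
)

// alert is a reduced stand-in for notifier.Alert carrying only the
// timestamp fields touched by this commit.
type alert struct {
	State      string // "pending" | "firing" | "inactive"
	ActiveAt   time.Time
	Start      time.Time
	ResolvedAt time.Time
}

// activate records when the rule expression first returned data for this alert.
func (a *alert) activate(ts time.Time) { a.State, a.ActiveAt = "pending", ts }

// fire records when the alert has been pending for at least the `for` duration.
func (a *alert) fire(ts time.Time) { a.State, a.Start = "firing", ts }

// resolve records when the expression stopped returning data for a firing alert.
func (a *alert) resolve(ts time.Time) { a.State, a.ResolvedAt = "inactive", ts }

func main() {
	var a alert
	t0 := time.Date(2022, 4, 1, 12, 0, 0, 0, time.UTC)
	a.activate(t0)
	a.fire(t0.Add(5 * time.Minute))
	a.resolve(t0.Add(20 * time.Minute))
	fmt.Printf("state=%s active=%s firing=%s resolved=%s\n", a.State,
		a.ActiveAt.Format("15:04"), a.Start.Format("15:04"), a.ResolvedAt.Format("15:04"))
}
```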

View file

@ -124,14 +124,13 @@ func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([
} }
// Exec executes RecordingRule expression via the given Querier. // Exec executes RecordingRule expression via the given Querier.
func (rr *RecordingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) { func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) {
start := time.Now() qMetrics, err := rr.q.Query(ctx, rr.Expr, ts)
qMetrics, err := rr.q.Query(ctx, rr.Expr)
rr.mu.Lock() rr.mu.Lock()
defer rr.mu.Unlock() defer rr.mu.Unlock()
rr.lastExecTime = start rr.lastExecTime = ts
rr.lastExecDuration = time.Since(start) rr.lastExecDuration = time.Since(ts)
rr.lastExecError = err rr.lastExecError = err
rr.lastExecSamples = len(qMetrics) rr.lastExecSamples = len(qMetrics)
if err != nil { if err != nil {

View file

@ -77,7 +77,7 @@ func TestRecoridngRule_Exec(t *testing.T) {
fq := &fakeQuerier{} fq := &fakeQuerier{}
fq.add(tc.metrics...) fq.add(tc.metrics...)
tc.rule.q = fq tc.rule.q = fq
tss, err := tc.rule.Exec(context.TODO()) tss, err := tc.rule.Exec(context.TODO(), time.Now())
if err != nil { if err != nil {
t.Fatalf("unexpected Exec err: %s", err) t.Fatalf("unexpected Exec err: %s", err)
} }
@ -178,7 +178,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) {
expErr := "connection reset by peer" expErr := "connection reset by peer"
fq.setErr(errors.New(expErr)) fq.setErr(errors.New(expErr))
rr.q = fq rr.q = fq
_, err := rr.Exec(context.TODO()) _, err := rr.Exec(context.TODO(), time.Now())
if err == nil { if err == nil {
t.Fatalf("expected to get err; got nil") t.Fatalf("expected to get err; got nil")
} }
@ -193,7 +193,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) {
fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo")) fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"))
fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar")) fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar"))
_, err = rr.Exec(context.TODO()) _, err = rr.Exec(context.TODO(), time.Now())
if err == nil { if err == nil {
t.Fatalf("expected to get err; got nil") t.Fatalf("expected to get err; got nil")
} }

View file

@ -225,7 +225,7 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
droppedRows.Add(len(wr.Timeseries)) droppedRows.Add(len(wr.Timeseries))
droppedBytes.Add(len(b)) droppedBytes.Add(len(b))
logger.Errorf("all %d attempts to send request failed - dropping %d timeseries", logger.Errorf("all %d attempts to send request failed - dropping %d time series",
attempts, len(wr.Timeseries)) attempts, len(wr.Timeseries))
} }

View file

@ -3,8 +3,9 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
) )
// Rule represents alerting or recording rule // Rule represents alerting or recording rule
@ -14,8 +15,8 @@ type Rule interface {
// ID returns unique ID that may be used for // ID returns unique ID that may be used for
// identifying this Rule among others. // identifying this Rule among others.
ID() uint64 ID() uint64
// Exec executes the rule with given context // Exec executes the rule with given context at the given timestamp
Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error)
// ExecRange executes the rule on the given time range // ExecRange executes the rule on the given time range
ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error)
// UpdateWith performs modification of current Rule // UpdateWith performs modification of current Rule

View file

@ -30,3 +30,20 @@ func newTimeSeries(values []float64, timestamps []int64, labels map[string]strin
} }
return ts return ts
} }
// newTimeSeriesPB creates prompbmarshal.TimeSeries with given
// values, timestamps and labels.
// It expects that labels are already sorted.
func newTimeSeriesPB(values []float64, timestamps []int64, labels []prompbmarshal.Label) prompbmarshal.TimeSeries {
ts := prompbmarshal.TimeSeries{
Samples: make([]prompbmarshal.Sample, len(values)),
}
for i := range values {
ts.Samples[i] = prompbmarshal.Sample{
Value: values[i],
Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6,
}
}
ts.Labels = labels
return ts
}
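One detail worth noting in `newTimeSeriesPB`: the incoming timestamps are unix seconds, while remote-write samples carry milliseconds, hence the `UnixNano() / 1e6` conversion. A tiny standalone check of just that conversion (the helper name `toRemoteWriteMillis` is invented for the example):

```go
package main

import (
	"fmt"
	"time"
)

// toRemoteWriteMillis converts a unix-seconds timestamp to the millisecond
// precision used for remote-write samples, matching the expression in
// newTimeSeriesPB above.
func toRemoteWriteMillis(unixSeconds int64) int64 {
	return time.Unix(unixSeconds, 0).UnixNano() / 1e6
}

func main() {
	fmt.Println(toRemoteWriteMillis(1648800000)) // 1648800000000
}
```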

View file

@ -14,7 +14,7 @@ func TestHandler(t *testing.T) {
ar := &AlertingRule{ ar := &AlertingRule{
Name: "alert", Name: "alert",
alerts: map[uint64]*notifier.Alert{ alerts: map[uint64]*notifier.Alert{
0: {}, 0: {State: notifier.StateFiring},
}, },
} }
g := &Group{ g := &Group{

View file

@ -2,8 +2,8 @@
DOCKER_NAMESPACE := victoriametrics DOCKER_NAMESPACE := victoriametrics
ROOT_IMAGE ?= alpine:3.15.2 ROOT_IMAGE ?= alpine:3.15.3
CERTS_IMAGE := alpine:3.15.2 CERTS_IMAGE := alpine:3.15.3
GO_BUILDER_IMAGE := golang:1.18.0-alpine GO_BUILDER_IMAGE := golang:1.18.0-alpine
BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1 BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1
BASE_IMAGE := local/base:1.1.3-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __) BASE_IMAGE := local/base:1.1.3-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __)

View file

@ -16,6 +16,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip ## tip
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add pre-defined dashboards for per-job CPU usage, memory usage and disk IO usage. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2243) for details. * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add pre-defined dashboards for per-job CPU usage, memory usage and disk IO usage. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2243) for details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): improve compatibility with [Prometheus Alert Generator specification](https://github.com/prometheus/compliance/blob/main/alert_generator/specification.md). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2340).
* FEATURE: [vmgateway](https://docs.victoriametrics.com/vmgateway.html): Allow to read `-ratelimit.config` file from URL. Also add `-ratelimit.configCheckInterval` command-line option. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2241).
* FEATURE: add the following command-line flags, which can be used for fine-grained limiting of CPU and memory usage during various API calls: * FEATURE: add the following command-line flags, which can be used for fine-grained limiting of CPU and memory usage during various API calls:
* `-search.maxFederateSeries` for limiting the number of time series, which can be returned from [/federate](https://docs.victoriametrics.com/#federation). * `-search.maxFederateSeries` for limiting the number of time series, which can be returned from [/federate](https://docs.victoriametrics.com/#federation).

View file

@ -100,7 +100,7 @@ Cluster:
<div class="with-copy" markdown="1"> <div class="with-copy" markdown="1">
```bash ```bash
curl --data-binary "@import.txt" -X POST 'http://<vminsert>:8480/insert/prometheus/api/v1/import' curl --data-binary "@import.txt" -X POST 'http://<vminsert>:8480/insert/0/prometheus/api/v1/import'
``` ```
</div> </div>

View file

@ -385,7 +385,7 @@ See also [downsampling docs](https://docs.victoriametrics.com/#downsampling).
`vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints: `vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
* `http://<vmalert-addr>` - UI; * `http://<vmalert-addr>` - UI;
* `http://<vmalert-addr>/api/v1/groups` - list of all loaded groups and rules; * `http://<vmalert-addr>/api/v1/rules` - list of all loaded groups and rules;
* `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts; * `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts;
* `http://<vmalert-addr>/api/v1/<groupID>/<alertID>/status"` - get alert status by ID. * `http://<vmalert-addr>/api/v1/<groupID>/<alertID>/status"` - get alert status by ID.
Used as alert source in AlertManager. Used as alert source in AlertManager.

View file

@ -111,7 +111,7 @@ optionally preserving labels).
## Usage ## Usage
The vmanomapy accepts only one parameter -- config file path: The vmanomaly accepts only one parameter -- config file path:
```sh ```sh
python3 vmanomaly.py config_zscore.yaml python3 vmanomaly.py config_zscore.yaml