From 56de8f0356b19de0e29056865a78c0c6b9288b4c Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Mon, 28 Mar 2022 12:07:02 +0200 Subject: [PATCH 1/8] docs: fix typo in vmalert's API (#2380) The API handler was changed in 1.75 but docs still contain the old address. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2366 Signed-off-by: hagen1778 --- app/vmalert/README.md | 2 +- docs/vmalert.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/vmalert/README.md b/app/vmalert/README.md index e7c950e79..191105df5 100644 --- a/app/vmalert/README.md +++ b/app/vmalert/README.md @@ -381,7 +381,7 @@ See also [downsampling docs](https://docs.victoriametrics.com/#downsampling). `vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints: * `http://` - UI; -* `http:///api/v1/groups` - list of all loaded groups and rules; +* `http:///api/v1/rules` - list of all loaded groups and rules; * `http:///api/v1/alerts` - list of all active alerts; * `http:///api/v1///status"` - get alert status by ID. Used as alert source in AlertManager. diff --git a/docs/vmalert.md b/docs/vmalert.md index 0bcd76343..f01dcf6c5 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -385,7 +385,7 @@ See also [downsampling docs](https://docs.victoriametrics.com/#downsampling). `vmalert` runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints: * `http://` - UI; -* `http:///api/v1/groups` - list of all loaded groups and rules; +* `http:///api/v1/rules` - list of all loaded groups and rules; * `http:///api/v1/alerts` - list of all active alerts; * `http:///api/v1///status"` - get alert status by ID. Used as alert source in AlertManager. From 0123295d50f7c0e5ac30e89c01402361792754fd Mon Sep 17 00:00:00 2001 From: Denys Holius <5650611+denisgolius@users.noreply.github.com> Date: Tue, 29 Mar 2022 13:48:11 +0300 Subject: [PATCH 2/8] Update alpine linux base image to the latest v3.15.3 (#2384) Updated alpine linux base image to the latest v3.15.3 which has fix for [CVE-2018-25032](https://security.alpinelinux.org/vuln/CVE-2018-25032). See https://alpinelinux.org/posts/Alpine-3.12.11-3.13.9-3.14.5-3.15.3-released.html --- deployment/docker/Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deployment/docker/Makefile b/deployment/docker/Makefile index 47c9fa20e..7fcdd6fd9 100644 --- a/deployment/docker/Makefile +++ b/deployment/docker/Makefile @@ -2,8 +2,8 @@ DOCKER_NAMESPACE := victoriametrics -ROOT_IMAGE ?= alpine:3.15.2 -CERTS_IMAGE := alpine:3.15.2 +ROOT_IMAGE ?= alpine:3.15.3 +CERTS_IMAGE := alpine:3.15.3 GO_BUILDER_IMAGE := golang:1.18.0-alpine BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1 BASE_IMAGE := local/base:1.1.3-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __) From 0989649ad041d5984f83fac6e2c76dd3b92af19a Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Tue, 29 Mar 2022 15:09:07 +0200 Subject: [PATCH 3/8] Vmalert compliance 2 (#2340) * vmalert: split alert's `Start` field into `ActiveAt` and `Start` The `ActiveAt` field identifies when alert becomes active for rules with `for > 0`. Previously, this value was stored in field `Start`. The field `Start` now identifies the moment alert became `FIRING`. The split is needed in order to distinguish these two moments in the API responses for alerts. 
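For context, a minimal sketch of the intended lifecycle (simplified, hypothetical names rather than the actual vmalert types): `ActiveAt` is recorded once when the alert first enters the Pending state, while `Start` is set only at the moment the alert transitions to Firing after the `for` duration has elapsed.

```go
package main

import (
	"fmt"
	"time"
)

// alert is a simplified stand-in for notifier.Alert, used only for illustration.
type alert struct {
	state    string
	activeAt time.Time // moment the alert became Pending (previously kept in Start)
	start    time.Time // moment the alert became Firing
}

// update advances the alert state for an evaluation at time ts,
// assuming the rule expression still returns a matching sample.
func (a *alert) update(ts time.Time, forDuration time.Duration) {
	switch a.state {
	case "":
		a.state, a.activeAt = "pending", ts
	case "pending":
		if ts.Sub(a.activeAt) >= forDuration {
			a.state, a.start = "firing", ts
		}
	}
}

func main() {
	a := &alert{}
	base := time.Now()
	// Evaluate every 30s with for=1m: the alert turns Pending at the first
	// evaluation and Firing once a minute has passed since activeAt.
	for i := 0; i <= 4; i++ {
		a.update(base.Add(time.Duration(i)*30*time.Second), time.Minute)
	}
	fmt.Printf("state=%s activeAt=%s start=%s\n", a.state, a.activeAt, a.start)
}
```

In the patch itself, the API's `ActiveAt` field and the `ALERTS_FOR_TIME` series are switched to the new `ActiveAt` value, while `Start` is assigned only on the Pending-to-Firing transition.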
Signed-off-by: hagen1778 * vmalert: support specific moment of time for rules evaluation The Querier interface was extended to accept a new argument used as a timestamp at which evaluation should be made. It is needed to align rules execution time within the group. Signed-off-by: hagen1778 * vmalert: mark disappeared series as stale Series generated by alerting rules and sent to remote write will now be marked as stale if they disappear on the next evaluation. This makes the ALERTS and ALERTS_FOR_TIME series more precise. Signed-off-by: hagen1778 * wip Signed-off-by: hagen1778 * vmalert: evaluate rules at fixed timestamp Before, the time at which rules were evaluated was calculated right before rule execution. The change makes sure that the timestamp is calculated only once per evaluation round and all rules use the same timestamp. It also updates the logic for resending already resolved alert notifications. Signed-off-by: hagen1778 * vmalert: allow overriding the `alertname` label value if it is present in the response Previously, `alertname` was always equal to the Alerting Rule name. Now, its value can be overridden if a series in the response contains a different value for this label. The change is needed to improve compatibility with Prometheus. Signed-off-by: hagen1778 * vmalert: align rules evaluation in time Now, the evaluation timestamp for rules is calculated as if there was no delay in rules evaluation. It means that rules will be evaluated at fixed timestamp+group_interval steps. This provides more consistent evaluation results and improves compatibility with Prometheus. Signed-off-by: hagen1778 * vmalert: add metric for missed iterations The new metric `vmalert_iteration_missed_total` shows whether a rules evaluation round was missed. Signed-off-by: hagen1778 * vmalert: reduce delay before the initial rule evaluation in group Signed-off-by: hagen1778 * vmalert: rollback alertname override According to the spec: ``` The alert name from the alerting rule (HighRequestLatency from the example above) MUST be added to the labels of the alert with the label name as alertname. It MUST override any existing alertname label. 
``` https://github.com/prometheus/compliance/blob/main/alert_generator/specification.md#step-3 Signed-off-by: hagen1778 * vmalert: throw err immediately on dedup detection ``` The execution of an alerting rule MUST error out immediately and MUST NOT send any alerts or add samples to samples receiver if there is more than one alert with the same labels ``` https://github.com/prometheus/compliance/blob/main/alert_generator/specification.md#step-4 Signed-off-by: hagen1778 * vmalert: cleanup Signed-off-by: hagen1778 * vmalert: use strings builder to reduce allocs Signed-off-by: hagen1778 --- app/vmalert/alerting.go | 96 ++++++++++------ app/vmalert/alerting_test.go | 93 +++++++++------- app/vmalert/datasource/datasource.go | 2 +- app/vmalert/datasource/vm.go | 3 +- app/vmalert/datasource/vm_test.go | 17 +-- app/vmalert/group.go | 146 ++++++++++++++++++++----- app/vmalert/group_test.go | 105 ++++++++++++++++-- app/vmalert/helpers_test.go | 19 +++- app/vmalert/notifier/alert.go | 6 +- app/vmalert/recording.go | 9 +- app/vmalert/recording_test.go | 6 +- app/vmalert/remotewrite/remotewrite.go | 2 +- app/vmalert/rule.go | 7 +- app/vmalert/utils.go | 17 +++ app/vmalert/web_test.go | 2 +- 15 files changed, 388 insertions(+), 142 deletions(-) diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index 1d833b760..a5b929f86 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -191,9 +191,10 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([] if at.Sub(prevT) > ar.EvalInterval { // reset to Pending if there are gaps > EvalInterval between DPs a.State = notifier.StatePending - a.Start = at - } else if at.Sub(a.Start) >= ar.For { + a.ActiveAt = at + } else if at.Sub(a.ActiveAt) >= ar.For { a.State = notifier.StateFiring + a.Start = at } prevT = at result = append(result, ar.alertToTimeSeries(a, s.Timestamps[i])...) @@ -202,11 +203,15 @@ func (ar *AlertingRule) ExecRange(ctx context.Context, start, end time.Time) ([] return result, nil } +// resolvedRetention is the duration for which a resolved alert instance +// is kept in memory state and consequently repeatedly sent to the AlertManager. +const resolvedRetention = 15 * time.Minute + // Exec executes AlertingRule expression via the given Querier. 
// Based on the Querier results AlertingRule maintains notifier.Alerts -func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) { +func (ar *AlertingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) { start := time.Now() - qMetrics, err := ar.q.Query(ctx, ar.Expr) + qMetrics, err := ar.q.Query(ctx, ar.Expr, ts) ar.mu.Lock() defer ar.mu.Unlock() @@ -220,12 +225,12 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e for h, a := range ar.alerts { // cleanup inactive alerts from previous Exec - if a.State == notifier.StateInactive { + if a.State == notifier.StateInactive && ts.Sub(a.ResolvedAt) > resolvedRetention { delete(ar.alerts, h) } } - qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) } + qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query, ts) } updated := make(map[uint64]struct{}) // update list of active alerts for _, m := range qMetrics { @@ -250,10 +255,18 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e if _, ok := updated[h]; ok { // duplicate may be caused by extra labels // conflicting with the metric labels - return nil, fmt.Errorf("labels %v: %w", m.Labels, errDuplicate) + ar.lastExecError = fmt.Errorf("labels %v: %w", m.Labels, errDuplicate) + return nil, ar.lastExecError } updated[h] = struct{}{} if a, ok := ar.alerts[h]; ok { + if a.State == notifier.StateInactive { + // alert could be in inactive state for resolvedRetention + // so when we again receive metrics for it - we switch it + // back to notifier.StatePending + a.State = notifier.StatePending + a.ActiveAt = ts + } if a.Value != m.Values[0] { // update Value field with latest value a.Value = m.Values[0] @@ -273,6 +286,7 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e } a.ID = h a.State = notifier.StatePending + a.ActiveAt = ts ar.alerts[h] = a } @@ -286,15 +300,19 @@ func (ar *AlertingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, e delete(ar.alerts, h) continue } - a.State = notifier.StateInactive + if a.State == notifier.StateFiring { + a.State = notifier.StateInactive + a.ResolvedAt = ts + } continue } - if a.State == notifier.StatePending && time.Since(a.Start) >= ar.For { + if a.State == notifier.StatePending && time.Since(a.ActiveAt) >= ar.For { a.State = notifier.StateFiring + a.Start = ts alertsFired.Inc() } } - return ar.toTimeSeries(ar.lastExecTime.Unix()), nil + return ar.toTimeSeries(ts.Unix()), nil } func expandLabels(m datasource.Metric, q notifier.QueryFn, ar *AlertingRule) (map[string]string, error) { @@ -360,12 +378,12 @@ func hash(m datasource.Metric) uint64 { func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time, qFn notifier.QueryFn) (*notifier.Alert, error) { a := ¬ifier.Alert{ - GroupID: ar.GroupID, - Name: ar.Name, - Labels: map[string]string{}, - Value: m.Values[0], - Start: start, - Expr: ar.Expr, + GroupID: ar.GroupID, + Name: ar.Name, + Labels: map[string]string{}, + Value: m.Values[0], + ActiveAt: start, + Expr: ar.Expr, } for _, l := range m.Labels { // drop __name__ to be consistent with Prometheus alerting @@ -435,6 +453,9 @@ func (ar *AlertingRule) AlertsToAPI() []*APIAlert { var alerts []*APIAlert ar.mu.RLock() for _, a := range ar.alerts { + if a.State == notifier.StateInactive { + continue + } alerts = append(alerts, ar.newAlertAPI(*a)) } ar.mu.RUnlock() @@ -453,7 +474,7 @@ func (ar *AlertingRule) newAlertAPI(a 
notifier.Alert) *APIAlert { Labels: a.Labels, Annotations: a.Annotations, State: a.State.String(), - ActiveAt: a.Start, + ActiveAt: a.ActiveAt, Restored: a.Restored, Value: strconv.FormatFloat(a.Value, 'f', -1, 32), } @@ -479,7 +500,7 @@ const ( alertGroupNameLabel = "alertgroup" ) -// alertToTimeSeries converts the given alert with the given timestamp to timeseries +// alertToTimeSeries converts the given alert with the given timestamp to time series func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp int64) []prompbmarshal.TimeSeries { var tss []prompbmarshal.TimeSeries tss = append(tss, alertToTimeSeries(a, timestamp)) @@ -507,11 +528,11 @@ func alertForToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.Time labels[k] = v } labels["__name__"] = alertForStateMetricName - return newTimeSeries([]float64{float64(a.Start.Unix())}, []int64{timestamp}, labels) + return newTimeSeries([]float64{float64(a.ActiveAt.Unix())}, []int64{timestamp}, labels) } // Restore restores the state of active alerts basing on previously written time series. -// Restore restores only Start field. Field State will be always Pending and supposed +// Restore restores only ActiveAt field. Field State will be always Pending and supposed // to be updated on next Exec, as well as Value field. // Only rules with For > 0 will be restored. func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration, labels map[string]string) error { @@ -519,7 +540,8 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb return fmt.Errorf("querier is nil") } - qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query) } + ts := time.Now() + qFn := func(query string) ([]datasource.Metric, error) { return ar.q.Query(ctx, query, ts) } // account for external labels in filter var labelsFilter string @@ -532,7 +554,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb // remote write protocol which is used for state persistence in vmalert. expr := fmt.Sprintf("last_over_time(%s{alertname=%q%s}[%ds])", alertForStateMetricName, ar.Name, labelsFilter, int(lookback.Seconds())) - qMetrics, err := q.Query(ctx, expr) + qMetrics, err := q.Query(ctx, expr, ts) if err != nil { return err } @@ -555,21 +577,27 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb // and returns only those which should be sent to notifier. // Isn't concurrent safe. 
func (ar *AlertingRule) alertsToSend(ts time.Time, resolveDuration, resendDelay time.Duration) []notifier.Alert { + needsSending := func(a *notifier.Alert) bool { + if a.State == notifier.StatePending { + return false + } + if a.ResolvedAt.After(a.LastSent) { + return true + } + return a.LastSent.Add(resendDelay).Before(ts) + } + var alerts []notifier.Alert for _, a := range ar.alerts { - switch a.State { - case notifier.StateFiring: - if time.Since(a.LastSent) < resendDelay { - continue - } - a.End = ts.Add(resolveDuration) - a.LastSent = ts - alerts = append(alerts, *a) - case notifier.StateInactive: - a.End = ts - a.LastSent = ts - alerts = append(alerts, *a) + if !needsSending(a) { + continue } + a.End = ts.Add(resolveDuration) + if a.State == notifier.StateInactive { + a.End = a.ResolvedAt + } + a.LastSent = ts + alerts = append(alerts, *a) } return alerts } diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go index 184e13922..edca8254a 100644 --- a/app/vmalert/alerting_test.go +++ b/app/vmalert/alerting_test.go @@ -61,7 +61,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) { }, { newTestAlertingRule("for", time.Second), - ¬ifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)}, + ¬ifier.Alert{State: notifier.StateFiring, ActiveAt: timestamp.Add(time.Second)}, []prompbmarshal.TimeSeries{ newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ "__name__": alertMetricName, @@ -76,7 +76,7 @@ func TestAlertingRule_ToTimeSeries(t *testing.T) { }, { newTestAlertingRule("for pending", 10*time.Second), - ¬ifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)}, + ¬ifier.Alert{State: notifier.StatePending, ActiveAt: timestamp.Add(time.Second)}, []prompbmarshal.TimeSeries{ newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ "__name__": alertMetricName, @@ -169,7 +169,7 @@ func TestAlertingRule_Exec(t *testing.T) { }, }, { - newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty", 0), + newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, {}, @@ -177,7 +177,9 @@ func TestAlertingRule_Exec(t *testing.T) { {}, {}, }, - nil, + []testAlert{ + {labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, + }, }, { newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0), @@ -217,8 +219,9 @@ func TestAlertingRule_Exec(t *testing.T) { }, // 1: fire first alert // 2: fire second alert, set first inactive - // 3: fire third alert, set second inactive, delete first one + // 3: fire third alert, set second inactive []testAlert{ + {labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, {labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, {labels: []string{"name", "foo2"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}, }, @@ -301,7 +304,7 @@ func TestAlertingRule_Exec(t *testing.T) { for _, step := range tc.steps { fq.reset() fq.add(step...) 
- if _, err := tc.rule.Exec(context.TODO()); err != nil { + if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil { t.Fatalf("unexpected err: %s", err) } // artificial delay between applying steps @@ -380,9 +383,9 @@ func TestAlertingRule_ExecRange(t *testing.T) { {Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}}, }, []*notifier.Alert{ - {State: notifier.StatePending, Start: time.Unix(1, 0)}, - {State: notifier.StatePending, Start: time.Unix(3, 0)}, - {State: notifier.StatePending, Start: time.Unix(5, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(3, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(5, 0)}, }, }, { @@ -391,9 +394,9 @@ func TestAlertingRule_ExecRange(t *testing.T) { {Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}}, }, []*notifier.Alert{ - {State: notifier.StatePending, Start: time.Unix(1, 0)}, - {State: notifier.StatePending, Start: time.Unix(1, 0)}, - {State: notifier.StateFiring, Start: time.Unix(1, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)}, + {State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)}, }, }, { @@ -402,11 +405,11 @@ func TestAlertingRule_ExecRange(t *testing.T) { {Values: []float64{1, 1, 1, 1, 1}, Timestamps: []int64{1, 2, 5, 6, 20}}, }, []*notifier.Alert{ - {State: notifier.StatePending, Start: time.Unix(1, 0)}, - {State: notifier.StateFiring, Start: time.Unix(1, 0)}, - {State: notifier.StatePending, Start: time.Unix(5, 0)}, - {State: notifier.StateFiring, Start: time.Unix(5, 0)}, - {State: notifier.StatePending, Start: time.Unix(20, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)}, + {State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(5, 0)}, + {State: notifier.StateFiring, ActiveAt: time.Unix(5, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(20, 0)}, }, }, { @@ -418,15 +421,15 @@ func TestAlertingRule_ExecRange(t *testing.T) { }, }, []*notifier.Alert{ - {State: notifier.StatePending, Start: time.Unix(1, 0)}, - {State: notifier.StatePending, Start: time.Unix(1, 0)}, - {State: notifier.StateFiring, Start: time.Unix(1, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)}, + {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)}, + {State: notifier.StateFiring, ActiveAt: time.Unix(1, 0)}, // - {State: notifier.StatePending, Start: time.Unix(1, 0), + {State: notifier.StatePending, ActiveAt: time.Unix(1, 0), Labels: map[string]string{ "foo": "bar", }}, - {State: notifier.StatePending, Start: time.Unix(5, 0), + {State: notifier.StatePending, ActiveAt: time.Unix(5, 0), Labels: map[string]string{ "foo": "bar", }}, @@ -479,7 +482,7 @@ func TestAlertingRule_ExecRange(t *testing.T) { a.Labels = make(map[string]string) } a.Labels[alertNameLabel] = tc.rule.Name - expTS = append(expTS, tc.rule.alertToTimeSeries(tc.expAlerts[j], timestamp)...) + expTS = append(expTS, tc.rule.alertToTimeSeries(a, timestamp)...) 
j++ } } @@ -511,7 +514,7 @@ func TestAlertingRule_Restore(t *testing.T) { }, map[uint64]*notifier.Alert{ hash(datasource.Metric{}): {State: notifier.StatePending, - Start: time.Now().Truncate(time.Hour)}, + ActiveAt: time.Now().Truncate(time.Hour)}, }, }, { @@ -532,7 +535,7 @@ func TestAlertingRule_Restore(t *testing.T) { "foo", "bar", "namespace", "baz", )): {State: notifier.StatePending, - Start: time.Now().Truncate(time.Hour)}, + ActiveAt: time.Now().Truncate(time.Hour)}, }, }, { @@ -552,7 +555,7 @@ func TestAlertingRule_Restore(t *testing.T) { "namespace", "baz", "source", "vm", )): {State: notifier.StatePending, - Start: time.Now().Truncate(time.Hour)}, + ActiveAt: time.Now().Truncate(time.Hour)}, }, }, { @@ -573,11 +576,11 @@ func TestAlertingRule_Restore(t *testing.T) { }, map[uint64]*notifier.Alert{ hash(metricWithLabels(t, "host", "localhost-1")): {State: notifier.StatePending, - Start: time.Now().Truncate(time.Hour)}, + ActiveAt: time.Now().Truncate(time.Hour)}, hash(metricWithLabels(t, "host", "localhost-2")): {State: notifier.StatePending, - Start: time.Now().Truncate(2 * time.Hour)}, + ActiveAt: time.Now().Truncate(2 * time.Hour)}, hash(metricWithLabels(t, "host", "localhost-3")): {State: notifier.StatePending, - Start: time.Now().Truncate(3 * time.Hour)}, + ActiveAt: time.Now().Truncate(3 * time.Hour)}, }, }, } @@ -602,8 +605,8 @@ func TestAlertingRule_Restore(t *testing.T) { if got.State != exp.State { t.Fatalf("expected state %d; got %d", exp.State, got.State) } - if got.Start != exp.Start { - t.Fatalf("expected Start %v; got %v", exp.Start, got.Start) + if got.ActiveAt != exp.ActiveAt { + t.Fatalf("expected ActiveAt %v; got %v", exp.ActiveAt, got.ActiveAt) } } }) @@ -618,14 +621,14 @@ func TestAlertingRule_Exec_Negative(t *testing.T) { // successful attempt fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar")) - _, err := ar.Exec(context.TODO()) + _, err := ar.Exec(context.TODO(), time.Now()) if err != nil { t.Fatal(err) } // label `job` will collide with rule extra label and will make both time series equal fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz")) - _, err = ar.Exec(context.TODO()) + _, err = ar.Exec(context.TODO(), time.Now()) if !errors.Is(err, errDuplicate) { t.Fatalf("expected to have %s error; got %s", errDuplicate, err) } @@ -634,7 +637,7 @@ func TestAlertingRule_Exec_Negative(t *testing.T) { expErr := "connection reset by peer" fq.setErr(errors.New(expErr)) - _, err = ar.Exec(context.TODO()) + _, err = ar.Exec(context.TODO(), time.Now()) if err == nil { t.Fatalf("expected to get err; got nil") } @@ -688,8 +691,8 @@ func TestAlertingRule_Template(t *testing.T) { alerts: make(map[uint64]*notifier.Alert), }, []datasource.Metric{ - metricWithValueAndLabels(t, 2, "instance", "foo"), - metricWithValueAndLabels(t, 10, "instance", "bar"), + metricWithValueAndLabels(t, 2, "instance", "foo", alertNameLabel, "override"), + metricWithValueAndLabels(t, 10, "instance", "bar", alertNameLabel, "override"), }, map[uint64]*notifier.Alert{ hash(metricWithLabels(t, alertNameLabel, "override label", "region", "east", "instance", "foo")): { @@ -762,7 +765,7 @@ func TestAlertingRule_Template(t *testing.T) { tc.rule.GroupID = fakeGroup.ID() tc.rule.q = fq fq.add(tc.metrics...) 
- if _, err := tc.rule.Exec(context.TODO()); err != nil { + if _, err := tc.rule.Exec(context.TODO(), time.Now()); err != nil { t.Fatalf("unexpected err: %s", err) } for hash, expAlert := range tc.expAlerts { @@ -821,17 +824,17 @@ func TestAlertsToSend(t *testing.T) { 5*time.Minute, time.Minute, ) f( // resolve inactive alert at the current timestamp - []*notifier.Alert{{State: notifier.StateInactive}}, + []*notifier.Alert{{State: notifier.StateInactive, ResolvedAt: ts}}, []*notifier.Alert{{LastSent: ts, End: ts}}, time.Minute, time.Minute, ) f( // mixed case of firing and resolved alerts. Names are added for deterministic sorting - []*notifier.Alert{{Name: "a", State: notifier.StateFiring}, {Name: "b", State: notifier.StateInactive}}, + []*notifier.Alert{{Name: "a", State: notifier.StateFiring}, {Name: "b", State: notifier.StateInactive, ResolvedAt: ts}}, []*notifier.Alert{{Name: "a", LastSent: ts, End: ts.Add(5 * time.Minute)}, {Name: "b", LastSent: ts, End: ts}}, 5*time.Minute, time.Minute, ) f( // mixed case of pending and resolved alerts. Names are added for deterministic sorting - []*notifier.Alert{{Name: "a", State: notifier.StatePending}, {Name: "b", State: notifier.StateInactive}}, + []*notifier.Alert{{Name: "a", State: notifier.StatePending}, {Name: "b", State: notifier.StateInactive, ResolvedAt: ts}}, []*notifier.Alert{{Name: "b", LastSent: ts, End: ts}}, 5*time.Minute, time.Minute, ) @@ -850,6 +853,16 @@ func TestAlertsToSend(t *testing.T) { []*notifier.Alert{{LastSent: ts, End: ts.Add(time.Minute)}}, time.Minute, 0, ) + f( // inactive alert which has been sent already + []*notifier.Alert{{State: notifier.StateInactive, LastSent: ts.Add(-time.Second), ResolvedAt: ts.Add(-2 * time.Second)}}, + nil, + time.Minute, time.Minute, + ) + f( // inactive alert which has been resolved after last send + []*notifier.Alert{{State: notifier.StateInactive, LastSent: ts.Add(-time.Second), ResolvedAt: ts}}, + []*notifier.Alert{{LastSent: ts, End: ts}}, + time.Minute, time.Minute, + ) } func newTestRuleWithLabels(name string, labels ...string) *AlertingRule { diff --git a/app/vmalert/datasource/datasource.go b/app/vmalert/datasource/datasource.go index e74f37314..5fd7f7acf 100644 --- a/app/vmalert/datasource/datasource.go +++ b/app/vmalert/datasource/datasource.go @@ -8,7 +8,7 @@ import ( // Querier interface wraps Query and QueryRange methods type Querier interface { - Query(ctx context.Context, query string) ([]Metric, error) + Query(ctx context.Context, query string, ts time.Time) ([]Metric, error) QueryRange(ctx context.Context, query string, from, to time.Time) ([]Metric, error) } diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index c30eed93a..73d16fd8a 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -71,13 +71,12 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati } // Query executes the given query and returns parsed response -func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) { +func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) ([]Metric, error) { req, err := s.newRequestPOST() if err != nil { return nil, err } - ts := time.Now() switch s.dataSourceType.String() { case "prometheus": s.setPrometheusInstantReqParams(req, query, ts) diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go index 613cc7ae5..f15bf5740 100644 --- a/app/vmalert/datasource/vm_test.go +++ b/app/vmalert/datasource/vm_test.go @@ -89,26 +89,27 @@ func 
TestVMInstantQuery(t *testing.T) { p := NewPrometheusType() pq := s.BuildWithParams(QuerierParams{DataSourceType: &p, EvaluationInterval: 15 * time.Second}) + ts := time.Now() - if _, err := pq.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query, ts); err == nil { t.Fatalf("expected connection error got nil") } - if _, err := pq.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query, ts); err == nil { t.Fatalf("expected invalid response status error got nil") } - if _, err := pq.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query, ts); err == nil { t.Fatalf("expected response body error got nil") } - if _, err := pq.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query, ts); err == nil { t.Fatalf("expected error status got nil") } - if _, err := pq.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query, ts); err == nil { t.Fatalf("expected unknown status got nil") } - if _, err := pq.Query(ctx, query); err == nil { + if _, err := pq.Query(ctx, query, ts); err == nil { t.Fatalf("expected non-vector resultType error got nil") } - m, err := pq.Query(ctx, query) + m, err := pq.Query(ctx, query, ts) if err != nil { t.Fatalf("unexpected %s", err) } @@ -134,7 +135,7 @@ func TestVMInstantQuery(t *testing.T) { g := NewGraphiteType() gq := s.BuildWithParams(QuerierParams{DataSourceType: &g}) - m, err = gq.Query(ctx, queryRender) + m, err = gq.Query(ctx, queryRender, ts) if err != nil { t.Fatalf("unexpected %s", err) } diff --git a/app/vmalert/group.go b/app/vmalert/group.go index cc5bf3a37..41161ec69 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -5,6 +5,8 @@ import ( "fmt" "hash/fnv" "net/url" + "strconv" + "strings" "sync" "time" @@ -13,7 +15,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/metrics" ) @@ -44,6 +48,7 @@ type Group struct { type groupMetrics struct { iterationTotal *utils.Counter iterationDuration *utils.Summary + iterationMissed *utils.Counter } func newGroupMetrics(name, file string) *groupMetrics { @@ -51,6 +56,7 @@ func newGroupMetrics(name, file string) *groupMetrics { labels := fmt.Sprintf(`group=%q, file=%q`, name, file) m.iterationTotal = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_total{%s}`, labels)) m.iterationDuration = utils.GetOrCreateSummary(fmt.Sprintf(`vmalert_iteration_duration_seconds{%s}`, labels)) + m.iterationMissed = utils.GetOrCreateCounter(fmt.Sprintf(`vmalert_iteration_missed_total{%s}`, labels)) return m } @@ -226,6 +232,13 @@ var skipRandSleepOnGroupStart bool func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client) { defer func() { close(g.finishedCh) }() + e := &executor{ + rw: rw, + notifiers: nts, + previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label)} + + evalTS := time.Now() + // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics. 
if !skipRandSleepOnGroupStart { randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64))) @@ -247,7 +260,31 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r } logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) - e := &executor{rw: rw, notifiers: nts} + + eval := func(ts time.Time) { + g.metrics.iterationTotal.Inc() + + start := time.Now() + + if len(g.Rules) < 1 { + g.metrics.iterationDuration.UpdateDuration(start) + g.LastEvaluation = start + return + } + + resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration) + errs := e.execConcurrently(ctx, g.Rules, ts, g.Concurrency, resolveDuration) + for err := range errs { + if err != nil { + logger.Errorf("group %q: %s", g.Name, err) + } + } + g.metrics.iterationDuration.UpdateDuration(start) + g.LastEvaluation = start + } + + eval(evalTS) + t := time.NewTicker(g.Interval) defer t.Stop() for { @@ -274,32 +311,26 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r g.mu.Unlock() logger.Infof("group %q re-started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency) case <-t.C: - g.metrics.iterationTotal.Inc() - iterationStart := time.Now() - if len(g.Rules) > 0 { - errs := e.execConcurrently(ctx, g.Rules, g.Concurrency, getResolveDuration(g.Interval)) - for err := range errs { - if err != nil { - logger.Errorf("group %q: %s", g.Name, err) - } - } - g.LastEvaluation = iterationStart + missed := (time.Since(evalTS) / g.Interval) - 1 + if missed > 0 { + g.metrics.iterationMissed.Inc() } - g.metrics.iterationDuration.UpdateDuration(iterationStart) + evalTS = evalTS.Add((missed + 1) * g.Interval) + + eval(evalTS) } } } // getResolveDuration returns the duration after which firing alert // can be considered as resolved. 
-func getResolveDuration(groupInterval time.Duration) time.Duration { - delta := *resendDelay +func getResolveDuration(groupInterval, delta, maxDuration time.Duration) time.Duration { if groupInterval > delta { delta = groupInterval } resolveDuration := delta * 4 - if *maxResolveDuration > 0 && resolveDuration > *maxResolveDuration { - resolveDuration = *maxResolveDuration + if maxDuration > 0 && resolveDuration > maxDuration { + resolveDuration = maxDuration } return resolveDuration } @@ -307,14 +338,20 @@ func getResolveDuration(groupInterval time.Duration) time.Duration { type executor struct { notifiers func() []notifier.Notifier rw *remotewrite.Client + + // previouslySentSeriesToRW stores series sent to RW on previous iteration + // map[ruleID]map[ruleLabels][]prompb.Label + // where `ruleID` is ID of the Rule within a Group + // and `ruleLabels` is []prompb.Label marshalled to a string + previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label } -func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurrency int, resolveDuration time.Duration) chan error { +func (e *executor) execConcurrently(ctx context.Context, rules []Rule, ts time.Time, concurrency int, resolveDuration time.Duration) chan error { res := make(chan error, len(rules)) if concurrency == 1 { // fast path for _, rule := range rules { - res <- e.exec(ctx, rule, resolveDuration) + res <- e.exec(ctx, rule, ts, resolveDuration) } close(res) return res @@ -327,7 +364,7 @@ func (e *executor) execConcurrently(ctx context.Context, rules []Rule, concurren sem <- struct{}{} wg.Add(1) go func(r Rule) { - res <- e.exec(ctx, r, resolveDuration) + res <- e.exec(ctx, r, ts, resolveDuration) <-sem wg.Done() }(rule) @@ -348,24 +385,29 @@ var ( remoteWriteTotal = metrics.NewCounter(`vmalert_remotewrite_total`) ) -func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Duration) error { +func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDuration time.Duration) error { execTotal.Inc() - now := time.Now() - tss, err := rule.Exec(ctx) + tss, err := rule.Exec(ctx, ts) if err != nil { execErrors.Inc() return fmt.Errorf("rule %q: failed to execute: %w", rule, err) } - if len(tss) > 0 && e.rw != nil { - for _, ts := range tss { - remoteWriteTotal.Inc() - if err := e.rw.Push(ts); err != nil { - remoteWriteErrors.Inc() - return fmt.Errorf("rule %q: remote write failure: %w", rule, err) + errGr := new(utils.ErrGroup) + if e.rw != nil { + pushToRW := func(tss []prompbmarshal.TimeSeries) { + for _, ts := range tss { + remoteWriteTotal.Inc() + if err := e.rw.Push(ts); err != nil { + remoteWriteErrors.Inc() + errGr.Add(fmt.Errorf("rule %q: remote write failure: %w", rule, err)) + } } } + pushToRW(tss) + staleSeries := e.getStaleSeries(rule, tss, ts) + pushToRW(staleSeries) } ar, ok := rule.(*AlertingRule) @@ -373,12 +415,11 @@ func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Dur return nil } - alerts := ar.alertsToSend(now, resolveDuration, *resendDelay) + alerts := ar.alertsToSend(ts, resolveDuration, *resendDelay) if len(alerts) < 1 { return nil } - errGr := new(utils.ErrGroup) for _, nt := range e.notifiers() { if err := nt.Send(ctx, alerts); err != nil { errGr.Add(fmt.Errorf("rule %q: failed to send alerts to addr %q: %w", rule, nt.Addr(), err)) @@ -386,3 +427,48 @@ func (e *executor) exec(ctx context.Context, rule Rule, resolveDuration time.Dur } return errGr.Err() } + +// getStaledSeries checks whether there are stale series from 
previously sent ones. +func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries { + ruleLabels := make(map[string][]prompbmarshal.Label) + for _, ts := range tss { + // convert labels to strings so we can compare with previously sent series + key := labelsToString(ts.Labels) + ruleLabels[key] = ts.Labels + } + + rID := rule.ID() + var staleS []prompbmarshal.TimeSeries + // check whether there are series which disappeared and need to be marked as stale + for key, labels := range e.previouslySentSeriesToRW[rID] { + if _, ok := ruleLabels[key]; ok { + continue + } + // previously sent series are missing in current series, so we mark them as stale + ss := newTimeSeriesPB([]float64{decimal.StaleNaN}, []int64{timestamp.Unix()}, labels) + staleS = append(staleS, ss) + } + // set previous series to current + e.previouslySentSeriesToRW[rID] = ruleLabels + + return staleS +} + +func labelsToString(labels []prompbmarshal.Label) string { + var b strings.Builder + b.WriteRune('{') + for i, label := range labels { + if len(label.Name) == 0 { + b.WriteString("__name__") + } else { + b.WriteString(label.Name) + } + b.WriteRune('=') + b.WriteString(strconv.Quote(label.Value)) + if i < len(labels)-1 { + b.WriteRune(',') + } + } + b.WriteRune('}') + return b.String() +} diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index d0906a485..d94838b60 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -3,6 +3,9 @@ package main import ( "context" "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "reflect" "sort" "testing" "time" @@ -239,7 +242,8 @@ func TestGroupStart(t *testing.T) { time.Sleep(20 * evalInterval) gotAlerts = fn.getAlerts() - expectedAlerts = []notifier.Alert{*alert1} + alert2.State = notifier.StateInactive + expectedAlerts = []notifier.Alert{*alert1, *alert2} compareAlerts(t, expectedAlerts, gotAlerts) g.close() @@ -262,21 +266,100 @@ func TestResolveDuration(t *testing.T) { {0, 0, 0, 0}, } - defaultResolveDuration := *maxResolveDuration - defaultResendDelay := *resendDelay - defer func() { - *maxResolveDuration = defaultResolveDuration - *resendDelay = defaultResendDelay - }() - for _, tc := range testCases { t.Run(fmt.Sprintf("%v-%v-%v", tc.groupInterval, tc.expected, tc.maxDuration), func(t *testing.T) { - *maxResolveDuration = tc.maxDuration - *resendDelay = tc.resendDelay - got := getResolveDuration(tc.groupInterval) + got := getResolveDuration(tc.groupInterval, tc.resendDelay, tc.maxDuration) if got != tc.expected { t.Errorf("expected to have %v; got %v", tc.expected, got) } }) } } + +func TestGetStaleSeries(t *testing.T) { + ts := time.Now() + e := &executor{ + previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), + } + f := func(rule Rule, labels, expLabels [][]prompbmarshal.Label) { + t.Helper() + var tss []prompbmarshal.TimeSeries + for _, l := range labels { + tss = append(tss, newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, l)) + } + staleS := e.getStaleSeries(rule, tss, ts) + if staleS == nil && expLabels == nil { + return + } + if len(staleS) != len(expLabels) { + t.Fatalf("expected to get %d stale series, got %d", + len(expLabels), len(staleS)) + } + for i, exp := range expLabels { + got := staleS[i] + if !reflect.DeepEqual(exp, got.Labels) { + t.Fatalf("expected to get labels: \n%v;\ngot instead: \n%v", + exp, got.Labels) + } + if len(got.Samples) != 1 { + 
t.Fatalf("expected to have 1 sample; got %d", len(got.Samples)) + } + if !decimal.IsStaleNaN(got.Samples[0].Value) { + t.Fatalf("expected sample value to be %v; got %v", decimal.StaleNaN, got.Samples[0].Value) + } + } + } + + // warn: keep in mind, that executor holds the state, so sequence of f calls matters + + // single series + f(&AlertingRule{RuleID: 1}, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}, + nil) + f(&AlertingRule{RuleID: 1}, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}, + nil) + f(&AlertingRule{RuleID: 1}, + nil, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}) + f(&AlertingRule{RuleID: 1}, + nil, + nil) + + // multiple series + f(&AlertingRule{RuleID: 1}, + [][]prompbmarshal.Label{ + toPromLabels(t, "__name__", "job:foo", "job", "foo"), + toPromLabels(t, "__name__", "job:foo", "job", "bar"), + }, + nil) + f(&AlertingRule{RuleID: 1}, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}) + f(&AlertingRule{RuleID: 1}, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, + nil) + f(&AlertingRule{RuleID: 1}, + nil, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}) + + // multiple rules and series + f(&AlertingRule{RuleID: 1}, + [][]prompbmarshal.Label{ + toPromLabels(t, "__name__", "job:foo", "job", "foo"), + toPromLabels(t, "__name__", "job:foo", "job", "bar"), + }, + nil) + f(&AlertingRule{RuleID: 2}, + [][]prompbmarshal.Label{ + toPromLabels(t, "__name__", "job:foo", "job", "foo"), + toPromLabels(t, "__name__", "job:foo", "job", "bar"), + }, + nil) + f(&AlertingRule{RuleID: 1}, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}) + f(&AlertingRule{RuleID: 1}, + [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, + nil) +} diff --git a/app/vmalert/helpers_test.go b/app/vmalert/helpers_test.go index ebcdca3c3..4524ba449 100644 --- a/app/vmalert/helpers_test.go +++ b/app/vmalert/helpers_test.go @@ -44,10 +44,10 @@ func (fq *fakeQuerier) BuildWithParams(_ datasource.QuerierParams) datasource.Qu } func (fq *fakeQuerier) QueryRange(ctx context.Context, q string, _, _ time.Time) ([]datasource.Metric, error) { - return fq.Query(ctx, q) + return fq.Query(ctx, q, time.Now()) } -func (fq *fakeQuerier) Query(_ context.Context, _ string) ([]datasource.Metric, error) { +func (fq *fakeQuerier) Query(_ context.Context, _ string, _ time.Time) ([]datasource.Metric, error) { fq.Lock() defer fq.Unlock() if fq.err != nil { @@ -116,6 +116,21 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric { return m } +func toPromLabels(t *testing.T, labels ...string) []prompbmarshal.Label { + t.Helper() + if len(labels) == 0 || len(labels)%2 != 0 { + t.Fatalf("expected to get even number of labels") + } + var ls []prompbmarshal.Label + for i := 0; i < len(labels); i += 2 { + ls = append(ls, prompbmarshal.Label{ + Name: labels[i], + Value: labels[i+1], + }) + } + return ls +} + func compareGroups(t *testing.T, a, b *Group) { t.Helper() if a.Name != b.Name { diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go index 121babab1..44b80eb4a 100644 --- a/app/vmalert/notifier/alert.go +++ b/app/vmalert/notifier/alert.go @@ -26,10 +26,14 @@ type Alert struct { State 
AlertState // Expr contains expression that was executed to generate the Alert Expr string - // Start defines the moment of time when Alert has triggered + // ActiveAt defines the moment of time when Alert has become active + ActiveAt time.Time + // Start defines the moment of time when Alert has become firing Start time.Time // End defines the moment of time when Alert supposed to expire End time.Time + // ResolvedAt defines the moment when Alert was switched from Firing to Inactive + ResolvedAt time.Time // LastSent defines the moment when Alert was sent last time LastSent time.Time // Value stores the value returned from evaluating expression from Expr field diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go index 2ae840290..9a16cd408 100644 --- a/app/vmalert/recording.go +++ b/app/vmalert/recording.go @@ -124,14 +124,13 @@ func (rr *RecordingRule) ExecRange(ctx context.Context, start, end time.Time) ([ } // Exec executes RecordingRule expression via the given Querier. -func (rr *RecordingRule) Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) { - start := time.Now() - qMetrics, err := rr.q.Query(ctx, rr.Expr) +func (rr *RecordingRule) Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) { + qMetrics, err := rr.q.Query(ctx, rr.Expr, ts) rr.mu.Lock() defer rr.mu.Unlock() - rr.lastExecTime = start - rr.lastExecDuration = time.Since(start) + rr.lastExecTime = ts + rr.lastExecDuration = time.Since(ts) rr.lastExecError = err rr.lastExecSamples = len(qMetrics) if err != nil { diff --git a/app/vmalert/recording_test.go b/app/vmalert/recording_test.go index ab563a291..cbb2b75a7 100644 --- a/app/vmalert/recording_test.go +++ b/app/vmalert/recording_test.go @@ -77,7 +77,7 @@ func TestRecoridngRule_Exec(t *testing.T) { fq := &fakeQuerier{} fq.add(tc.metrics...) 
tc.rule.q = fq - tss, err := tc.rule.Exec(context.TODO()) + tss, err := tc.rule.Exec(context.TODO(), time.Now()) if err != nil { t.Fatalf("unexpected Exec err: %s", err) } @@ -178,7 +178,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) { expErr := "connection reset by peer" fq.setErr(errors.New(expErr)) rr.q = fq - _, err := rr.Exec(context.TODO()) + _, err := rr.Exec(context.TODO(), time.Now()) if err == nil { t.Fatalf("expected to get err; got nil") } @@ -193,7 +193,7 @@ func TestRecoridngRule_ExecNegative(t *testing.T) { fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo")) fq.add(metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "bar")) - _, err = rr.Exec(context.TODO()) + _, err = rr.Exec(context.TODO(), time.Now()) if err == nil { t.Fatalf("expected to get err; got nil") } diff --git a/app/vmalert/remotewrite/remotewrite.go b/app/vmalert/remotewrite/remotewrite.go index 035f2af77..35fa56ebd 100644 --- a/app/vmalert/remotewrite/remotewrite.go +++ b/app/vmalert/remotewrite/remotewrite.go @@ -225,7 +225,7 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) { droppedRows.Add(len(wr.Timeseries)) droppedBytes.Add(len(b)) - logger.Errorf("all %d attempts to send request failed - dropping %d timeseries", + logger.Errorf("all %d attempts to send request failed - dropping %d time series", attempts, len(wr.Timeseries)) } diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index 5157d6b09..04ce44933 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -3,8 +3,9 @@ package main import ( "context" "errors" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) // Rule represents alerting or recording rule @@ -14,8 +15,8 @@ type Rule interface { // ID returns unique ID that may be used for // identifying this Rule among others. ID() uint64 - // Exec executes the rule with given context - Exec(ctx context.Context) ([]prompbmarshal.TimeSeries, error) + // Exec executes the rule with given context at the given timestamp + Exec(ctx context.Context, ts time.Time) ([]prompbmarshal.TimeSeries, error) // ExecRange executes the rule on the given time range ExecRange(ctx context.Context, start, end time.Time) ([]prompbmarshal.TimeSeries, error) // UpdateWith performs modification of current Rule diff --git a/app/vmalert/utils.go b/app/vmalert/utils.go index 11d153fc9..7b15e1879 100644 --- a/app/vmalert/utils.go +++ b/app/vmalert/utils.go @@ -30,3 +30,20 @@ func newTimeSeries(values []float64, timestamps []int64, labels map[string]strin } return ts } + +// newTimeSeriesPB creates prompbmarshal.TimeSeries with given +// values, timestamps and labels. +// It expects that labels are already sorted. 
+func newTimeSeriesPB(values []float64, timestamps []int64, labels []prompbmarshal.Label) prompbmarshal.TimeSeries { + ts := prompbmarshal.TimeSeries{ + Samples: make([]prompbmarshal.Sample, len(values)), + } + for i := range values { + ts.Samples[i] = prompbmarshal.Sample{ + Value: values[i], + Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6, + } + } + ts.Labels = labels + return ts +} diff --git a/app/vmalert/web_test.go b/app/vmalert/web_test.go index 0181988aa..ef639b11b 100644 --- a/app/vmalert/web_test.go +++ b/app/vmalert/web_test.go @@ -14,7 +14,7 @@ func TestHandler(t *testing.T) { ar := &AlertingRule{ Name: "alert", alerts: map[uint64]*notifier.Alert{ - 0: {}, + 0: {State: notifier.StateFiring}, }, } g := &Group{ From 1354e6d71249b62293472f75a0340957ca35ceb2 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Wed, 30 Mar 2022 12:37:27 +0200 Subject: [PATCH 4/8] vmalert: protect executor's field from concurrent access (#2387) Executor recently gain field for storing previously sent series. Since the same executor object can be used in multiple goroutines, the access to this field should be serialized. Signed-off-by: hagen1778 --- app/vmalert/group.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app/vmalert/group.go b/app/vmalert/group.go index 41161ec69..e218e4fc7 100644 --- a/app/vmalert/group.go +++ b/app/vmalert/group.go @@ -339,6 +339,7 @@ type executor struct { notifiers func() []notifier.Notifier rw *remotewrite.Client + previouslySentSeriesToRWMu sync.Mutex // previouslySentSeriesToRW stores series sent to RW on previous iteration // map[ruleID]map[ruleLabels][]prompb.Label // where `ruleID` is ID of the Rule within a Group @@ -430,7 +431,7 @@ func (e *executor) exec(ctx context.Context, rule Rule, ts time.Time, resolveDur // getStaledSeries checks whether there are stale series from previously sent ones. func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries { - ruleLabels := make(map[string][]prompbmarshal.Label) + ruleLabels := make(map[string][]prompbmarshal.Label, len(tss)) for _, ts := range tss { // convert labels to strings so we can compare with previously sent series key := labelsToString(ts.Labels) @@ -440,6 +441,7 @@ func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, tim rID := rule.ID() var staleS []prompbmarshal.TimeSeries // check whether there are series which disappeared and need to be marked as stale + e.previouslySentSeriesToRWMu.Lock() for key, labels := range e.previouslySentSeriesToRW[rID] { if _, ok := ruleLabels[key]; ok { continue @@ -450,6 +452,7 @@ func (e *executor) getStaleSeries(rule Rule, tss []prompbmarshal.TimeSeries, tim } // set previous series to current e.previouslySentSeriesToRW[rID] = ruleLabels + e.previouslySentSeriesToRWMu.Unlock() return staleS } From a9b6cf53a23273f5b9136ba6d7271066330110c2 Mon Sep 17 00:00:00 2001 From: Yurii Kravets <30324382+YuriKravetc@users.noreply.github.com> Date: Fri, 1 Apr 2022 11:23:18 +0300 Subject: [PATCH 5/8] url-examples (#2389) --- docs/url-examples.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/url-examples.md b/docs/url-examples.md index 20d770afb..95e4b9def 100644 --- a/docs/url-examples.md +++ b/docs/url-examples.md @@ -100,7 +100,7 @@ Cluster:
```bash -curl --data-binary "@import.txt" -X POST 'http://:8480/insert/prometheus/api/v1/import' +curl --data-binary "@import.txt" -X POST 'http://:8480/insert/0/prometheus/api/v1/import' ```
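The `0` segment added above is the accountID (tenant) part of the cluster insert path. A hedged Go sketch of the same request, where the `vminsert` host name is a placeholder for an actual cluster address:

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
	"os"
)

func main() {
	// accountID is the tenant ID segment of the cluster insert path;
	// single-tenant setups typically use 0.
	const accountID = 0
	// "vminsert" is a placeholder host name; replace it with the address
	// of a vminsert node in your cluster.
	url := fmt.Sprintf("http://vminsert:8480/insert/%d/prometheus/api/v1/import", accountID)

	data, err := os.ReadFile("import.txt")
	if err != nil {
		log.Fatalf("cannot read import file: %s", err)
	}
	resp, err := http.Post(url, "", bytes.NewReader(data))
	if err != nil {
		log.Fatalf("request failed: %s", err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```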
From 1c38ff6f484fdac20b0fea460d23ac6ee347ee1f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 1 Apr 2022 12:01:34 +0300 Subject: [PATCH 6/8] docs/CHANGELOG.md: document 0989649ad041d5984f83fac6e2c76dd3b92af19a --- docs/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9c2f9704c..3098f3dbf 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -16,6 +16,7 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add pre-defined dasbhoards for per-job CPU usage, memory usage and disk IO usage. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2243) for details. +* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): improve compatibility with [Prometheus Alert Generator specification](https://github.com/prometheus/compliance/blob/main/alert_generator/specification.md). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2340). * FEATURE: add the following command-line flags, which can be used for fine-grained limiting of CPU and memory usage during various API calls: * `-search.maxFederateSeries` for limiting the number of time series, which can be returned from [/federate](https://docs.victoriametrics.com/#federation). From f977ca8eaf958b4be6980ef678ffc0ba1eedeb3a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 1 Apr 2022 12:24:18 +0300 Subject: [PATCH 7/8] docs/CHANGELOG.md: document a57e3807537914396ee3eb378648a464fa9e1b97 --- docs/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3098f3dbf..68d6aacf1 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -17,6 +17,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add pre-defined dasbhoards for per-job CPU usage, memory usage and disk IO usage. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2243) for details. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): improve compatibility with [Prometheus Alert Generator specification](https://github.com/prometheus/compliance/blob/main/alert_generator/specification.md). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2340). +* FEATURE: [vmgateway](https://docs.victoriametrics.com/vmgateway.html): Allow to read `-ratelimit.config` file from URL. Also add `-atelimit.configCheckInterval` command-line option. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2241). * FEATURE: add the following command-line flags, which can be used for fine-grained limiting of CPU and memory usage during various API calls: * `-search.maxFederateSeries` for limiting the number of time series, which can be returned from [/federate](https://docs.victoriametrics.com/#federation). From e2b1097545b7d33df7dd5e2aad093f9dbc02de24 Mon Sep 17 00:00:00 2001 From: Dima Lazerka <58356625+dima-vm@users.noreply.github.com> Date: Tue, 29 Mar 2022 18:37:23 +0300 Subject: [PATCH 8/8] Fix typo "vmanomapy" --- docs/vmanomaly.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/vmanomaly.md b/docs/vmanomaly.md index 3d88f1374..dafc37ab0 100644 --- a/docs/vmanomaly.md +++ b/docs/vmanomaly.md @@ -111,7 +111,7 @@ optionally preserving labels). 
## Usage -The vmanomapy accepts only one parameter -- config file path: +The vmanomaly accepts only one parameter -- config file path: ```sh python3 vmanomaly.py config_zscore.yaml