From 4fd2b6cd1649ff8ed8929e4bd8a5cade41bad948 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Mon, 9 Nov 2020 22:27:32 +0000 Subject: [PATCH] vmalert: explicitly set extra labels to alert entities (#886) The previous implementation treated extra labels (global and rule labels) as separate label set to returned time series labels. Hence, time series always contained only original labels and alert ID was generated from sorted labels key-values. Extra labels didn't affect the generated ID and were applied on the following actions: - templating for Summary and Annotations; - persisting state via remote write; - restoring state via remote read. Such behaviour caused difficulties on restore procedure because extra labels had to be dropped before checking the alert ID, but that not always worked. Consider the case when expression returns the following time series `up{job="foo"}` and rule has extra label `job=bar`. This would mean that restored alert ID will be always different to the real time series because of collision. To solve the situation extra labels are now always applied beforehand and `vmalert` doesn't store original labels anymore. However, this could result into a new error situation. Consider the case when expression returns two time series `up{job="foo"}` and `up{job="baz"}`, while rule has extra label `job=bar`. In such case, applying extra labels will result into two identical time series and `vmalert` will return error: `result contains metrics with the same labelset after applying rule labels` https://github.com/VictoriaMetrics/VictoriaMetrics/issues/870 --- app/vmalert/alerting.go | 35 +++++++------------ app/vmalert/alerting_test.go | 50 ++++++++++++++++++++-------- app/vmalert/datasource/datasource.go | 17 ++++++++++ app/vmalert/datasource/vm.go | 2 +- app/vmalert/group_test.go | 10 ++++++ app/vmalert/recording.go | 3 -- app/vmalert/rule.go | 3 ++ 7 files changed, 79 insertions(+), 41 deletions(-) diff --git a/app/vmalert/alerting.go b/app/vmalert/alerting.go index 51eee637f..3b607eb4c 100644 --- a/app/vmalert/alerting.go +++ b/app/vmalert/alerting.go @@ -140,7 +140,17 @@ func (ar *AlertingRule) Exec(ctx context.Context, q datasource.Querier, series b updated := make(map[uint64]struct{}) // update list of active alerts for _, m := range qMetrics { + for k, v := range ar.Labels { + // apply extra labels + m.SetLabel(k, v) + } + h := hash(m) + if _, ok := updated[h]; ok { + // duplicate may be caused by extra labels + // conflicting with the metric labels + return nil, fmt.Errorf("labels %v: %w", m.Labels, errDuplicate) + } updated[h] = struct{}{} if a, ok := ar.alerts[h]; ok { if a.Value != m.Value { @@ -258,25 +268,11 @@ func (ar *AlertingRule) newAlert(m datasource.Metric, start time.Time) (*notifie } func (ar *AlertingRule) template(a *notifier.Alert) error { - // 1. template rule labels with data labels - rLabels, err := a.ExecTemplate(ar.Labels) - if err != nil { - return err - } - - // 2. merge data labels and rule labels - // metric labels may be overridden by - // rule labels - for k, v := range rLabels { - a.Labels[k] = v - } - - // 3. template merged labels + var err error a.Labels, err = a.ExecTemplate(a.Labels) if err != nil { return err } - a.Annotations, err = a.ExecTemplate(ar.Annotations) return err } @@ -419,14 +415,7 @@ func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookb // drop all extra labels, so hash key will // be identical to time series received in Exec for _, l := range labels { - if l.Name == alertNameLabel { - continue - } - if l.Name == alertGroupNameLabel { - continue - } - // drop all overridden labels - if _, ok := ar.Labels[l.Name]; ok { + if l.Name == alertNameLabel || l.Name == alertGroupNameLabel { continue } m.Labels = append(m.Labels, l) diff --git a/app/vmalert/alerting_test.go b/app/vmalert/alerting_test.go index 55aa2b48d..c646c1729 100644 --- a/app/vmalert/alerting_test.go +++ b/app/vmalert/alerting_test.go @@ -2,6 +2,8 @@ package main import ( "context" + "errors" + "strings" "testing" "time" @@ -218,19 +220,6 @@ func TestAlertingRule_Exec(t *testing.T) { hash(metricWithLabels(t, "name", "foo2")): {State: notifier.StateFiring}, }, }, - { - newTestAlertingRule("duplicate", 0), - [][]datasource.Metric{ - { - // metrics with the same labelset should result in one alert - metricWithLabels(t, "name", "foo", "type", "bar"), - metricWithLabels(t, "type", "bar", "name", "foo"), - }, - }, - map[uint64]*notifier.Alert{ - hash(metricWithLabels(t, "name", "foo", "type", "bar")): {State: notifier.StateFiring}, - }, - }, { newTestAlertingRule("for-pending", time.Minute), [][]datasource.Metric{ @@ -376,7 +365,7 @@ func TestAlertingRule_Restore(t *testing.T) { alertNameLabel, "", "foo", "bar", "namespace", "baz", - // following pair supposed to be dropped + // extra labels set by rule "source", "vm", ), }, @@ -384,6 +373,7 @@ func TestAlertingRule_Restore(t *testing.T) { hash(metricWithLabels(t, "foo", "bar", "namespace", "baz", + "source", "vm", )): {State: notifier.StatePending, Start: time.Now().Truncate(time.Hour)}, }, @@ -442,6 +432,38 @@ func TestAlertingRule_Restore(t *testing.T) { } } +func TestAlertingRule_Exec_Negative(t *testing.T) { + fq := &fakeQuerier{} + ar := newTestAlertingRule("test", 0) + ar.Labels = map[string]string{"job": "test"} + + // successful attempt + fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar")) + _, err := ar.Exec(context.TODO(), fq, false) + if err != nil { + t.Fatal(err) + } + + // label `job` will collide with rule extra label and will make both time series equal + fq.add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "baz")) + _, err = ar.Exec(context.TODO(), fq, false) + if !errors.Is(err, errDuplicate) { + t.Fatalf("expected to have %s error; got %s", errDuplicate, err) + } + + fq.reset() + + expErr := "connection reset by peer" + fq.setErr(errors.New(expErr)) + _, err = ar.Exec(context.TODO(), fq, false) + if err == nil { + t.Fatalf("expected to get err; got nil") + } + if !strings.Contains(err.Error(), expErr) { + t.Fatalf("expected to get err %q; got %q insterad", expErr, err) + } +} + func newTestRuleWithLabels(name string, labels ...string) *AlertingRule { r := newTestAlertingRule(name, 0) r.Labels = make(map[string]string) diff --git a/app/vmalert/datasource/datasource.go b/app/vmalert/datasource/datasource.go index 3525f0ed5..de0265d24 100644 --- a/app/vmalert/datasource/datasource.go +++ b/app/vmalert/datasource/datasource.go @@ -17,6 +17,23 @@ type Metric struct { Value float64 } +// SetLabel adds or updates existing one label +// by the given key and label +func (m *Metric) SetLabel(key, value string) { + for i, l := range m.Labels { + if l.Name == key { + m.Labels[i].Value = value + return + } + } + m.AddLabel(key, value) +} + +// AddLabel appends the given label to the label set +func (m *Metric) AddLabel(key, value string) { + m.Labels = append(m.Labels, Label{Name: key, Value: value}) +} + // Label represents metric's label type Label struct { Name string diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index 71fe7f71d..bf57a04be 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -37,7 +37,7 @@ func (r response) metrics() ([]Metric, error) { } m.Labels = nil for k, v := range r.Data.Result[i].Labels { - m.Labels = append(m.Labels, Label{Name: k, Value: v}) + m.AddLabel(k, v) } m.Timestamp = int64(res.TV[0].(float64)) m.Value = f diff --git a/app/vmalert/group_test.go b/app/vmalert/group_test.go index 4a3246a1c..d8b98dcdd 100644 --- a/app/vmalert/group_test.go +++ b/app/vmalert/group_test.go @@ -172,6 +172,11 @@ func TestGroupStart(t *testing.T) { t.Fatalf("faield to create alert: %s", err) } alert1.State = notifier.StateFiring + // add external label + alert1.Labels["cluster"] = "east-1" + // add rule labels - see config/testdata/rules1-good.rules + alert1.Labels["label"] = "bar" + alert1.Labels["host"] = inst1 alert1.ID = hash(m1) alert2, err := r.newAlert(m2, time.Now()) @@ -179,6 +184,11 @@ func TestGroupStart(t *testing.T) { t.Fatalf("faield to create alert: %s", err) } alert2.State = notifier.StateFiring + // add external label + alert2.Labels["cluster"] = "east-1" + // add rule labels - see config/testdata/rules1-good.rules + alert2.Labels["label"] = "bar" + alert2.Labels["host"] = inst2 alert2.ID = hash(m2) finished := make(chan struct{}) diff --git a/app/vmalert/recording.go b/app/vmalert/recording.go index 9dd7b4de2..89f0c57e1 100644 --- a/app/vmalert/recording.go +++ b/app/vmalert/recording.go @@ -2,7 +2,6 @@ package main import ( "context" - "errors" "fmt" "hash/fnv" "sort" @@ -79,8 +78,6 @@ func (rr *RecordingRule) Close() { metrics.UnregisterMetric(rr.metrics.errors.name) } -var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels") - // Exec executes RecordingRule expression via the given Querier. func (rr *RecordingRule) Exec(ctx context.Context, q datasource.Querier, series bool) ([]prompbmarshal.TimeSeries, error) { if !series { diff --git a/app/vmalert/rule.go b/app/vmalert/rule.go index 1410d2bd6..293e929bf 100644 --- a/app/vmalert/rule.go +++ b/app/vmalert/rule.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -25,3 +26,5 @@ type Rule interface { // such as metrics unregister Close() } + +var errDuplicate = errors.New("result contains metrics with the same labelset after applying rule labels")