From b09272ccac9b28a81c59540441851f6dce9659f3 Mon Sep 17 00:00:00 2001 From: Hui Wang Date: Thu, 14 Nov 2024 19:23:39 +0800 Subject: [PATCH] app/vmalert: improve performance when rules produce large volumes of results 1. Avoid storing the last evaluation results outside of rules, check for stale time series as soon as possible; 2. Remove duplicated template `Clone()`. This pull request primarily reduces memory usage when rules produce large volumes of results, as seen in https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6894. The CPU time spent on garbage collection remains high and may be addressed in a separate PR. --- app/vmalert-tool/unittest/recording.go | 5 +- app/vmalert/datasource/client_graphite.go | 6 +- app/vmalert/datasource/client_prom.go | 29 ++- app/vmalert/datasource/client_test.go | 15 +- app/vmalert/datasource/client_vlogs.go | 16 +- app/vmalert/datasource/datasource.go | 29 +-- app/vmalert/datasource/vm_prom_api_test.go | 10 +- app/vmalert/notifier/alert.go | 10 +- app/vmalert/notifier/alert_test.go | 4 +- app/vmalert/rule/alerting.go | 92 ++++++- app/vmalert/rule/alerting_test.go | 288 +++++++++++++++----- app/vmalert/rule/group.go | 108 +------- app/vmalert/rule/group_test.go | 153 +---------- app/vmalert/rule/group_timing_test.go | 36 --- app/vmalert/rule/recording.go | 74 ++++-- app/vmalert/rule/recording_test.go | 289 ++++++++++++++++----- app/vmalert/rule/test_helpers.go | 20 +- app/vmalert/rule/utils.go | 34 +-- app/vmalert/templates/template.go | 2 + 19 files changed, 650 insertions(+), 570 deletions(-) delete mode 100644 app/vmalert/rule/group_timing_test.go diff --git a/app/vmalert-tool/unittest/recording.go b/app/vmalert-tool/unittest/recording.go index d54d8c1e5..05c95c9e7 100644 --- a/app/vmalert-tool/unittest/recording.go +++ b/app/vmalert-tool/unittest/recording.go @@ -9,6 +9,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" +
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/metricsql" ) @@ -48,7 +49,7 @@ Outer: } var expSamples []parsedSample for _, s := range mt.ExpSamples { - expLb := datasource.Labels{} + expLb := []prompbmarshal.Label{} if s.Labels != "" { metricsqlExpr, err := metricsql.Parse(s.Labels) if err != nil { @@ -64,7 +65,7 @@ Outer: } if len(metricsqlMetricExpr.LabelFilterss) > 0 { for _, l := range metricsqlMetricExpr.LabelFilterss[0] { - expLb = append(expLb, datasource.Label{ + expLb = append(expLb, prompbmarshal.Label{ Name: l.Label, Value: l.Value, }) diff --git a/app/vmalert/datasource/client_graphite.go b/app/vmalert/datasource/client_graphite.go index 2200f6ea5..768f92a33 100644 --- a/app/vmalert/datasource/client_graphite.go +++ b/app/vmalert/datasource/client_graphite.go @@ -46,8 +46,8 @@ const ( graphitePrefix = "/graphite" ) -func (s *Client) setGraphiteReqParams(r *http.Request, query string) { - if s.appendTypePrefix { +func (c *Client) setGraphiteReqParams(r *http.Request, query string) { + if c.appendTypePrefix { r.URL.Path += graphitePrefix } r.URL.Path += graphitePath @@ -58,7 +58,7 @@ func (s *Client) setGraphiteReqParams(r *http.Request, query string) { q.Set("target", query) q.Set("until", "now") - for k, vs := range s.extraParams { + for k, vs := range c.extraParams { if q.Has(k) { // extraParams are prior to params in URL q.Del(k) } diff --git a/app/vmalert/datasource/client_prom.go b/app/vmalert/datasource/client_prom.go index 08dd6d72e..df9669f6d 100644 --- a/app/vmalert/datasource/client_prom.go +++ b/app/vmalert/datasource/client_prom.go @@ -9,6 +9,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/valyala/fastjson" ) @@ -82,14 +83,14 @@ func (pi *promInstant) Unmarshal(b []byte) error { labels := metric.GetObject() r := &pi.ms[i] - r.Labels 
= make([]Label, 0, labels.Len()) + r.Labels = make([]prompbmarshal.Label, 0, labels.Len()) labels.Visit(func(key []byte, v *fastjson.Value) { lv, errLocal := v.StringBytes() if errLocal != nil { err = fmt.Errorf("error when parsing label value %q: %s", v, errLocal) return } - r.Labels = append(r.Labels, Label{ + r.Labels = append(r.Labels, prompbmarshal.Label{ Name: string(key), Value: string(lv), }) @@ -219,8 +220,8 @@ func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result return res, nil } -func (s *Client) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) { - if s.appendTypePrefix { +func (c *Client) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) { + if c.appendTypePrefix { r.URL.Path += "/prometheus" } if !*disablePathAppend { @@ -228,22 +229,22 @@ func (s *Client) setPrometheusInstantReqParams(r *http.Request, query string, ti } q := r.URL.Query() q.Set("time", timestamp.Format(time.RFC3339)) - if !*disableStepParam && s.evaluationInterval > 0 { // set step as evaluationInterval by default + if !*disableStepParam && c.evaluationInterval > 0 { // set step as evaluationInterval by default // always convert to seconds to keep compatibility with older // Prometheus versions. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943 - q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds()))) + q.Set("step", fmt.Sprintf("%ds", int(c.evaluationInterval.Seconds()))) } - if !*disableStepParam && s.queryStep > 0 { // override step with user-specified value + if !*disableStepParam && c.queryStep > 0 { // override step with user-specified value // always convert to seconds to keep compatibility with older // Prometheus versions. 
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943 - q.Set("step", fmt.Sprintf("%ds", int(s.queryStep.Seconds()))) + q.Set("step", fmt.Sprintf("%ds", int(c.queryStep.Seconds()))) } r.URL.RawQuery = q.Encode() - s.setReqParams(r, query) + c.setReqParams(r, query) } -func (s *Client) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) { - if s.appendTypePrefix { +func (c *Client) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) { + if c.appendTypePrefix { r.URL.Path += "/prometheus" } if !*disablePathAppend { @@ -252,11 +253,11 @@ func (s *Client) setPrometheusRangeReqParams(r *http.Request, query string, star q := r.URL.Query() q.Add("start", start.Format(time.RFC3339)) q.Add("end", end.Format(time.RFC3339)) - if s.evaluationInterval > 0 { // set step as evaluationInterval by default + if c.evaluationInterval > 0 { // set step as evaluationInterval by default // always convert to seconds to keep compatibility with older // Prometheus versions. 
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1943 - q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds()))) + q.Set("step", fmt.Sprintf("%ds", int(c.evaluationInterval.Seconds()))) } r.URL.RawQuery = q.Encode() - s.setReqParams(r, query) + c.setReqParams(r, query) } diff --git a/app/vmalert/datasource/client_test.go b/app/vmalert/datasource/client_test.go index 35da2279a..d69db2787 100644 --- a/app/vmalert/datasource/client_test.go +++ b/app/vmalert/datasource/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) var ( @@ -144,12 +145,12 @@ func TestVMInstantQuery(t *testing.T) { } expected := []Metric{ { - Labels: []Label{{Value: "vm_rows", Name: "__name__"}, {Value: "bar", Name: "foo"}}, + Labels: []prompbmarshal.Label{{Value: "vm_rows", Name: "__name__"}, {Value: "bar", Name: "foo"}}, Timestamps: []int64{1583786142}, Values: []float64{13763}, }, { - Labels: []Label{{Value: "vm_requests", Name: "__name__"}, {Value: "baz", Name: "foo"}}, + Labels: []prompbmarshal.Label{{Value: "vm_requests", Name: "__name__"}, {Value: "baz", Name: "foo"}}, Timestamps: []int64{1583786140}, Values: []float64{2000}, }, @@ -214,7 +215,7 @@ func TestVMInstantQuery(t *testing.T) { } exp := []Metric{ { - Labels: []Label{{Value: "constantLine(10)", Name: "name"}}, + Labels: []prompbmarshal.Label{{Value: "constantLine(10)", Name: "name"}}, Timestamps: []int64{1611758403}, Values: []float64{10}, }, @@ -236,12 +237,12 @@ func TestVMInstantQuery(t *testing.T) { } expected = []Metric{ { - Labels: []Label{{Value: "total", Name: "stats_result"}, {Value: "bar", Name: "foo"}}, + Labels: []prompbmarshal.Label{{Value: "total", Name: "stats_result"}, {Value: "bar", Name: "foo"}}, Timestamps: []int64{1583786142}, Values: []float64{13763}, }, { - Labels: []Label{{Value: "total", Name: 
"stats_result"}, {Value: "baz", Name: "foo"}}, + Labels: []prompbmarshal.Label{{Value: "total", Name: "stats_result"}, {Value: "baz", Name: "foo"}}, Timestamps: []int64{1583786140}, Values: []float64{2000}, }, @@ -444,7 +445,7 @@ func TestVMRangeQuery(t *testing.T) { t.Fatalf("expected 1 metric got %d in %+v", len(m), m) } expected := Metric{ - Labels: []Label{{Value: "vm_rows", Name: "__name__"}}, + Labels: []prompbmarshal.Label{{Value: "vm_rows", Name: "__name__"}}, Timestamps: []int64{1583786142}, Values: []float64{13763}, } @@ -475,7 +476,7 @@ func TestVMRangeQuery(t *testing.T) { t.Fatalf("expected 1 metric got %d in %+v", len(m), m) } expected = Metric{ - Labels: []Label{{Value: "total", Name: "stats_result"}}, + Labels: []prompbmarshal.Label{{Value: "total", Name: "stats_result"}}, Timestamps: []int64{1583786142}, Values: []float64{10}, } diff --git a/app/vmalert/datasource/client_vlogs.go b/app/vmalert/datasource/client_vlogs.go index cb6e8892c..ff6340b6f 100644 --- a/app/vmalert/datasource/client_vlogs.go +++ b/app/vmalert/datasource/client_vlogs.go @@ -6,7 +6,7 @@ import ( "time" ) -func (s *Client) setVLogsInstantReqParams(r *http.Request, query string, timestamp time.Time) { +func (c *Client) setVLogsInstantReqParams(r *http.Request, query string, timestamp time.Time) { // there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix. if !*disablePathAppend { r.URL.Path += "/select/logsql/stats_query" @@ -16,15 +16,15 @@ func (s *Client) setVLogsInstantReqParams(r *http.Request, query string, timesta q.Set("time", timestamp.Format(time.RFC3339)) // set the `start` and `end` params if applyIntervalAsTimeFilter is enabled(time filter is missing in the rule expr), // so the query will be executed in time range [timestamp - evaluationInterval, timestamp]. 
- if s.applyIntervalAsTimeFilter && s.evaluationInterval > 0 { - q.Set("start", timestamp.Add(-s.evaluationInterval).Format(time.RFC3339)) + if c.applyIntervalAsTimeFilter && c.evaluationInterval > 0 { + q.Set("start", timestamp.Add(-c.evaluationInterval).Format(time.RFC3339)) q.Set("end", timestamp.Format(time.RFC3339)) } r.URL.RawQuery = q.Encode() - s.setReqParams(r, query) + c.setReqParams(r, query) } -func (s *Client) setVLogsRangeReqParams(r *http.Request, query string, start, end time.Time) { +func (c *Client) setVLogsRangeReqParams(r *http.Request, query string, start, end time.Time) { // there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix. if !*disablePathAppend { r.URL.Path += "/select/logsql/stats_query_range" @@ -33,11 +33,11 @@ func (s *Client) setVLogsRangeReqParams(r *http.Request, query string, start, en q.Add("start", start.Format(time.RFC3339)) q.Add("end", end.Format(time.RFC3339)) // set step as evaluationInterval by default - if s.evaluationInterval > 0 { - q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds()))) + if c.evaluationInterval > 0 { + q.Set("step", fmt.Sprintf("%ds", int(c.evaluationInterval.Seconds()))) } r.URL.RawQuery = q.Encode() - s.setReqParams(r, query) + c.setReqParams(r, query) } func parseVLogsResponse(req *http.Request, resp *http.Response) (res Result, err error) { diff --git a/app/vmalert/datasource/datasource.go b/app/vmalert/datasource/datasource.go index 97a0f8d49..fb847f8d7 100644 --- a/app/vmalert/datasource/datasource.go +++ b/app/vmalert/datasource/datasource.go @@ -8,6 +8,8 @@ import ( "sort" "strconv" "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) // Querier interface wraps Query and QueryRange methods @@ -55,7 +57,7 @@ type QuerierParams struct { // Metric is the basic entity which should be return by datasource type Metric struct { - Labels []Label + Labels []prompbmarshal.Label Timestamps []int64 Values []float64 } @@ -72,22 +74,9 @@ 
func (m *Metric) SetLabel(key, value string) { m.AddLabel(key, value) } -// SetLabels sets the given map as Metric labels -func (m *Metric) SetLabels(ls map[string]string) { - var i int - m.Labels = make([]Label, len(ls)) - for k, v := range ls { - m.Labels[i] = Label{ - Name: k, - Value: v, - } - i++ - } -} - // AddLabel appends the given label to the label set func (m *Metric) AddLabel(key, value string) { - m.Labels = append(m.Labels, Label{Name: key, Value: value}) + m.Labels = append(m.Labels, prompbmarshal.Label{Name: key, Value: value}) } // DelLabel deletes the given label from the label set @@ -110,14 +99,8 @@ func (m *Metric) Label(key string) string { return "" } -// Label represents metric's label -type Label struct { - Name string - Value string -} - // Labels is collection of Label -type Labels []Label +type Labels []prompbmarshal.Label func (ls Labels) Len() int { return len(ls) } func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] } @@ -172,7 +155,7 @@ func LabelCompare(a, b Labels) int { // ConvertToLabels convert map to Labels func ConvertToLabels(m map[string]string) (labelset Labels) { for k, v := range m { - labelset = append(labelset, Label{ + labelset = append(labelset, prompbmarshal.Label{ Name: k, Value: v, }) diff --git a/app/vmalert/datasource/vm_prom_api_test.go b/app/vmalert/datasource/vm_prom_api_test.go index 8d4d3c1c8..1d538e224 100644 --- a/app/vmalert/datasource/vm_prom_api_test.go +++ b/app/vmalert/datasource/vm_prom_api_test.go @@ -3,6 +3,8 @@ package datasource import ( "reflect" "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) func TestPromInstant_UnmarshalPositive(t *testing.T) { @@ -21,7 +23,7 @@ func TestPromInstant_UnmarshalPositive(t *testing.T) { f(`[{"metric":{"__name__":"up"},"value":[1583780000,"42"]}]`, []Metric{ { - Labels: []Label{{Name: "__name__", Value: "up"}}, + Labels: []prompbmarshal.Label{{Name: "__name__", Value: "up"}}, Timestamps: []int64{1583780000}, Values: 
[]float64{42}, }, @@ -31,17 +33,17 @@ func TestPromInstant_UnmarshalPositive(t *testing.T) { {"metric":{"__name__":"foo"},"value":[1583780001,"7"]}, {"metric":{"__name__":"baz", "instance":"bar"},"value":[1583780002,"8"]}]`, []Metric{ { - Labels: []Label{{Name: "__name__", Value: "up"}}, + Labels: []prompbmarshal.Label{{Name: "__name__", Value: "up"}}, Timestamps: []int64{1583780000}, Values: []float64{42}, }, { - Labels: []Label{{Name: "__name__", Value: "foo"}}, + Labels: []prompbmarshal.Label{{Name: "__name__", Value: "foo"}}, Timestamps: []int64{1583780001}, Values: []float64{7}, }, { - Labels: []Label{{Name: "__name__", Value: "baz"}, {Name: "instance", Value: "bar"}}, + Labels: []prompbmarshal.Label{{Name: "__name__", Value: "baz"}, {Name: "instance", Value: "bar"}}, Timestamps: []int64{1583780002}, Values: []float64{8}, }, diff --git a/app/vmalert/notifier/alert.go b/app/vmalert/notifier/alert.go index b1dfc697c..b3d13a8b4 100644 --- a/app/vmalert/notifier/alert.go +++ b/app/vmalert/notifier/alert.go @@ -167,14 +167,8 @@ type tplData struct { ExternalURL string } -func templateAnnotation(dst io.Writer, text string, data tplData, tmpl *textTpl.Template, execute bool) error { - tpl, err := tmpl.Clone() - if err != nil { - return fmt.Errorf("error cloning template before parse annotation: %w", err) - } - // Clone() doesn't copy tpl Options, so we set them manually - tpl = tpl.Option("missingkey=zero") - tpl, err = tpl.Parse(text) +func templateAnnotation(dst io.Writer, text string, data tplData, tpl *textTpl.Template, execute bool) error { + tpl, err := tpl.Parse(text) if err != nil { return fmt.Errorf("error parsing annotation template: %w", err) } diff --git a/app/vmalert/notifier/alert_test.go b/app/vmalert/notifier/alert_test.go index 9338b3fd2..374582e1a 100644 --- a/app/vmalert/notifier/alert_test.go +++ b/app/vmalert/notifier/alert_test.go @@ -33,7 +33,7 @@ func TestAlertExecTemplate(t *testing.T) { qFn := func(_ string) ([]datasource.Metric, error) { 
return []datasource.Metric{ { - Labels: []datasource.Label{ + Labels: []prompbmarshal.Label{ {Name: "foo", Value: "bar"}, {Name: "baz", Value: "qux"}, }, @@ -41,7 +41,7 @@ func TestAlertExecTemplate(t *testing.T) { Timestamps: []int64{1}, }, { - Labels: []datasource.Label{ + Labels: []prompbmarshal.Label{ {Name: "foo", Value: "garply"}, {Name: "baz", Value: "fred"}, }, diff --git a/app/vmalert/rule/alerting.go b/app/vmalert/rule/alerting.go index f0d7101a9..1c0e4b284 100644 --- a/app/vmalert/rule/alerting.go +++ b/app/vmalert/rule/alerting.go @@ -14,8 +14,10 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/templates" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" ) // AlertingRule is basic alert entity @@ -454,13 +456,16 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr ar.logDebugf(ts, a, "created in state PENDING") } var numActivePending int + var tss []prompbmarshal.TimeSeries for h, a := range ar.alerts { // if alert wasn't updated in this iteration // means it is resolved already if _, ok := updated[h]; !ok { if a.State == notifier.StatePending { - // alert was in Pending state - it is not - // active anymore + // alert was in Pending state - it is not active anymore + // add stale time series + tss = append(tss, pendingAlertStaleTimeSeries(a.Labels, ts.Unix(), true)...) 
+ delete(ar.alerts, h) ar.logDebugf(ts, a, "PENDING => DELETED: is absent in current evaluation round") continue @@ -478,6 +483,9 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr if ts.Sub(a.KeepFiringSince) >= ar.KeepFiringFor { a.State = notifier.StateInactive a.ResolvedAt = ts + // add stale time series + tss = append(tss, firingAlertStaleTimeSeries(a.Labels, ts.Unix())...) + ar.logDebugf(ts, a, "FIRING => INACTIVE: is absent in current evaluation round") continue } @@ -489,6 +497,10 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr a.State = notifier.StateFiring a.Start = ts alertsFired.Inc() + if ar.For > 0 { + // add stale time series + tss = append(tss, pendingAlertStaleTimeSeries(a.Labels, ts.Unix(), false)...) + } ar.logDebugf(ts, a, "PENDING => FIRING: %s since becoming active at %v", ts.Sub(a.ActiveAt), a.ActiveAt) } } @@ -497,7 +509,7 @@ func (ar *AlertingRule) exec(ctx context.Context, ts time.Time, limit int) ([]pr curState.Err = fmt.Errorf("exec exceeded limit of %d with %d alerts", limit, numActivePending) return nil, curState.Err } - return ar.toTimeSeries(ts.Unix()), nil + return append(tss, ar.toTimeSeries(ts.Unix())...), nil } func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.QueryFn, ts time.Time) (*labelSet, map[string]string, error) { @@ -522,6 +534,7 @@ func (ar *AlertingRule) expandTemplates(m datasource.Metric, qFn templates.Query return ls, as, nil } +// toTimeSeries creates `ALERTS` and `ALERTS_FOR_STATE` for active alerts func (ar *AlertingRule) toTimeSeries(timestamp int64) []prompbmarshal.TimeSeries { var tss []prompbmarshal.TimeSeries for _, a := range ar.alerts { @@ -601,26 +614,83 @@ func (ar *AlertingRule) alertToTimeSeries(a *notifier.Alert, timestamp int64) [] } func alertToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.TimeSeries { - labels := make(map[string]string) + var labels []prompbmarshal.Label for k, v := range 
a.Labels { - labels[k] = v + labels = append(labels, prompbmarshal.Label{ + Name: k, + Value: v, + }) + } + // __name__ already been dropped, no need to check duplication + labels = append(labels, prompbmarshal.Label{Name: "__name__", Value: alertMetricName}) + if ol := promrelabel.GetLabelByName(labels, alertStateLabel); ol != nil { + ol.Value = a.State.String() + } else { + labels = append(labels, prompbmarshal.Label{Name: alertStateLabel, Value: a.State.String()}) } - labels["__name__"] = alertMetricName - labels[alertStateLabel] = a.State.String() return newTimeSeries([]float64{1}, []int64{timestamp}, labels) } -// alertForToTimeSeries returns a timeseries that represents +// alertForToTimeSeries returns a time series that represents // state of active alerts, where value is time when alert become active func alertForToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.TimeSeries { - labels := make(map[string]string) + var labels []prompbmarshal.Label for k, v := range a.Labels { - labels[k] = v + labels = append(labels, prompbmarshal.Label{ + Name: k, + Value: v, + }) } - labels["__name__"] = alertForStateMetricName + // __name__ already been dropped, no need to check duplication + labels = append(labels, prompbmarshal.Label{Name: "__name__", Value: alertForStateMetricName}) return newTimeSeries([]float64{float64(a.ActiveAt.Unix())}, []int64{timestamp}, labels) } +// pendingAlertStaleTimeSeries returns stale `ALERTS` and `ALERTS_FOR_STATE` time series +// for alerts which changed their state from Pending to Inactive or Firing. 
+func pendingAlertStaleTimeSeries(ls map[string]string, timestamp int64, includeAlertForState bool) []prompbmarshal.TimeSeries {
+	// ls holds the alert labels; timestamp is passed through to newTimeSeries.
+	// The result always contains a stale `ALERTS` series with the pending state;
+	// a stale `ALERTS_FOR_STATE` series is added only when includeAlertForState is set.
+	//
+	// Every label set is built in its own exactly-sized slice. Deriving several
+	// sets via append() from one shared base slice is unsafe: when the base has
+	// spare capacity the appends write into the same backing array, so the
+	// `ALERTS_FOR_STATE` name could overwrite the `ALERTS` name.
+	alertsLabels := make([]prompbmarshal.Label, 0, len(ls)+2)
+	for k, v := range ls {
+		if k == alertStateLabel {
+			// alertToTimeSeries overrides a user-defined state label with the
+			// alert state, so the stale marker must not carry the user value.
+			continue
+		}
+		alertsLabels = append(alertsLabels, prompbmarshal.Label{Name: k, Value: v})
+	}
+	// __name__ has already been dropped from ls, so no duplicate check is needed
+	alertsLabels = append(alertsLabels,
+		prompbmarshal.Label{Name: "__name__", Value: alertMetricName},
+		prompbmarshal.Label{Name: alertStateLabel, Value: notifier.StatePending.String()},
+	)
+	result := []prompbmarshal.TimeSeries{
+		newTimeSeries([]float64{decimal.StaleNaN}, []int64{timestamp}, alertsLabels),
+	}
+	if !includeAlertForState {
+		return result
+	}
+
+	// `ALERTS_FOR_STATE` keeps all alert labels as is, like alertForToTimeSeries does.
+	alertsForStateLabels := make([]prompbmarshal.Label, 0, len(ls)+1)
+	for k, v := range ls {
+		alertsForStateLabels = append(alertsForStateLabels, prompbmarshal.Label{Name: k, Value: v})
+	}
+	alertsForStateLabels = append(alertsForStateLabels, prompbmarshal.Label{Name: "__name__", Value: alertForStateMetricName})
+	return append(result, newTimeSeries([]float64{decimal.StaleNaN}, []int64{timestamp}, alertsForStateLabels))
+}
+
+// firingAlertStaleTimeSeries returns stale `ALERTS` and `ALERTS_FOR_STATE` time series
+// for alerts which changed their state from Firing to Inactive.
+func firingAlertStaleTimeSeries(ls map[string]string, timestamp int64) []prompbmarshal.TimeSeries {
+	// ls holds the alert labels; timestamp is passed through to newTimeSeries.
+	// The result is always two series: `ALERTS` with the firing state, followed
+	// by `ALERTS_FOR_STATE`, both carrying the stale marker value.
+	//
+	// The two label sets are built in separate exactly-sized slices. Deriving
+	// both via append() from one shared base slice is unsafe: when the base has
+	// spare capacity the appends write into the same backing array, so the
+	// `ALERTS_FOR_STATE` name would overwrite the `ALERTS` name.
+	alertsLabels := make([]prompbmarshal.Label, 0, len(ls)+2)
+	alertsForStateLabels := make([]prompbmarshal.Label, 0, len(ls)+1)
+	for k, v := range ls {
+		l := prompbmarshal.Label{Name: k, Value: v}
+		// `ALERTS_FOR_STATE` keeps all alert labels as is, like alertForToTimeSeries does.
+		alertsForStateLabels = append(alertsForStateLabels, l)
+		if k == alertStateLabel {
+			// alertToTimeSeries overrides a user-defined state label with the
+			// alert state, so the stale marker must not carry the user value.
+			continue
+		}
+		alertsLabels = append(alertsLabels, l)
+	}
+	// __name__ has already been dropped from ls, so no duplicate check is needed
+	alertsLabels = append(alertsLabels,
+		prompbmarshal.Label{Name: "__name__", Value: alertMetricName},
+		prompbmarshal.Label{Name: alertStateLabel, Value: notifier.StateFiring.String()},
+	)
+	alertsForStateLabels = append(alertsForStateLabels, prompbmarshal.Label{Name: "__name__", Value: alertForStateMetricName})
+
+	return []prompbmarshal.TimeSeries{
+		newTimeSeries([]float64{decimal.StaleNaN}, []int64{timestamp}, alertsLabels),
+		newTimeSeries([]float64{decimal.StaleNaN}, []int64{timestamp}, alertsForStateLabels),
+	}
+}
+
 // restore restores the value of ActiveAt field for active alerts,
 // based on previously written time series `alertForStateMetricName`.
 // Only rules with For > 0 can be restored.
diff --git a/app/vmalert/rule/alerting_test.go b/app/vmalert/rule/alerting_test.go index 0d76a3aa9..d54aefd34 100644 --- a/app/vmalert/rule/alerting_test.go +++ b/app/vmalert/rule/alerting_test.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) @@ -28,7 +29,7 @@ func TestAlertingRuleToTimeSeries(t *testing.T) { rule.alerts[alert.ID] = alert tss := rule.toTimeSeries(timestamp.Unix()) if err := compareTimeSeries(t, tssExpected, tss); err != nil { - t.Fatalf("timeseries mismatch: %s", err) + t.Fatalf("timeseries mismatch for rule %q: %s", rule.Name, err) } } @@ -36,14 +37,23 @@ func TestAlertingRuleToTimeSeries(t *testing.T) { State: notifier.StateFiring, ActiveAt: timestamp.Add(time.Second), }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": alertMetricName, - alertStateLabel: notifier.StateFiring.String(), + newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertMetricName, + }, + { + Name: alertStateLabel, + Value: notifier.StateFiring.String(), + }, }), newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())}, []int64{timestamp.UnixNano()}, - map[string]string{ - "__name__": alertForStateMetricName, + []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertForStateMetricName, + }, }), }) @@ -54,18 +64,40 @@ func TestAlertingRuleToTimeSeries(t *testing.T) { "instance": "bar", }, }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": alertMetricName, - alertStateLabel: 
notifier.StateFiring.String(), - "job": "foo", - "instance": "bar", - }), + newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, + []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertMetricName, + }, + { + Name: alertStateLabel, + Value: notifier.StateFiring.String(), + }, + { + Name: "job", + Value: "foo", + }, + { + Name: "instance", + Value: "bar", + }, + }), newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())}, []int64{timestamp.UnixNano()}, - map[string]string{ - "__name__": alertForStateMetricName, - "job": "foo", - "instance": "bar", + []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertForStateMetricName, + }, + { + Name: "job", + Value: "foo", + }, + { + Name: "instance", + Value: "bar", + }, }), }) @@ -73,18 +105,29 @@ func TestAlertingRuleToTimeSeries(t *testing.T) { State: notifier.StateFiring, ActiveAt: timestamp.Add(time.Second), Labels: map[string]string{ alertStateLabel: "foo", - "__name__": "bar", }, }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": alertMetricName, - alertStateLabel: notifier.StateFiring.String(), + newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertMetricName, + }, + { + Name: alertStateLabel, + Value: notifier.StateFiring.String(), + }, }), newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())}, []int64{timestamp.UnixNano()}, - map[string]string{ - "__name__": alertForStateMetricName, - alertStateLabel: "foo", + []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertForStateMetricName, + }, + { + Name: alertStateLabel, + Value: "foo", + }, }), }) @@ -92,14 +135,23 @@ func TestAlertingRuleToTimeSeries(t *testing.T) { State: notifier.StateFiring, ActiveAt: timestamp.Add(time.Second), }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": alertMetricName, - 
alertStateLabel: notifier.StateFiring.String(), + newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertMetricName, + }, + { + Name: alertStateLabel, + Value: notifier.StateFiring.String(), + }, }), newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())}, []int64{timestamp.UnixNano()}, - map[string]string{ - "__name__": alertForStateMetricName, + []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertForStateMetricName, + }, }), }) @@ -107,12 +159,21 @@ func TestAlertingRuleToTimeSeries(t *testing.T) { State: notifier.StatePending, ActiveAt: timestamp.Add(time.Second), }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": alertMetricName, - alertStateLabel: notifier.StatePending.String(), + newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertMetricName, + }, + { + Name: alertStateLabel, + Value: notifier.StatePending.String(), + }, }), - newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": alertForStateMetricName, + newTimeSeries([]float64{float64(timestamp.Add(time.Second).Unix())}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: alertForStateMetricName, + }, }), }) } @@ -124,7 +185,9 @@ func TestAlertingRule_Exec(t *testing.T) { alert *notifier.Alert } - f := func(rule *AlertingRule, steps [][]datasource.Metric, alertsExpected map[int][]testAlert) { + ts, _ := time.Parse(time.RFC3339, "2024-10-29T00:00:00Z") + + f := func(rule *AlertingRule, steps [][]datasource.Metric, alertsExpected map[int][]testAlert, tssExpected map[int][]prompbmarshal.TimeSeries) { t.Helper() fq := &datasource.FakeQuerier{} @@ -134,13 +197,19 @@ func TestAlertingRule_Exec(t *testing.T) { Name: "TestRule_Exec", } rule.GroupID = fakeGroup.ID() - ts := 
time.Now() for i, step := range steps { fq.Reset() fq.Add(step...) - if _, err := rule.exec(context.TODO(), ts, 0); err != nil { + tss, err := rule.exec(context.TODO(), ts, 0) + if err != nil { t.Fatalf("unexpected error: %s", err) } + // check generate time series + if _, ok := tssExpected[i]; ok { + if err := compareTimeSeries(t, tssExpected[i], tss); err != nil { + t.Fatalf("generated time series mismatch for rule %q in step %d: %s", rule.Name, i, err) + } + } // shift the execution timestamp before the next iteration ts = ts.Add(defaultStep) @@ -174,13 +243,21 @@ func TestAlertingRule_Exec(t *testing.T) { } } - f(newTestAlertingRule("empty", 0), [][]datasource.Metric{}, nil) + f(newTestAlertingRule("empty", 0), [][]datasource.Metric{}, nil, nil) - f(newTestAlertingRule("empty labels", 0), [][]datasource.Metric{ + f(newTestAlertingRule("empty_labels", 0), [][]datasource.Metric{ {datasource.Metric{Values: []float64{1}, Timestamps: []int64{1}}}, }, map[int][]testAlert{ 0: {{alert: ¬ifier.Alert{State: notifier.StateFiring}}}, - }) + }, + map[int][]prompbmarshal.TimeSeries{ + 0: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "empty_labels"}, {Name: "alertstate", Value: "firing"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "empty_labels"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}}, + }, + }) f(newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive", 0), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, @@ -194,6 +271,25 @@ func TestAlertingRule_Exec(t *testing.T) { 2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}}, 3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}}, 4: {{labels: []string{"name", 
"foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}}, + }, map[int][]prompbmarshal.TimeSeries{ + 0: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}}, + }, + 1: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + }, + 2: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "single-firing=>inactive=>firing=>inactive=>inactive"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Add(2 * 
defaultStep).Unix()), Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}}, + }, }) f(newTestAlertingRule("single-firing=>inactive=>firing=>inactive=>inactive=>firing", 0), [][]datasource.Metric{ @@ -210,7 +306,7 @@ func TestAlertingRule_Exec(t *testing.T) { 3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}}, 4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}}, 5: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}}, - }) + }, nil) f(newTestAlertingRule("multiple-firing", 0), [][]datasource.Metric{ { @@ -224,7 +320,7 @@ func TestAlertingRule_Exec(t *testing.T) { {labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}, {labels: []string{"name", "foo2"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}, }, - }) + }, nil) // 1: fire first alert // 2: fire second alert, set first inactive @@ -233,27 +329,57 @@ func TestAlertingRule_Exec(t *testing.T) { {metricWithLabels(t, "name", "foo")}, {metricWithLabels(t, "name", "foo1")}, {metricWithLabels(t, "name", "foo2")}, - }, - map[int][]testAlert{ - 0: { - {labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}, - }, - 1: { - {labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, - {labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}, - }, - 2: { - {labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, - {labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, - {labels: []string{"name", "foo2"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}, - }, - }) + }, map[int][]testAlert{ + 0: { + {labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}, + }, + 1: { + {labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, + {labels: []string{"name", "foo1"}, alert: 
¬ifier.Alert{State: notifier.StateFiring}}, + }, + 2: { + {labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, + {labels: []string{"name", "foo1"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}, + {labels: []string{"name", "foo2"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}, + }, + }, map[int][]prompbmarshal.TimeSeries{ + 0: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}}, + }, + 1: { + // stale time series for foo, `firing -> inactive` + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + // new time series for foo1 + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo1"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: 
"alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo1"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Add(defaultStep).Unix()), Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + }, + 2: { + // stale time series for foo1 + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo1"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo1"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}}, + // new time series for foo2 + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo2"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "multiple-steps-firing"}, {Name: "name", Value: "foo2"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Add(2 * defaultStep).Unix()), Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}}, + }, + }) f(newTestAlertingRule("for-pending", time.Minute), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, }, map[int][]testAlert{ 0: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}}, - }) + }, nil) f(newTestAlertingRule("for-fired", defaultStep), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, @@ -261,6 +387,22 @@ func TestAlertingRule_Exec(t *testing.T) { }, map[int][]testAlert{ 0: {{labels: []string{"name", "foo"}, alert: 
¬ifier.Alert{State: notifier.StatePending}}}, 1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}}, + }, map[int][]prompbmarshal.TimeSeries{ + 0: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}}, + }, + 1: { + // stale time series for `pending -> firing` + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "alertstate", Value: "firing"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-fired"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Add(defaultStep).Unix()), Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + }, }) f(newTestAlertingRule("for-pending=>empty", time.Second), [][]datasource.Metric{ @@ -272,6 +414,26 @@ func TestAlertingRule_Exec(t *testing.T) { 0: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}}, 1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}}, 2: {}, + }, 
map[int][]prompbmarshal.TimeSeries{ + 0: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.UnixNano() / 1e6}}}, + }, + 1: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: float64(ts.Unix()), Timestamp: ts.Add(defaultStep).UnixNano() / 1e6}}}, + }, + // stale time series for `pending -> inactive` + 2: { + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "alertstate", Value: "pending"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}}, + {Labels: []prompbmarshal.Label{{Name: "__name__", Value: alertForStateMetricName}, {Name: "alertname", Value: "for-pending=>empty"}, {Name: "name", Value: "foo"}}, + Samples: []prompbmarshal.Sample{{Value: decimal.StaleNaN, Timestamp: ts.Add(2*defaultStep).UnixNano() / 1e6}}}, + }, }) f(newTestAlertingRule("for-pending=>firing=>inactive=>pending=>firing", defaultStep), [][]datasource.Metric{ @@ -287,7 +449,7 @@ func TestAlertingRule_Exec(t 
*testing.T) { 2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}}, 3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}}, 4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}}, - }) + }, nil) f(newTestAlertingRuleWithCustomFields("for-pending=>firing=>keepfiring=>firing", defaultStep, 0, defaultStep, nil), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, @@ -300,7 +462,7 @@ func TestAlertingRule_Exec(t *testing.T) { 1: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}}, 2: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}}, 3: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}}, - }) + }, nil) f(newTestAlertingRuleWithCustomFields("for-pending=>firing=>keepfiring=>keepfiring=>inactive=>pending=>firing", defaultStep, 0, 2*defaultStep, nil), [][]datasource.Metric{ {metricWithLabels(t, "name", "foo")}, @@ -321,7 +483,7 @@ func TestAlertingRule_Exec(t *testing.T) { 4: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateInactive}}}, 5: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StatePending}}}, 6: {{labels: []string{"name", "foo"}, alert: ¬ifier.Alert{State: notifier.StateFiring}}}, - }) + }, nil) } func TestAlertingRuleExecRange(t *testing.T) { @@ -477,7 +639,7 @@ func TestAlertingRuleExecRange(t *testing.T) { {Values: []float64{1, 1, 1}, Timestamps: []int64{1, 3, 5}}, { Values: []float64{1, 1}, Timestamps: []int64{1, 5}, - Labels: []datasource.Label{{Name: "foo", Value: "bar"}}, + Labels: []prompbmarshal.Label{{Name: "foo", Value: "bar"}}, }, }, []*notifier.Alert{ {State: notifier.StatePending, ActiveAt: time.Unix(1, 0)}, @@ -523,7 +685,7 @@ func TestAlertingRuleExecRange(t *testing.T) { {Values: []float64{1, 1}, Timestamps: []int64{1, 100}}, { Values: []float64{1, 1}, Timestamps: []int64{1, 5}, 
- Labels: []datasource.Label{{Name: "foo", Value: "bar"}}, + Labels: []prompbmarshal.Label{{Name: "foo", Value: "bar"}}, }, }, []*notifier.Alert{ { @@ -1047,7 +1209,7 @@ func newTestAlertingRuleWithCustomFields(name string, waitFor, evalInterval, kee func TestAlertingRule_ToLabels(t *testing.T) { metric := datasource.Metric{ - Labels: []datasource.Label{ + Labels: []prompbmarshal.Label{ {Name: "instance", Value: "0.0.0.0:8800"}, {Name: "group", Value: "vmalert"}, {Name: "alertname", Value: "ConfigurationReloadFailure"}, diff --git a/app/vmalert/rule/group.go b/app/vmalert/rule/group.go index 5f0e42329..f19cc2578 100644 --- a/app/vmalert/rule/group.go +++ b/app/vmalert/rule/group.go @@ -8,12 +8,9 @@ import ( "fmt" "hash/fnv" "net/url" - "strconv" "sync" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/cheggaaa/pb/v3" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" @@ -21,7 +18,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/metrics" @@ -350,10 +346,9 @@ func (g *Group) Start(ctx context.Context, nts func() []notifier.Notifier, rw re } e := &executor{ - Rw: rw, - Notifiers: nts, - notifierHeaders: g.NotifierHeaders, - previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), + Rw: rw, + Notifiers: nts, + notifierHeaders: g.NotifierHeaders, } g.infof("started") @@ -426,8 +421,6 @@ func (g *Group) Start(ctx context.Context, nts func() []notifier.Notifier, rw re continue } - // ensure that staleness is tracked for existing rules only - e.purgeStaleSeries(g.Rules) e.notifierHeaders = g.NotifierHeaders g.mu.Unlock() @@ -539,10 +532,9 @@ func 
(g *Group) Replay(start, end time.Time, rw remotewrite.RWClient, maxDataPoi // ExecOnce evaluates all the rules under group for once with given timestamp. func (g *Group) ExecOnce(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, evalTS time.Time) chan error { e := &executor{ - Rw: rw, - Notifiers: nts, - notifierHeaders: g.NotifierHeaders, - previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), + Rw: rw, + Notifiers: nts, + notifierHeaders: g.NotifierHeaders, } if len(g.Rules) < 1 { return nil @@ -633,13 +625,6 @@ type executor struct { notifierHeaders map[string]string Rw remotewrite.RWClient - - previouslySentSeriesToRWMu sync.Mutex - // previouslySentSeriesToRW stores series sent to RW on previous iteration - // map[ruleID]map[ruleLabels][]prompb.Label - // where `ruleID` is ID of the Rule within a Group - // and `ruleLabels` is []prompb.Label marshalled to a string - previouslySentSeriesToRW map[uint64]map[string][]prompbmarshal.Label } // execConcurrently executes rules concurrently if concurrency>1 @@ -706,11 +691,6 @@ func (e *executor) exec(ctx context.Context, r Rule, ts time.Time, resolveDurati if err := pushToRW(tss); err != nil { return err } - - staleSeries := e.getStaleSeries(r, tss, ts) - if err := pushToRW(staleSeries); err != nil { - return err - } } ar, ok := r.(*AlertingRule) @@ -737,79 +717,3 @@ func (e *executor) exec(ctx context.Context, r Rule, ts time.Time, resolveDurati wg.Wait() return errGr.Err() } - -var bbPool bytesutil.ByteBufferPool - -// getStaleSeries checks whether there are stale series from previously sent ones. 
-func (e *executor) getStaleSeries(r Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries { - bb := bbPool.Get() - defer bbPool.Put(bb) - - ruleLabels := make(map[string][]prompbmarshal.Label, len(tss)) - for _, ts := range tss { - // convert labels to strings, so we can compare with previously sent series - bb.B = labelsToString(bb.B, ts.Labels) - ruleLabels[string(bb.B)] = ts.Labels - bb.Reset() - } - - rID := r.ID() - var staleS []prompbmarshal.TimeSeries - // check whether there are series which disappeared and need to be marked as stale - e.previouslySentSeriesToRWMu.Lock() - for key, labels := range e.previouslySentSeriesToRW[rID] { - if _, ok := ruleLabels[key]; ok { - continue - } - // previously sent series are missing in current series, so we mark them as stale - ss := newTimeSeriesPB([]float64{decimal.StaleNaN}, []int64{timestamp.Unix()}, labels) - staleS = append(staleS, ss) - } - // set previous series to current - e.previouslySentSeriesToRW[rID] = ruleLabels - e.previouslySentSeriesToRWMu.Unlock() - - return staleS -} - -// purgeStaleSeries deletes references in tracked -// previouslySentSeriesToRW list to Rules which aren't present -// in the given activeRules list. The method is used when the list -// of loaded rules has changed and executor has to remove -// references to non-existing rules. 
-func (e *executor) purgeStaleSeries(activeRules []Rule) { - newPreviouslySentSeriesToRW := make(map[uint64]map[string][]prompbmarshal.Label) - - e.previouslySentSeriesToRWMu.Lock() - - for _, rule := range activeRules { - id := rule.ID() - prev, ok := e.previouslySentSeriesToRW[id] - if ok { - // keep previous series for staleness detection - newPreviouslySentSeriesToRW[id] = prev - } - } - e.previouslySentSeriesToRW = nil - e.previouslySentSeriesToRW = newPreviouslySentSeriesToRW - - e.previouslySentSeriesToRWMu.Unlock() -} - -func labelsToString(dst []byte, labels []prompbmarshal.Label) []byte { - dst = append(dst, '{') - for i, label := range labels { - if len(label.Name) == 0 { - dst = append(dst, "__name__"...) - } else { - dst = append(dst, label.Name...) - } - dst = append(dst, '=') - dst = strconv.AppendQuote(dst, label.Value) - if i < len(labels)-1 { - dst = append(dst, ',') - } - } - dst = append(dst, '}') - return dst -} diff --git a/app/vmalert/rule/group_test.go b/app/vmalert/rule/group_test.go index cbe26b5e0..4e56a5634 100644 --- a/app/vmalert/rule/group_test.go +++ b/app/vmalert/rule/group_test.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "os" - "reflect" "sort" "testing" "time" @@ -17,8 +16,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/templates" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) @@ -383,153 +380,6 @@ func TestGetResolveDuration(t *testing.T) { f(2*time.Minute, 0, 1*time.Minute, 8*time.Minute) } -func TestGetStaleSeries(t *testing.T) { - ts := time.Now() - e := &executor{ - previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), - } - f := func(r Rule, labels, expLabels [][]prompbmarshal.Label) { - t.Helper() - - var tss 
[]prompbmarshal.TimeSeries - for _, l := range labels { - tss = append(tss, newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, l)) - } - staleS := e.getStaleSeries(r, tss, ts) - if staleS == nil && expLabels == nil { - return - } - if len(staleS) != len(expLabels) { - t.Fatalf("expected to get %d stale series, got %d", - len(expLabels), len(staleS)) - } - for i, exp := range expLabels { - got := staleS[i] - if !reflect.DeepEqual(exp, got.Labels) { - t.Fatalf("expected to get labels: \n%v;\ngot instead: \n%v", - exp, got.Labels) - } - if len(got.Samples) != 1 { - t.Fatalf("expected to have 1 sample; got %d", len(got.Samples)) - } - if !decimal.IsStaleNaN(got.Samples[0].Value) { - t.Fatalf("expected sample value to be %v; got %v", decimal.StaleNaN, got.Samples[0].Value) - } - } - } - - // warn: keep in mind, that executor holds the state, so sequence of f calls matters - - // single series - f(&AlertingRule{RuleID: 1}, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}, - nil) - f(&AlertingRule{RuleID: 1}, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}, - nil) - f(&AlertingRule{RuleID: 1}, - nil, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}) - f(&AlertingRule{RuleID: 1}, - nil, - nil) - - // multiple series - f(&AlertingRule{RuleID: 1}, - [][]prompbmarshal.Label{ - toPromLabels(t, "__name__", "job:foo", "job", "foo"), - toPromLabels(t, "__name__", "job:foo", "job", "bar"), - }, - nil) - f(&AlertingRule{RuleID: 1}, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}) - f(&AlertingRule{RuleID: 1}, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, - nil) - f(&AlertingRule{RuleID: 1}, - nil, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}) - - // multiple rules and series - f(&AlertingRule{RuleID: 1}, - 
[][]prompbmarshal.Label{ - toPromLabels(t, "__name__", "job:foo", "job", "foo"), - toPromLabels(t, "__name__", "job:foo", "job", "bar"), - }, - nil) - f(&AlertingRule{RuleID: 2}, - [][]prompbmarshal.Label{ - toPromLabels(t, "__name__", "job:foo", "job", "foo"), - toPromLabels(t, "__name__", "job:foo", "job", "bar"), - }, - nil) - f(&AlertingRule{RuleID: 1}, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "foo")}) - f(&AlertingRule{RuleID: 1}, - [][]prompbmarshal.Label{toPromLabels(t, "__name__", "job:foo", "job", "bar")}, - nil) -} - -func TestPurgeStaleSeries(t *testing.T) { - ts := time.Now() - labels := toPromLabels(t, "__name__", "job:foo", "job", "foo") - tss := []prompbmarshal.TimeSeries{newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, labels)} - - f := func(curRules, newRules, expStaleRules []Rule) { - t.Helper() - e := &executor{ - previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), - } - // seed executor with series for - // current rules - for _, rule := range curRules { - e.getStaleSeries(rule, tss, ts) - } - - e.purgeStaleSeries(newRules) - - if len(e.previouslySentSeriesToRW) != len(expStaleRules) { - t.Fatalf("expected to get %d stale series, got %d", - len(expStaleRules), len(e.previouslySentSeriesToRW)) - } - - for _, exp := range expStaleRules { - if _, ok := e.previouslySentSeriesToRW[exp.ID()]; !ok { - t.Fatalf("expected to have rule %d; got nil instead", exp.ID()) - } - } - } - - f(nil, nil, nil) - f( - nil, - []Rule{&AlertingRule{RuleID: 1}}, - nil, - ) - f( - []Rule{&AlertingRule{RuleID: 1}}, - nil, - nil, - ) - f( - []Rule{&AlertingRule{RuleID: 1}}, - []Rule{&AlertingRule{RuleID: 2}}, - nil, - ) - f( - []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, - []Rule{&AlertingRule{RuleID: 2}}, - []Rule{&AlertingRule{RuleID: 2}}, - ) - f( - []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, - 
[]Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, - []Rule{&AlertingRule{RuleID: 1}, &AlertingRule{RuleID: 2}}, - ) -} - func TestFaultyNotifier(t *testing.T) { fq := &datasource.FakeQuerier{} fq.Add(metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "bar")) @@ -580,8 +430,7 @@ func TestFaultyRW(t *testing.T) { } e := &executor{ - Rw: &remotewrite.Client{}, - previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), + Rw: &remotewrite.Client{}, } err := e.exec(context.Background(), r, time.Now(), 0, 10) diff --git a/app/vmalert/rule/group_timing_test.go b/app/vmalert/rule/group_timing_test.go deleted file mode 100644 index 94399c490..000000000 --- a/app/vmalert/rule/group_timing_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package rule - -import ( - "fmt" - "testing" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" -) - -func BenchmarkGetStaleSeries(b *testing.B) { - ts := time.Now() - n := 100 - payload := make([]prompbmarshal.TimeSeries, 0, n) - for i := 0; i < n; i++ { - s := fmt.Sprintf("%d", i) - labels := toPromLabels(b, - "__name__", "foo", ""+ - "instance", s, - "job", s, - "state", s, - ) - payload = append(payload, newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, labels)) - } - - e := &executor{ - previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), - } - ar := &AlertingRule{RuleID: 1} - - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - e.getStaleSeries(ar, payload, ts) - } -} diff --git a/app/vmalert/rule/recording.go b/app/vmalert/rule/recording.go index 7b2a7635e..a616fe794 100644 --- a/app/vmalert/rule/recording.go +++ b/app/vmalert/rule/recording.go @@ -3,16 +3,17 @@ package rule import ( "context" "fmt" - "sort" "strings" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" + 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" ) // RecordingRule is a Rule that supposed @@ -34,6 +35,8 @@ type RecordingRule struct { // during evaluations state *ruleState + lastEvaluation map[string]struct{} + metrics *recordingRuleMetrics } @@ -113,7 +116,7 @@ func (rr *RecordingRule) execRange(ctx context.Context, start, end time.Time) ([ var tss []prompbmarshal.TimeSeries for _, s := range res.Data { ts := rr.toTimeSeries(s) - key := stringifyLabels(ts) + key := stringifyLabels(ts.Labels) if _, ok := duplicates[key]; ok { return nil, fmt.Errorf("original metric %v; resulting labels %q: %w", s.Labels, key, errDuplicate) } @@ -155,28 +158,47 @@ func (rr *RecordingRule) exec(ctx context.Context, ts time.Time, limit int) ([]p return nil, curState.Err } - duplicates := make(map[string]struct{}, len(qMetrics)) + curEvaluation := make(map[string]struct{}, len(qMetrics)) + lastEvaluation := rr.lastEvaluation var tss []prompbmarshal.TimeSeries for _, r := range qMetrics { ts := rr.toTimeSeries(r) - key := stringifyLabels(ts) - if _, ok := duplicates[key]; ok { + key := stringifyLabels(ts.Labels) + if _, ok := curEvaluation[key]; ok { curState.Err = fmt.Errorf("original metric %v; resulting labels %q: %w", r, key, errDuplicate) return nil, curState.Err } - duplicates[key] = struct{}{} + curEvaluation[key] = struct{}{} + delete(lastEvaluation, key) tss = append(tss, ts) } + // check for stale time series + for k := range lastEvaluation { + tss = append(tss, prompbmarshal.TimeSeries{ + Labels: stringToLabels(k), + Samples: []prompbmarshal.Sample{ + {Value: decimal.StaleNaN, Timestamp: ts.UnixNano() / 1e6}, + }}) + } + rr.lastEvaluation = curEvaluation return tss, nil } -func stringifyLabels(ts prompbmarshal.TimeSeries) 
string { - labels := ts.Labels - if len(labels) > 1 { - sort.Slice(labels, func(i, j int) bool { - return labels[i].Name < labels[j].Name - }) +func stringToLabels(s string) []prompbmarshal.Label { + labels := strings.Split(s, ",") + rLabels := make([]prompbmarshal.Label, 0, len(labels)) + for i := range labels { + if label := strings.Split(labels[i], "="); len(label) == 2 { + rLabels = append(rLabels, prompbmarshal.Label{ + Name: label[0], + Value: label[1], + }) + } } + return rLabels +} + +func stringifyLabels(labels []prompbmarshal.Label) string { b := strings.Builder{} for i, l := range labels { b.WriteString(l.Name) @@ -190,19 +212,27 @@ func stringifyLabels(ts prompbmarshal.TimeSeries) string { } func (rr *RecordingRule) toTimeSeries(m datasource.Metric) prompbmarshal.TimeSeries { - labels := make(map[string]string) - for _, l := range m.Labels { - labels[l.Name] = l.Value + if preN := promrelabel.GetLabelByName(m.Labels, "__name__"); preN != nil { + preN.Value = rr.Name + } else { + m.Labels = append(m.Labels, prompbmarshal.Label{ + Name: "__name__", + Value: rr.Name, + }) } - labels["__name__"] = rr.Name - // override existing labels with configured ones - for k, v := range rr.Labels { - if _, ok := labels[k]; ok && labels[k] != v { - labels[fmt.Sprintf("exported_%s", k)] = labels[k] + for k := range rr.Labels { + prevLabel := promrelabel.GetLabelByName(m.Labels, k) + if prevLabel != nil && prevLabel.Value != rr.Labels[k] { + // Rename the prevLabel to "exported_" + label.Name + prevLabel.Name = fmt.Sprintf("exported_%s", prevLabel.Name) } - labels[k] = v + m.Labels = append(m.Labels, prompbmarshal.Label{ + Name: k, + Value: rr.Labels[k], + }) } - return newTimeSeries(m.Values, m.Timestamps, labels) + ts := newTimeSeries(m.Values, m.Timestamps, m.Labels) + return ts } // updateWith copies all significant fields. 
diff --git a/app/vmalert/rule/recording_test.go b/app/vmalert/rule/recording_test.go index 469b4285a..1904fee99 100644 --- a/app/vmalert/rule/recording_test.go +++ b/app/vmalert/rule/recording_test.go @@ -9,59 +9,151 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) func TestRecordingRule_Exec(t *testing.T) { - f := func(rule *RecordingRule, metrics []datasource.Metric, tssExpected []prompbmarshal.TimeSeries) { + ts, _ := time.Parse(time.RFC3339, "2024-10-29T00:00:00Z") + const defaultStep = 5 * time.Millisecond + + f := func(rule *RecordingRule, steps [][]datasource.Metric, tssExpected [][]prompbmarshal.TimeSeries) { t.Helper() fq := &datasource.FakeQuerier{} - fq.Add(metrics...) - rule.q = fq - rule.state = &ruleState{ - entries: make([]StateEntry, 10), - } - tss, err := rule.exec(context.TODO(), time.Now(), 0) - if err != nil { - t.Fatalf("unexpected RecordingRule.exec error: %s", err) - } - if err := compareTimeSeries(t, tssExpected, tss); err != nil { - t.Fatalf("timeseries missmatch: %s", err) + for i, step := range steps { + fq.Reset() + fq.Add(step...) 
+ rule.q = fq + rule.state = &ruleState{ + entries: make([]StateEntry, 10), + } + tss, err := rule.exec(context.TODO(), ts, 0) + if err != nil { + t.Fatalf("unexpected RecordingRule.exec error: %s", err) + } + if err := compareTimeSeries(t, tssExpected[i], tss); err != nil { + t.Fatalf("time series mismatch: %s", err) + } + + ts = ts.Add(defaultStep) } } - timestamp := time.Now() - f(&RecordingRule{ Name: "foo", - }, []datasource.Metric{ + }, [][]datasource.Metric{{ metricWithValueAndLabels(t, 10, "__name__", "bar"), - }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{10}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": "foo", + }}, [][]prompbmarshal.TimeSeries{{ + newTimeSeries([]float64{10}, []int64{ts.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foo", + }, }), - }) + }}) f(&RecordingRule{ Name: "foobarbaz", - }, []datasource.Metric{ - metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"), - metricWithValueAndLabels(t, 2, "__name__", "bar", "job", "bar"), - metricWithValueAndLabels(t, 3, "__name__", "baz", "job", "baz"), - }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": "foobarbaz", - "job": "foo", + }, [][]datasource.Metric{ + { + metricWithValueAndLabels(t, 1, "__name__", "foo", "job", "foo"), + metricWithValueAndLabels(t, 2, "__name__", "bar", "job", "bar"), + metricWithValueAndLabels(t, 3, "__name__", "baz", "job", "baz"), + }, + { + metricWithValueAndLabels(t, 10, "__name__", "foo", "job", "foo"), + }, + { + metricWithValueAndLabels(t, 10, "__name__", "foo", "job", "bar"), + }, + }, [][]prompbmarshal.TimeSeries{{ + newTimeSeries([]float64{1}, []int64{ts.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "foo", + }, }), - newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": "foobarbaz", - "job": "bar", + 
newTimeSeries([]float64{2}, []int64{ts.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "bar", + }, }), - newTimeSeries([]float64{3}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": "foobarbaz", - "job": "baz", + newTimeSeries([]float64{3}, []int64{ts.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "baz", + }, }), + }, + { + newTimeSeries([]float64{10}, []int64{ts.Add(defaultStep).UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "foo", + }, + }), + // other series are with NaN values + newTimeSeries([]float64{decimal.StaleNaN}, []int64{ts.Add(defaultStep).UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "bar", + }, + }), + newTimeSeries([]float64{decimal.StaleNaN}, []int64{ts.Add(defaultStep).UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "baz", + }, + }), + }, + { + newTimeSeries([]float64{10}, []int64{ts.Add(2 * defaultStep).UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "bar", + }, + }), + newTimeSeries([]float64{decimal.StaleNaN}, []int64{ts.Add(2 * defaultStep).UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "foo", + }, + }), + }, }) f(&RecordingRule{ @@ -69,22 +161,44 @@ func TestRecordingRule_Exec(t *testing.T) { Labels: map[string]string{ "source": "test", }, - }, []datasource.Metric{ + }, [][]datasource.Metric{{ metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"), metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar", "source", "origin"), - }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{ - 
"__name__": "job:foo", - "job": "foo", - "source": "test", + }}, [][]prompbmarshal.TimeSeries{{ + newTimeSeries([]float64{2}, []int64{ts.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "job:foo", + }, + { + Name: "job", + Value: "foo", + }, + { + Name: "source", + Value: "test", + }, }), - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": "job:foo", - "job": "bar", - "source": "test", - "exported_source": "origin", - }), - }) + newTimeSeries([]float64{1}, []int64{ts.UnixNano()}, + []prompbmarshal.Label{ + { + Name: "__name__", + Value: "job:foo", + }, + { + Name: "job", + Value: "bar", + }, + { + Name: "source", + Value: "test", + }, + { + Name: "exported_source", + Value: "origin", + }, + }), + }}) } func TestRecordingRule_ExecRange(t *testing.T) { @@ -110,9 +224,13 @@ func TestRecordingRule_ExecRange(t *testing.T) { }, []datasource.Metric{ metricWithValuesAndLabels(t, []float64{10, 20, 30}, "__name__", "bar"), }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{10, 20, 30}, []int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()}, map[string]string{ - "__name__": "foo", - }), + newTimeSeries([]float64{10, 20, 30}, []int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()}, + []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foo", + }, + }), }) f(&RecordingRule{ @@ -122,18 +240,36 @@ func TestRecordingRule_ExecRange(t *testing.T) { metricWithValuesAndLabels(t, []float64{2, 3}, "__name__", "bar", "job", "bar"), metricWithValuesAndLabels(t, []float64{4, 5, 6}, "__name__", "baz", "job", "baz"), }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": "foobarbaz", - "job": "foo", + newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "foo", + }, }), - newTimeSeries([]float64{2, 3}, 
[]int64{timestamp.UnixNano(), timestamp.UnixNano()}, map[string]string{ - "__name__": "foobarbaz", - "job": "bar", + newTimeSeries([]float64{2, 3}, []int64{timestamp.UnixNano(), timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "bar", + }, }), newTimeSeries([]float64{4, 5, 6}, - []int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()}, map[string]string{ - "__name__": "foobarbaz", - "job": "baz", + []int64{timestamp.UnixNano(), timestamp.UnixNano(), timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "foobarbaz", + }, + { + Name: "job", + Value: "baz", + }, }), }) @@ -146,16 +282,35 @@ func TestRecordingRule_ExecRange(t *testing.T) { metricWithValueAndLabels(t, 2, "__name__", "foo", "job", "foo"), metricWithValueAndLabels(t, 1, "__name__", "bar", "job", "bar"), }, []prompbmarshal.TimeSeries{ - newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": "job:foo", - "job": "foo", - "source": "test", - }), - newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, map[string]string{ - "__name__": "job:foo", - "job": "bar", - "source": "test", + newTimeSeries([]float64{2}, []int64{timestamp.UnixNano()}, []prompbmarshal.Label{ + { + Name: "__name__", + Value: "job:foo", + }, + { + Name: "job", + Value: "foo", + }, + { + Name: "source", + Value: "test", + }, }), + newTimeSeries([]float64{1}, []int64{timestamp.UnixNano()}, + []prompbmarshal.Label{ + { + Name: "__name__", + Value: "job:foo", + }, + { + Name: "job", + Value: "bar", + }, + { + Name: "source", + Value: "test", + }, + }), }) } diff --git a/app/vmalert/rule/test_helpers.go b/app/vmalert/rule/test_helpers.go index 9373e2b81..63d2c3f4b 100644 --- a/app/vmalert/rule/test_helpers.go +++ b/app/vmalert/rule/test_helpers.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" 
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) @@ -87,7 +88,7 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric { } m := datasource.Metric{Values: []float64{1}, Timestamps: []int64{1}} for i := 0; i < len(labels); i += 2 { - m.Labels = append(m.Labels, datasource.Label{ + m.Labels = append(m.Labels, prompbmarshal.Label{ Name: labels[i], Value: labels[i+1], }) @@ -95,21 +96,6 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric { return m } -func toPromLabels(t testing.TB, labels ...string) []prompbmarshal.Label { - t.Helper() - if len(labels) == 0 || len(labels)%2 != 0 { - t.Fatalf("expected to get even number of labels") - } - var ls []prompbmarshal.Label - for i := 0; i < len(labels); i += 2 { - ls = append(ls, prompbmarshal.Label{ - Name: labels[i], - Value: labels[i+1], - }) - } - return ls -} - func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error { t.Helper() if len(a) != len(b) { @@ -122,7 +108,7 @@ func compareTimeSeries(t *testing.T, a, b []prompbmarshal.TimeSeries) error { } for i, exp := range expTS.Samples { got := gotTS.Samples[i] - if got.Value != exp.Value { + if got.Value != exp.Value && (!decimal.IsStaleNaN(got.Value) || !decimal.IsStaleNaN(exp.Value)) { return fmt.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value) } // timestamp validation isn't always correct for now. 
diff --git a/app/vmalert/rule/utils.go b/app/vmalert/rule/utils.go index 2e435861b..9602bd876 100644 --- a/app/vmalert/rule/utils.go +++ b/app/vmalert/rule/utils.go @@ -9,10 +9,14 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" ) -func newTimeSeries(values []float64, timestamps []int64, labels map[string]string) prompbmarshal.TimeSeries { +// newTimeSeries first sorts given labels, then returns new time series. +func newTimeSeries(values []float64, timestamps []int64, labels []prompbmarshal.Label) prompbmarshal.TimeSeries { + promrelabel.SortLabels(labels) ts := prompbmarshal.TimeSeries{ + Labels: labels, Samples: make([]prompbmarshal.Sample, len(values)), } for i := range values { @@ -21,34 +25,6 @@ func newTimeSeries(values []float64, timestamps []int64, labels map[string]strin Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6, } } - keys := make([]string, 0, len(labels)) - for k := range labels { - keys = append(keys, k) - } - sort.Strings(keys) // make order deterministic - for _, key := range keys { - ts.Labels = append(ts.Labels, prompbmarshal.Label{ - Name: key, - Value: labels[key], - }) - } - return ts -} - -// newTimeSeriesPB creates prompbmarshal.TimeSeries with given -// values, timestamps and labels. -// It expects that labels are already sorted. 
-func newTimeSeriesPB(values []float64, timestamps []int64, labels []prompbmarshal.Label) prompbmarshal.TimeSeries { - ts := prompbmarshal.TimeSeries{ - Samples: make([]prompbmarshal.Sample, len(values)), - } - for i := range values { - ts.Samples[i] = prompbmarshal.Sample{ - Value: values[i], - Timestamp: time.Unix(timestamps[i], 0).UnixNano() / 1e6, - } - } - ts.Labels = labels return ts } diff --git a/app/vmalert/templates/template.go b/app/vmalert/templates/template.go index 5a6b427bb..d1eeeff3d 100644 --- a/app/vmalert/templates/template.go +++ b/app/vmalert/templates/template.go @@ -169,6 +169,8 @@ func GetWithFuncs(funcs textTpl.FuncMap) (*textTpl.Template, error) { if err != nil { return nil, err } + // Clone() doesn't copy tpl Options, so we set them manually + tmpl = tmpl.Option("missingkey=zero") return tmpl.Funcs(funcs), nil }