diff --git a/app/vmalert/config/parser.go b/app/vmalert/config/parser.go index 5b547d8f2..ca4131ca6 100644 --- a/app/vmalert/config/parser.go +++ b/app/vmalert/config/parser.go @@ -2,31 +2,37 @@ package config import "time" -// Annotations basic annotation for alert rule -type Annotations struct { - Summary string - Description string -} - -// Alert basic alert entity rule -type Alert struct { +// Rule is basic alert entity +type Rule struct { Name string Expr string For time.Duration Labels map[string]string - Annotations Annotations - - Start time.Time - End time.Time + Annotations map[string]string } // Group grouping array of alert type Group struct { Name string - Rules []Alert + Rules []Rule } // Parse parses config from given file func Parse(filepath string) ([]Group, error) { - return []Group{}, nil + return []Group{{ + Name: "foobar", + Rules: []Rule{{ + Name: "vmrowsalert", + Expr: "vm_rows", + For: 1 * time.Second, + Labels: map[string]string{ + "alert_label": "value1", + "alert_label2": "value2", + }, + Annotations: map[string]string{ + "summary": "{{ $value }}", + "description": "LABELS: {{ $labels }}", + }, + }}, + }}, nil } diff --git a/app/vmalert/datasource/datasource.go b/app/vmalert/datasource/datasource.go index d68b7a4d4..0be45cec5 100644 --- a/app/vmalert/datasource/datasource.go +++ b/app/vmalert/datasource/datasource.go @@ -1,14 +1,15 @@ package datasource -import "context" - -// Metrics the data returns from storage -type Metrics struct{} - -// VMStorage represents vmstorage entity with ability to read and write metrics -type VMStorage struct{} - -//Query basic query to the datasource -func (s *VMStorage) Query(ctx context.Context, query string) ([]Metrics, error) { - return nil, nil +// Metric is the basic entity which should be return by datasource +// It represents single data point with full list of labels +type Metric struct { + Labels []Label + Timestamp int64 + Value float64 +} + +// Labels represents metric's label +type Label struct { + Name string + Value string } diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go new file mode 100644 index 000000000..8ec74193d --- /dev/null +++ b/app/vmalert/datasource/vm.go @@ -0,0 +1,103 @@ +package datasource + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" +) + +type response struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result []struct { + Labels map[string]string `json:"metric"` + TV [2]interface{} `json:"value"` + } `json:"result"` + } `json:"data"` + ErrorType string `json:"errorType"` + Error string `json:"error"` +} + +func (r response) metrics() ([]Metric, error) { + var ms []Metric + var m Metric + var f float64 + var err error + for i, res := range r.Data.Result { + f, err = strconv.ParseFloat(res.TV[1].(string), 64) + if err != nil { + return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %s", res, res.TV[1], err) + } + m.Labels = nil + for k, v := range r.Data.Result[i].Labels { + m.Labels = append(m.Labels, Label{Name: k, Value: v}) + } + m.Timestamp = int64(res.TV[0].(float64)) + m.Value = f + ms = append(ms, m) + } + return ms, nil +} + +const queryPath = "/api/v1/query?query=" + +// VMStorage represents vmstorage entity with ability to read and write metrics +type VMStorage struct { + c *http.Client + queryURL string + basicAuthUser, basicAuthPass string +} + +// NewVMStorage is a constructor for VMStorage +func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, c *http.Client) *VMStorage { + return &VMStorage{ + c: c, + basicAuthUser: basicAuthUser, + basicAuthPass: basicAuthPass, + queryURL: strings.TrimSuffix(baseURL, "/") + queryPath, + } +} + +// Query reads metrics from datasource by given query +func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) { + const ( + statusSuccess, statusError, rtVector = "success", "error", "vector" + ) + req, err := http.NewRequest("POST", s.queryURL+url.QueryEscape(query), nil) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + if s.basicAuthPass != "" { + req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass) + } + resp, err := s.c.Do(req.WithContext(ctx)) + if err != nil { + return nil, fmt.Errorf("error getting response from %s:%s", req.URL, err) + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + return nil, fmt.Errorf("datasource returns unxeprected response code %d for %s with err %s. Reponse body %s", resp.StatusCode, req.URL, err, body) + } + r := &response{} + if err := json.NewDecoder(resp.Body).Decode(r); err != nil { + return nil, fmt.Errorf("error parsing metrics for %s:%s", req.URL, err) + } + if r.Status == statusError { + return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL, r.ErrorType, r.Error) + } + if r.Status != statusSuccess { + return nil, fmt.Errorf("unkown status:%s, Expected success or error ", r.Status) + } + if r.Data.ResultType != rtVector { + return nil, fmt.Errorf("unkown restul type:%s. Expected vector", r.Data.ResultType) + } + return r.metrics() +} diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go new file mode 100644 index 000000000..cc1382815 --- /dev/null +++ b/app/vmalert/datasource/vm_test.go @@ -0,0 +1,93 @@ +package datasource + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" +) + +var ( + ctx = context.Background() + basicAuthName = "foo" + basicAuthPass = "bar" + query = "vm_rows" +) + +func TestVMSelectQuery(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { + t.Errorf("should not be called") + }) + c := -1 + mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) { + c++ + if r.Method != http.MethodPost { + t.Errorf("expected POST method got %s", r.Method) + } + if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass { + t.Errorf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass) + } + if r.URL.Query().Get("query") != query { + t.Errorf("exptected %s in query param, got %s", query, r.URL.Query().Get("query")) + } + switch c { + case 0: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 1: + w.WriteHeader(500) + case 2: + w.Write([]byte("[]")) + case 3: + w.Write([]byte(`{"status":"error", "errorType":"type:", "error":"some error msg"}`)) + case 4: + w.Write([]byte(`{"status":"unknown"}`)) + case 5: + w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`)) + case 6: + w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]}]}}`)) + } + }) + + srv := httptest.NewServer(mux) + defer srv.Close() + am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, srv.Client()) + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected connection error got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected invalid response status error got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected response body error got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected error status got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected unkown status got nil") + } + if _, err := am.Query(ctx, query); err == nil { + t.Fatalf("expected non-vector resultType error got nil") + } + m, err := am.Query(ctx, query) + if err != nil { + t.Fatalf("unexpected %s", err) + } + if len(m) != 1 { + t.Fatalf("exptected 1 metric got %d in %+v", len(m), m) + } + expected := Metric{ + Labels: []Label{{Value: "vm_rows", Name: "__name__"}}, + Timestamp: 1583786142, + Value: 13763, + } + if m[0].Timestamp != expected.Timestamp && + m[0].Value != expected.Value && + m[0].Labels[0].Value != expected.Labels[0].Value && + m[0].Labels[0].Name != expected.Labels[0].Name { + t.Fatalf("unexpected metric %+v want %+v", m[0], expected) + } + +} diff --git a/app/vmalert/main.go b/app/vmalert/main.go index b613e6f32..c3a01f251 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -1,11 +1,17 @@ package main import ( + "context" "flag" + "fmt" + "net" "net/http" + "strings" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/provider" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" @@ -16,22 +22,35 @@ import ( var ( configPath = flag.String("config", "config.yaml", "Path to alert configuration file") httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections") + + datasourceURL = flag.String("datasource.url", "", "Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428") + basicAuthUsername = flag.String("datasource.basicAuth.username", "", "Optional basic auth username to use for -datasource.url") + basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password to use for -datasource.url") + evaluationInterval = flag.Duration("evaluationInterval", 1*time.Minute, "How often to evaluate the rules. Default 1m") + providerURL = flag.String("provider.url", "", "Prometheus alertmanager url. Required parameter. e.g. http://127.0.0.1:9093") ) func main() { envflag.Parse() buildinfo.Init() logger.Init() + ctx, cancel := context.WithCancel(context.Background()) logger.Infof("reading alert rules configuration file from %s", *configPath) alertGroups, err := config.Parse(*configPath) if err != nil { logger.Fatalf("Cannot parse configuration file %s", err) } - w := &watchdog{storage: &datasource.VMStorage{}} + addr := getWebServerAddr(*httpListenAddr, false) + w := &watchdog{ + storage: datasource.NewVMStorage(*datasourceURL, *basicAuthUsername, *basicAuthPassword, &http.Client{}), + alertProvider: provider.NewAlertManager(*providerURL, func(group, name string) string { + return addr + fmt.Sprintf("/%s/%s/status", group, name) + }, &http.Client{}), + } for id := range alertGroups { go func(group config.Group) { - w.run(group) + w.run(ctx, group, *evaluationInterval) }(alertGroups[id]) } go func() { @@ -44,15 +63,70 @@ func main() { if err := httpserver.Stop(*httpListenAddr); err != nil { logger.Fatalf("cannot stop the webservice: %s", err) } + cancel() w.stop() } type watchdog struct { - storage *datasource.VMStorage + storage *datasource.VMStorage + alertProvider provider.AlertProvider } -func (w *watchdog) run(a config.Group) { +func (w *watchdog) run(ctx context.Context, a config.Group, evaluationInterval time.Duration) { + t := time.NewTicker(evaluationInterval) + var metrics []datasource.Metric + var err error + var alerts []provider.Alert + defer t.Stop() + for { + select { + case <-t.C: + for _, r := range a.Rules { + if metrics, err = w.storage.Query(ctx, r.Expr); err != nil { + logger.Errorf("error reading metrics %s", err) + continue + } + // todo check for and calculate alert states + if len(metrics) < 1 { + continue + } + alerts = provider.AlertsFromMetrics(metrics, a.Name, r) + // todo save to storage + if err := w.alertProvider.Send(alerts); err != nil { + logger.Errorf("error sending alerts %s", err) + continue + } + // todo is alert still active/pending? + } + case <-ctx.Done(): + logger.Infof("%s receive stop signal", a.Name) + return + } + } +} + +func getWebServerAddr(httpListenAddr string, isSecure bool) string { + if strings.Index(httpListenAddr, ":") != 0 { + if isSecure { + return "https://" + httpListenAddr + } + return "http://" + httpListenAddr + } + + addrs, err := net.InterfaceAddrs() + if err != nil { + panic("error getting the interface addresses ") + } + for _, a := range addrs { + if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + return "http://" + ipnet.IP.String() + httpListenAddr + } + } + } + // no loopback ip return internal address + return "http://127.0.0.1" + httpListenAddr } func (w *watchdog) stop() { diff --git a/app/vmalert/provider/alert_manager_request.qtpl b/app/vmalert/provider/alert_manager_request.qtpl index edcec0268..dc868f592 100644 --- a/app/vmalert/provider/alert_manager_request.qtpl +++ b/app/vmalert/provider/alert_manager_request.qtpl @@ -1,26 +1,33 @@ {% import ( "time" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" ) %} {% stripspace %} -{% func amRequest(alert *config.Alert, generatorURL string) %} +{% func amRequest(alerts []Alert, generatorURL func(string, string) string) %} +[ +{% for i, alert := range alerts %} { "startsAt":{%q= alert.Start.Format(time.RFC3339Nano) %}, - "generatorURL": {%q= generatorURL %}, + "generatorURL": {%q= generatorURL(alert.Group, alert.Name) %}, {% if !alert.End.IsZero() %} "endsAt":{%q= alert.End.Format(time.RFC3339Nano) %}, {% endif %} "labels": { "alertname":{%q= alert.Name %} - {% for k,v := range alert.Labels %} - ,{%q= k %}:{%q= v %} + {% for _,v := range alert.Labels %} + ,{%q= v.Name %}:{%q= v.Value %} {% endfor %} }, "annotations": { - "summary": {%q= alert.Annotations.Summary %}, - "description": {%q= alert.Annotations.Description %} + {% code c := len(alert.Annotations) %} + {% for k,v := range alert.Annotations %} + {% code c = c-1 %} + {%q= k %}:{%q= v %}{% if c > 0 %},{% endif %} + {% endfor %} } } +{% if i != len(alerts)-1 %},{% endif %} +{% endfor %} +] {% endfunc %} {% endstripspace %} diff --git a/app/vmalert/provider/alert_manager_request.qtpl.go b/app/vmalert/provider/alert_manager_request.qtpl.go index 4fea9441a..3af065e6b 100644 --- a/app/vmalert/provider/alert_manager_request.qtpl.go +++ b/app/vmalert/provider/alert_manager_request.qtpl.go @@ -6,96 +6,125 @@ package provider //line app/vmalert/provider/alert_manager_request.qtpl:1 import ( - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "time" ) -//line app/vmalert/provider/alert_manager_request.qtpl:7 +//line app/vmalert/provider/alert_manager_request.qtpl:6 import ( qtio422016 "io" qt422016 "github.com/valyala/quicktemplate" ) -//line app/vmalert/provider/alert_manager_request.qtpl:7 +//line app/vmalert/provider/alert_manager_request.qtpl:6 var ( _ = qtio422016.Copy _ = qt422016.AcquireByteBuffer ) -//line app/vmalert/provider/alert_manager_request.qtpl:7 -func streamamRequest(qw422016 *qt422016.Writer, alert *config.Alert, generatorURL string) { -//line app/vmalert/provider/alert_manager_request.qtpl:7 - qw422016.N().S(`{"startsAt":`) -//line app/vmalert/provider/alert_manager_request.qtpl:9 - qw422016.N().Q(alert.Start.Format(time.RFC3339Nano)) -//line app/vmalert/provider/alert_manager_request.qtpl:9 - qw422016.N().S(`,"generatorURL":`) +//line app/vmalert/provider/alert_manager_request.qtpl:6 +func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) { +//line app/vmalert/provider/alert_manager_request.qtpl:6 + qw422016.N().S(`[`) +//line app/vmalert/provider/alert_manager_request.qtpl:8 + for i, alert := range alerts { +//line app/vmalert/provider/alert_manager_request.qtpl:8 + qw422016.N().S(`{"startsAt":`) //line app/vmalert/provider/alert_manager_request.qtpl:10 - qw422016.N().Q(generatorURL) + qw422016.N().Q(alert.Start.Format(time.RFC3339Nano)) //line app/vmalert/provider/alert_manager_request.qtpl:10 - qw422016.N().S(`,`) + qw422016.N().S(`,"generatorURL":`) //line app/vmalert/provider/alert_manager_request.qtpl:11 - if !alert.End.IsZero() { + qw422016.N().Q(generatorURL(alert.Group, alert.Name)) //line app/vmalert/provider/alert_manager_request.qtpl:11 - qw422016.N().S(`"endsAt":`) -//line app/vmalert/provider/alert_manager_request.qtpl:12 - qw422016.N().Q(alert.End.Format(time.RFC3339Nano)) -//line app/vmalert/provider/alert_manager_request.qtpl:12 qw422016.N().S(`,`) +//line app/vmalert/provider/alert_manager_request.qtpl:12 + if !alert.End.IsZero() { +//line app/vmalert/provider/alert_manager_request.qtpl:12 + qw422016.N().S(`"endsAt":`) //line app/vmalert/provider/alert_manager_request.qtpl:13 - } + qw422016.N().Q(alert.End.Format(time.RFC3339Nano)) //line app/vmalert/provider/alert_manager_request.qtpl:13 - qw422016.N().S(`"labels": {"alertname":`) -//line app/vmalert/provider/alert_manager_request.qtpl:15 - qw422016.N().Q(alert.Name) + qw422016.N().S(`,`) +//line app/vmalert/provider/alert_manager_request.qtpl:14 + } +//line app/vmalert/provider/alert_manager_request.qtpl:14 + qw422016.N().S(`"labels": {"alertname":`) //line app/vmalert/provider/alert_manager_request.qtpl:16 - for k, v := range alert.Labels { -//line app/vmalert/provider/alert_manager_request.qtpl:16 - qw422016.N().S(`,`) + qw422016.N().Q(alert.Name) //line app/vmalert/provider/alert_manager_request.qtpl:17 - qw422016.N().Q(k) + for _, v := range alert.Labels { //line app/vmalert/provider/alert_manager_request.qtpl:17 - qw422016.N().S(`:`) -//line app/vmalert/provider/alert_manager_request.qtpl:17 - qw422016.N().Q(v) + qw422016.N().S(`,`) //line app/vmalert/provider/alert_manager_request.qtpl:18 - } + qw422016.N().Q(v.Name) //line app/vmalert/provider/alert_manager_request.qtpl:18 - qw422016.N().S(`},"annotations": {"summary":`) -//line app/vmalert/provider/alert_manager_request.qtpl:21 - qw422016.N().Q(alert.Annotations.Summary) -//line app/vmalert/provider/alert_manager_request.qtpl:21 - qw422016.N().S(`,"description":`) + qw422016.N().S(`:`) +//line app/vmalert/provider/alert_manager_request.qtpl:18 + qw422016.N().Q(v.Value) +//line app/vmalert/provider/alert_manager_request.qtpl:19 + } +//line app/vmalert/provider/alert_manager_request.qtpl:19 + qw422016.N().S(`},"annotations": {`) //line app/vmalert/provider/alert_manager_request.qtpl:22 - qw422016.N().Q(alert.Annotations.Description) -//line app/vmalert/provider/alert_manager_request.qtpl:22 - qw422016.N().S(`}}`) -//line app/vmalert/provider/alert_manager_request.qtpl:25 -} + c := len(alert.Annotations) + +//line app/vmalert/provider/alert_manager_request.qtpl:23 + for k, v := range alert.Annotations { +//line app/vmalert/provider/alert_manager_request.qtpl:24 + c = c - 1 //line app/vmalert/provider/alert_manager_request.qtpl:25 -func writeamRequest(qq422016 qtio422016.Writer, alert *config.Alert, generatorURL string) { + qw422016.N().Q(k) //line app/vmalert/provider/alert_manager_request.qtpl:25 + qw422016.N().S(`:`) +//line app/vmalert/provider/alert_manager_request.qtpl:25 + qw422016.N().Q(v) +//line app/vmalert/provider/alert_manager_request.qtpl:25 + if c > 0 { +//line app/vmalert/provider/alert_manager_request.qtpl:25 + qw422016.N().S(`,`) +//line app/vmalert/provider/alert_manager_request.qtpl:25 + } +//line app/vmalert/provider/alert_manager_request.qtpl:26 + } +//line app/vmalert/provider/alert_manager_request.qtpl:26 + qw422016.N().S(`}}`) +//line app/vmalert/provider/alert_manager_request.qtpl:29 + if i != len(alerts)-1 { +//line app/vmalert/provider/alert_manager_request.qtpl:29 + qw422016.N().S(`,`) +//line app/vmalert/provider/alert_manager_request.qtpl:29 + } +//line app/vmalert/provider/alert_manager_request.qtpl:30 + } +//line app/vmalert/provider/alert_manager_request.qtpl:30 + qw422016.N().S(`]`) +//line app/vmalert/provider/alert_manager_request.qtpl:32 +} + +//line app/vmalert/provider/alert_manager_request.qtpl:32 +func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) { +//line app/vmalert/provider/alert_manager_request.qtpl:32 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmalert/provider/alert_manager_request.qtpl:25 - streamamRequest(qw422016, alert, generatorURL) -//line app/vmalert/provider/alert_manager_request.qtpl:25 +//line app/vmalert/provider/alert_manager_request.qtpl:32 + streamamRequest(qw422016, alerts, generatorURL) +//line app/vmalert/provider/alert_manager_request.qtpl:32 qt422016.ReleaseWriter(qw422016) -//line app/vmalert/provider/alert_manager_request.qtpl:25 +//line app/vmalert/provider/alert_manager_request.qtpl:32 } -//line app/vmalert/provider/alert_manager_request.qtpl:25 -func amRequest(alert *config.Alert, generatorURL string) string { -//line app/vmalert/provider/alert_manager_request.qtpl:25 +//line app/vmalert/provider/alert_manager_request.qtpl:32 +func amRequest(alerts []Alert, generatorURL func(string, string) string) string { +//line app/vmalert/provider/alert_manager_request.qtpl:32 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmalert/provider/alert_manager_request.qtpl:25 - writeamRequest(qb422016, alert, generatorURL) -//line app/vmalert/provider/alert_manager_request.qtpl:25 +//line app/vmalert/provider/alert_manager_request.qtpl:32 + writeamRequest(qb422016, alerts, generatorURL) +//line app/vmalert/provider/alert_manager_request.qtpl:32 qs422016 := string(qb422016.B) -//line app/vmalert/provider/alert_manager_request.qtpl:25 +//line app/vmalert/provider/alert_manager_request.qtpl:32 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmalert/provider/alert_manager_request.qtpl:25 +//line app/vmalert/provider/alert_manager_request.qtpl:32 return qs422016 -//line app/vmalert/provider/alert_manager_request.qtpl:25 +//line app/vmalert/provider/alert_manager_request.qtpl:32 } diff --git a/app/vmalert/provider/alertmanager.go b/app/vmalert/provider/alertmanager.go index 1589c2c24..2c22a95fa 100644 --- a/app/vmalert/provider/alertmanager.go +++ b/app/vmalert/provider/alertmanager.go @@ -8,7 +8,6 @@ import ( "strings" "sync" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -26,7 +25,7 @@ type AlertManager struct { } // AlertURLGenerator returns URL to single alert by given name -type AlertURLGenerator func(name string) string +type AlertURLGenerator func(group, name string) string // NewAlertManager is a constructor for AlertManager func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager { @@ -37,19 +36,12 @@ func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Clien } } -const ( - jsonArrayOpen byte = 91 // [ - jsonArrayClose byte = 93 // ] -) - // Send an alert or resolve message -func (am *AlertManager) Send(alert *config.Alert) error { +func (am *AlertManager) Send(alerts []Alert) error { b := pool.Get().(*bytes.Buffer) b.Reset() defer pool.Put(b) - b.WriteByte(jsonArrayOpen) - writeamRequest(b, alert, am.argFunc(alert.Name)) - b.WriteByte(jsonArrayClose) + writeamRequest(b, alerts, am.argFunc) resp, err := am.client.Post(am.alertURL, "application/json", b) if err != nil { return err diff --git a/app/vmalert/provider/alertmanager_test.go b/app/vmalert/provider/alertmanager_test.go index cc4f32388..cbffb8363 100644 --- a/app/vmalert/provider/alertmanager_test.go +++ b/app/vmalert/provider/alertmanager_test.go @@ -6,8 +6,6 @@ import ( "net/http/httptest" "testing" "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" ) func TestAlertManager_Send(t *testing.T) { @@ -42,7 +40,7 @@ func TestAlertManager_Send(t *testing.T) { if len(a) != 1 { t.Errorf("expected 1 alert in array got %d", len(a)) } - if a[0].GeneratorURL != "alert0" { + if a[0].GeneratorURL != "group0alert0" { t.Errorf("exptected alert0 as generatorURL got %s", a[0].GeneratorURL) } if a[0].Labels["alertname"] != "alert0" { @@ -58,20 +56,22 @@ func TestAlertManager_Send(t *testing.T) { }) srv := httptest.NewServer(mux) defer srv.Close() - am := NewAlertManager(srv.URL, func(name string) string { - return name + am := NewAlertManager(srv.URL, func(group, name string) string { + return group + name }, srv.Client()) - if err := am.Send(&config.Alert{}); err == nil { + if err := am.Send([]Alert{{}, {}}); err == nil { t.Error("expected connection error got nil") } - if err := am.Send(&config.Alert{}); err == nil { + if err := am.Send([]Alert{}); err == nil { t.Error("expected wrong http code error got nil") } - if err := am.Send(&config.Alert{ - Name: "alert0", - Start: time.Now().UTC(), - End: time.Now().UTC(), - }); err != nil { + if err := am.Send([]Alert{{ + Group: "group0", + Name: "alert0", + Start: time.Now().UTC(), + End: time.Now().UTC(), + Annotations: map[string]string{"a": "b", "c": "d", "e": "f"}, + }}); err != nil { t.Errorf("unexpected error %s", err) } if c != 2 { diff --git a/app/vmalert/provider/common.go b/app/vmalert/provider/common.go index 6adfc39ee..5fcf72159 100644 --- a/app/vmalert/provider/common.go +++ b/app/vmalert/provider/common.go @@ -1,8 +1,65 @@ package provider -import "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" +import ( + "sort" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" +) // AlertProvider is common interface for alert manager provider type AlertProvider interface { - Send(rule config.Alert) error + Send(alerts []Alert) error +} + +// Alert the triggered alert +type Alert struct { + Group string + Name string + Labels []datasource.Label + Annotations map[string]string + + Start time.Time + End time.Time + Value float64 +} + +// AlertsFromMetrics converts metrics to alerts by alert Rule +func AlertsFromMetrics(metrics []datasource.Metric, group string, rule config.Rule) []Alert { + alerts := make([]Alert, 0, len(metrics)) + for i, m := range metrics { + a := Alert{ + Group: group, + Name: rule.Name, + Labels: metrics[i].Labels, + // todo eval template in annotations + Annotations: rule.Annotations, + Start: time.Unix(m.Timestamp, 0), + } + for k, v := range rule.Labels { + a.Labels = append(a.Labels, datasource.Label{ + Name: k, + Value: v, + }) + } + a.Labels = removeDuplicated(a.Labels) + alerts = append(alerts, a) + } + return alerts +} + +func removeDuplicated(l []datasource.Label) []datasource.Label { + sort.Slice(l, func(i, j int) bool { + return l[i].Name < l[j].Name + }) + j := 0 + for i := 1; i < len(l); i++ { + if l[j] == l[i] { + continue + } + j++ + l[j] = l[i] + } + return l[:j+1] } diff --git a/app/vmalert/storage/memory.go b/app/vmalert/storage/memory.go new file mode 100644 index 000000000..82be0547e --- /dev/null +++ b/app/vmalert/storage/memory.go @@ -0,0 +1 @@ +package storage