[vmalert] integration with AlertManager (#325)

This commit is contained in:
kreedom 2020-02-21 23:15:05 +02:00 committed by GitHub
parent 2f55cabaa4
commit 49390b8dbc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 273 additions and 10 deletions

View file

@ -72,6 +72,7 @@ errcheck: install-errcheck
errcheck -exclude=errcheck_excludes.txt ./app/vmstorage/...
errcheck -exclude=errcheck_excludes.txt ./app/vmbackup/...
errcheck -exclude=errcheck_excludes.txt ./app/vmrestore/...
errcheck -exclude=errcheck_excludes.txt ./app/vmalert/...
install-errcheck:
which errcheck || GO111MODULE=off go get -u github.com/kisielk/errcheck

View file

@ -2,11 +2,6 @@ package config
import "time"
// Labels basic struct of different labels
type Labels struct {
Severity string
}
// Annotations basic annotation for alert rule
type Annotations struct {
Summary string
@ -18,8 +13,11 @@ type Alert struct {
Name string
Expr string
For time.Duration
Labels Labels
Labels map[string]string
Annotations Annotations
Start time.Time
End time.Time
}
// Group grouping array of alert

View file

@ -41,7 +41,9 @@ func main() {
}()
sig := procutil.WaitForSigterm()
logger.Infof("service received signal %s", sig)
httpserver.Stop(*httpListenAddr)
if err := httpserver.Stop(*httpListenAddr); err != nil {
logger.Fatalf("cannot stop the webservice: %s", err)
}
w.stop()
}

View file

@ -0,0 +1,26 @@
{% import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
) %}
{% stripspace %}
{% func amRequest(alert *config.Alert, generatorURL string) %}
{
"startsAt":{%q= alert.Start.Format(time.RFC3339Nano) %},
"generatorURL": {%q= generatorURL %},
{% if !alert.End.IsZero() %}
"endsAt":{%q= alert.End.Format(time.RFC3339Nano) %},
{% endif %}
"labels": {
"alertname":{%q= alert.Name %}
{% for k,v := range alert.Labels %}
,{%q= k %}:{%q= v %}
{% endfor %}
},
"annotations": {
"summary": {%q= alert.Annotations.Summary %},
"description": {%q= alert.Annotations.Description %}
}
}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,101 @@
// Code generated by qtc from "alert_manager_request.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmalert/provider/alert_manager_request.qtpl:1
package provider
//line app/vmalert/provider/alert_manager_request.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"time"
)
//line app/vmalert/provider/alert_manager_request.qtpl:7
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmalert/provider/alert_manager_request.qtpl:7
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmalert/provider/alert_manager_request.qtpl:7
func streamamRequest(qw422016 *qt422016.Writer, alert *config.Alert, generatorURL string) {
//line app/vmalert/provider/alert_manager_request.qtpl:7
qw422016.N().S(`{"startsAt":`)
//line app/vmalert/provider/alert_manager_request.qtpl:9
qw422016.N().Q(alert.Start.Format(time.RFC3339Nano))
//line app/vmalert/provider/alert_manager_request.qtpl:9
qw422016.N().S(`,"generatorURL":`)
//line app/vmalert/provider/alert_manager_request.qtpl:10
qw422016.N().Q(generatorURL)
//line app/vmalert/provider/alert_manager_request.qtpl:10
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:11
if !alert.End.IsZero() {
//line app/vmalert/provider/alert_manager_request.qtpl:11
qw422016.N().S(`"endsAt":`)
//line app/vmalert/provider/alert_manager_request.qtpl:12
qw422016.N().Q(alert.End.Format(time.RFC3339Nano))
//line app/vmalert/provider/alert_manager_request.qtpl:12
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:13
}
//line app/vmalert/provider/alert_manager_request.qtpl:13
qw422016.N().S(`"labels": {"alertname":`)
//line app/vmalert/provider/alert_manager_request.qtpl:15
qw422016.N().Q(alert.Name)
//line app/vmalert/provider/alert_manager_request.qtpl:16
for k, v := range alert.Labels {
//line app/vmalert/provider/alert_manager_request.qtpl:16
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:17
qw422016.N().Q(k)
//line app/vmalert/provider/alert_manager_request.qtpl:17
qw422016.N().S(`:`)
//line app/vmalert/provider/alert_manager_request.qtpl:17
qw422016.N().Q(v)
//line app/vmalert/provider/alert_manager_request.qtpl:18
}
//line app/vmalert/provider/alert_manager_request.qtpl:18
qw422016.N().S(`},"annotations": {"summary":`)
//line app/vmalert/provider/alert_manager_request.qtpl:21
qw422016.N().Q(alert.Annotations.Summary)
//line app/vmalert/provider/alert_manager_request.qtpl:21
qw422016.N().S(`,"description":`)
//line app/vmalert/provider/alert_manager_request.qtpl:22
qw422016.N().Q(alert.Annotations.Description)
//line app/vmalert/provider/alert_manager_request.qtpl:22
qw422016.N().S(`}}`)
//line app/vmalert/provider/alert_manager_request.qtpl:25
}
//line app/vmalert/provider/alert_manager_request.qtpl:25
func writeamRequest(qq422016 qtio422016.Writer, alert *config.Alert, generatorURL string) {
//line app/vmalert/provider/alert_manager_request.qtpl:25
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/provider/alert_manager_request.qtpl:25
streamamRequest(qw422016, alert, generatorURL)
//line app/vmalert/provider/alert_manager_request.qtpl:25
qt422016.ReleaseWriter(qw422016)
//line app/vmalert/provider/alert_manager_request.qtpl:25
}
//line app/vmalert/provider/alert_manager_request.qtpl:25
func amRequest(alert *config.Alert, generatorURL string) string {
//line app/vmalert/provider/alert_manager_request.qtpl:25
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/provider/alert_manager_request.qtpl:25
writeamRequest(qb422016, alert, generatorURL)
//line app/vmalert/provider/alert_manager_request.qtpl:25
qs422016 := string(qb422016.B)
//line app/vmalert/provider/alert_manager_request.qtpl:25
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/provider/alert_manager_request.qtpl:25
return qs422016
//line app/vmalert/provider/alert_manager_request.qtpl:25
}

View file

@ -1,11 +1,66 @@
package provider
import "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
import (
"bytes"
"fmt"
"io"
"net/http"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
const alertsPath = "/api/v2/alerts"
var pool = sync.Pool{New: func() interface{} {
return &bytes.Buffer{}
}}
// AlertManager represents integration provider with Prometheus alert manager
type AlertManager struct{}
type AlertManager struct {
alertURL string
argFunc AlertURLGenerator
client *http.Client
}
// AlertURLGenerator returns URL to single alert by given name
type AlertURLGenerator func(name string) string
// NewAlertManager is a constructor for AlertManager
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager {
return &AlertManager{
alertURL: strings.TrimSuffix(alertManagerURL, "/") + alertsPath,
argFunc: fn,
client: c,
}
}
const (
jsonArrayOpen byte = 91 // [
jsonArrayClose byte = 93 // ]
)
// Send an alert or resolve message
func (a *AlertManager) Send(rule config.Alert) error {
func (am *AlertManager) Send(alert *config.Alert) error {
b := pool.Get().(*bytes.Buffer)
b.Reset()
defer pool.Put(b)
b.WriteByte(jsonArrayOpen)
writeamRequest(b, alert, am.argFunc(alert.Name))
b.WriteByte(jsonArrayClose)
resp, err := am.client.Post(am.alertURL, "application/json", b)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
b.Reset()
if _, err := io.Copy(b, resp.Body); err != nil {
logger.Errorf("unable to copy error response body to buffer %s", err)
}
return fmt.Errorf("invalid response from alertmanager %s", b)
}
return nil
}

View file

@ -0,0 +1,80 @@
package provider
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
)
func TestAlertManager_Send(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) {
t.Errorf("should not be called")
})
c := -1
mux.HandleFunc(alertsPath, func(w http.ResponseWriter, r *http.Request) {
c++
if r.Method != http.MethodPost {
t.Errorf("expected POST method got %s", r.Method)
}
switch c {
case 0:
conn, _, _ := w.(http.Hijacker).Hijack()
_ = conn.Close()
case 1:
w.WriteHeader(500)
case 2:
var a []struct {
Labels map[string]string `json:"labels"`
StartsAt time.Time `json:"startsAt"`
EndAt time.Time `json:"endsAt"`
Annotations map[string]string `json:"annotations"`
GeneratorURL string `json:"generatorURL"`
}
if err := json.NewDecoder(r.Body).Decode(&a); err != nil {
t.Errorf("can not unmarshal data into alert %s", err)
t.FailNow()
}
if len(a) != 1 {
t.Errorf("expected 1 alert in array got %d", len(a))
}
if a[0].GeneratorURL != "alert0" {
t.Errorf("exptected alert0 as generatorURL got %s", a[0].GeneratorURL)
}
if a[0].Labels["alertname"] != "alert0" {
t.Errorf("exptected alert0 as alert name got %s", a[0].Labels["alertname"])
}
if a[0].StartsAt.IsZero() {
t.Errorf("exptected non-zero start time")
}
if a[0].EndAt.IsZero() {
t.Errorf("exptected non-zero end time")
}
}
})
srv := httptest.NewServer(mux)
defer srv.Close()
am := NewAlertManager(srv.URL, func(name string) string {
return name
}, srv.Client())
if err := am.Send(&config.Alert{}); err == nil {
t.Error("expected connection error got nil")
}
if err := am.Send(&config.Alert{}); err == nil {
t.Error("expected wrong http code error got nil")
}
if err := am.Send(&config.Alert{
Name: "alert0",
Start: time.Now().UTC(),
End: time.Now().UTC(),
}); err != nil {
t.Errorf("unexpected error %s", err)
}
if c != 2 {
t.Errorf("expected 2 calls(count from zero) to server got %d", c)
}
}