vmalert add vm datasource, change alertmanager (#364)

* vmalert add vm datasource, change alertmanager

* make linter be happy

* make linter be happy.2

* PR comments

* PR comments.1
This commit is contained in:
kreedom 2020-03-13 12:19:31 +02:00 committed by GitHub
parent 499594f421
commit a746cb62b6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 478 additions and 115 deletions

View file

@ -2,31 +2,37 @@ package config
import "time" import "time"
// Annotations basic annotation for alert rule // Rule is basic alert entity
type Annotations struct { type Rule struct {
Summary string
Description string
}
// Alert basic alert entity rule
type Alert struct {
Name string Name string
Expr string Expr string
For time.Duration For time.Duration
Labels map[string]string Labels map[string]string
Annotations Annotations Annotations map[string]string
Start time.Time
End time.Time
} }
// Group grouping array of alert // Group grouping array of alert
type Group struct { type Group struct {
Name string Name string
Rules []Alert Rules []Rule
} }
// Parse parses config from given file // Parse parses config from given file
func Parse(filepath string) ([]Group, error) { func Parse(filepath string) ([]Group, error) {
return []Group{}, nil return []Group{{
Name: "foobar",
Rules: []Rule{{
Name: "vmrowsalert",
Expr: "vm_rows",
For: 1 * time.Second,
Labels: map[string]string{
"alert_label": "value1",
"alert_label2": "value2",
},
Annotations: map[string]string{
"summary": "{{ $value }}",
"description": "LABELS: {{ $labels }}",
},
}},
}}, nil
} }

View file

@ -1,14 +1,15 @@
package datasource package datasource
import "context" // Metric is the basic entity which should be return by datasource
// It represents single data point with full list of labels
// Metrics the data returns from storage type Metric struct {
type Metrics struct{} Labels []Label
Timestamp int64
// VMStorage represents vmstorage entity with ability to read and write metrics Value float64
type VMStorage struct{} }
//Query basic query to the datasource // Labels represents metric's label
func (s *VMStorage) Query(ctx context.Context, query string) ([]Metrics, error) { type Label struct {
return nil, nil Name string
Value string
} }

View file

@ -0,0 +1,103 @@
package datasource
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
)
type response struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Labels map[string]string `json:"metric"`
TV [2]interface{} `json:"value"`
} `json:"result"`
} `json:"data"`
ErrorType string `json:"errorType"`
Error string `json:"error"`
}
func (r response) metrics() ([]Metric, error) {
var ms []Metric
var m Metric
var f float64
var err error
for i, res := range r.Data.Result {
f, err = strconv.ParseFloat(res.TV[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %s", res, res.TV[1], err)
}
m.Labels = nil
for k, v := range r.Data.Result[i].Labels {
m.Labels = append(m.Labels, Label{Name: k, Value: v})
}
m.Timestamp = int64(res.TV[0].(float64))
m.Value = f
ms = append(ms, m)
}
return ms, nil
}
const queryPath = "/api/v1/query?query="
// VMStorage represents vmstorage entity with ability to read and write metrics
type VMStorage struct {
c *http.Client
queryURL string
basicAuthUser, basicAuthPass string
}
// NewVMStorage is a constructor for VMStorage
func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, c *http.Client) *VMStorage {
return &VMStorage{
c: c,
basicAuthUser: basicAuthUser,
basicAuthPass: basicAuthPass,
queryURL: strings.TrimSuffix(baseURL, "/") + queryPath,
}
}
// Query reads metrics from datasource by given query
func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) {
const (
statusSuccess, statusError, rtVector = "success", "error", "vector"
)
req, err := http.NewRequest("POST", s.queryURL+url.QueryEscape(query), nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
if s.basicAuthPass != "" {
req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass)
}
resp, err := s.c.Do(req.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("error getting response from %s:%s", req.URL, err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("datasource returns unxeprected response code %d for %s with err %s. Reponse body %s", resp.StatusCode, req.URL, err, body)
}
r := &response{}
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
return nil, fmt.Errorf("error parsing metrics for %s:%s", req.URL, err)
}
if r.Status == statusError {
return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL, r.ErrorType, r.Error)
}
if r.Status != statusSuccess {
return nil, fmt.Errorf("unkown status:%s, Expected success or error ", r.Status)
}
if r.Data.ResultType != rtVector {
return nil, fmt.Errorf("unkown restul type:%s. Expected vector", r.Data.ResultType)
}
return r.metrics()
}

View file

@ -0,0 +1,93 @@
package datasource
import (
"context"
"net/http"
"net/http/httptest"
"testing"
)
var (
ctx = context.Background()
basicAuthName = "foo"
basicAuthPass = "bar"
query = "vm_rows"
)
func TestVMSelectQuery(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) {
t.Errorf("should not be called")
})
c := -1
mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) {
c++
if r.Method != http.MethodPost {
t.Errorf("expected POST method got %s", r.Method)
}
if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass {
t.Errorf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass)
}
if r.URL.Query().Get("query") != query {
t.Errorf("exptected %s in query param, got %s", query, r.URL.Query().Get("query"))
}
switch c {
case 0:
conn, _, _ := w.(http.Hijacker).Hijack()
_ = conn.Close()
case 1:
w.WriteHeader(500)
case 2:
w.Write([]byte("[]"))
case 3:
w.Write([]byte(`{"status":"error", "errorType":"type:", "error":"some error msg"}`))
case 4:
w.Write([]byte(`{"status":"unknown"}`))
case 5:
w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`))
case 6:
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]}]}}`))
}
})
srv := httptest.NewServer(mux)
defer srv.Close()
am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, srv.Client())
if _, err := am.Query(ctx, query); err == nil {
t.Fatalf("expected connection error got nil")
}
if _, err := am.Query(ctx, query); err == nil {
t.Fatalf("expected invalid response status error got nil")
}
if _, err := am.Query(ctx, query); err == nil {
t.Fatalf("expected response body error got nil")
}
if _, err := am.Query(ctx, query); err == nil {
t.Fatalf("expected error status got nil")
}
if _, err := am.Query(ctx, query); err == nil {
t.Fatalf("expected unkown status got nil")
}
if _, err := am.Query(ctx, query); err == nil {
t.Fatalf("expected non-vector resultType error got nil")
}
m, err := am.Query(ctx, query)
if err != nil {
t.Fatalf("unexpected %s", err)
}
if len(m) != 1 {
t.Fatalf("exptected 1 metric got %d in %+v", len(m), m)
}
expected := Metric{
Labels: []Label{{Value: "vm_rows", Name: "__name__"}},
Timestamp: 1583786142,
Value: 13763,
}
if m[0].Timestamp != expected.Timestamp &&
m[0].Value != expected.Value &&
m[0].Labels[0].Value != expected.Labels[0].Value &&
m[0].Labels[0].Name != expected.Labels[0].Name {
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
}
}

View file

@ -1,11 +1,17 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt"
"net"
"net/http" "net/http"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/provider"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -16,22 +22,35 @@ import (
var ( var (
configPath = flag.String("config", "config.yaml", "Path to alert configuration file") configPath = flag.String("config", "config.yaml", "Path to alert configuration file")
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections") httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
datasourceURL = flag.String("datasource.url", "", "Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428")
basicAuthUsername = flag.String("datasource.basicAuth.username", "", "Optional basic auth username to use for -datasource.url")
basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password to use for -datasource.url")
evaluationInterval = flag.Duration("evaluationInterval", 1*time.Minute, "How often to evaluate the rules. Default 1m")
providerURL = flag.String("provider.url", "", "Prometheus alertmanager url. Required parameter. e.g. http://127.0.0.1:9093")
) )
func main() { func main() {
envflag.Parse() envflag.Parse()
buildinfo.Init() buildinfo.Init()
logger.Init() logger.Init()
ctx, cancel := context.WithCancel(context.Background())
logger.Infof("reading alert rules configuration file from %s", *configPath) logger.Infof("reading alert rules configuration file from %s", *configPath)
alertGroups, err := config.Parse(*configPath) alertGroups, err := config.Parse(*configPath)
if err != nil { if err != nil {
logger.Fatalf("Cannot parse configuration file %s", err) logger.Fatalf("Cannot parse configuration file %s", err)
} }
w := &watchdog{storage: &datasource.VMStorage{}} addr := getWebServerAddr(*httpListenAddr, false)
w := &watchdog{
storage: datasource.NewVMStorage(*datasourceURL, *basicAuthUsername, *basicAuthPassword, &http.Client{}),
alertProvider: provider.NewAlertManager(*providerURL, func(group, name string) string {
return addr + fmt.Sprintf("/%s/%s/status", group, name)
}, &http.Client{}),
}
for id := range alertGroups { for id := range alertGroups {
go func(group config.Group) { go func(group config.Group) {
w.run(group) w.run(ctx, group, *evaluationInterval)
}(alertGroups[id]) }(alertGroups[id])
} }
go func() { go func() {
@ -44,15 +63,70 @@ func main() {
if err := httpserver.Stop(*httpListenAddr); err != nil { if err := httpserver.Stop(*httpListenAddr); err != nil {
logger.Fatalf("cannot stop the webservice: %s", err) logger.Fatalf("cannot stop the webservice: %s", err)
} }
cancel()
w.stop() w.stop()
} }
type watchdog struct { type watchdog struct {
storage *datasource.VMStorage storage *datasource.VMStorage
alertProvider provider.AlertProvider
} }
func (w *watchdog) run(a config.Group) { func (w *watchdog) run(ctx context.Context, a config.Group, evaluationInterval time.Duration) {
t := time.NewTicker(evaluationInterval)
var metrics []datasource.Metric
var err error
var alerts []provider.Alert
defer t.Stop()
for {
select {
case <-t.C:
for _, r := range a.Rules {
if metrics, err = w.storage.Query(ctx, r.Expr); err != nil {
logger.Errorf("error reading metrics %s", err)
continue
}
// todo check for and calculate alert states
if len(metrics) < 1 {
continue
}
alerts = provider.AlertsFromMetrics(metrics, a.Name, r)
// todo save to storage
if err := w.alertProvider.Send(alerts); err != nil {
logger.Errorf("error sending alerts %s", err)
continue
}
// todo is alert still active/pending?
}
case <-ctx.Done():
logger.Infof("%s receive stop signal", a.Name)
return
}
}
}
func getWebServerAddr(httpListenAddr string, isSecure bool) string {
if strings.Index(httpListenAddr, ":") != 0 {
if isSecure {
return "https://" + httpListenAddr
}
return "http://" + httpListenAddr
}
addrs, err := net.InterfaceAddrs()
if err != nil {
panic("error getting the interface addresses ")
}
for _, a := range addrs {
if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return "http://" + ipnet.IP.String() + httpListenAddr
}
}
}
// no loopback ip return internal address
return "http://127.0.0.1" + httpListenAddr
} }
func (w *watchdog) stop() { func (w *watchdog) stop() {

View file

@ -1,26 +1,33 @@
{% import ( {% import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
) %} ) %}
{% stripspace %} {% stripspace %}
{% func amRequest(alert *config.Alert, generatorURL string) %} {% func amRequest(alerts []Alert, generatorURL func(string, string) string) %}
[
{% for i, alert := range alerts %}
{ {
"startsAt":{%q= alert.Start.Format(time.RFC3339Nano) %}, "startsAt":{%q= alert.Start.Format(time.RFC3339Nano) %},
"generatorURL": {%q= generatorURL %}, "generatorURL": {%q= generatorURL(alert.Group, alert.Name) %},
{% if !alert.End.IsZero() %} {% if !alert.End.IsZero() %}
"endsAt":{%q= alert.End.Format(time.RFC3339Nano) %}, "endsAt":{%q= alert.End.Format(time.RFC3339Nano) %},
{% endif %} {% endif %}
"labels": { "labels": {
"alertname":{%q= alert.Name %} "alertname":{%q= alert.Name %}
{% for k,v := range alert.Labels %} {% for _,v := range alert.Labels %}
,{%q= k %}:{%q= v %} ,{%q= v.Name %}:{%q= v.Value %}
{% endfor %} {% endfor %}
}, },
"annotations": { "annotations": {
"summary": {%q= alert.Annotations.Summary %}, {% code c := len(alert.Annotations) %}
"description": {%q= alert.Annotations.Description %} {% for k,v := range alert.Annotations %}
{% code c = c-1 %}
{%q= k %}:{%q= v %}{% if c > 0 %},{% endif %}
{% endfor %}
} }
} }
{% if i != len(alerts)-1 %},{% endif %}
{% endfor %}
]
{% endfunc %} {% endfunc %}
{% endstripspace %} {% endstripspace %}

View file

@ -6,96 +6,125 @@ package provider
//line app/vmalert/provider/alert_manager_request.qtpl:1 //line app/vmalert/provider/alert_manager_request.qtpl:1
import ( import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"time" "time"
) )
//line app/vmalert/provider/alert_manager_request.qtpl:7 //line app/vmalert/provider/alert_manager_request.qtpl:6
import ( import (
qtio422016 "io" qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate" qt422016 "github.com/valyala/quicktemplate"
) )
//line app/vmalert/provider/alert_manager_request.qtpl:7 //line app/vmalert/provider/alert_manager_request.qtpl:6
var ( var (
_ = qtio422016.Copy _ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer _ = qt422016.AcquireByteBuffer
) )
//line app/vmalert/provider/alert_manager_request.qtpl:7 //line app/vmalert/provider/alert_manager_request.qtpl:6
func streamamRequest(qw422016 *qt422016.Writer, alert *config.Alert, generatorURL string) { func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
//line app/vmalert/provider/alert_manager_request.qtpl:7 //line app/vmalert/provider/alert_manager_request.qtpl:6
qw422016.N().S(`[`)
//line app/vmalert/provider/alert_manager_request.qtpl:8
for i, alert := range alerts {
//line app/vmalert/provider/alert_manager_request.qtpl:8
qw422016.N().S(`{"startsAt":`) qw422016.N().S(`{"startsAt":`)
//line app/vmalert/provider/alert_manager_request.qtpl:9 //line app/vmalert/provider/alert_manager_request.qtpl:10
qw422016.N().Q(alert.Start.Format(time.RFC3339Nano)) qw422016.N().Q(alert.Start.Format(time.RFC3339Nano))
//line app/vmalert/provider/alert_manager_request.qtpl:9 //line app/vmalert/provider/alert_manager_request.qtpl:10
qw422016.N().S(`,"generatorURL":`) qw422016.N().S(`,"generatorURL":`)
//line app/vmalert/provider/alert_manager_request.qtpl:10
qw422016.N().Q(generatorURL)
//line app/vmalert/provider/alert_manager_request.qtpl:10
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:11 //line app/vmalert/provider/alert_manager_request.qtpl:11
qw422016.N().Q(generatorURL(alert.Group, alert.Name))
//line app/vmalert/provider/alert_manager_request.qtpl:11
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:12
if !alert.End.IsZero() { if !alert.End.IsZero() {
//line app/vmalert/provider/alert_manager_request.qtpl:11 //line app/vmalert/provider/alert_manager_request.qtpl:12
qw422016.N().S(`"endsAt":`) qw422016.N().S(`"endsAt":`)
//line app/vmalert/provider/alert_manager_request.qtpl:12 //line app/vmalert/provider/alert_manager_request.qtpl:13
qw422016.N().Q(alert.End.Format(time.RFC3339Nano)) qw422016.N().Q(alert.End.Format(time.RFC3339Nano))
//line app/vmalert/provider/alert_manager_request.qtpl:12 //line app/vmalert/provider/alert_manager_request.qtpl:13
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:13 //line app/vmalert/provider/alert_manager_request.qtpl:14
} }
//line app/vmalert/provider/alert_manager_request.qtpl:13 //line app/vmalert/provider/alert_manager_request.qtpl:14
qw422016.N().S(`"labels": {"alertname":`) qw422016.N().S(`"labels": {"alertname":`)
//line app/vmalert/provider/alert_manager_request.qtpl:15 //line app/vmalert/provider/alert_manager_request.qtpl:16
qw422016.N().Q(alert.Name) qw422016.N().Q(alert.Name)
//line app/vmalert/provider/alert_manager_request.qtpl:16 //line app/vmalert/provider/alert_manager_request.qtpl:17
for k, v := range alert.Labels { for _, v := range alert.Labels {
//line app/vmalert/provider/alert_manager_request.qtpl:16 //line app/vmalert/provider/alert_manager_request.qtpl:17
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:17 //line app/vmalert/provider/alert_manager_request.qtpl:18
qw422016.N().Q(k) qw422016.N().Q(v.Name)
//line app/vmalert/provider/alert_manager_request.qtpl:17 //line app/vmalert/provider/alert_manager_request.qtpl:18
qw422016.N().S(`:`) qw422016.N().S(`:`)
//line app/vmalert/provider/alert_manager_request.qtpl:17 //line app/vmalert/provider/alert_manager_request.qtpl:18
qw422016.N().Q(v.Value)
//line app/vmalert/provider/alert_manager_request.qtpl:19
}
//line app/vmalert/provider/alert_manager_request.qtpl:19
qw422016.N().S(`},"annotations": {`)
//line app/vmalert/provider/alert_manager_request.qtpl:22
c := len(alert.Annotations)
//line app/vmalert/provider/alert_manager_request.qtpl:23
for k, v := range alert.Annotations {
//line app/vmalert/provider/alert_manager_request.qtpl:24
c = c - 1
//line app/vmalert/provider/alert_manager_request.qtpl:25
qw422016.N().Q(k)
//line app/vmalert/provider/alert_manager_request.qtpl:25
qw422016.N().S(`:`)
//line app/vmalert/provider/alert_manager_request.qtpl:25
qw422016.N().Q(v) qw422016.N().Q(v)
//line app/vmalert/provider/alert_manager_request.qtpl:18 //line app/vmalert/provider/alert_manager_request.qtpl:25
if c > 0 {
//line app/vmalert/provider/alert_manager_request.qtpl:25
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:25
} }
//line app/vmalert/provider/alert_manager_request.qtpl:18 //line app/vmalert/provider/alert_manager_request.qtpl:26
qw422016.N().S(`},"annotations": {"summary":`) }
//line app/vmalert/provider/alert_manager_request.qtpl:21 //line app/vmalert/provider/alert_manager_request.qtpl:26
qw422016.N().Q(alert.Annotations.Summary)
//line app/vmalert/provider/alert_manager_request.qtpl:21
qw422016.N().S(`,"description":`)
//line app/vmalert/provider/alert_manager_request.qtpl:22
qw422016.N().Q(alert.Annotations.Description)
//line app/vmalert/provider/alert_manager_request.qtpl:22
qw422016.N().S(`}}`) qw422016.N().S(`}}`)
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:29
if i != len(alerts)-1 {
//line app/vmalert/provider/alert_manager_request.qtpl:29
qw422016.N().S(`,`)
//line app/vmalert/provider/alert_manager_request.qtpl:29
}
//line app/vmalert/provider/alert_manager_request.qtpl:30
}
//line app/vmalert/provider/alert_manager_request.qtpl:30
qw422016.N().S(`]`)
//line app/vmalert/provider/alert_manager_request.qtpl:32
} }
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
func writeamRequest(qq422016 qtio422016.Writer, alert *config.Alert, generatorURL string) { func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
streamamRequest(qw422016, alert, generatorURL) streamamRequest(qw422016, alerts, generatorURL)
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
} }
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
func amRequest(alert *config.Alert, generatorURL string) string { func amRequest(alerts []Alert, generatorURL func(string, string) string) string {
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
writeamRequest(qb422016, alert, generatorURL) writeamRequest(qb422016, alerts, generatorURL)
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
return qs422016 return qs422016
//line app/vmalert/provider/alert_manager_request.qtpl:25 //line app/vmalert/provider/alert_manager_request.qtpl:32
} }

View file

@ -8,7 +8,6 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
@ -26,7 +25,7 @@ type AlertManager struct {
} }
// AlertURLGenerator returns URL to single alert by given name // AlertURLGenerator returns URL to single alert by given name
type AlertURLGenerator func(name string) string type AlertURLGenerator func(group, name string) string
// NewAlertManager is a constructor for AlertManager // NewAlertManager is a constructor for AlertManager
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager { func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager {
@ -37,19 +36,12 @@ func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Clien
} }
} }
const (
jsonArrayOpen byte = 91 // [
jsonArrayClose byte = 93 // ]
)
// Send an alert or resolve message // Send an alert or resolve message
func (am *AlertManager) Send(alert *config.Alert) error { func (am *AlertManager) Send(alerts []Alert) error {
b := pool.Get().(*bytes.Buffer) b := pool.Get().(*bytes.Buffer)
b.Reset() b.Reset()
defer pool.Put(b) defer pool.Put(b)
b.WriteByte(jsonArrayOpen) writeamRequest(b, alerts, am.argFunc)
writeamRequest(b, alert, am.argFunc(alert.Name))
b.WriteByte(jsonArrayClose)
resp, err := am.client.Post(am.alertURL, "application/json", b) resp, err := am.client.Post(am.alertURL, "application/json", b)
if err != nil { if err != nil {
return err return err

View file

@ -6,8 +6,6 @@ import (
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
) )
func TestAlertManager_Send(t *testing.T) { func TestAlertManager_Send(t *testing.T) {
@ -42,7 +40,7 @@ func TestAlertManager_Send(t *testing.T) {
if len(a) != 1 { if len(a) != 1 {
t.Errorf("expected 1 alert in array got %d", len(a)) t.Errorf("expected 1 alert in array got %d", len(a))
} }
if a[0].GeneratorURL != "alert0" { if a[0].GeneratorURL != "group0alert0" {
t.Errorf("exptected alert0 as generatorURL got %s", a[0].GeneratorURL) t.Errorf("exptected alert0 as generatorURL got %s", a[0].GeneratorURL)
} }
if a[0].Labels["alertname"] != "alert0" { if a[0].Labels["alertname"] != "alert0" {
@ -58,20 +56,22 @@ func TestAlertManager_Send(t *testing.T) {
}) })
srv := httptest.NewServer(mux) srv := httptest.NewServer(mux)
defer srv.Close() defer srv.Close()
am := NewAlertManager(srv.URL, func(name string) string { am := NewAlertManager(srv.URL, func(group, name string) string {
return name return group + name
}, srv.Client()) }, srv.Client())
if err := am.Send(&config.Alert{}); err == nil { if err := am.Send([]Alert{{}, {}}); err == nil {
t.Error("expected connection error got nil") t.Error("expected connection error got nil")
} }
if err := am.Send(&config.Alert{}); err == nil { if err := am.Send([]Alert{}); err == nil {
t.Error("expected wrong http code error got nil") t.Error("expected wrong http code error got nil")
} }
if err := am.Send(&config.Alert{ if err := am.Send([]Alert{{
Group: "group0",
Name: "alert0", Name: "alert0",
Start: time.Now().UTC(), Start: time.Now().UTC(),
End: time.Now().UTC(), End: time.Now().UTC(),
}); err != nil { Annotations: map[string]string{"a": "b", "c": "d", "e": "f"},
}}); err != nil {
t.Errorf("unexpected error %s", err) t.Errorf("unexpected error %s", err)
} }
if c != 2 { if c != 2 {

View file

@ -1,8 +1,65 @@
package provider package provider
import "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" import (
"sort"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
)
// AlertProvider is common interface for alert manager provider // AlertProvider is common interface for alert manager provider
type AlertProvider interface { type AlertProvider interface {
Send(rule config.Alert) error Send(alerts []Alert) error
}
// Alert the triggered alert
type Alert struct {
Group string
Name string
Labels []datasource.Label
Annotations map[string]string
Start time.Time
End time.Time
Value float64
}
// AlertsFromMetrics converts metrics to alerts by alert Rule
func AlertsFromMetrics(metrics []datasource.Metric, group string, rule config.Rule) []Alert {
alerts := make([]Alert, 0, len(metrics))
for i, m := range metrics {
a := Alert{
Group: group,
Name: rule.Name,
Labels: metrics[i].Labels,
// todo eval template in annotations
Annotations: rule.Annotations,
Start: time.Unix(m.Timestamp, 0),
}
for k, v := range rule.Labels {
a.Labels = append(a.Labels, datasource.Label{
Name: k,
Value: v,
})
}
a.Labels = removeDuplicated(a.Labels)
alerts = append(alerts, a)
}
return alerts
}
func removeDuplicated(l []datasource.Label) []datasource.Label {
sort.Slice(l, func(i, j int) bool {
return l[i].Name < l[j].Name
})
j := 0
for i := 1; i < len(l); i++ {
if l[j] == l[i] {
continue
}
j++
l[j] = l[i]
}
return l[:j+1]
} }

View file

@ -0,0 +1 @@
package storage