mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmalert: sync with master branch
This commit is contained in:
parent
7644f40763
commit
e3db2c73a6
29 changed files with 2569 additions and 0 deletions
78
app/vmalert/Makefile
Normal file
78
app/vmalert/Makefile
Normal file
|
@ -0,0 +1,78 @@
|
|||
# All these commands must run from repository root.
|
||||
|
||||
vmalert:
|
||||
APP_NAME=vmalert $(MAKE) app-local
|
||||
|
||||
vmalert-race:
|
||||
APP_NAME=vmalert RACE=-race $(MAKE) app-local
|
||||
|
||||
vmalert-prod:
|
||||
APP_NAME=vmalert $(MAKE) app-via-docker
|
||||
|
||||
vmalert-pure-prod:
|
||||
APP_NAME=vmalert $(MAKE) app-via-docker-pure
|
||||
|
||||
vmalert-amd64-prod:
|
||||
APP_NAME=vmalert $(MAKE) app-via-docker-amd64
|
||||
|
||||
vmalert-arm-prod:
|
||||
APP_NAME=vmalert $(MAKE) app-via-docker-arm
|
||||
|
||||
vmalert-arm64-prod:
|
||||
APP_NAME=vmalert $(MAKE) app-via-docker-arm64
|
||||
|
||||
vmalert-ppc64le-prod:
|
||||
APP_NAME=vmalert $(MAKE) app-via-docker-ppc64le
|
||||
|
||||
vmalert-386-prod:
|
||||
APP_NAME=vmalert $(MAKE) app-via-docker-386
|
||||
|
||||
package-vmalert:
|
||||
APP_NAME=vmalert $(MAKE) package-via-docker
|
||||
|
||||
package-vmalert-pure:
|
||||
APP_NAME=vmalert $(MAKE) package-via-docker-pure
|
||||
|
||||
package-vmalert-amd64:
|
||||
APP_NAME=vmalert $(MAKE) package-via-docker-amd64
|
||||
|
||||
package-vmalert-arm:
|
||||
APP_NAME=vmalert $(MAKE) package-via-docker-arm
|
||||
|
||||
package-vmalert-arm64:
|
||||
APP_NAME=vmalert $(MAKE) package-via-docker-arm64
|
||||
|
||||
package-vmalert-ppc64le:
|
||||
APP_NAME=vmalert $(MAKE) package-via-docker-ppc64le
|
||||
|
||||
package-vmalert-386:
|
||||
APP_NAME=vmalert $(MAKE) package-via-docker-386
|
||||
|
||||
publish-vmalert:
|
||||
APP_NAME=vmalert $(MAKE) publish-via-docker
|
||||
|
||||
test-vmalert:
|
||||
go test -race -cover ./app/vmalert
|
||||
|
||||
run-vmalert: vmalert
|
||||
./bin/vmalert -rule=app/vmalert/testdata/rules0-good.rules \
|
||||
-datasource.url=http://localhost:8428 -notifier.url=http://localhost:9093 \
|
||||
-evaluationInterval=3s
|
||||
|
||||
vmalert-amd64:
|
||||
CGO_ENABLED=1 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-amd64 ./app/vmalert
|
||||
|
||||
vmalert-arm:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=arm GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-arm ./app/vmalert
|
||||
|
||||
vmalert-arm64:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-arm64 ./app/vmalert
|
||||
|
||||
vmalert-ppc64le:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-ppc64le ./app/vmalert
|
||||
|
||||
vmalert-386:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=386 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/vmalert-386 ./app/vmalert
|
||||
|
||||
vmalert-pure:
|
||||
APP_NAME=vmalert $(MAKE) app-local-pure
|
95
app/vmalert/README.md
Normal file
95
app/vmalert/README.md
Normal file
|
@ -0,0 +1,95 @@
|
|||
## VM Alert
|
||||
|
||||
`vmalert` executes a list of given MetricsQL expressions (rules) and
|
||||
sends alerts to [Alert Manager](https://github.com/prometheus/alertmanager).
|
||||
|
||||
NOTE: `vmalert` is in early alpha and wasn't tested in production systems yet.
|
||||
|
||||
### Features:
|
||||
* Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) TSDB;
|
||||
* VictoriaMetrics [MetricsQL](https://github.com/VictoriaMetrics/VictoriaMetrics/wiki/MetricsQL)
|
||||
expressions validation;
|
||||
* Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules)
|
||||
support;
|
||||
* Integration with [Alertmanager](https://github.com/prometheus/alertmanager);
|
||||
* Lightweight without extra dependencies.
|
||||
|
||||
### TODO:
|
||||
* Persist alerts state as timeseries in TSDB. Currently, alerts state is stored
|
||||
in process memory only and will be lost on restart;
|
||||
* Configuration hot reload.
|
||||
|
||||
### QuickStart
|
||||
|
||||
To build `vmalert` from sources:
|
||||
```
|
||||
git clone https://github.com/VictoriaMetrics/VictoriaMetrics
|
||||
cd VictoriaMetrics
|
||||
make vmalert
|
||||
```
|
||||
The build binary will be placed to `VictoriaMetrics/bin` folder.
|
||||
|
||||
To start using `vmalert` you will need the following things:
|
||||
* list of alert rules - PromQL/MetricsQL expressions to execute;
|
||||
* datasource address - reachable VictoriaMetrics instance for rules execution;
|
||||
* notifier address - reachable Alertmanager instance for processing,
|
||||
aggregating alerts and sending notifications.
|
||||
|
||||
Then configure `vmalert` accordingly:
|
||||
```
|
||||
./bin/vmalert -rule=alert.rules \
|
||||
-datasource.url=http://localhost:8428 \
|
||||
-notifier.url=http://localhost:9093
|
||||
```
|
||||
|
||||
Example for `.rules` file may be found [here](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/app/vmalert/testdata/rules0-good.rules)
|
||||
|
||||
`vmalert` runs evaluation for every group in a separate goroutine.
|
||||
Rules in group evaluated one-by-one sequentially.
|
||||
|
||||
`vmalert` also runs a web-server (`-httpListenAddr`) for serving metrics and alerts endpoints:
|
||||
* `http://<vmalert-addr>/api/v1/alerts` - list of all active alerts;
|
||||
* `http://<vmalert-addr>/api/v1/<groupName>/<alertID>/status" ` - get alert status by ID.
|
||||
Used as alert source in AlertManager.
|
||||
* `http://<vmalert-addr>/metrics` - application metrics.
|
||||
|
||||
### Configuration
|
||||
|
||||
The shortlist of configuration flags is the following:
|
||||
```
|
||||
Usage of vmalert:
|
||||
-datasource.url string
|
||||
Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428
|
||||
-datasource.basicAuth.password string
|
||||
Optional basic auth password to use for -datasource.url
|
||||
-datasource.basicAuth.username string
|
||||
Optional basic auth username to use for -datasource.url
|
||||
-evaluationInterval duration
|
||||
How often to evaluate the rules. Default 1m (default 1m0s)
|
||||
-external.url string
|
||||
External URL is used as alert's source for sent alerts to the notifier
|
||||
-httpListenAddr string
|
||||
Address to listen for http connections (default ":8880")
|
||||
-notifier.url string
|
||||
Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093
|
||||
-remotewrite.url string
|
||||
Optional URL to remote-write compatible storage where to write timeseriesbased on active alerts. E.g. http://127.0.0.1:8428
|
||||
-rule value
|
||||
Path to the file with alert rules.
|
||||
Supports patterns. Flag can be specified multiple times.
|
||||
Examples:
|
||||
-rule /path/to/file. Path to a single file with alerting rules
|
||||
-rule dir/*.yaml -rule /*.yaml. Relative path to all .yaml files in "dir" folder,
|
||||
absolute path to all .yaml files in root.
|
||||
-rule.validateTemplates
|
||||
Indicates to validate annotation and label templates (default true)
|
||||
```
|
||||
|
||||
Pass `-help` to `vmalert` in order to see the full list of supported
|
||||
command-line flags with their descriptions.
|
||||
|
||||
### Contributing
|
||||
|
||||
`vmalert` is mostly designed and built by VictoriaMetrics community.
|
||||
Feel free to share your experience and ideas for improving this
|
||||
software. Please keep simplicity as the main priority.
|
70
app/vmalert/config.go
Normal file
70
app/vmalert/config.go
Normal file
|
@ -0,0 +1,70 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gopkg.in/yaml.v2"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
)
|
||||
|
||||
// Parse parses rule configs from given file patterns
|
||||
func Parse(pathPatterns []string, validateAnnotations bool) ([]Group, error) {
|
||||
var fp []string
|
||||
for _, pattern := range pathPatterns {
|
||||
matches, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading file patther %s:%v", pattern, err)
|
||||
}
|
||||
fp = append(fp, matches...)
|
||||
}
|
||||
var groups []Group
|
||||
for _, file := range fp {
|
||||
groupsNames := map[string]struct{}{}
|
||||
gr, err := parseFile(file)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("file %s: %w", file, err)
|
||||
}
|
||||
for _, group := range gr {
|
||||
if _, ok := groupsNames[group.Name]; ok {
|
||||
return nil, fmt.Errorf("one file can not contain groups with the same name %s, filepath:%s", file, group.Name)
|
||||
}
|
||||
groupsNames[group.Name] = struct{}{}
|
||||
for _, rule := range group.Rules {
|
||||
if err = rule.Validate(); err != nil {
|
||||
return nil, fmt.Errorf("invalid rule filepath:%s, group %s:%w", file, group.Name, err)
|
||||
}
|
||||
// TODO: this init looks weird here
|
||||
rule.alerts = make(map[uint64]*notifier.Alert)
|
||||
if validateAnnotations {
|
||||
if err = notifier.ValidateTemplates(rule.Annotations); err != nil {
|
||||
return nil, fmt.Errorf("invalid annotations filepath:%s, group %s:%w", file, group.Name, err)
|
||||
}
|
||||
if err = notifier.ValidateTemplates(rule.Labels); err != nil {
|
||||
return nil, fmt.Errorf("invalid labels filepath:%s, group %s:%w", file, group.Name, err)
|
||||
}
|
||||
}
|
||||
rule.group = &group
|
||||
}
|
||||
}
|
||||
groups = append(groups, gr...)
|
||||
}
|
||||
if len(groups) < 1 {
|
||||
return nil, fmt.Errorf("no groups found in %s", strings.Join(pathPatterns, ";"))
|
||||
}
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func parseFile(path string) ([]Group, error) {
|
||||
data, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading alert rule file: %w", err)
|
||||
}
|
||||
g := struct {
|
||||
Groups []Group `yaml:"groups"`
|
||||
}{}
|
||||
err = yaml.Unmarshal(data, &g)
|
||||
return g.Groups, err
|
||||
}
|
39
app/vmalert/config_test.go
Normal file
39
app/vmalert/config_test.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
u, _ := url.Parse("https://victoriametrics.com/path")
|
||||
notifier.InitTemplateFunc(u)
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
func TestParseGood(t *testing.T) {
|
||||
if _, err := Parse([]string{"testdata/*good.rules", "testdata/dir/*good.*"}, true); err != nil {
|
||||
t.Errorf("error parsing files %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseBad(t *testing.T) {
|
||||
if _, err := Parse([]string{"testdata/rules0-bad.rules"}, true); err == nil {
|
||||
t.Errorf("expected syntaxt error")
|
||||
}
|
||||
if _, err := Parse([]string{"testdata/dir/rules0-bad.rules"}, true); err == nil {
|
||||
t.Errorf("expected template annotation error")
|
||||
}
|
||||
if _, err := Parse([]string{"testdata/dir/rules1-bad.rules"}, true); err == nil {
|
||||
t.Errorf("expected same group error")
|
||||
}
|
||||
if _, err := Parse([]string{"testdata/dir/rules2-bad.rules"}, true); err == nil {
|
||||
t.Errorf("expected template label error")
|
||||
}
|
||||
if _, err := Parse([]string{"testdata/*.yaml"}, true); err == nil {
|
||||
t.Errorf("expected empty group")
|
||||
}
|
||||
}
|
24
app/vmalert/datasource/datasource.go
Normal file
24
app/vmalert/datasource/datasource.go
Normal file
|
@ -0,0 +1,24 @@
|
|||
package datasource
|
||||
|
||||
import "context"
|
||||
|
||||
// Querier interface wraps Query method which
|
||||
// executes given query and returns list of Metrics
|
||||
// as result
|
||||
type Querier interface {
|
||||
Query(ctx context.Context, query string) ([]Metric, error)
|
||||
}
|
||||
|
||||
// Metric is the basic entity which should be return by datasource
|
||||
// It represents single data point with full list of labels
|
||||
type Metric struct {
|
||||
Labels []Label
|
||||
Timestamp int64
|
||||
Value float64
|
||||
}
|
||||
|
||||
// Label represents metric's label
|
||||
type Label struct {
|
||||
Name string
|
||||
Value string
|
||||
}
|
103
app/vmalert/datasource/vm.go
Normal file
103
app/vmalert/datasource/vm.go
Normal file
|
@ -0,0 +1,103 @@
|
|||
package datasource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type response struct {
|
||||
Status string `json:"status"`
|
||||
Data struct {
|
||||
ResultType string `json:"resultType"`
|
||||
Result []struct {
|
||||
Labels map[string]string `json:"metric"`
|
||||
TV [2]interface{} `json:"value"`
|
||||
} `json:"result"`
|
||||
} `json:"data"`
|
||||
ErrorType string `json:"errorType"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
func (r response) metrics() ([]Metric, error) {
|
||||
var ms []Metric
|
||||
var m Metric
|
||||
var f float64
|
||||
var err error
|
||||
for i, res := range r.Data.Result {
|
||||
f, err = strconv.ParseFloat(res.TV[1].(string), 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %s", res, res.TV[1], err)
|
||||
}
|
||||
m.Labels = nil
|
||||
for k, v := range r.Data.Result[i].Labels {
|
||||
m.Labels = append(m.Labels, Label{Name: k, Value: v})
|
||||
}
|
||||
m.Timestamp = int64(res.TV[0].(float64))
|
||||
m.Value = f
|
||||
ms = append(ms, m)
|
||||
}
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
const queryPath = "/api/v1/query?query="
|
||||
|
||||
// VMStorage represents vmstorage entity with ability to read and write metrics
|
||||
type VMStorage struct {
|
||||
c *http.Client
|
||||
queryURL string
|
||||
basicAuthUser, basicAuthPass string
|
||||
}
|
||||
|
||||
// NewVMStorage is a constructor for VMStorage
|
||||
func NewVMStorage(baseURL, basicAuthUser, basicAuthPass string, c *http.Client) *VMStorage {
|
||||
return &VMStorage{
|
||||
c: c,
|
||||
basicAuthUser: basicAuthUser,
|
||||
basicAuthPass: basicAuthPass,
|
||||
queryURL: strings.TrimSuffix(baseURL, "/") + queryPath,
|
||||
}
|
||||
}
|
||||
|
||||
// Query reads metrics from datasource by given query
|
||||
func (s *VMStorage) Query(ctx context.Context, query string) ([]Metric, error) {
|
||||
const (
|
||||
statusSuccess, statusError, rtVector = "success", "error", "vector"
|
||||
)
|
||||
req, err := http.NewRequest("POST", s.queryURL+url.QueryEscape(query), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if s.basicAuthPass != "" {
|
||||
req.SetBasicAuth(s.basicAuthUser, s.basicAuthPass)
|
||||
}
|
||||
resp, err := s.c.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting response from %s:%s", req.URL, err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("datasource returns unxeprected response code %d for %s with err %s. Reponse body %s", resp.StatusCode, req.URL, err, body)
|
||||
}
|
||||
r := &response{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(r); err != nil {
|
||||
return nil, fmt.Errorf("error parsing metrics for %s:%s", req.URL, err)
|
||||
}
|
||||
if r.Status == statusError {
|
||||
return nil, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL, r.ErrorType, r.Error)
|
||||
}
|
||||
if r.Status != statusSuccess {
|
||||
return nil, fmt.Errorf("unkown status:%s, Expected success or error ", r.Status)
|
||||
}
|
||||
if r.Data.ResultType != rtVector {
|
||||
return nil, fmt.Errorf("unkown restul type:%s. Expected vector", r.Data.ResultType)
|
||||
}
|
||||
return r.metrics()
|
||||
}
|
93
app/vmalert/datasource/vm_test.go
Normal file
93
app/vmalert/datasource/vm_test.go
Normal file
|
@ -0,0 +1,93 @@
|
|||
package datasource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var (
|
||||
ctx = context.Background()
|
||||
basicAuthName = "foo"
|
||||
basicAuthPass = "bar"
|
||||
query = "vm_rows"
|
||||
)
|
||||
|
||||
func TestVMSelectQuery(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) {
|
||||
t.Errorf("should not be called")
|
||||
})
|
||||
c := -1
|
||||
mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) {
|
||||
c++
|
||||
if r.Method != http.MethodPost {
|
||||
t.Errorf("expected POST method got %s", r.Method)
|
||||
}
|
||||
if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass {
|
||||
t.Errorf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass)
|
||||
}
|
||||
if r.URL.Query().Get("query") != query {
|
||||
t.Errorf("exptected %s in query param, got %s", query, r.URL.Query().Get("query"))
|
||||
}
|
||||
switch c {
|
||||
case 0:
|
||||
conn, _, _ := w.(http.Hijacker).Hijack()
|
||||
_ = conn.Close()
|
||||
case 1:
|
||||
w.WriteHeader(500)
|
||||
case 2:
|
||||
w.Write([]byte("[]"))
|
||||
case 3:
|
||||
w.Write([]byte(`{"status":"error", "errorType":"type:", "error":"some error msg"}`))
|
||||
case 4:
|
||||
w.Write([]byte(`{"status":"unknown"}`))
|
||||
case 5:
|
||||
w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`))
|
||||
case 6:
|
||||
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]}]}}`))
|
||||
}
|
||||
})
|
||||
|
||||
srv := httptest.NewServer(mux)
|
||||
defer srv.Close()
|
||||
am := NewVMStorage(srv.URL, basicAuthName, basicAuthPass, srv.Client())
|
||||
if _, err := am.Query(ctx, query); err == nil {
|
||||
t.Fatalf("expected connection error got nil")
|
||||
}
|
||||
if _, err := am.Query(ctx, query); err == nil {
|
||||
t.Fatalf("expected invalid response status error got nil")
|
||||
}
|
||||
if _, err := am.Query(ctx, query); err == nil {
|
||||
t.Fatalf("expected response body error got nil")
|
||||
}
|
||||
if _, err := am.Query(ctx, query); err == nil {
|
||||
t.Fatalf("expected error status got nil")
|
||||
}
|
||||
if _, err := am.Query(ctx, query); err == nil {
|
||||
t.Fatalf("expected unkown status got nil")
|
||||
}
|
||||
if _, err := am.Query(ctx, query); err == nil {
|
||||
t.Fatalf("expected non-vector resultType error got nil")
|
||||
}
|
||||
m, err := am.Query(ctx, query)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected %s", err)
|
||||
}
|
||||
if len(m) != 1 {
|
||||
t.Fatalf("exptected 1 metric got %d in %+v", len(m), m)
|
||||
}
|
||||
expected := Metric{
|
||||
Labels: []Label{{Value: "vm_rows", Name: "__name__"}},
|
||||
Timestamp: 1583786142,
|
||||
Value: 13763,
|
||||
}
|
||||
if m[0].Timestamp != expected.Timestamp &&
|
||||
m[0].Value != expected.Value &&
|
||||
m[0].Labels[0].Value != expected.Labels[0].Value &&
|
||||
m[0].Labels[0].Name != expected.Labels[0].Name {
|
||||
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
|
||||
}
|
||||
|
||||
}
|
213
app/vmalert/main.go
Normal file
213
app/vmalert/main.go
Normal file
|
@ -0,0 +1,213 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
rulePath = flagutil.NewArray("rule", `Path to the file with alert rules.
|
||||
Supports patterns. Flag can be specified multiple times.
|
||||
Examples:
|
||||
-rule /path/to/file. Path to a single file with alerting rules
|
||||
-rule dir/*.yaml -rule /*.yaml. Relative path to all .yaml files in "dir" folder,
|
||||
absolute path to all .yaml files in root.`)
|
||||
validateTemplates = flag.Bool("rule.validateTemplates", true, "Indicates to validate annotation and label templates")
|
||||
httpListenAddr = flag.String("httpListenAddr", ":8880", "Address to listen for http connections")
|
||||
datasourceURL = flag.String("datasource.url", "", "Victoria Metrics or VMSelect url. Required parameter. e.g. http://127.0.0.1:8428")
|
||||
basicAuthUsername = flag.String("datasource.basicAuth.username", "", "Optional basic auth username to use for -datasource.url")
|
||||
basicAuthPassword = flag.String("datasource.basicAuth.password", "", "Optional basic auth password to use for -datasource.url")
|
||||
remoteWriteURL = flag.String("remotewrite.url", "", "Optional URL to remote-write compatible storage where to write timeseries"+
|
||||
"based on active alerts. E.g. http://127.0.0.1:8428")
|
||||
evaluationInterval = flag.Duration("evaluationInterval", 1*time.Minute, "How often to evaluate the rules. Default 1m")
|
||||
notifierURL = flag.String("notifier.url", "", "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093")
|
||||
externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier")
|
||||
)
|
||||
|
||||
// TODO: hot configuration reload
|
||||
// TODO: alerts state persistence
|
||||
func main() {
|
||||
envflag.Parse()
|
||||
buildinfo.Init()
|
||||
logger.Init()
|
||||
checkFlags()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
eu, err := getExternalURL(*externalURL, *httpListenAddr, httpserver.IsTLS())
|
||||
if err != nil {
|
||||
logger.Fatalf("can not get external url:%s ", err)
|
||||
}
|
||||
notifier.InitTemplateFunc(eu)
|
||||
|
||||
logger.Infof("reading alert rules configuration file from %s", strings.Join(*rulePath, ";"))
|
||||
groups, err := Parse(*rulePath, *validateTemplates)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot parse configuration file: %s", err)
|
||||
}
|
||||
|
||||
w := &watchdog{
|
||||
storage: datasource.NewVMStorage(*datasourceURL, *basicAuthUsername, *basicAuthPassword, &http.Client{}),
|
||||
alertProvider: notifier.NewAlertManager(*notifierURL, func(group, name string) string {
|
||||
return fmt.Sprintf("%s/api/v1/%s/%s/status", eu, group, name)
|
||||
}, &http.Client{}),
|
||||
}
|
||||
|
||||
if *remoteWriteURL != "" {
|
||||
c, err := remotewrite.NewClient(ctx, remotewrite.Config{
|
||||
Addr: *remoteWriteURL,
|
||||
FlushInterval: *evaluationInterval,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Fatalf("failed to init remotewrite client: %s", err)
|
||||
}
|
||||
w.rw = c
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for i := range groups {
|
||||
wg.Add(1)
|
||||
go func(group Group) {
|
||||
w.run(ctx, group, *evaluationInterval)
|
||||
wg.Done()
|
||||
}(groups[i])
|
||||
}
|
||||
|
||||
go httpserver.Serve(*httpListenAddr, (&requestHandler{groups: groups}).handler)
|
||||
|
||||
sig := procutil.WaitForSigterm()
|
||||
logger.Infof("service received signal %s", sig)
|
||||
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
||||
logger.Fatalf("cannot stop the webservice: %s", err)
|
||||
}
|
||||
cancel()
|
||||
if w.rw != nil {
|
||||
err := w.rw.Close()
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot stop the remotewrite: %s", err)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
type watchdog struct {
|
||||
storage *datasource.VMStorage
|
||||
alertProvider notifier.Notifier
|
||||
rw *remotewrite.Client
|
||||
}
|
||||
|
||||
var (
|
||||
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
|
||||
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
|
||||
|
||||
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
||||
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
||||
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
|
||||
|
||||
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
||||
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
||||
alertsSendErrors = metrics.NewCounter(`vmalert_alerts_send_errors_total`)
|
||||
|
||||
remoteWriteSent = metrics.NewCounter(`vmalert_remotewrite_sent_total`)
|
||||
remoteWriteErrors = metrics.NewCounter(`vmalert_remotewrite_errors_total`)
|
||||
)
|
||||
|
||||
func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration) {
|
||||
logger.Infof("watchdog for %s has been started", group.Name)
|
||||
t := time.NewTicker(evaluationInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
|
||||
select {
|
||||
case <-t.C:
|
||||
iterationTotal.Inc()
|
||||
iterationStart := time.Now()
|
||||
for _, rule := range group.Rules {
|
||||
execTotal.Inc()
|
||||
|
||||
execStart := time.Now()
|
||||
err := rule.Exec(ctx, w.storage)
|
||||
execDuration.UpdateDuration(execStart)
|
||||
|
||||
if err != nil {
|
||||
execErrors.Inc()
|
||||
logger.Errorf("failed to execute rule %q.%q: %s", group.Name, rule.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
var alertsToSend []notifier.Alert
|
||||
for _, a := range rule.alerts {
|
||||
if a.State != notifier.StatePending {
|
||||
alertsToSend = append(alertsToSend, *a)
|
||||
}
|
||||
if a.State == notifier.StateInactive || w.rw == nil {
|
||||
continue
|
||||
}
|
||||
tss := rule.AlertToTimeSeries(a, execStart)
|
||||
for _, ts := range tss {
|
||||
remoteWriteSent.Inc()
|
||||
if err := w.rw.Push(ts); err != nil {
|
||||
remoteWriteErrors.Inc()
|
||||
logger.Errorf("failed to push timeseries to remotewrite: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
alertsSent.Add(len(alertsToSend))
|
||||
if err := w.alertProvider.Send(alertsToSend); err != nil {
|
||||
alertsSendErrors.Inc()
|
||||
logger.Errorf("failed to send alert for rule %q.%q: %s", group.Name, rule.Name, err)
|
||||
}
|
||||
}
|
||||
iterationDuration.UpdateDuration(iterationStart)
|
||||
case <-ctx.Done():
|
||||
logger.Infof("%s received stop signal", group.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getExternalURL(externalURL, httpListenAddr string, isSecure bool) (*url.URL, error) {
|
||||
if externalURL != "" {
|
||||
return url.Parse(externalURL)
|
||||
}
|
||||
hname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
port := ""
|
||||
if ipport := strings.Split(httpListenAddr, ":"); len(ipport) > 1 {
|
||||
port = ":" + ipport[1]
|
||||
}
|
||||
schema := "http://"
|
||||
if isSecure {
|
||||
schema = "https://"
|
||||
}
|
||||
return url.Parse(fmt.Sprintf("%s%s%s", schema, hname, port))
|
||||
}
|
||||
|
||||
func checkFlags() {
|
||||
if *notifierURL == "" {
|
||||
flag.PrintDefaults()
|
||||
logger.Fatalf("notifier.url is empty")
|
||||
}
|
||||
if *datasourceURL == "" {
|
||||
flag.PrintDefaults()
|
||||
logger.Fatalf("datasource.url is empty")
|
||||
}
|
||||
}
|
105
app/vmalert/notifier/alert.go
Normal file
105
app/vmalert/notifier/alert.go
Normal file
|
@ -0,0 +1,105 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Alert the triggered alert
|
||||
// TODO: Looks like alert name isn't unique
|
||||
type Alert struct {
|
||||
Group string
|
||||
Name string
|
||||
Labels map[string]string
|
||||
Annotations map[string]string
|
||||
State AlertState
|
||||
|
||||
Start time.Time
|
||||
End time.Time
|
||||
Value float64
|
||||
ID uint64
|
||||
}
|
||||
|
||||
// AlertState type indicates the Alert state
|
||||
type AlertState int
|
||||
|
||||
const (
|
||||
// StateInactive is the state of an alert that is neither firing nor pending.
|
||||
StateInactive AlertState = iota
|
||||
// StatePending is the state of an alert that has been active for less than
|
||||
// the configured threshold duration.
|
||||
StatePending
|
||||
// StateFiring is the state of an alert that has been active for longer than
|
||||
// the configured threshold duration.
|
||||
StateFiring
|
||||
)
|
||||
|
||||
// String stringer for AlertState
|
||||
func (as AlertState) String() string {
|
||||
switch as {
|
||||
case StateFiring:
|
||||
return "firing"
|
||||
case StatePending:
|
||||
return "pending"
|
||||
}
|
||||
return "inactive"
|
||||
}
|
||||
|
||||
type alertTplData struct {
|
||||
Labels map[string]string
|
||||
Value float64
|
||||
}
|
||||
|
||||
const tplHeader = `{{ $value := .Value }}{{ $labels := .Labels }}`
|
||||
|
||||
// ExecTemplate executes the Alert template for give
|
||||
// map of annotations.
|
||||
func (a *Alert) ExecTemplate(annotations map[string]string) (map[string]string, error) {
|
||||
tplData := alertTplData{Value: a.Value, Labels: a.Labels}
|
||||
return templateAnnotations(annotations, tplHeader, tplData)
|
||||
}
|
||||
|
||||
// ValidateTemplates validate annotations for possible template error, uses empty data for template population
|
||||
func ValidateTemplates(annotations map[string]string) error {
|
||||
_, err := templateAnnotations(annotations, tplHeader, alertTplData{
|
||||
Labels: map[string]string{},
|
||||
Value: 0,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func templateAnnotations(annotations map[string]string, header string, data alertTplData) (map[string]string, error) {
|
||||
var builder strings.Builder
|
||||
var buf bytes.Buffer
|
||||
eg := errGroup{}
|
||||
r := make(map[string]string, len(annotations))
|
||||
for key, text := range annotations {
|
||||
r[key] = text
|
||||
buf.Reset()
|
||||
builder.Reset()
|
||||
builder.Grow(len(header) + len(text))
|
||||
builder.WriteString(header)
|
||||
builder.WriteString(text)
|
||||
if err := templateAnnotation(&buf, builder.String(), data); err != nil {
|
||||
eg.errs = append(eg.errs, fmt.Sprintf("key %s, template %s:%s", key, text, err))
|
||||
continue
|
||||
}
|
||||
r[key] = buf.String()
|
||||
}
|
||||
return r, eg.err()
|
||||
}
|
||||
|
||||
func templateAnnotation(dst io.Writer, text string, data alertTplData) error {
|
||||
tpl, err := template.New("").Funcs(tmplFunc).Option("missingkey=zero").Parse(text)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing annotation:%w", err)
|
||||
}
|
||||
if err = tpl.Execute(dst, data); err != nil {
|
||||
return fmt.Errorf("error evaluating annotation template:%w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
65
app/vmalert/notifier/alert_test.go
Normal file
65
app/vmalert/notifier/alert_test.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAlert_ExecTemplate(t *testing.T) {
|
||||
testCases := []struct {
|
||||
alert *Alert
|
||||
annotations map[string]string
|
||||
expTpl map[string]string
|
||||
}{
|
||||
{
|
||||
alert: &Alert{},
|
||||
annotations: map[string]string{},
|
||||
expTpl: map[string]string{},
|
||||
},
|
||||
{
|
||||
alert: &Alert{
|
||||
Value: 1e4,
|
||||
Labels: map[string]string{
|
||||
"instance": "localhost",
|
||||
},
|
||||
},
|
||||
annotations: map[string]string{},
|
||||
expTpl: map[string]string{},
|
||||
},
|
||||
{
|
||||
alert: &Alert{
|
||||
Value: 1e4,
|
||||
Labels: map[string]string{
|
||||
"job": "staging",
|
||||
"instance": "localhost",
|
||||
},
|
||||
},
|
||||
annotations: map[string]string{
|
||||
"summary": "Too high connection number for {{$labels.instance}} for job {{$labels.job}}",
|
||||
"description": "It is {{ $value }} connections for {{$labels.instance}}",
|
||||
},
|
||||
expTpl: map[string]string{
|
||||
"summary": "Too high connection number for localhost for job staging",
|
||||
"description": "It is 10000 connections for localhost",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
|
||||
tpl, err := tc.alert.ExecTemplate(tc.annotations)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(tpl) != len(tc.expTpl) {
|
||||
t.Fatalf("expected %d elements; got %d", len(tc.expTpl), len(tpl))
|
||||
}
|
||||
for k := range tc.expTpl {
|
||||
got, exp := tpl[k], tc.expTpl[k]
|
||||
if got != exp {
|
||||
t.Fatalf("expected %q=%q; got %q=%q", k, exp, k, got)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
51
app/vmalert/notifier/alertmanager.go
Normal file
51
app/vmalert/notifier/alertmanager.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// AlertManager represents integration provider with Prometheus alert manager
|
||||
// https://github.com/prometheus/alertmanager
|
||||
type AlertManager struct {
|
||||
alertURL string
|
||||
argFunc AlertURLGenerator
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// Send an alert or resolve message
|
||||
func (am *AlertManager) Send(alerts []Alert) error {
|
||||
b := &bytes.Buffer{}
|
||||
writeamRequest(b, alerts, am.argFunc)
|
||||
resp, err := am.client.Post(am.alertURL, "application/json", b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response from %q: %s", am.alertURL, err)
|
||||
}
|
||||
return fmt.Errorf("invalid SC %d from %q; response body: %s", resp.StatusCode, am.alertURL, string(body))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AlertURLGenerator returns URL to single alert by given name
|
||||
type AlertURLGenerator func(group, id string) string
|
||||
|
||||
const alertManagerPath = "/api/v2/alerts"
|
||||
|
||||
// NewAlertManager is a constructor for AlertManager
|
||||
func NewAlertManager(alertManagerURL string, fn AlertURLGenerator, c *http.Client) *AlertManager {
|
||||
return &AlertManager{
|
||||
alertURL: strings.TrimSuffix(alertManagerURL, "/") + alertManagerPath,
|
||||
argFunc: fn,
|
||||
client: c,
|
||||
}
|
||||
}
|
34
app/vmalert/notifier/alertmanager_request.qtpl
Normal file
34
app/vmalert/notifier/alertmanager_request.qtpl
Normal file
|
@ -0,0 +1,34 @@
|
|||
{% import (
|
||||
"strconv"
|
||||
"time"
|
||||
) %}
|
||||
{% stripspace %}
|
||||
|
||||
{% func amRequest(alerts []Alert, generatorURL func(string, string) string) %}
|
||||
[
|
||||
{% for i, alert := range alerts %}
|
||||
{
|
||||
"startsAt":{%q= alert.Start.Format(time.RFC3339Nano) %},
|
||||
"generatorURL": {%q= generatorURL(alert.Group, strconv.FormatUint(alert.ID, 10)) %},
|
||||
{% if !alert.End.IsZero() %}
|
||||
"endsAt":{%q= alert.End.Format(time.RFC3339Nano) %},
|
||||
{% endif %}
|
||||
"labels": {
|
||||
"alertname":{%q= alert.Name %}
|
||||
{% for k,v := range alert.Labels %}
|
||||
,{%q= k %}:{%q= v %}
|
||||
{% endfor %}
|
||||
},
|
||||
"annotations": {
|
||||
{% code c := len(alert.Annotations) %}
|
||||
{% for k,v := range alert.Annotations %}
|
||||
{% code c = c-1 %}
|
||||
{%q= k %}:{%q= v %}{% if c > 0 %},{% endif %}
|
||||
{% endfor %}
|
||||
}
|
||||
}
|
||||
{% if i != len(alerts)-1 %},{% endif %}
|
||||
{% endfor %}
|
||||
]
|
||||
{% endfunc %}
|
||||
{% endstripspace %}
|
131
app/vmalert/notifier/alertmanager_request.qtpl.go
Normal file
131
app/vmalert/notifier/alertmanager_request.qtpl.go
Normal file
|
@ -0,0 +1,131 @@
|
|||
// Code generated by qtc from "alertmanager_request.qtpl". DO NOT EDIT.
|
||||
// See https://github.com/valyala/quicktemplate for details.
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:1
|
||||
package notifier
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:1
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:7
|
||||
import (
|
||||
qtio422016 "io"
|
||||
|
||||
qt422016 "github.com/valyala/quicktemplate"
|
||||
)
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:7
|
||||
var (
|
||||
_ = qtio422016.Copy
|
||||
_ = qt422016.AcquireByteBuffer
|
||||
)
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:7
|
||||
func streamamRequest(qw422016 *qt422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:7
|
||||
qw422016.N().S(`[`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:9
|
||||
for i, alert := range alerts {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:9
|
||||
qw422016.N().S(`{"startsAt":`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:11
|
||||
qw422016.N().Q(alert.Start.Format(time.RFC3339Nano))
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:11
|
||||
qw422016.N().S(`,"generatorURL":`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:12
|
||||
qw422016.N().Q(generatorURL(alert.Group, strconv.FormatUint(alert.ID, 10)))
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:12
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:13
|
||||
if !alert.End.IsZero() {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:13
|
||||
qw422016.N().S(`"endsAt":`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:14
|
||||
qw422016.N().Q(alert.End.Format(time.RFC3339Nano))
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:14
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:15
|
||||
}
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:15
|
||||
qw422016.N().S(`"labels": {"alertname":`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:17
|
||||
qw422016.N().Q(alert.Name)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:18
|
||||
for k, v := range alert.Labels {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:18
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:19
|
||||
qw422016.N().Q(k)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:19
|
||||
qw422016.N().S(`:`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:19
|
||||
qw422016.N().Q(v)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:20
|
||||
}
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:20
|
||||
qw422016.N().S(`},"annotations": {`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:23
|
||||
c := len(alert.Annotations)
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:24
|
||||
for k, v := range alert.Annotations {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:25
|
||||
c = c - 1
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:26
|
||||
qw422016.N().Q(k)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:26
|
||||
qw422016.N().S(`:`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:26
|
||||
qw422016.N().Q(v)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:26
|
||||
if c > 0 {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:26
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:26
|
||||
}
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:27
|
||||
}
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:27
|
||||
qw422016.N().S(`}}`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:30
|
||||
if i != len(alerts)-1 {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:30
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:30
|
||||
}
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:31
|
||||
}
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:31
|
||||
qw422016.N().S(`]`)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
}
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
func writeamRequest(qq422016 qtio422016.Writer, alerts []Alert, generatorURL func(string, string) string) {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
streamamRequest(qw422016, alerts, generatorURL)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
}
|
||||
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
func amRequest(alerts []Alert, generatorURL func(string, string) string) string {
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
writeamRequest(qb422016, alerts, generatorURL)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
return qs422016
|
||||
//line app/vmalert/notifier/alertmanager_request.qtpl:33
|
||||
}
|
80
app/vmalert/notifier/alertmanager_test.go
Normal file
80
app/vmalert/notifier/alertmanager_test.go
Normal file
|
@ -0,0 +1,80 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestAlertManager_Send(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) {
|
||||
t.Errorf("should not be called")
|
||||
})
|
||||
c := -1
|
||||
mux.HandleFunc(alertManagerPath, func(w http.ResponseWriter, r *http.Request) {
|
||||
c++
|
||||
if r.Method != http.MethodPost {
|
||||
t.Errorf("expected POST method got %s", r.Method)
|
||||
}
|
||||
switch c {
|
||||
case 0:
|
||||
conn, _, _ := w.(http.Hijacker).Hijack()
|
||||
_ = conn.Close()
|
||||
case 1:
|
||||
w.WriteHeader(500)
|
||||
case 2:
|
||||
var a []struct {
|
||||
Labels map[string]string `json:"labels"`
|
||||
StartsAt time.Time `json:"startsAt"`
|
||||
EndAt time.Time `json:"endsAt"`
|
||||
Annotations map[string]string `json:"annotations"`
|
||||
GeneratorURL string `json:"generatorURL"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&a); err != nil {
|
||||
t.Errorf("can not unmarshal data into alert %s", err)
|
||||
t.FailNow()
|
||||
}
|
||||
if len(a) != 1 {
|
||||
t.Errorf("expected 1 alert in array got %d", len(a))
|
||||
}
|
||||
if a[0].GeneratorURL != "group0" {
|
||||
t.Errorf("exptected alert0 as generatorURL got %s", a[0].GeneratorURL)
|
||||
}
|
||||
if a[0].Labels["alertname"] != "alert0" {
|
||||
t.Errorf("exptected alert0 as alert name got %s", a[0].Labels["alertname"])
|
||||
}
|
||||
if a[0].StartsAt.IsZero() {
|
||||
t.Errorf("exptected non-zero start time")
|
||||
}
|
||||
if a[0].EndAt.IsZero() {
|
||||
t.Errorf("exptected non-zero end time")
|
||||
}
|
||||
}
|
||||
})
|
||||
srv := httptest.NewServer(mux)
|
||||
defer srv.Close()
|
||||
am := NewAlertManager(srv.URL, func(group, name string) string {
|
||||
return group + name
|
||||
}, srv.Client())
|
||||
if err := am.Send([]Alert{{}, {}}); err == nil {
|
||||
t.Error("expected connection error got nil")
|
||||
}
|
||||
if err := am.Send([]Alert{}); err == nil {
|
||||
t.Error("expected wrong http code error got nil")
|
||||
}
|
||||
if err := am.Send([]Alert{{
|
||||
Group: "group",
|
||||
Name: "alert0",
|
||||
Start: time.Now().UTC(),
|
||||
End: time.Now().UTC(),
|
||||
Annotations: map[string]string{"a": "b", "c": "d", "e": "f"},
|
||||
}}); err != nil {
|
||||
t.Errorf("unexpected error %s", err)
|
||||
}
|
||||
if c != 2 {
|
||||
t.Errorf("expected 2 calls(count from zero) to server got %d", c)
|
||||
}
|
||||
}
|
6
app/vmalert/notifier/notifier.go
Normal file
6
app/vmalert/notifier/notifier.go
Normal file
|
@ -0,0 +1,6 @@
|
|||
package notifier
|
||||
|
||||
// Notifier is common interface for alert manager provider
|
||||
type Notifier interface {
|
||||
Send(alerts []Alert) error
|
||||
}
|
171
app/vmalert/notifier/template_func.go
Normal file
171
app/vmalert/notifier/template_func.go
Normal file
|
@ -0,0 +1,171 @@
|
|||
// Copyright 2013 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
html_template "html/template"
|
||||
"math"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
text_template "text/template"
|
||||
"time"
|
||||
)
|
||||
|
||||
var tmplFunc text_template.FuncMap
|
||||
|
||||
// InitTemplateFunc returns template helper functions
|
||||
func InitTemplateFunc(externalURL *url.URL) {
|
||||
tmplFunc = text_template.FuncMap{
|
||||
"args": func(args ...interface{}) map[string]interface{} {
|
||||
result := make(map[string]interface{})
|
||||
for i, a := range args {
|
||||
result[fmt.Sprintf("arg%d", i)] = a
|
||||
}
|
||||
return result
|
||||
},
|
||||
"reReplaceAll": func(pattern, repl, text string) string {
|
||||
re := regexp.MustCompile(pattern)
|
||||
return re.ReplaceAllString(text, repl)
|
||||
},
|
||||
"safeHtml": func(text string) html_template.HTML {
|
||||
return html_template.HTML(text)
|
||||
},
|
||||
"match": regexp.MatchString,
|
||||
"title": strings.Title,
|
||||
"toUpper": strings.ToUpper,
|
||||
"toLower": strings.ToLower,
|
||||
"humanize": func(v float64) string {
|
||||
if v == 0 || math.IsNaN(v) || math.IsInf(v, 0) {
|
||||
return fmt.Sprintf("%.4g", v)
|
||||
}
|
||||
if math.Abs(v) >= 1 {
|
||||
prefix := ""
|
||||
for _, p := range []string{"k", "M", "G", "T", "P", "E", "Z", "Y"} {
|
||||
if math.Abs(v) < 1000 {
|
||||
break
|
||||
}
|
||||
prefix = p
|
||||
v /= 1000
|
||||
}
|
||||
return fmt.Sprintf("%.4g%s", v, prefix)
|
||||
}
|
||||
prefix := ""
|
||||
for _, p := range []string{"m", "u", "n", "p", "f", "a", "z", "y"} {
|
||||
if math.Abs(v) >= 1 {
|
||||
break
|
||||
}
|
||||
prefix = p
|
||||
v *= 1000
|
||||
}
|
||||
return fmt.Sprintf("%.4g%s", v, prefix)
|
||||
},
|
||||
"humanize1024": func(v float64) string {
|
||||
if math.Abs(v) <= 1 || math.IsNaN(v) || math.IsInf(v, 0) {
|
||||
return fmt.Sprintf("%.4g", v)
|
||||
}
|
||||
prefix := ""
|
||||
for _, p := range []string{"ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi"} {
|
||||
if math.Abs(v) < 1024 {
|
||||
break
|
||||
}
|
||||
prefix = p
|
||||
v /= 1024
|
||||
}
|
||||
return fmt.Sprintf("%.4g%s", v, prefix)
|
||||
},
|
||||
"humanizeDuration": func(v float64) string {
|
||||
if math.IsNaN(v) || math.IsInf(v, 0) {
|
||||
return fmt.Sprintf("%.4g", v)
|
||||
}
|
||||
if v == 0 {
|
||||
return fmt.Sprintf("%.4gs", v)
|
||||
}
|
||||
if math.Abs(v) >= 1 {
|
||||
sign := ""
|
||||
if v < 0 {
|
||||
sign = "-"
|
||||
v = -v
|
||||
}
|
||||
seconds := int64(v) % 60
|
||||
minutes := (int64(v) / 60) % 60
|
||||
hours := (int64(v) / 60 / 60) % 24
|
||||
days := int64(v) / 60 / 60 / 24
|
||||
// For days to minutes, we display seconds as an integer.
|
||||
if days != 0 {
|
||||
return fmt.Sprintf("%s%dd %dh %dm %ds", sign, days, hours, minutes, seconds)
|
||||
}
|
||||
if hours != 0 {
|
||||
return fmt.Sprintf("%s%dh %dm %ds", sign, hours, minutes, seconds)
|
||||
}
|
||||
if minutes != 0 {
|
||||
return fmt.Sprintf("%s%dm %ds", sign, minutes, seconds)
|
||||
}
|
||||
// For seconds, we display 4 significant digits.
|
||||
return fmt.Sprintf("%s%.4gs", sign, v)
|
||||
}
|
||||
prefix := ""
|
||||
for _, p := range []string{"m", "u", "n", "p", "f", "a", "z", "y"} {
|
||||
if math.Abs(v) >= 1 {
|
||||
break
|
||||
}
|
||||
prefix = p
|
||||
v *= 1000
|
||||
}
|
||||
return fmt.Sprintf("%.4g%ss", v, prefix)
|
||||
},
|
||||
"humanizePercentage": func(v float64) string {
|
||||
return fmt.Sprintf("%.4g%%", v*100)
|
||||
},
|
||||
"humanizeTimestamp": func(v float64) string {
|
||||
if math.IsNaN(v) || math.IsInf(v, 0) {
|
||||
return fmt.Sprintf("%.4g", v)
|
||||
}
|
||||
t := TimeFromUnixNano(int64(v * 1e9)).Time().UTC()
|
||||
return fmt.Sprint(t)
|
||||
},
|
||||
"pathPrefix": func() string {
|
||||
return externalURL.Path
|
||||
},
|
||||
"externalURL": func() string {
|
||||
return externalURL.String()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Time is the number of milliseconds since the epoch
|
||||
// (1970-01-01 00:00 UTC) excluding leap seconds.
|
||||
type Time int64
|
||||
|
||||
// TimeFromUnixNano returns the Time equivalent to the Unix Time
|
||||
// t provided in nanoseconds.
|
||||
func TimeFromUnixNano(t int64) Time {
|
||||
return Time(t / nanosPerTick)
|
||||
}
|
||||
|
||||
// The number of nanoseconds per minimum tick.
|
||||
const nanosPerTick = int64(minimumTick / time.Nanosecond)
|
||||
|
||||
// MinimumTick is the minimum supported time resolution. This has to be
|
||||
// at least time.Second in order for the code below to work.
|
||||
const minimumTick = time.Millisecond
|
||||
|
||||
// second is the Time duration equivalent to one second.
|
||||
const second = int64(time.Second / minimumTick)
|
||||
|
||||
// Time returns the time.Time representation of t.
|
||||
func (t Time) Time() time.Time {
|
||||
return time.Unix(int64(t)/second, (int64(t)%second)*nanosPerTick)
|
||||
}
|
21
app/vmalert/notifier/utils.go
Normal file
21
app/vmalert/notifier/utils.go
Normal file
|
@ -0,0 +1,21 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type errGroup struct {
|
||||
errs []string
|
||||
}
|
||||
|
||||
func (eg *errGroup) err() error {
|
||||
if eg == nil || len(eg.errs) == 0 {
|
||||
return nil
|
||||
}
|
||||
return eg
|
||||
}
|
||||
|
||||
func (eg *errGroup) Error() string {
|
||||
return fmt.Sprintf("errors:%s", strings.Join(eg.errs, "\n"))
|
||||
}
|
187
app/vmalert/remotewrite/remotewrite.go
Normal file
187
app/vmalert/remotewrite/remotewrite.go
Normal file
|
@ -0,0 +1,187 @@
|
|||
package remotewrite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/golang/snappy"
|
||||
)
|
||||
|
||||
// Client is an asynchronous HTTP client for writing
|
||||
// timeseries via remote write protocol.
|
||||
type Client struct {
|
||||
addr string
|
||||
c *http.Client
|
||||
input chan prompbmarshal.TimeSeries
|
||||
baUser, baPass string
|
||||
flushInterval time.Duration
|
||||
maxBatchSize int
|
||||
maxQueueSize int
|
||||
|
||||
wg sync.WaitGroup
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// Addr of remote storage
|
||||
Addr string
|
||||
|
||||
BasicAuthUser string
|
||||
BasicAuthPass string
|
||||
|
||||
// MaxBatchSize defines max number of timeseries
|
||||
// to be flushed at once
|
||||
MaxBatchSize int
|
||||
// MaxQueueSize defines max length of input queue
|
||||
// populated by Push method
|
||||
MaxQueueSize int
|
||||
// FlushInterval defines time interval for flushing batches
|
||||
FlushInterval time.Duration
|
||||
// WriteTimeout defines timeout for HTTP write request
|
||||
// to remote storage
|
||||
WriteTimeout time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
defaultMaxBatchSize = 1e3
|
||||
defaultMaxQueueSize = 100
|
||||
defaultFlushInterval = 5 * time.Second
|
||||
defaultWriteTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
const writePath = "/api/v1/write"
|
||||
|
||||
// NewClient returns asynchronous client for
|
||||
// writing timeseries via remotewrite protocol.
|
||||
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||
if cfg.Addr == "" {
|
||||
return nil, fmt.Errorf("config.Addr can't be empty")
|
||||
}
|
||||
if cfg.MaxBatchSize == 0 {
|
||||
cfg.MaxBatchSize = defaultMaxBatchSize
|
||||
}
|
||||
if cfg.MaxQueueSize == 0 {
|
||||
cfg.MaxQueueSize = defaultMaxQueueSize
|
||||
}
|
||||
if cfg.FlushInterval == 0 {
|
||||
cfg.FlushInterval = defaultFlushInterval
|
||||
}
|
||||
if cfg.WriteTimeout == 0 {
|
||||
cfg.WriteTimeout = defaultWriteTimeout
|
||||
}
|
||||
c := &Client{
|
||||
c: &http.Client{
|
||||
Timeout: cfg.WriteTimeout,
|
||||
},
|
||||
addr: strings.TrimSuffix(cfg.Addr, "/") + writePath,
|
||||
baUser: cfg.BasicAuthUser,
|
||||
baPass: cfg.BasicAuthPass,
|
||||
flushInterval: cfg.FlushInterval,
|
||||
maxBatchSize: cfg.MaxBatchSize,
|
||||
doneCh: make(chan struct{}),
|
||||
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
|
||||
}
|
||||
c.run(ctx)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Push adds timeseries into queue for writing into remote storage.
|
||||
// Push returns and error if client is stopped or if queue is full.
|
||||
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
return fmt.Errorf("client is closed")
|
||||
case c.input <- s:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("failed to push timeseries - queue is full (%d entries)",
|
||||
c.maxQueueSize)
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops the client and waits for all goroutines
|
||||
// to exit.
|
||||
func (c *Client) Close() error {
|
||||
if c.doneCh == nil {
|
||||
return fmt.Errorf("client is already closed")
|
||||
}
|
||||
close(c.input)
|
||||
close(c.doneCh)
|
||||
c.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) run(ctx context.Context) {
|
||||
ticker := time.NewTicker(c.flushInterval)
|
||||
wr := prompbmarshal.WriteRequest{}
|
||||
shutdown := func() {
|
||||
for ts := range c.input {
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
}
|
||||
lastCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
c.flush(lastCtx, wr)
|
||||
cancel()
|
||||
}
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
shutdown()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
shutdown()
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.flush(ctx, wr)
|
||||
wr = prompbmarshal.WriteRequest{}
|
||||
case ts := <-c.input:
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||
c.flush(ctx, wr)
|
||||
wr = prompbmarshal.WriteRequest{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *Client) flush(ctx context.Context, wr prompbmarshal.WriteRequest) {
|
||||
if len(wr.Timeseries) < 1 {
|
||||
return
|
||||
}
|
||||
data, err := wr.Marshal()
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
||||
return
|
||||
}
|
||||
req, err := http.NewRequest("POST", c.addr, bytes.NewReader(snappy.Encode(nil, data)))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create new HTTP request: %s", err)
|
||||
return
|
||||
}
|
||||
if c.baPass != "" {
|
||||
req.SetBasicAuth(c.baUser, c.baPass)
|
||||
}
|
||||
resp, err := c.c.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
logger.Errorf("error getting response from %s:%s", req.URL, err)
|
||||
return
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
logger.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, req.URL, body)
|
||||
return
|
||||
}
|
||||
}
|
269
app/vmalert/rule.go
Normal file
269
app/vmalert/rule.go
Normal file
|
@ -0,0 +1,269 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
// Group grouping array of alert
|
||||
type Group struct {
|
||||
Name string
|
||||
Rules []*Rule
|
||||
}
|
||||
|
||||
// Rule is basic alert entity
|
||||
type Rule struct {
|
||||
Name string `yaml:"alert"`
|
||||
Expr string `yaml:"expr"`
|
||||
For time.Duration `yaml:"for"`
|
||||
Labels map[string]string `yaml:"labels"`
|
||||
Annotations map[string]string `yaml:"annotations"`
|
||||
|
||||
group *Group
|
||||
|
||||
// guard status fields
|
||||
mu sync.RWMutex
|
||||
// stores list of active alerts
|
||||
alerts map[uint64]*notifier.Alert
|
||||
// stores last moment of time Exec was called
|
||||
lastExecTime time.Time
|
||||
// stores last error that happened in Exec func
|
||||
// resets on every successful Exec
|
||||
// may be used as Health state
|
||||
lastExecError error
|
||||
}
|
||||
|
||||
// Validate validates rule
|
||||
func (r *Rule) Validate() error {
|
||||
if r.Name == "" {
|
||||
return errors.New("rule name can not be empty")
|
||||
}
|
||||
if r.Expr == "" {
|
||||
return fmt.Errorf("expression for rule %q can't be empty", r.Name)
|
||||
}
|
||||
if _, err := metricsql.Parse(r.Expr); err != nil {
|
||||
return fmt.Errorf("invalid expression for rule %q: %w", r.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Exec executes Rule expression via the given Querier.
|
||||
// Based on the Querier results Rule maintains notifier.Alerts
|
||||
func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
|
||||
qMetrics, err := q.Query(ctx, r.Expr)
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
r.lastExecError = err
|
||||
r.lastExecTime = time.Now()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute query %q: %s", r.Expr, err)
|
||||
}
|
||||
|
||||
for h, a := range r.alerts {
|
||||
// cleanup inactive alerts from previous Eval
|
||||
if a.State == notifier.StateInactive {
|
||||
delete(r.alerts, h)
|
||||
}
|
||||
}
|
||||
|
||||
updated := make(map[uint64]struct{})
|
||||
// update list of active alerts
|
||||
for _, m := range qMetrics {
|
||||
h := hash(m)
|
||||
updated[h] = struct{}{}
|
||||
if _, ok := r.alerts[h]; ok {
|
||||
continue
|
||||
}
|
||||
a, err := r.newAlert(m)
|
||||
if err != nil {
|
||||
r.lastExecError = err
|
||||
return fmt.Errorf("failed to create alert: %s", err)
|
||||
}
|
||||
a.ID = h
|
||||
a.State = notifier.StatePending
|
||||
r.alerts[h] = a
|
||||
}
|
||||
|
||||
for h, a := range r.alerts {
|
||||
// if alert wasn't updated in this iteration
|
||||
// means it is resolved already
|
||||
if _, ok := updated[h]; !ok {
|
||||
a.State = notifier.StateInactive
|
||||
// set endTime to last execution time
|
||||
// so it can be sent by notifier on next step
|
||||
a.End = r.lastExecTime
|
||||
continue
|
||||
}
|
||||
if a.State == notifier.StatePending && time.Since(a.Start) >= r.For {
|
||||
a.State = notifier.StateFiring
|
||||
alertsFired.Inc()
|
||||
}
|
||||
if a.State == notifier.StateFiring {
|
||||
a.End = r.lastExecTime.Add(3 * *evaluationInterval)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: consider hashing algorithm in VM
|
||||
func hash(m datasource.Metric) uint64 {
|
||||
hash := fnv.New64a()
|
||||
labels := m.Labels
|
||||
sort.Slice(labels, func(i, j int) bool {
|
||||
return labels[i].Name < labels[j].Name
|
||||
})
|
||||
for _, l := range labels {
|
||||
hash.Write([]byte(l.Name))
|
||||
hash.Write([]byte(l.Value))
|
||||
hash.Write([]byte("\xff"))
|
||||
}
|
||||
return hash.Sum64()
|
||||
}
|
||||
|
||||
func (r *Rule) newAlert(m datasource.Metric) (*notifier.Alert, error) {
|
||||
a := ¬ifier.Alert{
|
||||
Group: r.group.Name,
|
||||
Name: r.Name,
|
||||
Labels: map[string]string{},
|
||||
Value: m.Value,
|
||||
Start: time.Now(),
|
||||
// TODO: support End time
|
||||
}
|
||||
|
||||
// 1. use data labels
|
||||
for _, l := range m.Labels {
|
||||
a.Labels[l.Name] = l.Value
|
||||
}
|
||||
|
||||
// 2. template rule labels with data labels
|
||||
rLabels, err := a.ExecTemplate(r.Labels)
|
||||
if err != nil {
|
||||
return a, err
|
||||
}
|
||||
|
||||
// 3. merge data labels and rule labels
|
||||
// metric labels may be overridden by
|
||||
// rule labels
|
||||
for k, v := range rLabels {
|
||||
a.Labels[k] = v
|
||||
}
|
||||
|
||||
// 4. template merged labels
|
||||
a.Labels, err = a.ExecTemplate(a.Labels)
|
||||
if err != nil {
|
||||
return a, err
|
||||
}
|
||||
|
||||
a.Annotations, err = a.ExecTemplate(r.Annotations)
|
||||
return a, err
|
||||
}
|
||||
|
||||
// AlertAPI generates APIAlert object from alert by its id(hash)
|
||||
func (r *Rule) AlertAPI(id uint64) *APIAlert {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
a, ok := r.alerts[id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return r.newAlertAPI(*a)
|
||||
}
|
||||
|
||||
// AlertsAPI generates list of APIAlert objects from existing alerts
|
||||
func (r *Rule) AlertsAPI() []*APIAlert {
|
||||
var alerts []*APIAlert
|
||||
r.mu.RLock()
|
||||
for _, a := range r.alerts {
|
||||
alerts = append(alerts, r.newAlertAPI(*a))
|
||||
}
|
||||
r.mu.RUnlock()
|
||||
return alerts
|
||||
}
|
||||
|
||||
func (r *Rule) newAlertAPI(a notifier.Alert) *APIAlert {
|
||||
return &APIAlert{
|
||||
ID: a.ID,
|
||||
Name: a.Name,
|
||||
Group: a.Group,
|
||||
Expression: r.Expr,
|
||||
Labels: a.Labels,
|
||||
Annotations: a.Annotations,
|
||||
State: a.State.String(),
|
||||
ActiveAt: a.Start,
|
||||
Value: strconv.FormatFloat(a.Value, 'e', -1, 64),
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
// AlertMetricName is the metric name for synthetic alert timeseries.
|
||||
alertMetricName = "ALERTS"
|
||||
// AlertForStateMetricName is the metric name for 'for' state of alert.
|
||||
alertForStateMetricName = "ALERTS_FOR_STATE"
|
||||
|
||||
// AlertNameLabel is the label name indicating the name of an alert.
|
||||
alertNameLabel = "alertname"
|
||||
// AlertStateLabel is the label name indicating the state of an alert.
|
||||
alertStateLabel = "alertstate"
|
||||
)
|
||||
|
||||
func (r *Rule) AlertToTimeSeries(a *notifier.Alert, timestamp time.Time) []prompbmarshal.TimeSeries {
|
||||
var tss []prompbmarshal.TimeSeries
|
||||
tss = append(tss, alertToTimeSeries(r.Name, a, timestamp))
|
||||
if r.For > 0 {
|
||||
tss = append(tss, alertForToTimeSeries(r.Name, a, timestamp))
|
||||
}
|
||||
return tss
|
||||
}
|
||||
|
||||
func alertToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
|
||||
labels := make(map[string]string)
|
||||
for k, v := range a.Labels {
|
||||
labels[k] = v
|
||||
}
|
||||
labels["__name__"] = alertMetricName
|
||||
labels[alertNameLabel] = name
|
||||
labels[alertStateLabel] = a.State.String()
|
||||
return newTimeSeries(1, labels, timestamp)
|
||||
}
|
||||
|
||||
func alertForToTimeSeries(name string, a *notifier.Alert, timestamp time.Time) prompbmarshal.TimeSeries {
|
||||
labels := make(map[string]string)
|
||||
for k, v := range a.Labels {
|
||||
labels[k] = v
|
||||
}
|
||||
labels["__name__"] = alertForStateMetricName
|
||||
labels[alertNameLabel] = name
|
||||
return newTimeSeries(float64(a.Start.Unix()), labels, timestamp)
|
||||
}
|
||||
|
||||
func newTimeSeries(value float64, labels map[string]string, timestamp time.Time) prompbmarshal.TimeSeries {
|
||||
ts := prompbmarshal.TimeSeries{}
|
||||
ts.Samples = append(ts.Samples, prompbmarshal.Sample{
|
||||
Value: value,
|
||||
Timestamp: timestamp.UnixNano() / 1e6,
|
||||
})
|
||||
keys := make([]string, 0, len(labels))
|
||||
for k := range labels {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, key := range keys {
|
||||
ts.Labels = append(ts.Labels, prompbmarshal.Label{
|
||||
Name: key,
|
||||
Value: labels[key],
|
||||
})
|
||||
}
|
||||
return ts
|
||||
}
|
399
app/vmalert/rule_test.go
Normal file
399
app/vmalert/rule_test.go
Normal file
|
@ -0,0 +1,399 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
func TestRule_Validate(t *testing.T) {
|
||||
if err := (&Rule{}).Validate(); err == nil {
|
||||
t.Errorf("exptected empty name error")
|
||||
}
|
||||
if err := (&Rule{Name: "alert"}).Validate(); err == nil {
|
||||
t.Errorf("exptected empty expr error")
|
||||
}
|
||||
if err := (&Rule{Name: "alert", Expr: "test{"}).Validate(); err == nil {
|
||||
t.Errorf("exptected invalid expr error")
|
||||
}
|
||||
if err := (&Rule{Name: "alert", Expr: "test>0"}).Validate(); err != nil {
|
||||
t.Errorf("exptected valid rule got %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRule_AlertToTimeSeries(t *testing.T) {
|
||||
timestamp := time.Now()
|
||||
testCases := []struct {
|
||||
rule *Rule
|
||||
alert *notifier.Alert
|
||||
expTS []prompbmarshal.TimeSeries
|
||||
}{
|
||||
{
|
||||
newTestRule("instant", 0),
|
||||
¬ifier.Alert{State: notifier.StateFiring},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("instant extra labels", 0),
|
||||
¬ifier.Alert{State: notifier.StateFiring, Labels: map[string]string{
|
||||
"job": "foo",
|
||||
"instance": "bar",
|
||||
}},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant extra labels",
|
||||
"job": "foo",
|
||||
"instance": "bar",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("instant labels override", 0),
|
||||
¬ifier.Alert{State: notifier.StateFiring, Labels: map[string]string{
|
||||
alertStateLabel: "foo",
|
||||
"__name__": "bar",
|
||||
}},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "instant labels override",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for", time.Second),
|
||||
¬ifier.Alert{State: notifier.StateFiring, Start: timestamp.Add(time.Second)},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StateFiring.String(),
|
||||
alertNameLabel: "for",
|
||||
}, timestamp),
|
||||
newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{
|
||||
"__name__": alertForStateMetricName,
|
||||
alertNameLabel: "for",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for pending", 10*time.Second),
|
||||
¬ifier.Alert{State: notifier.StatePending, Start: timestamp.Add(time.Second)},
|
||||
[]prompbmarshal.TimeSeries{
|
||||
newTimeSeries(1, map[string]string{
|
||||
"__name__": alertMetricName,
|
||||
alertStateLabel: notifier.StatePending.String(),
|
||||
alertNameLabel: "for pending",
|
||||
}, timestamp),
|
||||
newTimeSeries(float64(timestamp.Add(time.Second).Unix()), map[string]string{
|
||||
"__name__": alertForStateMetricName,
|
||||
alertNameLabel: "for pending",
|
||||
}, timestamp),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.rule.Name, func(t *testing.T) {
|
||||
tss := tc.rule.AlertToTimeSeries(tc.alert, timestamp)
|
||||
if len(tc.expTS) != len(tss) {
|
||||
t.Fatalf("expected number of timeseries %d; got %d", len(tc.expTS), len(tss))
|
||||
}
|
||||
for i := range tc.expTS {
|
||||
expTS, gotTS := tc.expTS[i], tss[i]
|
||||
if len(expTS.Samples) != len(gotTS.Samples) {
|
||||
t.Fatalf("expected number of samples %d; got %d", len(expTS.Samples), len(gotTS.Samples))
|
||||
}
|
||||
for i, exp := range expTS.Samples {
|
||||
got := gotTS.Samples[i]
|
||||
if got.Value != exp.Value {
|
||||
t.Errorf("expected value %.2f; got %.2f", exp.Value, got.Value)
|
||||
}
|
||||
if got.Timestamp != exp.Timestamp {
|
||||
t.Errorf("expected timestamp %d; got %d", exp.Timestamp, got.Timestamp)
|
||||
}
|
||||
}
|
||||
if len(expTS.Labels) != len(gotTS.Labels) {
|
||||
t.Fatalf("expected number of labels %d; got %d", len(expTS.Labels), len(gotTS.Labels))
|
||||
}
|
||||
for i, exp := range expTS.Labels {
|
||||
got := gotTS.Labels[i]
|
||||
if got.Name != exp.Name {
|
||||
t.Errorf("expected label name %q; got %q", exp.Name, got.Name)
|
||||
}
|
||||
if got.Value != exp.Value {
|
||||
t.Errorf("expected label value %q; got %q", exp.Value, got.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newTestRule(name string, waitFor time.Duration) *Rule {
|
||||
return &Rule{Name: name, alerts: make(map[uint64]*notifier.Alert), For: waitFor}
|
||||
}
|
||||
|
||||
func TestRule_Exec(t *testing.T) {
|
||||
testCases := []struct {
|
||||
rule *Rule
|
||||
steps [][]datasource.Metric
|
||||
expAlerts map[uint64]*notifier.Alert
|
||||
}{
|
||||
{
|
||||
newTestRule("empty", 0),
|
||||
[][]datasource.Metric{},
|
||||
map[uint64]*notifier.Alert{},
|
||||
},
|
||||
{
|
||||
newTestRule("single-firing", 0),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("single-firing=>inactive", 0),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("single-firing=>inactive=>firing", 0),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("single-firing=>inactive=>firing=>inactive", 0),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("single-firing=>inactive=>firing=>inactive=>empty", 0),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{},
|
||||
{},
|
||||
},
|
||||
map[uint64]*notifier.Alert{},
|
||||
},
|
||||
{
|
||||
newTestRule("single-firing=>inactive=>firing=>inactive=>empty=>firing", 0),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{},
|
||||
{},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("multiple-firing", 0),
|
||||
[][]datasource.Metric{
|
||||
{
|
||||
metricWithLabels(t, "__name__", "foo"),
|
||||
metricWithLabels(t, "__name__", "foo1"),
|
||||
metricWithLabels(t, "__name__", "foo2"),
|
||||
},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
|
||||
hash(metricWithLabels(t, "__name__", "foo1")): {State: notifier.StateFiring},
|
||||
hash(metricWithLabels(t, "__name__", "foo2")): {State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("multiple-steps-firing", 0),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{metricWithLabels(t, "__name__", "foo1")},
|
||||
{metricWithLabels(t, "__name__", "foo2")},
|
||||
},
|
||||
// 1: fire first alert
|
||||
// 2: fire second alert, set first inactive
|
||||
// 3: fire third alert, set second inactive, delete first one
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo1")): {State: notifier.StateInactive},
|
||||
hash(metricWithLabels(t, "__name__", "foo2")): {State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("duplicate", 0),
|
||||
[][]datasource.Metric{
|
||||
{
|
||||
// metrics with the same labelset should result in one alert
|
||||
metricWithLabels(t, "__name__", "foo", "type", "bar"),
|
||||
metricWithLabels(t, "type", "bar", "__name__", "foo"),
|
||||
},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo", "type", "bar")): {State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for-pending", time.Minute),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StatePending},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for-fired", time.Millisecond),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for-pending=>inactive", time.Millisecond),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
// empty step to reset pending alerts
|
||||
{},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for-pending=>firing=>inactive", time.Millisecond),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
// empty step to reset pending alerts
|
||||
{},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateInactive},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for-pending=>firing=>inactive=>pending", time.Millisecond),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
// empty step to reset pending alerts
|
||||
{},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StatePending},
|
||||
},
|
||||
},
|
||||
{
|
||||
newTestRule("for-pending=>firing=>inactive=>pending=>firing", time.Millisecond),
|
||||
[][]datasource.Metric{
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
// empty step to reset pending alerts
|
||||
{},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
{metricWithLabels(t, "__name__", "foo")},
|
||||
},
|
||||
map[uint64]*notifier.Alert{
|
||||
hash(metricWithLabels(t, "__name__", "foo")): {State: notifier.StateFiring},
|
||||
},
|
||||
},
|
||||
}
|
||||
fakeGroup := &Group{Name: "TestRule_Exec"}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.rule.Name, func(t *testing.T) {
|
||||
fq := &fakeQuerier{}
|
||||
tc.rule.group = fakeGroup
|
||||
for _, step := range tc.steps {
|
||||
fq.reset()
|
||||
fq.add(t, step...)
|
||||
if err := tc.rule.Exec(context.TODO(), fq); err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
// artificial delay between applying steps
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
if len(tc.rule.alerts) != len(tc.expAlerts) {
|
||||
t.Fatalf("expected %d alerts; got %d", len(tc.expAlerts), len(tc.rule.alerts))
|
||||
}
|
||||
for key, exp := range tc.expAlerts {
|
||||
got, ok := tc.rule.alerts[key]
|
||||
if !ok {
|
||||
t.Fatalf("expected to have key %d", key)
|
||||
}
|
||||
if got.State != exp.State {
|
||||
t.Fatalf("expected state %d; got %d", exp.State, got.State)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func metricWithLabels(t *testing.T, labels ...string) datasource.Metric {
|
||||
t.Helper()
|
||||
if len(labels) == 0 || len(labels)%2 != 0 {
|
||||
t.Fatalf("expected to get even number of labels")
|
||||
}
|
||||
m := datasource.Metric{}
|
||||
for i := 0; i < len(labels); i += 2 {
|
||||
m.Labels = append(m.Labels, datasource.Label{
|
||||
Name: labels[i],
|
||||
Value: labels[i+1],
|
||||
})
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
type fakeQuerier struct {
|
||||
metrics []datasource.Metric
|
||||
}
|
||||
|
||||
func (fq *fakeQuerier) reset() {
|
||||
fq.metrics = fq.metrics[:0]
|
||||
}
|
||||
|
||||
func (fq *fakeQuerier) add(t *testing.T, metrics ...datasource.Metric) {
|
||||
fq.metrics = append(fq.metrics, metrics...)
|
||||
}
|
||||
|
||||
func (fq fakeQuerier) Query(ctx context.Context, query string) ([]datasource.Metric, error) {
|
||||
return fq.metrics, nil
|
||||
}
|
19
app/vmalert/testdata/dir/rules0-bad.rules
vendored
Normal file
19
app/vmalert/testdata/dir/rules0-bad.rules
vendored
Normal file
|
@ -0,0 +1,19 @@
|
|||
groups:
|
||||
- name: group
|
||||
rules:
|
||||
- alert: InvalidAnnotations
|
||||
for: 5m
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: bar
|
||||
annotations:
|
||||
summary: "{{ $value }"
|
||||
description: "{{$labels}}"
|
||||
- alert: UnkownAnnotationsFunction
|
||||
for: 5m
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: bar
|
||||
annotations:
|
||||
summary: "{{ value|query }}"
|
||||
description: "{{$labels}}"
|
13
app/vmalert/testdata/dir/rules0-good.rules
vendored
Normal file
13
app/vmalert/testdata/dir/rules0-good.rules
vendored
Normal file
|
@ -0,0 +1,13 @@
|
|||
groups:
|
||||
- name: duplicatedGroupDiffFiles
|
||||
rules:
|
||||
- alert: VMRows
|
||||
for: 5m
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: bar
|
||||
annotations:
|
||||
summary: "{{ $value|humanize }}"
|
||||
description: "{{$labels}}"
|
||||
|
||||
|
22
app/vmalert/testdata/dir/rules1-bad.rules
vendored
Normal file
22
app/vmalert/testdata/dir/rules1-bad.rules
vendored
Normal file
|
@ -0,0 +1,22 @@
|
|||
groups:
|
||||
- name: sameGroup
|
||||
rules:
|
||||
- alert: alert
|
||||
for: 5m
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: bar
|
||||
annotations:
|
||||
summary: "{{ $value }}"
|
||||
description: "{{$labels}}"
|
||||
- name: sameGroup
|
||||
rules:
|
||||
- alert: alert
|
||||
for: 5m
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: bar
|
||||
annotations:
|
||||
summary: "{{ $value }}"
|
||||
description: "{{$labels}}"
|
||||
|
13
app/vmalert/testdata/dir/rules1-good.rules
vendored
Normal file
13
app/vmalert/testdata/dir/rules1-good.rules
vendored
Normal file
|
@ -0,0 +1,13 @@
|
|||
groups:
|
||||
- name: duplicatedGroupDiffFiles
|
||||
rules:
|
||||
- alert: VMRows
|
||||
for: 5m
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: bar
|
||||
annotations:
|
||||
summary: "{{ $value }}"
|
||||
description: "{{$labels}}"
|
||||
|
||||
|
11
app/vmalert/testdata/dir/rules2-bad.rules
vendored
Normal file
11
app/vmalert/testdata/dir/rules2-bad.rules
vendored
Normal file
|
@ -0,0 +1,11 @@
|
|||
groups:
|
||||
- name: group
|
||||
rules:
|
||||
- alert: UnkownLabelFunction
|
||||
for: 5m
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: bar
|
||||
summary: "{{ value|query }}"
|
||||
annotations:
|
||||
description: "{{$labels}}"
|
28
app/vmalert/testdata/rules0-bad.rules
vendored
Normal file
28
app/vmalert/testdata/rules0-bad.rules
vendored
Normal file
|
@ -0,0 +1,28 @@
|
|||
groups:
|
||||
- name: group
|
||||
rules:
|
||||
- alert: InvalidExpr
|
||||
for: 5m
|
||||
expr: vm_rows{ > 0
|
||||
labels:
|
||||
label: bar
|
||||
annotations:
|
||||
summary: "{{ $value }}"
|
||||
description: "{{$labels}}"
|
||||
- alert: EmptyExpr
|
||||
for: 5m
|
||||
expr: ""
|
||||
labels:
|
||||
label: bar
|
||||
annotations:
|
||||
summary: "{{ $value }}"
|
||||
description: "{{$labels}}"
|
||||
- alert: ""
|
||||
for: 5m
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: foo
|
||||
annotations:
|
||||
summary: "{{ $value }}"
|
||||
description: "{{$labels}}"
|
||||
|
23
app/vmalert/testdata/rules0-good.rules
vendored
Normal file
23
app/vmalert/testdata/rules0-good.rules
vendored
Normal file
|
@ -0,0 +1,23 @@
|
|||
groups:
|
||||
- name: groupGorSingleAlert
|
||||
rules:
|
||||
- alert: VMRows
|
||||
for: 10s
|
||||
expr: vm_rows > 0
|
||||
labels:
|
||||
label: bar
|
||||
template: "{{ $value|humanize }}"
|
||||
annotations:
|
||||
summary: "{{ $value|humanize }}"
|
||||
description: "{{$labels}}"
|
||||
|
||||
- name: TestGroup
|
||||
rules:
|
||||
- alert: Conns
|
||||
expr: sum(vm_tcplistener_conns) by(instance) > 1
|
||||
annotations:
|
||||
summary: "Too high connection number for {{$labels.instance}}"
|
||||
description: "It is {{ $value }} connections for {{$labels.instance}}"
|
||||
- alert: ExampleAlertAlwaysFiring
|
||||
expr: sum by(job)
|
||||
(up == 1)
|
134
app/vmalert/web.go
Normal file
134
app/vmalert/web.go
Normal file
|
@ -0,0 +1,134 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
)
|
||||
|
||||
// APIAlert has info for an alert.
|
||||
type APIAlert struct {
|
||||
ID uint64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Group string `json:"group"`
|
||||
Expression string `json:"expression"`
|
||||
State string `json:"state"`
|
||||
Value string `json:"value"`
|
||||
Labels map[string]string `json:"labels"`
|
||||
Annotations map[string]string `json:"annotations"`
|
||||
ActiveAt time.Time `json:"activeAt"`
|
||||
}
|
||||
|
||||
type requestHandler struct {
|
||||
groups []Group
|
||||
}
|
||||
|
||||
var pathList = [][]string{
|
||||
{"/api/v1/alerts", "list all active alerts"},
|
||||
{"/api/v1/groupName/alertID/status", "get alert status by ID"},
|
||||
// /metrics is served by httpserver by default
|
||||
{"/metrics", "list of application metrics"},
|
||||
}
|
||||
|
||||
func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
|
||||
resph := responseHandler{w}
|
||||
switch r.URL.Path {
|
||||
case "/":
|
||||
for _, path := range pathList {
|
||||
p, doc := path[0], path[1]
|
||||
fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
|
||||
}
|
||||
return true
|
||||
case "/api/v1/alerts":
|
||||
resph.handle(rh.list())
|
||||
return true
|
||||
default:
|
||||
// /api/v1/<groupName>/<alertID>/status
|
||||
if strings.HasSuffix(r.URL.Path, "/status") {
|
||||
resph.handle(rh.alert(r.URL.Path))
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
type listAlertsResponse struct {
|
||||
Data struct {
|
||||
Alerts []*APIAlert `json:"alerts"`
|
||||
} `json:"data"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
func (rh *requestHandler) list() ([]byte, error) {
|
||||
lr := listAlertsResponse{Status: "success"}
|
||||
for _, g := range rh.groups {
|
||||
for _, r := range g.Rules {
|
||||
lr.Data.Alerts = append(lr.Data.Alerts, r.AlertsAPI()...)
|
||||
}
|
||||
}
|
||||
|
||||
// sort list of alerts for deterministic output
|
||||
sort.Slice(lr.Data.Alerts, func(i, j int) bool {
|
||||
return lr.Data.Alerts[i].Name < lr.Data.Alerts[j].Name
|
||||
})
|
||||
|
||||
b, err := json.Marshal(lr)
|
||||
if err != nil {
|
||||
return nil, &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf(`error encoding list of active alerts: %s`, err),
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
}
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (rh *requestHandler) alert(path string) ([]byte, error) {
|
||||
parts := strings.SplitN(strings.TrimPrefix(path, "/api/v1/"), "/", 3)
|
||||
if len(parts) != 3 {
|
||||
return nil, &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf(`path %q cointains /status suffix but doesn't match pattern "/group/alert/status"`, path),
|
||||
StatusCode: http.StatusBadRequest,
|
||||
}
|
||||
}
|
||||
group := strings.TrimRight(parts[0], "/")
|
||||
idStr := strings.TrimRight(parts[1], "/")
|
||||
id, err := strconv.ParseUint(idStr, 10, 0)
|
||||
if err != nil {
|
||||
return nil, &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf(`cannot parse int from %q`, idStr),
|
||||
StatusCode: http.StatusBadRequest,
|
||||
}
|
||||
}
|
||||
for _, g := range rh.groups {
|
||||
if g.Name != group {
|
||||
continue
|
||||
}
|
||||
for i := range g.Rules {
|
||||
if apiAlert := g.Rules[i].AlertAPI(id); apiAlert != nil {
|
||||
return json.Marshal(apiAlert)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, &httpserver.ErrorWithStatusCode{
|
||||
Err: fmt.Errorf(`cannot find alert %s in %q`, idStr, group),
|
||||
StatusCode: http.StatusNotFound,
|
||||
}
|
||||
}
|
||||
|
||||
// responseHandler wrapper on http.ResponseWriter with sugar
|
||||
type responseHandler struct{ http.ResponseWriter }
|
||||
|
||||
func (w responseHandler) handle(b []byte, err error) {
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, "%s", err)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(b)
|
||||
}
|
72
app/vmalert/web_test.go
Normal file
72
app/vmalert/web_test.go
Normal file
|
@ -0,0 +1,72 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
)
|
||||
|
||||
func TestHandler(t *testing.T) {
|
||||
rule := &Rule{
|
||||
Name: "alert",
|
||||
alerts: map[uint64]*notifier.Alert{
|
||||
0: {},
|
||||
},
|
||||
}
|
||||
rh := &requestHandler{
|
||||
groups: []Group{{
|
||||
Name: "group",
|
||||
Rules: []*Rule{rule},
|
||||
}},
|
||||
}
|
||||
getResp := func(url string, to interface{}, code int) {
|
||||
t.Helper()
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected err %s", err)
|
||||
}
|
||||
if code != resp.StatusCode {
|
||||
t.Errorf("unexpected status code %d want %d", resp.StatusCode, code)
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
t.Errorf("err closing body %s", err)
|
||||
}
|
||||
}()
|
||||
if to != nil {
|
||||
if err = json.NewDecoder(resp.Body).Decode(to); err != nil {
|
||||
t.Errorf("unexpected err %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { rh.handler(w, r) }))
|
||||
defer ts.Close()
|
||||
t.Run("/api/v1/alerts", func(t *testing.T) {
|
||||
lr := listAlertsResponse{}
|
||||
getResp(ts.URL+"/api/v1/alerts", &lr, 200)
|
||||
if length := len(lr.Data.Alerts); length != 1 {
|
||||
t.Errorf("expected 1 alert got %d", length)
|
||||
}
|
||||
})
|
||||
t.Run("/api/v1/group/0/status", func(t *testing.T) {
|
||||
alert := &APIAlert{}
|
||||
getResp(ts.URL+"/api/v1/group/0/status", alert, 200)
|
||||
expAlert := rule.newAlertAPI(*rule.alerts[0])
|
||||
if !reflect.DeepEqual(alert, expAlert) {
|
||||
t.Errorf("expected %v is equal to %v", alert, expAlert)
|
||||
}
|
||||
})
|
||||
t.Run("/api/v1/group/1/status", func(t *testing.T) {
|
||||
getResp(ts.URL+"/api/v1/group/1/status", nil, 404)
|
||||
})
|
||||
t.Run("/api/v1/unknown-group/0/status", func(t *testing.T) {
|
||||
getResp(ts.URL+"/api/v1/unknown-group/0/status", nil, 404)
|
||||
})
|
||||
t.Run("/", func(t *testing.T) {
|
||||
getResp(ts.URL, nil, 200)
|
||||
})
|
||||
}
|
Loading…
Reference in a new issue