mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
parent
9f8cc8ae1b
commit
7c9405f53d
3 changed files with 36 additions and 7 deletions
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -39,7 +40,6 @@ Examples:
|
|||
|
||||
// TODO: hot configuration reload
|
||||
// TODO: alerts state persistence
|
||||
// TODO: metrics
|
||||
func main() {
|
||||
envflag.Parse()
|
||||
buildinfo.Init()
|
||||
|
@ -89,22 +89,43 @@ type watchdog struct {
|
|||
alertProvider notifier.Notifier
|
||||
}
|
||||
|
||||
var (
|
||||
iterationTotal = metrics.NewCounter(`vmalert_iteration_total`)
|
||||
iterationDuration = metrics.NewSummary(`vmalert_iteration_duration_seconds`)
|
||||
|
||||
execTotal = metrics.NewCounter(`vmalert_execution_total`)
|
||||
execErrors = metrics.NewCounter(`vmalert_execution_errors_total`)
|
||||
execDuration = metrics.NewSummary(`vmalert_execution_duration_seconds`)
|
||||
)
|
||||
|
||||
func (w *watchdog) run(ctx context.Context, group Group, evaluationInterval time.Duration) {
|
||||
logger.Infof("watchdog for %s has been run", group.Name)
|
||||
logger.Infof("watchdog for %s has been started", group.Name)
|
||||
t := time.NewTicker(evaluationInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
|
||||
select {
|
||||
case <-t.C:
|
||||
iterationTotal.Inc()
|
||||
iterationStart := time.Now()
|
||||
for _, rule := range group.Rules {
|
||||
if err := rule.Exec(ctx, w.storage); err != nil {
|
||||
execTotal.Inc()
|
||||
|
||||
execStart := time.Now()
|
||||
err := rule.Exec(ctx, w.storage)
|
||||
execDuration.UpdateDuration(execStart)
|
||||
|
||||
if err != nil {
|
||||
execErrors.Inc()
|
||||
logger.Errorf("failed to execute rule %q.%q: %s", group.Name, rule.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := rule.Send(ctx, w.alertProvider); err != nil {
|
||||
logger.Errorf("failed to send alert for rule %q.%q: %s", group.Name, rule.Name, err)
|
||||
}
|
||||
}
|
||||
iterationDuration.UpdateDuration(iterationStart)
|
||||
case <-ctx.Done():
|
||||
logger.Infof("%s received stop signal", group.Name)
|
||||
return
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
// Group grouping array of alert
|
||||
|
@ -61,7 +62,7 @@ func (r *Rule) Validate() error {
|
|||
// Exec executes Rule expression via the given Querier.
|
||||
// Based on the Querier results Rule maintains notifier.Alerts
|
||||
func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
|
||||
metrics, err := q.Query(ctx, r.Expr)
|
||||
qMetrics, err := q.Query(ctx, r.Expr)
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
|
@ -80,7 +81,7 @@ func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
|
|||
|
||||
updated := make(map[uint64]struct{})
|
||||
// update list of active alerts
|
||||
for _, m := range metrics {
|
||||
for _, m := range qMetrics {
|
||||
h := hash(m)
|
||||
updated[h] = struct{}{}
|
||||
if _, ok := r.alerts[h]; ok {
|
||||
|
@ -108,6 +109,7 @@ func (r *Rule) Exec(ctx context.Context, q datasource.Querier) error {
|
|||
}
|
||||
if a.State == notifier.StatePending && time.Since(a.Start) >= r.For {
|
||||
a.State = notifier.StateFiring
|
||||
alertsFired.Inc()
|
||||
}
|
||||
if a.State == notifier.StateFiring {
|
||||
a.End = r.lastExecTime.Add(3 * *evaluationInterval)
|
||||
|
@ -138,11 +140,15 @@ func (r *Rule) Send(_ context.Context, ap notifier.Notifier) error {
|
|||
logger.Infof("no alerts to send")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Infof("sending %d alerts", len(alertsCopy))
|
||||
alertsSent.Add(len(alertsCopy))
|
||||
return ap.Send(alertsCopy)
|
||||
}
|
||||
|
||||
var (
|
||||
alertsFired = metrics.NewCounter(`vmalert_alerts_fired_total`)
|
||||
alertsSent = metrics.NewCounter(`vmalert_alerts_sent_total`)
|
||||
)
|
||||
|
||||
// TODO: consider hashing algorithm in VM
|
||||
func hash(m datasource.Metric) uint64 {
|
||||
hash := fnv.New64a()
|
||||
|
|
|
@ -32,6 +32,8 @@ type requestHandler struct {
|
|||
var pathList = [][]string{
|
||||
{"/api/v1/alerts", "list all active alerts"},
|
||||
{"/api/v1/groupName/alertID/status", "get alert status by ID"},
|
||||
// /metrics is served by httpserver by default
|
||||
{"/metrics", "list of application metrics"},
|
||||
}
|
||||
|
||||
func (rh *requestHandler) handler(w http.ResponseWriter, r *http.Request) bool {
|
||||
|
|
Loading…
Reference in a new issue