2020-04-27 21:19:27 +00:00
package main
import (
	"context"
	"flag"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
	"github.com/VictoriaMetrics/metrics"
)
var (
rulePath = flagutil . NewArray ( "rule" , ` Path to the file with alert rules .
Supports patterns . Flag can be specified multiple times .
Examples :
- rule / path / to / file . Path to a single file with alerting rules
- rule dir / * . yaml - rule / * . yaml . Relative path to all . yaml files in "dir" folder ,
absolute path to all . yaml files in root . ` )
validateTemplates = flag . Bool ( "rule.validateTemplates" , true , "Indicates to validate annotation and label templates" )
httpListenAddr = flag . String ( "httpListenAddr" , ":8880" , "Address to listen for http connections" )
2020-05-04 21:51:22 +00:00
datasourceURL = flag . String ( "datasource.url" , "" , "Victoria Metrics or VMSelect url. Required parameter." +
" E.g. http://127.0.0.1:8428" )
basicAuthUsername = flag . String ( "datasource.basicAuth.username" , "" , "Optional basic auth username for -datasource.url" )
basicAuthPassword = flag . String ( "datasource.basicAuth.password" , "" , "Optional basic auth password for -datasource.url" )
remoteWriteURL = flag . String ( "remotewrite.url" , "" , "Optional URL to Victoria Metrics or VMInsert where to persist alerts state" +
" in form of timeseries. E.g. http://127.0.0.1:8428" )
remoteWriteUsername = flag . String ( "remotewrite.basicAuth.username" , "" , "Optional basic auth username for -remotewrite.url" )
remoteWritePassword = flag . String ( "remotewrite.basicAuth.password" , "" , "Optional basic auth password for -remotewrite.url" )
remoteReadURL = flag . String ( "remoteread.url" , "" , "Optional URL to Victoria Metrics or VMSelect that will be used to restore alerts" +
" state. This configuration makes sense only if `vmalert` was configured with `remotewrite.url` before and has been successfully persisted its state." +
" E.g. http://127.0.0.1:8428" )
remoteReadUsername = flag . String ( "remoteread.basicAuth.username" , "" , "Optional basic auth username for -remoteread.url" )
remoteReadPassword = flag . String ( "remoteread.basicAuth.password" , "" , "Optional basic auth password for -remoteread.url" )
remoteReadLookBack = flag . Duration ( "remoteread.lookback" , time . Hour , "Lookback defines how far to look into past for alerts timeseries." +
" For example, if lookback=1h then range from now() to now()-1h will be scanned." )
evaluationInterval = flag . Duration ( "evaluationInterval" , time . Minute , "How often to evaluate the rules. Default 1m" )
2020-04-27 21:19:27 +00:00
notifierURL = flag . String ( "notifier.url" , "" , "Prometheus alertmanager URL. Required parameter. e.g. http://127.0.0.1:9093" )
externalURL = flag . String ( "external.url" , "" , "External URL is used as alert's source for sent alerts to the notifier" )
)
func main ( ) {
envflag . Parse ( )
buildinfo . Init ( )
logger . Init ( )
checkFlags ( )
ctx , cancel := context . WithCancel ( context . Background ( ) )
2020-04-27 21:33:55 +00:00
eu , err := getExternalURL ( * externalURL , * httpListenAddr , false )
2020-04-27 21:19:27 +00:00
if err != nil {
logger . Fatalf ( "can not get external url:%s " , err )
}
notifier . InitTemplateFunc ( eu )
logger . Infof ( "reading alert rules configuration file from %s" , strings . Join ( * rulePath , ";" ) )
2020-05-09 09:32:12 +00:00
groups , err := readRules ( )
2020-04-27 21:19:27 +00:00
if err != nil {
logger . Fatalf ( "cannot parse configuration file: %s" , err )
}
w := & watchdog {
storage : datasource . NewVMStorage ( * datasourceURL , * basicAuthUsername , * basicAuthPassword , & http . Client { } ) ,
alertProvider : notifier . NewAlertManager ( * notifierURL , func ( group , name string ) string {
return fmt . Sprintf ( "%s/api/v1/%s/%s/status" , eu , group , name )
} , & http . Client { } ) ,
}
if * remoteWriteURL != "" {
c , err := remotewrite . NewClient ( ctx , remotewrite . Config {
Addr : * remoteWriteURL ,
FlushInterval : * evaluationInterval ,
2020-05-04 21:51:22 +00:00
BasicAuthUser : * remoteWriteUsername ,
BasicAuthPass : * remoteWritePassword ,
2020-04-27 21:19:27 +00:00
} )
if err != nil {
logger . Fatalf ( "failed to init remotewrite client: %s" , err )
}
w . rw = c
}
2020-05-04 21:51:22 +00:00
var restoreDS * datasource . VMStorage
if * remoteReadURL != "" {
restoreDS = datasource . NewVMStorage ( * remoteReadURL , * remoteReadUsername , * remoteReadPassword , & http . Client { } )
}
2020-04-27 21:19:27 +00:00
wg := sync . WaitGroup { }
2020-05-09 09:32:12 +00:00
groupUpdateStorage := startInitGroups ( ctx , w , restoreDS , groups , & wg )
rh := & requestHandler { groups : groups , mu : sync . RWMutex { } }
//run config updater
wg . Add ( 1 )
sigHup := procutil . NewSighupChan ( )
go rh . runConfigUpdater ( ctx , sigHup , groupUpdateStorage , w , & wg )
go httpserver . Serve ( * httpListenAddr , ( rh ) . handler )
2020-04-27 21:19:27 +00:00
sig := procutil . WaitForSigterm ( )
logger . Infof ( "service received signal %s" , sig )
if err := httpserver . Stop ( * httpListenAddr ) ; err != nil {
logger . Fatalf ( "cannot stop the webservice: %s" , err )
}
cancel ( )
if w . rw != nil {
err := w . rw . Close ( )
if err != nil {
logger . Fatalf ( "cannot stop the remotewrite: %s" , err )
}
}
wg . Wait ( )
}
// watchdog bundles the clients a group evaluation loop works with.
type watchdog struct {
	// storage is the datasource every rule is executed against.
	storage *datasource.VMStorage

	// alertProvider receives the alerts collected after a rule was executed.
	alertProvider notifier.Notifier

	// rw pushes alert state as time series. It is nil unless -remotewrite.url
	// was set, so users must check it before pushing.
	rw *remotewrite.Client
}
var (
iterationTotal = metrics . NewCounter ( ` vmalert_iteration_total ` )
iterationDuration = metrics . NewSummary ( ` vmalert_iteration_duration_seconds ` )
execTotal = metrics . NewCounter ( ` vmalert_execution_total ` )
execErrors = metrics . NewCounter ( ` vmalert_execution_errors_total ` )
execDuration = metrics . NewSummary ( ` vmalert_execution_duration_seconds ` )
alertsFired = metrics . NewCounter ( ` vmalert_alerts_fired_total ` )
alertsSent = metrics . NewCounter ( ` vmalert_alerts_sent_total ` )
alertsSendErrors = metrics . NewCounter ( ` vmalert_alerts_send_errors_total ` )
remoteWriteSent = metrics . NewCounter ( ` vmalert_remotewrite_sent_total ` )
remoteWriteErrors = metrics . NewCounter ( ` vmalert_remotewrite_errors_total ` )
2020-05-09 09:32:12 +00:00
configReloadTotal = metrics . NewCounter ( ` vmalert_config_reload_total ` )
configReloadOkTotal = metrics . NewCounter ( ` vmalert_config_reload_ok_total ` )
configReloadErrorTotal = metrics . NewCounter ( ` vmalert_config_reload_error_total ` )
2020-04-27 21:19:27 +00:00
)
2020-05-09 09:32:12 +00:00
func ( w * watchdog ) run ( ctx context . Context , group Group , evaluationInterval time . Duration , groupUpdate chan Group ) {
2020-04-27 21:19:27 +00:00
logger . Infof ( "watchdog for %s has been started" , group . Name )
t := time . NewTicker ( evaluationInterval )
defer t . Stop ( )
for {
select {
2020-05-09 09:32:12 +00:00
case newGroup := <- groupUpdate :
if newGroup . Rules == nil || len ( newGroup . Rules ) == 0 {
//empty rules for group
//need to exit
logger . Infof ( "stopping group: %s, it contains 0 rules now" , group . Name )
return
}
logger . Infof ( "new group update received, group: %s" , group . Name )
group . Update ( newGroup )
logger . Infof ( "group was reconciled, group: %s" , group . Name )
2020-04-27 21:19:27 +00:00
case <- t . C :
iterationTotal . Inc ( )
iterationStart := time . Now ( )
for _ , rule := range group . Rules {
execTotal . Inc ( )
execStart := time . Now ( )
err := rule . Exec ( ctx , w . storage )
execDuration . UpdateDuration ( execStart )
if err != nil {
execErrors . Inc ( )
logger . Errorf ( "failed to execute rule %q.%q: %s" , group . Name , rule . Name , err )
continue
}
var alertsToSend [ ] notifier . Alert
for _ , a := range rule . alerts {
if a . State != notifier . StatePending {
alertsToSend = append ( alertsToSend , * a )
}
if a . State == notifier . StateInactive || w . rw == nil {
continue
}
tss := rule . AlertToTimeSeries ( a , execStart )
for _ , ts := range tss {
remoteWriteSent . Inc ( )
if err := w . rw . Push ( ts ) ; err != nil {
remoteWriteErrors . Inc ( )
logger . Errorf ( "failed to push timeseries to remotewrite: %s" , err )
}
}
}
alertsSent . Add ( len ( alertsToSend ) )
if err := w . alertProvider . Send ( alertsToSend ) ; err != nil {
alertsSendErrors . Inc ( )
logger . Errorf ( "failed to send alert for rule %q.%q: %s" , group . Name , rule . Name , err )
}
}
iterationDuration . UpdateDuration ( iterationStart )
case <- ctx . Done ( ) :
logger . Infof ( "%s received stop signal" , group . Name )
return
}
}
}
// getExternalURL returns the URL under which this instance is reachable.
//
// If externalURL is non-empty it is parsed and returned unchanged. Otherwise
// the URL is assembled from the scheme ("https" if isSecure, else "http"),
// the OS hostname and the port of httpListenAddr, if it has one.
//
// An error is returned if the hostname cannot be determined or the resulting
// string is not a valid URL.
func getExternalURL(externalURL, httpListenAddr string, isSecure bool) (*url.URL, error) {
	if externalURL != "" {
		return url.Parse(externalURL)
	}
	hname, err := os.Hostname()
	if err != nil {
		return nil, fmt.Errorf("cannot determine hostname: %w", err)
	}
	port := ""
	// net.SplitHostPort understands bracketed IPv6 literals such as
	// "[::1]:8880", which a plain split on ":" would tear apart. An address
	// without a port makes it return an error; the URL is then built without
	// a port.
	if _, p, err := net.SplitHostPort(httpListenAddr); err == nil && p != "" {
		port = ":" + p
	}
	schema := "http://"
	if isSecure {
		schema = "https://"
	}
	return url.Parse(fmt.Sprintf("%s%s%s", schema, hname, port))
}
// checkFlags verifies that the mandatory flags -notifier.url and
// -datasource.url were provided. For a missing one it prints the flag
// defaults and reports the problem via logger.Fatalf.
func checkFlags() {
	required := []struct {
		value string
		msg   string
	}{
		{value: *notifierURL, msg: "notifier.url is empty"},
		{value: *datasourceURL, msg: "datasource.url is empty"},
	}
	for _, r := range required {
		if r.value != "" {
			continue
		}
		flag.PrintDefaults()
		logger.Fatalf("%s", r.msg)
	}
}
2020-05-09 09:32:12 +00:00
// startInitGroups launches one watchdog goroutine per group and returns the
// update channels of those goroutines keyed by group name.
//
// If restoreDS is not nil, the state of every group is restored from it
// first, looking back -remoteread.lookback; a failed restore is logged and
// does not prevent the group from starting. Every started goroutine is
// registered in wg and deregisters itself when w.run returns.
func startInitGroups(ctx context.Context, w *watchdog, restoreDS *datasource.VMStorage, groups []Group, wg *sync.WaitGroup) map[string]chan Group {
	updates := make(map[string]chan Group, len(groups))
	for _, g := range groups {
		if restoreDS != nil {
			if err := g.Restore(ctx, restoreDS, *remoteReadLookBack); err != nil {
				logger.Errorf("error while restoring state for group %q: %s", g.Name, err)
			}
		}

		// Each group gets its own channel with room for one queued update.
		ch := make(chan Group, 1)
		updates[g.Name] = ch

		wg.Add(1)
		go func(group Group, updateCh chan Group) {
			defer wg.Done()
			w.run(ctx, group, *evaluationInterval, updateCh)
		}(g, ch)
	}
	return updates
}
// readRules is a thin wrapper around Parse: it loads the rule groups from the
// paths given via -rule and passes the -rule.validateTemplates setting along.
func readRules() ([]Group, error) {
	groups, err := Parse(*rulePath, *validateTemplates)
	return groups, err
}