Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
vmalert: speed up state restore procedure on start (#3758)
* vmalert: speed up state restore procedure on start

  The alerts state restore procedure has been changed to become asynchronous. It no longer blocks group start, which significantly improves vmalert's startup time.

  Instead, state restore is called by each group in its own goroutine after the first rules evaluation. While previously a state restore attempt was made for all loaded alerting rules, now it is performed only for alerts which became active after the first evaluation. This reduces the number of API calls to the configured remote read URL.

  This also means that the `remoteRead.ignoreRestoreErrors` command-line flag is now deprecated and has no effect if configured.

  See the relevant issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2608

  Signed-off-by: hagen1778 <roman@victoriametrics.com>

* make lint happy

  Signed-off-by: hagen1778 <roman@victoriametrics.com>

* Apply suggestions from code review

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
parent 0a824d9490, commit 6fd10e8871
9 changed files with 304 additions and 194 deletions
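The core of the change is the ordering: a group now starts and evaluates its rules first, and only afterwards tries to restore the ActiveAt timestamps of the alerts that turned out to be active. Below is a minimal, self-contained Go sketch of that idea; the alert, group, evaluate and restore names are illustrative stand-ins rather than the real vmalert types, and locking, error handling and the actual remote-read query are omitted.

package main

import (
    "fmt"
    "time"
)

// toy stand-ins for vmalert's alerting rule state
type alert struct {
    name     string
    activeAt time.Time
    restored bool
}

type group struct {
    name   string
    alerts []*alert
}

// evaluate stands in for the first rules evaluation, which marks alerts active.
func (g *group) evaluate(ts time.Time) {
    for _, a := range g.alerts {
        a.activeAt = ts
    }
}

// restore stands in for Group.Restore: it runs after the first evaluation and
// only adjusts ActiveAt for alerts that are already active but not yet restored.
func (g *group) restore(prevActiveAt map[string]time.Time) {
    for _, a := range g.alerts {
        if a.restored {
            continue
        }
        if ts, ok := prevActiveAt[a.name]; ok {
            a.activeAt = ts
            a.restored = true
        }
    }
}

func main() {
    g := &group{name: "demo", alerts: []*alert{{name: "HighErrorRate"}}}
    // previously persisted state, e.g. read back from the ALERTS_FOR_STATE series
    saved := map[string]time.Time{"HighErrorRate": time.Now().Add(-time.Hour)}

    g.evaluate(time.Now()) // the group starts and evaluates without blocking on restore
    g.restore(saved)       // restore runs afterwards, only for alerts that became active

    for _, a := range g.alerts {
        fmt.Printf("%s active since %s (restored=%v)\n", a.name, a.activeAt.Format(time.RFC3339), a.restored)
    }
}

The actual implementation in the diff below wires this into Group.start, which calls Group.Restore with the timestamp of the first evaluation.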
@@ -604,54 +604,59 @@ func alertForToTimeSeries(a *notifier.Alert, timestamp int64) prompbmarshal.Time
     return newTimeSeries([]float64{float64(a.ActiveAt.Unix())}, []int64{timestamp}, labels)
 }
 
-// Restore restores the state of active alerts basing on previously written time series.
-// Restore restores only ActiveAt field. Field State will be always Pending and supposed
-// to be updated on next Exec, as well as Value field.
-// Only rules with For > 0 can be restored.
-func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, lookback time.Duration, labels map[string]string) error {
-    if ar.For < 1 {
-        return nil
-    }
-
-    ts := time.Now()
-    qFn := func(query string) ([]datasource.Metric, error) {
-        res, _, err := ar.q.Query(ctx, query, ts)
-        return res, err
-    }
-
-    // account for external labels in filter
-    var labelsFilter string
-    for k, v := range labels {
-        labelsFilter += fmt.Sprintf(",%s=%q", k, v)
-    }
-
-    expr := fmt.Sprintf("last_over_time(%s{alertname=%q%s}[%ds])",
-        alertForStateMetricName, ar.Name, labelsFilter, int(lookback.Seconds()))
-    qMetrics, _, err := q.Query(ctx, expr, ts)
-    if err != nil {
-        return err
-    }
-
-    for _, m := range qMetrics {
-        ls := &labelSet{
-            origin:    make(map[string]string, len(m.Labels)),
-            processed: make(map[string]string, len(m.Labels)),
-        }
-        for _, l := range m.Labels {
-            if l.Name == "__name__" {
-                continue
-            }
-            ls.origin[l.Name] = l.Value
-            ls.processed[l.Name] = l.Value
-        }
-        a, err := ar.newAlert(m, ls, time.Unix(int64(m.Values[0]), 0), qFn)
-        if err != nil {
-            return fmt.Errorf("failed to create alert: %w", err)
-        }
-        a.ID = hash(ls.processed)
-        a.State = notifier.StatePending
-        a.Restored = true
-        ar.alerts[a.ID] = a
-        logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.ActiveAt)
-    }
+// Restore restores the value of ActiveAt field for active alerts,
+// based on previously written time series `alertForStateMetricName`.
+// Only rules with For > 0 will be restored.
+func (ar *AlertingRule) Restore(ctx context.Context, q datasource.Querier, ts time.Time, lookback time.Duration) error {
+    if q == nil {
+        return fmt.Errorf("querier is nil")
+    }
+
+    ar.alertsMu.Lock()
+    defer ar.alertsMu.Unlock()
+
+    if len(ar.alerts) < 1 {
+        return nil
+    }
+
+    for _, a := range ar.alerts {
+        if a.Restored || a.State != notifier.StatePending {
+            continue
+        }
+
+        var labelsFilter []string
+        for k, v := range a.Labels {
+            labelsFilter = append(labelsFilter, fmt.Sprintf("%s=%q", k, v))
+        }
+        sort.Strings(labelsFilter)
+        expr := fmt.Sprintf("last_over_time(%s{%s}[%ds])",
+            alertForStateMetricName, strings.Join(labelsFilter, ","), int(lookback.Seconds()))
+
+        ar.logDebugf(ts, nil, "restoring alert state via query %q", expr)
+
+        qMetrics, _, err := q.Query(ctx, expr, ts)
+        if err != nil {
+            return err
+        }
+
+        if len(qMetrics) < 1 {
+            ar.logDebugf(ts, nil, "no response was received from restore query")
+            continue
+        }
+
+        // only one series expected in response
+        m := qMetrics[0]
+        // __name__ supposed to be alertForStateMetricName
+        m.DelLabel("__name__")
+
+        // we assume that restore query contains all label matchers,
+        // so all received labels will match anyway if their number is equal.
+        if len(m.Labels) != len(a.Labels) {
+            ar.logDebugf(ts, nil, "state restore query returned not expected label-set %v", m.Labels)
+            continue
+        }
+        a.ActiveAt = time.Unix(int64(m.Values[0]), 0)
+        a.Restored = true
+        logger.Infof("alert %q (%d) restored to state at %v", a.Name, a.ID, a.ActiveAt)
+    }
     return nil
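With the rewritten Restore above, each still-pending alert is looked up individually: the alert's own labels become exact matchers, the matchers are sorted for a stable expression, and the whole thing is wrapped in last_over_time over the configured lookback. The following stand-alone sketch shows that query construction; buildRestoreQuery is an illustrative helper rather than part of vmalert, and ALERTS_FOR_STATE is the value behind alertForStateMetricName.

package main

import (
    "fmt"
    "sort"
    "strings"
)

// buildRestoreQuery mirrors how the new Restore composes its lookup query:
// every label of the active alert becomes an exact matcher, matchers are
// sorted for a stable expression, and the result is wrapped in last_over_time.
func buildRestoreQuery(metricName string, alertLabels map[string]string, lookbackSeconds int) string {
    var filters []string
    for k, v := range alertLabels {
        filters = append(filters, fmt.Sprintf("%s=%q", k, v))
    }
    sort.Strings(filters)
    return fmt.Sprintf("last_over_time(%s{%s}[%ds])", metricName, strings.Join(filters, ","), lookbackSeconds)
}

func main() {
    q := buildRestoreQuery("ALERTS_FOR_STATE", map[string]string{
        "alertname":  "foo",
        "alertgroup": "TestRestore",
    }, 3600)
    // Prints: last_over_time(ALERTS_FOR_STATE{alertgroup="TestRestore",alertname="foo"}[3600s])
    fmt.Println(q)
}

The tests further down expect exactly this shape of query, e.g. last_over_time(ALERTS_FOR_STATE{alertgroup="TestRestore",alertname="foo"}[3600s]).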
@@ -6,12 +6,15 @@ import (
     "reflect"
     "sort"
     "strings"
+    "sync"
     "testing"
     "time"
 
+    "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
     "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
     "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 )
 
 func TestAlertingRule_ToTimeSeries(t *testing.T) {
@@ -502,118 +505,156 @@ func TestAlertingRule_ExecRange(t *testing.T)
     }
 }
 
-func TestAlertingRule_Restore(t *testing.T) {
-    testCases := []struct {
-        rule      *AlertingRule
-        metrics   []datasource.Metric
-        expAlerts map[uint64]*notifier.Alert
-    }{
-        {
-            newTestRuleWithLabels("no extra labels"),
-            []datasource.Metric{
-                metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()),
-                    "__name__", alertForStateMetricName,
-                ),
-            },
-            map[uint64]*notifier.Alert{
-                hash(nil): {State: notifier.StatePending,
-                    ActiveAt: time.Now().Truncate(time.Hour)},
-            },
-        },
-        {
-            newTestRuleWithLabels("metric labels"),
-            []datasource.Metric{
-                metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()),
-                    "__name__", alertForStateMetricName,
-                    alertNameLabel, "metric labels",
-                    alertGroupNameLabel, "groupID",
-                    "foo", "bar",
-                    "namespace", "baz",
-                ),
-            },
-            map[uint64]*notifier.Alert{
-                hash(map[string]string{
-                    alertNameLabel:      "metric labels",
-                    alertGroupNameLabel: "groupID",
-                    "foo":               "bar",
-                    "namespace":         "baz",
-                }): {State: notifier.StatePending,
-                    ActiveAt: time.Now().Truncate(time.Hour)},
-            },
-        },
-        {
-            newTestRuleWithLabels("rule labels", "source", "vm"),
-            []datasource.Metric{
-                metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()),
-                    "__name__", alertForStateMetricName,
-                    "foo", "bar",
-                    "namespace", "baz",
-                    // extra labels set by rule
-                    "source", "vm",
-                ),
-            },
-            map[uint64]*notifier.Alert{
-                hash(map[string]string{
-                    "foo":       "bar",
-                    "namespace": "baz",
-                    "source":    "vm",
-                }): {State: notifier.StatePending,
-                    ActiveAt: time.Now().Truncate(time.Hour)},
-            },
-        },
-        {
-            newTestRuleWithLabels("multiple alerts"),
-            []datasource.Metric{
-                metricWithValueAndLabels(t, float64(time.Now().Truncate(time.Hour).Unix()),
-                    "__name__", alertForStateMetricName,
-                    "host", "localhost-1",
-                ),
-                metricWithValueAndLabels(t, float64(time.Now().Truncate(2*time.Hour).Unix()),
-                    "__name__", alertForStateMetricName,
-                    "host", "localhost-2",
-                ),
-                metricWithValueAndLabels(t, float64(time.Now().Truncate(3*time.Hour).Unix()),
-                    "__name__", alertForStateMetricName,
-                    "host", "localhost-3",
-                ),
-            },
-            map[uint64]*notifier.Alert{
-                hash(map[string]string{"host": "localhost-1"}): {State: notifier.StatePending,
-                    ActiveAt: time.Now().Truncate(time.Hour)},
-                hash(map[string]string{"host": "localhost-2"}): {State: notifier.StatePending,
-                    ActiveAt: time.Now().Truncate(2 * time.Hour)},
-                hash(map[string]string{"host": "localhost-3"}): {State: notifier.StatePending,
-                    ActiveAt: time.Now().Truncate(3 * time.Hour)},
-            },
-        },
-    }
-    fakeGroup := Group{Name: "TestRule_Exec"}
-    for _, tc := range testCases {
-        t.Run(tc.rule.Name, func(t *testing.T) {
-            fq := &fakeQuerier{}
-            tc.rule.GroupID = fakeGroup.ID()
-            tc.rule.q = fq
-            fq.add(tc.metrics...)
-            if err := tc.rule.Restore(context.TODO(), fq, time.Hour, nil); err != nil {
-                t.Fatalf("unexpected err: %s", err)
-            }
-            if len(tc.rule.alerts) != len(tc.expAlerts) {
-                t.Fatalf("expected %d alerts; got %d", len(tc.expAlerts), len(tc.rule.alerts))
-            }
-            for key, exp := range tc.expAlerts {
-                got, ok := tc.rule.alerts[key]
-                if !ok {
-                    t.Fatalf("expected to have key %d", key)
-                }
-                if got.State != exp.State {
-                    t.Fatalf("expected state %d; got %d", exp.State, got.State)
-                }
-                if got.ActiveAt != exp.ActiveAt {
-                    t.Fatalf("expected ActiveAt %v; got %v", exp.ActiveAt, got.ActiveAt)
-                }
-            }
-        })
-    }
-}
+func TestGroup_Restore(t *testing.T) {
+    defaultTS := time.Now()
+    fqr := &fakeQuerierWithRegistry{}
+    fn := func(rules []config.Rule, expAlerts map[uint64]*notifier.Alert) {
+        t.Helper()
+        defer fqr.reset()
+
+        for _, r := range rules {
+            fqr.set(r.Expr, metricWithValueAndLabels(t, 0, "__name__", r.Alert))
+        }
+
+        fg := newGroup(config.Group{Name: "TestRestore", Rules: rules}, fqr, time.Second, nil)
+        wg := sync.WaitGroup{}
+        wg.Add(1)
+        go func() {
+            nts := func() []notifier.Notifier { return []notifier.Notifier{&fakeNotifier{}} }
+            fg.start(context.Background(), nts, nil, fqr)
+            wg.Done()
+        }()
+        fg.close()
+        wg.Wait()
+
+        gotAlerts := make(map[uint64]*notifier.Alert)
+        for _, rs := range fg.Rules {
+            alerts := rs.(*AlertingRule).alerts
+            for k, v := range alerts {
+                if !v.Restored {
+                    // set not restored alerts to predictable timestamp
+                    v.ActiveAt = defaultTS
+                }
+                gotAlerts[k] = v
+            }
+        }
+
+        if len(gotAlerts) != len(expAlerts) {
+            t.Fatalf("expected %d alerts; got %d", len(expAlerts), len(gotAlerts))
+        }
+        for key, exp := range expAlerts {
+            got, ok := gotAlerts[key]
+            if !ok {
+                t.Fatalf("expected to have key %d", key)
+            }
+            if got.State != notifier.StatePending {
+                t.Fatalf("expected state %d; got %d", notifier.StatePending, got.State)
+            }
+            if got.ActiveAt != exp.ActiveAt {
+                t.Fatalf("expected ActiveAt %v; got %v", exp.ActiveAt, got.ActiveAt)
+            }
+        }
+    }
+
+    stateMetric := func(name string, value time.Time, labels ...string) datasource.Metric {
+        labels = append(labels, "__name__", alertForStateMetricName)
+        labels = append(labels, alertNameLabel, name)
+        labels = append(labels, alertGroupNameLabel, "TestRestore")
+        return metricWithValueAndLabels(t, float64(value.Unix()), labels...)
+    }
+
+    // one active alert, no previous state
+    fn(
+        []config.Rule{{Alert: "foo", Expr: "foo", For: promutils.NewDuration(time.Second)}},
+        map[uint64]*notifier.Alert{
+            hash(map[string]string{alertNameLabel: "foo", alertGroupNameLabel: "TestRestore"}): {
+                ActiveAt: defaultTS,
+            },
+        })
+    fqr.reset()
+
+    // one active alert with state restore
+    ts := time.Now().Truncate(time.Hour)
+    fqr.set(`last_over_time(ALERTS_FOR_STATE{alertgroup="TestRestore",alertname="foo"}[3600s])`,
+        stateMetric("foo", ts))
+    fn(
+        []config.Rule{{Alert: "foo", Expr: "foo", For: promutils.NewDuration(time.Second)}},
+        map[uint64]*notifier.Alert{
+            hash(map[string]string{alertNameLabel: "foo", alertGroupNameLabel: "TestRestore"}): {
+                ActiveAt: ts},
+        })
+
+    // two rules, two active alerts, one with state restored
+    ts = time.Now().Truncate(time.Hour)
+    fqr.set(`last_over_time(ALERTS_FOR_STATE{alertgroup="TestRestore",alertname="bar"}[3600s])`,
+        stateMetric("foo", ts))
+    fn(
+        []config.Rule{
+            {Alert: "foo", Expr: "foo", For: promutils.NewDuration(time.Second)},
+            {Alert: "bar", Expr: "bar", For: promutils.NewDuration(time.Second)},
+        },
+        map[uint64]*notifier.Alert{
+            hash(map[string]string{alertNameLabel: "foo", alertGroupNameLabel: "TestRestore"}): {
+                ActiveAt: defaultTS,
+            },
+            hash(map[string]string{alertNameLabel: "bar", alertGroupNameLabel: "TestRestore"}): {
+                ActiveAt: ts},
+        })
+
+    // two rules, two active alerts, two with state restored
+    ts = time.Now().Truncate(time.Hour)
+    fqr.set(`last_over_time(ALERTS_FOR_STATE{alertgroup="TestRestore",alertname="foo"}[3600s])`,
+        stateMetric("foo", ts))
+    fqr.set(`last_over_time(ALERTS_FOR_STATE{alertgroup="TestRestore",alertname="bar"}[3600s])`,
+        stateMetric("bar", ts))
+    fn(
+        []config.Rule{
+            {Alert: "foo", Expr: "foo", For: promutils.NewDuration(time.Second)},
+            {Alert: "bar", Expr: "bar", For: promutils.NewDuration(time.Second)},
+        },
+        map[uint64]*notifier.Alert{
+            hash(map[string]string{alertNameLabel: "foo", alertGroupNameLabel: "TestRestore"}): {
+                ActiveAt: ts,
+            },
+            hash(map[string]string{alertNameLabel: "bar", alertGroupNameLabel: "TestRestore"}): {
+                ActiveAt: ts},
+        })
+
+    // one active alert but wrong state restore
+    ts = time.Now().Truncate(time.Hour)
+    fqr.set(`last_over_time(ALERTS_FOR_STATE{alertname="bar",alertgroup="TestRestore"}[3600s])`,
+        stateMetric("wrong alert", ts))
+    fn(
+        []config.Rule{{Alert: "foo", Expr: "foo", For: promutils.NewDuration(time.Second)}},
+        map[uint64]*notifier.Alert{
+            hash(map[string]string{alertNameLabel: "foo", alertGroupNameLabel: "TestRestore"}): {
+                ActiveAt: defaultTS,
+            },
+        })
+
+    // one active alert with labels
+    ts = time.Now().Truncate(time.Hour)
+    fqr.set(`last_over_time(ALERTS_FOR_STATE{alertgroup="TestRestore",alertname="foo",env="dev"}[3600s])`,
+        stateMetric("foo", ts, "env", "dev"))
+    fn(
+        []config.Rule{{Alert: "foo", Expr: "foo", Labels: map[string]string{"env": "dev"}, For: promutils.NewDuration(time.Second)}},
+        map[uint64]*notifier.Alert{
+            hash(map[string]string{alertNameLabel: "foo", alertGroupNameLabel: "TestRestore", "env": "dev"}): {
+                ActiveAt: ts,
+            },
+        })
+
+    // one active alert with restore labels missmatch
+    ts = time.Now().Truncate(time.Hour)
+    fqr.set(`last_over_time(ALERTS_FOR_STATE{alertgroup="TestRestore",alertname="foo",env="dev"}[3600s])`,
+        stateMetric("foo", ts, "env", "dev", "team", "foo"))
+    fn(
+        []config.Rule{{Alert: "foo", Expr: "foo", Labels: map[string]string{"env": "dev"}, For: promutils.NewDuration(time.Second)}},
+        map[uint64]*notifier.Alert{
+            hash(map[string]string{alertNameLabel: "foo", alertGroupNameLabel: "TestRestore", "env": "dev"}): {
+                ActiveAt: defaultTS,
+            },
+        })
+}
 
 func TestAlertingRule_Exec_Negative(t *testing.T) {
@@ -72,6 +72,15 @@ func (m *Metric) AddLabel(key, value string) {
     m.Labels = append(m.Labels, Label{Name: key, Value: value})
 }
 
+// DelLabel deletes the given label from the label set
+func (m *Metric) DelLabel(key string) {
+    for i, l := range m.Labels {
+        if l.Name == key {
+            m.Labels = append(m.Labels[:i], m.Labels[i+1:]...)
+        }
+    }
+}
+
 // Label returns the given label value.
 // If label is missing empty string will be returned
 func (m *Metric) Label(key string) string {
@@ -158,23 +158,23 @@ func (g *Group) ID() uint64 {
 }
 
 // Restore restores alerts state for group rules
-func (g *Group) Restore(ctx context.Context, qb datasource.QuerierBuilder, lookback time.Duration, labels map[string]string) error {
-    labels = mergeLabels(g.Name, "", labels, g.Labels)
+func (g *Group) Restore(ctx context.Context, qb datasource.QuerierBuilder, ts time.Time, lookback time.Duration) error {
     for _, rule := range g.Rules {
-        rr, ok := rule.(*AlertingRule)
+        ar, ok := rule.(*AlertingRule)
         if !ok {
             continue
         }
-        if rr.For < 1 {
+        if ar.For < 1 {
             continue
         }
-        // ignore QueryParams on purpose, because they could contain
-        // query filters. This may affect the restore procedure.
         q := qb.BuildWithParams(datasource.QuerierParams{
-            DataSourceType: g.Type.String(),
-            Headers:        g.Headers,
+            DataSourceType:     g.Type.String(),
+            EvaluationInterval: g.Interval,
+            QueryParams:        g.Params,
+            Headers:            g.Headers,
+            Debug:              ar.Debug,
         })
-        if err := rr.Restore(ctx, q, lookback, labels); err != nil {
+        if err := ar.Restore(ctx, q, ts, lookback); err != nil {
             return fmt.Errorf("error while restoring rule %q: %w", rule, err)
         }
     }
@@ -251,7 +251,7 @@ func (g *Group) close() {
 
 var skipRandSleepOnGroupStart bool
 
-func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client) {
+func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
     defer func() { close(g.finishedCh) }()
 
     e := &executor{
@@ -259,26 +259,6 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
         notifiers:                nts,
         previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label)}
 
-    // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
-    if !skipRandSleepOnGroupStart {
-        randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64)))
-        sleepOffset := uint64(time.Now().UnixNano()) % uint64(g.Interval)
-        if randSleep < sleepOffset {
-            randSleep += uint64(g.Interval)
-        }
-        randSleep -= sleepOffset
-        sleepTimer := time.NewTimer(time.Duration(randSleep))
-        select {
-        case <-ctx.Done():
-            sleepTimer.Stop()
-            return
-        case <-g.doneCh:
-            sleepTimer.Stop()
-            return
-        case <-sleepTimer.C:
-        }
-    }
-
     evalTS := time.Now()
 
     logger.Infof("group %q started; interval=%v; concurrency=%d", g.Name, g.Interval, g.Concurrency)
@@ -309,6 +289,16 @@ func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *r
 
     t := time.NewTicker(g.Interval)
     defer t.Stop()
+
+    // restore the rules state after the first evaluation
+    // so only active alerts can be restored.
+    if rr != nil {
+        err := g.Restore(ctx, rr, evalTS, *remoteReadLookBack)
+        if err != nil {
+            logger.Errorf("error while restoring ruleState for group %q: %s", g.Name, err)
+        }
+    }
+
     for {
         select {
         case <-ctx.Done():
@@ -209,7 +209,7 @@ func TestGroupStart(t *testing.T) {
     fs.add(m1)
     fs.add(m2)
     go func() {
-        g.start(context.Background(), func() []notifier.Notifier { return []notifier.Notifier{fn} }, nil)
+        g.start(context.Background(), func() []notifier.Notifier { return []notifier.Notifier{fn} }, nil, fs)
         close(finished)
     }()
 
@@ -61,6 +61,49 @@ func (fq *fakeQuerier) Query(_ context.Context, _ string, _ time.Time) ([]dataso
     return cp, req, nil
 }
 
+type fakeQuerierWithRegistry struct {
+    sync.Mutex
+    registry map[string][]datasource.Metric
+}
+
+func (fqr *fakeQuerierWithRegistry) set(key string, metrics ...datasource.Metric) {
+    fqr.Lock()
+    if fqr.registry == nil {
+        fqr.registry = make(map[string][]datasource.Metric)
+    }
+    fqr.registry[key] = metrics
+    fqr.Unlock()
+}
+
+func (fqr *fakeQuerierWithRegistry) reset() {
+    fqr.Lock()
+    fqr.registry = nil
+    fqr.Unlock()
+}
+
+func (fqr *fakeQuerierWithRegistry) BuildWithParams(_ datasource.QuerierParams) datasource.Querier {
+    return fqr
+}
+
+func (fqr *fakeQuerierWithRegistry) QueryRange(ctx context.Context, q string, _, _ time.Time) ([]datasource.Metric, error) {
+    req, _, err := fqr.Query(ctx, q, time.Now())
+    return req, err
+}
+
+func (fqr *fakeQuerierWithRegistry) Query(_ context.Context, expr string, _ time.Time) ([]datasource.Metric, *http.Request, error) {
+    fqr.Lock()
+    defer fqr.Unlock()
+
+    req, _ := http.NewRequest(http.MethodPost, "foo.com", nil)
+    metrics, ok := fqr.registry[expr]
+    if !ok {
+        return nil, req, nil
+    }
+    cp := make([]datasource.Metric, len(metrics))
+    copy(cp, metrics)
+    return cp, req, nil
+}
+
 type fakeNotifier struct {
     sync.Mutex
     alerts []notifier.Alert
@@ -94,6 +94,10 @@ func main() {
     logger.Init()
     pushmetrics.Init()
 
+    if !*remoteReadIgnoreRestoreErrors {
+        logger.Warnf("flag `remoteRead.ignoreRestoreErrors` is deprecated and will be removed in next versions.")
+    }
+
     err := templates.Load(*ruleTemplatesPath, true)
     if err != nil {
         logger.Fatalf("failed to parse %q: %s", *ruleTemplatesPath, err)
@@ -6,6 +6,7 @@ import (
     "net/url"
     "sort"
     "sync"
+    "time"
 
     "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
     "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
@@ -82,24 +83,38 @@ func (m *manager) close() {
     m.wg.Wait()
 }
 
-func (m *manager) startGroup(ctx context.Context, group *Group, restore bool) error {
-    if restore && m.rr != nil {
-        err := group.Restore(ctx, m.rr, *remoteReadLookBack, m.labels)
-        if err != nil {
-            if !*remoteReadIgnoreRestoreErrors {
-                return fmt.Errorf("failed to restore ruleState for group %q: %w", group.Name, err)
-            }
-            logger.Errorf("error while restoring ruleState for group %q: %s", group.Name, err)
-        }
-    }
-
+func (m *manager) startGroup(ctx context.Context, g *Group, restore bool) error {
     m.wg.Add(1)
-    id := group.ID()
+    id := g.ID()
     go func() {
-        group.start(ctx, m.notifiers, m.rw)
+        // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
+        if !skipRandSleepOnGroupStart {
+            randSleep := uint64(float64(g.Interval) * (float64(g.ID()) / (1 << 64)))
+            sleepOffset := uint64(time.Now().UnixNano()) % uint64(g.Interval)
+            if randSleep < sleepOffset {
+                randSleep += uint64(g.Interval)
+            }
+            randSleep -= sleepOffset
+            sleepTimer := time.NewTimer(time.Duration(randSleep))
+            select {
+            case <-ctx.Done():
+                sleepTimer.Stop()
+                return
+            case <-g.doneCh:
+                sleepTimer.Stop()
+                return
+            case <-sleepTimer.C:
+            }
+        }
+        if restore {
+            g.start(ctx, m.notifiers, m.rw, m.rr)
+        } else {
+            g.start(ctx, m.notifiers, m.rw, nil)
+        }
         m.wg.Done()
     }()
-    m.groups[id] = group
+    m.groups[id] = g
     return nil
 }
 
@@ -15,6 +15,9 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## tip
 
+* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): alerts state restore procedure was changed to become asynchronous. It doesn't block groups start anymore which significantly improves vmalert's startup time.
+  This also means that `-remoteRead.ignoreRestoreErrors` command-line flag becomes deprecated now and will have no effect if configured.
+  While previously state restore attempt was made for all the loaded alerting rules, now it is called only for alerts which became active after the first evaluation. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2608).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): optimize VMUI for use from smarthones and tablets. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3707).
 
 ## [v1.87.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.0)