mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
vmalert: init unit test (#4596)
vmalert: support unit tests See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2945 --------- Signed-off-by: hagen1778 <roman@victoriametrics.com> Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
ecab3abb76
commit
da60a68d09
31 changed files with 2264 additions and 325 deletions
|
@ -74,6 +74,7 @@ test-vmalert:
|
|||
go test -v -race -cover ./app/vmalert/config
|
||||
go test -v -race -cover ./app/vmalert/remotewrite
|
||||
go test -v -race -cover ./app/vmalert/utils
|
||||
go test -v -race -cover ./app/vmalert/unittest
|
||||
|
||||
run-vmalert: vmalert
|
||||
./bin/vmalert -rule=app/vmalert/config/testdata/rules/rules2-good.rules \
|
||||
|
@ -102,6 +103,10 @@ replay-vmalert: vmalert
|
|||
-replay.timeFrom=2021-05-11T07:21:43Z \
|
||||
-replay.timeTo=2021-05-29T18:40:43Z
|
||||
|
||||
unittest-vmalert: vmalert
|
||||
./bin/vmalert -unittestFile=app/vmalert/unittest/testdata/test1.yaml \
|
||||
-unittestFile=app/vmalert/unittest/testdata/test2.yaml
|
||||
|
||||
vmalert-linux-amd64:
|
||||
APP_NAME=vmalert CGO_ENABLED=1 GOOS=linux GOARCH=amd64 $(MAKE) app-local-goos-goarch
|
||||
|
||||
|
|
|
@ -732,6 +732,245 @@ See full description for these flags in `./vmalert -help`.
|
|||
* `query` template function is disabled for performance reasons (might be changed in future);
|
||||
* `limit` group's param has no effect during replay (might be changed in future);
|
||||
|
||||
## Unit Testing for Rules
|
||||
|
||||
You can use `vmalert` to run unit tests for alerting and recording rules.
|
||||
In unit test mode vmalert performs the following actions:
|
||||
* sets up an isolated VictoriaMetrics instance;
|
||||
* simulates the periodic ingestion of time series;
|
||||
* queries the ingested data for recording and alerting rules evaluation;
|
||||
* tests whether the firing alerts or resulting recording rules match the expected results.
|
||||
|
||||
See how to run vmalert in unit test mode below:
|
||||
```
|
||||
# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
|
||||
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
|
||||
```
|
||||
|
||||
vmalert is compatible with [Prometheus config format for tests](https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#test-file-format)
|
||||
except `promql_expr_test` field. Use `metricsql_expr_test` field name instead. The name is different because vmalert
|
||||
validates and executes [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html) expressions,
|
||||
which aren't always backward compatible with [PromQL](https://prometheus.io/docs/prometheus/latest/querying/basics/).
|
||||
|
||||
### Test file format
|
||||
|
||||
The configuration format for files specified in `-unittestFile` cmd-line flag is the following:
|
||||
```
|
||||
# Path to the files or http url containing [rule groups](https://docs.victoriametrics.com/vmalert.html#groups) configuration.
|
||||
# Enterprise version of vmalert supports S3 and GCS paths to rules.
|
||||
rule_files:
|
||||
[ - <string> ]
|
||||
|
||||
# The evaluation interval for rules specified in `rule_files`
|
||||
[ evaluation_interval: <duration> | default = 1m ]
|
||||
|
||||
# Groups listed below will be evaluated by order.
|
||||
# Not All the groups need not be mentioned, if not, they will be evaluated by define order in rule_files.
|
||||
group_eval_order:
|
||||
[ - <string> ]
|
||||
|
||||
# The list of unit test files to be checked during evaluation.
|
||||
tests:
|
||||
[ - <test_group> ]
|
||||
```
|
||||
|
||||
#### `<test_group>`
|
||||
|
||||
```
|
||||
# Interval between samples for input series
|
||||
interval: <duration>
|
||||
# Time series to persist into the database according to configured <interval> before running tests.
|
||||
input_series:
|
||||
[ - <series> ]
|
||||
|
||||
# Name of the test group, optional
|
||||
[ name: <string> ]
|
||||
|
||||
# Unit tests for alerting rules
|
||||
alert_rule_test:
|
||||
[ - <alert_test_case> ]
|
||||
|
||||
# Unit tests for Metricsql expressions.
|
||||
metricsql_expr_test:
|
||||
[ - <metricsql_expr_test> ]
|
||||
|
||||
# External labels accessible for templating.
|
||||
external_labels:
|
||||
[ <labelname>: <string> ... ]
|
||||
|
||||
```
|
||||
|
||||
#### `<series>`
|
||||
|
||||
```
|
||||
# series in the following format '<metric name>{<label name>=<label value>, ...}'
|
||||
# Examples:
|
||||
# series_name{label1="value1", label2="value2"}
|
||||
# go_goroutines{job="prometheus", instance="localhost:9090"}
|
||||
series: <string>
|
||||
|
||||
# values support several special equations:
|
||||
# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
|
||||
# Read this as series starts at a, then c further samples incrementing by b.
|
||||
# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
|
||||
# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
|
||||
# '_' represents a missing sample from scrape
|
||||
# Examples:
|
||||
# 1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
|
||||
# 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
|
||||
# 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
|
||||
# 4. ' 1 _x3' becomes '1 _ _ _ ' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
|
||||
values: <string>
|
||||
```
|
||||
|
||||
#### `<alert_test_case>`
|
||||
|
||||
vmalert by default adds `alertgroup` and `alertname` to the generated alerts and time series.
|
||||
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`,
|
||||
but no need to add them under `exp_alerts`.
|
||||
You can also pass `--disableAlertgroupLabel` to prevent vmalert from adding `alertgroup` label.
|
||||
|
||||
```
|
||||
# The time elapsed from time=0s when this alerting rule should be checked.
|
||||
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
|
||||
eval_time: <duration>
|
||||
|
||||
# Name of the group name to be tested.
|
||||
groupname: <string>
|
||||
|
||||
# Name of the alert to be tested.
|
||||
alertname: <string>
|
||||
|
||||
# List of the expected alerts that are firing under the given alertname at
|
||||
# the given evaluation time. If you want to test if an alerting rule should
|
||||
# not be firing, then you can mention only the fields above and leave 'exp_alerts' empty.
|
||||
exp_alerts:
|
||||
[ - <alert> ]
|
||||
```
|
||||
|
||||
#### `<alert>`
|
||||
|
||||
```
|
||||
# These are the expanded labels and annotations of the expected alert.
|
||||
# Note: labels also include the labels of the sample associated with the alert
|
||||
exp_labels:
|
||||
[ <labelname>: <string> ]
|
||||
exp_annotations:
|
||||
[ <labelname>: <string> ]
|
||||
```
|
||||
|
||||
#### `<metricsql_expr_test>`
|
||||
|
||||
```
|
||||
# Expression to evaluate
|
||||
expr: <string>
|
||||
|
||||
# The time elapsed from time=0s when this expression be evaluated.
|
||||
eval_time: <duration>
|
||||
|
||||
# Expected samples at the given evaluation time.
|
||||
exp_samples:
|
||||
[ - <sample> ]
|
||||
```
|
||||
|
||||
#### `<sample>`
|
||||
|
||||
```
|
||||
# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
|
||||
# Examples:
|
||||
# series_name{label1="value1", label2="value2"}
|
||||
# go_goroutines{job="prometheus", instance="localhost:9090"}
|
||||
labels: <string>
|
||||
|
||||
# The expected value of the Metricsql expression.
|
||||
value: <number>
|
||||
```
|
||||
|
||||
### Example
|
||||
|
||||
This is an example input file for unit testing which will pass.
|
||||
`test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
|
||||
|
||||
With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
|
||||
|
||||
#### `test.yaml`
|
||||
|
||||
```
|
||||
rule_files:
|
||||
- rules.yaml
|
||||
|
||||
evaluation_interval: 1m
|
||||
|
||||
tests:
|
||||
- interval: 1m
|
||||
input_series:
|
||||
- series: 'up{job="prometheus", instance="localhost:9090"}'
|
||||
values: "0+0x1440"
|
||||
|
||||
metricsql_expr_test:
|
||||
- expr: suquery_interval_test
|
||||
eval_time: 4m
|
||||
exp_samples:
|
||||
- labels: '{__name__="suquery_interval_test", datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
|
||||
value: 1
|
||||
|
||||
alert_rule_test:
|
||||
- eval_time: 2h
|
||||
groupname: group1
|
||||
alertname: InstanceDown
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
job: prometheus
|
||||
severity: page
|
||||
instance: localhost:9090
|
||||
datacenter: dc-123
|
||||
exp_annotations:
|
||||
summary: "Instance localhost:9090 down"
|
||||
description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
|
||||
|
||||
- eval_time: 0
|
||||
groupname: group1
|
||||
alertname: AlwaysFiring
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
datacenter: dc-123
|
||||
|
||||
- eval_time: 0
|
||||
groupname: group1
|
||||
alertname: InstanceDown
|
||||
exp_alerts: []
|
||||
|
||||
external_labels:
|
||||
datacenter: dc-123
|
||||
```
|
||||
|
||||
#### `alerts.yaml`
|
||||
|
||||
```
|
||||
# This is the rules file.
|
||||
|
||||
groups:
|
||||
- name: group1
|
||||
rules:
|
||||
- alert: InstanceDown
|
||||
expr: up == 0
|
||||
for: 5m
|
||||
labels:
|
||||
severity: page
|
||||
annotations:
|
||||
summary: "Instance {{ $labels.instance }} down"
|
||||
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
|
||||
- alert: AlwaysFiring
|
||||
expr: 1
|
||||
|
||||
- name: group2
|
||||
rules:
|
||||
- record: job:test:count_over_time1m
|
||||
expr: sum without(instance) (count_over_time(test[1m]))
|
||||
- record: suquery_interval_test
|
||||
expr: count_over_time(up[5m:])
|
||||
```
|
||||
|
||||
## Monitoring
|
||||
|
||||
`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
|
||||
|
@ -1282,6 +1521,11 @@ The shortlist of configuration flags is the following:
|
|||
Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
|
||||
-tlsMinVersion string
|
||||
Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
|
||||
-unittestFile array
|
||||
Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
|
||||
Examples:
|
||||
-unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
|
||||
See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
|
||||
-version
|
||||
Show VictoriaMetrics version
|
||||
```
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
package datasource
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
// Querier interface wraps Query and QueryRange methods
|
||||
|
@ -108,3 +112,69 @@ type Label struct {
|
|||
Name string
|
||||
Value string
|
||||
}
|
||||
|
||||
// Labels is collection of Label
|
||||
type Labels []Label
|
||||
|
||||
func (ls Labels) Len() int { return len(ls) }
|
||||
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
|
||||
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
|
||||
|
||||
func (ls Labels) String() string {
|
||||
var b bytes.Buffer
|
||||
|
||||
b.WriteByte('{')
|
||||
for i, l := range ls {
|
||||
if i > 0 {
|
||||
b.WriteByte(',')
|
||||
b.WriteByte(' ')
|
||||
}
|
||||
b.WriteString(l.Name)
|
||||
b.WriteByte('=')
|
||||
b.WriteString(strconv.Quote(l.Value))
|
||||
}
|
||||
b.WriteByte('}')
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// LabelCompare return negative if a is less than b, return 0 if they are the same
|
||||
// eg.
|
||||
// a=[]Label{{Name: "a", Value: "1"}},b=[]Label{{Name: "b", Value: "1"}}, return -1
|
||||
// a=[]Label{{Name: "a", Value: "2"}},b=[]Label{{Name: "a", Value: "1"}}, return 1
|
||||
// a=[]Label{{Name: "a", Value: "1"}},b=[]Label{{Name: "a", Value: "1"}}, return 0
|
||||
func LabelCompare(a, b Labels) int {
|
||||
l := len(a)
|
||||
if len(b) < l {
|
||||
l = len(b)
|
||||
}
|
||||
|
||||
for i := 0; i < l; i++ {
|
||||
if a[i].Name != b[i].Name {
|
||||
if a[i].Name < b[i].Name {
|
||||
return -1
|
||||
}
|
||||
return 1
|
||||
}
|
||||
if a[i].Value != b[i].Value {
|
||||
if a[i].Value < b[i].Value {
|
||||
return -1
|
||||
}
|
||||
return 1
|
||||
}
|
||||
}
|
||||
// if all labels so far were in common, the set with fewer labels comes first.
|
||||
return len(a) - len(b)
|
||||
}
|
||||
|
||||
// ConvertToLabels convert map to Labels
|
||||
func ConvertToLabels(m map[string]string) (labelset Labels) {
|
||||
for k, v := range m {
|
||||
labelset = append(labelset, Label{
|
||||
Name: k,
|
||||
Value: v,
|
||||
})
|
||||
}
|
||||
// sort label
|
||||
slices.SortFunc(labelset, func(a, b Label) bool { return a.Name < b.Name })
|
||||
return
|
||||
}
|
||||
|
|
|
@ -274,7 +274,7 @@ func (g *Group) close() {
|
|||
|
||||
var skipRandSleepOnGroupStart bool
|
||||
|
||||
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
|
||||
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, rr datasource.QuerierBuilder) {
|
||||
defer func() { close(g.finishedCh) }()
|
||||
|
||||
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
|
||||
|
@ -422,7 +422,7 @@ type executor struct {
|
|||
notifiers func() []notifier.Notifier
|
||||
notifierHeaders map[string]string
|
||||
|
||||
rw *remotewrite.Client
|
||||
rw remotewrite.RWClient
|
||||
|
||||
previouslySentSeriesToRWMu sync.Mutex
|
||||
// previouslySentSeriesToRW stores series sent to RW on previous iteration
|
||||
|
|
|
@ -91,7 +91,12 @@ absolute path to all .tpl files in root.
|
|||
|
||||
disableAlertGroupLabel = flag.Bool("disableAlertgroupLabel", false, "Whether to disable adding group's Name as label to generated alerts and time series.")
|
||||
|
||||
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified.")
|
||||
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified.")
|
||||
unitTestFiles = flagutil.NewArrayString("unittestFile", `Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
|
||||
Examples:
|
||||
-unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
|
||||
See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
|
||||
`)
|
||||
)
|
||||
|
||||
var alertURLGeneratorFn notifier.AlertURLGenerator
|
||||
|
@ -117,6 +122,13 @@ func main() {
|
|||
logger.Fatalf("failed to parse %q: %s", *ruleTemplatesPath, err)
|
||||
}
|
||||
|
||||
if len(*unitTestFiles) > 0 {
|
||||
if unitRule(*unitTestFiles...) {
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if *dryRun {
|
||||
groups, err := config.Parse(*rulePath, notifier.ValidateTemplates, true)
|
||||
if err != nil {
|
||||
|
@ -399,7 +411,7 @@ func configsEqual(a, b []config.Group) bool {
|
|||
// setConfigSuccess sets config reload status to 1.
|
||||
func setConfigSuccess(at uint64) {
|
||||
configSuccess.Set(1)
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
configTimestamp.Set(at)
|
||||
// reset the error if any
|
||||
setConfigErr(nil)
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ type manager struct {
|
|||
querierBuilder datasource.QuerierBuilder
|
||||
notifiers func() []notifier.Notifier
|
||||
|
||||
rw *remotewrite.Client
|
||||
rw remotewrite.RWClient
|
||||
// remote read builder.
|
||||
rr datasource.QuerierBuilder
|
||||
|
||||
|
|
|
@ -257,7 +257,7 @@ func TestManagerUpdate(t *testing.T) {
|
|||
func TestManagerUpdateNegative(t *testing.T) {
|
||||
testCases := []struct {
|
||||
notifiers []notifier.Notifier
|
||||
rw *remotewrite.Client
|
||||
rw remotewrite.RWClient
|
||||
cfg config.Group
|
||||
expErr string
|
||||
}{
|
||||
|
|
320
app/vmalert/remotewrite/client.go
Normal file
320
app/vmalert/remotewrite/client.go
Normal file
|
@ -0,0 +1,320 @@
|
|||
package remotewrite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
|
||||
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
|
||||
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
|
||||
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
|
||||
)
|
||||
|
||||
// Client is an asynchronous HTTP client for writing
|
||||
// timeseries via remote write protocol.
|
||||
type Client struct {
|
||||
addr string
|
||||
c *http.Client
|
||||
authCfg *promauth.Config
|
||||
input chan prompbmarshal.TimeSeries
|
||||
flushInterval time.Duration
|
||||
maxBatchSize int
|
||||
maxQueueSize int
|
||||
|
||||
wg sync.WaitGroup
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// Config is config for remote write.
|
||||
type Config struct {
|
||||
// Addr of remote storage
|
||||
Addr string
|
||||
AuthCfg *promauth.Config
|
||||
|
||||
// Concurrency defines number of readers that
|
||||
// concurrently read from the queue and flush data
|
||||
Concurrency int
|
||||
// MaxBatchSize defines max number of timeseries
|
||||
// to be flushed at once
|
||||
MaxBatchSize int
|
||||
// MaxQueueSize defines max length of input queue
|
||||
// populated by Push method.
|
||||
// Push will be rejected once queue is full.
|
||||
MaxQueueSize int
|
||||
// FlushInterval defines time interval for flushing batches
|
||||
FlushInterval time.Duration
|
||||
// Transport will be used by the underlying http.Client
|
||||
Transport *http.Transport
|
||||
}
|
||||
|
||||
const (
|
||||
defaultConcurrency = 4
|
||||
defaultMaxBatchSize = 1e3
|
||||
defaultMaxQueueSize = 1e5
|
||||
defaultFlushInterval = 5 * time.Second
|
||||
defaultWriteTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
// NewClient returns asynchronous client for
|
||||
// writing timeseries via remotewrite protocol.
|
||||
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||
if cfg.Addr == "" {
|
||||
return nil, fmt.Errorf("config.Addr can't be empty")
|
||||
}
|
||||
if cfg.MaxBatchSize == 0 {
|
||||
cfg.MaxBatchSize = defaultMaxBatchSize
|
||||
}
|
||||
if cfg.MaxQueueSize == 0 {
|
||||
cfg.MaxQueueSize = defaultMaxQueueSize
|
||||
}
|
||||
if cfg.FlushInterval == 0 {
|
||||
cfg.FlushInterval = defaultFlushInterval
|
||||
}
|
||||
if cfg.Transport == nil {
|
||||
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
|
||||
}
|
||||
cc := defaultConcurrency
|
||||
if cfg.Concurrency > 0 {
|
||||
cc = cfg.Concurrency
|
||||
}
|
||||
c := &Client{
|
||||
c: &http.Client{
|
||||
Timeout: *sendTimeout,
|
||||
Transport: cfg.Transport,
|
||||
},
|
||||
addr: strings.TrimSuffix(cfg.Addr, "/"),
|
||||
authCfg: cfg.AuthCfg,
|
||||
flushInterval: cfg.FlushInterval,
|
||||
maxBatchSize: cfg.MaxBatchSize,
|
||||
maxQueueSize: cfg.MaxQueueSize,
|
||||
doneCh: make(chan struct{}),
|
||||
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
|
||||
}
|
||||
|
||||
for i := 0; i < cc; i++ {
|
||||
c.run(ctx)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Push adds timeseries into queue for writing into remote storage.
|
||||
// Push returns and error if client is stopped or if queue is full.
|
||||
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
return fmt.Errorf("client is closed")
|
||||
case c.input <- s:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
|
||||
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
|
||||
c.maxQueueSize)
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops the client and waits for all goroutines
|
||||
// to exit.
|
||||
func (c *Client) Close() error {
|
||||
if c.doneCh == nil {
|
||||
return fmt.Errorf("client is already closed")
|
||||
}
|
||||
close(c.input)
|
||||
close(c.doneCh)
|
||||
c.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) run(ctx context.Context) {
|
||||
ticker := time.NewTicker(c.flushInterval)
|
||||
wr := &prompbmarshal.WriteRequest{}
|
||||
shutdown := func() {
|
||||
for ts := range c.input {
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
}
|
||||
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
||||
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
|
||||
c.flush(lastCtx, wr)
|
||||
cancel()
|
||||
}
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
shutdown()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
shutdown()
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.flush(ctx, wr)
|
||||
case ts, ok := <-c.input:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||
c.flush(ctx, wr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var (
|
||||
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
|
||||
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
|
||||
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
|
||||
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
|
||||
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
|
||||
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
|
||||
|
||||
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
|
||||
return float64(*concurrency)
|
||||
})
|
||||
)
|
||||
|
||||
// flush is a blocking function that marshals WriteRequest and sends
|
||||
// it to remote-write endpoint. Flush performs limited amount of retries
|
||||
// if request fails.
|
||||
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||
if len(wr.Timeseries) < 1 {
|
||||
return
|
||||
}
|
||||
defer prompbmarshal.ResetWriteRequest(wr)
|
||||
defer bufferFlushDuration.UpdateDuration(time.Now())
|
||||
|
||||
data, err := wr.Marshal()
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
b := snappy.Encode(nil, data)
|
||||
|
||||
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
|
||||
if retryInterval > maxRetryInterval {
|
||||
retryInterval = maxRetryInterval
|
||||
}
|
||||
timeStart := time.Now()
|
||||
defer sendDuration.Add(time.Since(timeStart).Seconds())
|
||||
L:
|
||||
for attempts := 0; ; attempts++ {
|
||||
err := c.send(ctx, b)
|
||||
if err == nil {
|
||||
sentRows.Add(len(wr.Timeseries))
|
||||
sentBytes.Add(len(b))
|
||||
return
|
||||
}
|
||||
|
||||
_, isNotRetriable := err.(*nonRetriableError)
|
||||
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
|
||||
|
||||
if isNotRetriable {
|
||||
// exit fast if error isn't retriable
|
||||
break
|
||||
}
|
||||
|
||||
// check if request has been cancelled before backoff
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
|
||||
break L
|
||||
default:
|
||||
}
|
||||
|
||||
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
|
||||
if timeLeftForRetries < 0 {
|
||||
// the max retry time has passed, so we give up
|
||||
break
|
||||
}
|
||||
|
||||
if retryInterval > timeLeftForRetries {
|
||||
retryInterval = timeLeftForRetries
|
||||
}
|
||||
// sleeping to prevent remote db hammering
|
||||
time.Sleep(retryInterval)
|
||||
retryInterval *= 2
|
||||
|
||||
}
|
||||
|
||||
droppedRows.Add(len(wr.Timeseries))
|
||||
droppedBytes.Add(len(b))
|
||||
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
|
||||
len(wr.Timeseries))
|
||||
}
|
||||
|
||||
func (c *Client) send(ctx context.Context, data []byte) error {
|
||||
r := bytes.NewReader(data)
|
||||
req, err := http.NewRequest(http.MethodPost, c.addr, r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
||||
}
|
||||
|
||||
// RFC standard compliant headers
|
||||
req.Header.Set("Content-Encoding", "snappy")
|
||||
req.Header.Set("Content-Type", "application/x-protobuf")
|
||||
|
||||
// Prometheus compliant headers
|
||||
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||
|
||||
if c.authCfg != nil {
|
||||
c.authCfg.SetHeaders(req, true)
|
||||
}
|
||||
if !*disablePathAppend {
|
||||
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
||||
}
|
||||
resp, err := c.c.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
||||
req.URL.Redacted(), err, len(data), r.Size())
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
// according to https://prometheus.io/docs/concepts/remote_write_spec/
|
||||
// Prometheus remote Write compatible receivers MUST
|
||||
switch resp.StatusCode / 100 {
|
||||
case 2:
|
||||
// respond with a HTTP 2xx status code when the write is successful.
|
||||
return nil
|
||||
case 4:
|
||||
if resp.StatusCode != http.StatusTooManyRequests {
|
||||
// MUST NOT retry write requests on HTTP 4xx responses other than 429
|
||||
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||
resp.StatusCode, req.URL.Redacted(), body)}
|
||||
}
|
||||
fallthrough
|
||||
default:
|
||||
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||
resp.StatusCode, req.URL.Redacted(), body)
|
||||
}
|
||||
}
|
||||
|
||||
type nonRetriableError struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *nonRetriableError) Error() string {
|
||||
return e.err.Error()
|
||||
}
|
97
app/vmalert/remotewrite/debug_client.go
Normal file
97
app/vmalert/remotewrite/debug_client.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package remotewrite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
// DebugClient won't push series periodically, but will write data to remote endpoint
|
||||
// immediately when Push() is called
|
||||
type DebugClient struct {
|
||||
addr string
|
||||
c *http.Client
|
||||
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewDebugClient initiates and returns a new DebugClient
|
||||
func NewDebugClient() (*DebugClient, error) {
|
||||
if *addr == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
t, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create transport: %w", err)
|
||||
}
|
||||
c := &DebugClient{
|
||||
c: &http.Client{
|
||||
Timeout: *sendTimeout,
|
||||
Transport: t,
|
||||
},
|
||||
addr: strings.TrimSuffix(*addr, "/"),
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Push sends the given timeseries to the remote storage.
|
||||
func (c *DebugClient) Push(s prompbmarshal.TimeSeries) error {
|
||||
c.wg.Add(1)
|
||||
defer c.wg.Done()
|
||||
wr := &prompbmarshal.WriteRequest{Timeseries: []prompbmarshal.TimeSeries{s}}
|
||||
data, err := wr.Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal the given time series: %w", err)
|
||||
}
|
||||
|
||||
return c.send(data)
|
||||
}
|
||||
|
||||
// Close stops the DebugClient
|
||||
func (c *DebugClient) Close() error {
|
||||
c.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *DebugClient) send(data []byte) error {
|
||||
b := snappy.Encode(nil, data)
|
||||
r := bytes.NewReader(b)
|
||||
req, err := http.NewRequest(http.MethodPost, c.addr, r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
||||
}
|
||||
|
||||
// RFC standard compliant headers
|
||||
req.Header.Set("Content-Encoding", "snappy")
|
||||
req.Header.Set("Content-Type", "application/x-protobuf")
|
||||
|
||||
// Prometheus compliant headers
|
||||
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||
|
||||
if !*disablePathAppend {
|
||||
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
||||
}
|
||||
resp, err := c.c.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
||||
req.URL.Redacted(), err, len(data), r.Size())
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
if resp.StatusCode/100 == 2 {
|
||||
return nil
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||
resp.StatusCode, req.URL.Redacted(), body)
|
||||
}
|
50
app/vmalert/remotewrite/debug_client_test.go
Normal file
50
app/vmalert/remotewrite/debug_client_test.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package remotewrite
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
)
|
||||
|
||||
func TestDebugClient_Push(t *testing.T) {
|
||||
testSrv := newRWServer()
|
||||
oldAddr := *addr
|
||||
*addr = testSrv.URL
|
||||
defer func() {
|
||||
*addr = oldAddr
|
||||
}()
|
||||
|
||||
client, err := NewDebugClient()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create debug client: %s", err)
|
||||
}
|
||||
|
||||
const rowsN = 100
|
||||
var sent int
|
||||
for i := 0; i < rowsN; i++ {
|
||||
s := prompbmarshal.TimeSeries{
|
||||
Samples: []prompbmarshal.Sample{{
|
||||
Value: float64(i),
|
||||
Timestamp: time.Now().Unix(),
|
||||
}},
|
||||
}
|
||||
err := client.Push(s)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %s", err)
|
||||
}
|
||||
if err == nil {
|
||||
sent++
|
||||
}
|
||||
}
|
||||
if sent == 0 {
|
||||
t.Fatalf("0 series sent")
|
||||
}
|
||||
if err := client.Close(); err != nil {
|
||||
t.Fatalf("failed to close client: %s", err)
|
||||
}
|
||||
got := testSrv.accepted()
|
||||
if got != sent {
|
||||
t.Fatalf("expected to have %d series; got %d", sent, got)
|
||||
}
|
||||
}
|
|
@ -1,320 +1,13 @@
|
|||
package remotewrite
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
|
||||
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
|
||||
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
|
||||
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
|
||||
)
|
||||
|
||||
// Client is an asynchronous HTTP client for writing
|
||||
// timeseries via remote write protocol.
|
||||
type Client struct {
|
||||
addr string
|
||||
c *http.Client
|
||||
authCfg *promauth.Config
|
||||
input chan prompbmarshal.TimeSeries
|
||||
flushInterval time.Duration
|
||||
maxBatchSize int
|
||||
maxQueueSize int
|
||||
|
||||
wg sync.WaitGroup
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
// Config is config for remote write.
|
||||
type Config struct {
|
||||
// Addr of remote storage
|
||||
Addr string
|
||||
AuthCfg *promauth.Config
|
||||
|
||||
// Concurrency defines number of readers that
|
||||
// concurrently read from the queue and flush data
|
||||
Concurrency int
|
||||
// MaxBatchSize defines max number of timeseries
|
||||
// to be flushed at once
|
||||
MaxBatchSize int
|
||||
// MaxQueueSize defines max length of input queue
|
||||
// populated by Push method.
|
||||
// Push will be rejected once queue is full.
|
||||
MaxQueueSize int
|
||||
// FlushInterval defines time interval for flushing batches
|
||||
FlushInterval time.Duration
|
||||
// Transport will be used by the underlying http.Client
|
||||
Transport *http.Transport
|
||||
}
|
||||
|
||||
const (
|
||||
defaultConcurrency = 4
|
||||
defaultMaxBatchSize = 1e3
|
||||
defaultMaxQueueSize = 1e5
|
||||
defaultFlushInterval = 5 * time.Second
|
||||
defaultWriteTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
// NewClient returns asynchronous client for
|
||||
// writing timeseries via remotewrite protocol.
|
||||
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||
if cfg.Addr == "" {
|
||||
return nil, fmt.Errorf("config.Addr can't be empty")
|
||||
}
|
||||
if cfg.MaxBatchSize == 0 {
|
||||
cfg.MaxBatchSize = defaultMaxBatchSize
|
||||
}
|
||||
if cfg.MaxQueueSize == 0 {
|
||||
cfg.MaxQueueSize = defaultMaxQueueSize
|
||||
}
|
||||
if cfg.FlushInterval == 0 {
|
||||
cfg.FlushInterval = defaultFlushInterval
|
||||
}
|
||||
if cfg.Transport == nil {
|
||||
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
|
||||
}
|
||||
cc := defaultConcurrency
|
||||
if cfg.Concurrency > 0 {
|
||||
cc = cfg.Concurrency
|
||||
}
|
||||
c := &Client{
|
||||
c: &http.Client{
|
||||
Timeout: *sendTimeout,
|
||||
Transport: cfg.Transport,
|
||||
},
|
||||
addr: strings.TrimSuffix(cfg.Addr, "/"),
|
||||
authCfg: cfg.AuthCfg,
|
||||
flushInterval: cfg.FlushInterval,
|
||||
maxBatchSize: cfg.MaxBatchSize,
|
||||
maxQueueSize: cfg.MaxQueueSize,
|
||||
doneCh: make(chan struct{}),
|
||||
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
|
||||
}
|
||||
|
||||
for i := 0; i < cc; i++ {
|
||||
c.run(ctx)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Push adds timeseries into queue for writing into remote storage.
|
||||
// Push returns and error if client is stopped or if queue is full.
|
||||
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
return fmt.Errorf("client is closed")
|
||||
case c.input <- s:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
|
||||
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
|
||||
c.maxQueueSize)
|
||||
}
|
||||
}
|
||||
|
||||
// Close stops the client and waits for all goroutines
|
||||
// to exit.
|
||||
func (c *Client) Close() error {
|
||||
if c.doneCh == nil {
|
||||
return fmt.Errorf("client is already closed")
|
||||
}
|
||||
close(c.input)
|
||||
close(c.doneCh)
|
||||
c.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) run(ctx context.Context) {
|
||||
ticker := time.NewTicker(c.flushInterval)
|
||||
wr := &prompbmarshal.WriteRequest{}
|
||||
shutdown := func() {
|
||||
for ts := range c.input {
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
}
|
||||
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
||||
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
|
||||
c.flush(lastCtx, wr)
|
||||
cancel()
|
||||
}
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-c.doneCh:
|
||||
shutdown()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
shutdown()
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.flush(ctx, wr)
|
||||
case ts, ok := <-c.input:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
wr.Timeseries = append(wr.Timeseries, ts)
|
||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||
c.flush(ctx, wr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var (
|
||||
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
|
||||
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
|
||||
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
|
||||
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
|
||||
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
|
||||
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
|
||||
|
||||
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
|
||||
return float64(*concurrency)
|
||||
})
|
||||
)
|
||||
|
||||
// flush is a blocking function that marshals WriteRequest and sends
|
||||
// it to remote-write endpoint. Flush performs limited amount of retries
|
||||
// if request fails.
|
||||
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||
if len(wr.Timeseries) < 1 {
|
||||
return
|
||||
}
|
||||
defer prompbmarshal.ResetWriteRequest(wr)
|
||||
defer bufferFlushDuration.UpdateDuration(time.Now())
|
||||
|
||||
data, err := wr.Marshal()
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
b := snappy.Encode(nil, data)
|
||||
|
||||
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
|
||||
if retryInterval > maxRetryInterval {
|
||||
retryInterval = maxRetryInterval
|
||||
}
|
||||
timeStart := time.Now()
|
||||
defer sendDuration.Add(time.Since(timeStart).Seconds())
|
||||
L:
|
||||
for attempts := 0; ; attempts++ {
|
||||
err := c.send(ctx, b)
|
||||
if err == nil {
|
||||
sentRows.Add(len(wr.Timeseries))
|
||||
sentBytes.Add(len(b))
|
||||
return
|
||||
}
|
||||
|
||||
_, isNotRetriable := err.(*nonRetriableError)
|
||||
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
|
||||
|
||||
if isNotRetriable {
|
||||
// exit fast if error isn't retriable
|
||||
break
|
||||
}
|
||||
|
||||
// check if request has been cancelled before backoff
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
|
||||
break L
|
||||
default:
|
||||
}
|
||||
|
||||
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
|
||||
if timeLeftForRetries < 0 {
|
||||
// the max retry time has passed, so we give up
|
||||
break
|
||||
}
|
||||
|
||||
if retryInterval > timeLeftForRetries {
|
||||
retryInterval = timeLeftForRetries
|
||||
}
|
||||
// sleeping to prevent remote db hammering
|
||||
time.Sleep(retryInterval)
|
||||
retryInterval *= 2
|
||||
|
||||
}
|
||||
|
||||
droppedRows.Add(len(wr.Timeseries))
|
||||
droppedBytes.Add(len(b))
|
||||
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
|
||||
len(wr.Timeseries))
|
||||
}
|
||||
|
||||
func (c *Client) send(ctx context.Context, data []byte) error {
|
||||
r := bytes.NewReader(data)
|
||||
req, err := http.NewRequest(http.MethodPost, c.addr, r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
||||
}
|
||||
|
||||
// RFC standard compliant headers
|
||||
req.Header.Set("Content-Encoding", "snappy")
|
||||
req.Header.Set("Content-Type", "application/x-protobuf")
|
||||
|
||||
// Prometheus compliant headers
|
||||
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||
|
||||
if c.authCfg != nil {
|
||||
c.authCfg.SetHeaders(req, true)
|
||||
}
|
||||
if !*disablePathAppend {
|
||||
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
||||
}
|
||||
resp, err := c.c.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
||||
req.URL.Redacted(), err, len(data), r.Size())
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
// according to https://prometheus.io/docs/concepts/remote_write_spec/
|
||||
// Prometheus remote Write compatible receivers MUST
|
||||
switch resp.StatusCode / 100 {
|
||||
case 2:
|
||||
// respond with a HTTP 2xx status code when the write is successful.
|
||||
return nil
|
||||
case 4:
|
||||
if resp.StatusCode != http.StatusTooManyRequests {
|
||||
// MUST NOT retry write requests on HTTP 4xx responses other than 429
|
||||
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||
resp.StatusCode, req.URL.Redacted(), body)}
|
||||
}
|
||||
fallthrough
|
||||
default:
|
||||
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||
resp.StatusCode, req.URL.Redacted(), body)
|
||||
}
|
||||
}
|
||||
|
||||
type nonRetriableError struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *nonRetriableError) Error() string {
|
||||
return e.err.Error()
|
||||
// RWClient represents an HTTP client for pushing data via remote write protocol
|
||||
type RWClient interface {
|
||||
// Push pushes the give time series to remote storage
|
||||
Push(s prompbmarshal.TimeSeries) error
|
||||
// Close stops the client. Client can't be reused after Close call.
|
||||
Close() error
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ var (
|
|||
"Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.")
|
||||
)
|
||||
|
||||
func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw *remotewrite.Client) error {
|
||||
func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewrite.RWClient) error {
|
||||
if *replayMaxDatapoints < 1 {
|
||||
return fmt.Errorf("replay.maxDatapointsPerQuery can't be lower than 1")
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw *remotewr
|
|||
return nil
|
||||
}
|
||||
|
||||
func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
|
||||
func (g *Group) replay(start, end time.Time, rw remotewrite.RWClient) int {
|
||||
var total int
|
||||
step := g.Interval * time.Duration(*replayMaxDatapoints)
|
||||
ri := rangeIterator{start: start, end: end, step: step}
|
||||
|
@ -119,7 +119,7 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
|
|||
return total
|
||||
}
|
||||
|
||||
func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
|
||||
func replayRule(rule Rule, start, end time.Time, rw remotewrite.RWClient) (int, error) {
|
||||
var err error
|
||||
var tss []prompbmarshal.TimeSeries
|
||||
for i := 0; i < *replayRuleRetryAttempts; i++ {
|
||||
|
|
40
app/vmalert/unitest_test.go
Normal file
40
app/vmalert/unitest_test.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUnitRule(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
disableGroupLabel bool
|
||||
files []string
|
||||
failed bool
|
||||
}{
|
||||
{
|
||||
name: "run multi files",
|
||||
files: []string{"./unittest/testdata/test1.yaml", "./unittest/testdata/test2.yaml"},
|
||||
failed: false,
|
||||
},
|
||||
{
|
||||
name: "disable group label",
|
||||
disableGroupLabel: true,
|
||||
files: []string{"./unittest/testdata/disable-group-label.yaml"},
|
||||
failed: false,
|
||||
},
|
||||
{
|
||||
name: "failing test",
|
||||
files: []string{"./unittest/testdata/failed-test.yaml"},
|
||||
failed: true,
|
||||
},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
oldFlag := *disableAlertGroupLabel
|
||||
*disableAlertGroupLabel = tc.disableGroupLabel
|
||||
fail := unitRule(tc.files...)
|
||||
if fail != tc.failed {
|
||||
t.Fatalf("failed to test %s, expect %t, got %t", tc.name, tc.failed, fail)
|
||||
}
|
||||
*disableAlertGroupLabel = oldFlag
|
||||
}
|
||||
}
|
436
app/vmalert/unittest.go
Normal file
436
app/vmalert/unittest.go
Normal file
|
@ -0,0 +1,436 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
vmalertconfig "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/unittest"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
storagePath string
|
||||
// insert series from 1970-01-01T00:00:00
|
||||
testStartTime = time.Unix(0, 0).UTC()
|
||||
|
||||
testPromWriteHTTPPath = "http://127.0.0.1" + *httpListenAddr + "/api/v1/write"
|
||||
testDataSourcePath = "http://127.0.0.1" + *httpListenAddr + "/prometheus"
|
||||
testRemoteWritePath = "http://127.0.0.1" + *httpListenAddr
|
||||
testHealthHTTPPath = "http://127.0.0.1" + *httpListenAddr + "/health"
|
||||
)
|
||||
|
||||
const (
|
||||
testStoragePath = "vmalert-unittest"
|
||||
testLogLevel = "ERROR"
|
||||
)
|
||||
|
||||
func unitRule(files ...string) bool {
|
||||
storagePath = filepath.Join(os.TempDir(), testStoragePath)
|
||||
processFlags()
|
||||
vminsert.Init()
|
||||
vmselect.Init()
|
||||
// storagePath will be created again when closing vmselect, so remove it again.
|
||||
defer fs.MustRemoveAll(storagePath)
|
||||
defer vminsert.Stop()
|
||||
defer vmselect.Stop()
|
||||
return rulesUnitTest(files...)
|
||||
}
|
||||
|
||||
func rulesUnitTest(files ...string) bool {
|
||||
var failed bool
|
||||
for _, f := range files {
|
||||
if err := ruleUnitTest(f); err != nil {
|
||||
fmt.Println(" FAILED")
|
||||
fmt.Printf("\nfailed to run unit test for file %q: \n%s", f, err)
|
||||
failed = true
|
||||
} else {
|
||||
fmt.Println(" SUCCESS")
|
||||
}
|
||||
}
|
||||
return failed
|
||||
}
|
||||
|
||||
func ruleUnitTest(filename string) []error {
|
||||
fmt.Println("\nUnit Testing: ", filename)
|
||||
b, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
return []error{fmt.Errorf("failed to read file: %w", err)}
|
||||
}
|
||||
|
||||
var unitTestInp unitTestFile
|
||||
if err := yaml.UnmarshalStrict(b, &unitTestInp); err != nil {
|
||||
return []error{fmt.Errorf("failed to unmarshal file: %w", err)}
|
||||
}
|
||||
if err := resolveAndGlobFilepaths(filepath.Dir(filename), &unitTestInp); err != nil {
|
||||
return []error{fmt.Errorf("failed to resolve path for `rule_files`: %w", err)}
|
||||
}
|
||||
|
||||
if unitTestInp.EvaluationInterval.Duration() == 0 {
|
||||
fmt.Println("evaluation_interval set to 1m by default")
|
||||
unitTestInp.EvaluationInterval = &promutils.Duration{D: 1 * time.Minute}
|
||||
}
|
||||
|
||||
groupOrderMap := make(map[string]int)
|
||||
for i, gn := range unitTestInp.GroupEvalOrder {
|
||||
if _, ok := groupOrderMap[gn]; ok {
|
||||
return []error{fmt.Errorf("group name repeated in `group_eval_order`: %s", gn)}
|
||||
}
|
||||
groupOrderMap[gn] = i
|
||||
}
|
||||
|
||||
testGroups, err := vmalertconfig.Parse(unitTestInp.RuleFiles, nil, true)
|
||||
if err != nil {
|
||||
return []error{fmt.Errorf("failed to parse `rule_files`: %w", err)}
|
||||
}
|
||||
|
||||
var errs []error
|
||||
for _, t := range unitTestInp.Tests {
|
||||
if err := verifyTestGroup(t); err != nil {
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
testErrs := t.test(unitTestInp.EvaluationInterval.Duration(), groupOrderMap, testGroups)
|
||||
errs = append(errs, testErrs...)
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errs
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func verifyTestGroup(group testGroup) error {
|
||||
var testGroupName string
|
||||
if group.TestGroupName != "" {
|
||||
testGroupName = fmt.Sprintf("testGroupName: %s\n", group.TestGroupName)
|
||||
}
|
||||
for _, at := range group.AlertRuleTests {
|
||||
if at.Alertname == "" {
|
||||
return fmt.Errorf("\n%s missing required filed \"alertname\"", testGroupName)
|
||||
}
|
||||
if !*disableAlertGroupLabel && at.GroupName == "" {
|
||||
return fmt.Errorf("\n%s missing required filed \"groupname\" when flag \"disableAlertGroupLabel\" is false", testGroupName)
|
||||
}
|
||||
if at.EvalTime == nil {
|
||||
return fmt.Errorf("\n%s missing required filed \"eval_time\"", testGroupName)
|
||||
}
|
||||
}
|
||||
for _, et := range group.MetricsqlExprTests {
|
||||
if et.Expr == "" {
|
||||
return fmt.Errorf("\n%s missing required filed \"expr\"", testGroupName)
|
||||
}
|
||||
if et.EvalTime == nil {
|
||||
return fmt.Errorf("\n%s missing required filed \"eval_time\"", testGroupName)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func processFlags() {
|
||||
flag.Parse()
|
||||
for _, fv := range []struct {
|
||||
flag string
|
||||
value string
|
||||
}{
|
||||
{flag: "storageDataPath", value: storagePath},
|
||||
{flag: "loggerLevel", value: testLogLevel},
|
||||
{flag: "search.disableCache", value: "true"},
|
||||
// set storage retention time to 100 years, allow to store series from 1970-01-01T00:00:00.
|
||||
{flag: "retentionPeriod", value: "100y"},
|
||||
{flag: "datasource.url", value: testDataSourcePath},
|
||||
{flag: "remoteWrite.url", value: testRemoteWritePath},
|
||||
} {
|
||||
// panics if flag doesn't exist
|
||||
if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil {
|
||||
logger.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func setUp() {
|
||||
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
|
||||
go httpserver.Serve(*httpListenAddr, false, func(w http.ResponseWriter, r *http.Request) bool {
|
||||
switch r.URL.Path {
|
||||
case "/prometheus/api/v1/query":
|
||||
if err := prometheus.QueryHandler(nil, time.Now(), w, r); err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
}
|
||||
return true
|
||||
case "/prometheus/api/v1/write", "/api/v1/write":
|
||||
if err := promremotewrite.InsertHandler(r); err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
}
|
||||
return true
|
||||
default:
|
||||
}
|
||||
return false
|
||||
})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
readyCheckFunc := func() bool {
|
||||
resp, err := http.Get(testHealthHTTPPath)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
return resp.StatusCode == 200
|
||||
}
|
||||
checkCheck:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Fatalf("http server can't be ready in 30s")
|
||||
default:
|
||||
if readyCheckFunc() {
|
||||
break checkCheck
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tearDown() {
|
||||
if err := httpserver.Stop(*httpListenAddr); err != nil {
|
||||
logger.Errorf("cannot stop the webservice: %s", err)
|
||||
}
|
||||
vmstorage.Stop()
|
||||
metrics.UnregisterAllMetrics()
|
||||
fs.MustRemoveAll(storagePath)
|
||||
}
|
||||
|
||||
// resolveAndGlobFilepaths joins all relative paths in a configuration
|
||||
// with a given base directory and replaces all globs with matching files.
|
||||
func resolveAndGlobFilepaths(baseDir string, utf *unitTestFile) error {
|
||||
for i, rf := range utf.RuleFiles {
|
||||
if rf != "" && !filepath.IsAbs(rf) {
|
||||
utf.RuleFiles[i] = filepath.Join(baseDir, rf)
|
||||
}
|
||||
}
|
||||
|
||||
var globbedFiles []string
|
||||
for _, rf := range utf.RuleFiles {
|
||||
m, err := filepath.Glob(rf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(m) == 0 {
|
||||
fmt.Fprintln(os.Stderr, " WARNING: no file match pattern", rf)
|
||||
}
|
||||
globbedFiles = append(globbedFiles, m...)
|
||||
}
|
||||
utf.RuleFiles = globbedFiles
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]int, testGroups []vmalertconfig.Group) (checkErrs []error) {
|
||||
// set up vmstorage and http server for ingest and read queries
|
||||
setUp()
|
||||
// tear down vmstorage and clean the data dir
|
||||
defer tearDown()
|
||||
|
||||
err := unittest.WriteInputSeries(tg.InputSeries, tg.Interval, testStartTime, testPromWriteHTTPPath)
|
||||
if err != nil {
|
||||
return []error{err}
|
||||
}
|
||||
|
||||
q, err := datasource.Init(nil)
|
||||
if err != nil {
|
||||
return []error{fmt.Errorf("failed to init datasource: %v", err)}
|
||||
}
|
||||
rw, err := remotewrite.NewDebugClient()
|
||||
if err != nil {
|
||||
return []error{fmt.Errorf("failed to init wr: %v", err)}
|
||||
}
|
||||
|
||||
alertEvalTimesMap := map[time.Duration]struct{}{}
|
||||
alertExpResultMap := map[time.Duration]map[string]map[string][]unittest.ExpAlert{}
|
||||
for _, at := range tg.AlertRuleTests {
|
||||
et := at.EvalTime.Duration()
|
||||
alertEvalTimesMap[et] = struct{}{}
|
||||
if _, ok := alertExpResultMap[et]; !ok {
|
||||
alertExpResultMap[et] = make(map[string]map[string][]unittest.ExpAlert)
|
||||
}
|
||||
if _, ok := alertExpResultMap[et][at.GroupName]; !ok {
|
||||
alertExpResultMap[et][at.GroupName] = make(map[string][]unittest.ExpAlert)
|
||||
}
|
||||
alertExpResultMap[et][at.GroupName][at.Alertname] = at.ExpAlerts
|
||||
}
|
||||
alertEvalTimes := make([]time.Duration, 0, len(alertEvalTimesMap))
|
||||
for k := range alertEvalTimesMap {
|
||||
alertEvalTimes = append(alertEvalTimes, k)
|
||||
}
|
||||
sort.Slice(alertEvalTimes, func(i, j int) bool {
|
||||
return alertEvalTimes[i] < alertEvalTimes[j]
|
||||
})
|
||||
|
||||
// sort group eval order according to the given "group_eval_order".
|
||||
sort.Slice(testGroups, func(i, j int) bool {
|
||||
return groupOrderMap[testGroups[i].Name] < groupOrderMap[testGroups[j].Name]
|
||||
})
|
||||
|
||||
// create groups with given rule
|
||||
var groups []*Group
|
||||
for _, group := range testGroups {
|
||||
ng := newGroup(group, q, *evaluationInterval, tg.ExternalLabels)
|
||||
groups = append(groups, ng)
|
||||
}
|
||||
|
||||
e := &executor{
|
||||
rw: rw,
|
||||
notifiers: func() []notifier.Notifier { return nil },
|
||||
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
|
||||
}
|
||||
|
||||
evalIndex := 0
|
||||
maxEvalTime := testStartTime.Add(tg.maxEvalTime())
|
||||
for ts := testStartTime; ts.Before(maxEvalTime) || ts.Equal(maxEvalTime); ts = ts.Add(evalInterval) {
|
||||
for _, g := range groups {
|
||||
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
|
||||
errs := e.execConcurrently(context.Background(), g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
|
||||
for err := range errs {
|
||||
if err != nil {
|
||||
checkErrs = append(checkErrs, fmt.Errorf("\nfailed to exec group: %q, time: %s, err: %w", g.Name,
|
||||
ts, err))
|
||||
}
|
||||
}
|
||||
// flush series after each group evaluation
|
||||
vmstorage.Storage.DebugFlush()
|
||||
}
|
||||
|
||||
// check alert_rule_test case at every eval time
|
||||
for evalIndex < len(alertEvalTimes) {
|
||||
if ts.Sub(testStartTime) > alertEvalTimes[evalIndex] ||
|
||||
alertEvalTimes[evalIndex] >= ts.Add(evalInterval).Sub(testStartTime) {
|
||||
break
|
||||
}
|
||||
gotAlertsMap := map[string]map[string]unittest.LabelsAndAnnotations{}
|
||||
for _, g := range groups {
|
||||
if *disableAlertGroupLabel {
|
||||
g.Name = ""
|
||||
}
|
||||
if _, ok := alertExpResultMap[time.Duration(ts.UnixNano())][g.Name]; !ok {
|
||||
continue
|
||||
}
|
||||
if _, ok := gotAlertsMap[g.Name]; !ok {
|
||||
gotAlertsMap[g.Name] = make(map[string]unittest.LabelsAndAnnotations)
|
||||
}
|
||||
for _, rule := range g.Rules {
|
||||
ar, isAlertRule := rule.(*AlertingRule)
|
||||
if !isAlertRule {
|
||||
continue
|
||||
}
|
||||
if _, ok := alertExpResultMap[time.Duration(ts.UnixNano())][g.Name][ar.Name]; ok {
|
||||
for _, got := range ar.alerts {
|
||||
if got.State != notifier.StateFiring {
|
||||
continue
|
||||
}
|
||||
laa := unittest.LabelAndAnnotation{
|
||||
Labels: datasource.ConvertToLabels(got.Labels),
|
||||
Annotations: datasource.ConvertToLabels(got.Annotations),
|
||||
}
|
||||
gotAlertsMap[g.Name][ar.Name] = append(gotAlertsMap[g.Name][ar.Name], laa)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
for groupname, gres := range alertExpResultMap[alertEvalTimes[evalIndex]] {
|
||||
for alertname, res := range gres {
|
||||
var expAlerts unittest.LabelsAndAnnotations
|
||||
for _, expAlert := range res {
|
||||
if expAlert.ExpLabels == nil {
|
||||
expAlert.ExpLabels = make(map[string]string)
|
||||
}
|
||||
// alertGroupNameLabel is added as additional labels when `disableAlertGroupLabel` is false
|
||||
if !*disableAlertGroupLabel {
|
||||
expAlert.ExpLabels[alertGroupNameLabel] = groupname
|
||||
}
|
||||
// alertNameLabel is added as additional labels in vmalert.
|
||||
expAlert.ExpLabels[alertNameLabel] = alertname
|
||||
expAlerts = append(expAlerts, unittest.LabelAndAnnotation{
|
||||
Labels: datasource.ConvertToLabels(expAlert.ExpLabels),
|
||||
Annotations: datasource.ConvertToLabels(expAlert.ExpAnnotations),
|
||||
})
|
||||
}
|
||||
sort.Sort(expAlerts)
|
||||
|
||||
gotAlerts := gotAlertsMap[groupname][alertname]
|
||||
sort.Sort(gotAlerts)
|
||||
if !reflect.DeepEqual(expAlerts, gotAlerts) {
|
||||
var testGroupName string
|
||||
if tg.TestGroupName != "" {
|
||||
testGroupName = fmt.Sprintf("testGroupName: %s,\n", tg.TestGroupName)
|
||||
}
|
||||
expString := unittest.IndentLines(expAlerts.String(), " ")
|
||||
gotString := unittest.IndentLines(gotAlerts.String(), " ")
|
||||
checkErrs = append(checkErrs, fmt.Errorf("\n%s groupname: %s, alertname: %s, time: %s, \n exp:%v, \n got:%v ",
|
||||
testGroupName, groupname, alertname, alertEvalTimes[evalIndex].String(), expString, gotString))
|
||||
}
|
||||
}
|
||||
}
|
||||
evalIndex++
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
checkErrs = append(checkErrs, unittest.CheckMetricsqlCase(tg.MetricsqlExprTests, q)...)
|
||||
return checkErrs
|
||||
}
|
||||
|
||||
// unitTestFile holds the contents of a single unit test file
|
||||
type unitTestFile struct {
|
||||
RuleFiles []string `yaml:"rule_files"`
|
||||
EvaluationInterval *promutils.Duration `yaml:"evaluation_interval"`
|
||||
GroupEvalOrder []string `yaml:"group_eval_order"`
|
||||
Tests []testGroup `yaml:"tests"`
|
||||
}
|
||||
|
||||
// testGroup is a group of input series and test cases associated with it
|
||||
type testGroup struct {
|
||||
Interval *promutils.Duration `yaml:"interval"`
|
||||
InputSeries []unittest.Series `yaml:"input_series"`
|
||||
AlertRuleTests []unittest.AlertTestCase `yaml:"alert_rule_test"`
|
||||
MetricsqlExprTests []unittest.MetricsqlTestCase `yaml:"metricsql_expr_test"`
|
||||
ExternalLabels map[string]string `yaml:"external_labels"`
|
||||
TestGroupName string `yaml:"name"`
|
||||
}
|
||||
|
||||
// maxEvalTime returns the max eval time among all alert_rule_test and metricsql_expr_test
|
||||
func (tg *testGroup) maxEvalTime() time.Duration {
|
||||
var maxd time.Duration
|
||||
for _, alert := range tg.AlertRuleTests {
|
||||
if alert.EvalTime.Duration() > maxd {
|
||||
maxd = alert.EvalTime.Duration()
|
||||
}
|
||||
}
|
||||
for _, met := range tg.MetricsqlExprTests {
|
||||
if met.EvalTime.Duration() > maxd {
|
||||
maxd = met.EvalTime.Duration()
|
||||
}
|
||||
}
|
||||
return maxd
|
||||
}
|
19
app/vmalert/unittest/alerting.go
Normal file
19
app/vmalert/unittest/alerting.go
Normal file
|
@ -0,0 +1,19 @@
|
|||
package unittest
|
||||
|
||||
import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
)
|
||||
|
||||
// AlertTestCase holds alert_rule_test cases defined in test file
|
||||
type AlertTestCase struct {
|
||||
EvalTime *promutils.Duration `yaml:"eval_time"`
|
||||
GroupName string `yaml:"groupname"`
|
||||
Alertname string `yaml:"alertname"`
|
||||
ExpAlerts []ExpAlert `yaml:"exp_alerts"`
|
||||
}
|
||||
|
||||
// ExpAlert holds exp_alerts defined in test file
|
||||
type ExpAlert struct {
|
||||
ExpLabels map[string]string `yaml:"exp_labels"`
|
||||
ExpAnnotations map[string]string `yaml:"exp_annotations"`
|
||||
}
|
177
app/vmalert/unittest/input.go
Normal file
177
app/vmalert/unittest/input.go
Normal file
|
@ -0,0 +1,177 @@
|
|||
package unittest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
testutil "github.com/VictoriaMetrics/VictoriaMetrics/app/victoria-metrics/test"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
)
|
||||
|
||||
// Series holds input_series defined in the test file
|
||||
type Series struct {
|
||||
Series string `yaml:"series"`
|
||||
Values string `yaml:"values"`
|
||||
}
|
||||
|
||||
// sequenceValue is an omittable value in a sequence of time series values.
|
||||
type sequenceValue struct {
|
||||
Value float64
|
||||
Omitted bool
|
||||
}
|
||||
|
||||
func httpWrite(address string, r io.Reader) {
|
||||
resp, err := http.Post(address, "", r)
|
||||
if err != nil {
|
||||
logger.Fatalf("failed to send to storage: %v", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
}
|
||||
|
||||
// WriteInputSeries send input series to vmstorage and flush them
|
||||
func WriteInputSeries(input []Series, interval *promutils.Duration, startStamp time.Time, dst string) error {
|
||||
r := testutil.WriteRequest{}
|
||||
for _, data := range input {
|
||||
expr, err := metricsql.Parse(data.Series)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse series %s: %v", data.Series, err)
|
||||
}
|
||||
promvals, err := parseInputValue(data.Values, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse input series value %s: %v", data.Values, err)
|
||||
}
|
||||
metricExpr, ok := expr.(*metricsql.MetricExpr)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to parse series %s to metric expr: %v", data.Series, err)
|
||||
}
|
||||
samples := make([]testutil.Sample, 0, len(promvals))
|
||||
ts := startStamp
|
||||
for _, v := range promvals {
|
||||
if !v.Omitted {
|
||||
samples = append(samples, testutil.Sample{
|
||||
Timestamp: ts.UnixMilli(),
|
||||
Value: v.Value,
|
||||
})
|
||||
}
|
||||
ts = ts.Add(interval.Duration())
|
||||
}
|
||||
var ls []testutil.Label
|
||||
for _, filter := range metricExpr.LabelFilterss[0] {
|
||||
ls = append(ls, testutil.Label{Name: filter.Label, Value: filter.Value})
|
||||
}
|
||||
r.Timeseries = append(r.Timeseries, testutil.TimeSeries{Labels: ls, Samples: samples})
|
||||
}
|
||||
|
||||
data, err := testutil.Compress(r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to compress data: %v", err)
|
||||
}
|
||||
// write input series to vm
|
||||
httpWrite(dst, bytes.NewBuffer(data))
|
||||
vmstorage.Storage.DebugFlush()
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseInputValue support input like "1", "1+1x1 _ -4 3+20x1", see more examples in test.
|
||||
func parseInputValue(input string, origin bool) ([]sequenceValue, error) {
|
||||
var res []sequenceValue
|
||||
items := strings.Split(input, " ")
|
||||
reg2 := regexp.MustCompile(`\D?\d*\D?`)
|
||||
for _, item := range items {
|
||||
vals := reg2.FindAllString(item, -1)
|
||||
switch len(vals) {
|
||||
case 1:
|
||||
if vals[0] == "_" {
|
||||
res = append(res, sequenceValue{Omitted: true})
|
||||
continue
|
||||
}
|
||||
v, err := strconv.ParseFloat(vals[0], 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res = append(res, sequenceValue{Value: v})
|
||||
continue
|
||||
case 2:
|
||||
p1 := vals[0][:len(vals[0])-1]
|
||||
v2, err := strconv.ParseInt(vals[1], 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
option := vals[0][len(vals[0])-1]
|
||||
switch option {
|
||||
case '+':
|
||||
v1, err := strconv.ParseFloat(p1, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res = append(res, sequenceValue{Value: v1 + float64(v2)})
|
||||
case 'x':
|
||||
for i := int64(0); i <= v2; i++ {
|
||||
if p1 == "_" {
|
||||
if i == 0 {
|
||||
i = 1
|
||||
}
|
||||
res = append(res, sequenceValue{Omitted: true})
|
||||
continue
|
||||
}
|
||||
v1, err := strconv.ParseFloat(p1, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !origin || v1 == 0 {
|
||||
res = append(res, sequenceValue{Value: v1 * float64(i)})
|
||||
continue
|
||||
}
|
||||
newVal := fmt.Sprintf("%s+0x%s", p1, vals[1])
|
||||
newRes, err := parseInputValue(newVal, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res = append(res, newRes...)
|
||||
break
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("got invalid operation %b", option)
|
||||
}
|
||||
case 3:
|
||||
r1, err := parseInputValue(fmt.Sprintf("%s%s", vals[1], vals[2]), false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p1 := vals[0][:len(vals[0])-1]
|
||||
v1, err := strconv.ParseFloat(p1, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
option := vals[0][len(vals[0])-1]
|
||||
var isAdd bool
|
||||
if option == '+' {
|
||||
isAdd = true
|
||||
}
|
||||
for _, r := range r1 {
|
||||
if isAdd {
|
||||
res = append(res, sequenceValue{
|
||||
Value: r.Value + v1,
|
||||
})
|
||||
} else {
|
||||
res = append(res, sequenceValue{
|
||||
Value: v1 - r.Value,
|
||||
})
|
||||
}
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported input %s", input)
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
70
app/vmalert/unittest/input_test.go
Normal file
70
app/vmalert/unittest/input_test.go
Normal file
|
@ -0,0 +1,70 @@
|
|||
package unittest
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseInputValue(t *testing.T) {
|
||||
testCases := []struct {
|
||||
input string
|
||||
exp []sequenceValue
|
||||
failed bool
|
||||
}{
|
||||
{
|
||||
"",
|
||||
nil,
|
||||
true,
|
||||
},
|
||||
{
|
||||
"testfailed",
|
||||
nil,
|
||||
true,
|
||||
},
|
||||
{
|
||||
"-4",
|
||||
[]sequenceValue{{Value: -4}},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"_",
|
||||
[]sequenceValue{{Omitted: true}},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"-4x1",
|
||||
[]sequenceValue{{Value: -4}, {Value: -4}},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"_x1",
|
||||
[]sequenceValue{{Omitted: true}},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"1+1x4",
|
||||
[]sequenceValue{{Value: 1}, {Value: 2}, {Value: 3}, {Value: 4}, {Value: 5}},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"2-1x4",
|
||||
[]sequenceValue{{Value: 2}, {Value: 1}, {Value: 0}, {Value: -1}, {Value: -2}},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"1+1x1 _ -4 3+20x1",
|
||||
[]sequenceValue{{Value: 1}, {Value: 2}, {Omitted: true}, {Value: -4}, {Value: 3}, {Value: 23}},
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
output, err := parseInputValue(tc.input, true)
|
||||
if err != nil != tc.failed {
|
||||
t.Fatalf("failed to parse %s, expect %t, got %t", tc.input, tc.failed, err != nil)
|
||||
}
|
||||
if !reflect.DeepEqual(tc.exp, output) {
|
||||
t.Fatalf("expect %v, got %v", tc.exp, output)
|
||||
}
|
||||
}
|
||||
}
|
92
app/vmalert/unittest/recording.go
Normal file
92
app/vmalert/unittest/recording.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package unittest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"sort"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
)
|
||||
|
||||
// MetricsqlTestCase holds metricsql_expr_test cases defined in test file
|
||||
type MetricsqlTestCase struct {
|
||||
Expr string `yaml:"expr"`
|
||||
EvalTime *promutils.Duration `yaml:"eval_time"`
|
||||
ExpSamples []expSample `yaml:"exp_samples"`
|
||||
}
|
||||
|
||||
type expSample struct {
|
||||
Labels string `yaml:"labels"`
|
||||
Value float64 `yaml:"value"`
|
||||
}
|
||||
|
||||
// CheckMetricsqlCase will check metricsql_expr_test cases
|
||||
func CheckMetricsqlCase(cases []MetricsqlTestCase, q datasource.QuerierBuilder) (checkErrs []error) {
|
||||
queries := q.BuildWithParams(datasource.QuerierParams{QueryParams: url.Values{"nocache": {"1"}, "latency_offset": {"1ms"}}, DataSourceType: "prometheus"})
|
||||
Outer:
|
||||
for _, mt := range cases {
|
||||
result, _, err := queries.Query(context.Background(), mt.Expr, mt.EvalTime.ParseTime())
|
||||
if err != nil {
|
||||
checkErrs = append(checkErrs, fmt.Errorf(" expr: %q, time: %s, err: %w", mt.Expr,
|
||||
mt.EvalTime.Duration().String(), err))
|
||||
continue
|
||||
}
|
||||
var gotSamples []parsedSample
|
||||
for _, s := range result.Data {
|
||||
sort.Slice(s.Labels, func(i, j int) bool {
|
||||
return s.Labels[i].Name < s.Labels[j].Name
|
||||
})
|
||||
gotSamples = append(gotSamples, parsedSample{
|
||||
Labels: s.Labels,
|
||||
Value: s.Values[0],
|
||||
})
|
||||
}
|
||||
var expSamples []parsedSample
|
||||
for _, s := range mt.ExpSamples {
|
||||
expLb := datasource.Labels{}
|
||||
if s.Labels != "" {
|
||||
metricsqlExpr, err := metricsql.Parse(s.Labels)
|
||||
if err != nil {
|
||||
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
|
||||
mt.EvalTime.Duration().String(), fmt.Errorf("failed to parse labels %q: %w", s.Labels, err)))
|
||||
continue Outer
|
||||
}
|
||||
metricsqlMetricExpr, ok := metricsqlExpr.(*metricsql.MetricExpr)
|
||||
if !ok {
|
||||
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
|
||||
mt.EvalTime.Duration().String(), fmt.Errorf("got unsupported metricsql type")))
|
||||
continue Outer
|
||||
}
|
||||
for _, l := range metricsqlMetricExpr.LabelFilterss[0] {
|
||||
expLb = append(expLb, datasource.Label{
|
||||
Name: l.Label,
|
||||
Value: l.Value,
|
||||
})
|
||||
}
|
||||
}
|
||||
sort.Slice(expLb, func(i, j int) bool {
|
||||
return expLb[i].Name < expLb[j].Name
|
||||
})
|
||||
expSamples = append(expSamples, parsedSample{
|
||||
Labels: expLb,
|
||||
Value: s.Value,
|
||||
})
|
||||
}
|
||||
sort.Slice(expSamples, func(i, j int) bool {
|
||||
return datasource.LabelCompare(expSamples[i].Labels, expSamples[j].Labels) <= 0
|
||||
})
|
||||
sort.Slice(gotSamples, func(i, j int) bool {
|
||||
return datasource.LabelCompare(gotSamples[i].Labels, gotSamples[j].Labels) <= 0
|
||||
})
|
||||
if !reflect.DeepEqual(expSamples, gotSamples) {
|
||||
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s,\n exp: %v\n got: %v", mt.Expr,
|
||||
mt.EvalTime.Duration().String(), parsedSamplesString(expSamples), parsedSamplesString(gotSamples)))
|
||||
}
|
||||
|
||||
}
|
||||
return
|
||||
}
|
43
app/vmalert/unittest/testdata/disable-group-label.yaml
vendored
Normal file
43
app/vmalert/unittest/testdata/disable-group-label.yaml
vendored
Normal file
|
@ -0,0 +1,43 @@
|
|||
rule_files:
|
||||
- rules.yaml
|
||||
|
||||
evaluation_interval: 1m
|
||||
|
||||
tests:
|
||||
- interval: 1m
|
||||
input_series:
|
||||
- series: 'up{job="vmagent2", instance="localhost:9090"}'
|
||||
values: "0+0x1440"
|
||||
|
||||
metricsql_expr_test:
|
||||
- expr: suquery_interval_test
|
||||
eval_time: 4m
|
||||
exp_samples:
|
||||
- labels: '{__name__="suquery_interval_test",datacenter="dc-123", instance="localhost:9090", job="vmagent2"}'
|
||||
value: 1
|
||||
|
||||
alert_rule_test:
|
||||
- eval_time: 2h
|
||||
alertname: InstanceDown
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
job: vmagent2
|
||||
severity: page
|
||||
instance: localhost:9090
|
||||
datacenter: dc-123
|
||||
exp_annotations:
|
||||
summary: "Instance localhost:9090 down"
|
||||
description: "localhost:9090 of job vmagent2 has been down for more than 5 minutes."
|
||||
|
||||
- eval_time: 0
|
||||
alertname: AlwaysFiring
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
datacenter: dc-123
|
||||
|
||||
- eval_time: 0
|
||||
alertname: InstanceDown
|
||||
exp_alerts: []
|
||||
|
||||
external_labels:
|
||||
datacenter: dc-123
|
41
app/vmalert/unittest/testdata/failed-test.yaml
vendored
Normal file
41
app/vmalert/unittest/testdata/failed-test.yaml
vendored
Normal file
|
@ -0,0 +1,41 @@
|
|||
rule_files:
|
||||
- rules.yaml
|
||||
|
||||
tests:
|
||||
- interval: 1m
|
||||
name: "Failing test"
|
||||
input_series:
|
||||
- series: test
|
||||
values: "0"
|
||||
|
||||
metricsql_expr_test:
|
||||
- expr: test
|
||||
eval_time: 0m
|
||||
exp_samples:
|
||||
- value: 0
|
||||
labels: test
|
||||
|
||||
# will failed cause there is no "Test" group and rule defined
|
||||
alert_rule_test:
|
||||
- eval_time: 0m
|
||||
groupname: Test
|
||||
alertname: Test
|
||||
exp_alerts:
|
||||
- exp_labels: {}
|
||||
|
||||
- interval: 1m
|
||||
name: Failing alert test
|
||||
input_series:
|
||||
- series: 'up{job="test"}'
|
||||
values: 0x10
|
||||
|
||||
alert_rule_test:
|
||||
# will failed cause rule is firing
|
||||
- eval_time: 5m
|
||||
groupname: group1
|
||||
alertname: InstanceDown
|
||||
exp_alerts: []
|
||||
# will failed cause missing groupname
|
||||
- eval_time: 5m
|
||||
alertname: AlwaysFiring
|
||||
exp_alerts: []
|
30
app/vmalert/unittest/testdata/long-period.yaml
vendored
Normal file
30
app/vmalert/unittest/testdata/long-period.yaml
vendored
Normal file
|
@ -0,0 +1,30 @@
|
|||
# can be executed successfully but will take more than 1 minute
|
||||
# not included in unit test now
|
||||
evaluation_interval: 100d
|
||||
|
||||
rule_files:
|
||||
- rules.yaml
|
||||
|
||||
tests:
|
||||
- interval: 1d
|
||||
input_series:
|
||||
- series: test
|
||||
# Max time in time.Duration is 106751d from 1970 (2^63/10^9), i.e. 2262.
|
||||
# But VictoriaMetrics supports maxTimestamp value +2 days from now. see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/827.
|
||||
# We input series to 2024-01-01T00:00:00 here.
|
||||
values: "0+1x19723"
|
||||
|
||||
metricsql_expr_test:
|
||||
- expr: timestamp(test)
|
||||
eval_time: 0m
|
||||
exp_samples:
|
||||
- value: 0
|
||||
- expr: test
|
||||
eval_time: 100d
|
||||
exp_samples:
|
||||
- labels: test
|
||||
value: 100
|
||||
- expr: timestamp(test)
|
||||
eval_time: 19000d
|
||||
exp_samples:
|
||||
- value: 1641600000 # 19000d -> seconds.
|
39
app/vmalert/unittest/testdata/rules.yaml
vendored
Normal file
39
app/vmalert/unittest/testdata/rules.yaml
vendored
Normal file
|
@ -0,0 +1,39 @@
|
|||
groups:
|
||||
- name: group1
|
||||
rules:
|
||||
- alert: InstanceDown
|
||||
expr: up == 0
|
||||
for: 5m
|
||||
labels:
|
||||
severity: page
|
||||
annotations:
|
||||
summary: "Instance {{ $labels.instance }} down"
|
||||
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
|
||||
- alert: AlwaysFiring
|
||||
expr: 1
|
||||
- alert: SameAlertNameWithDifferentGroup
|
||||
expr: absent(test)
|
||||
for: 1m
|
||||
|
||||
- name: group2
|
||||
rules:
|
||||
- record: t1
|
||||
expr: test
|
||||
- record: job:test:count_over_time1m
|
||||
expr: sum without(instance) (count_over_time(test[1m]))
|
||||
- record: suquery_interval_test
|
||||
expr: count_over_time(up[5m:])
|
||||
|
||||
- alert: SameAlertNameWithDifferentGroup
|
||||
expr: absent(test)
|
||||
for: 5m
|
||||
|
||||
- name: group3
|
||||
rules:
|
||||
- record: t2
|
||||
expr: t1
|
||||
|
||||
- name: group4
|
||||
rules:
|
||||
- record: t3
|
||||
expr: t1
|
89
app/vmalert/unittest/testdata/test1.yaml
vendored
Normal file
89
app/vmalert/unittest/testdata/test1.yaml
vendored
Normal file
|
@ -0,0 +1,89 @@
|
|||
rule_files:
|
||||
- rules.yaml
|
||||
|
||||
evaluation_interval: 1m
|
||||
group_eval_order: ["group4", "group2", "group3"]
|
||||
|
||||
tests:
|
||||
- interval: 1m
|
||||
name: "basic test"
|
||||
input_series:
|
||||
- series: "test"
|
||||
values: "_x5 1x5"
|
||||
|
||||
alert_rule_test:
|
||||
- eval_time: 1m
|
||||
groupname: group1
|
||||
alertname: SameAlertNameWithDifferentGroup
|
||||
exp_alerts:
|
||||
- {}
|
||||
- eval_time: 1m
|
||||
groupname: group2
|
||||
alertname: SameAlertNameWithDifferentGroup
|
||||
exp_alerts: []
|
||||
- eval_time: 6m
|
||||
groupname: group1
|
||||
alertname: SameAlertNameWithDifferentGroup
|
||||
exp_alerts: []
|
||||
|
||||
- interval: 1m
|
||||
name: "basic test2"
|
||||
input_series:
|
||||
- series: 'up{job="vmagent1", instance="localhost:9090"}'
|
||||
values: "0+0x1440"
|
||||
- series: "test"
|
||||
values: "0+1x1440"
|
||||
|
||||
metricsql_expr_test:
|
||||
- expr: count(ALERTS) by (alertgroup, alertname, alertstate)
|
||||
eval_time: 4m
|
||||
exp_samples:
|
||||
- labels: '{alertgroup="group1", alertname="AlwaysFiring", alertstate="firing"}'
|
||||
value: 1
|
||||
- labels: '{alertgroup="group1", alertname="InstanceDown", alertstate="pending"}'
|
||||
value: 1
|
||||
- expr: t1
|
||||
eval_time: 4m
|
||||
exp_samples:
|
||||
- value: 4
|
||||
labels: '{__name__="t1", datacenter="dc-123"}'
|
||||
- expr: t2
|
||||
eval_time: 4m
|
||||
exp_samples:
|
||||
- value: 4
|
||||
labels: '{__name__="t2", datacenter="dc-123"}'
|
||||
- expr: t3
|
||||
eval_time: 4m
|
||||
exp_samples:
|
||||
# t3 is 3 instead of 4 cause it's rules3 is evaluated before rules1
|
||||
- value: 3
|
||||
labels: '{__name__="t3", datacenter="dc-123"}'
|
||||
|
||||
alert_rule_test:
|
||||
- eval_time: 10m
|
||||
groupname: group1
|
||||
alertname: InstanceDown
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
job: vmagent1
|
||||
severity: page
|
||||
instance: localhost:9090
|
||||
datacenter: dc-123
|
||||
exp_annotations:
|
||||
summary: "Instance localhost:9090 down"
|
||||
description: "localhost:9090 of job vmagent1 has been down for more than 5 minutes."
|
||||
|
||||
- eval_time: 0
|
||||
groupname: group1
|
||||
alertname: AlwaysFiring
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
datacenter: dc-123
|
||||
|
||||
- eval_time: 0
|
||||
groupname: alerts
|
||||
alertname: InstanceDown
|
||||
exp_alerts: []
|
||||
|
||||
external_labels:
|
||||
datacenter: dc-123
|
46
app/vmalert/unittest/testdata/test2.yaml
vendored
Normal file
46
app/vmalert/unittest/testdata/test2.yaml
vendored
Normal file
|
@ -0,0 +1,46 @@
|
|||
rule_files:
|
||||
- rules.yaml
|
||||
|
||||
evaluation_interval: 1m
|
||||
|
||||
tests:
|
||||
- interval: 1m
|
||||
input_series:
|
||||
- series: 'up{job="vmagent2", instance="localhost:9090"}'
|
||||
values: "0+0x1440"
|
||||
|
||||
metricsql_expr_test:
|
||||
- expr: suquery_interval_test
|
||||
eval_time: 4m
|
||||
exp_samples:
|
||||
- labels: '{__name__="suquery_interval_test",datacenter="dc-123", instance="localhost:9090", job="vmagent2"}'
|
||||
value: 1
|
||||
|
||||
alert_rule_test:
|
||||
- eval_time: 2h
|
||||
groupname: group1
|
||||
alertname: InstanceDown
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
job: vmagent2
|
||||
severity: page
|
||||
instance: localhost:9090
|
||||
datacenter: dc-123
|
||||
exp_annotations:
|
||||
summary: "Instance localhost:9090 down"
|
||||
description: "localhost:9090 of job vmagent2 has been down for more than 5 minutes."
|
||||
|
||||
- eval_time: 0
|
||||
groupname: group1
|
||||
alertname: AlwaysFiring
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
datacenter: dc-123
|
||||
|
||||
- eval_time: 0
|
||||
groupname: group1
|
||||
alertname: InstanceDown
|
||||
exp_alerts: []
|
||||
|
||||
external_labels:
|
||||
datacenter: dc-123
|
83
app/vmalert/unittest/type.go
Normal file
83
app/vmalert/unittest/type.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package unittest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
||||
)
|
||||
|
||||
// parsedSample is a sample with parsed Labels
|
||||
type parsedSample struct {
|
||||
Labels datasource.Labels
|
||||
Value float64
|
||||
}
|
||||
|
||||
func (ps *parsedSample) String() string {
|
||||
return ps.Labels.String() + " " + strconv.FormatFloat(ps.Value, 'E', -1, 64)
|
||||
}
|
||||
|
||||
func parsedSamplesString(pss []parsedSample) string {
|
||||
if len(pss) == 0 {
|
||||
return "nil"
|
||||
}
|
||||
s := pss[0].String()
|
||||
for _, ps := range pss[1:] {
|
||||
s += ", " + ps.String()
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// LabelAndAnnotation holds labels and annotations
|
||||
type LabelAndAnnotation struct {
|
||||
Labels datasource.Labels
|
||||
Annotations datasource.Labels
|
||||
}
|
||||
|
||||
func (la *LabelAndAnnotation) String() string {
|
||||
return "Labels:" + la.Labels.String() + "\nAnnotations:" + la.Annotations.String()
|
||||
}
|
||||
|
||||
// LabelsAndAnnotations is collection of LabelAndAnnotation
|
||||
type LabelsAndAnnotations []LabelAndAnnotation
|
||||
|
||||
func (la LabelsAndAnnotations) Len() int { return len(la) }
|
||||
|
||||
func (la LabelsAndAnnotations) Swap(i, j int) { la[i], la[j] = la[j], la[i] }
|
||||
func (la LabelsAndAnnotations) Less(i, j int) bool {
|
||||
diff := datasource.LabelCompare(la[i].Labels, la[j].Labels)
|
||||
if diff != 0 {
|
||||
return diff < 0
|
||||
}
|
||||
return datasource.LabelCompare(la[i].Annotations, la[j].Annotations) < 0
|
||||
}
|
||||
|
||||
func (la LabelsAndAnnotations) String() string {
|
||||
if len(la) == 0 {
|
||||
return "[]"
|
||||
}
|
||||
s := "[\n0:" + IndentLines("\n"+la[0].String(), " ")
|
||||
for i, l := range la[1:] {
|
||||
s += ",\n" + fmt.Sprintf("%d", i+1) + ":" + IndentLines("\n"+l.String(), " ")
|
||||
}
|
||||
s += "\n]"
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// IndentLines prefixes each line in the supplied string with the given "indent" string.
|
||||
func IndentLines(lines, indent string) string {
|
||||
sb := strings.Builder{}
|
||||
n := strings.Split(lines, "\n")
|
||||
for i, l := range n {
|
||||
if i > 0 {
|
||||
sb.WriteString(indent)
|
||||
}
|
||||
sb.WriteString(l)
|
||||
if i != len(n)-1 {
|
||||
sb.WriteRune('\n')
|
||||
}
|
||||
}
|
||||
return sb.String()
|
||||
}
|
|
@ -30,7 +30,7 @@ var (
|
|||
)
|
||||
|
||||
var (
|
||||
saCfgReloaderStopCh = make(chan struct{})
|
||||
saCfgReloaderStopCh chan struct{}
|
||||
saCfgReloaderWG sync.WaitGroup
|
||||
|
||||
saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
|
||||
|
@ -59,6 +59,7 @@ func CheckStreamAggrConfig() error {
|
|||
//
|
||||
// MustStopStreamAggr must be called when stream aggr is no longer needed.
|
||||
func InitStreamAggr() {
|
||||
saCfgReloaderStopCh = make(chan struct{})
|
||||
if *streamAggrConfig == "" {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components
|
|||
* FEATUTE: [vmalert](https://docs.victoriametrics.com/vmalert.html): display the error message received during unsuccessful config reload in vmalert's UI. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4076) for details.
|
||||
* FEATUTE: [vmalert](https://docs.victoriametrics.com/vmalert.html): allow disabling of `step` param attached to [instant queries](https://docs.victoriametrics.com/keyConcepts.html#instant-query). This might be useful for using vmalert with datasources that to not support this param, unlike VictoriaMetrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4573) for details.
|
||||
* FEATUTE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support option for "blackholing" alerting notifications if `-notifier.blackhole` cmd-line flag is set. Enable this flag if you want vmalert to evaluate alerting rules without sending any notifications to external receivers (eg. alertmanager). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4122) for details. Thanks to @venkatbvc for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4639).
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add unit test for alerting and recording rules, see more [details](https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules) here. Thanks to @Haleygo for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4596).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): expose `vmauth_user_request_duration_seconds` and `vmauth_unauthorized_user_request_duration_seconds` summary metrics for measuring requests latency per user.
|
||||
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): show backup progress percentage in log during backup uploading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
|
||||
* FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): show restoring progress percentage in log during backup downloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
|
||||
|
|
231
docs/vmalert.md
231
docs/vmalert.md
|
@ -743,6 +743,232 @@ See full description for these flags in `./vmalert -help`.
|
|||
* `query` template function is disabled for performance reasons (might be changed in future);
|
||||
* `limit` group's param has no effect during replay (might be changed in future);
|
||||
|
||||
## Unit Testing for Rules
|
||||
|
||||
You can use `vmalert` to test your rules.
|
||||
It will setup an isolated VM instance, simulate the periodic ingestion of samples for several time series, use those series to evaluate recording and alerting rules, and then test whether the firing alerts or metricsql expressions match what was configured as the expected results.
|
||||
|
||||
```
|
||||
# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
|
||||
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
|
||||
```
|
||||
|
||||
### Test file format
|
||||
|
||||
```
|
||||
# Path to the files or http url containing [rule groups](https://docs.victoriametrics.com/vmalert.html#groups) configuration.
|
||||
# Enterprise version of vmalert supports S3 and GCS paths to rules.
|
||||
rule_files:
|
||||
[ - <file_name> ]
|
||||
|
||||
# The evaluation interval for rules specified in `rule_files`
|
||||
[ evaluation_interval: <duration> | default = 1m ]
|
||||
|
||||
# Groups listed below will be evaluated by order.
|
||||
# Not All the groups need not be mentioned, if not, they will be evaluated by define order in rule_files.
|
||||
group_eval_order:
|
||||
[ - <group_name> ]
|
||||
|
||||
# The list of unit test files to be checked during evaluation.
|
||||
tests:
|
||||
[ - <test_group> ]
|
||||
```
|
||||
|
||||
#### `<test_group>`
|
||||
|
||||
```
|
||||
# Interval between samples for input series
|
||||
interval: <duration>
|
||||
# Time series to persist into the database according to configured <interval> before running tests.
|
||||
input_series:
|
||||
[ - <series> ]
|
||||
|
||||
# Name of the test group, optional
|
||||
[ name: <string> ]
|
||||
|
||||
# Unit tests for alerting rules. Alerting rules are from the input file.
|
||||
alert_rule_test:
|
||||
[ - <alert_test_case> ]
|
||||
|
||||
# Unit tests for Metricsql expressions.
|
||||
metricsql_expr_test:
|
||||
[ - <metricsql_expr_test> ]
|
||||
|
||||
# External labels accessible to the alert template.
|
||||
external_labels:
|
||||
[ <labelname>: <string> ... ]
|
||||
|
||||
```
|
||||
|
||||
#### `<series>`
|
||||
|
||||
```
|
||||
# series should format as '<metric name>{<label name>=<label value>, ...}'
|
||||
# Examples:
|
||||
# series_name{label1="value1", label2="value2"}
|
||||
# go_goroutines{job="prometheus", instance="localhost:9090"}
|
||||
series: <string>
|
||||
|
||||
# values support several special equations:
|
||||
# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
|
||||
# Read this as series starts at a, then c further samples incrementing by b.
|
||||
# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
|
||||
# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
|
||||
# '_' represents a missing sample from scrape
|
||||
# Examples:
|
||||
# 1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
|
||||
# 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
|
||||
# 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
|
||||
# 4. ' 1 _x3' becomes '1 _ _ _ ' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
|
||||
values: <string>
|
||||
```
|
||||
|
||||
#### `<alert_test_case>`
|
||||
|
||||
vmalert by default adds group name and alert name to generated alerts and timeseries.
|
||||
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`, but no need to add them under `exp_alerts`.
|
||||
You can also pass `--disableAlertgroupLabel` to prevent adding groupname label, in that case, `groupname` can be missed.
|
||||
|
||||
```
|
||||
# The time elapsed from time=0s when this alerting rule be checked.
|
||||
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
|
||||
eval_time: <duration>
|
||||
|
||||
# Name of the group name to be tested.
|
||||
groupname: <string>
|
||||
|
||||
# Name of the alert to be tested.
|
||||
alertname: <string>
|
||||
|
||||
# List of expected alerts which are firing under the given alertname at
|
||||
# given evaluation time. If you want to test if an alerting rule should
|
||||
# not be firing, then you can mention the above fields and leave 'exp_alerts' empty.
|
||||
exp_alerts:
|
||||
[ - <alert> ]
|
||||
```
|
||||
|
||||
#### `<alert>`
|
||||
|
||||
```
|
||||
# These are the expanded labels and annotations of the expected alert.
|
||||
# Note: labels also include the labels of the sample associated with the alert
|
||||
exp_labels:
|
||||
[ <labelname>: <string> ]
|
||||
exp_annotations:
|
||||
[ <labelname>: <string> ]
|
||||
```
|
||||
|
||||
#### `<metricsql_expr_test>`
|
||||
|
||||
```
|
||||
# Expression to evaluate
|
||||
expr: <string>
|
||||
|
||||
# The time elapsed from time=0s when this expression be evaluated.
|
||||
eval_time: <duration>
|
||||
|
||||
# Expected samples at the given evaluation time.
|
||||
exp_samples:
|
||||
[ - <sample> ]
|
||||
```
|
||||
|
||||
#### `<sample>`
|
||||
|
||||
```
|
||||
# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
|
||||
# Examples:
|
||||
# series_name{label1="value1", label2="value2"}
|
||||
# go_goroutines{job="prometheus", instance="localhost:9090"}
|
||||
labels: <string>
|
||||
|
||||
# The expected value of the Metricsql expression.
|
||||
value: <number>
|
||||
```
|
||||
|
||||
### Example
|
||||
|
||||
This is an example input file for unit testing which will passes. `test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
|
||||
|
||||
With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
|
||||
|
||||
#### `test.yaml`
|
||||
|
||||
```
|
||||
rule_files:
|
||||
- rules.yaml
|
||||
|
||||
evaluation_interval: 1m
|
||||
|
||||
tests:
|
||||
- interval: 1m
|
||||
input_series:
|
||||
- series: 'up{job="prometheus", instance="localhost:9090"}'
|
||||
values: "0+0x1440"
|
||||
|
||||
metricsql_expr_test:
|
||||
- expr: suquery_interval_test
|
||||
eval_time: 4m
|
||||
exp_samples:
|
||||
- labels: '{__name__="suquery_interval_test",datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
|
||||
value: 1
|
||||
|
||||
alert_rule_test:
|
||||
- eval_time: 2h
|
||||
groupname: group1
|
||||
alertname: InstanceDown
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
job: prometheus
|
||||
severity: page
|
||||
instance: localhost:9090
|
||||
datacenter: dc-123
|
||||
exp_annotations:
|
||||
summary: "Instance localhost:9090 down"
|
||||
description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
|
||||
|
||||
- eval_time: 0
|
||||
groupname: group1
|
||||
alertname: AlwaysFiring
|
||||
exp_alerts:
|
||||
- exp_labels:
|
||||
datacenter: dc-123
|
||||
|
||||
- eval_time: 0
|
||||
groupname: group1
|
||||
alertname: InstanceDown
|
||||
exp_alerts: []
|
||||
|
||||
external_labels:
|
||||
datacenter: dc-123
|
||||
```
|
||||
|
||||
#### `alerts.yaml`
|
||||
|
||||
```
|
||||
# This is the rules file.
|
||||
|
||||
groups:
|
||||
- name: group1
|
||||
rules:
|
||||
- alert: InstanceDown
|
||||
expr: up == 0
|
||||
for: 5m
|
||||
labels:
|
||||
severity: page
|
||||
annotations:
|
||||
summary: "Instance {{ $labels.instance }} down"
|
||||
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
|
||||
- alert: AlwaysFiring
|
||||
expr: 1
|
||||
|
||||
- name: group2
|
||||
rules:
|
||||
- record: job:test:count_over_time1m
|
||||
expr: sum without(instance) (count_over_time(test[1m]))
|
||||
- record: suquery_interval_test
|
||||
expr: count_over_time(up[5m:])
|
||||
```
|
||||
|
||||
## Monitoring
|
||||
|
||||
`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
|
||||
|
@ -1293,6 +1519,11 @@ The shortlist of configuration flags is the following:
|
|||
Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
|
||||
-tlsMinVersion string
|
||||
Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
|
||||
-unittestFile array
|
||||
Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
|
||||
Examples:
|
||||
-unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
|
||||
See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
|
||||
-version
|
||||
Show VictoriaMetrics version
|
||||
```
|
||||
|
|
6
go.mod
6
go.mod
|
@ -39,7 +39,10 @@ require (
|
|||
gopkg.in/yaml.v2 v2.4.0
|
||||
)
|
||||
|
||||
require github.com/bmatcuk/doublestar/v4 v4.6.0
|
||||
require (
|
||||
github.com/bmatcuk/doublestar/v4 v4.6.0
|
||||
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.110.6 // indirect
|
||||
|
@ -115,7 +118,6 @@ require (
|
|||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.uber.org/goleak v1.2.1 // indirect
|
||||
golang.org/x/crypto v0.11.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
|
||||
golang.org/x/sync v0.3.0 // indirect
|
||||
golang.org/x/text v0.11.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
|
|
|
@ -45,6 +45,14 @@ func (pd *Duration) Duration() time.Duration {
|
|||
return pd.D
|
||||
}
|
||||
|
||||
// ParseTime returns time for pd.
|
||||
func (pd *Duration) ParseTime() time.Time {
|
||||
if pd == nil {
|
||||
return time.Time{}
|
||||
}
|
||||
return time.UnixMilli(pd.Duration().Milliseconds())
|
||||
}
|
||||
|
||||
// ParseDuration parses duration string in Prometheus format
|
||||
func ParseDuration(s string) (time.Duration, error) {
|
||||
ms, err := metricsql.DurationValue(s, 0)
|
||||
|
|
Loading…
Reference in a new issue