mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-21 14:44:00 +00:00)
vmalert: revert unittest feature (#4734)

* Revert "vmalert: unittest support stale datapoint (#4696)"
  This reverts commit 0b44df7ec8.

* Revert "docs: specify min version and limitations for vmalert's unit tests"
  This reverts commit a24541bd.
  Signed-off-by: hagen1778 <roman@victoriametrics.com>

* Revert "vmalert: init unit test (#4596)"
  This reverts commit da60a68d.
  Signed-off-by: hagen1778 <roman@victoriametrics.com>

* docs: mention unittest revert in changelog
  Signed-off-by: hagen1778 <roman@victoriametrics.com>

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
parent f8d30a486e
commit 9f1b9b86cc
29 changed files with 325 additions and 2248 deletions
@@ -74,7 +74,6 @@ test-vmalert:
 	go test -v -race -cover ./app/vmalert/config
 	go test -v -race -cover ./app/vmalert/remotewrite
 	go test -v -race -cover ./app/vmalert/utils
-	go test -v -race -cover ./app/vmalert/unittest
 
 run-vmalert: vmalert
 	./bin/vmalert -rule=app/vmalert/config/testdata/rules/rules2-good.rules \
@@ -104,10 +103,6 @@ replay-vmalert: vmalert
 		-replay.timeFrom=2021-05-11T07:21:43Z \
 		-replay.timeTo=2021-05-29T18:40:43Z
 
-unittest-vmalert: vmalert
-	./bin/vmalert -unittestFile=app/vmalert/unittest/testdata/test1.yaml \
-		-unittestFile=app/vmalert/unittest/testdata/test2.yaml
-
 vmalert-linux-amd64:
 	APP_NAME=vmalert CGO_ENABLED=1 GOOS=linux GOARCH=amd64 $(MAKE) app-local-goos-goarch
 
@@ -742,249 +742,6 @@ See full description for these flags in `./vmalert -help`.
 * `limit` group's param has no effect during replay (might be changed in future);
 * `keep_firing_for` alerting rule param has no effect during replay (might be changed in future).
 
-## Unit Testing for Rules
-
-> Unit testing is available from v1.92.0.
-> Unit tests do not respect `-clusterMode` for now.
-
-You can use `vmalert` to run unit tests for alerting and recording rules.
-In unit test mode vmalert performs the following actions:
-* sets up an isolated VictoriaMetrics instance;
-* simulates the periodic ingestion of time series;
-* queries the ingested data for recording and alerting rules evaluation;
-* tests whether the firing alerts or resulting recording rules match the expected results.
-
-See how to run vmalert in unit test mode below:
-```
-# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
-./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
-```
-
-vmalert is compatible with the [Prometheus config format for tests](https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#test-file-format)
-except the `promql_expr_test` field. Use the `metricsql_expr_test` field name instead. The name is different because vmalert
-validates and executes [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html) expressions,
-which aren't always backward compatible with [PromQL](https://prometheus.io/docs/prometheus/latest/querying/basics/).
-
-### Test file format
-
-The configuration format for files specified in the `-unittestFile` cmd-line flag is the following:
-```
-# Path to the files or http url containing [rule groups](https://docs.victoriametrics.com/vmalert.html#groups) configuration.
-# Enterprise version of vmalert supports S3 and GCS paths to rules.
-rule_files:
-  [ - <string> ]
-
-# The evaluation interval for rules specified in `rule_files`
-[ evaluation_interval: <duration> | default = 1m ]
-
-# Groups listed below will be evaluated by order.
-# Not all the groups need to be listed here; groups that are not listed will be evaluated in the order defined in rule_files.
-group_eval_order:
-  [ - <string> ]
-
-# The list of unit test files to be checked during evaluation.
-tests:
-  [ - <test_group> ]
-```
-
-#### `<test_group>`
-
-```
-# Interval between samples for input series
-interval: <duration>
-# Time series to persist into the database according to configured <interval> before running tests.
-input_series:
-  [ - <series> ]
-
-# Name of the test group, optional
-[ name: <string> ]
-
-# Unit tests for alerting rules
-alert_rule_test:
-  [ - <alert_test_case> ]
-
-# Unit tests for MetricsQL expressions.
-metricsql_expr_test:
-  [ - <metricsql_expr_test> ]
-
-# External labels accessible for templating.
-external_labels:
-  [ <labelname>: <string> ... ]
-
-```
-
-#### `<series>`
-
-```
-# series in the following format '<metric name>{<label name>=<label value>, ...}'
-# Examples:
-#      series_name{label1="value1", label2="value2"}
-#      go_goroutines{job="prometheus", instance="localhost:9090"}
-series: <string>
-
-# values support several special equations:
-#    'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
-#     Read this as series starts at a, then c further samples incrementing by b.
-#    'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
-#     Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
-#    '_' represents a missing sample from scrape
-#    'stale' indicates a stale sample
-# Examples:
-#     1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
-#     2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
-#     3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
-#     4. ' 1 _x3 stale' becomes '1 _ _ _ stale' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
-values: <string>
-```
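For illustration, a minimal sketch of how the `values` shorthand described above expands; the metric names and numbers are made up for the example, and each expanded sample is spaced by the test group's `interval`:

```yaml
input_series:
  # "0+10x4" expands to five samples: 0 10 20 30 40
  - series: 'http_requests_total{job="app"}'
    values: "0+10x4"
  # "1 _x2 stale" expands to: 1 _ _ stale (one value, two missing samples, then a staleness marker)
  - series: 'http_requests_total{job="backup"}'
    values: "1 _x2 stale"
```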
-
-#### `<alert_test_case>`
-
-vmalert by default adds `alertgroup` and `alertname` to the generated alerts and time series.
-So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`,
-but there is no need to add them under `exp_alerts`.
-You can also pass `--disableAlertgroupLabel` to prevent vmalert from adding the `alertgroup` label.
-
-```
-# The time elapsed from time=0s when this alerting rule should be checked.
-# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
-eval_time: <duration>
-
-# Name of the group to be tested.
-groupname: <string>
-
-# Name of the alert to be tested.
-alertname: <string>
-
-# List of the expected alerts that are firing under the given alertname at
-# the given evaluation time. If you want to test if an alerting rule should
-# not be firing, then you can mention only the fields above and leave 'exp_alerts' empty.
-exp_alerts:
-  [ - <alert> ]
-```
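For illustration, a minimal sketch of a test case that asserts a rule is not firing at a given moment; the group and alert names are made up for the example:

```yaml
alert_rule_test:
  # expect no firing alerts for this rule 10 minutes after time=0s,
  # hence exp_alerts is left empty
  - eval_time: 10m
    groupname: my-group
    alertname: MyAlert
    exp_alerts: []
```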
-
-#### `<alert>`
-
-```
-# These are the expanded labels and annotations of the expected alert.
-# Note: labels also include the labels of the sample associated with the alert
-exp_labels:
-  [ <labelname>: <string> ]
-exp_annotations:
-  [ <labelname>: <string> ]
-```
-
-#### `<metricsql_expr_test>`
-
-```
-# Expression to evaluate
-expr: <string>
-
-# The time elapsed from time=0s when this expression should be evaluated.
-eval_time: <duration>
-
-# Expected samples at the given evaluation time.
-exp_samples:
-  [ - <sample> ]
-```
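For illustration, a minimal sketch of an expression test that checks the value produced by a recording rule; the rule name, labels and value are made up for the example:

```yaml
metricsql_expr_test:
  # evaluate the recording rule 5 minutes after time=0s and
  # expect a single sample with value 42
  - expr: my:recording:rule
    eval_time: 5m
    exp_samples:
      - labels: 'my:recording:rule{job="app"}'
        value: 42
```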
-
-#### `<sample>`
-
-```
-# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
-# Examples:
-#      series_name{label1="value1", label2="value2"}
-#      go_goroutines{job="prometheus", instance="localhost:9090"}
-labels: <string>
-
-# The expected value of the MetricsQL expression.
-value: <number>
-```
-
-### Example
-
-This is an example input file for unit testing which will pass.
-`test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
-
-With `alerts.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
-
-#### `test.yaml`
-
-```
-rule_files:
-  - alerts.yaml
-
-evaluation_interval: 1m
-
-tests:
-  - interval: 1m
-    input_series:
-      - series: 'up{job="prometheus", instance="localhost:9090"}'
-        values: "0+0x1440"
-
-    metricsql_expr_test:
-      - expr: suquery_interval_test
-        eval_time: 4m
-        exp_samples:
-          - labels: '{__name__="suquery_interval_test", datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
-            value: 1
-
-    alert_rule_test:
-      - eval_time: 2h
-        groupname: group1
-        alertname: InstanceDown
-        exp_alerts:
-          - exp_labels:
-              job: prometheus
-              severity: page
-              instance: localhost:9090
-              datacenter: dc-123
-            exp_annotations:
-              summary: "Instance localhost:9090 down"
-              description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
-
-      - eval_time: 0
-        groupname: group1
-        alertname: AlwaysFiring
-        exp_alerts:
-          - exp_labels:
-              datacenter: dc-123
-
-      - eval_time: 0
-        groupname: group1
-        alertname: InstanceDown
-        exp_alerts: []
-
-    external_labels:
-      datacenter: dc-123
-```
-
-#### `alerts.yaml`
-
-```
-# This is the rules file.
-
-groups:
-  - name: group1
-    rules:
-      - alert: InstanceDown
-        expr: up == 0
-        for: 5m
-        labels:
-          severity: page
-        annotations:
-          summary: "Instance {{ $labels.instance }} down"
-          description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
-      - alert: AlwaysFiring
-        expr: 1
-
-  - name: group2
-    rules:
-      - record: job:test:count_over_time1m
-        expr: sum without(instance) (count_over_time(test[1m]))
-      - record: suquery_interval_test
-        expr: count_over_time(up[5m:])
-```
-
 ## Monitoring
 
 `vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
@@ -1535,11 +1292,6 @@ The shortlist of configuration flags is the following:
 	Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
   -tlsMinVersion string
 	Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
-  -unittestFile array
-	Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
-	Examples:
-	 -unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
-	See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
   -version
 	Show VictoriaMetrics version
 ```
@@ -274,7 +274,7 @@ func (g *Group) close() {
 
 var skipRandSleepOnGroupStart bool
 
-func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, rr datasource.QuerierBuilder) {
+func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
 	defer func() { close(g.finishedCh) }()
 
 	// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
@@ -422,7 +422,7 @@ type executor struct {
 	notifiers       func() []notifier.Notifier
 	notifierHeaders map[string]string
 
-	rw remotewrite.RWClient
+	rw *remotewrite.Client
 
 	previouslySentSeriesToRWMu sync.Mutex
 	// previouslySentSeriesToRW stores series sent to RW on previous iteration
@@ -91,12 +91,7 @@ absolute path to all .tpl files in root.
 
 	disableAlertGroupLabel = flag.Bool("disableAlertgroupLabel", false, "Whether to disable adding group's Name as label to generated alerts and time series.")
 
 	dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified.")
-	unitTestFiles = flagutil.NewArrayString("unittestFile", `Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
-Examples:
- -unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
-See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
-`)
 )
 
 var alertURLGeneratorFn notifier.AlertURLGenerator
@@ -122,13 +117,6 @@ func main() {
 		logger.Fatalf("failed to parse %q: %s", *ruleTemplatesPath, err)
 	}
 
-	if len(*unitTestFiles) > 0 {
-		if unitRule(*unitTestFiles...) {
-			os.Exit(1)
-		}
-		os.Exit(0)
-	}
-
 	if *dryRun {
 		groups, err := config.Parse(*rulePath, notifier.ValidateTemplates, true)
 		if err != nil {
@@ -411,7 +399,7 @@ func configsEqual(a, b []config.Group) bool {
 // setConfigSuccess sets config reload status to 1.
 func setConfigSuccess(at uint64) {
 	configSuccess.Set(1)
-	configTimestamp.Set(at)
+	configTimestamp.Set(fasttime.UnixTimestamp())
 	// reset the error if any
 	setConfigErr(nil)
 }
@@ -19,7 +19,7 @@ type manager struct {
 	querierBuilder datasource.QuerierBuilder
 	notifiers      func() []notifier.Notifier
 
-	rw remotewrite.RWClient
+	rw *remotewrite.Client
 	// remote read builder.
 	rr datasource.QuerierBuilder
 
@@ -257,7 +257,7 @@ func TestManagerUpdate(t *testing.T) {
 func TestManagerUpdateNegative(t *testing.T) {
 	testCases := []struct {
 		notifiers []notifier.Notifier
-		rw        remotewrite.RWClient
+		rw        *remotewrite.Client
 		cfg       config.Group
 		expErr    string
 	}{
@ -1,320 +0,0 @@
|
||||||
package remotewrite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"path"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/golang/snappy"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
||||||
"github.com/VictoriaMetrics/metrics"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
|
|
||||||
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
|
|
||||||
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
|
|
||||||
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Client is an asynchronous HTTP client for writing
|
|
||||||
// timeseries via remote write protocol.
|
|
||||||
type Client struct {
|
|
||||||
addr string
|
|
||||||
c *http.Client
|
|
||||||
authCfg *promauth.Config
|
|
||||||
input chan prompbmarshal.TimeSeries
|
|
||||||
flushInterval time.Duration
|
|
||||||
maxBatchSize int
|
|
||||||
maxQueueSize int
|
|
||||||
|
|
||||||
wg sync.WaitGroup
|
|
||||||
doneCh chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Config is config for remote write.
|
|
||||||
type Config struct {
|
|
||||||
// Addr of remote storage
|
|
||||||
Addr string
|
|
||||||
AuthCfg *promauth.Config
|
|
||||||
|
|
||||||
// Concurrency defines number of readers that
|
|
||||||
// concurrently read from the queue and flush data
|
|
||||||
Concurrency int
|
|
||||||
// MaxBatchSize defines max number of timeseries
|
|
||||||
// to be flushed at once
|
|
||||||
MaxBatchSize int
|
|
||||||
// MaxQueueSize defines max length of input queue
|
|
||||||
// populated by Push method.
|
|
||||||
// Push will be rejected once queue is full.
|
|
||||||
MaxQueueSize int
|
|
||||||
// FlushInterval defines time interval for flushing batches
|
|
||||||
FlushInterval time.Duration
|
|
||||||
// Transport will be used by the underlying http.Client
|
|
||||||
Transport *http.Transport
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultConcurrency = 4
|
|
||||||
defaultMaxBatchSize = 1e3
|
|
||||||
defaultMaxQueueSize = 1e5
|
|
||||||
defaultFlushInterval = 5 * time.Second
|
|
||||||
defaultWriteTimeout = 30 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
// NewClient returns asynchronous client for
|
|
||||||
// writing timeseries via remotewrite protocol.
|
|
||||||
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
|
||||||
if cfg.Addr == "" {
|
|
||||||
return nil, fmt.Errorf("config.Addr can't be empty")
|
|
||||||
}
|
|
||||||
if cfg.MaxBatchSize == 0 {
|
|
||||||
cfg.MaxBatchSize = defaultMaxBatchSize
|
|
||||||
}
|
|
||||||
if cfg.MaxQueueSize == 0 {
|
|
||||||
cfg.MaxQueueSize = defaultMaxQueueSize
|
|
||||||
}
|
|
||||||
if cfg.FlushInterval == 0 {
|
|
||||||
cfg.FlushInterval = defaultFlushInterval
|
|
||||||
}
|
|
||||||
if cfg.Transport == nil {
|
|
||||||
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
|
|
||||||
}
|
|
||||||
cc := defaultConcurrency
|
|
||||||
if cfg.Concurrency > 0 {
|
|
||||||
cc = cfg.Concurrency
|
|
||||||
}
|
|
||||||
c := &Client{
|
|
||||||
c: &http.Client{
|
|
||||||
Timeout: *sendTimeout,
|
|
||||||
Transport: cfg.Transport,
|
|
||||||
},
|
|
||||||
addr: strings.TrimSuffix(cfg.Addr, "/"),
|
|
||||||
authCfg: cfg.AuthCfg,
|
|
||||||
flushInterval: cfg.FlushInterval,
|
|
||||||
maxBatchSize: cfg.MaxBatchSize,
|
|
||||||
maxQueueSize: cfg.MaxQueueSize,
|
|
||||||
doneCh: make(chan struct{}),
|
|
||||||
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < cc; i++ {
|
|
||||||
c.run(ctx)
|
|
||||||
}
|
|
||||||
return c, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push adds timeseries into queue for writing into remote storage.
|
|
||||||
// Push returns and error if client is stopped or if queue is full.
|
|
||||||
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
|
|
||||||
select {
|
|
||||||
case <-c.doneCh:
|
|
||||||
return fmt.Errorf("client is closed")
|
|
||||||
case c.input <- s:
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
|
|
||||||
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
|
|
||||||
c.maxQueueSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close stops the client and waits for all goroutines
|
|
||||||
// to exit.
|
|
||||||
func (c *Client) Close() error {
|
|
||||||
if c.doneCh == nil {
|
|
||||||
return fmt.Errorf("client is already closed")
|
|
||||||
}
|
|
||||||
close(c.input)
|
|
||||||
close(c.doneCh)
|
|
||||||
c.wg.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) run(ctx context.Context) {
|
|
||||||
ticker := time.NewTicker(c.flushInterval)
|
|
||||||
wr := &prompbmarshal.WriteRequest{}
|
|
||||||
shutdown := func() {
|
|
||||||
for ts := range c.input {
|
|
||||||
wr.Timeseries = append(wr.Timeseries, ts)
|
|
||||||
}
|
|
||||||
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
|
||||||
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
|
|
||||||
c.flush(lastCtx, wr)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
c.wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer c.wg.Done()
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-c.doneCh:
|
|
||||||
shutdown()
|
|
||||||
return
|
|
||||||
case <-ctx.Done():
|
|
||||||
shutdown()
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
c.flush(ctx, wr)
|
|
||||||
case ts, ok := <-c.input:
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
wr.Timeseries = append(wr.Timeseries, ts)
|
|
||||||
if len(wr.Timeseries) >= c.maxBatchSize {
|
|
||||||
c.flush(ctx, wr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
|
|
||||||
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
|
|
||||||
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
|
|
||||||
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
|
|
||||||
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
|
|
||||||
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
|
|
||||||
|
|
||||||
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
|
|
||||||
return float64(*concurrency)
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
// flush is a blocking function that marshals WriteRequest and sends
|
|
||||||
// it to remote-write endpoint. Flush performs limited amount of retries
|
|
||||||
// if request fails.
|
|
||||||
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
|
||||||
if len(wr.Timeseries) < 1 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer prompbmarshal.ResetWriteRequest(wr)
|
|
||||||
defer bufferFlushDuration.UpdateDuration(time.Now())
|
|
||||||
|
|
||||||
data, err := wr.Marshal()
|
|
||||||
if err != nil {
|
|
||||||
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
b := snappy.Encode(nil, data)
|
|
||||||
|
|
||||||
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
|
|
||||||
if retryInterval > maxRetryInterval {
|
|
||||||
retryInterval = maxRetryInterval
|
|
||||||
}
|
|
||||||
timeStart := time.Now()
|
|
||||||
defer sendDuration.Add(time.Since(timeStart).Seconds())
|
|
||||||
L:
|
|
||||||
for attempts := 0; ; attempts++ {
|
|
||||||
err := c.send(ctx, b)
|
|
||||||
if err == nil {
|
|
||||||
sentRows.Add(len(wr.Timeseries))
|
|
||||||
sentBytes.Add(len(b))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, isNotRetriable := err.(*nonRetriableError)
|
|
||||||
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
|
|
||||||
|
|
||||||
if isNotRetriable {
|
|
||||||
// exit fast if error isn't retriable
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if request has been cancelled before backoff
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
|
|
||||||
break L
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
|
|
||||||
if timeLeftForRetries < 0 {
|
|
||||||
// the max retry time has passed, so we give up
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if retryInterval > timeLeftForRetries {
|
|
||||||
retryInterval = timeLeftForRetries
|
|
||||||
}
|
|
||||||
// sleeping to prevent remote db hammering
|
|
||||||
time.Sleep(retryInterval)
|
|
||||||
retryInterval *= 2
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
droppedRows.Add(len(wr.Timeseries))
|
|
||||||
droppedBytes.Add(len(b))
|
|
||||||
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
|
|
||||||
len(wr.Timeseries))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) send(ctx context.Context, data []byte) error {
|
|
||||||
r := bytes.NewReader(data)
|
|
||||||
req, err := http.NewRequest(http.MethodPost, c.addr, r)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RFC standard compliant headers
|
|
||||||
req.Header.Set("Content-Encoding", "snappy")
|
|
||||||
req.Header.Set("Content-Type", "application/x-protobuf")
|
|
||||||
|
|
||||||
// Prometheus compliant headers
|
|
||||||
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
|
||||||
|
|
||||||
if c.authCfg != nil {
|
|
||||||
c.authCfg.SetHeaders(req, true)
|
|
||||||
}
|
|
||||||
if !*disablePathAppend {
|
|
||||||
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
|
||||||
}
|
|
||||||
resp, err := c.c.Do(req.WithContext(ctx))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
|
||||||
req.URL.Redacted(), err, len(data), r.Size())
|
|
||||||
}
|
|
||||||
defer func() { _ = resp.Body.Close() }()
|
|
||||||
|
|
||||||
body, _ := io.ReadAll(resp.Body)
|
|
||||||
|
|
||||||
// according to https://prometheus.io/docs/concepts/remote_write_spec/
|
|
||||||
// Prometheus remote Write compatible receivers MUST
|
|
||||||
switch resp.StatusCode / 100 {
|
|
||||||
case 2:
|
|
||||||
// respond with a HTTP 2xx status code when the write is successful.
|
|
||||||
return nil
|
|
||||||
case 4:
|
|
||||||
if resp.StatusCode != http.StatusTooManyRequests {
|
|
||||||
// MUST NOT retry write requests on HTTP 4xx responses other than 429
|
|
||||||
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
|
||||||
resp.StatusCode, req.URL.Redacted(), body)}
|
|
||||||
}
|
|
||||||
fallthrough
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
|
||||||
resp.StatusCode, req.URL.Redacted(), body)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type nonRetriableError struct {
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *nonRetriableError) Error() string {
|
|
||||||
return e.err.Error()
|
|
||||||
}
|
|
|
@ -1,97 +0,0 @@
|
||||||
package remotewrite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"path"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/golang/snappy"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
||||||
)
|
|
||||||
|
|
||||||
// DebugClient won't push series periodically, but will write data to remote endpoint
|
|
||||||
// immediately when Push() is called
|
|
||||||
type DebugClient struct {
|
|
||||||
addr string
|
|
||||||
c *http.Client
|
|
||||||
|
|
||||||
wg sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDebugClient initiates and returns a new DebugClient
|
|
||||||
func NewDebugClient() (*DebugClient, error) {
|
|
||||||
if *addr == "" {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
t, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to create transport: %w", err)
|
|
||||||
}
|
|
||||||
c := &DebugClient{
|
|
||||||
c: &http.Client{
|
|
||||||
Timeout: *sendTimeout,
|
|
||||||
Transport: t,
|
|
||||||
},
|
|
||||||
addr: strings.TrimSuffix(*addr, "/"),
|
|
||||||
}
|
|
||||||
return c, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push sends the given timeseries to the remote storage.
|
|
||||||
func (c *DebugClient) Push(s prompbmarshal.TimeSeries) error {
|
|
||||||
c.wg.Add(1)
|
|
||||||
defer c.wg.Done()
|
|
||||||
wr := &prompbmarshal.WriteRequest{Timeseries: []prompbmarshal.TimeSeries{s}}
|
|
||||||
data, err := wr.Marshal()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to marshal the given time series: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.send(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close stops the DebugClient
|
|
||||||
func (c *DebugClient) Close() error {
|
|
||||||
c.wg.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *DebugClient) send(data []byte) error {
|
|
||||||
b := snappy.Encode(nil, data)
|
|
||||||
r := bytes.NewReader(b)
|
|
||||||
req, err := http.NewRequest(http.MethodPost, c.addr, r)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RFC standard compliant headers
|
|
||||||
req.Header.Set("Content-Encoding", "snappy")
|
|
||||||
req.Header.Set("Content-Type", "application/x-protobuf")
|
|
||||||
|
|
||||||
// Prometheus compliant headers
|
|
||||||
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
|
||||||
|
|
||||||
if !*disablePathAppend {
|
|
||||||
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
|
||||||
}
|
|
||||||
resp, err := c.c.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
|
||||||
req.URL.Redacted(), err, len(data), r.Size())
|
|
||||||
}
|
|
||||||
defer func() { _ = resp.Body.Close() }()
|
|
||||||
|
|
||||||
if resp.StatusCode/100 == 2 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
body, _ := io.ReadAll(resp.Body)
|
|
||||||
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
|
||||||
resp.StatusCode, req.URL.Redacted(), body)
|
|
||||||
}
|
|
|
@ -1,50 +0,0 @@
|
||||||
package remotewrite
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestDebugClient_Push(t *testing.T) {
|
|
||||||
testSrv := newRWServer()
|
|
||||||
oldAddr := *addr
|
|
||||||
*addr = testSrv.URL
|
|
||||||
defer func() {
|
|
||||||
*addr = oldAddr
|
|
||||||
}()
|
|
||||||
|
|
||||||
client, err := NewDebugClient()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to create debug client: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
const rowsN = 100
|
|
||||||
var sent int
|
|
||||||
for i := 0; i < rowsN; i++ {
|
|
||||||
s := prompbmarshal.TimeSeries{
|
|
||||||
Samples: []prompbmarshal.Sample{{
|
|
||||||
Value: float64(i),
|
|
||||||
Timestamp: time.Now().Unix(),
|
|
||||||
}},
|
|
||||||
}
|
|
||||||
err := client.Push(s)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected err: %s", err)
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
sent++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if sent == 0 {
|
|
||||||
t.Fatalf("0 series sent")
|
|
||||||
}
|
|
||||||
if err := client.Close(); err != nil {
|
|
||||||
t.Fatalf("failed to close client: %s", err)
|
|
||||||
}
|
|
||||||
got := testSrv.accepted()
|
|
||||||
if got != sent {
|
|
||||||
t.Fatalf("expected to have %d series; got %d", sent, got)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,13 +1,320 @@
|
||||||
package remotewrite
|
package remotewrite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/snappy"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RWClient represents an HTTP client for pushing data via remote write protocol
|
var (
|
||||||
type RWClient interface {
|
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
|
||||||
// Push pushes the give time series to remote storage
|
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
|
||||||
Push(s prompbmarshal.TimeSeries) error
|
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
|
||||||
// Close stops the client. Client can't be reused after Close call.
|
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
|
||||||
Close() error
|
)
|
||||||
|
|
||||||
|
// Client is an asynchronous HTTP client for writing
|
||||||
|
// timeseries via remote write protocol.
|
||||||
|
type Client struct {
|
||||||
|
addr string
|
||||||
|
c *http.Client
|
||||||
|
authCfg *promauth.Config
|
||||||
|
input chan prompbmarshal.TimeSeries
|
||||||
|
flushInterval time.Duration
|
||||||
|
maxBatchSize int
|
||||||
|
maxQueueSize int
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
doneCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config is config for remote write.
|
||||||
|
type Config struct {
|
||||||
|
// Addr of remote storage
|
||||||
|
Addr string
|
||||||
|
AuthCfg *promauth.Config
|
||||||
|
|
||||||
|
// Concurrency defines number of readers that
|
||||||
|
// concurrently read from the queue and flush data
|
||||||
|
Concurrency int
|
||||||
|
// MaxBatchSize defines max number of timeseries
|
||||||
|
// to be flushed at once
|
||||||
|
MaxBatchSize int
|
||||||
|
// MaxQueueSize defines max length of input queue
|
||||||
|
// populated by Push method.
|
||||||
|
// Push will be rejected once queue is full.
|
||||||
|
MaxQueueSize int
|
||||||
|
// FlushInterval defines time interval for flushing batches
|
||||||
|
FlushInterval time.Duration
|
||||||
|
// Transport will be used by the underlying http.Client
|
||||||
|
Transport *http.Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultConcurrency = 4
|
||||||
|
defaultMaxBatchSize = 1e3
|
||||||
|
defaultMaxQueueSize = 1e5
|
||||||
|
defaultFlushInterval = 5 * time.Second
|
||||||
|
defaultWriteTimeout = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewClient returns asynchronous client for
|
||||||
|
// writing timeseries via remotewrite protocol.
|
||||||
|
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
|
||||||
|
if cfg.Addr == "" {
|
||||||
|
return nil, fmt.Errorf("config.Addr can't be empty")
|
||||||
|
}
|
||||||
|
if cfg.MaxBatchSize == 0 {
|
||||||
|
cfg.MaxBatchSize = defaultMaxBatchSize
|
||||||
|
}
|
||||||
|
if cfg.MaxQueueSize == 0 {
|
||||||
|
cfg.MaxQueueSize = defaultMaxQueueSize
|
||||||
|
}
|
||||||
|
if cfg.FlushInterval == 0 {
|
||||||
|
cfg.FlushInterval = defaultFlushInterval
|
||||||
|
}
|
||||||
|
if cfg.Transport == nil {
|
||||||
|
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
|
||||||
|
}
|
||||||
|
cc := defaultConcurrency
|
||||||
|
if cfg.Concurrency > 0 {
|
||||||
|
cc = cfg.Concurrency
|
||||||
|
}
|
||||||
|
c := &Client{
|
||||||
|
c: &http.Client{
|
||||||
|
Timeout: *sendTimeout,
|
||||||
|
Transport: cfg.Transport,
|
||||||
|
},
|
||||||
|
addr: strings.TrimSuffix(cfg.Addr, "/"),
|
||||||
|
authCfg: cfg.AuthCfg,
|
||||||
|
flushInterval: cfg.FlushInterval,
|
||||||
|
maxBatchSize: cfg.MaxBatchSize,
|
||||||
|
maxQueueSize: cfg.MaxQueueSize,
|
||||||
|
doneCh: make(chan struct{}),
|
||||||
|
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < cc; i++ {
|
||||||
|
c.run(ctx)
|
||||||
|
}
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Push adds timeseries into queue for writing into remote storage.
|
||||||
|
// Push returns and error if client is stopped or if queue is full.
|
||||||
|
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
|
||||||
|
select {
|
||||||
|
case <-c.doneCh:
|
||||||
|
return fmt.Errorf("client is closed")
|
||||||
|
case c.input <- s:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
|
||||||
|
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
|
||||||
|
c.maxQueueSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close stops the client and waits for all goroutines
|
||||||
|
// to exit.
|
||||||
|
func (c *Client) Close() error {
|
||||||
|
if c.doneCh == nil {
|
||||||
|
return fmt.Errorf("client is already closed")
|
||||||
|
}
|
||||||
|
close(c.input)
|
||||||
|
close(c.doneCh)
|
||||||
|
c.wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) run(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(c.flushInterval)
|
||||||
|
wr := &prompbmarshal.WriteRequest{}
|
||||||
|
shutdown := func() {
|
||||||
|
for ts := range c.input {
|
||||||
|
wr.Timeseries = append(wr.Timeseries, ts)
|
||||||
|
}
|
||||||
|
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
|
||||||
|
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
|
||||||
|
c.flush(lastCtx, wr)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.doneCh:
|
||||||
|
shutdown()
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
shutdown()
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
c.flush(ctx, wr)
|
||||||
|
case ts, ok := <-c.input:
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
wr.Timeseries = append(wr.Timeseries, ts)
|
||||||
|
if len(wr.Timeseries) >= c.maxBatchSize {
|
||||||
|
c.flush(ctx, wr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
|
||||||
|
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
|
||||||
|
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
|
||||||
|
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
|
||||||
|
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
|
||||||
|
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
|
||||||
|
|
||||||
|
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
|
||||||
|
return float64(*concurrency)
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
// flush is a blocking function that marshals WriteRequest and sends
|
||||||
|
// it to remote-write endpoint. Flush performs limited amount of retries
|
||||||
|
// if request fails.
|
||||||
|
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||||
|
if len(wr.Timeseries) < 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer prompbmarshal.ResetWriteRequest(wr)
|
||||||
|
defer bufferFlushDuration.UpdateDuration(time.Now())
|
||||||
|
|
||||||
|
data, err := wr.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("failed to marshal WriteRequest: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b := snappy.Encode(nil, data)
|
||||||
|
|
||||||
|
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
|
||||||
|
if retryInterval > maxRetryInterval {
|
||||||
|
retryInterval = maxRetryInterval
|
||||||
|
}
|
||||||
|
timeStart := time.Now()
|
||||||
|
defer sendDuration.Add(time.Since(timeStart).Seconds())
|
||||||
|
L:
|
||||||
|
for attempts := 0; ; attempts++ {
|
||||||
|
err := c.send(ctx, b)
|
||||||
|
if err == nil {
|
||||||
|
sentRows.Add(len(wr.Timeseries))
|
||||||
|
sentBytes.Add(len(b))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, isNotRetriable := err.(*nonRetriableError)
|
||||||
|
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
|
||||||
|
|
||||||
|
if isNotRetriable {
|
||||||
|
// exit fast if error isn't retriable
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if request has been cancelled before backoff
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
|
||||||
|
break L
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
|
||||||
|
if timeLeftForRetries < 0 {
|
||||||
|
// the max retry time has passed, so we give up
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if retryInterval > timeLeftForRetries {
|
||||||
|
retryInterval = timeLeftForRetries
|
||||||
|
}
|
||||||
|
// sleeping to prevent remote db hammering
|
||||||
|
time.Sleep(retryInterval)
|
||||||
|
retryInterval *= 2
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
droppedRows.Add(len(wr.Timeseries))
|
||||||
|
droppedBytes.Add(len(b))
|
||||||
|
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
|
||||||
|
len(wr.Timeseries))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) send(ctx context.Context, data []byte) error {
|
||||||
|
r := bytes.NewReader(data)
|
||||||
|
req, err := http.NewRequest(http.MethodPost, c.addr, r)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RFC standard compliant headers
|
||||||
|
req.Header.Set("Content-Encoding", "snappy")
|
||||||
|
req.Header.Set("Content-Type", "application/x-protobuf")
|
||||||
|
|
||||||
|
// Prometheus compliant headers
|
||||||
|
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||||
|
|
||||||
|
if c.authCfg != nil {
|
||||||
|
c.authCfg.SetHeaders(req, true)
|
||||||
|
}
|
||||||
|
if !*disablePathAppend {
|
||||||
|
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
||||||
|
}
|
||||||
|
resp, err := c.c.Do(req.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
||||||
|
req.URL.Redacted(), err, len(data), r.Size())
|
||||||
|
}
|
||||||
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
|
||||||
|
// according to https://prometheus.io/docs/concepts/remote_write_spec/
|
||||||
|
// Prometheus remote Write compatible receivers MUST
|
||||||
|
switch resp.StatusCode / 100 {
|
||||||
|
case 2:
|
||||||
|
// respond with a HTTP 2xx status code when the write is successful.
|
||||||
|
return nil
|
||||||
|
case 4:
|
||||||
|
if resp.StatusCode != http.StatusTooManyRequests {
|
||||||
|
// MUST NOT retry write requests on HTTP 4xx responses other than 429
|
||||||
|
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||||
|
resp.StatusCode, req.URL.Redacted(), body)}
|
||||||
|
}
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||||
|
resp.StatusCode, req.URL.Redacted(), body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type nonRetriableError struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *nonRetriableError) Error() string {
|
||||||
|
return e.err.Error()
|
||||||
}
|
}
|
||||||
|
|
|
@@ -33,7 +33,7 @@ var (
 		"Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.")
 )
 
-func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewrite.RWClient) error {
+func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw *remotewrite.Client) error {
 	if *replayMaxDatapoints < 1 {
 		return fmt.Errorf("replay.maxDatapointsPerQuery can't be lower than 1")
 	}
@@ -78,7 +78,7 @@ func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewri
 	return nil
 }
 
-func (g *Group) replay(start, end time.Time, rw remotewrite.RWClient) int {
+func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
 	var total int
 	step := g.Interval * time.Duration(*replayMaxDatapoints)
 	ri := rangeIterator{start: start, end: end, step: step}
@@ -119,7 +119,7 @@ func (g *Group) replay(start, end time.Time, rw remotewrite.RWClient) int {
 	return total
 }
 
-func replayRule(rule Rule, start, end time.Time, rw remotewrite.RWClient) (int, error) {
+func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
 	var err error
 	var tss []prompbmarshal.TimeSeries
 	for i := 0; i < *replayRuleRetryAttempts; i++ {
@@ -1,40 +0,0 @@
-package main
-
-import (
-	"testing"
-)
-
-func TestUnitRule(t *testing.T) {
-	testCases := []struct {
-		name              string
-		disableGroupLabel bool
-		files             []string
-		failed            bool
-	}{
-		{
-			name:   "run multi files",
-			files:  []string{"./unittest/testdata/test1.yaml", "./unittest/testdata/test2.yaml"},
-			failed: false,
-		},
-		{
-			name:              "disable group label",
-			disableGroupLabel: true,
-			files:             []string{"./unittest/testdata/disable-group-label.yaml"},
-			failed:            false,
-		},
-		{
-			name:   "failing test",
-			files:  []string{"./unittest/testdata/failed-test.yaml"},
-			failed: true,
-		},
-	}
-	for _, tc := range testCases {
-		oldFlag := *disableAlertGroupLabel
-		*disableAlertGroupLabel = tc.disableGroupLabel
-		fail := unitRule(tc.files...)
-		if fail != tc.failed {
-			t.Fatalf("failed to test %s, expect %t, got %t", tc.name, tc.failed, fail)
-		}
-		*disableAlertGroupLabel = oldFlag
-	}
-}
@ -1,436 +0,0 @@
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"reflect"
|
|
||||||
"sort"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"gopkg.in/yaml.v2"
|
|
||||||
|
|
||||||
vmalertconfig "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/notifier"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/remotewrite"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/unittest"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
|
||||||
"github.com/VictoriaMetrics/metrics"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
storagePath string
|
|
||||||
// insert series from 1970-01-01T00:00:00
|
|
||||||
testStartTime = time.Unix(0, 0).UTC()
|
|
||||||
|
|
||||||
testPromWriteHTTPPath = "http://127.0.0.1" + *httpListenAddr + "/api/v1/write"
|
|
||||||
testDataSourcePath = "http://127.0.0.1" + *httpListenAddr + "/prometheus"
|
|
||||||
testRemoteWritePath = "http://127.0.0.1" + *httpListenAddr
|
|
||||||
testHealthHTTPPath = "http://127.0.0.1" + *httpListenAddr + "/health"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
testStoragePath = "vmalert-unittest"
|
|
||||||
testLogLevel = "ERROR"
|
|
||||||
)
|
|
||||||
|
|
||||||
func unitRule(files ...string) bool {
|
|
||||||
storagePath = filepath.Join(os.TempDir(), testStoragePath)
|
|
||||||
processFlags()
|
|
||||||
vminsert.Init()
|
|
||||||
vmselect.Init()
|
|
||||||
// storagePath will be created again when closing vmselect, so remove it again.
|
|
||||||
defer fs.MustRemoveAll(storagePath)
|
|
||||||
defer vminsert.Stop()
|
|
||||||
defer vmselect.Stop()
|
|
||||||
return rulesUnitTest(files...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func rulesUnitTest(files ...string) bool {
|
|
||||||
var failed bool
|
|
||||||
for _, f := range files {
|
|
||||||
if err := ruleUnitTest(f); err != nil {
|
|
||||||
fmt.Println(" FAILED")
|
|
||||||
fmt.Printf("\nfailed to run unit test for file %q: \n%s", f, err)
|
|
||||||
failed = true
|
|
||||||
} else {
|
|
||||||
fmt.Println(" SUCCESS")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return failed
|
|
||||||
}
|
|
||||||
|
|
||||||
func ruleUnitTest(filename string) []error {
|
|
||||||
fmt.Println("\nUnit Testing: ", filename)
|
|
||||||
b, err := os.ReadFile(filename)
|
|
||||||
if err != nil {
|
|
||||||
return []error{fmt.Errorf("failed to read file: %w", err)}
|
|
||||||
}
|
|
||||||
|
|
||||||
var unitTestInp unitTestFile
|
|
||||||
if err := yaml.UnmarshalStrict(b, &unitTestInp); err != nil {
|
|
||||||
return []error{fmt.Errorf("failed to unmarshal file: %w", err)}
|
|
||||||
}
|
|
||||||
if err := resolveAndGlobFilepaths(filepath.Dir(filename), &unitTestInp); err != nil {
|
|
||||||
return []error{fmt.Errorf("failed to resolve path for `rule_files`: %w", err)}
|
|
||||||
}
|
|
||||||
|
|
||||||
if unitTestInp.EvaluationInterval.Duration() == 0 {
|
|
||||||
fmt.Println("evaluation_interval set to 1m by default")
|
|
||||||
unitTestInp.EvaluationInterval = &promutils.Duration{D: 1 * time.Minute}
|
|
||||||
}
|
|
||||||
|
|
||||||
groupOrderMap := make(map[string]int)
|
|
||||||
for i, gn := range unitTestInp.GroupEvalOrder {
|
|
||||||
if _, ok := groupOrderMap[gn]; ok {
|
|
||||||
return []error{fmt.Errorf("group name repeated in `group_eval_order`: %s", gn)}
|
|
||||||
}
|
|
||||||
groupOrderMap[gn] = i
|
|
||||||
}
|
|
||||||
|
|
||||||
testGroups, err := vmalertconfig.Parse(unitTestInp.RuleFiles, nil, true)
|
|
||||||
if err != nil {
|
|
||||||
return []error{fmt.Errorf("failed to parse `rule_files`: %w", err)}
|
|
||||||
}
|
|
||||||
|
|
||||||
var errs []error
|
|
||||||
for _, t := range unitTestInp.Tests {
|
|
||||||
if err := verifyTestGroup(t); err != nil {
|
|
||||||
errs = append(errs, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
testErrs := t.test(unitTestInp.EvaluationInterval.Duration(), groupOrderMap, testGroups)
|
|
||||||
errs = append(errs, testErrs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(errs) > 0 {
|
|
||||||
return errs
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func verifyTestGroup(group testGroup) error {
	var testGroupName string
	if group.TestGroupName != "" {
		testGroupName = fmt.Sprintf("testGroupName: %s\n", group.TestGroupName)
	}
	for _, at := range group.AlertRuleTests {
		if at.Alertname == "" {
			return fmt.Errorf("\n%s missing required field \"alertname\"", testGroupName)
		}
		if !*disableAlertGroupLabel && at.GroupName == "" {
			return fmt.Errorf("\n%s missing required field \"groupname\" when flag \"disableAlertGroupLabel\" is false", testGroupName)
		}
		if at.EvalTime == nil {
			return fmt.Errorf("\n%s missing required field \"eval_time\"", testGroupName)
		}
	}
	for _, et := range group.MetricsqlExprTests {
		if et.Expr == "" {
			return fmt.Errorf("\n%s missing required field \"expr\"", testGroupName)
		}
		if et.EvalTime == nil {
			return fmt.Errorf("\n%s missing required field \"eval_time\"", testGroupName)
		}
	}
	return nil
}
func processFlags() {
	flag.Parse()
	for _, fv := range []struct {
		flag  string
		value string
	}{
		{flag: "storageDataPath", value: storagePath},
		{flag: "loggerLevel", value: testLogLevel},
		{flag: "search.disableCache", value: "true"},
		// set storage retention to 100 years, which allows storing series since 1970-01-01T00:00:00.
		{flag: "retentionPeriod", value: "100y"},
		{flag: "datasource.url", value: testDataSourcePath},
		{flag: "remoteWrite.url", value: testRemoteWritePath},
	} {
		// panics if the flag doesn't exist
		if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil {
			logger.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err)
		}
	}
}
func setUp() {
	vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
	go httpserver.Serve(*httpListenAddr, false, func(w http.ResponseWriter, r *http.Request) bool {
		switch r.URL.Path {
		case "/prometheus/api/v1/query":
			if err := prometheus.QueryHandler(nil, time.Now(), w, r); err != nil {
				httpserver.Errorf(w, r, "%s", err)
			}
			return true
		case "/prometheus/api/v1/write", "/api/v1/write":
			if err := promremotewrite.InsertHandler(r); err != nil {
				httpserver.Errorf(w, r, "%s", err)
			}
			return true
		default:
		}
		return false
	})
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	readyCheckFunc := func() bool {
		resp, err := http.Get(testHealthHTTPPath)
		if err != nil {
			return false
		}
		_ = resp.Body.Close()
		return resp.StatusCode == 200
	}
checkCheck:
	for {
		select {
		case <-ctx.Done():
			logger.Fatalf("http server hasn't become ready within 30s")
		default:
			if readyCheckFunc() {
				break checkCheck
			}
			time.Sleep(3 * time.Second)
		}
	}
}
func tearDown() {
	if err := httpserver.Stop(*httpListenAddr); err != nil {
		logger.Errorf("cannot stop the webservice: %s", err)
	}
	vmstorage.Stop()
	metrics.UnregisterAllMetrics()
	fs.MustRemoveAll(storagePath)
}
// resolveAndGlobFilepaths joins all relative paths in a configuration
// with a given base directory and replaces all globs with matching files.
func resolveAndGlobFilepaths(baseDir string, utf *unitTestFile) error {
	for i, rf := range utf.RuleFiles {
		if rf != "" && !filepath.IsAbs(rf) {
			utf.RuleFiles[i] = filepath.Join(baseDir, rf)
		}
	}

	var globbedFiles []string
	for _, rf := range utf.RuleFiles {
		m, err := filepath.Glob(rf)
		if err != nil {
			return err
		}
		if len(m) == 0 {
			fmt.Fprintln(os.Stderr, "  WARNING: no file matches pattern", rf)
		}
		globbedFiles = append(globbedFiles, m...)
	}
	utf.RuleFiles = globbedFiles
	return nil
}
func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]int, testGroups []vmalertconfig.Group) (checkErrs []error) {
	// set up vmstorage and http server for ingest and read queries
	setUp()
	// tear down vmstorage and clean the data dir
	defer tearDown()

	err := unittest.WriteInputSeries(tg.InputSeries, tg.Interval, testStartTime, testPromWriteHTTPPath)
	if err != nil {
		return []error{err}
	}

	q, err := datasource.Init(nil)
	if err != nil {
		return []error{fmt.Errorf("failed to init datasource: %v", err)}
	}
	rw, err := remotewrite.NewDebugClient()
	if err != nil {
		return []error{fmt.Errorf("failed to init remotewrite client: %v", err)}
	}

	alertEvalTimesMap := map[time.Duration]struct{}{}
	alertExpResultMap := map[time.Duration]map[string]map[string][]unittest.ExpAlert{}
	for _, at := range tg.AlertRuleTests {
		et := at.EvalTime.Duration()
		alertEvalTimesMap[et] = struct{}{}
		if _, ok := alertExpResultMap[et]; !ok {
			alertExpResultMap[et] = make(map[string]map[string][]unittest.ExpAlert)
		}
		if _, ok := alertExpResultMap[et][at.GroupName]; !ok {
			alertExpResultMap[et][at.GroupName] = make(map[string][]unittest.ExpAlert)
		}
		alertExpResultMap[et][at.GroupName][at.Alertname] = at.ExpAlerts
	}
	alertEvalTimes := make([]time.Duration, 0, len(alertEvalTimesMap))
	for k := range alertEvalTimesMap {
		alertEvalTimes = append(alertEvalTimes, k)
	}
	sort.Slice(alertEvalTimes, func(i, j int) bool {
		return alertEvalTimes[i] < alertEvalTimes[j]
	})

	// sort group eval order according to the given "group_eval_order".
	sort.Slice(testGroups, func(i, j int) bool {
		return groupOrderMap[testGroups[i].Name] < groupOrderMap[testGroups[j].Name]
	})

	// create groups with the given rules
	var groups []*Group
	for _, group := range testGroups {
		ng := newGroup(group, q, *evaluationInterval, tg.ExternalLabels)
		groups = append(groups, ng)
	}

	e := &executor{
		rw:                       rw,
		notifiers:                func() []notifier.Notifier { return nil },
		previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
	}

	evalIndex := 0
	maxEvalTime := testStartTime.Add(tg.maxEvalTime())
	for ts := testStartTime; ts.Before(maxEvalTime) || ts.Equal(maxEvalTime); ts = ts.Add(evalInterval) {
		for _, g := range groups {
			resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
			errs := e.execConcurrently(context.Background(), g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
			for err := range errs {
				if err != nil {
					checkErrs = append(checkErrs, fmt.Errorf("\nfailed to exec group: %q, time: %s, err: %w", g.Name,
						ts, err))
				}
			}
			// flush series after each group evaluation
			vmstorage.Storage.DebugFlush()
		}

		// check alert_rule_test cases at every eval time
		for evalIndex < len(alertEvalTimes) {
			if ts.Sub(testStartTime) > alertEvalTimes[evalIndex] ||
				alertEvalTimes[evalIndex] >= ts.Add(evalInterval).Sub(testStartTime) {
				break
			}
			gotAlertsMap := map[string]map[string]unittest.LabelsAndAnnotations{}
			for _, g := range groups {
				if *disableAlertGroupLabel {
					g.Name = ""
				}
				if _, ok := alertExpResultMap[time.Duration(ts.UnixNano())][g.Name]; !ok {
					continue
				}
				if _, ok := gotAlertsMap[g.Name]; !ok {
					gotAlertsMap[g.Name] = make(map[string]unittest.LabelsAndAnnotations)
				}
				for _, rule := range g.Rules {
					ar, isAlertRule := rule.(*AlertingRule)
					if !isAlertRule {
						continue
					}
					if _, ok := alertExpResultMap[time.Duration(ts.UnixNano())][g.Name][ar.Name]; ok {
						for _, got := range ar.alerts {
							if got.State != notifier.StateFiring {
								continue
							}
							laa := unittest.LabelAndAnnotation{
								Labels:      datasource.ConvertToLabels(got.Labels),
								Annotations: datasource.ConvertToLabels(got.Annotations),
							}
							gotAlertsMap[g.Name][ar.Name] = append(gotAlertsMap[g.Name][ar.Name], laa)
						}
					}
				}
			}
			for groupname, gres := range alertExpResultMap[alertEvalTimes[evalIndex]] {
				for alertname, res := range gres {
					var expAlerts unittest.LabelsAndAnnotations
					for _, expAlert := range res {
						if expAlert.ExpLabels == nil {
							expAlert.ExpLabels = make(map[string]string)
						}
						// alertGroupNameLabel is added as an additional label when `disableAlertGroupLabel` is false
						if !*disableAlertGroupLabel {
							expAlert.ExpLabels[alertGroupNameLabel] = groupname
						}
						// alertNameLabel is added by vmalert as an additional label.
						expAlert.ExpLabels[alertNameLabel] = alertname
						expAlerts = append(expAlerts, unittest.LabelAndAnnotation{
							Labels:      datasource.ConvertToLabels(expAlert.ExpLabels),
							Annotations: datasource.ConvertToLabels(expAlert.ExpAnnotations),
						})
					}
					sort.Sort(expAlerts)

					gotAlerts := gotAlertsMap[groupname][alertname]
					sort.Sort(gotAlerts)
					if !reflect.DeepEqual(expAlerts, gotAlerts) {
						var testGroupName string
						if tg.TestGroupName != "" {
							testGroupName = fmt.Sprintf("testGroupName: %s,\n", tg.TestGroupName)
						}
						expString := unittest.IndentLines(expAlerts.String(), " ")
						gotString := unittest.IndentLines(gotAlerts.String(), " ")
						checkErrs = append(checkErrs, fmt.Errorf("\n%s groupname: %s, alertname: %s, time: %s, \n exp:%v, \n got:%v ",
							testGroupName, groupname, alertname, alertEvalTimes[evalIndex].String(), expString, gotString))
					}
				}
			}
			evalIndex++
		}
	}

	checkErrs = append(checkErrs, unittest.CheckMetricsqlCase(tg.MetricsqlExprTests, q)...)
	return checkErrs
}
// unitTestFile holds the contents of a single unit test file
type unitTestFile struct {
	RuleFiles          []string            `yaml:"rule_files"`
	EvaluationInterval *promutils.Duration `yaml:"evaluation_interval"`
	GroupEvalOrder     []string            `yaml:"group_eval_order"`
	Tests              []testGroup         `yaml:"tests"`
}

// testGroup is a group of input series and test cases associated with it
type testGroup struct {
	Interval           *promutils.Duration          `yaml:"interval"`
	InputSeries        []unittest.Series            `yaml:"input_series"`
	AlertRuleTests     []unittest.AlertTestCase     `yaml:"alert_rule_test"`
	MetricsqlExprTests []unittest.MetricsqlTestCase `yaml:"metricsql_expr_test"`
	ExternalLabels     map[string]string            `yaml:"external_labels"`
	TestGroupName      string                       `yaml:"name"`
}

// maxEvalTime returns the max eval time among all alert_rule_test and metricsql_expr_test
func (tg *testGroup) maxEvalTime() time.Duration {
	var maxd time.Duration
	for _, alert := range tg.AlertRuleTests {
		if alert.EvalTime.Duration() > maxd {
			maxd = alert.EvalTime.Duration()
		}
	}
	for _, met := range tg.MetricsqlExprTests {
		if met.EvalTime.Duration() > maxd {
			maxd = met.EvalTime.Duration()
		}
	}
	return maxd
}
@ -1,19 +0,0 @@
package unittest

import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)

// AlertTestCase holds alert_rule_test cases defined in test file
type AlertTestCase struct {
	EvalTime  *promutils.Duration `yaml:"eval_time"`
	GroupName string              `yaml:"groupname"`
	Alertname string              `yaml:"alertname"`
	ExpAlerts []ExpAlert          `yaml:"exp_alerts"`
}

// ExpAlert holds exp_alerts defined in test file
type ExpAlert struct {
	ExpLabels      map[string]string `yaml:"exp_labels"`
	ExpAnnotations map[string]string `yaml:"exp_annotations"`
}
@ -1,182 +0,0 @@
package unittest

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"regexp"
	"strconv"
	"strings"
	"time"

	testutil "github.com/VictoriaMetrics/VictoriaMetrics/app/victoria-metrics/test"
	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
	"github.com/VictoriaMetrics/metricsql"
)

// Series holds input_series defined in the test file
type Series struct {
	Series string `yaml:"series"`
	Values string `yaml:"values"`
}

// sequenceValue is an omittable value in a sequence of time series values.
type sequenceValue struct {
	Value   float64
	Omitted bool
}

func httpWrite(address string, r io.Reader) {
	resp, err := http.Post(address, "", r)
	if err != nil {
		logger.Fatalf("failed to send to storage: %v", err)
	}
	resp.Body.Close()
}

// WriteInputSeries sends input series to vmstorage and flushes them
func WriteInputSeries(input []Series, interval *promutils.Duration, startStamp time.Time, dst string) error {
	r := testutil.WriteRequest{}
	for _, data := range input {
		expr, err := metricsql.Parse(data.Series)
		if err != nil {
			return fmt.Errorf("failed to parse series %s: %v", data.Series, err)
		}
		promvals, err := parseInputValue(data.Values, true)
		if err != nil {
			return fmt.Errorf("failed to parse input series value %s: %v", data.Values, err)
		}
		metricExpr, ok := expr.(*metricsql.MetricExpr)
		if !ok {
			return fmt.Errorf("failed to parse series %s to metric expr: %v", data.Series, err)
		}
		samples := make([]testutil.Sample, 0, len(promvals))
		ts := startStamp
		for _, v := range promvals {
			if !v.Omitted {
				samples = append(samples, testutil.Sample{
					Timestamp: ts.UnixMilli(),
					Value:     v.Value,
				})
			}
			ts = ts.Add(interval.Duration())
		}
		var ls []testutil.Label
		for _, filter := range metricExpr.LabelFilterss[0] {
			ls = append(ls, testutil.Label{Name: filter.Label, Value: filter.Value})
		}
		r.Timeseries = append(r.Timeseries, testutil.TimeSeries{Labels: ls, Samples: samples})
	}

	data, err := testutil.Compress(r)
	if err != nil {
		return fmt.Errorf("failed to compress data: %v", err)
	}
	// write input series to vm
	httpWrite(dst, bytes.NewBuffer(data))
	vmstorage.Storage.DebugFlush()
	return nil
}

// parseInputValue supports input like "1", "1+1x1 _ -4 3+20x1"; see more examples in the tests.
func parseInputValue(input string, origin bool) ([]sequenceValue, error) {
	var res []sequenceValue
	items := strings.Split(input, " ")
	reg := regexp.MustCompile(`\D?\d*\D?`)
	for _, item := range items {
		if item == "stale" {
			res = append(res, sequenceValue{Value: decimal.StaleNaN})
			continue
		}
		vals := reg.FindAllString(item, -1)
		switch len(vals) {
		case 1:
			if vals[0] == "_" {
				res = append(res, sequenceValue{Omitted: true})
				continue
			}
			v, err := strconv.ParseFloat(vals[0], 64)
			if err != nil {
				return nil, err
			}
			res = append(res, sequenceValue{Value: v})
			continue
		case 2:
			p1 := vals[0][:len(vals[0])-1]
			v2, err := strconv.ParseInt(vals[1], 10, 64)
			if err != nil {
				return nil, err
			}
			option := vals[0][len(vals[0])-1]
			switch option {
			case '+':
				v1, err := strconv.ParseFloat(p1, 64)
				if err != nil {
					return nil, err
				}
				res = append(res, sequenceValue{Value: v1 + float64(v2)})
			case 'x':
				for i := int64(0); i <= v2; i++ {
					if p1 == "_" {
						if i == 0 {
							i = 1
						}
						res = append(res, sequenceValue{Omitted: true})
						continue
					}
					v1, err := strconv.ParseFloat(p1, 64)
					if err != nil {
						return nil, err
					}
					if !origin || v1 == 0 {
						res = append(res, sequenceValue{Value: v1 * float64(i)})
						continue
					}
					newVal := fmt.Sprintf("%s+0x%s", p1, vals[1])
					newRes, err := parseInputValue(newVal, false)
					if err != nil {
						return nil, err
					}
					res = append(res, newRes...)
					break
				}

			default:
				return nil, fmt.Errorf("got invalid operation %b", option)
			}
		case 3:
			r1, err := parseInputValue(fmt.Sprintf("%s%s", vals[1], vals[2]), false)
			if err != nil {
				return nil, err
			}
			p1 := vals[0][:len(vals[0])-1]
			v1, err := strconv.ParseFloat(p1, 64)
			if err != nil {
				return nil, err
			}
			option := vals[0][len(vals[0])-1]
			var isAdd bool
			if option == '+' {
				isAdd = true
			}
			for _, r := range r1 {
				if isAdd {
					res = append(res, sequenceValue{
						Value: r.Value + v1,
					})
				} else {
					res = append(res, sequenceValue{
						Value: v1 - r.Value,
					})
				}
			}
		default:
			return nil, fmt.Errorf("unsupported input %s", input)
		}
	}
	return res, nil
}
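The `values` strings handled by `parseInputValue` above follow the `a+bxc` / `a-bxc` expansion notation documented further down in this change. As a rough, self-contained illustration of that arithmetic only (hypothetical helper name, not the reverted parser and not part of any VictoriaMetrics package):

```go
package main

import "fmt"

// expand illustrates the "a+bxc" shorthand: the series starts at `start` and
// produces `count` further samples, each step adding `step` (use a negative
// step for the "a-bxc" form).
func expand(start, step float64, count int) []float64 {
	out := make([]float64, 0, count+1)
	for i := 0; i <= count; i++ {
		out = append(out, start+float64(i)*step)
	}
	return out
}

func main() {
	fmt.Println(expand(1, 1, 4))  // "1+1x4"  -> [1 2 3 4 5]
	fmt.Println(expand(-2, 4, 3)) // "-2+4x3" -> [-2 2 6 10]
	fmt.Println(expand(2, -1, 4)) // "2-1x4"  -> [2 1 0 -1 -2]
}
```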
@ -1,93 +0,0 @@
package unittest

import (
	"testing"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
)

func TestParseInputValue(t *testing.T) {
	testCases := []struct {
		input  string
		exp    []sequenceValue
		failed bool
	}{
		{
			"",
			nil,
			true,
		},
		{
			"testfailed",
			nil,
			true,
		},
		// stale doesn't support operations
		{
			"stalex3",
			nil,
			true,
		},
		{
			"-4",
			[]sequenceValue{{Value: -4}},
			false,
		},
		{
			"_",
			[]sequenceValue{{Omitted: true}},
			false,
		},
		{
			"stale",
			[]sequenceValue{{Value: decimal.StaleNaN}},
			false,
		},
		{
			"-4x1",
			[]sequenceValue{{Value: -4}, {Value: -4}},
			false,
		},
		{
			"_x1",
			[]sequenceValue{{Omitted: true}},
			false,
		},
		{
			"1+1x4",
			[]sequenceValue{{Value: 1}, {Value: 2}, {Value: 3}, {Value: 4}, {Value: 5}},
			false,
		},
		{
			"2-1x4",
			[]sequenceValue{{Value: 2}, {Value: 1}, {Value: 0}, {Value: -1}, {Value: -2}},
			false,
		},
		{
			"1+1x1 _ -4 stale 3+20x1",
			[]sequenceValue{{Value: 1}, {Value: 2}, {Omitted: true}, {Value: -4}, {Value: decimal.StaleNaN}, {Value: 3}, {Value: 23}},
			false,
		},
	}

	for _, tc := range testCases {
		output, err := parseInputValue(tc.input, true)
		if err != nil != tc.failed {
			t.Fatalf("failed to parse %s, expect %t, got %t", tc.input, tc.failed, err != nil)
		}
		if len(tc.exp) != len(output) {
			t.Fatalf("expect %v, got %v", tc.exp, output)
		}
		for i := 0; i < len(tc.exp); i++ {
			if tc.exp[i].Omitted != output[i].Omitted {
				t.Fatalf("expect %v, got %v", tc.exp, output)
			}
			if tc.exp[i].Value != output[i].Value {
				if decimal.IsStaleNaN(tc.exp[i].Value) && decimal.IsStaleNaN(output[i].Value) {
					continue
				}
				t.Fatalf("expect %v, got %v", tc.exp, output)
			}
		}
	}
}
@ -1,92 +0,0 @@
package unittest

import (
	"context"
	"fmt"
	"net/url"
	"reflect"
	"sort"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
	"github.com/VictoriaMetrics/metricsql"
)

// MetricsqlTestCase holds metricsql_expr_test cases defined in test file
type MetricsqlTestCase struct {
	Expr       string              `yaml:"expr"`
	EvalTime   *promutils.Duration `yaml:"eval_time"`
	ExpSamples []expSample         `yaml:"exp_samples"`
}

type expSample struct {
	Labels string  `yaml:"labels"`
	Value  float64 `yaml:"value"`
}

// CheckMetricsqlCase will check metricsql_expr_test cases
func CheckMetricsqlCase(cases []MetricsqlTestCase, q datasource.QuerierBuilder) (checkErrs []error) {
	queries := q.BuildWithParams(datasource.QuerierParams{QueryParams: url.Values{"nocache": {"1"}, "latency_offset": {"1ms"}}, DataSourceType: "prometheus"})
Outer:
	for _, mt := range cases {
		result, _, err := queries.Query(context.Background(), mt.Expr, mt.EvalTime.ParseTime())
		if err != nil {
			checkErrs = append(checkErrs, fmt.Errorf(" expr: %q, time: %s, err: %w", mt.Expr,
				mt.EvalTime.Duration().String(), err))
			continue
		}
		var gotSamples []parsedSample
		for _, s := range result.Data {
			sort.Slice(s.Labels, func(i, j int) bool {
				return s.Labels[i].Name < s.Labels[j].Name
			})
			gotSamples = append(gotSamples, parsedSample{
				Labels: s.Labels,
				Value:  s.Values[0],
			})
		}
		var expSamples []parsedSample
		for _, s := range mt.ExpSamples {
			expLb := datasource.Labels{}
			if s.Labels != "" {
				metricsqlExpr, err := metricsql.Parse(s.Labels)
				if err != nil {
					checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
						mt.EvalTime.Duration().String(), fmt.Errorf("failed to parse labels %q: %w", s.Labels, err)))
					continue Outer
				}
				metricsqlMetricExpr, ok := metricsqlExpr.(*metricsql.MetricExpr)
				if !ok {
					checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
						mt.EvalTime.Duration().String(), fmt.Errorf("got unsupported metricsql type")))
					continue Outer
				}
				for _, l := range metricsqlMetricExpr.LabelFilterss[0] {
					expLb = append(expLb, datasource.Label{
						Name:  l.Label,
						Value: l.Value,
					})
				}
			}
			sort.Slice(expLb, func(i, j int) bool {
				return expLb[i].Name < expLb[j].Name
			})
			expSamples = append(expSamples, parsedSample{
				Labels: expLb,
				Value:  s.Value,
			})
		}
		sort.Slice(expSamples, func(i, j int) bool {
			return datasource.LabelCompare(expSamples[i].Labels, expSamples[j].Labels) <= 0
		})
		sort.Slice(gotSamples, func(i, j int) bool {
			return datasource.LabelCompare(gotSamples[i].Labels, gotSamples[j].Labels) <= 0
		})
		if !reflect.DeepEqual(expSamples, gotSamples) {
			checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s,\n exp: %v\n got: %v", mt.Expr,
				mt.EvalTime.Duration().String(), parsedSamplesString(expSamples), parsedSamplesString(gotSamples)))
		}

	}
	return
}
@ -1,43 +0,0 @@
rule_files:
  - rules.yaml

evaluation_interval: 1m

tests:
  - interval: 1m
    input_series:
      - series: 'up{job="vmagent2", instance="localhost:9090"}'
        values: "0+0x1440"

    metricsql_expr_test:
      - expr: suquery_interval_test
        eval_time: 4m
        exp_samples:
          - labels: '{__name__="suquery_interval_test",datacenter="dc-123", instance="localhost:9090", job="vmagent2"}'
            value: 1

    alert_rule_test:
      - eval_time: 2h
        alertname: InstanceDown
        exp_alerts:
          - exp_labels:
              job: vmagent2
              severity: page
              instance: localhost:9090
              datacenter: dc-123
            exp_annotations:
              summary: "Instance localhost:9090 down"
              description: "localhost:9090 of job vmagent2 has been down for more than 5 minutes."

      - eval_time: 0
        alertname: AlwaysFiring
        exp_alerts:
          - exp_labels:
              datacenter: dc-123

      - eval_time: 0
        alertname: InstanceDown
        exp_alerts: []

    external_labels:
      datacenter: dc-123
app/vmalert/unittest/testdata/failed-test.yaml (41 lines, vendored)

@ -1,41 +0,0 @@
rule_files:
  - rules.yaml

tests:
  - interval: 1m
    name: "Failing test"
    input_series:
      - series: test
        values: "0"

    metricsql_expr_test:
      - expr: test
        eval_time: 0m
        exp_samples:
          - value: 0
            labels: test

    # will fail because there is no "Test" group and rule defined
    alert_rule_test:
      - eval_time: 0m
        groupname: Test
        alertname: Test
        exp_alerts:
          - exp_labels: {}

  - interval: 1m
    name: Failing alert test
    input_series:
      - series: 'up{job="test"}'
        values: 0x10

    alert_rule_test:
      # will fail because the rule is firing
      - eval_time: 5m
        groupname: group1
        alertname: InstanceDown
        exp_alerts: []
      # will fail because groupname is missing
      - eval_time: 5m
        alertname: AlwaysFiring
        exp_alerts: []
app/vmalert/unittest/testdata/long-period.yaml (30 lines, vendored)

@ -1,30 +0,0 @@
# can be executed successfully but will take more than 1 minute
# not included in unit tests for now
evaluation_interval: 100d

rule_files:
  - rules.yaml

tests:
  - interval: 1d
    input_series:
      - series: test
        # Max time in time.Duration is 106751d from 1970 (2^63/10^9), i.e. 2262.
        # But VictoriaMetrics supports maxTimestamp value +2 days from now. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/827.
        # We input series up to 2024-01-01T00:00:00 here.
        values: "0+1x19723"

    metricsql_expr_test:
      - expr: timestamp(test)
        eval_time: 0m
        exp_samples:
          - value: 0
      - expr: test
        eval_time: 100d
        exp_samples:
          - labels: test
            value: 100
      - expr: timestamp(test)
        eval_time: 19000d
        exp_samples:
          - value: 1641600000 # 19000d -> seconds.
app/vmalert/unittest/testdata/rules.yaml (39 lines, vendored)

@ -1,39 +0,0 @@
groups:
  - name: group1
    rules:
      - alert: InstanceDown
        expr: up == 0
        for: 5m
        labels:
          severity: page
        annotations:
          summary: "Instance {{ $labels.instance }} down"
          description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
      - alert: AlwaysFiring
        expr: 1
      - alert: SameAlertNameWithDifferentGroup
        expr: absent(test)
        for: 1m

  - name: group2
    rules:
      - record: t1
        expr: test
      - record: job:test:count_over_time1m
        expr: sum without(instance) (count_over_time(test[1m]))
      - record: suquery_interval_test
        expr: count_over_time(up[5m:])

      - alert: SameAlertNameWithDifferentGroup
        expr: absent(test)
        for: 5m

  - name: group3
    rules:
      - record: t2
        expr: t1

  - name: group4
    rules:
      - record: t3
        expr: t1
app/vmalert/unittest/testdata/test1.yaml (99 lines, vendored)

@ -1,99 +0,0 @@
rule_files:
  - rules.yaml

evaluation_interval: 1m
group_eval_order: ["group4", "group2", "group3"]

tests:
  - interval: 1m
    name: "basic test"
    input_series:
      - series: "test"
        values: "_x5 1x5 _ stale"

    alert_rule_test:
      - eval_time: 1m
        groupname: group1
        alertname: SameAlertNameWithDifferentGroup
        exp_alerts:
          - {}
      - eval_time: 1m
        groupname: group2
        alertname: SameAlertNameWithDifferentGroup
        exp_alerts: []
      - eval_time: 6m
        groupname: group1
        alertname: SameAlertNameWithDifferentGroup
        exp_alerts: []

    metricsql_expr_test:
      - expr: test
        eval_time: 11m
        exp_samples:
          - labels: '{__name__="test"}'
            value: 1
      - expr: test
        eval_time: 12m
        exp_samples: []

  - interval: 1m
    name: "basic test2"
    input_series:
      - series: 'up{job="vmagent1", instance="localhost:9090"}'
        values: "0+0x1440"
      - series: "test"
        values: "0+1x1440"

    metricsql_expr_test:
      - expr: count(ALERTS) by (alertgroup, alertname, alertstate)
        eval_time: 4m
        exp_samples:
          - labels: '{alertgroup="group1", alertname="AlwaysFiring", alertstate="firing"}'
            value: 1
          - labels: '{alertgroup="group1", alertname="InstanceDown", alertstate="pending"}'
            value: 1
      - expr: t1
        eval_time: 4m
        exp_samples:
          - value: 4
            labels: '{__name__="t1", datacenter="dc-123"}'
      - expr: t2
        eval_time: 4m
        exp_samples:
          - value: 4
            labels: '{__name__="t2", datacenter="dc-123"}'
      - expr: t3
        eval_time: 4m
        exp_samples:
          # t3 is 3 instead of 4 because its group (group4) is evaluated before group2, which records t1
          - value: 3
            labels: '{__name__="t3", datacenter="dc-123"}'

    alert_rule_test:
      - eval_time: 10m
        groupname: group1
        alertname: InstanceDown
        exp_alerts:
          - exp_labels:
              job: vmagent1
              severity: page
              instance: localhost:9090
              datacenter: dc-123
            exp_annotations:
              summary: "Instance localhost:9090 down"
              description: "localhost:9090 of job vmagent1 has been down for more than 5 minutes."

      - eval_time: 0
        groupname: group1
        alertname: AlwaysFiring
        exp_alerts:
          - exp_labels:
              datacenter: dc-123

      - eval_time: 0
        groupname: alerts
        alertname: InstanceDown
        exp_alerts: []

    external_labels:
      datacenter: dc-123
app/vmalert/unittest/testdata/test2.yaml (46 lines, vendored)

@ -1,46 +0,0 @@
rule_files:
  - rules.yaml

evaluation_interval: 1m

tests:
  - interval: 1m
    input_series:
      - series: 'up{job="vmagent2", instance="localhost:9090"}'
        values: "0+0x1440"

    metricsql_expr_test:
      - expr: suquery_interval_test
        eval_time: 4m
        exp_samples:
          - labels: '{__name__="suquery_interval_test",datacenter="dc-123", instance="localhost:9090", job="vmagent2"}'
            value: 1

    alert_rule_test:
      - eval_time: 2h
        groupname: group1
        alertname: InstanceDown
        exp_alerts:
          - exp_labels:
              job: vmagent2
              severity: page
              instance: localhost:9090
              datacenter: dc-123
            exp_annotations:
              summary: "Instance localhost:9090 down"
              description: "localhost:9090 of job vmagent2 has been down for more than 5 minutes."

      - eval_time: 0
        groupname: group1
        alertname: AlwaysFiring
        exp_alerts:
          - exp_labels:
              datacenter: dc-123

      - eval_time: 0
        groupname: group1
        alertname: InstanceDown
        exp_alerts: []

    external_labels:
      datacenter: dc-123
@ -1,83 +0,0 @@
package unittest

import (
	"fmt"
	"strconv"
	"strings"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
)

// parsedSample is a sample with parsed Labels
type parsedSample struct {
	Labels datasource.Labels
	Value  float64
}

func (ps *parsedSample) String() string {
	return ps.Labels.String() + " " + strconv.FormatFloat(ps.Value, 'E', -1, 64)
}

func parsedSamplesString(pss []parsedSample) string {
	if len(pss) == 0 {
		return "nil"
	}
	s := pss[0].String()
	for _, ps := range pss[1:] {
		s += ", " + ps.String()
	}
	return s
}

// LabelAndAnnotation holds labels and annotations
type LabelAndAnnotation struct {
	Labels      datasource.Labels
	Annotations datasource.Labels
}

func (la *LabelAndAnnotation) String() string {
	return "Labels:" + la.Labels.String() + "\nAnnotations:" + la.Annotations.String()
}

// LabelsAndAnnotations is a collection of LabelAndAnnotation
type LabelsAndAnnotations []LabelAndAnnotation

func (la LabelsAndAnnotations) Len() int { return len(la) }

func (la LabelsAndAnnotations) Swap(i, j int) { la[i], la[j] = la[j], la[i] }
func (la LabelsAndAnnotations) Less(i, j int) bool {
	diff := datasource.LabelCompare(la[i].Labels, la[j].Labels)
	if diff != 0 {
		return diff < 0
	}
	return datasource.LabelCompare(la[i].Annotations, la[j].Annotations) < 0
}

func (la LabelsAndAnnotations) String() string {
	if len(la) == 0 {
		return "[]"
	}
	s := "[\n0:" + IndentLines("\n"+la[0].String(), " ")
	for i, l := range la[1:] {
		s += ",\n" + fmt.Sprintf("%d", i+1) + ":" + IndentLines("\n"+l.String(), " ")
	}
	s += "\n]"

	return s
}

// IndentLines prefixes each line in the supplied string with the given "indent" string.
func IndentLines(lines, indent string) string {
	sb := strings.Builder{}
	n := strings.Split(lines, "\n")
	for i, l := range n {
		if i > 0 {
			sb.WriteString(indent)
		}
		sb.WriteString(l)
		if i != len(n)-1 {
			sb.WriteRune('\n')
		}
	}
	return sb.String()
}
@ -33,7 +33,7 @@ var (
 )

 var (
-	saCfgReloaderStopCh chan struct{}
+	saCfgReloaderStopCh = make(chan struct{})
 	saCfgReloaderWG     sync.WaitGroup

 	saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)

@ -62,7 +62,6 @@ func CheckStreamAggrConfig() error {
 //
 // MustStopStreamAggr must be called when stream aggr is no longer needed.
 func InitStreamAggr() {
-	saCfgReloaderStopCh = make(chan struct{})
 	if *streamAggrConfig == "" {
 		return
 	}
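The hunk above moves `make(chan struct{})` from `InitStreamAggr` into the package-level declaration, so the stop channel is non-nil regardless of when initialization runs. A minimal sketch of that pattern, assuming generic names rather than the actual vminsert code:

```go
package main

import (
	"fmt"
	"sync"
)

// The stop channel is initialized at declaration time, so it is valid (non-nil)
// even before Init is called and across the whole package lifetime.
var (
	stopCh = make(chan struct{})
	wg     sync.WaitGroup
)

func Init() {
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-stopCh // block until Stop closes the channel
		fmt.Println("background worker stopped")
	}()
}

func Stop() {
	close(stopCh)
	wg.Wait()
}

func main() {
	Init()
	Stop()
}
```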
@ -24,6 +24,8 @@ The following `tip` changes can be tested by building VictoriaMetrics components

 ## tip

+* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): revert unit test feature for alerting and recording rules introduced in [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4596). See the following [change](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4734).
+
 ## [v1.92.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.92.0)

 Released at 2023-07-27
docs/vmalert.md (248 lines)

@ -753,249 +753,6 @@ See full description for these flags in `./vmalert -help`.
* `limit` group's param has no effect during replay (might be changed in future);
* `keep_firing_for` alerting rule param has no effect during replay (might be changed in future).

## Unit Testing for Rules

> Unit testing is available from v1.92.0.
> Unit tests do not respect `-clusterMode` for now.

You can use `vmalert` to run unit tests for alerting and recording rules.
In unit test mode vmalert performs the following actions:
* sets up an isolated VictoriaMetrics instance;
* simulates the periodic ingestion of time series;
* queries the ingested data for recording and alerting rules evaluation;
* tests whether the firing alerts or resulting recording rules match the expected results.

See how to run vmalert in unit test mode below:
```
# Run vmalert with one or multiple test files via the -unittestFile cmd-line flag
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
```

vmalert is compatible with the [Prometheus config format for tests](https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#test-file-format)
except for the `promql_expr_test` field. Use the `metricsql_expr_test` field name instead. The name is different because vmalert
validates and executes [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html) expressions,
which aren't always backward compatible with [PromQL](https://prometheus.io/docs/prometheus/latest/querying/basics/).

### Test file format

The configuration format for files specified in the `-unittestFile` cmd-line flag is the following:
```
# Path to the files or HTTP URL containing [rule groups](https://docs.victoriametrics.com/vmalert.html#groups) configuration.
# The Enterprise version of vmalert supports S3 and GCS paths to rules.
rule_files:
  [ - <string> ]

# The evaluation interval for rules specified in `rule_files`
[ evaluation_interval: <duration> | default = 1m ]

# Groups listed below will be evaluated in order.
# Not all groups need to be listed here; groups that aren't are evaluated in the order they are defined in rule_files.
group_eval_order:
  [ - <string> ]

# The list of test groups to be checked during evaluation.
tests:
  [ - <test_group> ]
```
#### `<test_group>`

```
# Interval between samples for the input series
interval: <duration>
# Time series to persist into the database according to the configured <interval> before running tests.
input_series:
  [ - <series> ]

# Name of the test group, optional
[ name: <string> ]

# Unit tests for alerting rules
alert_rule_test:
  [ - <alert_test_case> ]

# Unit tests for MetricsQL expressions.
metricsql_expr_test:
  [ - <metricsql_expr_test> ]

# External labels accessible for templating.
external_labels:
  [ <labelname>: <string> ... ]

```

#### `<series>`

```
# Series in the following format '<metric name>{<label name>=<label value>, ...}'
# Examples:
#   series_name{label1="value1", label2="value2"}
#   go_goroutines{job="prometheus", instance="localhost:9090"}
series: <string>

# values support several special equations:
#   'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
#   Read this as the series starts at a, then c further samples incrementing by b.
#   'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
#   Read this as the series starts at a, then c further samples decrementing by b (or incrementing by negative b).
#   '_' represents a missing sample from scrape
#   'stale' indicates a stale sample
# Examples:
#   1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
#   2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
#   3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
#   4. ' 1 _x3 stale' becomes '1 _ _ _ stale' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
values: <string>
```
#### `<alert_test_case>`

vmalert by default adds `alertgroup` and `alertname` labels to the generated alerts and time series.
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`,
but there is no need to add them under `exp_alerts`.
You can also pass `--disableAlertgroupLabel` to prevent vmalert from adding the `alertgroup` label.

```
# The time elapsed from time=0s when this alerting rule should be checked.
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
eval_time: <duration>

# Name of the group to be tested.
groupname: <string>

# Name of the alert to be tested.
alertname: <string>

# List of the expected alerts that are firing under the given alertname at
# the given evaluation time. If you want to test if an alerting rule should
# not be firing, then you can mention only the fields above and leave 'exp_alerts' empty.
exp_alerts:
  [ - <alert> ]
```
#### `<alert>`

```
# These are the expanded labels and annotations of the expected alert.
# Note: labels also include the labels of the sample associated with the alert.
exp_labels:
  [ <labelname>: <string> ]
exp_annotations:
  [ <labelname>: <string> ]
```

#### `<metricsql_expr_test>`

```
# Expression to evaluate
expr: <string>

# The time elapsed from time=0s when this expression should be evaluated.
eval_time: <duration>

# Expected samples at the given evaluation time.
exp_samples:
  [ - <sample> ]
```

#### `<sample>`

```
# Labels of the sample in the usual series notation '<metric name>{<label name>=<label value>, ...}'
# Examples:
#   series_name{label1="value1", label2="value2"}
#   go_goroutines{job="prometheus", instance="localhost:9090"}
labels: <string>

# The expected value of the MetricsQL expression.
value: <number>
```
### Example

This is an example input file for unit testing which will pass.
`test.yaml` is the test file which follows the syntax above and `rules.yaml` contains the rules it references.

With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.

#### `test.yaml`

```
rule_files:
  - rules.yaml

evaluation_interval: 1m

tests:
  - interval: 1m
    input_series:
      - series: 'up{job="prometheus", instance="localhost:9090"}'
        values: "0+0x1440"

    metricsql_expr_test:
      - expr: suquery_interval_test
        eval_time: 4m
        exp_samples:
          - labels: '{__name__="suquery_interval_test", datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
            value: 1

    alert_rule_test:
      - eval_time: 2h
        groupname: group1
        alertname: InstanceDown
        exp_alerts:
          - exp_labels:
              job: prometheus
              severity: page
              instance: localhost:9090
              datacenter: dc-123
            exp_annotations:
              summary: "Instance localhost:9090 down"
              description: "localhost:9090 of job prometheus has been down for more than 5 minutes."

      - eval_time: 0
        groupname: group1
        alertname: AlwaysFiring
        exp_alerts:
          - exp_labels:
              datacenter: dc-123

      - eval_time: 0
        groupname: group1
        alertname: InstanceDown
        exp_alerts: []

    external_labels:
      datacenter: dc-123
```

#### `rules.yaml`

```
# This is the rules file.

groups:
  - name: group1
    rules:
      - alert: InstanceDown
        expr: up == 0
        for: 5m
        labels:
          severity: page
        annotations:
          summary: "Instance {{ $labels.instance }} down"
          description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
      - alert: AlwaysFiring
        expr: 1

  - name: group2
    rules:
      - record: job:test:count_over_time1m
        expr: sum without(instance) (count_over_time(test[1m]))
      - record: suquery_interval_test
        expr: count_over_time(up[5m:])
```

## Monitoring

`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
@ -1546,11 +1303,6 @@ The shortlist of configuration flags is the following:
     Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
   -tlsMinVersion string
     Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
-  -unittestFile array
-    Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
-    Examples:
-     -unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
-    See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
   -version
     Show VictoriaMetrics version
 ```
@ -45,14 +45,6 @@ func (pd *Duration) Duration() time.Duration {
 	return pd.D
 }

-// ParseTime returns time for pd.
-func (pd *Duration) ParseTime() time.Time {
-	if pd == nil {
-		return time.Time{}
-	}
-	return time.UnixMilli(pd.Duration().Milliseconds())
-}
-
 // ParseDuration parses duration string in Prometheus format
 func ParseDuration(s string) (time.Duration, error) {
 	ms, err := metricsql.DurationValue(s, 0)