vmalert: init unit test (#4596)

vmalert: support unit tests

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2945
---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Haleygo 2023-07-20 21:07:10 +08:00 committed by Aliaksandr Valialkin
parent a0cc6eb7a1
commit 939c8b8372
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
16 changed files with 1039 additions and 323 deletions

View file

@ -732,6 +732,245 @@ See full description for these flags in `./vmalert -help`.
* `query` template function is disabled for performance reasons (might be changed in future);
* `limit` group's param has no effect during replay (might be changed in future);
## Unit Testing for Rules
You can use `vmalert` to run unit tests for alerting and recording rules.
In unit test mode vmalert performs the following actions:
* sets up an isolated VictoriaMetrics instance;
* simulates the periodic ingestion of time series;
* queries the ingested data for recording and alerting rules evaluation;
* tests whether the firing alerts or resulting recording rules match the expected results.
See how to run vmalert in unit test mode below:
```
# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
```
vmalert is compatible with [Prometheus config format for tests](https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#test-file-format)
except `promql_expr_test` field. Use `metricsql_expr_test` field name instead. The name is different because vmalert
validates and executes [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html) expressions,
which aren't always backward compatible with [PromQL](https://prometheus.io/docs/prometheus/latest/querying/basics/).
### Test file format
The configuration format for files specified in `-unittestFile` cmd-line flag is the following:
```
# Path to the files or http url containing [rule groups](https://docs.victoriametrics.com/vmalert.html#groups) configuration.
# Enterprise version of vmalert supports S3 and GCS paths to rules.
rule_files:
[ - <string> ]
# The evaluation interval for rules specified in `rule_files`
[ evaluation_interval: <duration> | default = 1m ]
# Groups listed below will be evaluated by order.
# Not All the groups need not be mentioned, if not, they will be evaluated by define order in rule_files.
group_eval_order:
[ - <string> ]
# The list of unit test files to be checked during evaluation.
tests:
[ - <test_group> ]
```
#### `<test_group>`
```
# Interval between samples for input series
interval: <duration>
# Time series to persist into the database according to configured <interval> before running tests.
input_series:
[ - <series> ]
# Name of the test group, optional
[ name: <string> ]
# Unit tests for alerting rules
alert_rule_test:
[ - <alert_test_case> ]
# Unit tests for Metricsql expressions.
metricsql_expr_test:
[ - <metricsql_expr_test> ]
# External labels accessible for templating.
external_labels:
[ <labelname>: <string> ... ]
```
#### `<series>`
```
# series in the following format '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
series: <string>
# values support several special equations:
# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
# Read this as series starts at a, then c further samples incrementing by b.
# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
# '_' represents a missing sample from scrape
# Examples:
# 1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
# 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
# 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
# 4. ' 1 _x3' becomes '1 _ _ _ ' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
values: <string>
```
#### `<alert_test_case>`
vmalert by default adds `alertgroup` and `alertname` to the generated alerts and time series.
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`,
but no need to add them under `exp_alerts`.
You can also pass `--disableAlertgroupLabel` to prevent vmalert from adding `alertgroup` label.
```
# The time elapsed from time=0s when this alerting rule should be checked.
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
eval_time: <duration>
# Name of the group name to be tested.
groupname: <string>
# Name of the alert to be tested.
alertname: <string>
# List of the expected alerts that are firing under the given alertname at
# the given evaluation time. If you want to test if an alerting rule should
# not be firing, then you can mention only the fields above and leave 'exp_alerts' empty.
exp_alerts:
[ - <alert> ]
```
#### `<alert>`
```
# These are the expanded labels and annotations of the expected alert.
# Note: labels also include the labels of the sample associated with the alert
exp_labels:
[ <labelname>: <string> ]
exp_annotations:
[ <labelname>: <string> ]
```
#### `<metricsql_expr_test>`
```
# Expression to evaluate
expr: <string>
# The time elapsed from time=0s when this expression be evaluated.
eval_time: <duration>
# Expected samples at the given evaluation time.
exp_samples:
[ - <sample> ]
```
#### `<sample>`
```
# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
labels: <string>
# The expected value of the Metricsql expression.
value: <number>
```
### Example
This is an example input file for unit testing which will pass.
`test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
#### `test.yaml`
```
rule_files:
- rules.yaml
evaluation_interval: 1m
tests:
- interval: 1m
input_series:
- series: 'up{job="prometheus", instance="localhost:9090"}'
values: "0+0x1440"
metricsql_expr_test:
- expr: suquery_interval_test
eval_time: 4m
exp_samples:
- labels: '{__name__="suquery_interval_test", datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
value: 1
alert_rule_test:
- eval_time: 2h
groupname: group1
alertname: InstanceDown
exp_alerts:
- exp_labels:
job: prometheus
severity: page
instance: localhost:9090
datacenter: dc-123
exp_annotations:
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
- eval_time: 0
groupname: group1
alertname: AlwaysFiring
exp_alerts:
- exp_labels:
datacenter: dc-123
- eval_time: 0
groupname: group1
alertname: InstanceDown
exp_alerts: []
external_labels:
datacenter: dc-123
```
#### `alerts.yaml`
```
# This is the rules file.
groups:
- name: group1
rules:
- alert: InstanceDown
expr: up == 0
for: 5m
labels:
severity: page
annotations:
summary: "Instance {{ $labels.instance }} down"
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
- alert: AlwaysFiring
expr: 1
- name: group2
rules:
- record: job:test:count_over_time1m
expr: sum without(instance) (count_over_time(test[1m]))
- record: suquery_interval_test
expr: count_over_time(up[5m:])
```
## Monitoring
`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
@ -1282,6 +1521,11 @@ The shortlist of configuration flags is the following:
Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
-tlsMinVersion string
Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
-unittestFile array
Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
Examples:
-unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
-version
Show VictoriaMetrics version
```

View file

@ -1,10 +1,14 @@
package datasource
import (
"bytes"
"context"
"net/http"
"net/url"
"strconv"
"time"
"golang.org/x/exp/slices"
)
// Querier interface wraps Query and QueryRange methods
@ -108,3 +112,69 @@ type Label struct {
Name string
Value string
}
// Labels is collection of Label
type Labels []Label
func (ls Labels) Len() int { return len(ls) }
func (ls Labels) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
func (ls Labels) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
func (ls Labels) String() string {
var b bytes.Buffer
b.WriteByte('{')
for i, l := range ls {
if i > 0 {
b.WriteByte(',')
b.WriteByte(' ')
}
b.WriteString(l.Name)
b.WriteByte('=')
b.WriteString(strconv.Quote(l.Value))
}
b.WriteByte('}')
return b.String()
}
// LabelCompare return negative if a is less than b, return 0 if they are the same
// eg.
// a=[]Label{{Name: "a", Value: "1"}},b=[]Label{{Name: "b", Value: "1"}}, return -1
// a=[]Label{{Name: "a", Value: "2"}},b=[]Label{{Name: "a", Value: "1"}}, return 1
// a=[]Label{{Name: "a", Value: "1"}},b=[]Label{{Name: "a", Value: "1"}}, return 0
func LabelCompare(a, b Labels) int {
l := len(a)
if len(b) < l {
l = len(b)
}
for i := 0; i < l; i++ {
if a[i].Name != b[i].Name {
if a[i].Name < b[i].Name {
return -1
}
return 1
}
if a[i].Value != b[i].Value {
if a[i].Value < b[i].Value {
return -1
}
return 1
}
}
// if all labels so far were in common, the set with fewer labels comes first.
return len(a) - len(b)
}
// ConvertToLabels convert map to Labels
func ConvertToLabels(m map[string]string) (labelset Labels) {
for k, v := range m {
labelset = append(labelset, Label{
Name: k,
Value: v,
})
}
// sort label
slices.SortFunc(labelset, func(a, b Label) bool { return a.Name < b.Name })
return
}

View file

@ -274,7 +274,7 @@ func (g *Group) close() {
var skipRandSleepOnGroupStart bool
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, rr datasource.QuerierBuilder) {
defer func() { close(g.finishedCh) }()
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
@ -422,7 +422,7 @@ type executor struct {
notifiers func() []notifier.Notifier
notifierHeaders map[string]string
rw *remotewrite.Client
rw remotewrite.RWClient
previouslySentSeriesToRWMu sync.Mutex
// previouslySentSeriesToRW stores series sent to RW on previous iteration

View file

@ -399,7 +399,7 @@ func configsEqual(a, b []config.Group) bool {
// setConfigSuccess sets config reload status to 1.
func setConfigSuccess(at uint64) {
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
configTimestamp.Set(at)
// reset the error if any
setConfigErr(nil)
}

View file

@ -19,7 +19,7 @@ type manager struct {
querierBuilder datasource.QuerierBuilder
notifiers func() []notifier.Notifier
rw *remotewrite.Client
rw remotewrite.RWClient
// remote read builder.
rr datasource.QuerierBuilder

View file

@ -257,7 +257,7 @@ func TestManagerUpdate(t *testing.T) {
func TestManagerUpdateNegative(t *testing.T) {
testCases := []struct {
notifiers []notifier.Notifier
rw *remotewrite.Client
rw remotewrite.RWClient
cfg config.Group
expErr string
}{

View file

@ -0,0 +1,320 @@
package remotewrite
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"path"
"strings"
"sync"
"time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
var (
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
)
// Client is an asynchronous HTTP client for writing
// timeseries via remote write protocol.
type Client struct {
addr string
c *http.Client
authCfg *promauth.Config
input chan prompbmarshal.TimeSeries
flushInterval time.Duration
maxBatchSize int
maxQueueSize int
wg sync.WaitGroup
doneCh chan struct{}
}
// Config is config for remote write.
type Config struct {
// Addr of remote storage
Addr string
AuthCfg *promauth.Config
// Concurrency defines number of readers that
// concurrently read from the queue and flush data
Concurrency int
// MaxBatchSize defines max number of timeseries
// to be flushed at once
MaxBatchSize int
// MaxQueueSize defines max length of input queue
// populated by Push method.
// Push will be rejected once queue is full.
MaxQueueSize int
// FlushInterval defines time interval for flushing batches
FlushInterval time.Duration
// Transport will be used by the underlying http.Client
Transport *http.Transport
}
const (
defaultConcurrency = 4
defaultMaxBatchSize = 1e3
defaultMaxQueueSize = 1e5
defaultFlushInterval = 5 * time.Second
defaultWriteTimeout = 30 * time.Second
)
// NewClient returns asynchronous client for
// writing timeseries via remotewrite protocol.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Addr == "" {
return nil, fmt.Errorf("config.Addr can't be empty")
}
if cfg.MaxBatchSize == 0 {
cfg.MaxBatchSize = defaultMaxBatchSize
}
if cfg.MaxQueueSize == 0 {
cfg.MaxQueueSize = defaultMaxQueueSize
}
if cfg.FlushInterval == 0 {
cfg.FlushInterval = defaultFlushInterval
}
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
}
cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
c := &Client{
c: &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
},
addr: strings.TrimSuffix(cfg.Addr, "/"),
authCfg: cfg.AuthCfg,
flushInterval: cfg.FlushInterval,
maxBatchSize: cfg.MaxBatchSize,
maxQueueSize: cfg.MaxQueueSize,
doneCh: make(chan struct{}),
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
}
for i := 0; i < cc; i++ {
c.run(ctx)
}
return c, nil
}
// Push adds timeseries into queue for writing into remote storage.
// Push returns and error if client is stopped or if queue is full.
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
select {
case <-c.doneCh:
return fmt.Errorf("client is closed")
case c.input <- s:
return nil
default:
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
c.maxQueueSize)
}
}
// Close stops the client and waits for all goroutines
// to exit.
func (c *Client) Close() error {
if c.doneCh == nil {
return fmt.Errorf("client is already closed")
}
close(c.input)
close(c.doneCh)
c.wg.Wait()
return nil
}
func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
}
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
c.flush(lastCtx, wr)
cancel()
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer ticker.Stop()
for {
select {
case <-c.doneCh:
shutdown()
return
case <-ctx.Done():
shutdown()
return
case <-ticker.C:
c.flush(ctx, wr)
case ts, ok := <-c.input:
if !ok {
continue
}
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
}
}
}
}()
}
var (
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
return float64(*concurrency)
})
)
// flush is a blocking function that marshals WriteRequest and sends
// it to remote-write endpoint. Flush performs limited amount of retries
// if request fails.
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
if len(wr.Timeseries) < 1 {
return
}
defer prompbmarshal.ResetWriteRequest(wr)
defer bufferFlushDuration.UpdateDuration(time.Now())
data, err := wr.Marshal()
if err != nil {
logger.Errorf("failed to marshal WriteRequest: %s", err)
return
}
b := snappy.Encode(nil, data)
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
if retryInterval > maxRetryInterval {
retryInterval = maxRetryInterval
}
timeStart := time.Now()
defer sendDuration.Add(time.Since(timeStart).Seconds())
L:
for attempts := 0; ; attempts++ {
err := c.send(ctx, b)
if err == nil {
sentRows.Add(len(wr.Timeseries))
sentBytes.Add(len(b))
return
}
_, isNotRetriable := err.(*nonRetriableError)
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
if isNotRetriable {
// exit fast if error isn't retriable
break
}
// check if request has been cancelled before backoff
select {
case <-ctx.Done():
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
break L
default:
}
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
if timeLeftForRetries < 0 {
// the max retry time has passed, so we give up
break
}
if retryInterval > timeLeftForRetries {
retryInterval = timeLeftForRetries
}
// sleeping to prevent remote db hammering
time.Sleep(retryInterval)
retryInterval *= 2
}
droppedRows.Add(len(wr.Timeseries))
droppedBytes.Add(len(b))
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
len(wr.Timeseries))
}
func (c *Client) send(ctx context.Context, data []byte) error {
r := bytes.NewReader(data)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
}
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authCfg != nil {
c.authCfg.SetHeaders(req, true)
}
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
}
resp, err := c.c.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
// according to https://prometheus.io/docs/concepts/remote_write_spec/
// Prometheus remote Write compatible receivers MUST
switch resp.StatusCode / 100 {
case 2:
// respond with a HTTP 2xx status code when the write is successful.
return nil
case 4:
if resp.StatusCode != http.StatusTooManyRequests {
// MUST NOT retry write requests on HTTP 4xx responses other than 429
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)}
}
fallthrough
default:
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)
}
}
type nonRetriableError struct {
err error
}
func (e *nonRetriableError) Error() string {
return e.err.Error()
}

View file

@ -0,0 +1,97 @@
package remotewrite
import (
"bytes"
"fmt"
"io"
"net/http"
"path"
"strings"
"sync"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// DebugClient won't push series periodically, but will write data to remote endpoint
// immediately when Push() is called
type DebugClient struct {
addr string
c *http.Client
wg sync.WaitGroup
}
// NewDebugClient initiates and returns a new DebugClient
func NewDebugClient() (*DebugClient, error) {
if *addr == "" {
return nil, nil
}
t, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
}
c := &DebugClient{
c: &http.Client{
Timeout: *sendTimeout,
Transport: t,
},
addr: strings.TrimSuffix(*addr, "/"),
}
return c, nil
}
// Push sends the given timeseries to the remote storage.
func (c *DebugClient) Push(s prompbmarshal.TimeSeries) error {
c.wg.Add(1)
defer c.wg.Done()
wr := &prompbmarshal.WriteRequest{Timeseries: []prompbmarshal.TimeSeries{s}}
data, err := wr.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal the given time series: %w", err)
}
return c.send(data)
}
// Close stops the DebugClient
func (c *DebugClient) Close() error {
c.wg.Wait()
return nil
}
func (c *DebugClient) send(data []byte) error {
b := snappy.Encode(nil, data)
r := bytes.NewReader(b)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
}
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
}
resp, err := c.c.Do(req)
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode/100 == 2 {
return nil
}
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)
}

View file

@ -0,0 +1,50 @@
package remotewrite
import (
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestDebugClient_Push(t *testing.T) {
testSrv := newRWServer()
oldAddr := *addr
*addr = testSrv.URL
defer func() {
*addr = oldAddr
}()
client, err := NewDebugClient()
if err != nil {
t.Fatalf("failed to create debug client: %s", err)
}
const rowsN = 100
var sent int
for i := 0; i < rowsN; i++ {
s := prompbmarshal.TimeSeries{
Samples: []prompbmarshal.Sample{{
Value: float64(i),
Timestamp: time.Now().Unix(),
}},
}
err := client.Push(s)
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
if err == nil {
sent++
}
}
if sent == 0 {
t.Fatalf("0 series sent")
}
if err := client.Close(); err != nil {
t.Fatalf("failed to close client: %s", err)
}
got := testSrv.accepted()
if got != sent {
t.Fatalf("expected to have %d series; got %d", sent, got)
}
}

View file

@ -1,320 +1,13 @@
package remotewrite
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"path"
"strings"
"sync"
"time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
var (
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
)
// Client is an asynchronous HTTP client for writing
// timeseries via remote write protocol.
type Client struct {
addr string
c *http.Client
authCfg *promauth.Config
input chan prompbmarshal.TimeSeries
flushInterval time.Duration
maxBatchSize int
maxQueueSize int
wg sync.WaitGroup
doneCh chan struct{}
}
// Config is config for remote write.
type Config struct {
// Addr of remote storage
Addr string
AuthCfg *promauth.Config
// Concurrency defines number of readers that
// concurrently read from the queue and flush data
Concurrency int
// MaxBatchSize defines max number of timeseries
// to be flushed at once
MaxBatchSize int
// MaxQueueSize defines max length of input queue
// populated by Push method.
// Push will be rejected once queue is full.
MaxQueueSize int
// FlushInterval defines time interval for flushing batches
FlushInterval time.Duration
// Transport will be used by the underlying http.Client
Transport *http.Transport
}
const (
defaultConcurrency = 4
defaultMaxBatchSize = 1e3
defaultMaxQueueSize = 1e5
defaultFlushInterval = 5 * time.Second
defaultWriteTimeout = 30 * time.Second
)
// NewClient returns asynchronous client for
// writing timeseries via remotewrite protocol.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Addr == "" {
return nil, fmt.Errorf("config.Addr can't be empty")
}
if cfg.MaxBatchSize == 0 {
cfg.MaxBatchSize = defaultMaxBatchSize
}
if cfg.MaxQueueSize == 0 {
cfg.MaxQueueSize = defaultMaxQueueSize
}
if cfg.FlushInterval == 0 {
cfg.FlushInterval = defaultFlushInterval
}
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
}
cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
c := &Client{
c: &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
},
addr: strings.TrimSuffix(cfg.Addr, "/"),
authCfg: cfg.AuthCfg,
flushInterval: cfg.FlushInterval,
maxBatchSize: cfg.MaxBatchSize,
maxQueueSize: cfg.MaxQueueSize,
doneCh: make(chan struct{}),
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
}
for i := 0; i < cc; i++ {
c.run(ctx)
}
return c, nil
}
// Push adds timeseries into queue for writing into remote storage.
// Push returns and error if client is stopped or if queue is full.
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
select {
case <-c.doneCh:
return fmt.Errorf("client is closed")
case c.input <- s:
return nil
default:
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
c.maxQueueSize)
}
}
// Close stops the client and waits for all goroutines
// to exit.
func (c *Client) Close() error {
if c.doneCh == nil {
return fmt.Errorf("client is already closed")
}
close(c.input)
close(c.doneCh)
c.wg.Wait()
return nil
}
func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
}
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
c.flush(lastCtx, wr)
cancel()
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer ticker.Stop()
for {
select {
case <-c.doneCh:
shutdown()
return
case <-ctx.Done():
shutdown()
return
case <-ticker.C:
c.flush(ctx, wr)
case ts, ok := <-c.input:
if !ok {
continue
}
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
}
}
}
}()
}
var (
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
return float64(*concurrency)
})
)
// flush is a blocking function that marshals WriteRequest and sends
// it to remote-write endpoint. Flush performs limited amount of retries
// if request fails.
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
if len(wr.Timeseries) < 1 {
return
}
defer prompbmarshal.ResetWriteRequest(wr)
defer bufferFlushDuration.UpdateDuration(time.Now())
data, err := wr.Marshal()
if err != nil {
logger.Errorf("failed to marshal WriteRequest: %s", err)
return
}
b := snappy.Encode(nil, data)
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
if retryInterval > maxRetryInterval {
retryInterval = maxRetryInterval
}
timeStart := time.Now()
defer sendDuration.Add(time.Since(timeStart).Seconds())
L:
for attempts := 0; ; attempts++ {
err := c.send(ctx, b)
if err == nil {
sentRows.Add(len(wr.Timeseries))
sentBytes.Add(len(b))
return
}
_, isNotRetriable := err.(*nonRetriableError)
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
if isNotRetriable {
// exit fast if error isn't retriable
break
}
// check if request has been cancelled before backoff
select {
case <-ctx.Done():
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
break L
default:
}
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
if timeLeftForRetries < 0 {
// the max retry time has passed, so we give up
break
}
if retryInterval > timeLeftForRetries {
retryInterval = timeLeftForRetries
}
// sleeping to prevent remote db hammering
time.Sleep(retryInterval)
retryInterval *= 2
}
droppedRows.Add(len(wr.Timeseries))
droppedBytes.Add(len(b))
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
len(wr.Timeseries))
}
func (c *Client) send(ctx context.Context, data []byte) error {
r := bytes.NewReader(data)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
}
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authCfg != nil {
c.authCfg.SetHeaders(req, true)
}
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
}
resp, err := c.c.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
// according to https://prometheus.io/docs/concepts/remote_write_spec/
// Prometheus remote Write compatible receivers MUST
switch resp.StatusCode / 100 {
case 2:
// respond with a HTTP 2xx status code when the write is successful.
return nil
case 4:
if resp.StatusCode != http.StatusTooManyRequests {
// MUST NOT retry write requests on HTTP 4xx responses other than 429
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)}
}
fallthrough
default:
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)
}
}
type nonRetriableError struct {
err error
}
func (e *nonRetriableError) Error() string {
return e.err.Error()
// RWClient represents an HTTP client for pushing data via remote write protocol
type RWClient interface {
// Push pushes the give time series to remote storage
Push(s prompbmarshal.TimeSeries) error
// Close stops the client. Client can't be reused after Close call.
Close() error
}

View file

@ -33,7 +33,7 @@ var (
"Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.")
)
func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw *remotewrite.Client) error {
func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewrite.RWClient) error {
if *replayMaxDatapoints < 1 {
return fmt.Errorf("replay.maxDatapointsPerQuery can't be lower than 1")
}
@ -78,7 +78,7 @@ func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw *remotewr
return nil
}
func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
func (g *Group) replay(start, end time.Time, rw remotewrite.RWClient) int {
var total int
step := g.Interval * time.Duration(*replayMaxDatapoints)
ri := rangeIterator{start: start, end: end, step: step}
@ -119,7 +119,7 @@ func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
return total
}
func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
func replayRule(rule Rule, start, end time.Time, rw remotewrite.RWClient) (int, error) {
var err error
var tss []prompbmarshal.TimeSeries
for i := 0; i < *replayRuleRetryAttempts; i++ {

View file

@ -46,6 +46,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components
* FEATUTE: [vmalert](https://docs.victoriametrics.com/vmalert.html): display the error message received during unsuccessful config reload in vmalert's UI. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4076) for details.
* FEATUTE: [vmalert](https://docs.victoriametrics.com/vmalert.html): allow disabling of `step` param attached to [instant queries](https://docs.victoriametrics.com/keyConcepts.html#instant-query). This might be useful for using vmalert with datasources that to not support this param, unlike VictoriaMetrics. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4573) for details.
* FEATUTE: [vmalert](https://docs.victoriametrics.com/vmalert.html): support option for "blackholing" alerting notifications if `-notifier.blackhole` cmd-line flag is set. Enable this flag if you want vmalert to evaluate alerting rules without sending any notifications to external receivers (eg. alertmanager). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4122) for details. Thanks to @venkatbvc for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4639).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add unit test for alerting and recording rules, see more [details](https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules) here. Thanks to @Haleygo for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4596).
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): expose `vmauth_user_request_duration_seconds` and `vmauth_unauthorized_user_request_duration_seconds` summary metrics for measuring requests latency per user.
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): show backup progress percentage in log during backup uploading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
* FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): show restoring progress percentage in log during backup downloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).

View file

@ -743,6 +743,232 @@ See full description for these flags in `./vmalert -help`.
* `query` template function is disabled for performance reasons (might be changed in future);
* `limit` group's param has no effect during replay (might be changed in future);
## Unit Testing for Rules
You can use `vmalert` to test your rules.
It will setup an isolated VM instance, simulate the periodic ingestion of samples for several time series, use those series to evaluate recording and alerting rules, and then test whether the firing alerts or metricsql expressions match what was configured as the expected results.
```
# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
```
### Test file format
```
# Path to the files or http url containing [rule groups](https://docs.victoriametrics.com/vmalert.html#groups) configuration.
# Enterprise version of vmalert supports S3 and GCS paths to rules.
rule_files:
[ - <file_name> ]
# The evaluation interval for rules specified in `rule_files`
[ evaluation_interval: <duration> | default = 1m ]
# Groups listed below will be evaluated by order.
# Not All the groups need not be mentioned, if not, they will be evaluated by define order in rule_files.
group_eval_order:
[ - <group_name> ]
# The list of unit test files to be checked during evaluation.
tests:
[ - <test_group> ]
```
#### `<test_group>`
```
# Interval between samples for input series
interval: <duration>
# Time series to persist into the database according to configured <interval> before running tests.
input_series:
[ - <series> ]
# Name of the test group, optional
[ name: <string> ]
# Unit tests for alerting rules. Alerting rules are from the input file.
alert_rule_test:
[ - <alert_test_case> ]
# Unit tests for Metricsql expressions.
metricsql_expr_test:
[ - <metricsql_expr_test> ]
# External labels accessible to the alert template.
external_labels:
[ <labelname>: <string> ... ]
```
#### `<series>`
```
# series should format as '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
series: <string>
# values support several special equations:
# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
# Read this as series starts at a, then c further samples incrementing by b.
# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
# '_' represents a missing sample from scrape
# Examples:
# 1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
# 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
# 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
# 4. ' 1 _x3' becomes '1 _ _ _ ' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
values: <string>
```
#### `<alert_test_case>`
vmalert by default adds group name and alert name to generated alerts and timeseries.
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`, but no need to add them under `exp_alerts`.
You can also pass `--disableAlertgroupLabel` to prevent adding groupname label, in that case, `groupname` can be missed.
```
# The time elapsed from time=0s when this alerting rule be checked.
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
eval_time: <duration>
# Name of the group name to be tested.
groupname: <string>
# Name of the alert to be tested.
alertname: <string>
# List of expected alerts which are firing under the given alertname at
# given evaluation time. If you want to test if an alerting rule should
# not be firing, then you can mention the above fields and leave 'exp_alerts' empty.
exp_alerts:
[ - <alert> ]
```
#### `<alert>`
```
# These are the expanded labels and annotations of the expected alert.
# Note: labels also include the labels of the sample associated with the alert
exp_labels:
[ <labelname>: <string> ]
exp_annotations:
[ <labelname>: <string> ]
```
#### `<metricsql_expr_test>`
```
# Expression to evaluate
expr: <string>
# The time elapsed from time=0s when this expression be evaluated.
eval_time: <duration>
# Expected samples at the given evaluation time.
exp_samples:
[ - <sample> ]
```
#### `<sample>`
```
# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
labels: <string>
# The expected value of the Metricsql expression.
value: <number>
```
### Example
This is an example input file for unit testing which will passes. `test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
#### `test.yaml`
```
rule_files:
- rules.yaml
evaluation_interval: 1m
tests:
- interval: 1m
input_series:
- series: 'up{job="prometheus", instance="localhost:9090"}'
values: "0+0x1440"
metricsql_expr_test:
- expr: suquery_interval_test
eval_time: 4m
exp_samples:
- labels: '{__name__="suquery_interval_test",datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
value: 1
alert_rule_test:
- eval_time: 2h
groupname: group1
alertname: InstanceDown
exp_alerts:
- exp_labels:
job: prometheus
severity: page
instance: localhost:9090
datacenter: dc-123
exp_annotations:
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
- eval_time: 0
groupname: group1
alertname: AlwaysFiring
exp_alerts:
- exp_labels:
datacenter: dc-123
- eval_time: 0
groupname: group1
alertname: InstanceDown
exp_alerts: []
external_labels:
datacenter: dc-123
```
#### `alerts.yaml`
```
# This is the rules file.
groups:
- name: group1
rules:
- alert: InstanceDown
expr: up == 0
for: 5m
labels:
severity: page
annotations:
summary: "Instance {{ $labels.instance }} down"
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
- alert: AlwaysFiring
expr: 1
- name: group2
rules:
- record: job:test:count_over_time1m
expr: sum without(instance) (count_over_time(test[1m]))
- record: suquery_interval_test
expr: count_over_time(up[5m:])
```
## Monitoring
`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
@ -1293,6 +1519,11 @@ The shortlist of configuration flags is the following:
Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
-tlsMinVersion string
Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
-unittestFile array
Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
Examples:
-unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
-version
Show VictoriaMetrics version
```

6
go.mod
View file

@ -39,7 +39,10 @@ require (
gopkg.in/yaml.v2 v2.4.0
)
require github.com/bmatcuk/doublestar/v4 v4.6.0
require (
github.com/bmatcuk/doublestar/v4 v4.6.0
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
)
require (
cloud.google.com/go v0.110.6 // indirect
@ -115,7 +118,6 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/goleak v1.2.1 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect

View file

@ -45,6 +45,14 @@ func (pd *Duration) Duration() time.Duration {
return pd.D
}
// ParseTime returns time for pd.
func (pd *Duration) ParseTime() time.Time {
if pd == nil {
return time.Time{}
}
return time.UnixMilli(pd.Duration().Milliseconds())
}
// ParseDuration parses duration string in Prometheus format
func ParseDuration(s string) (time.Duration, error) {
ms, err := metricsql.DurationValue(s, 0)