vmalert: revert unittest feature (#4734)

* Revert "vmalert: unittest support stale datapoint (#4696)"

This reverts commit 0b44df7ec8.

* Revert "docs: specify min version and limitations for vmalert's unit tests"

This reverts commit a24541bd

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* Revert "vmalert: init unit test (#4596)"

This reverts commit da60a68d

Signed-off-by: hagen1778 <roman@victoriametrics.com>

* docs: mention unittest revert in changelog

Signed-off-by: hagen1778 <roman@victoriametrics.com>

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>

(cherry picked from commit 9f1b9b86cc)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2023-07-28 10:42:02 +02:00 committed by hagen1778
parent 63d9a92d3d
commit 303d3616ec
No known key found for this signature in database
GPG key ID: 3BF75F3741CA9640
14 changed files with 323 additions and 985 deletions

View file

@ -742,249 +742,6 @@ See full description for these flags in `./vmalert -help`.
* `limit` group's param has no effect during replay (might be changed in future); * `limit` group's param has no effect during replay (might be changed in future);
* `keep_firing_for` alerting rule param has no effect during replay (might be changed in future). * `keep_firing_for` alerting rule param has no effect during replay (might be changed in future).
## Unit Testing for Rules
> Unit testing is available from v1.92.0.
> Unit tests do not respect `-clusterMode` for now.
You can use `vmalert` to run unit tests for alerting and recording rules.
In unit test mode vmalert performs the following actions:
* sets up an isolated VictoriaMetrics instance;
* simulates the periodic ingestion of time series;
* queries the ingested data for recording and alerting rules evaluation;
* tests whether the firing alerts or resulting recording rules match the expected results.
See how to run vmalert in unit test mode below:
```
# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
```
vmalert is compatible with [Prometheus config format for tests](https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#test-file-format)
except `promql_expr_test` field. Use `metricsql_expr_test` field name instead. The name is different because vmalert
validates and executes [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html) expressions,
which aren't always backward compatible with [PromQL](https://prometheus.io/docs/prometheus/latest/querying/basics/).
### Test file format
The configuration format for files specified in `-unittestFile` cmd-line flag is the following:
```
# Path to the files or http url containing [rule groups](https://docs.victoriametrics.com/vmalert.html#groups) configuration.
# Enterprise version of vmalert supports S3 and GCS paths to rules.
rule_files:
[ - <string> ]
# The evaluation interval for rules specified in `rule_files`
[ evaluation_interval: <duration> | default = 1m ]
# Groups listed below will be evaluated by order.
# Not All the groups need not be mentioned, if not, they will be evaluated by define order in rule_files.
group_eval_order:
[ - <string> ]
# The list of unit test files to be checked during evaluation.
tests:
[ - <test_group> ]
```
#### `<test_group>`
```
# Interval between samples for input series
interval: <duration>
# Time series to persist into the database according to configured <interval> before running tests.
input_series:
[ - <series> ]
# Name of the test group, optional
[ name: <string> ]
# Unit tests for alerting rules
alert_rule_test:
[ - <alert_test_case> ]
# Unit tests for Metricsql expressions.
metricsql_expr_test:
[ - <metricsql_expr_test> ]
# External labels accessible for templating.
external_labels:
[ <labelname>: <string> ... ]
```
#### `<series>`
```
# series in the following format '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
series: <string>
# values support several special equations:
# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
# Read this as series starts at a, then c further samples incrementing by b.
# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
# '_' represents a missing sample from scrape
# 'stale' indicates a stale sample
# Examples:
# 1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
# 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
# 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
# 4. ' 1 _x3 stale' becomes '1 _ _ _ stale' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
values: <string>
```
#### `<alert_test_case>`
vmalert by default adds `alertgroup` and `alertname` to the generated alerts and time series.
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`,
but no need to add them under `exp_alerts`.
You can also pass `--disableAlertgroupLabel` to prevent vmalert from adding `alertgroup` label.
```
# The time elapsed from time=0s when this alerting rule should be checked.
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
eval_time: <duration>
# Name of the group name to be tested.
groupname: <string>
# Name of the alert to be tested.
alertname: <string>
# List of the expected alerts that are firing under the given alertname at
# the given evaluation time. If you want to test if an alerting rule should
# not be firing, then you can mention only the fields above and leave 'exp_alerts' empty.
exp_alerts:
[ - <alert> ]
```
#### `<alert>`
```
# These are the expanded labels and annotations of the expected alert.
# Note: labels also include the labels of the sample associated with the alert
exp_labels:
[ <labelname>: <string> ]
exp_annotations:
[ <labelname>: <string> ]
```
#### `<metricsql_expr_test>`
```
# Expression to evaluate
expr: <string>
# The time elapsed from time=0s when this expression be evaluated.
eval_time: <duration>
# Expected samples at the given evaluation time.
exp_samples:
[ - <sample> ]
```
#### `<sample>`
```
# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
labels: <string>
# The expected value of the Metricsql expression.
value: <number>
```
### Example
This is an example input file for unit testing which will pass.
`test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
#### `test.yaml`
```
rule_files:
- rules.yaml
evaluation_interval: 1m
tests:
- interval: 1m
input_series:
- series: 'up{job="prometheus", instance="localhost:9090"}'
values: "0+0x1440"
metricsql_expr_test:
- expr: suquery_interval_test
eval_time: 4m
exp_samples:
- labels: '{__name__="suquery_interval_test", datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
value: 1
alert_rule_test:
- eval_time: 2h
groupname: group1
alertname: InstanceDown
exp_alerts:
- exp_labels:
job: prometheus
severity: page
instance: localhost:9090
datacenter: dc-123
exp_annotations:
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
- eval_time: 0
groupname: group1
alertname: AlwaysFiring
exp_alerts:
- exp_labels:
datacenter: dc-123
- eval_time: 0
groupname: group1
alertname: InstanceDown
exp_alerts: []
external_labels:
datacenter: dc-123
```
#### `alerts.yaml`
```
# This is the rules file.
groups:
- name: group1
rules:
- alert: InstanceDown
expr: up == 0
for: 5m
labels:
severity: page
annotations:
summary: "Instance {{ $labels.instance }} down"
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
- alert: AlwaysFiring
expr: 1
- name: group2
rules:
- record: job:test:count_over_time1m
expr: sum without(instance) (count_over_time(test[1m]))
- record: suquery_interval_test
expr: count_over_time(up[5m:])
```
## Monitoring ## Monitoring
`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page. `vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
@ -1535,11 +1292,6 @@ The shortlist of configuration flags is the following:
Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
-tlsMinVersion string -tlsMinVersion string
Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13 Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
-unittestFile array
Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
Examples:
-unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
-version -version
Show VictoriaMetrics version Show VictoriaMetrics version
``` ```

View file

@ -274,7 +274,7 @@ func (g *Group) close() {
var skipRandSleepOnGroupStart bool var skipRandSleepOnGroupStart bool
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, rr datasource.QuerierBuilder) { func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
defer func() { close(g.finishedCh) }() defer func() { close(g.finishedCh) }()
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics. // Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
@ -422,7 +422,7 @@ type executor struct {
notifiers func() []notifier.Notifier notifiers func() []notifier.Notifier
notifierHeaders map[string]string notifierHeaders map[string]string
rw remotewrite.RWClient rw *remotewrite.Client
previouslySentSeriesToRWMu sync.Mutex previouslySentSeriesToRWMu sync.Mutex
// previouslySentSeriesToRW stores series sent to RW on previous iteration // previouslySentSeriesToRW stores series sent to RW on previous iteration

View file

@ -399,7 +399,7 @@ func configsEqual(a, b []config.Group) bool {
// setConfigSuccess sets config reload status to 1. // setConfigSuccess sets config reload status to 1.
func setConfigSuccess(at uint64) { func setConfigSuccess(at uint64) {
configSuccess.Set(1) configSuccess.Set(1)
configTimestamp.Set(at) configTimestamp.Set(fasttime.UnixTimestamp())
// reset the error if any // reset the error if any
setConfigErr(nil) setConfigErr(nil)
} }

View file

@ -19,7 +19,7 @@ type manager struct {
querierBuilder datasource.QuerierBuilder querierBuilder datasource.QuerierBuilder
notifiers func() []notifier.Notifier notifiers func() []notifier.Notifier
rw remotewrite.RWClient rw *remotewrite.Client
// remote read builder. // remote read builder.
rr datasource.QuerierBuilder rr datasource.QuerierBuilder

View file

@ -257,7 +257,7 @@ func TestManagerUpdate(t *testing.T) {
func TestManagerUpdateNegative(t *testing.T) { func TestManagerUpdateNegative(t *testing.T) {
testCases := []struct { testCases := []struct {
notifiers []notifier.Notifier notifiers []notifier.Notifier
rw remotewrite.RWClient rw *remotewrite.Client
cfg config.Group cfg config.Group
expErr string expErr string
}{ }{

View file

@ -1,320 +0,0 @@
package remotewrite
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"path"
"strings"
"sync"
"time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
var (
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
)
// Client is an asynchronous HTTP client for writing
// timeseries via remote write protocol.
type Client struct {
addr string
c *http.Client
authCfg *promauth.Config
input chan prompbmarshal.TimeSeries
flushInterval time.Duration
maxBatchSize int
maxQueueSize int
wg sync.WaitGroup
doneCh chan struct{}
}
// Config is config for remote write.
type Config struct {
// Addr of remote storage
Addr string
AuthCfg *promauth.Config
// Concurrency defines number of readers that
// concurrently read from the queue and flush data
Concurrency int
// MaxBatchSize defines max number of timeseries
// to be flushed at once
MaxBatchSize int
// MaxQueueSize defines max length of input queue
// populated by Push method.
// Push will be rejected once queue is full.
MaxQueueSize int
// FlushInterval defines time interval for flushing batches
FlushInterval time.Duration
// Transport will be used by the underlying http.Client
Transport *http.Transport
}
const (
defaultConcurrency = 4
defaultMaxBatchSize = 1e3
defaultMaxQueueSize = 1e5
defaultFlushInterval = 5 * time.Second
defaultWriteTimeout = 30 * time.Second
)
// NewClient returns asynchronous client for
// writing timeseries via remotewrite protocol.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Addr == "" {
return nil, fmt.Errorf("config.Addr can't be empty")
}
if cfg.MaxBatchSize == 0 {
cfg.MaxBatchSize = defaultMaxBatchSize
}
if cfg.MaxQueueSize == 0 {
cfg.MaxQueueSize = defaultMaxQueueSize
}
if cfg.FlushInterval == 0 {
cfg.FlushInterval = defaultFlushInterval
}
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
}
cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
c := &Client{
c: &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
},
addr: strings.TrimSuffix(cfg.Addr, "/"),
authCfg: cfg.AuthCfg,
flushInterval: cfg.FlushInterval,
maxBatchSize: cfg.MaxBatchSize,
maxQueueSize: cfg.MaxQueueSize,
doneCh: make(chan struct{}),
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
}
for i := 0; i < cc; i++ {
c.run(ctx)
}
return c, nil
}
// Push adds timeseries into queue for writing into remote storage.
// Push returns and error if client is stopped or if queue is full.
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
select {
case <-c.doneCh:
return fmt.Errorf("client is closed")
case c.input <- s:
return nil
default:
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
c.maxQueueSize)
}
}
// Close stops the client and waits for all goroutines
// to exit.
func (c *Client) Close() error {
if c.doneCh == nil {
return fmt.Errorf("client is already closed")
}
close(c.input)
close(c.doneCh)
c.wg.Wait()
return nil
}
func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
}
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
c.flush(lastCtx, wr)
cancel()
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer ticker.Stop()
for {
select {
case <-c.doneCh:
shutdown()
return
case <-ctx.Done():
shutdown()
return
case <-ticker.C:
c.flush(ctx, wr)
case ts, ok := <-c.input:
if !ok {
continue
}
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
}
}
}
}()
}
var (
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
return float64(*concurrency)
})
)
// flush is a blocking function that marshals WriteRequest and sends
// it to remote-write endpoint. Flush performs limited amount of retries
// if request fails.
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
if len(wr.Timeseries) < 1 {
return
}
defer prompbmarshal.ResetWriteRequest(wr)
defer bufferFlushDuration.UpdateDuration(time.Now())
data, err := wr.Marshal()
if err != nil {
logger.Errorf("failed to marshal WriteRequest: %s", err)
return
}
b := snappy.Encode(nil, data)
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
if retryInterval > maxRetryInterval {
retryInterval = maxRetryInterval
}
timeStart := time.Now()
defer sendDuration.Add(time.Since(timeStart).Seconds())
L:
for attempts := 0; ; attempts++ {
err := c.send(ctx, b)
if err == nil {
sentRows.Add(len(wr.Timeseries))
sentBytes.Add(len(b))
return
}
_, isNotRetriable := err.(*nonRetriableError)
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
if isNotRetriable {
// exit fast if error isn't retriable
break
}
// check if request has been cancelled before backoff
select {
case <-ctx.Done():
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
break L
default:
}
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
if timeLeftForRetries < 0 {
// the max retry time has passed, so we give up
break
}
if retryInterval > timeLeftForRetries {
retryInterval = timeLeftForRetries
}
// sleeping to prevent remote db hammering
time.Sleep(retryInterval)
retryInterval *= 2
}
droppedRows.Add(len(wr.Timeseries))
droppedBytes.Add(len(b))
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
len(wr.Timeseries))
}
func (c *Client) send(ctx context.Context, data []byte) error {
r := bytes.NewReader(data)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
}
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authCfg != nil {
c.authCfg.SetHeaders(req, true)
}
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
}
resp, err := c.c.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
// according to https://prometheus.io/docs/concepts/remote_write_spec/
// Prometheus remote Write compatible receivers MUST
switch resp.StatusCode / 100 {
case 2:
// respond with a HTTP 2xx status code when the write is successful.
return nil
case 4:
if resp.StatusCode != http.StatusTooManyRequests {
// MUST NOT retry write requests on HTTP 4xx responses other than 429
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)}
}
fallthrough
default:
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)
}
}
type nonRetriableError struct {
err error
}
func (e *nonRetriableError) Error() string {
return e.err.Error()
}

View file

@ -1,97 +0,0 @@
package remotewrite
import (
"bytes"
"fmt"
"io"
"net/http"
"path"
"strings"
"sync"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
// DebugClient won't push series periodically, but will write data to remote endpoint
// immediately when Push() is called
type DebugClient struct {
addr string
c *http.Client
wg sync.WaitGroup
}
// NewDebugClient initiates and returns a new DebugClient
func NewDebugClient() (*DebugClient, error) {
if *addr == "" {
return nil, nil
}
t, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
}
c := &DebugClient{
c: &http.Client{
Timeout: *sendTimeout,
Transport: t,
},
addr: strings.TrimSuffix(*addr, "/"),
}
return c, nil
}
// Push sends the given timeseries to the remote storage.
func (c *DebugClient) Push(s prompbmarshal.TimeSeries) error {
c.wg.Add(1)
defer c.wg.Done()
wr := &prompbmarshal.WriteRequest{Timeseries: []prompbmarshal.TimeSeries{s}}
data, err := wr.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal the given time series: %w", err)
}
return c.send(data)
}
// Close stops the DebugClient
func (c *DebugClient) Close() error {
c.wg.Wait()
return nil
}
func (c *DebugClient) send(data []byte) error {
b := snappy.Encode(nil, data)
r := bytes.NewReader(b)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
}
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
}
resp, err := c.c.Do(req)
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode/100 == 2 {
return nil
}
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)
}

View file

@ -1,50 +0,0 @@
package remotewrite
import (
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestDebugClient_Push(t *testing.T) {
testSrv := newRWServer()
oldAddr := *addr
*addr = testSrv.URL
defer func() {
*addr = oldAddr
}()
client, err := NewDebugClient()
if err != nil {
t.Fatalf("failed to create debug client: %s", err)
}
const rowsN = 100
var sent int
for i := 0; i < rowsN; i++ {
s := prompbmarshal.TimeSeries{
Samples: []prompbmarshal.Sample{{
Value: float64(i),
Timestamp: time.Now().Unix(),
}},
}
err := client.Push(s)
if err != nil {
t.Fatalf("unexpected err: %s", err)
}
if err == nil {
sent++
}
}
if sent == 0 {
t.Fatalf("0 series sent")
}
if err := client.Close(); err != nil {
t.Fatalf("failed to close client: %s", err)
}
got := testSrv.accepted()
if got != sent {
t.Fatalf("expected to have %d series; got %d", sent, got)
}
}

View file

@ -1,13 +1,320 @@
package remotewrite package remotewrite
import ( import (
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"path"
"strings"
"sync"
"time"
"github.com/golang/snappy"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
) )
// RWClient represents an HTTP client for pushing data via remote write protocol var (
type RWClient interface { disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
// Push pushes the give time series to remote storage sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
Push(s prompbmarshal.TimeSeries) error retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
// Close stops the client. Client can't be reused after Close call. retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
Close() error )
// Client is an asynchronous HTTP client for writing
// timeseries via remote write protocol.
type Client struct {
addr string
c *http.Client
authCfg *promauth.Config
input chan prompbmarshal.TimeSeries
flushInterval time.Duration
maxBatchSize int
maxQueueSize int
wg sync.WaitGroup
doneCh chan struct{}
}
// Config is config for remote write.
type Config struct {
// Addr of remote storage
Addr string
AuthCfg *promauth.Config
// Concurrency defines number of readers that
// concurrently read from the queue and flush data
Concurrency int
// MaxBatchSize defines max number of timeseries
// to be flushed at once
MaxBatchSize int
// MaxQueueSize defines max length of input queue
// populated by Push method.
// Push will be rejected once queue is full.
MaxQueueSize int
// FlushInterval defines time interval for flushing batches
FlushInterval time.Duration
// Transport will be used by the underlying http.Client
Transport *http.Transport
}
const (
defaultConcurrency = 4
defaultMaxBatchSize = 1e3
defaultMaxQueueSize = 1e5
defaultFlushInterval = 5 * time.Second
defaultWriteTimeout = 30 * time.Second
)
// NewClient returns asynchronous client for
// writing timeseries via remotewrite protocol.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Addr == "" {
return nil, fmt.Errorf("config.Addr can't be empty")
}
if cfg.MaxBatchSize == 0 {
cfg.MaxBatchSize = defaultMaxBatchSize
}
if cfg.MaxQueueSize == 0 {
cfg.MaxQueueSize = defaultMaxQueueSize
}
if cfg.FlushInterval == 0 {
cfg.FlushInterval = defaultFlushInterval
}
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
}
cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
}
c := &Client{
c: &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
},
addr: strings.TrimSuffix(cfg.Addr, "/"),
authCfg: cfg.AuthCfg,
flushInterval: cfg.FlushInterval,
maxBatchSize: cfg.MaxBatchSize,
maxQueueSize: cfg.MaxQueueSize,
doneCh: make(chan struct{}),
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
}
for i := 0; i < cc; i++ {
c.run(ctx)
}
return c, nil
}
// Push adds timeseries into queue for writing into remote storage.
// Push returns and error if client is stopped or if queue is full.
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
select {
case <-c.doneCh:
return fmt.Errorf("client is closed")
case c.input <- s:
return nil
default:
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
c.maxQueueSize)
}
}
// Close stops the client and waits for all goroutines
// to exit.
func (c *Client) Close() error {
if c.doneCh == nil {
return fmt.Errorf("client is already closed")
}
close(c.input)
close(c.doneCh)
c.wg.Wait()
return nil
}
func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
}
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
c.flush(lastCtx, wr)
cancel()
}
c.wg.Add(1)
go func() {
defer c.wg.Done()
defer ticker.Stop()
for {
select {
case <-c.doneCh:
shutdown()
return
case <-ctx.Done():
shutdown()
return
case <-ticker.C:
c.flush(ctx, wr)
case ts, ok := <-c.input:
if !ok {
continue
}
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
}
}
}
}()
}
var (
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
return float64(*concurrency)
})
)
// flush is a blocking function that marshals WriteRequest and sends
// it to remote-write endpoint. Flush performs limited amount of retries
// if request fails.
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
if len(wr.Timeseries) < 1 {
return
}
defer prompbmarshal.ResetWriteRequest(wr)
defer bufferFlushDuration.UpdateDuration(time.Now())
data, err := wr.Marshal()
if err != nil {
logger.Errorf("failed to marshal WriteRequest: %s", err)
return
}
b := snappy.Encode(nil, data)
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
if retryInterval > maxRetryInterval {
retryInterval = maxRetryInterval
}
timeStart := time.Now()
defer sendDuration.Add(time.Since(timeStart).Seconds())
L:
for attempts := 0; ; attempts++ {
err := c.send(ctx, b)
if err == nil {
sentRows.Add(len(wr.Timeseries))
sentBytes.Add(len(b))
return
}
_, isNotRetriable := err.(*nonRetriableError)
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
if isNotRetriable {
// exit fast if error isn't retriable
break
}
// check if request has been cancelled before backoff
select {
case <-ctx.Done():
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
break L
default:
}
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
if timeLeftForRetries < 0 {
// the max retry time has passed, so we give up
break
}
if retryInterval > timeLeftForRetries {
retryInterval = timeLeftForRetries
}
// sleeping to prevent remote db hammering
time.Sleep(retryInterval)
retryInterval *= 2
}
droppedRows.Add(len(wr.Timeseries))
droppedBytes.Add(len(b))
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
len(wr.Timeseries))
}
func (c *Client) send(ctx context.Context, data []byte) error {
r := bytes.NewReader(data)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
}
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authCfg != nil {
c.authCfg.SetHeaders(req, true)
}
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
}
resp, err := c.c.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
// according to https://prometheus.io/docs/concepts/remote_write_spec/
// Prometheus remote Write compatible receivers MUST
switch resp.StatusCode / 100 {
case 2:
// respond with a HTTP 2xx status code when the write is successful.
return nil
case 4:
if resp.StatusCode != http.StatusTooManyRequests {
// MUST NOT retry write requests on HTTP 4xx responses other than 429
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)}
}
fallthrough
default:
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)
}
}
type nonRetriableError struct {
err error
}
func (e *nonRetriableError) Error() string {
return e.err.Error()
} }

View file

@ -33,7 +33,7 @@ var (
"Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.") "Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.")
) )
func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewrite.RWClient) error { func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw *remotewrite.Client) error {
if *replayMaxDatapoints < 1 { if *replayMaxDatapoints < 1 {
return fmt.Errorf("replay.maxDatapointsPerQuery can't be lower than 1") return fmt.Errorf("replay.maxDatapointsPerQuery can't be lower than 1")
} }
@ -78,7 +78,7 @@ func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewri
return nil return nil
} }
func (g *Group) replay(start, end time.Time, rw remotewrite.RWClient) int { func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
var total int var total int
step := g.Interval * time.Duration(*replayMaxDatapoints) step := g.Interval * time.Duration(*replayMaxDatapoints)
ri := rangeIterator{start: start, end: end, step: step} ri := rangeIterator{start: start, end: end, step: step}
@ -119,7 +119,7 @@ func (g *Group) replay(start, end time.Time, rw remotewrite.RWClient) int {
return total return total
} }
func replayRule(rule Rule, start, end time.Time, rw remotewrite.RWClient) (int, error) { func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
var err error var err error
var tss []prompbmarshal.TimeSeries var tss []prompbmarshal.TimeSeries
for i := 0; i < *replayRuleRetryAttempts; i++ { for i := 0; i < *replayRuleRetryAttempts; i++ {

View file

@ -24,6 +24,8 @@ The following `tip` changes can be tested by building VictoriaMetrics components
## tip ## tip
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): revert unit test feature for alerting and recording rules introduced in [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4596). See the following [change](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4734).
## [v1.92.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.92.0) ## [v1.92.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.92.0)
Released at 2023-07-27 Released at 2023-07-27

View file

@ -753,249 +753,6 @@ See full description for these flags in `./vmalert -help`.
* `limit` group's param has no effect during replay (might be changed in future); * `limit` group's param has no effect during replay (might be changed in future);
* `keep_firing_for` alerting rule param has no effect during replay (might be changed in future). * `keep_firing_for` alerting rule param has no effect during replay (might be changed in future).
## Unit Testing for Rules
> Unit testing is available from v1.92.0.
> Unit tests do not respect `-clusterMode` for now.
You can use `vmalert` to run unit tests for alerting and recording rules.
In unit test mode vmalert performs the following actions:
* sets up an isolated VictoriaMetrics instance;
* simulates the periodic ingestion of time series;
* queries the ingested data for recording and alerting rules evaluation;
* tests whether the firing alerts or resulting recording rules match the expected results.
See how to run vmalert in unit test mode below:
```
# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
```
vmalert is compatible with [Prometheus config format for tests](https://prometheus.io/docs/prometheus/latest/configuration/unit_testing_rules/#test-file-format)
except `promql_expr_test` field. Use `metricsql_expr_test` field name instead. The name is different because vmalert
validates and executes [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html) expressions,
which aren't always backward compatible with [PromQL](https://prometheus.io/docs/prometheus/latest/querying/basics/).
### Test file format
The configuration format for files specified in `-unittestFile` cmd-line flag is the following:
```
# Path to the files or http url containing [rule groups](https://docs.victoriametrics.com/vmalert.html#groups) configuration.
# Enterprise version of vmalert supports S3 and GCS paths to rules.
rule_files:
[ - <string> ]
# The evaluation interval for rules specified in `rule_files`
[ evaluation_interval: <duration> | default = 1m ]
# Groups listed below will be evaluated by order.
# Not All the groups need not be mentioned, if not, they will be evaluated by define order in rule_files.
group_eval_order:
[ - <string> ]
# The list of unit test files to be checked during evaluation.
tests:
[ - <test_group> ]
```
#### `<test_group>`
```
# Interval between samples for input series
interval: <duration>
# Time series to persist into the database according to configured <interval> before running tests.
input_series:
[ - <series> ]
# Name of the test group, optional
[ name: <string> ]
# Unit tests for alerting rules
alert_rule_test:
[ - <alert_test_case> ]
# Unit tests for Metricsql expressions.
metricsql_expr_test:
[ - <metricsql_expr_test> ]
# External labels accessible for templating.
external_labels:
[ <labelname>: <string> ... ]
```
#### `<series>`
```
# series in the following format '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
series: <string>
# values support several special equations:
# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
# Read this as series starts at a, then c further samples incrementing by b.
# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
# '_' represents a missing sample from scrape
# 'stale' indicates a stale sample
# Examples:
# 1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
# 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
# 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
# 4. ' 1 _x3 stale' becomes '1 _ _ _ stale' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
values: <string>
```
#### `<alert_test_case>`
vmalert by default adds `alertgroup` and `alertname` to the generated alerts and time series.
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`,
but no need to add them under `exp_alerts`.
You can also pass `--disableAlertgroupLabel` to prevent vmalert from adding `alertgroup` label.
```
# The time elapsed from time=0s when this alerting rule should be checked.
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
eval_time: <duration>
# Name of the group name to be tested.
groupname: <string>
# Name of the alert to be tested.
alertname: <string>
# List of the expected alerts that are firing under the given alertname at
# the given evaluation time. If you want to test if an alerting rule should
# not be firing, then you can mention only the fields above and leave 'exp_alerts' empty.
exp_alerts:
[ - <alert> ]
```
#### `<alert>`
```
# These are the expanded labels and annotations of the expected alert.
# Note: labels also include the labels of the sample associated with the alert
exp_labels:
[ <labelname>: <string> ]
exp_annotations:
[ <labelname>: <string> ]
```
#### `<metricsql_expr_test>`
```
# Expression to evaluate
expr: <string>
# The time elapsed from time=0s when this expression be evaluated.
eval_time: <duration>
# Expected samples at the given evaluation time.
exp_samples:
[ - <sample> ]
```
#### `<sample>`
```
# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
labels: <string>
# The expected value of the Metricsql expression.
value: <number>
```
### Example
This is an example input file for unit testing which will pass.
`test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
#### `test.yaml`
```
rule_files:
- rules.yaml
evaluation_interval: 1m
tests:
- interval: 1m
input_series:
- series: 'up{job="prometheus", instance="localhost:9090"}'
values: "0+0x1440"
metricsql_expr_test:
- expr: suquery_interval_test
eval_time: 4m
exp_samples:
- labels: '{__name__="suquery_interval_test", datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
value: 1
alert_rule_test:
- eval_time: 2h
groupname: group1
alertname: InstanceDown
exp_alerts:
- exp_labels:
job: prometheus
severity: page
instance: localhost:9090
datacenter: dc-123
exp_annotations:
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
- eval_time: 0
groupname: group1
alertname: AlwaysFiring
exp_alerts:
- exp_labels:
datacenter: dc-123
- eval_time: 0
groupname: group1
alertname: InstanceDown
exp_alerts: []
external_labels:
datacenter: dc-123
```
#### `alerts.yaml`
```
# This is the rules file.
groups:
- name: group1
rules:
- alert: InstanceDown
expr: up == 0
for: 5m
labels:
severity: page
annotations:
summary: "Instance {{ $labels.instance }} down"
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
- alert: AlwaysFiring
expr: 1
- name: group2
rules:
- record: job:test:count_over_time1m
expr: sum without(instance) (count_over_time(test[1m]))
- record: suquery_interval_test
expr: count_over_time(up[5m:])
```
## Monitoring ## Monitoring
`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page. `vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
@ -1546,11 +1303,6 @@ The shortlist of configuration flags is the following:
Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
-tlsMinVersion string -tlsMinVersion string
Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13 Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
-unittestFile array
Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
Examples:
-unittestFile="./unittest/testdata/test1.yaml,./unittest/testdata/test2.yaml".
See more information here https://docs.victoriametrics.com/vmalert.html#unit-testing-for-rules.
-version -version
Show VictoriaMetrics version Show VictoriaMetrics version
``` ```

View file

@ -45,14 +45,6 @@ func (pd *Duration) Duration() time.Duration {
return pd.D return pd.D
} }
// ParseTime returns time for pd.
func (pd *Duration) ParseTime() time.Time {
if pd == nil {
return time.Time{}
}
return time.UnixMilli(pd.Duration().Milliseconds())
}
// ParseDuration parses duration string in Prometheus format // ParseDuration parses duration string in Prometheus format
func ParseDuration(s string) (time.Duration, error) { func ParseDuration(s string) (time.Duration, error) {
ms, err := metricsql.DurationValue(s, 0) ms, err := metricsql.DurationValue(s, 0)