Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2023-07-28 11:20:54 -07:00
commit e16015fa3b
No known key found for this signature in database
35 changed files with 351 additions and 2260 deletions

View file

@ -74,7 +74,6 @@ test-vmalert:
go test -v -race -cover ./app/vmalert/config
go test -v -race -cover ./app/vmalert/remotewrite
go test -v -race -cover ./app/vmalert/utils
go test -v -race -cover ./app/vmalert/unittest
run-vmalert: vmalert
./bin/vmalert -rule=app/vmalert/config/testdata/rules/rules2-good.rules \
@ -104,10 +103,6 @@ replay-vmalert: vmalert
-replay.timeFrom=2021-05-11T07:21:43Z \
unittest-vmalert: vmalert
./bin/vmalert -unittestFile=app/vmalert/unittest/testdata/test1.yaml \
APP_NAME=vmalert CGO_ENABLED=1 GOOS=linux GOARCH=amd64 $(MAKE) app-local-goos-goarch

View file

@ -378,7 +378,7 @@ to persist the state to the remote destination via the following flags:
Both flags are required for proper state restoration. Restore process may fail if time series are missing
in configured `-remoteRead.url`, weren't updated in the last `1h` (controlled by `-remoteRead.lookback`)
or received state doesn't match current `vmalert` rules configuration. `vmalert` marks successfully restored rules
with `restored` label in [web UI](#WEB).
with `restored` label in [web UI](#web).
### Multitenancy
@ -742,249 +742,6 @@ See full description for these flags in `./vmalert -help`.
* `limit` group's param has no effect during replay (might be changed in future);
* `keep_firing_for` alerting rule param has no effect during replay (might be changed in future).
## Unit Testing for Rules
> Unit testing is available from v1.92.0.
> Unit tests do not respect `-clusterMode` for now.
You can use `vmalert` to run unit tests for alerting and recording rules.
In unit test mode vmalert performs the following actions:
* sets up an isolated VictoriaMetrics instance;
* simulates the periodic ingestion of time series;
* queries the ingested data for recording and alerting rules evaluation;
* tests whether the firing alerts or resulting recording rules match the expected results.
See how to run vmalert in unit test mode below:
# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
vmalert is compatible with [Prometheus config format for tests](
except `promql_expr_test` field. Use `metricsql_expr_test` field name instead. The name is different because vmalert
validates and executes [MetricsQL]( expressions,
which aren't always backward compatible with [PromQL](
### Test file format
The configuration format for files specified in `-unittestFile` cmd-line flag is the following:
# Path to the files or http url containing [rule groups]( configuration.
# Enterprise version of vmalert supports S3 and GCS paths to rules.
[ - <string> ]
# The evaluation interval for rules specified in `rule_files`
[ evaluation_interval: <duration> | default = 1m ]
# Groups listed below will be evaluated by order.
# Not All the groups need not be mentioned, if not, they will be evaluated by define order in rule_files.
[ - <string> ]
# The list of unit test files to be checked during evaluation.
[ - <test_group> ]
#### `<test_group>`
# Interval between samples for input series
interval: <duration>
# Time series to persist into the database according to configured <interval> before running tests.
[ - <series> ]
# Name of the test group, optional
[ name: <string> ]
# Unit tests for alerting rules
[ - <alert_test_case> ]
# Unit tests for Metricsql expressions.
[ - <metricsql_expr_test> ]
# External labels accessible for templating.
[ <labelname>: <string> ... ]
#### `<series>`
# series in the following format '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
series: <string>
# values support several special equations:
# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
# Read this as series starts at a, then c further samples incrementing by b.
# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
# '_' represents a missing sample from scrape
# 'stale' indicates a stale sample
# Examples:
# 1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
# 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
# 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
# 4. ' 1 _x3 stale' becomes '1 _ _ _ stale' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
values: <string>
#### `<alert_test_case>`
vmalert by default adds `alertgroup` and `alertname` to the generated alerts and time series.
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`,
but no need to add them under `exp_alerts`.
You can also pass `--disableAlertgroupLabel` to prevent vmalert from adding `alertgroup` label.
# The time elapsed from time=0s when this alerting rule should be checked.
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
eval_time: <duration>
# Name of the group name to be tested.
groupname: <string>
# Name of the alert to be tested.
alertname: <string>
# List of the expected alerts that are firing under the given alertname at
# the given evaluation time. If you want to test if an alerting rule should
# not be firing, then you can mention only the fields above and leave 'exp_alerts' empty.
[ - <alert> ]
#### `<alert>`
# These are the expanded labels and annotations of the expected alert.
# Note: labels also include the labels of the sample associated with the alert
[ <labelname>: <string> ]
[ <labelname>: <string> ]
#### `<metricsql_expr_test>`
# Expression to evaluate
expr: <string>
# The time elapsed from time=0s when this expression be evaluated.
eval_time: <duration>
# Expected samples at the given evaluation time.
[ - <sample> ]
#### `<sample>`
# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
labels: <string>
# The expected value of the Metricsql expression.
value: <number>
### Example
This is an example input file for unit testing which will pass.
`test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
#### `test.yaml`
- rules.yaml
evaluation_interval: 1m
- interval: 1m
- series: 'up{job="prometheus", instance="localhost:9090"}'
values: "0+0x1440"
- expr: suquery_interval_test
eval_time: 4m
- labels: '{__name__="suquery_interval_test", datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
value: 1
- eval_time: 2h
groupname: group1
alertname: InstanceDown
- exp_labels:
job: prometheus
severity: page
instance: localhost:9090
datacenter: dc-123
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
- eval_time: 0
groupname: group1
alertname: AlwaysFiring
- exp_labels:
datacenter: dc-123
- eval_time: 0
groupname: group1
alertname: InstanceDown
exp_alerts: []
datacenter: dc-123
#### `alerts.yaml`
# This is the rules file.
- name: group1
- alert: InstanceDown
expr: up == 0
for: 5m
severity: page
summary: "Instance {{ $labels.instance }} down"
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
- alert: AlwaysFiring
expr: 1
- name: group2
- record: job:test:count_over_time1m
expr: sum without(instance) (count_over_time(test[1m]))
- record: suquery_interval_test
expr: count_over_time(up[5m:])
## Monitoring
`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
@ -1535,11 +1292,6 @@ The shortlist of configuration flags is the following:
Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
-tlsMinVersion string
Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
-unittestFile array
Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
See more information here
Show VictoriaMetrics version
@ -1607,7 +1359,7 @@ dns_sd_configs:
port: 9093
The list of configured or discovered Notifiers can be explored via [UI](#Web).
The list of configured or discovered Notifiers can be explored via [UI](#web).
If Alertmanager runs in cluster mode then all its URLs needs to be available during discovery
to ensure [high availability](

View file

@ -274,7 +274,7 @@ func (g *Group) close() {
var skipRandSleepOnGroupStart bool
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw remotewrite.RWClient, rr datasource.QuerierBuilder) {
func (g *Group) start(ctx context.Context, nts func() []notifier.Notifier, rw *remotewrite.Client, rr datasource.QuerierBuilder) {
defer func() { close(g.finishedCh) }()
// Spread group rules evaluation over time in order to reduce load on VictoriaMetrics.
@ -422,7 +422,7 @@ type executor struct {
notifiers func() []notifier.Notifier
notifierHeaders map[string]string
rw remotewrite.RWClient
rw *remotewrite.Client
previouslySentSeriesToRWMu sync.Mutex
// previouslySentSeriesToRW stores series sent to RW on previous iteration

View file

@ -91,12 +91,7 @@ absolute path to all .tpl files in root.
disableAlertGroupLabel = flag.Bool("disableAlertgroupLabel", false, "Whether to disable adding group's Name as label to generated alerts and time series.")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified.")
unitTestFiles = flagutil.NewArrayString("unittestFile", `Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
See more information here
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified.")
var alertURLGeneratorFn notifier.AlertURLGenerator
@ -122,13 +117,6 @@ func main() {
logger.Fatalf("failed to parse %q: %s", *ruleTemplatesPath, err)
if len(*unitTestFiles) > 0 {
if unitRule(*unitTestFiles...) {
if *dryRun {
groups, err := config.Parse(*rulePath, notifier.ValidateTemplates, true)
if err != nil {

View file

@ -19,7 +19,7 @@ type manager struct {
querierBuilder datasource.QuerierBuilder
notifiers func() []notifier.Notifier
rw remotewrite.RWClient
rw *remotewrite.Client
// remote read builder.
rr datasource.QuerierBuilder

View file

@ -257,7 +257,7 @@ func TestManagerUpdate(t *testing.T) {
func TestManagerUpdateNegative(t *testing.T) {
testCases := []struct {
notifiers []notifier.Notifier
rw remotewrite.RWClient
rw *remotewrite.Client
cfg config.Group
expErr string

View file

@ -1,320 +0,0 @@
package remotewrite
import (
var (
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
// Client is an asynchronous HTTP client for writing
// timeseries via remote write protocol.
type Client struct {
addr string
c *http.Client
authCfg *promauth.Config
input chan prompbmarshal.TimeSeries
flushInterval time.Duration
maxBatchSize int
maxQueueSize int
wg sync.WaitGroup
doneCh chan struct{}
// Config is config for remote write.
type Config struct {
// Addr of remote storage
Addr string
AuthCfg *promauth.Config
// Concurrency defines number of readers that
// concurrently read from the queue and flush data
Concurrency int
// MaxBatchSize defines max number of timeseries
// to be flushed at once
MaxBatchSize int
// MaxQueueSize defines max length of input queue
// populated by Push method.
// Push will be rejected once queue is full.
MaxQueueSize int
// FlushInterval defines time interval for flushing batches
FlushInterval time.Duration
// Transport will be used by the underlying http.Client
Transport *http.Transport
const (
defaultConcurrency = 4
defaultMaxBatchSize = 1e3
defaultMaxQueueSize = 1e5
defaultFlushInterval = 5 * time.Second
defaultWriteTimeout = 30 * time.Second
// NewClient returns asynchronous client for
// writing timeseries via remotewrite protocol.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Addr == "" {
return nil, fmt.Errorf("config.Addr can't be empty")
if cfg.MaxBatchSize == 0 {
cfg.MaxBatchSize = defaultMaxBatchSize
if cfg.MaxQueueSize == 0 {
cfg.MaxQueueSize = defaultMaxQueueSize
if cfg.FlushInterval == 0 {
cfg.FlushInterval = defaultFlushInterval
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
c := &Client{
c: &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
addr: strings.TrimSuffix(cfg.Addr, "/"),
authCfg: cfg.AuthCfg,
flushInterval: cfg.FlushInterval,
maxBatchSize: cfg.MaxBatchSize,
maxQueueSize: cfg.MaxQueueSize,
doneCh: make(chan struct{}),
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
for i := 0; i < cc; i++ {
return c, nil
// Push adds timeseries into queue for writing into remote storage.
// Push returns and error if client is stopped or if queue is full.
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
select {
case <-c.doneCh:
return fmt.Errorf("client is closed")
case c.input <- s:
return nil
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
// Close stops the client and waits for all goroutines
// to exit.
func (c *Client) Close() error {
if c.doneCh == nil {
return fmt.Errorf("client is already closed")
return nil
func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
c.flush(lastCtx, wr)
go func() {
defer c.wg.Done()
defer ticker.Stop()
for {
select {
case <-c.doneCh:
case <-ctx.Done():
case <-ticker.C:
c.flush(ctx, wr)
case ts, ok := <-c.input:
if !ok {
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
var (
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
return float64(*concurrency)
// flush is a blocking function that marshals WriteRequest and sends
// it to remote-write endpoint. Flush performs limited amount of retries
// if request fails.
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
if len(wr.Timeseries) < 1 {
defer prompbmarshal.ResetWriteRequest(wr)
defer bufferFlushDuration.UpdateDuration(time.Now())
data, err := wr.Marshal()
if err != nil {
logger.Errorf("failed to marshal WriteRequest: %s", err)
b := snappy.Encode(nil, data)
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
if retryInterval > maxRetryInterval {
retryInterval = maxRetryInterval
timeStart := time.Now()
defer sendDuration.Add(time.Since(timeStart).Seconds())
for attempts := 0; ; attempts++ {
err := c.send(ctx, b)
if err == nil {
_, isNotRetriable := err.(*nonRetriableError)
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
if isNotRetriable {
// exit fast if error isn't retriable
// check if request has been cancelled before backoff
select {
case <-ctx.Done():
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
break L
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
if timeLeftForRetries < 0 {
// the max retry time has passed, so we give up
if retryInterval > timeLeftForRetries {
retryInterval = timeLeftForRetries
// sleeping to prevent remote db hammering
retryInterval *= 2
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
func (c *Client) send(ctx context.Context, data []byte) error {
r := bytes.NewReader(data)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authCfg != nil {
c.authCfg.SetHeaders(req, true)
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
resp, err := c.c.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
// according to
// Prometheus remote Write compatible receivers MUST
switch resp.StatusCode / 100 {
case 2:
// respond with a HTTP 2xx status code when the write is successful.
return nil
case 4:
if resp.StatusCode != http.StatusTooManyRequests {
// MUST NOT retry write requests on HTTP 4xx responses other than 429
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)}
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)
type nonRetriableError struct {
err error
func (e *nonRetriableError) Error() string {
return e.err.Error()

View file

@ -1,97 +0,0 @@
package remotewrite
import (
// DebugClient won't push series periodically, but will write data to remote endpoint
// immediately when Push() is called
type DebugClient struct {
addr string
c *http.Client
wg sync.WaitGroup
// NewDebugClient initiates and returns a new DebugClient
func NewDebugClient() (*DebugClient, error) {
if *addr == "" {
return nil, nil
t, err := utils.Transport(*addr, *tlsCertFile, *tlsKeyFile, *tlsCAFile, *tlsServerName, *tlsInsecureSkipVerify)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %w", err)
c := &DebugClient{
c: &http.Client{
Timeout: *sendTimeout,
Transport: t,
addr: strings.TrimSuffix(*addr, "/"),
return c, nil
// Push sends the given timeseries to the remote storage.
func (c *DebugClient) Push(s prompbmarshal.TimeSeries) error {
defer c.wg.Done()
wr := &prompbmarshal.WriteRequest{Timeseries: []prompbmarshal.TimeSeries{s}}
data, err := wr.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal the given time series: %w", err)
return c.send(data)
// Close stops the DebugClient
func (c *DebugClient) Close() error {
return nil
func (c *DebugClient) send(data []byte) error {
b := snappy.Encode(nil, data)
r := bytes.NewReader(b)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
resp, err := c.c.Do(req)
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode/100 == 2 {
return nil
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)

View file

@ -1,50 +0,0 @@
package remotewrite
import (
func TestDebugClient_Push(t *testing.T) {
testSrv := newRWServer()
oldAddr := *addr
*addr = testSrv.URL
defer func() {
*addr = oldAddr
client, err := NewDebugClient()
if err != nil {
t.Fatalf("failed to create debug client: %s", err)
const rowsN = 100
var sent int
for i := 0; i < rowsN; i++ {
s := prompbmarshal.TimeSeries{
Samples: []prompbmarshal.Sample{{
Value: float64(i),
Timestamp: time.Now().Unix(),
err := client.Push(s)
if err != nil {
t.Fatalf("unexpected err: %s", err)
if err == nil {
if sent == 0 {
t.Fatalf("0 series sent")
if err := client.Close(); err != nil {
t.Fatalf("failed to close client: %s", err)
got := testSrv.accepted()
if got != sent {
t.Fatalf("expected to have %d series; got %d", sent, got)

View file

@ -1,13 +1,320 @@
package remotewrite
import (
// RWClient represents an HTTP client for pushing data via remote write protocol
type RWClient interface {
// Push pushes the give time series to remote storage
Push(s prompbmarshal.TimeSeries) error
// Close stops the client. Client can't be reused after Close call.
Close() error
var (
disablePathAppend = flag.Bool("remoteWrite.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/write' path to the configured -remoteWrite.url.")
sendTimeout = flag.Duration("remoteWrite.sendTimeout", 30*time.Second, "Timeout for sending data to the configured -remoteWrite.url.")
retryMinInterval = flag.Duration("remoteWrite.retryMinInterval", time.Second, "The minimum delay between retry attempts. Every next retry attempt will double the delay to prevent hammering of remote database. See also -remoteWrite.retryMaxInterval")
retryMaxTime = flag.Duration("remoteWrite.retryMaxTime", time.Second*30, "The max time spent on retry attempts for the failed remote-write request. Change this value if it is expected for remoteWrite.url to be unreachable for more than -remoteWrite.retryMaxTime. See also -remoteWrite.retryMinInterval")
// Client is an asynchronous HTTP client for writing
// timeseries via remote write protocol.
type Client struct {
addr string
c *http.Client
authCfg *promauth.Config
input chan prompbmarshal.TimeSeries
flushInterval time.Duration
maxBatchSize int
maxQueueSize int
wg sync.WaitGroup
doneCh chan struct{}
// Config is config for remote write.
type Config struct {
// Addr of remote storage
Addr string
AuthCfg *promauth.Config
// Concurrency defines number of readers that
// concurrently read from the queue and flush data
Concurrency int
// MaxBatchSize defines max number of timeseries
// to be flushed at once
MaxBatchSize int
// MaxQueueSize defines max length of input queue
// populated by Push method.
// Push will be rejected once queue is full.
MaxQueueSize int
// FlushInterval defines time interval for flushing batches
FlushInterval time.Duration
// Transport will be used by the underlying http.Client
Transport *http.Transport
const (
defaultConcurrency = 4
defaultMaxBatchSize = 1e3
defaultMaxQueueSize = 1e5
defaultFlushInterval = 5 * time.Second
defaultWriteTimeout = 30 * time.Second
// NewClient returns asynchronous client for
// writing timeseries via remotewrite protocol.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
if cfg.Addr == "" {
return nil, fmt.Errorf("config.Addr can't be empty")
if cfg.MaxBatchSize == 0 {
cfg.MaxBatchSize = defaultMaxBatchSize
if cfg.MaxQueueSize == 0 {
cfg.MaxQueueSize = defaultMaxQueueSize
if cfg.FlushInterval == 0 {
cfg.FlushInterval = defaultFlushInterval
if cfg.Transport == nil {
cfg.Transport = http.DefaultTransport.(*http.Transport).Clone()
cc := defaultConcurrency
if cfg.Concurrency > 0 {
cc = cfg.Concurrency
c := &Client{
c: &http.Client{
Timeout: *sendTimeout,
Transport: cfg.Transport,
addr: strings.TrimSuffix(cfg.Addr, "/"),
authCfg: cfg.AuthCfg,
flushInterval: cfg.FlushInterval,
maxBatchSize: cfg.MaxBatchSize,
maxQueueSize: cfg.MaxQueueSize,
doneCh: make(chan struct{}),
input: make(chan prompbmarshal.TimeSeries, cfg.MaxQueueSize),
for i := 0; i < cc; i++ {
return c, nil
// Push adds timeseries into queue for writing into remote storage.
// Push returns and error if client is stopped or if queue is full.
func (c *Client) Push(s prompbmarshal.TimeSeries) error {
select {
case <-c.doneCh:
return fmt.Errorf("client is closed")
case c.input <- s:
return nil
return fmt.Errorf("failed to push timeseries - queue is full (%d entries). "+
"Queue size is controlled by -remoteWrite.maxQueueSize flag",
// Close stops the client and waits for all goroutines
// to exit.
func (c *Client) Close() error {
if c.doneCh == nil {
return fmt.Errorf("client is already closed")
return nil
func (c *Client) run(ctx context.Context) {
ticker := time.NewTicker(c.flushInterval)
wr := &prompbmarshal.WriteRequest{}
shutdown := func() {
for ts := range c.input {
wr.Timeseries = append(wr.Timeseries, ts)
lastCtx, cancel := context.WithTimeout(context.Background(), defaultWriteTimeout)
logger.Infof("shutting down remote write client and flushing remained %d series", len(wr.Timeseries))
c.flush(lastCtx, wr)
go func() {
defer c.wg.Done()
defer ticker.Stop()
for {
select {
case <-c.doneCh:
case <-ctx.Done():
case <-ticker.C:
c.flush(ctx, wr)
case ts, ok := <-c.input:
if !ok {
wr.Timeseries = append(wr.Timeseries, ts)
if len(wr.Timeseries) >= c.maxBatchSize {
c.flush(ctx, wr)
var (
sentRows = metrics.NewCounter(`vmalert_remotewrite_sent_rows_total`)
sentBytes = metrics.NewCounter(`vmalert_remotewrite_sent_bytes_total`)
sendDuration = metrics.NewFloatCounter(`vmalert_remotewrite_send_duration_seconds_total`)
droppedRows = metrics.NewCounter(`vmalert_remotewrite_dropped_rows_total`)
droppedBytes = metrics.NewCounter(`vmalert_remotewrite_dropped_bytes_total`)
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
_ = metrics.NewGauge(`vmalert_remotewrite_concurrency`, func() float64 {
return float64(*concurrency)
// flush is a blocking function that marshals WriteRequest and sends
// it to remote-write endpoint. Flush performs limited amount of retries
// if request fails.
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
if len(wr.Timeseries) < 1 {
defer prompbmarshal.ResetWriteRequest(wr)
defer bufferFlushDuration.UpdateDuration(time.Now())
data, err := wr.Marshal()
if err != nil {
logger.Errorf("failed to marshal WriteRequest: %s", err)
b := snappy.Encode(nil, data)
retryInterval, maxRetryInterval := *retryMinInterval, *retryMaxTime
if retryInterval > maxRetryInterval {
retryInterval = maxRetryInterval
timeStart := time.Now()
defer sendDuration.Add(time.Since(timeStart).Seconds())
for attempts := 0; ; attempts++ {
err := c.send(ctx, b)
if err == nil {
_, isNotRetriable := err.(*nonRetriableError)
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
if isNotRetriable {
// exit fast if error isn't retriable
// check if request has been cancelled before backoff
select {
case <-ctx.Done():
logger.Errorf("interrupting retry attempt %d: context cancelled", attempts+1)
break L
timeLeftForRetries := maxRetryInterval - time.Since(timeStart)
if timeLeftForRetries < 0 {
// the max retry time has passed, so we give up
if retryInterval > timeLeftForRetries {
retryInterval = timeLeftForRetries
// sleeping to prevent remote db hammering
retryInterval *= 2
logger.Errorf("attempts to send remote-write request failed - dropping %d time series",
func (c *Client) send(ctx context.Context, data []byte) error {
r := bytes.NewReader(data)
req, err := http.NewRequest(http.MethodPost, c.addr, r)
if err != nil {
return fmt.Errorf("failed to create new HTTP request: %w", err)
// RFC standard compliant headers
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
// Prometheus compliant headers
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
if c.authCfg != nil {
c.authCfg.SetHeaders(req, true)
if !*disablePathAppend {
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
resp, err := c.c.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
req.URL.Redacted(), err, len(data), r.Size())
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
// according to
// Prometheus remote Write compatible receivers MUST
switch resp.StatusCode / 100 {
case 2:
// respond with a HTTP 2xx status code when the write is successful.
return nil
case 4:
if resp.StatusCode != http.StatusTooManyRequests {
// MUST NOT retry write requests on HTTP 4xx responses other than 429
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)}
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
resp.StatusCode, req.URL.Redacted(), body)
type nonRetriableError struct {
err error
func (e *nonRetriableError) Error() string {
return e.err.Error()

View file

@ -33,7 +33,7 @@ var (
"Progress bar rendering might be verbose or break the logs parsing, so it is recommended to be disabled when not used in interactive mode.")
func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewrite.RWClient) error {
func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw *remotewrite.Client) error {
if *replayMaxDatapoints < 1 {
return fmt.Errorf("replay.maxDatapointsPerQuery can't be lower than 1")
@ -78,7 +78,7 @@ func replay(groupsCfg []config.Group, qb datasource.QuerierBuilder, rw remotewri
return nil
func (g *Group) replay(start, end time.Time, rw remotewrite.RWClient) int {
func (g *Group) replay(start, end time.Time, rw *remotewrite.Client) int {
var total int
step := g.Interval * time.Duration(*replayMaxDatapoints)
ri := rangeIterator{start: start, end: end, step: step}
@ -119,7 +119,7 @@ func (g *Group) replay(start, end time.Time, rw remotewrite.RWClient) int {
return total
func replayRule(rule Rule, start, end time.Time, rw remotewrite.RWClient) (int, error) {
func replayRule(rule Rule, start, end time.Time, rw *remotewrite.Client) (int, error) {
var err error
var tss []prompbmarshal.TimeSeries
for i := 0; i < *replayRuleRetryAttempts; i++ {

View file

@ -1,40 +0,0 @@
package main
import (
func TestUnitRule(t *testing.T) {
testCases := []struct {
name string
disableGroupLabel bool
files []string
failed bool
name: "run multi files",
files: []string{"./unittest/testdata/test1.yaml", "./unittest/testdata/test2.yaml"},
failed: false,
name: "disable group label",
disableGroupLabel: true,
files: []string{"./unittest/testdata/disable-group-label.yaml"},
failed: false,
name: "failing test",
files: []string{"./unittest/testdata/failed-test.yaml"},
failed: true,
for _, tc := range testCases {
oldFlag := *disableAlertGroupLabel
*disableAlertGroupLabel = tc.disableGroupLabel
fail := unitRule(tc.files...)
if fail != tc.failed {
t.Fatalf("failed to test %s, expect %t, got %t",, tc.failed, fail)
*disableAlertGroupLabel = oldFlag

View file

@ -1,436 +0,0 @@
package main
import (
vmalertconfig ""
var (
storagePath string
// insert series from 1970-01-01T00:00:00
testStartTime = time.Unix(0, 0).UTC()
testPromWriteHTTPPath = "" + *httpListenAddr + "/api/v1/write"
testDataSourcePath = "" + *httpListenAddr + "/prometheus"
testRemoteWritePath = "" + *httpListenAddr
testHealthHTTPPath = "" + *httpListenAddr + "/health"
const (
testStoragePath = "vmalert-unittest"
testLogLevel = "ERROR"
func unitRule(files ...string) bool {
storagePath = filepath.Join(os.TempDir(), testStoragePath)
// storagePath will be created again when closing vmselect, so remove it again.
defer fs.MustRemoveAll(storagePath)
defer vminsert.Stop()
defer vmselect.Stop()
return rulesUnitTest(files...)
func rulesUnitTest(files ...string) bool {
var failed bool
for _, f := range files {
if err := ruleUnitTest(f); err != nil {
fmt.Println(" FAILED")
fmt.Printf("\nfailed to run unit test for file %q: \n%s", f, err)
failed = true
} else {
fmt.Println(" SUCCESS")
return failed
func ruleUnitTest(filename string) []error {
fmt.Println("\nUnit Testing: ", filename)
b, err := os.ReadFile(filename)
if err != nil {
return []error{fmt.Errorf("failed to read file: %w", err)}
var unitTestInp unitTestFile
if err := yaml.UnmarshalStrict(b, &unitTestInp); err != nil {
return []error{fmt.Errorf("failed to unmarshal file: %w", err)}
if err := resolveAndGlobFilepaths(filepath.Dir(filename), &unitTestInp); err != nil {
return []error{fmt.Errorf("failed to resolve path for `rule_files`: %w", err)}
if unitTestInp.EvaluationInterval.Duration() == 0 {
fmt.Println("evaluation_interval set to 1m by default")
unitTestInp.EvaluationInterval = &promutils.Duration{D: 1 * time.Minute}
groupOrderMap := make(map[string]int)
for i, gn := range unitTestInp.GroupEvalOrder {
if _, ok := groupOrderMap[gn]; ok {
return []error{fmt.Errorf("group name repeated in `group_eval_order`: %s", gn)}
groupOrderMap[gn] = i
testGroups, err := vmalertconfig.Parse(unitTestInp.RuleFiles, nil, true)
if err != nil {
return []error{fmt.Errorf("failed to parse `rule_files`: %w", err)}
var errs []error
for _, t := range unitTestInp.Tests {
if err := verifyTestGroup(t); err != nil {
errs = append(errs, err)
testErrs := t.test(unitTestInp.EvaluationInterval.Duration(), groupOrderMap, testGroups)
errs = append(errs, testErrs...)
if len(errs) > 0 {
return errs
return nil
func verifyTestGroup(group testGroup) error {
var testGroupName string
if group.TestGroupName != "" {
testGroupName = fmt.Sprintf("testGroupName: %s\n", group.TestGroupName)
for _, at := range group.AlertRuleTests {
if at.Alertname == "" {
return fmt.Errorf("\n%s missing required filed \"alertname\"", testGroupName)
if !*disableAlertGroupLabel && at.GroupName == "" {
return fmt.Errorf("\n%s missing required filed \"groupname\" when flag \"disableAlertGroupLabel\" is false", testGroupName)
if at.EvalTime == nil {
return fmt.Errorf("\n%s missing required filed \"eval_time\"", testGroupName)
for _, et := range group.MetricsqlExprTests {
if et.Expr == "" {
return fmt.Errorf("\n%s missing required filed \"expr\"", testGroupName)
if et.EvalTime == nil {
return fmt.Errorf("\n%s missing required filed \"eval_time\"", testGroupName)
return nil
func processFlags() {
for _, fv := range []struct {
flag string
value string
{flag: "storageDataPath", value: storagePath},
{flag: "loggerLevel", value: testLogLevel},
{flag: "search.disableCache", value: "true"},
// set storage retention time to 100 years, allow to store series from 1970-01-01T00:00:00.
{flag: "retentionPeriod", value: "100y"},
{flag: "datasource.url", value: testDataSourcePath},
{flag: "remoteWrite.url", value: testRemoteWritePath},
} {
// panics if flag doesn't exist
if err := flag.Lookup(fv.flag).Value.Set(fv.value); err != nil {
logger.Fatalf("unable to set %q with value %q, err: %v", fv.flag, fv.value, err)
func setUp() {
go httpserver.Serve(*httpListenAddr, false, func(w http.ResponseWriter, r *http.Request) bool {
switch r.URL.Path {
case "/prometheus/api/v1/query":
if err := prometheus.QueryHandler(nil, time.Now(), w, r); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
case "/prometheus/api/v1/write", "/api/v1/write":
if err := promremotewrite.InsertHandler(r); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
return false
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
readyCheckFunc := func() bool {
resp, err := http.Get(testHealthHTTPPath)
if err != nil {
return false
_ = resp.Body.Close()
return resp.StatusCode == 200
for {
select {
case <-ctx.Done():
logger.Fatalf("http server can't be ready in 30s")
if readyCheckFunc() {
break checkCheck
time.Sleep(3 * time.Second)
func tearDown() {
if err := httpserver.Stop(*httpListenAddr); err != nil {
logger.Errorf("cannot stop the webservice: %s", err)
// resolveAndGlobFilepaths joins all relative paths in a configuration
// with a given base directory and replaces all globs with matching files.
func resolveAndGlobFilepaths(baseDir string, utf *unitTestFile) error {
for i, rf := range utf.RuleFiles {
if rf != "" && !filepath.IsAbs(rf) {
utf.RuleFiles[i] = filepath.Join(baseDir, rf)
var globbedFiles []string
for _, rf := range utf.RuleFiles {
m, err := filepath.Glob(rf)
if err != nil {
return err
if len(m) == 0 {
fmt.Fprintln(os.Stderr, " WARNING: no file match pattern", rf)
globbedFiles = append(globbedFiles, m...)
utf.RuleFiles = globbedFiles
return nil
func (tg *testGroup) test(evalInterval time.Duration, groupOrderMap map[string]int, testGroups []vmalertconfig.Group) (checkErrs []error) {
// set up vmstorage and http server for ingest and read queries
// tear down vmstorage and clean the data dir
defer tearDown()
err := unittest.WriteInputSeries(tg.InputSeries, tg.Interval, testStartTime, testPromWriteHTTPPath)
if err != nil {
return []error{err}
q, err := datasource.Init(nil)
if err != nil {
return []error{fmt.Errorf("failed to init datasource: %v", err)}
rw, err := remotewrite.NewDebugClient()
if err != nil {
return []error{fmt.Errorf("failed to init wr: %v", err)}
alertEvalTimesMap := map[time.Duration]struct{}{}
alertExpResultMap := map[time.Duration]map[string]map[string][]unittest.ExpAlert{}
for _, at := range tg.AlertRuleTests {
et := at.EvalTime.Duration()
alertEvalTimesMap[et] = struct{}{}
if _, ok := alertExpResultMap[et]; !ok {
alertExpResultMap[et] = make(map[string]map[string][]unittest.ExpAlert)
if _, ok := alertExpResultMap[et][at.GroupName]; !ok {
alertExpResultMap[et][at.GroupName] = make(map[string][]unittest.ExpAlert)
alertExpResultMap[et][at.GroupName][at.Alertname] = at.ExpAlerts
alertEvalTimes := make([]time.Duration, 0, len(alertEvalTimesMap))
for k := range alertEvalTimesMap {
alertEvalTimes = append(alertEvalTimes, k)
sort.Slice(alertEvalTimes, func(i, j int) bool {
return alertEvalTimes[i] < alertEvalTimes[j]
// sort group eval order according to the given "group_eval_order".
sort.Slice(testGroups, func(i, j int) bool {
return groupOrderMap[testGroups[i].Name] < groupOrderMap[testGroups[j].Name]
// create groups with given rule
var groups []*Group
for _, group := range testGroups {
ng := newGroup(group, q, *evaluationInterval, tg.ExternalLabels)
groups = append(groups, ng)
e := &executor{
rw: rw,
notifiers: func() []notifier.Notifier { return nil },
previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label),
evalIndex := 0
maxEvalTime := testStartTime.Add(tg.maxEvalTime())
for ts := testStartTime; ts.Before(maxEvalTime) || ts.Equal(maxEvalTime); ts = ts.Add(evalInterval) {
for _, g := range groups {
resolveDuration := getResolveDuration(g.Interval, *resendDelay, *maxResolveDuration)
errs := e.execConcurrently(context.Background(), g.Rules, ts, g.Concurrency, resolveDuration, g.Limit)
for err := range errs {
if err != nil {
checkErrs = append(checkErrs, fmt.Errorf("\nfailed to exec group: %q, time: %s, err: %w", g.Name,
ts, err))
// flush series after each group evaluation
// check alert_rule_test case at every eval time
for evalIndex < len(alertEvalTimes) {
if ts.Sub(testStartTime) > alertEvalTimes[evalIndex] ||
alertEvalTimes[evalIndex] >= ts.Add(evalInterval).Sub(testStartTime) {
gotAlertsMap := map[string]map[string]unittest.LabelsAndAnnotations{}
for _, g := range groups {
if *disableAlertGroupLabel {
g.Name = ""
if _, ok := alertExpResultMap[time.Duration(ts.UnixNano())][g.Name]; !ok {
if _, ok := gotAlertsMap[g.Name]; !ok {
gotAlertsMap[g.Name] = make(map[string]unittest.LabelsAndAnnotations)
for _, rule := range g.Rules {
ar, isAlertRule := rule.(*AlertingRule)
if !isAlertRule {
if _, ok := alertExpResultMap[time.Duration(ts.UnixNano())][g.Name][ar.Name]; ok {
for _, got := range ar.alerts {
if got.State != notifier.StateFiring {
laa := unittest.LabelAndAnnotation{
Labels: datasource.ConvertToLabels(got.Labels),
Annotations: datasource.ConvertToLabels(got.Annotations),
gotAlertsMap[g.Name][ar.Name] = append(gotAlertsMap[g.Name][ar.Name], laa)
for groupname, gres := range alertExpResultMap[alertEvalTimes[evalIndex]] {
for alertname, res := range gres {
var expAlerts unittest.LabelsAndAnnotations
for _, expAlert := range res {
if expAlert.ExpLabels == nil {
expAlert.ExpLabels = make(map[string]string)
// alertGroupNameLabel is added as additional labels when `disableAlertGroupLabel` is false
if !*disableAlertGroupLabel {
expAlert.ExpLabels[alertGroupNameLabel] = groupname
// alertNameLabel is added as additional labels in vmalert.
expAlert.ExpLabels[alertNameLabel] = alertname
expAlerts = append(expAlerts, unittest.LabelAndAnnotation{
Labels: datasource.ConvertToLabels(expAlert.ExpLabels),
Annotations: datasource.ConvertToLabels(expAlert.ExpAnnotations),
gotAlerts := gotAlertsMap[groupname][alertname]
if !reflect.DeepEqual(expAlerts, gotAlerts) {
var testGroupName string
if tg.TestGroupName != "" {
testGroupName = fmt.Sprintf("testGroupName: %s,\n", tg.TestGroupName)
expString := unittest.IndentLines(expAlerts.String(), " ")
gotString := unittest.IndentLines(gotAlerts.String(), " ")
checkErrs = append(checkErrs, fmt.Errorf("\n%s groupname: %s, alertname: %s, time: %s, \n exp:%v, \n got:%v ",
testGroupName, groupname, alertname, alertEvalTimes[evalIndex].String(), expString, gotString))
checkErrs = append(checkErrs, unittest.CheckMetricsqlCase(tg.MetricsqlExprTests, q)...)
return checkErrs
// unitTestFile holds the contents of a single unit test file
type unitTestFile struct {
RuleFiles []string `yaml:"rule_files"`
EvaluationInterval *promutils.Duration `yaml:"evaluation_interval"`
GroupEvalOrder []string `yaml:"group_eval_order"`
Tests []testGroup `yaml:"tests"`
// testGroup is a group of input series and test cases associated with it
type testGroup struct {
Interval *promutils.Duration `yaml:"interval"`
InputSeries []unittest.Series `yaml:"input_series"`
AlertRuleTests []unittest.AlertTestCase `yaml:"alert_rule_test"`
MetricsqlExprTests []unittest.MetricsqlTestCase `yaml:"metricsql_expr_test"`
ExternalLabels map[string]string `yaml:"external_labels"`
TestGroupName string `yaml:"name"`
// maxEvalTime returns the max eval time among all alert_rule_test and metricsql_expr_test
func (tg *testGroup) maxEvalTime() time.Duration {
var maxd time.Duration
for _, alert := range tg.AlertRuleTests {
if alert.EvalTime.Duration() > maxd {
maxd = alert.EvalTime.Duration()
for _, met := range tg.MetricsqlExprTests {
if met.EvalTime.Duration() > maxd {
maxd = met.EvalTime.Duration()
return maxd

View file

@ -1,19 +0,0 @@
package unittest
import (
// AlertTestCase holds alert_rule_test cases defined in test file
type AlertTestCase struct {
EvalTime *promutils.Duration `yaml:"eval_time"`
GroupName string `yaml:"groupname"`
Alertname string `yaml:"alertname"`
ExpAlerts []ExpAlert `yaml:"exp_alerts"`
// ExpAlert holds exp_alerts defined in test file
type ExpAlert struct {
ExpLabels map[string]string `yaml:"exp_labels"`
ExpAnnotations map[string]string `yaml:"exp_annotations"`

View file

@ -1,182 +0,0 @@
package unittest
import (
testutil ""
// Series holds input_series defined in the test file
type Series struct {
Series string `yaml:"series"`
Values string `yaml:"values"`
// sequenceValue is an omittable value in a sequence of time series values.
type sequenceValue struct {
Value float64
Omitted bool
func httpWrite(address string, r io.Reader) {
resp, err := http.Post(address, "", r)
if err != nil {
logger.Fatalf("failed to send to storage: %v", err)
// WriteInputSeries send input series to vmstorage and flush them
func WriteInputSeries(input []Series, interval *promutils.Duration, startStamp time.Time, dst string) error {
r := testutil.WriteRequest{}
for _, data := range input {
expr, err := metricsql.Parse(data.Series)
if err != nil {
return fmt.Errorf("failed to parse series %s: %v", data.Series, err)
promvals, err := parseInputValue(data.Values, true)
if err != nil {
return fmt.Errorf("failed to parse input series value %s: %v", data.Values, err)
metricExpr, ok := expr.(*metricsql.MetricExpr)
if !ok {
return fmt.Errorf("failed to parse series %s to metric expr: %v", data.Series, err)
samples := make([]testutil.Sample, 0, len(promvals))
ts := startStamp
for _, v := range promvals {
if !v.Omitted {
samples = append(samples, testutil.Sample{
Timestamp: ts.UnixMilli(),
Value: v.Value,
ts = ts.Add(interval.Duration())
var ls []testutil.Label
for _, filter := range metricExpr.LabelFilterss[0] {
ls = append(ls, testutil.Label{Name: filter.Label, Value: filter.Value})
r.Timeseries = append(r.Timeseries, testutil.TimeSeries{Labels: ls, Samples: samples})
data, err := testutil.Compress(r)
if err != nil {
return fmt.Errorf("failed to compress data: %v", err)
// write input series to vm
httpWrite(dst, bytes.NewBuffer(data))
return nil
// parseInputValue support input like "1", "1+1x1 _ -4 3+20x1", see more examples in test.
func parseInputValue(input string, origin bool) ([]sequenceValue, error) {
var res []sequenceValue
items := strings.Split(input, " ")
reg := regexp.MustCompile(`\D?\d*\D?`)
for _, item := range items {
if item == "stale" {
res = append(res, sequenceValue{Value: decimal.StaleNaN})
vals := reg.FindAllString(item, -1)
switch len(vals) {
case 1:
if vals[0] == "_" {
res = append(res, sequenceValue{Omitted: true})
v, err := strconv.ParseFloat(vals[0], 64)
if err != nil {
return nil, err
res = append(res, sequenceValue{Value: v})
case 2:
p1 := vals[0][:len(vals[0])-1]
v2, err := strconv.ParseInt(vals[1], 10, 64)
if err != nil {
return nil, err
option := vals[0][len(vals[0])-1]
switch option {
case '+':
v1, err := strconv.ParseFloat(p1, 64)
if err != nil {
return nil, err
res = append(res, sequenceValue{Value: v1 + float64(v2)})
case 'x':
for i := int64(0); i <= v2; i++ {
if p1 == "_" {
if i == 0 {
i = 1
res = append(res, sequenceValue{Omitted: true})
v1, err := strconv.ParseFloat(p1, 64)
if err != nil {
return nil, err
if !origin || v1 == 0 {
res = append(res, sequenceValue{Value: v1 * float64(i)})
newVal := fmt.Sprintf("%s+0x%s", p1, vals[1])
newRes, err := parseInputValue(newVal, false)
if err != nil {
return nil, err
res = append(res, newRes...)
return nil, fmt.Errorf("got invalid operation %b", option)
case 3:
r1, err := parseInputValue(fmt.Sprintf("%s%s", vals[1], vals[2]), false)
if err != nil {
return nil, err
p1 := vals[0][:len(vals[0])-1]
v1, err := strconv.ParseFloat(p1, 64)
if err != nil {
return nil, err
option := vals[0][len(vals[0])-1]
var isAdd bool
if option == '+' {
isAdd = true
for _, r := range r1 {
if isAdd {
res = append(res, sequenceValue{
Value: r.Value + v1,
} else {
res = append(res, sequenceValue{
Value: v1 - r.Value,
return nil, fmt.Errorf("unsupported input %s", input)
return res, nil

View file

@ -1,93 +0,0 @@
package unittest
import (
func TestParseInputValue(t *testing.T) {
testCases := []struct {
input string
exp []sequenceValue
failed bool
// stale doesn't support operations
[]sequenceValue{{Value: -4}},
[]sequenceValue{{Omitted: true}},
[]sequenceValue{{Value: decimal.StaleNaN}},
[]sequenceValue{{Value: -4}, {Value: -4}},
[]sequenceValue{{Omitted: true}},
[]sequenceValue{{Value: 1}, {Value: 2}, {Value: 3}, {Value: 4}, {Value: 5}},
[]sequenceValue{{Value: 2}, {Value: 1}, {Value: 0}, {Value: -1}, {Value: -2}},
"1+1x1 _ -4 stale 3+20x1",
[]sequenceValue{{Value: 1}, {Value: 2}, {Omitted: true}, {Value: -4}, {Value: decimal.StaleNaN}, {Value: 3}, {Value: 23}},
for _, tc := range testCases {
output, err := parseInputValue(tc.input, true)
if err != nil != tc.failed {
t.Fatalf("failed to parse %s, expect %t, got %t", tc.input, tc.failed, err != nil)
if len(tc.exp) != len(output) {
t.Fatalf("expect %v, got %v", tc.exp, output)
for i := 0; i < len(tc.exp); i++ {
if tc.exp[i].Omitted != output[i].Omitted {
t.Fatalf("expect %v, got %v", tc.exp, output)
if tc.exp[i].Value != output[i].Value {
if decimal.IsStaleNaN(tc.exp[i].Value) && decimal.IsStaleNaN(output[i].Value) {
t.Fatalf("expect %v, got %v", tc.exp, output)

View file

@ -1,92 +0,0 @@
package unittest
import (
// MetricsqlTestCase holds metricsql_expr_test cases defined in test file
type MetricsqlTestCase struct {
Expr string `yaml:"expr"`
EvalTime *promutils.Duration `yaml:"eval_time"`
ExpSamples []expSample `yaml:"exp_samples"`
type expSample struct {
Labels string `yaml:"labels"`
Value float64 `yaml:"value"`
// CheckMetricsqlCase will check metricsql_expr_test cases
func CheckMetricsqlCase(cases []MetricsqlTestCase, q datasource.QuerierBuilder) (checkErrs []error) {
queries := q.BuildWithParams(datasource.QuerierParams{QueryParams: url.Values{"nocache": {"1"}, "latency_offset": {"1ms"}}, DataSourceType: "prometheus"})
for _, mt := range cases {
result, _, err := queries.Query(context.Background(), mt.Expr, mt.EvalTime.ParseTime())
if err != nil {
checkErrs = append(checkErrs, fmt.Errorf(" expr: %q, time: %s, err: %w", mt.Expr,
mt.EvalTime.Duration().String(), err))
var gotSamples []parsedSample
for _, s := range result.Data {
sort.Slice(s.Labels, func(i, j int) bool {
return s.Labels[i].Name < s.Labels[j].Name
gotSamples = append(gotSamples, parsedSample{
Labels: s.Labels,
Value: s.Values[0],
var expSamples []parsedSample
for _, s := range mt.ExpSamples {
expLb := datasource.Labels{}
if s.Labels != "" {
metricsqlExpr, err := metricsql.Parse(s.Labels)
if err != nil {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
mt.EvalTime.Duration().String(), fmt.Errorf("failed to parse labels %q: %w", s.Labels, err)))
continue Outer
metricsqlMetricExpr, ok := metricsqlExpr.(*metricsql.MetricExpr)
if !ok {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s, err: %v", mt.Expr,
mt.EvalTime.Duration().String(), fmt.Errorf("got unsupported metricsql type")))
continue Outer
for _, l := range metricsqlMetricExpr.LabelFilterss[0] {
expLb = append(expLb, datasource.Label{
Name: l.Label,
Value: l.Value,
sort.Slice(expLb, func(i, j int) bool {
return expLb[i].Name < expLb[j].Name
expSamples = append(expSamples, parsedSample{
Labels: expLb,
Value: s.Value,
sort.Slice(expSamples, func(i, j int) bool {
return datasource.LabelCompare(expSamples[i].Labels, expSamples[j].Labels) <= 0
sort.Slice(gotSamples, func(i, j int) bool {
return datasource.LabelCompare(gotSamples[i].Labels, gotSamples[j].Labels) <= 0
if !reflect.DeepEqual(expSamples, gotSamples) {
checkErrs = append(checkErrs, fmt.Errorf("\n expr: %q, time: %s,\n exp: %v\n got: %v", mt.Expr,
mt.EvalTime.Duration().String(), parsedSamplesString(expSamples), parsedSamplesString(gotSamples)))

View file

@ -1,43 +0,0 @@
- rules.yaml
evaluation_interval: 1m
- interval: 1m
- series: 'up{job="vmagent2", instance="localhost:9090"}'
values: "0+0x1440"
- expr: suquery_interval_test
eval_time: 4m
- labels: '{__name__="suquery_interval_test",datacenter="dc-123", instance="localhost:9090", job="vmagent2"}'
value: 1
- eval_time: 2h
alertname: InstanceDown
- exp_labels:
job: vmagent2
severity: page
instance: localhost:9090
datacenter: dc-123
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job vmagent2 has been down for more than 5 minutes."
- eval_time: 0
alertname: AlwaysFiring
- exp_labels:
datacenter: dc-123
- eval_time: 0
alertname: InstanceDown
exp_alerts: []
datacenter: dc-123

View file

@ -1,41 +0,0 @@
- rules.yaml
- interval: 1m
name: "Failing test"
- series: test
values: "0"
- expr: test
eval_time: 0m
- value: 0
labels: test
# will failed cause there is no "Test" group and rule defined
- eval_time: 0m
groupname: Test
alertname: Test
- exp_labels: {}
- interval: 1m
name: Failing alert test
- series: 'up{job="test"}'
values: 0x10
# will failed cause rule is firing
- eval_time: 5m
groupname: group1
alertname: InstanceDown
exp_alerts: []
# will failed cause missing groupname
- eval_time: 5m
alertname: AlwaysFiring
exp_alerts: []

View file

@ -1,30 +0,0 @@
# can be executed successfully but will take more than 1 minute
# not included in unit test now
evaluation_interval: 100d
- rules.yaml
- interval: 1d
- series: test
# Max time in time.Duration is 106751d from 1970 (2^63/10^9), i.e. 2262.
# But VictoriaMetrics supports maxTimestamp value +2 days from now. see
# We input series to 2024-01-01T00:00:00 here.
values: "0+1x19723"
- expr: timestamp(test)
eval_time: 0m
- value: 0
- expr: test
eval_time: 100d
- labels: test
value: 100
- expr: timestamp(test)
eval_time: 19000d
- value: 1641600000 # 19000d -> seconds.

View file

@ -1,39 +0,0 @@
- name: group1
- alert: InstanceDown
expr: up == 0
for: 5m
severity: page
summary: "Instance {{ $labels.instance }} down"
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
- alert: AlwaysFiring
expr: 1
- alert: SameAlertNameWithDifferentGroup
expr: absent(test)
for: 1m
- name: group2
- record: t1
expr: test
- record: job:test:count_over_time1m
expr: sum without(instance) (count_over_time(test[1m]))
- record: suquery_interval_test
expr: count_over_time(up[5m:])
- alert: SameAlertNameWithDifferentGroup
expr: absent(test)
for: 5m
- name: group3
- record: t2
expr: t1
- name: group4
- record: t3
expr: t1

View file

@ -1,99 +0,0 @@
- rules.yaml
evaluation_interval: 1m
group_eval_order: ["group4", "group2", "group3"]
- interval: 1m
name: "basic test"
- series: "test"
values: "_x5 1x5 _ stale"
- eval_time: 1m
groupname: group1
alertname: SameAlertNameWithDifferentGroup
- {}
- eval_time: 1m
groupname: group2
alertname: SameAlertNameWithDifferentGroup
exp_alerts: []
- eval_time: 6m
groupname: group1
alertname: SameAlertNameWithDifferentGroup
exp_alerts: []
- expr: test
eval_time: 11m
- labels: '{__name__="test"}'
value: 1
- expr: test
eval_time: 12m
exp_samples: []
- interval: 1m
name: "basic test2"
- series: 'up{job="vmagent1", instance="localhost:9090"}'
values: "0+0x1440"
- series: "test"
values: "0+1x1440"
- expr: count(ALERTS) by (alertgroup, alertname, alertstate)
eval_time: 4m
- labels: '{alertgroup="group1", alertname="AlwaysFiring", alertstate="firing"}'
value: 1
- labels: '{alertgroup="group1", alertname="InstanceDown", alertstate="pending"}'
value: 1
- expr: t1
eval_time: 4m
- value: 4
labels: '{__name__="t1", datacenter="dc-123"}'
- expr: t2
eval_time: 4m
- value: 4
labels: '{__name__="t2", datacenter="dc-123"}'
- expr: t3
eval_time: 4m
# t3 is 3 instead of 4 cause it's rules3 is evaluated before rules1
- value: 3
labels: '{__name__="t3", datacenter="dc-123"}'
- eval_time: 10m
groupname: group1
alertname: InstanceDown
- exp_labels:
job: vmagent1
severity: page
instance: localhost:9090
datacenter: dc-123
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job vmagent1 has been down for more than 5 minutes."
- eval_time: 0
groupname: group1
alertname: AlwaysFiring
- exp_labels:
datacenter: dc-123
- eval_time: 0
groupname: alerts
alertname: InstanceDown
exp_alerts: []
datacenter: dc-123

View file

@ -1,46 +0,0 @@
- rules.yaml
evaluation_interval: 1m
- interval: 1m
- series: 'up{job="vmagent2", instance="localhost:9090"}'
values: "0+0x1440"
- expr: suquery_interval_test
eval_time: 4m
- labels: '{__name__="suquery_interval_test",datacenter="dc-123", instance="localhost:9090", job="vmagent2"}'
value: 1
- eval_time: 2h
groupname: group1
alertname: InstanceDown
- exp_labels:
job: vmagent2
severity: page
instance: localhost:9090
datacenter: dc-123
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job vmagent2 has been down for more than 5 minutes."
- eval_time: 0
groupname: group1
alertname: AlwaysFiring
- exp_labels:
datacenter: dc-123
- eval_time: 0
groupname: group1
alertname: InstanceDown
exp_alerts: []
datacenter: dc-123

View file

@ -1,83 +0,0 @@
package unittest
import (
// parsedSample is a sample with parsed Labels
type parsedSample struct {
Labels datasource.Labels
Value float64
func (ps *parsedSample) String() string {
return ps.Labels.String() + " " + strconv.FormatFloat(ps.Value, 'E', -1, 64)
func parsedSamplesString(pss []parsedSample) string {
if len(pss) == 0 {
return "nil"
s := pss[0].String()
for _, ps := range pss[1:] {
s += ", " + ps.String()
return s
// LabelAndAnnotation holds labels and annotations
type LabelAndAnnotation struct {
Labels datasource.Labels
Annotations datasource.Labels
func (la *LabelAndAnnotation) String() string {
return "Labels:" + la.Labels.String() + "\nAnnotations:" + la.Annotations.String()
// LabelsAndAnnotations is collection of LabelAndAnnotation
type LabelsAndAnnotations []LabelAndAnnotation
func (la LabelsAndAnnotations) Len() int { return len(la) }
func (la LabelsAndAnnotations) Swap(i, j int) { la[i], la[j] = la[j], la[i] }
func (la LabelsAndAnnotations) Less(i, j int) bool {
diff := datasource.LabelCompare(la[i].Labels, la[j].Labels)
if diff != 0 {
return diff < 0
return datasource.LabelCompare(la[i].Annotations, la[j].Annotations) < 0
func (la LabelsAndAnnotations) String() string {
if len(la) == 0 {
return "[]"
s := "[\n0:" + IndentLines("\n"+la[0].String(), " ")
for i, l := range la[1:] {
s += ",\n" + fmt.Sprintf("%d", i+1) + ":" + IndentLines("\n"+l.String(), " ")
s += "\n]"
return s
// IndentLines prefixes each line in the supplied string with the given "indent" string.
func IndentLines(lines, indent string) string {
sb := strings.Builder{}
n := strings.Split(lines, "\n")
for i, l := range n {
if i > 0 {
if i != len(n)-1 {
return sb.String()

View file

@ -33,7 +33,7 @@ var (
var (
saCfgReloaderStopCh chan struct{}
saCfgReloaderStopCh = make(chan struct{})
saCfgReloaderWG sync.WaitGroup
saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
@ -62,7 +62,6 @@ func CheckStreamAggrConfig() error {
// MustStopStreamAggr must be called when stream aggr is no longer needed.
func InitStreamAggr() {
saCfgReloaderStopCh = make(chan struct{})
if *streamAggrConfig == "" {

View file

@ -6,7 +6,7 @@ positions:
filename: /tmp/positions.yaml
- url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid
- url: http://vlogs:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app
tenant_id: "0:0"

View file

@ -24,6 +24,12 @@ The following `tip` changes can be tested by building VictoriaMetrics components
## tip
## [v1.92.1](
Released at 2023-07-28
* BUGFIX: [vmalert]( revert unit test feature for alerting and recording rules introduced in [this pull request]( See the following [change](
## [v1.92.0](
Released at 2023-07-27
@ -40,6 +46,8 @@ The previous behavior can be restored in the following ways:
- by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics;
- by passing `-remoteWrite.streamAggr.dropInput` command-line flag per each configured `-remoteWrite.streamAggr.config` at `vmagent`.
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](

View file

@ -73,7 +73,7 @@ and make it merged. See example in this [commit](
* linux/ppc64le
* linux/386
This step can be run manually with the command `make publish` from the needed git tag.
1. Push the tags `v1.xx.y` and `v1.xx.y-cluster` created at step 2 to public GitHub repository at
1. Push the tags `v1.xx.y` and `v1.xx.y-cluster` created at step 8 to public GitHub repository at
Push the tags `v1.xx.y`, `v1.xx.y-cluster`, `v1.xx.y-enterprise` and `v1.xx.y-enterprise-cluster` to the corresponding
branches in private repository.
**Important note:** do not push enterprise tags to public GitHub repository - they must be pushed only to private repository.
@ -81,7 +81,7 @@ and make it merged. See example in this [commit](
a) Create draft GitHub release with the name `TAG`. This step can be run manually
with the command `TAG=v1.xx.y make github-create-release`.
The release id is stored at `/tmp/vm-github-release` file.
b) Upload all the binaries and checksums created at step `3a` to that release.
b) Upload all the binaries and checksums created at step `9a` to that release.
This step can be run manually with the command `make github-upload-assets`.
It is expected that the needed release id is stored at `/tmp/vm-github-release` file,
which must be created at the step `a`.

View file

@ -143,6 +143,7 @@ Here are a Docker-compose demos, which start VictoriaLogs and push logs to it vi
- [Fluentbit demo](
- [Logstash demo](
- [Vector demo](
- [Promtail demo](
You can use [this Helm chart](
as a demo for running Fluentbit in Kubernetes with VictoriaLogs.

View file

@ -29,9 +29,9 @@ See [these docs]( for details.
The following functionality is planned in the future versions of VictoriaLogs:
- Support for [data ingestion]( from popular log collectors and formats:
- Promtail (aka Grafana Loki)
- Fluentd
- Syslog
- Journald (systemd)
- Add missing functionality to [LogsQL](
- [Stream context](
- [Transformation functions](

View file

@ -8,12 +8,18 @@ for sending the collected logs to [VictoriaLogs](
- url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid
- url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app
Substitute `localhost:9428` address inside `clients` with the real TCP address of VictoriaLogs.
See [these docs]( for details on the used URL query parameter section.
By default VictoriaLogs stores all the ingested logs into a single [log stream](
Storing all the logs in a single log stream may be not so efficient, so it is recommended to specify `_stream_fields` query arg
with the list of labels, which uniquely identify log streams. There is no need in specifying all the labels Promtail generates there -
it is usually enough specifying `instance` and `job` labels. See [these docs](
for details.
See also [these docs]( for details on other supported query args.
There is no need in specifying `_msg_field` and `_time_field` query args, since VictoriaLogs automatically extracts log message and timestamp from the ingested Loki data.
It is recommended verifying whether the initial setup generates the needed [log fields](
@ -23,7 +29,7 @@ and inspecting VictoriaLogs logs then:
- url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&debug=1
- url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app&debug=1
If some [log fields]( must be skipped
@ -32,7 +38,7 @@ For example, the following config instructs VictoriaLogs to ignore `filename` an
- url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&ignore_fields=filename,stream
- url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app&ignore_fields=filename,stream
By default the ingested logs are stored in the `(AccountID=0, ProjectID=0)` [tenant](
@ -43,7 +49,7 @@ For example, the following config instructs VictoriaLogs to store logs in the `(
- url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app,pid&debug=1
- url: http://localhost:9428/insert/loki/api/v1/push?_stream_fields=instance,job,host,app&debug=1
tenant_id: "12:34"

View file

@ -261,3 +261,4 @@ Here is the list of log collectors and their ingestion formats supported by Vict
| [Fluentbit]( | No | [Yes]( |
| [Logstash]( | [Yes]( | No |
| [Vector]( | [Yes]( | No |
| [Promtail]( | No | No |

View file

@ -389,7 +389,7 @@ to persist the state to the remote destination via the following flags:
Both flags are required for proper state restoration. Restore process may fail if time series are missing
in configured `-remoteRead.url`, weren't updated in the last `1h` (controlled by `-remoteRead.lookback`)
or received state doesn't match current `vmalert` rules configuration. `vmalert` marks successfully restored rules
with `restored` label in [web UI](#WEB).
with `restored` label in [web UI](#web).
### Multitenancy
@ -753,249 +753,6 @@ See full description for these flags in `./vmalert -help`.
* `limit` group's param has no effect during replay (might be changed in future);
* `keep_firing_for` alerting rule param has no effect during replay (might be changed in future).
## Unit Testing for Rules
> Unit testing is available from v1.92.0.
> Unit tests do not respect `-clusterMode` for now.
You can use `vmalert` to run unit tests for alerting and recording rules.
In unit test mode vmalert performs the following actions:
* sets up an isolated VictoriaMetrics instance;
* simulates the periodic ingestion of time series;
* queries the ingested data for recording and alerting rules evaluation;
* tests whether the firing alerts or resulting recording rules match the expected results.
See how to run vmalert in unit test mode below:
# Run vmalert with one or multiple test files via -unittestFile cmd-line flag
./vmalert -unittestFile=test1.yaml -unittestFile=test2.yaml
vmalert is compatible with [Prometheus config format for tests](
except `promql_expr_test` field. Use `metricsql_expr_test` field name instead. The name is different because vmalert
validates and executes [MetricsQL]( expressions,
which aren't always backward compatible with [PromQL](
### Test file format
The configuration format for files specified in `-unittestFile` cmd-line flag is the following:
# Path to the files or http url containing [rule groups]( configuration.
# Enterprise version of vmalert supports S3 and GCS paths to rules.
[ - <string> ]
# The evaluation interval for rules specified in `rule_files`
[ evaluation_interval: <duration> | default = 1m ]
# Groups listed below will be evaluated by order.
# Not All the groups need not be mentioned, if not, they will be evaluated by define order in rule_files.
[ - <string> ]
# The list of unit test files to be checked during evaluation.
[ - <test_group> ]
#### `<test_group>`
# Interval between samples for input series
interval: <duration>
# Time series to persist into the database according to configured <interval> before running tests.
[ - <series> ]
# Name of the test group, optional
[ name: <string> ]
# Unit tests for alerting rules
[ - <alert_test_case> ]
# Unit tests for Metricsql expressions.
[ - <metricsql_expr_test> ]
# External labels accessible for templating.
[ <labelname>: <string> ... ]
#### `<series>`
# series in the following format '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
series: <string>
# values support several special equations:
# 'a+bxc' becomes 'a a+b a+(2*b) a+(3*b) … a+(c*b)'
# Read this as series starts at a, then c further samples incrementing by b.
# 'a-bxc' becomes 'a a-b a-(2*b) a-(3*b) … a-(c*b)'
# Read this as series starts at a, then c further samples decrementing by b (or incrementing by negative b).
# '_' represents a missing sample from scrape
# 'stale' indicates a stale sample
# Examples:
# 1. '-2+4x3' becomes '-2 2 6 10' - series starts at -2, then 3 further samples incrementing by 4.
# 2. ' 1-2x4' becomes '1 -1 -3 -5 -7' - series starts at 1, then 4 further samples decrementing by 2.
# 3. ' 1x4' becomes '1 1 1 1 1' - shorthand for '1+0x4', series starts at 1, then 4 further samples incrementing by 0.
# 4. ' 1 _x3 stale' becomes '1 _ _ _ stale' - the missing sample cannot increment, so 3 missing samples are produced by the '_x3' expression.
values: <string>
#### `<alert_test_case>`
vmalert by default adds `alertgroup` and `alertname` to the generated alerts and time series.
So you will need to specify both `groupname` and `alertname` under a single `<alert_test_case>`,
but no need to add them under `exp_alerts`.
You can also pass `--disableAlertgroupLabel` to prevent vmalert from adding `alertgroup` label.
# The time elapsed from time=0s when this alerting rule should be checked.
# Means this rule should be firing at this point, or shouldn't be firing if 'exp_alerts' is empty.
eval_time: <duration>
# Name of the group name to be tested.
groupname: <string>
# Name of the alert to be tested.
alertname: <string>
# List of the expected alerts that are firing under the given alertname at
# the given evaluation time. If you want to test if an alerting rule should
# not be firing, then you can mention only the fields above and leave 'exp_alerts' empty.
[ - <alert> ]
#### `<alert>`
# These are the expanded labels and annotations of the expected alert.
# Note: labels also include the labels of the sample associated with the alert
[ <labelname>: <string> ]
[ <labelname>: <string> ]
#### `<metricsql_expr_test>`
# Expression to evaluate
expr: <string>
# The time elapsed from time=0s when this expression be evaluated.
eval_time: <duration>
# Expected samples at the given evaluation time.
[ - <sample> ]
#### `<sample>`
# Labels of the sample in usual series notation '<metric name>{<label name>=<label value>, ...}'
# Examples:
# series_name{label1="value1", label2="value2"}
# go_goroutines{job="prometheus", instance="localhost:9090"}
labels: <string>
# The expected value of the Metricsql expression.
value: <number>
### Example
This is an example input file for unit testing which will pass.
`test.yaml` is the test file which follows the syntax above and `alerts.yaml` contains the alerting rules.
With `rules.yaml` in the same directory, run `./vmalert -unittestFile=./unittest/testdata/test.yaml`.
#### `test.yaml`
- rules.yaml
evaluation_interval: 1m
- interval: 1m
- series: 'up{job="prometheus", instance="localhost:9090"}'
values: "0+0x1440"
- expr: suquery_interval_test
eval_time: 4m
- labels: '{__name__="suquery_interval_test", datacenter="dc-123", instance="localhost:9090", job="prometheus"}'
value: 1
- eval_time: 2h
groupname: group1
alertname: InstanceDown
- exp_labels:
job: prometheus
severity: page
instance: localhost:9090
datacenter: dc-123
summary: "Instance localhost:9090 down"
description: "localhost:9090 of job prometheus has been down for more than 5 minutes."
- eval_time: 0
groupname: group1
alertname: AlwaysFiring
- exp_labels:
datacenter: dc-123
- eval_time: 0
groupname: group1
alertname: InstanceDown
exp_alerts: []
datacenter: dc-123
#### `alerts.yaml`
# This is the rules file.
- name: group1
- alert: InstanceDown
expr: up == 0
for: 5m
severity: page
summary: "Instance {{ $labels.instance }} down"
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes."
- alert: AlwaysFiring
expr: 1
- name: group2
- record: job:test:count_over_time1m
expr: sum without(instance) (count_over_time(test[1m]))
- record: suquery_interval_test
expr: count_over_time(up[5m:])
## Monitoring
`vmalert` exports various metrics in Prometheus exposition format at `http://vmalert-host:8880/metrics` page.
@ -1546,11 +1303,6 @@ The shortlist of configuration flags is the following:
Path to file with TLS key if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated
-tlsMinVersion string
Optional minimum TLS version to use for incoming requests over HTTPS if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
-unittestFile array
Path to the unit test files. When set, vmalert starts in unit test mode and performs only tests on configured files.
See more information here
Show VictoriaMetrics version
@ -1618,7 +1370,7 @@ dns_sd_configs:
port: 9093
The list of configured or discovered Notifiers can be explored via [UI](#Web).
The list of configured or discovered Notifiers can be explored via [UI](#web).
If Alertmanager runs in cluster mode then all its URLs needs to be available during discovery
to ensure [high availability](

View file

@ -45,14 +45,6 @@ func (pd *Duration) Duration() time.Duration {
return pd.D
// ParseTime returns time for pd.
func (pd *Duration) ParseTime() time.Time {
if pd == nil {
return time.Time{}
return time.UnixMilli(pd.Duration().Milliseconds())
// ParseDuration parses duration string in Prometheus format
func ParseDuration(s string) (time.Duration, error) {
ms, err := metricsql.DurationValue(s, 0)