vmalert: integrate with victorialogs (#7255)

address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6706.
See
https://github.com/VictoriaMetrics/VictoriaMetrics/blob/vmalert-support-vlog-ds/docs/VictoriaLogs/vmalert.md.

Related fix
https://github.com/VictoriaMetrics/VictoriaMetrics/pull/7254.

Note: in this pull request, vmalert doesn't support
[backfilling](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/vmalert-support-vlog-ds/docs/VictoriaLogs/vmalert.md#rules-backfilling)
for rules with a customized time filter. It might be added in the
future, see [this
issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7289)
for details.

Feature can be tested with image
`victoriametrics/vmalert:heads-vmalert-support-vlog-ds-0-g420629c-scratch`.

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Hui Wang 2024-10-29 23:30:39 +08:00 committed by GitHub
parent 5d73b8b866
commit 68bad22fd2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 1819 additions and 425 deletions

View file

@ -3,6 +3,7 @@ package config
import (
"bytes"
"crypto/md5"
"flag"
"fmt"
"hash/fnv"
"io"
@ -17,6 +18,10 @@ import (
"gopkg.in/yaml.v2"
)
var (
defaultRuleType = flag.String("rule.defaultRuleType", "prometheus", `Default type for rule expressions, can be overridden via "type" parameter on the group level, see https://docs.victoriametrics.com/vmalert/#groups. Supported values: "graphite", "prometheus" and "vlogs".`)
)
// Group contains list of Rules grouped into
// entity with one name and evaluation interval
type Group struct {
@ -59,11 +64,9 @@ func (g *Group) UnmarshalYAML(unmarshal func(any) error) error {
if err != nil {
return fmt.Errorf("failed to marshal group configuration for checksum: %w", err)
}
// change default value to prometheus datasource.
if g.Type.Get() == "" {
g.Type.Set(NewPrometheusType())
g.Type = NewRawType(*defaultRuleType)
}
h := md5.New()
h.Write(b)
g.Checksum = fmt.Sprintf("%x", h.Sum(nil))

View file

@ -122,6 +122,7 @@ func TestParse_Failure(t *testing.T) {
f([]string{"testdata/dir/rules3-bad.rules"}, "either `record` or `alert` must be set")
f([]string{"testdata/dir/rules4-bad.rules"}, "either `record` or `alert` must be set")
f([]string{"testdata/rules/rules1-bad.rules"}, "bad graphite expr")
f([]string{"testdata/rules/vlog-rules0-bad.rules"}, "bad LogsQL expr")
f([]string{"testdata/dir/rules6-bad.rules"}, "missing ':' in header")
f([]string{"testdata/rules/rules-multi-doc-bad.rules"}, "unknown fields")
f([]string{"testdata/rules/rules-multi-doc-duplicates-bad.rules"}, "duplicate")
@ -240,7 +241,7 @@ func TestGroupValidate_Failure(t *testing.T) {
}, false, "duplicate")
f(&Group{
Name: "test graphite prometheus bad expr",
Name: "test graphite with prometheus expr",
Type: NewGraphiteType(),
Rules: []Rule{
{
@ -267,6 +268,20 @@ func TestGroupValidate_Failure(t *testing.T) {
},
}, false, "either `record` or `alert` must be set")
f(&Group{
Name: "test vlogs with prometheus expr",
Type: NewVLogsType(),
Rules: []Rule{
{
Expr: "sum(up == 0 ) by (host)",
For: promutils.NewDuration(10 * time.Millisecond),
},
{
Expr: "sumSeries(time('foo.bar',10))",
},
},
}, false, "invalid rule")
// validate expressions
f(&Group{
Name: "test",
@ -297,6 +312,16 @@ func TestGroupValidate_Failure(t *testing.T) {
}},
},
}, true, "bad graphite expr")
f(&Group{
Name: "test vlogs",
Type: NewVLogsType(),
Rules: []Rule{
{Alert: "alert", Expr: "stats count(*) as requests", Labels: map[string]string{
"description": "some-description",
}},
},
}, true, "bad LogsQL expr")
}
func TestGroupValidate_Success(t *testing.T) {
@ -336,7 +361,7 @@ func TestGroupValidate_Success(t *testing.T) {
},
}, false, false)
// validate annotiations
// validate annotations
f(&Group{
Name: "test",
Rules: []Rule{
@ -363,6 +388,15 @@ func TestGroupValidate_Success(t *testing.T) {
}},
},
}, false, true)
f(&Group{
Name: "test victorialogs",
Type: NewVLogsType(),
Rules: []Rule{
{Alert: "alert", Expr: " _time: 1m | stats count(*) as requests", Labels: map[string]string{
"description": "{{ value|query }}",
}},
},
}, false, true)
}
func TestHashRule_NotEqual(t *testing.T) {

View file

@ -0,0 +1,10 @@
groups:
- name: InvalidStatsLogsql
type: vlogs
interval: 5m
rules:
- record: MissingFilter
expr: 'stats count(*) as requests'
- record: MissingStatsPipe
expr: 'service: "nginx"'

View file

@ -0,0 +1,29 @@
groups:
- name: RequestCount
type: vlogs
interval: 5m
rules:
- record: nginxRequestCount
expr: 'env: "test" AND service: "nginx" | stats count(*) as requests'
annotations:
description: "Service nginx on env test accepted {{$labels.requests}} requests in the last 5 minutes"
- record: prodRequestCount
expr: 'env: "prod" | stats by (service) count(*) as requests'
annotations:
description: "Service {{$labels.service}} on env prod accepted {{$labels.requests}} requests in the last 5 minutes"
- name: ServiceLog
type: vlogs
interval: 5m
rules:
- alert: HasErrorLog
expr: 'env: "prod" AND status:~"error|warn" | stats by (service) count(*) as errorLog | filter errorLog:>0'
annotations:
description: "Service {{$labels.service}} generated {{$labels.errorLog}} error logs in the last 5 minutes"
- name: ServiceRequest
type: vlogs
interval: 10m
rules:
- alert: TooManyFailedRequest
expr: '* | extract "ip=<ip> " | extract "status_code=<code>;" | stats by (ip) count() if (code:!~200) as failed, count() as total| math failed / total as failed_percentage| filter failed_percentage :> 0.01 | fields ip,failed_percentage'
annotations:
description: "Connection from address {{$labels.ip}} has {{$value}} failed requests ratio in last 10 minutes"

View file

@ -5,6 +5,7 @@ import (
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphiteql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/metricsql"
)
@ -27,6 +28,13 @@ func NewGraphiteType() Type {
}
}
// NewVLogsType returns victorialogs datasource type
func NewVLogsType() Type {
return Type{
Name: "vlogs",
}
}
// NewRawType returns datasource type from raw string
// without validation.
func NewRawType(d string) Type {
@ -62,6 +70,10 @@ func (t *Type) ValidateExpr(expr string) error {
if _, err := metricsql.Parse(expr); err != nil {
return fmt.Errorf("bad prometheus expr: %q, err: %w", expr, err)
}
case "vlogs":
if _, err := logstorage.ParseStatsQuery(expr); err != nil {
return fmt.Errorf("bad LogsQL expr: %q, err: %w", expr, err)
}
default:
return fmt.Errorf("unknown datasource type=%q", t.Name)
}
@ -74,13 +86,10 @@ func (t *Type) UnmarshalYAML(unmarshal func(any) error) error {
if err := unmarshal(&s); err != nil {
return err
}
if s == "" {
s = "prometheus"
}
switch s {
case "graphite", "prometheus":
case "graphite", "prometheus", "vlogs":
default:
return fmt.Errorf("unknown datasource type=%q, want %q or %q", s, "prometheus", "graphite")
return fmt.Errorf("unknown datasource type=%q, want prometheus, graphite or vlogs", s)
}
t.Name = s
return nil

View file

@ -0,0 +1,333 @@
package datasource
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
type datasourceType string
const (
datasourcePrometheus datasourceType = "prometheus"
datasourceGraphite datasourceType = "graphite"
datasourceVLogs datasourceType = "vlogs"
)
func toDatasourceType(s string) datasourceType {
switch s {
case string(datasourcePrometheus):
return datasourcePrometheus
case string(datasourceGraphite):
return datasourceGraphite
case string(datasourceVLogs):
return datasourceVLogs
default:
logger.Panicf("BUG: unknown datasource type %q", s)
}
return ""
}
// Client is a datasource entity for reading data,
// supported clients are enumerated in datasourceType.
// WARN: when adding a new field, remember to check if Clone() method needs to be updated.
type Client struct {
c *http.Client
authCfg *promauth.Config
datasourceURL string
appendTypePrefix bool
queryStep time.Duration
dataSourceType datasourceType
// ApplyIntervalAsTimeFilter is only valid for vlogs datasource.
// Set to true if there is no [timeFilter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) in the rule expression,
// and we will add evaluation interval as an additional timeFilter when querying.
applyIntervalAsTimeFilter bool
// evaluationInterval will help setting request's `step` param,
// or adding time filter for LogsQL expression.
evaluationInterval time.Duration
// extraParams contains params to be attached to each HTTP request
extraParams url.Values
// extraHeaders are headers to be attached to each HTTP request
extraHeaders []keyValue
// whether to print additional log messages
// for each sent request
debug bool
}
type keyValue struct {
key string
value string
}
// Clone clones shared http client and other configuration to the new client.
func (c *Client) Clone() *Client {
ns := &Client{
c: c.c,
authCfg: c.authCfg,
datasourceURL: c.datasourceURL,
appendTypePrefix: c.appendTypePrefix,
queryStep: c.queryStep,
dataSourceType: c.dataSourceType,
evaluationInterval: c.evaluationInterval,
// init map so it can be populated below
extraParams: url.Values{},
debug: c.debug,
}
if len(c.extraHeaders) > 0 {
ns.extraHeaders = make([]keyValue, len(c.extraHeaders))
copy(ns.extraHeaders, c.extraHeaders)
}
for k, v := range c.extraParams {
ns.extraParams[k] = v
}
return ns
}
// ApplyParams - changes given querier params.
func (c *Client) ApplyParams(params QuerierParams) *Client {
if params.DataSourceType != "" {
c.dataSourceType = toDatasourceType(params.DataSourceType)
}
c.evaluationInterval = params.EvaluationInterval
c.applyIntervalAsTimeFilter = params.ApplyIntervalAsTimeFilter
if params.QueryParams != nil {
if c.extraParams == nil {
c.extraParams = url.Values{}
}
for k, vl := range params.QueryParams {
// custom query params are prior to default ones
if c.extraParams.Has(k) {
c.extraParams.Del(k)
}
for _, v := range vl {
// don't use .Set() instead of Del/Add since it is allowed
// for GET params to be duplicated
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4908
c.extraParams.Add(k, v)
}
}
}
if params.Headers != nil {
for key, value := range params.Headers {
kv := keyValue{key: key, value: value}
c.extraHeaders = append(c.extraHeaders, kv)
}
}
c.debug = params.Debug
return c
}
// BuildWithParams - implements interface.
func (c *Client) BuildWithParams(params QuerierParams) Querier {
return c.Clone().ApplyParams(params)
}
// NewPrometheusClient returns a new prometheus datasource client.
func NewPrometheusClient(baseURL string, authCfg *promauth.Config, appendTypePrefix bool, c *http.Client) *Client {
return &Client{
c: c,
authCfg: authCfg,
datasourceURL: strings.TrimSuffix(baseURL, "/"),
appendTypePrefix: appendTypePrefix,
queryStep: *queryStep,
dataSourceType: datasourcePrometheus,
extraParams: url.Values{},
}
}
// Query executes the given query and returns parsed response
func (c *Client) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
req, err := c.newQueryRequest(ctx, query, ts)
if err != nil {
return Result{}, nil, err
}
resp, err := c.do(req)
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) {
// Return unexpected error to the caller.
return Result{}, nil, err
}
// Something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, err = c.newQueryRequest(ctx, query, ts)
if err != nil {
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
}
resp, err = c.do(req)
if err != nil {
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
}
}
// Process the received response.
var parseFn func(req *http.Request, resp *http.Response) (Result, error)
switch c.dataSourceType {
case datasourcePrometheus:
parseFn = parsePrometheusResponse
case datasourceGraphite:
parseFn = parseGraphiteResponse
case datasourceVLogs:
parseFn = parseVLogsResponse
default:
logger.Panicf("BUG: unsupported datasource type %q to parse query response", c.dataSourceType)
}
result, err := parseFn(req, resp)
_ = resp.Body.Close()
return result, req, err
}
// QueryRange executes the given query on the given time range.
// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
// Graphite type isn't supported.
func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time) (res Result, err error) {
if c.dataSourceType == datasourceGraphite {
return res, fmt.Errorf("%q is not supported for QueryRange", c.dataSourceType)
}
// TODO: disable range query LogsQL with time filter now
if c.dataSourceType == datasourceVLogs && !c.applyIntervalAsTimeFilter {
return res, fmt.Errorf("range query is not supported for LogsQL expression %q because it contains time filter. Remove time filter from the expression and try again", query)
}
if start.IsZero() {
return res, fmt.Errorf("start param is missing")
}
if end.IsZero() {
return res, fmt.Errorf("end param is missing")
}
req, err := c.newQueryRangeRequest(ctx, query, start, end)
if err != nil {
return res, err
}
resp, err := c.do(req)
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) {
// Return unexpected error to the caller.
return res, err
}
// Something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, err = c.newQueryRangeRequest(ctx, query, start, end)
if err != nil {
return res, fmt.Errorf("second attempt: %w", err)
}
resp, err = c.do(req)
if err != nil {
return res, fmt.Errorf("second attempt: %w", err)
}
}
// Process the received response.
var parseFn func(req *http.Request, resp *http.Response) (Result, error)
switch c.dataSourceType {
case datasourcePrometheus:
parseFn = parsePrometheusResponse
case datasourceVLogs:
parseFn = parseVLogsResponse
default:
logger.Panicf("BUG: unsupported datasource type %q to parse query range response", c.dataSourceType)
}
res, err = parseFn(req, resp)
_ = resp.Body.Close()
return res, err
}
func (c *Client) do(req *http.Request) (*http.Response, error) {
ru := req.URL.Redacted()
if *showDatasourceURL {
ru = req.URL.String()
}
if c.debug {
logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, ru)
}
resp, err := c.c.Do(req)
if err != nil {
return nil, fmt.Errorf("error getting response from %s: %w", ru, err)
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, ru, body)
}
return resp, nil
}
func (c *Client) newQueryRangeRequest(ctx context.Context, query string, start, end time.Time) (*http.Request, error) {
req, err := c.newRequest(ctx)
if err != nil {
return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", c.datasourceURL, err)
}
switch c.dataSourceType {
case datasourcePrometheus:
c.setPrometheusRangeReqParams(req, query, start, end)
case datasourceVLogs:
c.setVLogsRangeReqParams(req, query, start, end)
default:
logger.Panicf("BUG: unsupported datasource type %q to create range query request", c.dataSourceType)
}
return req, nil
}
func (c *Client) newQueryRequest(ctx context.Context, query string, ts time.Time) (*http.Request, error) {
req, err := c.newRequest(ctx)
if err != nil {
return nil, fmt.Errorf("cannot create query request to datasource %q: %w", c.datasourceURL, err)
}
switch c.dataSourceType {
case datasourcePrometheus:
c.setPrometheusInstantReqParams(req, query, ts)
case datasourceGraphite:
c.setGraphiteReqParams(req, query)
case datasourceVLogs:
c.setVLogsInstantReqParams(req, query, ts)
default:
logger.Panicf("BUG: unsupported datasource type %q to create query request", c.dataSourceType)
}
return req, nil
}
func (c *Client) newRequest(ctx context.Context) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.datasourceURL, nil)
if err != nil {
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", c.datasourceURL, err)
}
req.Header.Set("Content-Type", "application/json")
if c.authCfg != nil {
err = c.authCfg.SetHeaders(req, true)
if err != nil {
return nil, err
}
}
for _, h := range c.extraHeaders {
req.Header.Set(h.key, h.value)
}
return req, nil
}
// setReqParams adds query and other extra params for the request.
func (c *Client) setReqParams(r *http.Request, query string) {
q := r.URL.Query()
for k, vs := range c.extraParams {
if q.Has(k) { // extraParams are prior to params in URL
q.Del(k)
}
for _, v := range vs {
q.Add(k, v)
}
}
q.Set("query", query)
r.URL.RawQuery = q.Encode()
}

View file

@ -46,7 +46,7 @@ const (
graphitePrefix = "/graphite"
)
func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string) {
func (s *Client) setGraphiteReqParams(r *http.Request, query string) {
if s.appendTypePrefix {
r.URL.Path += graphitePrefix
}

View file

@ -14,7 +14,7 @@ import (
)
var (
disablePathAppend = flag.Bool("remoteRead.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/query' path "+
disablePathAppend = flag.Bool("remoteRead.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/query' or '/select/logsql/stats_query' path "+
"to the configured -datasource.url and -remoteRead.url")
disableStepParam = flag.Bool("datasource.disableStepParam", false, "Whether to disable adding 'step' param to the issued instant queries. "+
"This might be useful when using vmalert with datasources that do not support 'step' param for instant queries, like Google Managed Prometheus. "+
@ -171,7 +171,7 @@ const (
func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result, err error) {
r := &promResponse{}
if err = json.NewDecoder(resp.Body).Decode(r); err != nil {
return res, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL.Redacted(), err)
return res, fmt.Errorf("error parsing response from %s: %w", req.URL.Redacted(), err)
}
if r.Status == statusError {
return res, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL.Redacted(), r.ErrorType, r.Error)
@ -218,7 +218,7 @@ func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result
return res, nil
}
func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) {
func (s *Client) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) {
if s.appendTypePrefix {
r.URL.Path += "/prometheus"
}
@ -238,10 +238,10 @@ func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string,
q.Set("step", fmt.Sprintf("%ds", int(s.queryStep.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setPrometheusReqParams(r, query)
s.setReqParams(r, query)
}
func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) {
func (s *Client) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) {
if s.appendTypePrefix {
r.URL.Path += "/prometheus"
}
@ -257,19 +257,5 @@ func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, s
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setPrometheusReqParams(r, query)
}
func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) {
q := r.URL.Query()
for k, vs := range s.extraParams {
if q.Has(k) { // extraParams are prior to params in URL
q.Del(k)
}
for _, v := range vs {
q.Add(k, v)
}
}
q.Set("query", query)
r.URL.RawQuery = q.Encode()
s.setReqParams(r, query)
}

View file

@ -24,8 +24,10 @@ var (
Username: basicAuthName,
Password: promauth.NewSecret(basicAuthPass),
}
query = "vm_rows"
queryRender = "constantLine(10)"
vmQuery = "vm_rows"
queryRender = "constantLine(10)"
vlogsQuery = "_time: 5m | stats by (foo) count() total"
vlogsRangeQuery = "* | stats by (foo) count() total"
)
func TestVMInstantQuery(t *testing.T) {
@ -42,8 +44,8 @@ func TestVMInstantQuery(t *testing.T) {
if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass {
t.Fatalf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass)
}
if r.URL.Query().Get("query") != query {
t.Fatalf("expected %s in query param, got %s", query, r.URL.Query().Get("query"))
if r.URL.Query().Get("query") != vmQuery {
t.Fatalf("expected %s in query param, got %s", vmQuery, r.URL.Query().Get("query"))
}
timeParam := r.URL.Query().Get("time")
if timeParam == "" {
@ -78,6 +80,31 @@ func TestVMInstantQuery(t *testing.T) {
w.Write([]byte(`[{"target":"constantLine(10)","tags":{"name":"constantLine(10)"},"datapoints":[[10,1611758343],[10,1611758373],[10,1611758403]]}]`))
}
})
mux.HandleFunc("/select/logsql/stats_query", func(w http.ResponseWriter, r *http.Request) {
c++
if r.Method != http.MethodPost {
t.Fatalf("expected POST method got %s", r.Method)
}
if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass {
t.Fatalf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass)
}
if r.URL.Query().Get("query") != vlogsQuery {
t.Fatalf("expected %s in query param, got %s", vlogsQuery, r.URL.Query().Get("query"))
}
timeParam := r.URL.Query().Get("time")
if timeParam == "" {
t.Fatalf("expected 'time' in query param, got nil instead")
}
if _, err := time.Parse(time.RFC3339, timeParam); err != nil {
t.Fatalf("failed to parse 'time' query param %q: %s", timeParam, err)
}
switch c {
case 9:
w.Write([]byte("[]"))
case 10:
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"total","foo":"bar"},"value":[1583786142,"13763"]},{"metric":{"__name__":"total","foo":"baz"},"value":[1583786140,"2000"]}]}}`))
}
})
srv := httptest.NewServer(mux)
defer srv.Close()
@ -86,13 +113,13 @@ func TestVMInstantQuery(t *testing.T) {
if err != nil {
t.Fatalf("unexpected: %s", err)
}
s := NewVMStorage(srv.URL, authCfg, 0, false, srv.Client())
s := NewPrometheusClient(srv.URL, authCfg, false, srv.Client())
p := datasourcePrometheus
pq := s.BuildWithParams(QuerierParams{DataSourceType: string(p), EvaluationInterval: 15 * time.Second})
ts := time.Now()
expErr := func(err string) {
expErr := func(query, err string) {
_, _, gotErr := pq.Query(ctx, query, ts)
if gotErr == nil {
t.Fatalf("expected %q got nil", err)
@ -102,13 +129,13 @@ func TestVMInstantQuery(t *testing.T) {
}
}
expErr("500") // 0
expErr("error parsing prometheus metrics") // 1
expErr("response error") // 2
expErr("unknown status") // 3
expErr("unexpected end of JSON input") // 4
expErr(vmQuery, "500") // 0
expErr(vmQuery, "error parsing response") // 1
expErr(vmQuery, "response error") // 2
expErr(vmQuery, "unknown status") // 3
expErr(vmQuery, "unexpected end of JSON input") // 4
res, _, err := pq.Query(ctx, query, ts) // 5 - vector
res, _, err := pq.Query(ctx, vmQuery, ts) // 5 - vector
if err != nil {
t.Fatalf("unexpected %s", err)
}
@ -129,7 +156,7 @@ func TestVMInstantQuery(t *testing.T) {
}
metricsEqual(t, res.Data, expected)
res, req, err := pq.Query(ctx, query, ts) // 6 - scalar
res, req, err := pq.Query(ctx, vmQuery, ts) // 6 - scalar
if err != nil {
t.Fatalf("unexpected %s", err)
}
@ -154,7 +181,7 @@ func TestVMInstantQuery(t *testing.T) {
res.SeriesFetched)
}
res, _, err = pq.Query(ctx, query, ts) // 7 - scalar with stats
res, _, err = pq.Query(ctx, vmQuery, ts) // 7 - scalar with stats
if err != nil {
t.Fatalf("unexpected %s", err)
}
@ -175,6 +202,7 @@ func TestVMInstantQuery(t *testing.T) {
*res.SeriesFetched)
}
// test graphite
gq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceGraphite)})
res, _, err = gq.Query(ctx, queryRender, ts) // 8 - graphite
@ -192,6 +220,33 @@ func TestVMInstantQuery(t *testing.T) {
},
}
metricsEqual(t, res.Data, exp)
// test victorialogs
vlogs := datasourceVLogs
pq = s.BuildWithParams(QuerierParams{DataSourceType: string(vlogs), EvaluationInterval: 15 * time.Second})
expErr(vlogsQuery, "error parsing response") // 9
res, _, err = pq.Query(ctx, vlogsQuery, ts) // 10
if err != nil {
t.Fatalf("unexpected %s", err)
}
if len(res.Data) != 2 {
t.Fatalf("expected 2 metrics got %d in %+v", len(res.Data), res.Data)
}
expected = []Metric{
{
Labels: []Label{{Value: "total", Name: "stats_result"}, {Value: "bar", Name: "foo"}},
Timestamps: []int64{1583786142},
Values: []float64{13763},
},
{
Labels: []Label{{Value: "total", Name: "stats_result"}, {Value: "baz", Name: "foo"}},
Timestamps: []int64{1583786140},
Values: []float64{2000},
},
}
metricsEqual(t, res.Data, expected)
}
func TestVMInstantQueryWithRetry(t *testing.T) {
@ -202,8 +257,8 @@ func TestVMInstantQueryWithRetry(t *testing.T) {
c := -1
mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) {
c++
if r.URL.Query().Get("query") != query {
t.Fatalf("expected %s in query param, got %s", query, r.URL.Query().Get("query"))
if r.URL.Query().Get("query") != vmQuery {
t.Fatalf("expected %s in query param, got %s", vmQuery, r.URL.Query().Get("query"))
}
switch c {
case 0:
@ -225,11 +280,11 @@ func TestVMInstantQueryWithRetry(t *testing.T) {
srv := httptest.NewServer(mux)
defer srv.Close()
s := NewVMStorage(srv.URL, nil, 0, false, srv.Client())
s := NewPrometheusClient(srv.URL, nil, false, srv.Client())
pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus)})
expErr := func(err string) {
_, _, gotErr := pq.Query(ctx, query, time.Now())
_, _, gotErr := pq.Query(ctx, vmQuery, time.Now())
if gotErr == nil {
t.Fatalf("expected %q got nil", err)
}
@ -239,7 +294,7 @@ func TestVMInstantQueryWithRetry(t *testing.T) {
}
expValue := func(v float64) {
res, _, err := pq.Query(ctx, query, time.Now())
res, _, err := pq.Query(ctx, vmQuery, time.Now())
if err != nil {
t.Fatalf("unexpected %s", err)
}
@ -300,8 +355,8 @@ func TestVMRangeQuery(t *testing.T) {
if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass {
t.Fatalf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass)
}
if r.URL.Query().Get("query") != query {
t.Fatalf("expected %s in query param, got %s", query, r.URL.Query().Get("query"))
if r.URL.Query().Get("query") != vmQuery {
t.Fatalf("expected %s in query param, got %s", vmQuery, r.URL.Query().Get("query"))
}
startTS := r.URL.Query().Get("start")
if startTS == "" {
@ -326,6 +381,40 @@ func TestVMRangeQuery(t *testing.T) {
w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"vm_rows"},"values":[[1583786142,"13763"]]}]}}`))
}
})
mux.HandleFunc("/select/logsql/stats_query_range", func(w http.ResponseWriter, r *http.Request) {
c++
if r.Method != http.MethodPost {
t.Fatalf("expected POST method got %s", r.Method)
}
if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass {
t.Fatalf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass)
}
if r.URL.Query().Get("query") != vlogsRangeQuery {
t.Fatalf("expected %s in query param, got %s", vmQuery, r.URL.Query().Get("query"))
}
startTS := r.URL.Query().Get("start")
if startTS == "" {
t.Fatalf("expected 'start' in query param, got nil instead")
}
if _, err := time.Parse(time.RFC3339, startTS); err != nil {
t.Fatalf("failed to parse 'start' query param: %s", err)
}
endTS := r.URL.Query().Get("end")
if endTS == "" {
t.Fatalf("expected 'end' in query param, got nil instead")
}
if _, err := time.Parse(time.RFC3339, endTS); err != nil {
t.Fatalf("failed to parse 'end' query param: %s", err)
}
step := r.URL.Query().Get("step")
if step != "60s" {
t.Fatalf("expected 'step' query param to be 60s; got %q instead", step)
}
switch c {
case 1:
w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"total"},"values":[[1583786142,"10"]]}]}}`))
}
})
srv := httptest.NewServer(mux)
defer srv.Close()
@ -334,19 +423,19 @@ func TestVMRangeQuery(t *testing.T) {
if err != nil {
t.Fatalf("unexpected: %s", err)
}
s := NewVMStorage(srv.URL, authCfg, *queryStep, false, srv.Client())
s := NewPrometheusClient(srv.URL, authCfg, false, srv.Client())
pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus), EvaluationInterval: 15 * time.Second})
_, err = pq.QueryRange(ctx, query, time.Now(), time.Time{})
_, err = pq.QueryRange(ctx, vmQuery, time.Now(), time.Time{})
expectError(t, err, "is missing")
_, err = pq.QueryRange(ctx, query, time.Time{}, time.Now())
_, err = pq.QueryRange(ctx, vmQuery, time.Time{}, time.Now())
expectError(t, err, "is missing")
start, end := time.Now().Add(-time.Minute), time.Now()
res, err := pq.QueryRange(ctx, query, start, end)
res, err := pq.QueryRange(ctx, vmQuery, start, end)
if err != nil {
t.Fatalf("unexpected %s", err)
}
@ -363,33 +452,66 @@ func TestVMRangeQuery(t *testing.T) {
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
}
// test unsupported graphite
gq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceGraphite)})
_, err = gq.QueryRange(ctx, queryRender, start, end)
expectError(t, err, "is not supported")
// unsupported logsql
gq = s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceVLogs), EvaluationInterval: 60 * time.Second})
res, err = gq.QueryRange(ctx, vlogsRangeQuery, start, end)
expectError(t, err, "is not supported")
// supported logsql
gq = s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceVLogs), EvaluationInterval: 60 * time.Second, ApplyIntervalAsTimeFilter: true})
res, err = gq.QueryRange(ctx, vlogsRangeQuery, start, end)
if err != nil {
t.Fatalf("unexpected %s", err)
}
m = res.Data
if len(m) != 1 {
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
}
expected = Metric{
Labels: []Label{{Value: "total", Name: "stats_result"}},
Timestamps: []int64{1583786142},
Values: []float64{10},
}
if !reflect.DeepEqual(m[0], expected) {
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
}
}
func TestRequestParams(t *testing.T) {
query := "up"
vlogsQuery := "_time: 5m | stats count() total"
timestamp := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC)
f := func(isQueryRange bool, vm *VMStorage, checkFn func(t *testing.T, r *http.Request)) {
f := func(isQueryRange bool, c *Client, checkFn func(t *testing.T, r *http.Request)) {
t.Helper()
req, err := vm.newRequest(ctx)
req, err := c.newRequest(ctx)
if err != nil {
t.Fatalf("error in newRequest: %s", err)
}
switch vm.dataSourceType {
case "", datasourcePrometheus:
switch c.dataSourceType {
case datasourcePrometheus:
if isQueryRange {
vm.setPrometheusRangeReqParams(req, query, timestamp, timestamp)
c.setPrometheusRangeReqParams(req, query, timestamp, timestamp)
} else {
vm.setPrometheusInstantReqParams(req, query, timestamp)
c.setPrometheusInstantReqParams(req, query, timestamp)
}
case datasourceGraphite:
vm.setGraphiteReqParams(req, query)
c.setGraphiteReqParams(req, query)
case datasourceVLogs:
if isQueryRange {
c.setVLogsRangeReqParams(req, vlogsQuery, timestamp, timestamp)
} else {
c.setVLogsInstantReqParams(req, vlogsQuery, timestamp)
}
}
checkFn(t, req)
@ -399,19 +521,19 @@ func TestRequestParams(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
storage := VMStorage{
storage := Client{
extraParams: url.Values{"round_digits": {"10"}},
}
// prometheus path
f(false, &VMStorage{
f(false, &Client{
dataSourceType: datasourcePrometheus,
}, func(t *testing.T, r *http.Request) {
checkEqualString(t, "/api/v1/query", r.URL.Path)
})
// prometheus prefix
f(false, &VMStorage{
f(false, &Client{
dataSourceType: datasourcePrometheus,
appendTypePrefix: true,
}, func(t *testing.T, r *http.Request) {
@ -419,14 +541,14 @@ func TestRequestParams(t *testing.T) {
})
// prometheus range path
f(true, &VMStorage{
f(true, &Client{
dataSourceType: datasourcePrometheus,
}, func(t *testing.T, r *http.Request) {
checkEqualString(t, "/api/v1/query_range", r.URL.Path)
})
// prometheus range prefix
f(true, &VMStorage{
f(true, &Client{
dataSourceType: datasourcePrometheus,
appendTypePrefix: true,
}, func(t *testing.T, r *http.Request) {
@ -434,14 +556,14 @@ func TestRequestParams(t *testing.T) {
})
// graphite path
f(false, &VMStorage{
f(false, &Client{
dataSourceType: datasourceGraphite,
}, func(t *testing.T, r *http.Request) {
checkEqualString(t, graphitePath, r.URL.Path)
})
// graphite prefix
f(false, &VMStorage{
f(false, &Client{
dataSourceType: datasourceGraphite,
appendTypePrefix: true,
}, func(t *testing.T, r *http.Request) {
@ -449,21 +571,27 @@ func TestRequestParams(t *testing.T) {
})
// default params
f(false, &VMStorage{}, func(t *testing.T, r *http.Request) {
f(false, &Client{dataSourceType: datasourcePrometheus}, func(t *testing.T, r *http.Request) {
exp := url.Values{"query": {query}, "time": {timestamp.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
})
f(false, &Client{dataSourceType: datasourcePrometheus, applyIntervalAsTimeFilter: true}, func(t *testing.T, r *http.Request) {
exp := url.Values{"query": {query}, "time": {timestamp.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
})
// default range params
f(true, &VMStorage{}, func(t *testing.T, r *http.Request) {
f(true, &Client{dataSourceType: datasourcePrometheus}, func(t *testing.T, r *http.Request) {
ts := timestamp.Format(time.RFC3339)
exp := url.Values{"query": {query}, "start": {ts}, "end": {ts}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
})
// basic auth
f(false, &VMStorage{
authCfg: authCfg,
f(false, &Client{
dataSourceType: datasourcePrometheus,
authCfg: authCfg,
}, func(t *testing.T, r *http.Request) {
u, p, _ := r.BasicAuth()
checkEqualString(t, "foo", u)
@ -471,8 +599,9 @@ func TestRequestParams(t *testing.T) {
})
// basic auth range
f(true, &VMStorage{
authCfg: authCfg,
f(true, &Client{
dataSourceType: datasourcePrometheus,
authCfg: authCfg,
}, func(t *testing.T, r *http.Request) {
u, p, _ := r.BasicAuth()
checkEqualString(t, "foo", u)
@ -480,7 +609,8 @@ func TestRequestParams(t *testing.T) {
})
// evaluation interval
f(false, &VMStorage{
f(false, &Client{
dataSourceType: datasourcePrometheus,
evaluationInterval: 15 * time.Second,
}, func(t *testing.T, r *http.Request) {
evalInterval := 15 * time.Second
@ -489,8 +619,9 @@ func TestRequestParams(t *testing.T) {
})
// step override
f(false, &VMStorage{
queryStep: time.Minute,
f(false, &Client{
dataSourceType: datasourcePrometheus,
queryStep: time.Minute,
}, func(t *testing.T, r *http.Request) {
exp := url.Values{
"query": {query},
@ -501,7 +632,8 @@ func TestRequestParams(t *testing.T) {
})
// step to seconds
f(false, &VMStorage{
f(false, &Client{
dataSourceType: datasourcePrometheus,
evaluationInterval: 3 * time.Hour,
}, func(t *testing.T, r *http.Request) {
evalInterval := 3 * time.Hour
@ -510,15 +642,17 @@ func TestRequestParams(t *testing.T) {
})
// prometheus extra params
f(false, &VMStorage{
extraParams: url.Values{"round_digits": {"10"}},
f(false, &Client{
dataSourceType: datasourcePrometheus,
extraParams: url.Values{"round_digits": {"10"}},
}, func(t *testing.T, r *http.Request) {
exp := url.Values{"query": {query}, "round_digits": {"10"}, "time": {timestamp.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
})
// prometheus extra params range
f(true, &VMStorage{
f(true, &Client{
dataSourceType: datasourcePrometheus,
extraParams: url.Values{
"nocache": {"1"},
"max_lookback": {"1h"},
@ -536,7 +670,8 @@ func TestRequestParams(t *testing.T) {
// custom params overrides the original params
f(false, storage.Clone().ApplyParams(QuerierParams{
QueryParams: url.Values{"round_digits": {"2"}},
DataSourceType: string(datasourcePrometheus),
QueryParams: url.Values{"round_digits": {"2"}},
}), func(t *testing.T, r *http.Request) {
exp := url.Values{"query": {query}, "round_digits": {"2"}, "time": {timestamp.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
@ -544,14 +679,15 @@ func TestRequestParams(t *testing.T) {
// allow duplicates in query params
f(false, storage.Clone().ApplyParams(QuerierParams{
QueryParams: url.Values{"extra_labels": {"env=dev", "foo=bar"}},
DataSourceType: string(datasourcePrometheus),
QueryParams: url.Values{"extra_labels": {"env=dev", "foo=bar"}},
}), func(t *testing.T, r *http.Request) {
exp := url.Values{"query": {query}, "round_digits": {"10"}, "extra_labels": {"env=dev", "foo=bar"}, "time": {timestamp.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
})
// graphite extra params
f(false, &VMStorage{
f(false, &Client{
dataSourceType: datasourceGraphite,
extraParams: url.Values{
"nocache": {"1"},
@ -563,7 +699,7 @@ func TestRequestParams(t *testing.T) {
})
// graphite extra params allows to override from
f(false, &VMStorage{
f(false, &Client{
dataSourceType: datasourceGraphite,
extraParams: url.Values{
"from": {"-10m"},
@ -572,10 +708,38 @@ func TestRequestParams(t *testing.T) {
exp := fmt.Sprintf("format=json&from=-10m&target=%s&until=now", query)
checkEqualString(t, exp, r.URL.RawQuery)
})
// test vlogs
f(false, &Client{
dataSourceType: datasourceVLogs,
evaluationInterval: time.Minute,
}, func(t *testing.T, r *http.Request) {
exp := url.Values{"query": {vlogsQuery}, "time": {timestamp.Format(time.RFC3339)}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
})
f(false, &Client{
dataSourceType: datasourceVLogs,
evaluationInterval: time.Minute,
applyIntervalAsTimeFilter: true,
}, func(t *testing.T, r *http.Request) {
ts := timestamp.Format(time.RFC3339)
exp := url.Values{"query": {vlogsQuery}, "time": {ts}, "start": {timestamp.Add(-time.Minute).Format(time.RFC3339)}, "end": {ts}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
})
f(true, &Client{
dataSourceType: datasourceVLogs,
evaluationInterval: time.Minute,
}, func(t *testing.T, r *http.Request) {
ts := timestamp.Format(time.RFC3339)
exp := url.Values{"query": {vlogsQuery}, "start": {ts}, "end": {ts}, "step": {"60s"}}
checkEqualString(t, exp.Encode(), r.URL.RawQuery)
})
}
func TestHeaders(t *testing.T) {
f := func(vmFn func() *VMStorage, checkFn func(t *testing.T, r *http.Request)) {
f := func(vmFn func() *Client, checkFn func(t *testing.T, r *http.Request)) {
t.Helper()
vm := vmFn()
@ -587,12 +751,12 @@ func TestHeaders(t *testing.T) {
}
// basic auth
f(func() *VMStorage {
f(func() *Client {
cfg, err := utils.AuthConfig(utils.WithBasicAuth("foo", "bar", ""))
if err != nil {
t.Fatalf("Error get auth config: %s", err)
}
return &VMStorage{authCfg: cfg}
return NewPrometheusClient("", cfg, false, nil)
}, func(t *testing.T, r *http.Request) {
u, p, _ := r.BasicAuth()
checkEqualString(t, "foo", u)
@ -600,12 +764,12 @@ func TestHeaders(t *testing.T) {
})
// bearer auth
f(func() *VMStorage {
f(func() *Client {
cfg, err := utils.AuthConfig(utils.WithBearer("foo", ""))
if err != nil {
t.Fatalf("Error get auth config: %s", err)
}
return &VMStorage{authCfg: cfg}
return NewPrometheusClient("", cfg, false, nil)
}, func(t *testing.T, r *http.Request) {
reqToken := r.Header.Get("Authorization")
splitToken := strings.Split(reqToken, "Bearer ")
@ -617,11 +781,13 @@ func TestHeaders(t *testing.T) {
})
// custom extraHeaders
f(func() *VMStorage {
return &VMStorage{extraHeaders: []keyValue{
f(func() *Client {
c := NewPrometheusClient("", nil, false, nil)
c.extraHeaders = []keyValue{
{key: "Foo", value: "bar"},
{key: "Baz", value: "qux"},
}}
}
return c
}, func(t *testing.T, r *http.Request) {
h1 := r.Header.Get("Foo")
checkEqualString(t, "bar", h1)
@ -630,17 +796,16 @@ func TestHeaders(t *testing.T) {
})
// custom header overrides basic auth
f(func() *VMStorage {
f(func() *Client {
cfg, err := utils.AuthConfig(utils.WithBasicAuth("foo", "bar", ""))
if err != nil {
t.Fatalf("Error get auth config: %s", err)
}
return &VMStorage{
authCfg: cfg,
extraHeaders: []keyValue{
{key: "Authorization", value: "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="},
},
c := NewPrometheusClient("", cfg, false, nil)
c.extraHeaders = []keyValue{
{key: "Authorization", value: "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="},
}
return c
}, func(t *testing.T, r *http.Request) {
u, p, _ := r.BasicAuth()
checkEqualString(t, "Aladdin", u)

View file

@ -0,0 +1,61 @@
package datasource
import (
"fmt"
"net/http"
"time"
)
func (s *Client) setVLogsInstantReqParams(r *http.Request, query string, timestamp time.Time) {
// there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix.
if !*disablePathAppend {
r.URL.Path += "/select/logsql/stats_query"
}
q := r.URL.Query()
// set `time` param explicitly, it will be used as the timestamp of query results.
q.Set("time", timestamp.Format(time.RFC3339))
// set the `start` and `end` params if applyIntervalAsTimeFilter is enabled(time filter is missing in the rule expr),
// so the query will be executed in time range [timestamp - evaluationInterval, timestamp].
if s.applyIntervalAsTimeFilter && s.evaluationInterval > 0 {
q.Set("start", timestamp.Add(-s.evaluationInterval).Format(time.RFC3339))
q.Set("end", timestamp.Format(time.RFC3339))
}
r.URL.RawQuery = q.Encode()
s.setReqParams(r, query)
}
func (s *Client) setVLogsRangeReqParams(r *http.Request, query string, start, end time.Time) {
// there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix.
if !*disablePathAppend {
r.URL.Path += "/select/logsql/stats_query_range"
}
q := r.URL.Query()
q.Add("start", start.Format(time.RFC3339))
q.Add("end", end.Format(time.RFC3339))
// set step as evaluationInterval by default
if s.evaluationInterval > 0 {
q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds())))
}
r.URL.RawQuery = q.Encode()
s.setReqParams(r, query)
}
func parseVLogsResponse(req *http.Request, resp *http.Response) (res Result, err error) {
res, err = parsePrometheusResponse(req, resp)
if err != nil {
return Result{}, err
}
for i := range res.Data {
m := &res.Data[i]
for j := range m.Labels {
// reserve the stats func result name with a new label `stats_result` instead of dropping it,
// since there could be multiple stats results in a single query, for instance:
// _time:5m | stats quantile(0.5, request_duration_seconds) p50, quantile(0.9, request_duration_seconds) p90
if m.Labels[j].Name == "__name__" {
m.Labels[j].Name = "stats_result"
break
}
}
}
return
}

View file

@ -42,11 +42,15 @@ type QuerierBuilder interface {
// QuerierParams params for Querier.
type QuerierParams struct {
DataSourceType string
EvaluationInterval time.Duration
QueryParams url.Values
Headers map[string]string
Debug bool
DataSourceType string
// ApplyIntervalAsTimeFilter is only valid for vlogs datasource.
// Set to true if there is no [timeFilter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) in the rule expression,
// and we will add evaluation interval as an additional timeFilter when querying.
ApplyIntervalAsTimeFilter bool
EvaluationInterval time.Duration
QueryParams url.Values
Headers map[string]string
Debug bool
}
// Metric is the basic entity which should be return by datasource

View file

@ -133,13 +133,12 @@ func Init(extraParams url.Values) (QuerierBuilder, error) {
return nil, fmt.Errorf("failed to set request auth header to datasource %q: %w", *addr, err)
}
return &VMStorage{
return &Client{
c: &http.Client{Transport: tr},
authCfg: authCfg,
datasourceURL: strings.TrimSuffix(*addr, "/"),
appendTypePrefix: *appendTypePrefix,
queryStep: *queryStep,
dataSourceType: datasourcePrometheus,
extraParams: extraParams,
}, nil
}

View file

@ -1,272 +0,0 @@
package datasource
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
)
type datasourceType string
const (
datasourcePrometheus datasourceType = "prometheus"
datasourceGraphite datasourceType = "graphite"
)
func toDatasourceType(s string) datasourceType {
if s == string(datasourceGraphite) {
return datasourceGraphite
}
return datasourcePrometheus
}
// VMStorage represents vmstorage entity with ability to read and write metrics
// WARN: when adding a new field, remember to update Clone() method.
type VMStorage struct {
c *http.Client
authCfg *promauth.Config
datasourceURL string
appendTypePrefix bool
queryStep time.Duration
dataSourceType datasourceType
// evaluationInterval will help setting request's `step` param.
evaluationInterval time.Duration
// extraParams contains params to be attached to each HTTP request
extraParams url.Values
// extraHeaders are headers to be attached to each HTTP request
extraHeaders []keyValue
// whether to print additional log messages
// for each sent request
debug bool
}
type keyValue struct {
key string
value string
}
// Clone makes clone of VMStorage, shares http client.
func (s *VMStorage) Clone() *VMStorage {
ns := &VMStorage{
c: s.c,
authCfg: s.authCfg,
datasourceURL: s.datasourceURL,
appendTypePrefix: s.appendTypePrefix,
queryStep: s.queryStep,
dataSourceType: s.dataSourceType,
evaluationInterval: s.evaluationInterval,
// init map so it can be populated below
extraParams: url.Values{},
debug: s.debug,
}
if len(s.extraHeaders) > 0 {
ns.extraHeaders = make([]keyValue, len(s.extraHeaders))
copy(ns.extraHeaders, s.extraHeaders)
}
for k, v := range s.extraParams {
ns.extraParams[k] = v
}
return ns
}
// ApplyParams - changes given querier params.
func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage {
s.dataSourceType = toDatasourceType(params.DataSourceType)
s.evaluationInterval = params.EvaluationInterval
if params.QueryParams != nil {
if s.extraParams == nil {
s.extraParams = url.Values{}
}
for k, vl := range params.QueryParams {
// custom query params are prior to default ones
if s.extraParams.Has(k) {
s.extraParams.Del(k)
}
for _, v := range vl {
// don't use .Set() instead of Del/Add since it is allowed
// for GET params to be duplicated
// see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4908
s.extraParams.Add(k, v)
}
}
}
if params.Headers != nil {
for key, value := range params.Headers {
kv := keyValue{key: key, value: value}
s.extraHeaders = append(s.extraHeaders, kv)
}
}
s.debug = params.Debug
return s
}
// BuildWithParams - implements interface.
func (s *VMStorage) BuildWithParams(params QuerierParams) Querier {
return s.Clone().ApplyParams(params)
}
// NewVMStorage is a constructor for VMStorage
func NewVMStorage(baseURL string, authCfg *promauth.Config, queryStep time.Duration, appendTypePrefix bool, c *http.Client) *VMStorage {
return &VMStorage{
c: c,
authCfg: authCfg,
datasourceURL: strings.TrimSuffix(baseURL, "/"),
appendTypePrefix: appendTypePrefix,
queryStep: queryStep,
dataSourceType: datasourcePrometheus,
extraParams: url.Values{},
}
}
// Query executes the given query and returns parsed response
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
req, err := s.newQueryRequest(ctx, query, ts)
if err != nil {
return Result{}, nil, err
}
resp, err := s.do(req)
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) {
// Return unexpected error to the caller.
return Result{}, nil, err
}
// Something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, err = s.newQueryRequest(ctx, query, ts)
if err != nil {
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
}
resp, err = s.do(req)
if err != nil {
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
}
}
// Process the received response.
parseFn := parsePrometheusResponse
if s.dataSourceType != datasourcePrometheus {
parseFn = parseGraphiteResponse
}
result, err := parseFn(req, resp)
_ = resp.Body.Close()
return result, req, err
}
// QueryRange executes the given query on the given time range.
// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
// Graphite type isn't supported.
func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end time.Time) (res Result, err error) {
if s.dataSourceType != datasourcePrometheus {
return res, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType)
}
if start.IsZero() {
return res, fmt.Errorf("start param is missing")
}
if end.IsZero() {
return res, fmt.Errorf("end param is missing")
}
req, err := s.newQueryRangeRequest(ctx, query, start, end)
if err != nil {
return res, err
}
resp, err := s.do(req)
if err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) {
// Return unexpected error to the caller.
return res, err
}
// Something in the middle between client and datasource might be closing
// the connection. So we do a one more attempt in hope request will succeed.
req, err = s.newQueryRangeRequest(ctx, query, start, end)
if err != nil {
return res, fmt.Errorf("second attempt: %w", err)
}
resp, err = s.do(req)
if err != nil {
return res, fmt.Errorf("second attempt: %w", err)
}
}
// Process the received response.
res, err = parsePrometheusResponse(req, resp)
_ = resp.Body.Close()
return res, err
}
func (s *VMStorage) do(req *http.Request) (*http.Response, error) {
ru := req.URL.Redacted()
if *showDatasourceURL {
ru = req.URL.String()
}
if s.debug {
logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, ru)
}
resp, err := s.c.Do(req)
if err != nil {
return nil, fmt.Errorf("error getting response from %s: %w", ru, err)
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, ru, body)
}
return resp, nil
}
func (s *VMStorage) newQueryRangeRequest(ctx context.Context, query string, start, end time.Time) (*http.Request, error) {
req, err := s.newRequest(ctx)
if err != nil {
return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", s.datasourceURL, err)
}
s.setPrometheusRangeReqParams(req, query, start, end)
return req, nil
}
func (s *VMStorage) newQueryRequest(ctx context.Context, query string, ts time.Time) (*http.Request, error) {
req, err := s.newRequest(ctx)
if err != nil {
return nil, fmt.Errorf("cannot create query request to datasource %q: %w", s.datasourceURL, err)
}
switch s.dataSourceType {
case "", datasourcePrometheus:
s.setPrometheusInstantReqParams(req, query, ts)
case datasourceGraphite:
s.setGraphiteReqParams(req, query)
default:
logger.Panicf("BUG: engine not found: %q", s.dataSourceType)
}
return req, nil
}
func (s *VMStorage) newRequest(ctx context.Context) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.datasourceURL, nil)
if err != nil {
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err)
}
req.Header.Set("Content-Type", "application/json")
if s.authCfg != nil {
err = s.authCfg.SetHeaders(req, true)
if err != nil {
return nil, err
}
}
for _, h := range s.extraHeaders {
req.Header.Set(h.key, h.value)
}
return req, nil
}

View file

@ -66,7 +66,7 @@ absolute path to all .tpl files in root.
evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules")
validateTemplates = flag.Bool("rule.validateTemplates", true, "Whether to validate annotation and label templates")
validateExpressions = flag.Bool("rule.validateExpressions", true, "Whether to validate rules expressions via MetricsQL engine")
validateExpressions = flag.Bool("rule.validateExpressions", true, "Whether to validate rules expressions for different types.")
externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier. By default, hostname is used as address.")
externalAlertSource = flag.String("external.alert.source", "", `External Alert Source allows to override the Source link for alerts sent to AlertManager `+

View file

@ -86,5 +86,5 @@ func Init() (datasource.QuerierBuilder, error) {
return nil, fmt.Errorf("failed to configure auth: %w", err)
}
c := &http.Client{Transport: tr}
return datasource.NewVMStorage(*addr, authCfg, 0, false, c), nil
return datasource.NewPrometheusClient(*addr, authCfg, false, c), nil
}

View file

@ -72,11 +72,12 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule
EvalInterval: group.Interval,
Debug: cfg.Debug,
q: qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: group.Type.String(),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,
Debug: cfg.Debug,
DataSourceType: group.Type.String(),
ApplyIntervalAsTimeFilter: setIntervalAsTimeFilter(group.Type.String(), cfg.Expr),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,
Debug: cfg.Debug,
}),
alerts: make(map[uint64]*notifier.Alert),
metrics: &alertingRuleMetrics{},

View file

@ -213,7 +213,6 @@ func (g *Group) restore(ctx context.Context, qb datasource.QuerierBuilder, ts ti
continue
}
q := qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: g.Type.String(),
EvaluationInterval: g.Interval,
QueryParams: g.Params,
Headers: g.Headers,

View file

@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
@ -64,10 +65,11 @@ func NewRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul
File: group.File,
metrics: &recordingRuleMetrics{},
q: qb.BuildWithParams(datasource.QuerierParams{
DataSourceType: group.Type.String(),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,
DataSourceType: group.Type.String(),
ApplyIntervalAsTimeFilter: setIntervalAsTimeFilter(group.Type.String(), cfg.Expr),
EvaluationInterval: group.Interval,
QueryParams: group.Params,
Headers: group.Headers,
}),
}
@ -213,3 +215,12 @@ func (rr *RecordingRule) updateWith(r Rule) error {
rr.q = nr.q
return nil
}
// setIntervalAsTimeFilter returns true if given LogsQL has a time filter.
func setIntervalAsTimeFilter(dType, expr string) bool {
if dType != "vlogs" {
return false
}
q, _ := logstorage.ParseStatsQuery(expr)
return !q.ContainAnyTimeFilter()
}

View file

@ -266,3 +266,25 @@ func TestRecordingRuleExec_Negative(t *testing.T) {
t.Fatalf("cannot execute recroding rule: %s", err)
}
}
func TestSetIntervalAsTimeFilter(t *testing.T) {
f := func(s, dType string, expected bool) {
t.Helper()
if setIntervalAsTimeFilter(dType, s) != expected {
t.Fatalf("unexpected result for hasTimeFilter(%q); want %v", s, expected)
}
}
f(`* | count()`, "prometheus", false)
f(`* | count()`, "vlogs", true)
f(`error OR _time:5m | count()`, "vlogs", true)
f(`(_time: 5m AND error) OR (_time: 5m AND warn) | count()`, "vlogs", true)
f(`* | error OR _time:5m | count()`, "vlogs", true)
f(`_time:5m | count()`, "vlogs", false)
f(`_time:2023-04-25T22:45:59Z | count()`, "vlogs", false)
f(`error AND _time:5m | count()`, "vlogs", false)
f(`* | error AND _time:5m | count()`, "vlogs", false)
}

View file

@ -105,7 +105,7 @@ vmauth config is available [here](ttps://github.com/VictoriaMetrics/VictoriaMetr
## vmalert
vmalert evaluates alerting rules [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts.yml)
vmalert evaluates alerting rules [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts.yml)
to track VictoriaMetrics health state. It is connected with AlertManager for firing alerts,
and with VictoriaMetrics for executing queries and storing alert's state.
@ -153,17 +153,17 @@ make docker-cluster-vm-datasource-down # shutdown cluster
See below a list of recommended alerting rules for various VictoriaMetrics components for running in production.
Some alerting rules thresholds are just recommendations and could require an adjustment.
The list of alerting rules is the following:
* [alerts-health.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-health.yml):
* [alerts-health.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-health.yml):
alerting rules related to all VictoriaMetrics components for tracking their "health" state;
* [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts.yml):
* [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts.yml):
alerting rules related to [single-server VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) installation;
* [alerts-cluster.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-cluster.yml):
* [alerts-cluster.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-cluster.yml):
alerting rules related to [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/cluster-victoriametrics/);
* [alerts-vmagent.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmagent.yml):
* [alerts-vmagent.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-vmagent.yml):
alerting rules related to [vmagent](https://docs.victoriametrics.com/vmagent/) component;
* [alerts-vmalert.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmalert.yml):
* [alerts-vmalert.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-vmalert.yml):
alerting rules related to [vmalert](https://docs.victoriametrics.com/vmalert/) component;
* [alerts-vmauth.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml):
* [alerts-vmauth.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-vmauth.yml):
alerting rules related to [vmauth](https://docs.victoriametrics.com/vmauth/) component;
* [alerts-vlogs.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml):
alerting rules related to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/);

View file

@ -0,0 +1,9 @@
# route requests between VictoriaMetrics and VictoriaLogs
unauthorized_user:
url_map:
- src_paths:
- "/api/v1/query.*"
url_prefix: "http://victoriametrics:8428"
- src_paths:
- "/select/logsql/.*"
url_prefix: "http://victorialogs:9428"

View file

@ -133,10 +133,10 @@ services:
ports:
- 8880:8880
volumes:
- ./alerts-cluster.yml:/etc/alerts/alerts.yml
- ./alerts-health.yml:/etc/alerts/alerts-health.yml
- ./alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml
- ./alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml
- ./rules/alerts-cluster.yml:/etc/alerts/alerts.yml
- ./rules/alerts-health.yml:/etc/alerts/alerts-health.yml
- ./rules/alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml
- ./rules/alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml
command:
- "--datasource.url=http://vmauth:8427/select/0/prometheus"
- "--remoteRead.url=http://vmauth:8427/select/0/prometheus"

View file

@ -26,7 +26,7 @@ services:
# and forwards them to VictoriaLogs
fluentbit:
container_name: fluentbit
image: cr.fluentbit.io/fluent/fluent-bit:2.1.4
image: fluent/fluent-bit:2.1.4
volumes:
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf
@ -40,7 +40,7 @@ services:
# storing logs and serving read queries.
victorialogs:
container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.37.0-victorialogs
image: victoriametrics/victoria-logs:v0.37.0-victorialogs
command:
- "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428"
@ -69,29 +69,50 @@ services:
- vm_net
restart: always
# vmalert executes alerting and recording rules
# vmauth is a router and balancer for HTTP requests.
# It proxies query requests from vmalert to either VictoriaMetrics or VictoriaLogs,
# depending on the requested path.
vmauth:
container_name: vmauth
image: victoriametrics/vmauth:v1.105.0
depends_on:
- "victoriametrics"
- "victorialogs"
volumes:
- ./auth-mixed-datasource.yml:/etc/auth.yml
command:
- "--auth.config=/etc/auth.yml"
ports:
- 8427:8427
networks:
- vm_net
restart: always
# vmalert executes alerting and recording rules according to given rule type.
vmalert:
container_name: vmalert
image: victoriametrics/vmalert:v1.105.0
depends_on:
- "victoriametrics"
- "vmauth"
- "alertmanager"
- "victoriametrics"
ports:
- 8880:8880
volumes:
- ./alerts.yml:/etc/alerts/alerts.yml
- ./alerts-health.yml:/etc/alerts/alerts-health.yml
- ./alerts-vlogs.yml:/etc/alerts/alerts-vlogs.yml
- ./alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml
# disable log-related rules for now, util vmalert supports vlogs type rule
# - ./rules/vlogs-example-alerts.yml:/etc/alerts/vlogs.yml
- ./rules/alerts.yml:/etc/alerts/alerts.yml
- ./rules/alerts-health.yml:/etc/alerts/alerts-health.yml
- ./rules/alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml
- ./rules/alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml
command:
- "--datasource.url=http://victoriametrics:8428/"
- "--datasource.url=http://vmauth:8427/"
- "--remoteRead.url=http://victoriametrics:8428/"
- "--remoteWrite.url=http://victoriametrics:8428/"
- "--notifier.url=http://alertmanager:9093/"
- "--rule=/etc/alerts/*.yml"
# display source of alerts in grafana
- "--external.url=http://127.0.0.1:3000" #grafana outside container
- '--external.alert.source=explore?orgId=1&left={"datasource":"VictoriaMetrics","queries":[{"expr":{{.Expr|jsonEscape|queryEscape}},"refId":"A"}],"range":{"from":"{{ .ActiveAt.UnixMilli }}","to":"now"}}'
networks:
- vm_net
restart: always

View file

@ -72,10 +72,10 @@ services:
ports:
- 8880:8880
volumes:
- ./alerts.yml:/etc/alerts/alerts.yml
- ./alerts-health.yml:/etc/alerts/alerts-health.yml
- ./alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml
- ./alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml
- ./rules/alerts.yml:/etc/alerts/alerts.yml
- ./rules/alerts-health.yml:/etc/alerts/alerts-health.yml
- ./rules/alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml
- ./rules/alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml
command:
- "--datasource.url=http://victoriametrics:8428/"
- "--remoteRead.url=http://victoriametrics:8428/"

View file

@ -0,0 +1,13 @@
groups:
- name: TestGroup
type: vlogs
interval: 1m
rules:
- record: logCount
expr: '_time: 1m | stats by (path) count () as total'
annotations:
description: "path {{$labels.path}} generated {{$value}} logs in the last 1 minute"
- alert: tooManyLogs
expr: '_time: 1m | stats by (path) count () as total | filter total:>50'
annotations:
description: "path {{$labels.path}} generated more than 50 log entries in the last 1 minute: {{$value}}"

View file

@ -0,0 +1,207 @@
---
weight: 10
title: vmalert
menu:
docs:
parent: "victorialogs"
weight: 10
aliases:
- /VictoriaLogs/vmalert.html
---
_Available from [TODO](https://docs.victoriametrics.com/changelog/#TODO) vmalert version and [v0.36.0](https://docs.victoriametrics.com/victorialogs/changelog/#v0360) VictoriaLogs version._
[vmalert](https://docs.victoriametrics.com/vmalert/) integrates with VictoriaLogs via stats APIs [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats)
and [`/select/logsql/stats_query_range`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats).
These endpoints return the log stats in a format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries).
It allows using VictoriaLogs as the datasource in vmalert, creating alerting and recording rules via [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/).
_Note: This page provides only integration instructions for vmalert and VictoriaLogs. See the full textbook for vmalert [here](https://docs.victoriametrics.com/vmalert)._
## Quick Start
Run vmalert with `-rule.defaultRuleType=vlogs` cmd-line flag.
```
./bin/vmalert -rule=alert.rules \ # Path to the files or http url with alerting and/or recording rules in YAML format.
-datasource.url=http://localhost:9428 \ # VictoriaLogs address.
-rule.defaultRuleType=vlogs \ # Set default rules type to VictoriaLogs.
-notifier.url=http://localhost:9093 \ # AlertManager URL (required if alerting rules are used)
-remoteWrite.url=http://localhost:8428 \ # Remote write compatible storage to persist rules and alerts state info (required for recording rules)
-remoteRead.url=http://localhost:8428 \ # Prometheus HTTP API compatible datasource to restore alerts state from
```
> See the full list of configuration flags and their descriptions in [configuration](#configuration) section.
> Each `-rule` file may contain arbitrary number of [groups](https://docs.victoriametrics.com/vmalert/#groups).
See examples in [Groups](#groups) section.
With configuration example above, vmalert will perform the following interactions:
![vmalert](vmalert_victorialogs.webp)
1. Rules listed in `-rule` file are executed against VictoriaLogs service configured via `-datasource.url`;
2. Triggered alerting notifications are sent to [Alertmanager](https://github.com/prometheus/alertmanager) service configured via `-notifier.url`;
3. Results of recording rules expressions and alerts state are persisted to Prometheus-compatible remote-write endpoint (i.e. VictoriaMetrics) configured via `-remoteWrite.url`;
4. On vmalert restarts, alerts state [can be restored](https://docs.victoriametrics.com/vmalert/#alerts-state-on-restarts) by querying Prometheus-compatible HTTP API endpoint (i.e. VictoriaMetrics) configured via `-remoteRead.url`.
## Configuration
### Flags
For a complete list of command-line flags, visit https://docs.victoriametrics.com/vmalert/#flags or execute `./vmalert --help` command.
The following are key flags related to integration with VictoriaLogs:
```
-datasource.url string
Datasource address supporting log stats APIs, which can be a single VictoriaLogs node or a proxy in front of VictoriaLogs. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record.
-notifier.url array
Prometheus Alertmanager URL, e.g. http://127.0.0.1:9093. List all Alertmanager URLs if it runs in the cluster mode to ensure high availability.
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.url string
Optional URL to VictoriaMetrics or vminsert where to persist alerts state and recording rules results in form of timeseries. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'.
-remoteRead.url string
Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect.Remote read is used to restore alerts state.This configuration makes sense only if vmalert was configured with `remoteWrite.url` before and has been successfully persisted its state. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.
-rule array
Path to the files or http url with alerting and/or recording rules in YAML format.
Supports hierarchical patterns and regexpes.
Examples:
-rule="/path/to/file". Path to a single file with alerting rules.
-rule="http://<some-server-addr>/path/to/rules". HTTP URL to a page with alerting rules.
-rule="dir/*.yaml" -rule="/*.yaml" -rule="gcs://vmalert-rules/tenant_%{TENANT_ID}/prod".
-rule="dir/**/*.yaml". Includes all the .yaml files in "dir" subfolders recursively.
Rule files support YAML multi-document. Files may contain %{ENV_VAR} placeholders, which are substituted by the corresponding env vars.
Enterprise version of vmalert supports S3 and GCS paths to rules.
For example: gs://bucket/path/to/rules, s3://bucket/path/to/rules
S3 and GCS paths support only matching by prefix, e.g. s3://bucket/dir/rule_ matches
all files with prefix rule_ in folder dir.
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-rule.defaultRuleType
Default type for rule expressions, can be overridden by type parameter inside the rule group. Supported values: "graphite", "prometheus" and "vlogs".
Default is "prometheus", change it to "vlogs" if all of the rules are written with LogsQL.
-rule.evalDelay time
Adjustment of the time parameter for rule evaluation requests to compensate intentional data delay from the datasource. Normally, should be equal to `-search.latencyOffset` (cm d-line flag configured for VictoriaMetrics single-node or vmselect).
Since there is no intentional search delay in VictoriaLogs, `-rule.evalDelay` can be reduced to a few seconds to accommodate network and ingestion time.
```
For more configuration options, such as `notifiers`, visit https://docs.victoriametrics.com/vmalert/#configuration.
### Groups
Check the complete group attributes [here](https://docs.victoriametrics.com/vmalert/#groups).
#### Alerting rules
Examples:
```
groups:
- name: ServiceLog
interval: 5m
rules:
- alert: HasErrorLog
expr: 'env: "prod" AND status:~"error|warn" | stats by (service) count() as errorLog | filter errorLog:>0'
annotations:
description: "Service {{$labels.service}} generated {{$labels.errorLog}} error logs in the last 5 minutes"
- name: ServiceRequest
interval: 5m
rules:
- alert: TooManyFailedRequest
expr: '* | extract "ip=<ip> " | extract "status_code=<code>;" | stats by (ip, code) count() if (code:~4.*) as failed, count() as total| math failed / total as failed_percentage| filter failed_percentage :> 0.01 | fields ip,failed_percentage'
annotations:
description: "Connection from address {{$labels.ip}} has {{$value}}% failed requests in last 5 minutes"
```
#### Recording rules
Examples:
```
groups:
- name: RequestCount
interval: 5m
rules:
- record: nginxRequestCount
expr: 'env: "test" AND service: "nginx" | stats count(*) as requests'
annotations:
description: "Service nginx on env test accepted {{$labels.requests}} requests in the last 5 minutes"
- record: prodRequestCount
expr: 'env: "prod" | stats by (service) count(*) as requests'
annotations:
description: "Service {{$labels.service}} on env prod accepted {{$labels.requests}} requests in the last 5 minutes"
```
## Time filter
It's recommended to omit the [time filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) in rule expression.
By default, vmalert automatically appends the time filter `_time: <group_interval>` to the expression.
For instance, the rule below will be evaluated every 5 minutes, and will return the result with logs from the last 5 minutes:
```
groups:
interval: 5m
rules:
- alert: TooManyFailedRequest
expr: '* | extract "ip=<ip> " | extract "status_code=<code>;" | stats by (ip, code) count() if (code:~4.*) as failed, count() as total| math failed / total as failed_percentage| filter failed_percentage :> 0.01 | fields ip,failed_percentage'
annotations: "Connection from address {{$labels.ip}} has {{$$value}}% failed requests in last 5 minutes"
```
User can also specify a customized time filter if needed. For example, rule below will be evaluated every 5 minutes,
but will calculate result over the logs from the last 10 minutes.
```
groups:
interval: 5m
rules:
- alert: TooManyFailedRequest
expr: '_time: 10m | extract "ip=<ip> " | extract "status_code=<code>;" | stats by (ip, code) count() if (code:~4.*) as failed, count() as total| math failed / total as failed_percentage| filter failed_percentage :> 0.01 | fields ip,failed_percentage'
annotations: "Connection from address {{$labels.ip}} has {{$$value}}% failed requests in last 10 minutes"
```
Please note, vmalert doesn't support [backfilling](#rules-backfilling) for rules with a customized time filter now. (Might be added in future)
## Rules backfilling
vmalert supports alerting and recording rules backfilling (aka replay) against VictoriaLogs as the datasource.
```
./bin/vmalert -rule=path/to/your.rules \ # path to files with rules you usually use with vmalert
-datasource.url=http://localhost:9428 \ # VictoriaLogs address.
-rule.defaultRuleType=vlogs \ # Set default rule type to VictoriaLogs.
-remoteWrite.url=http://localhost:8428 \ # Remote write compatible storage to persist rules and alerts state info
-replay.timeFrom=2021-05-11T07:21:43Z \ # to start replay from
-replay.timeTo=2021-05-29T18:40:43Z # to finish replay by, is optional
```
See more details about backfilling [here](https://docs.victoriametrics.com/vmalert/#rules-backfilling).
## Performance tip
LogsQL allows users to obtain multiple stats from a single expression.
For instance, the following query calculates 50th, 90th and 99th percentiles for the `request_duration_seconds` field over logs for the last 5 minutes:
```
_time:5m | stats
quantile(0.5, request_duration_seconds) p50,
quantile(0.9, request_duration_seconds) p90,
quantile(0.99, request_duration_seconds) p99
```
This expression can also be used in recording rules as follows:
```
groups:
- name: requestDuration
interval: 5m
rules:
- record: requestDurationQuantile
expr: '_time:5m | stats by (service) quantile(0.5, request_duration_seconds) p50, quantile(0.9, request_duration_seconds) p90, quantile(0.99, request_duration_seconds) p99'
```
This creates three metrics for each service:
```
requestDurationQuantile{stats_result="p50", service="service-1"}
requestDurationQuantile{stats_result="p90", service="service-1"}
requestDurationQuantile{stats_result="p99", service="service-1"}
requestDurationQuantile{stats_result="p50", service="service-2"}
requestDurationQuantile{stats_result="p90", service="service-2"}
requestDurationQuantile{stats_result="p00", service="service-2"}
...
```
For additional tips on writing LogsQL, refer to this [doc](https://docs.victoriametrics.com/victorialogs/logsql/#performance-tips).

View file

@ -0,0 +1,687 @@
{
"type": "excalidraw",
"version": 2,
"source": "https://excalidraw.com",
"elements": [
{
"type": "rectangle",
"version": 803,
"versionNonce": 1128884469,
"index": "a0",
"isDeleted": false,
"id": "VgBUzo0blGR-Ijd2mQEEf",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 422.3502197265625,
"y": 215.55953979492188,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 123.7601318359375,
"height": 72.13211059570312,
"seed": 1194011660,
"groupIds": [
"iBaXgbpyifSwPplm_GO5b"
],
"frameId": null,
"roundness": null,
"boundElements": [
{
"type": "arrow",
"id": "sxEhnxlbT7ldlSsmHDUHp"
},
{
"id": "wRO0q9xKPHc8e8XPPsQWh",
"type": "arrow"
},
{
"id": "Bpy5by47XGKB4yS99ZkuA",
"type": "arrow"
}
],
"updated": 1728889265677,
"link": null,
"locked": false
},
{
"type": "text",
"version": 660,
"versionNonce": 130510869,
"index": "a1",
"isDeleted": false,
"id": "e9TDm09y-GhPm84XWt0Jv",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 443.89678955078125,
"y": 236.64378356933594,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 82,
"height": 24,
"seed": 327273100,
"groupIds": [
"iBaXgbpyifSwPplm_GO5b"
],
"frameId": null,
"roundness": null,
"boundElements": [],
"updated": 1728889112138,
"link": null,
"locked": false,
"fontSize": 20,
"fontFamily": 3,
"text": "vmalert",
"textAlign": "center",
"verticalAlign": "middle",
"containerId": null,
"originalText": "vmalert",
"autoResize": true,
"lineHeight": 1.2
},
{
"type": "rectangle",
"version": 2608,
"versionNonce": 1050127035,
"index": "a2",
"isDeleted": false,
"id": "dd52BjHfPMPRji9Tws7U-",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 774.7067312730577,
"y": 231.9532470703125,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 275.7981470513237,
"height": 39.621179787868925,
"seed": 1779959692,
"groupIds": [
"2Lijjn3PwPQW_8KrcDmdu"
],
"frameId": null,
"roundness": null,
"boundElements": [
{
"id": "Bpy5by47XGKB4yS99ZkuA",
"type": "arrow"
}
],
"updated": 1728889420961,
"link": null,
"locked": false
},
{
"type": "rectangle",
"version": 1099,
"versionNonce": 499029243,
"index": "a6",
"isDeleted": false,
"id": "8-XFSbd6Zw96EUSJbJXZv",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 371.7434387207031,
"y": 398.50787353515625,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 240.10644531249997,
"height": 44.74725341796875,
"seed": 99322124,
"groupIds": [
"6obQBPHIfExBKfejeLLVO"
],
"frameId": null,
"roundness": null,
"boundElements": [
{
"type": "arrow",
"id": "sxEhnxlbT7ldlSsmHDUHp"
}
],
"updated": 1728889112138,
"link": null,
"locked": false
},
{
"type": "text",
"version": 865,
"versionNonce": 316509237,
"index": "a7",
"isDeleted": false,
"id": "GUs816aggGqUSdoEsSmea",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 393.73809814453125,
"y": 410.5976257324219,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 199,
"height": 24,
"seed": 1194745268,
"groupIds": [
"6obQBPHIfExBKfejeLLVO"
],
"frameId": null,
"roundness": null,
"boundElements": [],
"updated": 1728889112138,
"link": null,
"locked": false,
"fontSize": 20,
"fontFamily": 3,
"text": "alertmanager:9093",
"textAlign": "center",
"verticalAlign": "top",
"containerId": null,
"originalText": "alertmanager:9093",
"autoResize": true,
"lineHeight": 1.2
},
{
"type": "arrow",
"version": 3377,
"versionNonce": 359177051,
"index": "a8",
"isDeleted": false,
"id": "Bpy5by47XGKB4yS99ZkuA",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 556.6860961914062,
"y": 252.95352770712083,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 202.02063508165145,
"height": 0.22881326742660235,
"seed": 357577356,
"groupIds": [],
"frameId": null,
"roundness": {
"type": 2
},
"boundElements": [],
"updated": 1728889420962,
"link": null,
"locked": false,
"startBinding": {
"elementId": "VgBUzo0blGR-Ijd2mQEEf",
"focus": 0.0344528515859526,
"gap": 10.57574462890625
},
"endBinding": {
"elementId": "dd52BjHfPMPRji9Tws7U-",
"focus": -0.039393828258510157,
"gap": 16
},
"lastCommittedPoint": null,
"startArrowhead": null,
"endArrowhead": "arrow",
"points": [
[
0,
0
],
[
202.02063508165145,
-0.22881326742660235
]
]
},
{
"type": "arrow",
"version": 1460,
"versionNonce": 492906299,
"index": "a9",
"isDeleted": false,
"id": "wRO0q9xKPHc8e8XPPsQWh",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 406.0439244722469,
"y": 246.6775563467225,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 161.00829839007181,
"height": 2.320722012761223,
"seed": 656189364,
"groupIds": [],
"frameId": null,
"roundness": {
"type": 2
},
"boundElements": [],
"updated": 1728889313672,
"link": null,
"locked": false,
"startBinding": {
"elementId": "VgBUzo0blGR-Ijd2mQEEf",
"focus": 0.13736472619498497,
"gap": 16.306295254315614
},
"endBinding": null,
"lastCommittedPoint": null,
"startArrowhead": null,
"endArrowhead": "arrow",
"points": [
[
0,
0
],
[
-161.00829839007181,
-2.320722012761223
]
]
},
{
"type": "text",
"version": 567,
"versionNonce": 737159899,
"index": "aA",
"isDeleted": false,
"id": "RbVSa4PnOgAMtzoKb-DhW",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 552.4987182617188,
"y": 212.27996826171875,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 187.75,
"height": 95,
"seed": 1989838604,
"groupIds": [],
"frameId": null,
"roundness": null,
"boundElements": [
{
"id": "ijEBAhsESSoR3zLPouUVM",
"type": "arrow"
}
],
"updated": 1728889402055,
"link": null,
"locked": false,
"fontSize": 16,
"fontFamily": 3,
"text": "persist alerts state\nand recording rules\n\n\n",
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "persist alerts state\nand recording rules\n\n\n",
"autoResize": true,
"lineHeight": 1.1875
},
{
"type": "text",
"version": 830,
"versionNonce": 1996455189,
"index": "aB",
"isDeleted": false,
"id": "ia2QzZNl_tuvfY3ymLjyJ",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 279.55224609375,
"y": 218.88568115234375,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 122,
"height": 19,
"seed": 157304972,
"groupIds": [],
"frameId": null,
"roundness": null,
"boundElements": [
{
"type": "arrow",
"id": "wRO0q9xKPHc8e8XPPsQWh"
}
],
"updated": 1728889440112,
"link": null,
"locked": false,
"fontSize": 16,
"fontFamily": 3,
"text": "execute rules",
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "execute rules",
"autoResize": true,
"lineHeight": 1.1875
},
{
"type": "arrow",
"version": 1476,
"versionNonce": 1814378875,
"index": "aC",
"isDeleted": false,
"id": "sxEhnxlbT7ldlSsmHDUHp",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 484.18669893674246,
"y": 302.3424013553929,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 1.0484739253853945,
"height": 84.72775855671654,
"seed": 1818348300,
"groupIds": [],
"frameId": null,
"roundness": {
"type": 2
},
"boundElements": [],
"updated": 1728889265678,
"link": null,
"locked": false,
"startBinding": {
"elementId": "VgBUzo0blGR-Ijd2mQEEf",
"focus": 0.010768924644894236,
"gap": 14.650750964767894
},
"endBinding": {
"elementId": "8-XFSbd6Zw96EUSJbJXZv",
"focus": -0.051051952959743775,
"gap": 11.437713623046818
},
"lastCommittedPoint": null,
"startArrowhead": null,
"endArrowhead": "arrow",
"points": [
[
0,
0
],
[
1.0484739253853945,
84.72775855671654
]
]
},
{
"type": "text",
"version": 631,
"versionNonce": 1909410773,
"index": "aD",
"isDeleted": false,
"id": "E9Run6wCm2chQ6JHrmc_y",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 504.27996826171875,
"y": 322.13031005859375,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 122,
"height": 38,
"seed": 1836541708,
"groupIds": [],
"frameId": null,
"roundness": null,
"boundElements": [
{
"type": "arrow",
"id": "sxEhnxlbT7ldlSsmHDUHp"
}
],
"updated": 1728889430719,
"link": null,
"locked": false,
"fontSize": 16,
"fontFamily": 3,
"text": "send alert \nnotifications",
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "send alert \nnotifications",
"autoResize": true,
"lineHeight": 1.1875
},
{
"type": "text",
"version": 579,
"versionNonce": 326648123,
"index": "aE",
"isDeleted": false,
"id": "ff5OkfgmkKLifS13_TFj3",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 591.5895843505859,
"y": 269.2361297607422,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 121.875,
"height": 19,
"seed": 264004620,
"groupIds": [],
"frameId": null,
"roundness": null,
"boundElements": [
{
"type": "arrow",
"id": "wRO0q9xKPHc8e8XPPsQWh"
}
],
"updated": 1728889436228,
"link": null,
"locked": false,
"fontSize": 16,
"fontFamily": 3,
"text": "restore state",
"textAlign": "left",
"verticalAlign": "top",
"containerId": null,
"originalText": "restore state",
"autoResize": true,
"lineHeight": 1.1875
},
{
"type": "text",
"version": 1141,
"versionNonce": 39140603,
"index": "aG",
"isDeleted": false,
"id": "J2AqHIHYjG3cvxrBLonQW",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 782.2813415527344,
"y": 238.312045541553,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 254.41375732421875,
"height": 26.05968577236269,
"seed": 254079515,
"groupIds": [
"fw8b83Mw6tGXQ80jfC5Jx"
],
"frameId": null,
"roundness": null,
"boundElements": [],
"updated": 1728889417069,
"link": null,
"locked": false,
"fontSize": 21.716404810302244,
"fontFamily": 3,
"text": "victoriametrics:8428",
"textAlign": "center",
"verticalAlign": "top",
"containerId": null,
"originalText": "victoriametrics:8428",
"autoResize": true,
"lineHeight": 1.2
},
{
"type": "rectangle",
"version": 2824,
"versionNonce": 1550880827,
"index": "aH",
"isDeleted": false,
"id": "Whj4hd3Al6CbvGs7cQuWk",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": -11.824915810818197,
"y": 223.79106415879994,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 248.85674080132372,
"height": 40.562586037868925,
"seed": 1519267323,
"groupIds": [
"skPAIqL9ijNA0WE5GV8Gv"
],
"frameId": null,
"roundness": null,
"boundElements": [],
"updated": 1728889342561,
"link": null,
"locked": false
},
{
"type": "text",
"version": 1290,
"versionNonce": 1222168987,
"index": "aI",
"isDeleted": false,
"id": "NJgvtn8_Kzy1quxMqyfAr",
"fillStyle": "hachure",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"angle": 0,
"x": 8.194007518174999,
"y": 231.4272063800404,
"strokeColor": "#000000",
"backgroundColor": "transparent",
"width": 216.25169372558594,
"height": 26.05968577236269,
"seed": 1311553179,
"groupIds": [
"3JfeRMxXtVafxucZgxKNy"
],
"frameId": null,
"roundness": null,
"boundElements": [],
"updated": 1728889339478,
"link": null,
"locked": false,
"fontSize": 21.716404810302244,
"fontFamily": 3,
"text": "victorialogs:9428",
"textAlign": "center",
"verticalAlign": "top",
"containerId": null,
"originalText": "victorialogs:9428",
"autoResize": true,
"lineHeight": 1.2
},
{
"id": "ijEBAhsESSoR3zLPouUVM",
"type": "arrow",
"x": 754.5486716336245,
"y": 263.63184005775634,
"width": 200.78701391878076,
"height": 0.03213913002196023,
"angle": 0,
"strokeColor": "#1e1e1e",
"backgroundColor": "transparent",
"fillStyle": "solid",
"strokeWidth": 1,
"strokeStyle": "solid",
"roughness": 0,
"opacity": 100,
"groupIds": [],
"frameId": null,
"index": "aJ",
"roundness": {
"type": 2
},
"seed": 1284919637,
"version": 349,
"versionNonce": 186313781,
"isDeleted": false,
"boundElements": null,
"updated": 1728889427809,
"link": null,
"locked": false,
"points": [
[
0,
0
],
[
-200.78701391878076,
-0.03213913002196023
]
],
"lastCommittedPoint": null,
"startBinding": {
"elementId": "RbVSa4PnOgAMtzoKb-DhW",
"focus": -0.0807019799085118,
"gap": 14.299953371905758,
"fixedPoint": null
},
"endBinding": null,
"startArrowhead": null,
"endArrowhead": "arrow",
"elbowed": false
}
],
"appState": {
"gridSize": 20,
"gridStep": 5,
"gridModeEnabled": false,
"viewBackgroundColor": "#ffffff"
},
"files": {}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

View file

@ -18,6 +18,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): support [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) as a datasource. See [this doc](https://docs.victoriametrics.com/victorialogs/vmalert/) for details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be useful when rules are retrieved via HTTP URL where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support scraping from Kubernetes Native Sidecars. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7287).
* FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add a separate cache type for storing sparse entries when performing large index scans. This significantly reduces memory usage when applying [downsampling filters](https://docs.victoriametrics.com/#downsampling) and [retention filters](https://docs.victoriametrics.com/#retention-filters) during background merge. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7182) for the details.

View file

@ -10,7 +10,7 @@ aliases:
---
`vmalert` executes a list of the given [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/)
or [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/)
rules against configured `-datasource.url` compatible with Prometheus HTTP API. For sending alerting notifications
rules against configured `-datasource.url`. For sending alerting notifications
`vmalert` relies on [Alertmanager](https://github.com/prometheus/alertmanager) configured via `-notifier.url` flag.
Recording rules results are persisted via [remote write](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations)
protocol and require `-remoteWrite.url` to be configured.
@ -31,9 +31,8 @@ please refer to the [VictoriaMetrics Cloud documentation](https://docs.victoriam
## Features
* Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) TSDB;
* VictoriaMetrics [MetricsQL](https://docs.victoriametrics.com/metricsql/)
support and expressions validation;
* Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) and [MetricsQL](https://docs.victoriametrics.com/metricsql/);
* Integration with [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) and [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/). See [this doc](https://docs.victoriametrics.com/victorialogs/vmalert/);
* Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules)
support;
* Integration with [Alertmanager](https://github.com/prometheus/alertmanager) starting from [Alertmanager v0.16.0-alpha](https://github.com/prometheus/alertmanager/releases/tag/v0.16.0-alpha.0);
@ -458,7 +457,7 @@ In this example, `-external.alert.source` will lead to Grafana's Explore page wi
and time range will be selected starting from `"from":"{{ .ActiveAt.UnixMilli }}"` when alert became active.
In addition to `source` link, some extra links could be added to alert's [annotations](https://docs.victoriametrics.com/vmalert/#alerting-rules)
field. See [how we use them](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/839596c00df123c639d1244b28ee8137dfc9609c/deployment/docker/alerts-cluster.yml#L43)
field. See [how we use them](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/839596c00df123c639d1244b28ee8137dfc9609c/deployment/docker/rules/alerts-cluster.yml#L43)
to link alerting rule and the corresponding panel on Grafana dashboard.
### Multitenancy
@ -728,6 +727,10 @@ implements [Graphite Render API](https://graphite.readthedocs.io/en/stable/rende
When using vmalert with both `graphite` and `prometheus` rules configured against cluster version of VM do not forget
to set `-datasource.appendTypePrefix` flag to `true`, so vmalert can adjust URL prefix automatically based on the query type.
## VictoriaLogs
vmalert supports [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) as a datasource for writing alerting and recording rules using [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/). See [this doc](https://docs.victoriametrics.com/victorialogs/vmalert/) for details.
## Rules backfilling
vmalert supports alerting and recording rules backfilling (aka `replay`). In replay mode vmalert
@ -1323,7 +1326,7 @@ The shortlist of configuration flags is the following:
-remoteRead.bearerTokenFile string
Optional path to bearer token file to use for -remoteRead.url.
-remoteRead.disablePathAppend
Whether to disable automatic appending of '/api/v1/query' path to the configured -datasource.url and -remoteRead.url
Whether to disable automatic appending of '/api/v1/query' or '/select/logsql/stats_query' path to the configured -datasource.url and -remoteRead.url
-remoteRead.headers string
Optional HTTP headers to send with each request to the corresponding -remoteRead.url. For example, -remoteRead.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteRead.url. Multiple headers must be delimited by '^^': -remoteRead.headers='header1:value1^^header2:value2'
-remoteRead.idleConnTimeout duration
@ -1356,7 +1359,7 @@ The shortlist of configuration flags is the following:
Optional path to client-side TLS certificate key to use when connecting to -remoteRead.url
-remoteRead.tlsServerName string
Optional TLS server name to use for connections to -remoteRead.url. By default, the server name from -remoteRead.url is used
-remoteRead.url vmalert
-remoteRead.url string
Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect.Remote read is used to restore alerts state.This configuration makes sense only if vmalert was configured with `remoteWrite.url` before and has been successfully persisted its state. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.
-remoteWrite.basicAuth.password string
Optional basic auth password for -remoteWrite.url
@ -1441,6 +1444,8 @@ The shortlist of configuration flags is the following:
all files with prefix rule_ in folder dir.
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-rule.defaultRuleType string
Default type for rule expressions, can be overridden by type parameter inside the rule group. Supported values: "graphite", "prometheus" and "vlogs". (default: "prometheus")
-rule.evalDelay time
Adjustment of the time parameter for rule evaluation requests to compensate intentional data delay from the datasource.Normally, should be equal to `-search.latencyOffset` (cmd-line flag configured for VictoriaMetrics single-node or vmselect). (default 30s)
-rule.maxResolveDuration duration

View file

@ -1059,7 +1059,7 @@ See also [security recommendations](#security).
`vmauth` exports various metrics in Prometheus exposition format at `http://vmauth-host:8427/metrics` page. It is recommended setting up regular scraping of this page
either via [vmagent](https://docs.victoriametrics.com/vmagent/) or via Prometheus-compatible scraper, so the exported metrics could be analyzed later.
Use the official [Grafana dashboard](https://grafana.com/grafana/dashboards/21394) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml)
Use the official [Grafana dashboard](https://grafana.com/grafana/dashboards/21394) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-vmauth.yml)
for `vmauth` monitoring.
If you use Google Cloud Managed Prometheus for scraping metrics from VictoriaMetrics components, then pass `-metrics.exposeMetadata`

View file

@ -786,6 +786,38 @@ func ParseStatsQuery(s string) (*Query, error) {
return q, nil
}
// ContainAnyTimeFilter returns true when query contains a global time filter.
func (q *Query) ContainAnyTimeFilter() bool {
if hasTimeFilter(q.f) {
return true
}
for _, p := range q.pipes {
if pf, ok := p.(*pipeFilter); ok {
if hasTimeFilter(pf.f) {
return true
}
}
}
return false
}
func hasTimeFilter(f filter) bool {
if f == nil {
return false
}
switch t := f.(type) {
case *filterAnd:
for _, subF := range t.filters {
if hasTimeFilter(subF) {
return true
}
}
case *filterTime:
return true
}
return false
}
// ParseQueryAtTimestamp parses s in the context of the given timestamp.
//
// E.g. _time:duration filters are adjusted according to the provided timestamp as _time:[timestamp-duration, duration].

View file

@ -2384,3 +2384,28 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
// format to the remaining metric field
f(`* | by (x) count() y | format 'foo' as y`)
}
func TestHasTimeFilter(t *testing.T) {
f := func(qStr string, expected bool) {
t.Helper()
q, err := ParseStatsQuery(qStr)
if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err)
}
if q.ContainAnyTimeFilter() != expected {
t.Fatalf("unexpected result for hasTimeFilter(%q); want %v", qStr, expected)
}
}
f(`* | count()`, false)
f(`error OR _time:5m | count()`, false)
f(`(_time: 5m AND error) OR (_time: 5m AND warn) | count()`, false)
f(`* | error OR _time:5m | count()`, false)
f(`_time:5m | count()`, true)
f(`_time:2023-04-25T22:45:59Z | count()`, true)
f(`error AND _time:5m | count()`, true)
f(`error AND (_time: 5m AND warn) | count()`, true)
f(`* | error AND _time:5m | count()`, true)
}