diff --git a/app/vmalert/config/config.go b/app/vmalert/config/config.go index 31e16a115..353424b75 100644 --- a/app/vmalert/config/config.go +++ b/app/vmalert/config/config.go @@ -3,6 +3,7 @@ package config import ( "bytes" "crypto/md5" + "flag" "fmt" "hash/fnv" "io" @@ -17,6 +18,10 @@ import ( "gopkg.in/yaml.v2" ) +var ( + defaultRuleType = flag.String("rule.defaultRuleType", "prometheus", `Default type for rule expressions, can be overridden via "type" parameter on the group level, see https://docs.victoriametrics.com/vmalert/#groups. Supported values: "graphite", "prometheus" and "vlogs".`) +) + // Group contains list of Rules grouped into // entity with one name and evaluation interval type Group struct { @@ -59,11 +64,9 @@ func (g *Group) UnmarshalYAML(unmarshal func(any) error) error { if err != nil { return fmt.Errorf("failed to marshal group configuration for checksum: %w", err) } - // change default value to prometheus datasource. if g.Type.Get() == "" { - g.Type.Set(NewPrometheusType()) + g.Type = NewRawType(*defaultRuleType) } - h := md5.New() h.Write(b) g.Checksum = fmt.Sprintf("%x", h.Sum(nil)) diff --git a/app/vmalert/config/config_test.go b/app/vmalert/config/config_test.go index 1aa06d582..85207d53a 100644 --- a/app/vmalert/config/config_test.go +++ b/app/vmalert/config/config_test.go @@ -122,6 +122,7 @@ func TestParse_Failure(t *testing.T) { f([]string{"testdata/dir/rules3-bad.rules"}, "either `record` or `alert` must be set") f([]string{"testdata/dir/rules4-bad.rules"}, "either `record` or `alert` must be set") f([]string{"testdata/rules/rules1-bad.rules"}, "bad graphite expr") + f([]string{"testdata/rules/vlog-rules0-bad.rules"}, "bad LogsQL expr") f([]string{"testdata/dir/rules6-bad.rules"}, "missing ':' in header") f([]string{"testdata/rules/rules-multi-doc-bad.rules"}, "unknown fields") f([]string{"testdata/rules/rules-multi-doc-duplicates-bad.rules"}, "duplicate") @@ -240,7 +241,7 @@ func TestGroupValidate_Failure(t *testing.T) { }, false, 
"duplicate") f(&Group{ - Name: "test graphite prometheus bad expr", + Name: "test graphite with prometheus expr", Type: NewGraphiteType(), Rules: []Rule{ { @@ -267,6 +268,20 @@ func TestGroupValidate_Failure(t *testing.T) { }, }, false, "either `record` or `alert` must be set") + f(&Group{ + Name: "test vlogs with prometheus expr", + Type: NewVLogsType(), + Rules: []Rule{ + { + Expr: "sum(up == 0 ) by (host)", + For: promutils.NewDuration(10 * time.Millisecond), + }, + { + Expr: "sumSeries(time('foo.bar',10))", + }, + }, + }, false, "invalid rule") + // validate expressions f(&Group{ Name: "test", @@ -297,6 +312,16 @@ func TestGroupValidate_Failure(t *testing.T) { }}, }, }, true, "bad graphite expr") + + f(&Group{ + Name: "test vlogs", + Type: NewVLogsType(), + Rules: []Rule{ + {Alert: "alert", Expr: "stats count(*) as requests", Labels: map[string]string{ + "description": "some-description", + }}, + }, + }, true, "bad LogsQL expr") } func TestGroupValidate_Success(t *testing.T) { @@ -336,7 +361,7 @@ func TestGroupValidate_Success(t *testing.T) { }, }, false, false) - // validate annotiations + // validate annotations f(&Group{ Name: "test", Rules: []Rule{ @@ -363,6 +388,15 @@ func TestGroupValidate_Success(t *testing.T) { }}, }, }, false, true) + f(&Group{ + Name: "test victorialogs", + Type: NewVLogsType(), + Rules: []Rule{ + {Alert: "alert", Expr: " _time: 1m | stats count(*) as requests", Labels: map[string]string{ + "description": "{{ value|query }}", + }}, + }, + }, false, true) } func TestHashRule_NotEqual(t *testing.T) { diff --git a/app/vmalert/config/testdata/rules/vlog-rules0-bad.rules b/app/vmalert/config/testdata/rules/vlog-rules0-bad.rules new file mode 100644 index 000000000..91c248ae7 --- /dev/null +++ b/app/vmalert/config/testdata/rules/vlog-rules0-bad.rules @@ -0,0 +1,10 @@ +groups: + - name: InvalidStatsLogsql + type: vlogs + interval: 5m + rules: + - record: MissingFilter + expr: 'stats count(*) as requests' + - record: MissingStatsPipe + expr: 
'service: "nginx"' + diff --git a/app/vmalert/config/testdata/rules/vlog-rules0-good.rules b/app/vmalert/config/testdata/rules/vlog-rules0-good.rules new file mode 100644 index 000000000..a41e44de1 --- /dev/null +++ b/app/vmalert/config/testdata/rules/vlog-rules0-good.rules @@ -0,0 +1,29 @@ +groups: + - name: RequestCount + type: vlogs + interval: 5m + rules: + - record: nginxRequestCount + expr: 'env: "test" AND service: "nginx" | stats count(*) as requests' + annotations: + description: "Service nginx on env test accepted {{$labels.requests}} requests in the last 5 minutes" + - record: prodRequestCount + expr: 'env: "prod" | stats by (service) count(*) as requests' + annotations: + description: "Service {{$labels.service}} on env prod accepted {{$labels.requests}} requests in the last 5 minutes" + - name: ServiceLog + type: vlogs + interval: 5m + rules: + - alert: HasErrorLog + expr: 'env: "prod" AND status:~"error|warn" | stats by (service) count(*) as errorLog | filter errorLog:>0' + annotations: + description: "Service {{$labels.service}} generated {{$labels.errorLog}} error logs in the last 5 minutes" + - name: ServiceRequest + type: vlogs + interval: 10m + rules: + - alert: TooManyFailedRequest + expr: '* | extract "ip= " | extract "status_code=;" | stats by (ip) count() if (code:!~200) as failed, count() as total| math failed / total as failed_percentage| filter failed_percentage :> 0.01 | fields ip,failed_percentage' + annotations: + description: "Connection from address {{$labels.ip}} has {{$value}} failed requests ratio in last 10 minutes" diff --git a/app/vmalert/config/types.go b/app/vmalert/config/types.go index cf16d0d8b..b71325ec3 100644 --- a/app/vmalert/config/types.go +++ b/app/vmalert/config/types.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphiteql" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/metricsql" ) @@ -27,6 +28,13 @@ func NewGraphiteType() 
Type { } } +// NewVLogsType returns victorialogs datasource type +func NewVLogsType() Type { + return Type{ + Name: "vlogs", + } +} + // NewRawType returns datasource type from raw string // without validation. func NewRawType(d string) Type { @@ -62,6 +70,10 @@ func (t *Type) ValidateExpr(expr string) error { if _, err := metricsql.Parse(expr); err != nil { return fmt.Errorf("bad prometheus expr: %q, err: %w", expr, err) } + case "vlogs": + if _, err := logstorage.ParseStatsQuery(expr); err != nil { + return fmt.Errorf("bad LogsQL expr: %q, err: %w", expr, err) + } default: return fmt.Errorf("unknown datasource type=%q", t.Name) } @@ -74,13 +86,10 @@ func (t *Type) UnmarshalYAML(unmarshal func(any) error) error { if err := unmarshal(&s); err != nil { return err } - if s == "" { - s = "prometheus" - } switch s { - case "graphite", "prometheus": + case "graphite", "prometheus", "vlogs": default: - return fmt.Errorf("unknown datasource type=%q, want %q or %q", s, "prometheus", "graphite") + return fmt.Errorf("unknown datasource type=%q, want prometheus, graphite or vlogs", s) } t.Name = s return nil diff --git a/app/vmalert/datasource/client.go b/app/vmalert/datasource/client.go new file mode 100644 index 000000000..4fa884824 --- /dev/null +++ b/app/vmalert/datasource/client.go @@ -0,0 +1,333 @@ +package datasource + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" +) + +type datasourceType string + +const ( + datasourcePrometheus datasourceType = "prometheus" + datasourceGraphite datasourceType = "graphite" + datasourceVLogs datasourceType = "vlogs" +) + +func toDatasourceType(s string) datasourceType { + switch s { + case string(datasourcePrometheus): + return datasourcePrometheus + case string(datasourceGraphite): + return datasourceGraphite + 
case string(datasourceVLogs): + return datasourceVLogs + default: + logger.Panicf("BUG: unknown datasource type %q", s) + } + return "" +} + +// Client is a datasource entity for reading data, +// supported clients are enumerated in datasourceType. +// WARN: when adding a new field, remember to check if Clone() method needs to be updated. +type Client struct { + c *http.Client + authCfg *promauth.Config + datasourceURL string + appendTypePrefix bool + queryStep time.Duration + dataSourceType datasourceType + // ApplyIntervalAsTimeFilter is only valid for vlogs datasource. + // Set to true if there is no [timeFilter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) in the rule expression, + // and we will add evaluation interval as an additional timeFilter when querying. + applyIntervalAsTimeFilter bool + + // evaluationInterval will help setting request's `step` param, + // or adding time filter for LogsQL expression. + evaluationInterval time.Duration + // extraParams contains params to be attached to each HTTP request + extraParams url.Values + // extraHeaders are headers to be attached to each HTTP request + extraHeaders []keyValue + + // whether to print additional log messages + // for each sent request + debug bool +} + +type keyValue struct { + key string + value string +} + +// Clone clones shared http client and other configuration to the new client. 
+func (c *Client) Clone() *Client { + ns := &Client{ + c: c.c, + authCfg: c.authCfg, + datasourceURL: c.datasourceURL, + appendTypePrefix: c.appendTypePrefix, + queryStep: c.queryStep, + + dataSourceType: c.dataSourceType, + evaluationInterval: c.evaluationInterval, + + // init map so it can be populated below + extraParams: url.Values{}, + + debug: c.debug, + } + if len(c.extraHeaders) > 0 { + ns.extraHeaders = make([]keyValue, len(c.extraHeaders)) + copy(ns.extraHeaders, c.extraHeaders) + } + for k, v := range c.extraParams { + ns.extraParams[k] = v + } + + return ns +} + +// ApplyParams - changes given querier params. +func (c *Client) ApplyParams(params QuerierParams) *Client { + if params.DataSourceType != "" { + c.dataSourceType = toDatasourceType(params.DataSourceType) + } + c.evaluationInterval = params.EvaluationInterval + c.applyIntervalAsTimeFilter = params.ApplyIntervalAsTimeFilter + if params.QueryParams != nil { + if c.extraParams == nil { + c.extraParams = url.Values{} + } + for k, vl := range params.QueryParams { + // custom query params are prior to default ones + if c.extraParams.Has(k) { + c.extraParams.Del(k) + } + for _, v := range vl { + // don't use .Set() instead of Del/Add since it is allowed + // for GET params to be duplicated + // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4908 + c.extraParams.Add(k, v) + } + } + } + if params.Headers != nil { + for key, value := range params.Headers { + kv := keyValue{key: key, value: value} + c.extraHeaders = append(c.extraHeaders, kv) + } + } + c.debug = params.Debug + return c +} + +// BuildWithParams - implements interface. +func (c *Client) BuildWithParams(params QuerierParams) Querier { + return c.Clone().ApplyParams(params) +} + +// NewPrometheusClient returns a new prometheus datasource client. 
+func NewPrometheusClient(baseURL string, authCfg *promauth.Config, appendTypePrefix bool, c *http.Client) *Client { + return &Client{ + c: c, + authCfg: authCfg, + datasourceURL: strings.TrimSuffix(baseURL, "/"), + appendTypePrefix: appendTypePrefix, + queryStep: *queryStep, + dataSourceType: datasourcePrometheus, + extraParams: url.Values{}, + } +} + +// Query executes the given query and returns parsed response +func (c *Client) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) { + req, err := c.newQueryRequest(ctx, query, ts) + if err != nil { + return Result{}, nil, err + } + resp, err := c.do(req) + if err != nil { + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) { + // Return unexpected error to the caller. + return Result{}, nil, err + } + // Something in the middle between client and datasource might be closing + // the connection. So we do a one more attempt in hope request will succeed. + req, err = c.newQueryRequest(ctx, query, ts) + if err != nil { + return Result{}, nil, fmt.Errorf("second attempt: %w", err) + } + resp, err = c.do(req) + if err != nil { + return Result{}, nil, fmt.Errorf("second attempt: %w", err) + } + } + + // Process the received response. + var parseFn func(req *http.Request, resp *http.Response) (Result, error) + switch c.dataSourceType { + case datasourcePrometheus: + parseFn = parsePrometheusResponse + case datasourceGraphite: + parseFn = parseGraphiteResponse + case datasourceVLogs: + parseFn = parseVLogsResponse + default: + logger.Panicf("BUG: unsupported datasource type %q to parse query response", c.dataSourceType) + } + result, err := parseFn(req, resp) + _ = resp.Body.Close() + return result, req, err +} + +// QueryRange executes the given query on the given time range. +// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries +// Graphite type isn't supported. 
+func (c *Client) QueryRange(ctx context.Context, query string, start, end time.Time) (res Result, err error) { + if c.dataSourceType == datasourceGraphite { + return res, fmt.Errorf("%q is not supported for QueryRange", c.dataSourceType) + } + // TODO: disable range query LogsQL with time filter now + if c.dataSourceType == datasourceVLogs && !c.applyIntervalAsTimeFilter { + return res, fmt.Errorf("range query is not supported for LogsQL expression %q because it contains time filter. Remove time filter from the expression and try again", query) + } + if start.IsZero() { + return res, fmt.Errorf("start param is missing") + } + if end.IsZero() { + return res, fmt.Errorf("end param is missing") + } + req, err := c.newQueryRangeRequest(ctx, query, start, end) + if err != nil { + return res, err + } + resp, err := c.do(req) + if err != nil { + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) { + // Return unexpected error to the caller. + return res, err + } + // Something in the middle between client and datasource might be closing + // the connection. So we do a one more attempt in hope request will succeed. + req, err = c.newQueryRangeRequest(ctx, query, start, end) + if err != nil { + return res, fmt.Errorf("second attempt: %w", err) + } + resp, err = c.do(req) + if err != nil { + return res, fmt.Errorf("second attempt: %w", err) + } + } + + // Process the received response. 
+ var parseFn func(req *http.Request, resp *http.Response) (Result, error) + switch c.dataSourceType { + case datasourcePrometheus: + parseFn = parsePrometheusResponse + case datasourceVLogs: + parseFn = parseVLogsResponse + default: + logger.Panicf("BUG: unsupported datasource type %q to parse query range response", c.dataSourceType) + } + res, err = parseFn(req, resp) + _ = resp.Body.Close() + return res, err +} + +func (c *Client) do(req *http.Request) (*http.Response, error) { + ru := req.URL.Redacted() + if *showDatasourceURL { + ru = req.URL.String() + } + if c.debug { + logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, ru) + } + resp, err := c.c.Do(req) + if err != nil { + return nil, fmt.Errorf("error getting response from %s: %w", ru, err) + } + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, ru, body) + } + return resp, nil +} + +func (c *Client) newQueryRangeRequest(ctx context.Context, query string, start, end time.Time) (*http.Request, error) { + req, err := c.newRequest(ctx) + if err != nil { + return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", c.datasourceURL, err) + } + switch c.dataSourceType { + case datasourcePrometheus: + c.setPrometheusRangeReqParams(req, query, start, end) + case datasourceVLogs: + c.setVLogsRangeReqParams(req, query, start, end) + default: + logger.Panicf("BUG: unsupported datasource type %q to create range query request", c.dataSourceType) + } + return req, nil +} + +func (c *Client) newQueryRequest(ctx context.Context, query string, ts time.Time) (*http.Request, error) { + req, err := c.newRequest(ctx) + if err != nil { + return nil, fmt.Errorf("cannot create query request to datasource %q: %w", c.datasourceURL, err) + } + switch c.dataSourceType { + case datasourcePrometheus: + 
c.setPrometheusInstantReqParams(req, query, ts) + case datasourceGraphite: + c.setGraphiteReqParams(req, query) + case datasourceVLogs: + c.setVLogsInstantReqParams(req, query, ts) + default: + logger.Panicf("BUG: unsupported datasource type %q to create query request", c.dataSourceType) + } + return req, nil +} + +func (c *Client) newRequest(ctx context.Context) (*http.Request, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.datasourceURL, nil) + if err != nil { + logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", c.datasourceURL, err) + } + req.Header.Set("Content-Type", "application/json") + if c.authCfg != nil { + err = c.authCfg.SetHeaders(req, true) + if err != nil { + return nil, err + } + } + for _, h := range c.extraHeaders { + req.Header.Set(h.key, h.value) + } + return req, nil +} + +// setReqParams adds query and other extra params for the request. +func (c *Client) setReqParams(r *http.Request, query string) { + q := r.URL.Query() + for k, vs := range c.extraParams { + if q.Has(k) { // extraParams are prior to params in URL + q.Del(k) + } + for _, v := range vs { + q.Add(k, v) + } + } + q.Set("query", query) + r.URL.RawQuery = q.Encode() +} diff --git a/app/vmalert/datasource/vm_graphite_api.go b/app/vmalert/datasource/client_graphite.go similarity index 95% rename from app/vmalert/datasource/vm_graphite_api.go rename to app/vmalert/datasource/client_graphite.go index 699919e77..2200f6ea5 100644 --- a/app/vmalert/datasource/vm_graphite_api.go +++ b/app/vmalert/datasource/client_graphite.go @@ -46,7 +46,7 @@ const ( graphitePrefix = "/graphite" ) -func (s *VMStorage) setGraphiteReqParams(r *http.Request, query string) { +func (s *Client) setGraphiteReqParams(r *http.Request, query string) { if s.appendTypePrefix { r.URL.Path += graphitePrefix } diff --git a/app/vmalert/datasource/vm_prom_api.go b/app/vmalert/datasource/client_prom.go similarity index 92% rename from app/vmalert/datasource/vm_prom_api.go rename 
to app/vmalert/datasource/client_prom.go index f2b4bddee..4747e8c51 100644 --- a/app/vmalert/datasource/vm_prom_api.go +++ b/app/vmalert/datasource/client_prom.go @@ -14,7 +14,7 @@ import ( ) var ( - disablePathAppend = flag.Bool("remoteRead.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/query' path "+ + disablePathAppend = flag.Bool("remoteRead.disablePathAppend", false, "Whether to disable automatic appending of '/api/v1/query' or '/select/logsql/stats_query' path "+ "to the configured -datasource.url and -remoteRead.url") disableStepParam = flag.Bool("datasource.disableStepParam", false, "Whether to disable adding 'step' param to the issued instant queries. "+ "This might be useful when using vmalert with datasources that do not support 'step' param for instant queries, like Google Managed Prometheus. "+ @@ -171,7 +171,7 @@ const ( func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result, err error) { r := &promResponse{} if err = json.NewDecoder(resp.Body).Decode(r); err != nil { - return res, fmt.Errorf("error parsing prometheus metrics for %s: %w", req.URL.Redacted(), err) + return res, fmt.Errorf("error parsing response from %s: %w", req.URL.Redacted(), err) } if r.Status == statusError { return res, fmt.Errorf("response error, query: %s, errorType: %s, error: %s", req.URL.Redacted(), r.ErrorType, r.Error) @@ -218,7 +218,7 @@ func parsePrometheusResponse(req *http.Request, resp *http.Response) (res Result return res, nil } -func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) { +func (s *Client) setPrometheusInstantReqParams(r *http.Request, query string, timestamp time.Time) { if s.appendTypePrefix { r.URL.Path += "/prometheus" } @@ -238,10 +238,10 @@ func (s *VMStorage) setPrometheusInstantReqParams(r *http.Request, query string, q.Set("step", fmt.Sprintf("%ds", int(s.queryStep.Seconds()))) } r.URL.RawQuery = q.Encode() - s.setPrometheusReqParams(r, 
query) + s.setReqParams(r, query) } -func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) { +func (s *Client) setPrometheusRangeReqParams(r *http.Request, query string, start, end time.Time) { if s.appendTypePrefix { r.URL.Path += "/prometheus" } @@ -257,19 +257,5 @@ func (s *VMStorage) setPrometheusRangeReqParams(r *http.Request, query string, s q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds()))) } r.URL.RawQuery = q.Encode() - s.setPrometheusReqParams(r, query) -} - -func (s *VMStorage) setPrometheusReqParams(r *http.Request, query string) { - q := r.URL.Query() - for k, vs := range s.extraParams { - if q.Has(k) { // extraParams are prior to params in URL - q.Del(k) - } - for _, v := range vs { - q.Add(k, v) - } - } - q.Set("query", query) - r.URL.RawQuery = q.Encode() + s.setReqParams(r, query) } diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/client_test.go similarity index 64% rename from app/vmalert/datasource/vm_test.go rename to app/vmalert/datasource/client_test.go index 9c3519a24..35da2279a 100644 --- a/app/vmalert/datasource/vm_test.go +++ b/app/vmalert/datasource/client_test.go @@ -24,8 +24,10 @@ var ( Username: basicAuthName, Password: promauth.NewSecret(basicAuthPass), } - query = "vm_rows" - queryRender = "constantLine(10)" + vmQuery = "vm_rows" + queryRender = "constantLine(10)" + vlogsQuery = "_time: 5m | stats by (foo) count() total" + vlogsRangeQuery = "* | stats by (foo) count() total" ) func TestVMInstantQuery(t *testing.T) { @@ -42,8 +44,8 @@ func TestVMInstantQuery(t *testing.T) { if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass { t.Fatalf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass) } - if r.URL.Query().Get("query") != query { - t.Fatalf("expected %s in query param, got %s", query, r.URL.Query().Get("query")) + if r.URL.Query().Get("query") != vmQuery { + t.Fatalf("expected %s in 
query param, got %s", vmQuery, r.URL.Query().Get("query")) } timeParam := r.URL.Query().Get("time") if timeParam == "" { @@ -78,6 +80,31 @@ func TestVMInstantQuery(t *testing.T) { w.Write([]byte(`[{"target":"constantLine(10)","tags":{"name":"constantLine(10)"},"datapoints":[[10,1611758343],[10,1611758373],[10,1611758403]]}]`)) } }) + mux.HandleFunc("/select/logsql/stats_query", func(w http.ResponseWriter, r *http.Request) { + c++ + if r.Method != http.MethodPost { + t.Fatalf("expected POST method got %s", r.Method) + } + if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass { + t.Fatalf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass) + } + if r.URL.Query().Get("query") != vlogsQuery { + t.Fatalf("expected %s in query param, got %s", vlogsQuery, r.URL.Query().Get("query")) + } + timeParam := r.URL.Query().Get("time") + if timeParam == "" { + t.Fatalf("expected 'time' in query param, got nil instead") + } + if _, err := time.Parse(time.RFC3339, timeParam); err != nil { + t.Fatalf("failed to parse 'time' query param %q: %s", timeParam, err) + } + switch c { + case 9: + w.Write([]byte("[]")) + case 10: + w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"total","foo":"bar"},"value":[1583786142,"13763"]},{"metric":{"__name__":"total","foo":"baz"},"value":[1583786140,"2000"]}]}}`)) + } + }) srv := httptest.NewServer(mux) defer srv.Close() @@ -86,13 +113,13 @@ func TestVMInstantQuery(t *testing.T) { if err != nil { t.Fatalf("unexpected: %s", err) } - s := NewVMStorage(srv.URL, authCfg, 0, false, srv.Client()) + s := NewPrometheusClient(srv.URL, authCfg, false, srv.Client()) p := datasourcePrometheus pq := s.BuildWithParams(QuerierParams{DataSourceType: string(p), EvaluationInterval: 15 * time.Second}) ts := time.Now() - expErr := func(err string) { + expErr := func(query, err string) { _, _, gotErr := pq.Query(ctx, query, ts) if gotErr == nil { 
t.Fatalf("expected %q got nil", err) @@ -102,13 +129,13 @@ func TestVMInstantQuery(t *testing.T) { } } - expErr("500") // 0 - expErr("error parsing prometheus metrics") // 1 - expErr("response error") // 2 - expErr("unknown status") // 3 - expErr("unexpected end of JSON input") // 4 + expErr(vmQuery, "500") // 0 + expErr(vmQuery, "error parsing response") // 1 + expErr(vmQuery, "response error") // 2 + expErr(vmQuery, "unknown status") // 3 + expErr(vmQuery, "unexpected end of JSON input") // 4 - res, _, err := pq.Query(ctx, query, ts) // 5 - vector + res, _, err := pq.Query(ctx, vmQuery, ts) // 5 - vector if err != nil { t.Fatalf("unexpected %s", err) } @@ -129,7 +156,7 @@ func TestVMInstantQuery(t *testing.T) { } metricsEqual(t, res.Data, expected) - res, req, err := pq.Query(ctx, query, ts) // 6 - scalar + res, req, err := pq.Query(ctx, vmQuery, ts) // 6 - scalar if err != nil { t.Fatalf("unexpected %s", err) } @@ -154,7 +181,7 @@ func TestVMInstantQuery(t *testing.T) { res.SeriesFetched) } - res, _, err = pq.Query(ctx, query, ts) // 7 - scalar with stats + res, _, err = pq.Query(ctx, vmQuery, ts) // 7 - scalar with stats if err != nil { t.Fatalf("unexpected %s", err) } @@ -175,6 +202,7 @@ func TestVMInstantQuery(t *testing.T) { *res.SeriesFetched) } + // test graphite gq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceGraphite)}) res, _, err = gq.Query(ctx, queryRender, ts) // 8 - graphite @@ -192,6 +220,33 @@ func TestVMInstantQuery(t *testing.T) { }, } metricsEqual(t, res.Data, exp) + + // test victorialogs + vlogs := datasourceVLogs + pq = s.BuildWithParams(QuerierParams{DataSourceType: string(vlogs), EvaluationInterval: 15 * time.Second}) + + expErr(vlogsQuery, "error parsing response") // 9 + + res, _, err = pq.Query(ctx, vlogsQuery, ts) // 10 + if err != nil { + t.Fatalf("unexpected %s", err) + } + if len(res.Data) != 2 { + t.Fatalf("expected 2 metrics got %d in %+v", len(res.Data), res.Data) + } + expected = []Metric{ + { + Labels: 
[]Label{{Value: "total", Name: "stats_result"}, {Value: "bar", Name: "foo"}}, + Timestamps: []int64{1583786142}, + Values: []float64{13763}, + }, + { + Labels: []Label{{Value: "total", Name: "stats_result"}, {Value: "baz", Name: "foo"}}, + Timestamps: []int64{1583786140}, + Values: []float64{2000}, + }, + } + metricsEqual(t, res.Data, expected) } func TestVMInstantQueryWithRetry(t *testing.T) { @@ -202,8 +257,8 @@ func TestVMInstantQueryWithRetry(t *testing.T) { c := -1 mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) { c++ - if r.URL.Query().Get("query") != query { - t.Fatalf("expected %s in query param, got %s", query, r.URL.Query().Get("query")) + if r.URL.Query().Get("query") != vmQuery { + t.Fatalf("expected %s in query param, got %s", vmQuery, r.URL.Query().Get("query")) } switch c { case 0: @@ -225,11 +280,11 @@ func TestVMInstantQueryWithRetry(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() - s := NewVMStorage(srv.URL, nil, 0, false, srv.Client()) + s := NewPrometheusClient(srv.URL, nil, false, srv.Client()) pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus)}) expErr := func(err string) { - _, _, gotErr := pq.Query(ctx, query, time.Now()) + _, _, gotErr := pq.Query(ctx, vmQuery, time.Now()) if gotErr == nil { t.Fatalf("expected %q got nil", err) } @@ -239,7 +294,7 @@ func TestVMInstantQueryWithRetry(t *testing.T) { } expValue := func(v float64) { - res, _, err := pq.Query(ctx, query, time.Now()) + res, _, err := pq.Query(ctx, vmQuery, time.Now()) if err != nil { t.Fatalf("unexpected %s", err) } @@ -300,8 +355,8 @@ func TestVMRangeQuery(t *testing.T) { if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass { t.Fatalf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass) } - if r.URL.Query().Get("query") != query { - t.Fatalf("expected %s in query param, got %s", query, r.URL.Query().Get("query")) + if 
r.URL.Query().Get("query") != vmQuery { + t.Fatalf("expected %s in query param, got %s", vmQuery, r.URL.Query().Get("query")) } startTS := r.URL.Query().Get("start") if startTS == "" { @@ -326,6 +381,40 @@ func TestVMRangeQuery(t *testing.T) { w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"vm_rows"},"values":[[1583786142,"13763"]]}]}}`)) } }) + mux.HandleFunc("/select/logsql/stats_query_range", func(w http.ResponseWriter, r *http.Request) { + c++ + if r.Method != http.MethodPost { + t.Fatalf("expected POST method got %s", r.Method) + } + if name, pass, _ := r.BasicAuth(); name != basicAuthName || pass != basicAuthPass { + t.Fatalf("expected %s:%s as basic auth got %s:%s", basicAuthName, basicAuthPass, name, pass) + } + if r.URL.Query().Get("query") != vlogsRangeQuery { + t.Fatalf("expected %s in query param, got %s", vmQuery, r.URL.Query().Get("query")) + } + startTS := r.URL.Query().Get("start") + if startTS == "" { + t.Fatalf("expected 'start' in query param, got nil instead") + } + if _, err := time.Parse(time.RFC3339, startTS); err != nil { + t.Fatalf("failed to parse 'start' query param: %s", err) + } + endTS := r.URL.Query().Get("end") + if endTS == "" { + t.Fatalf("expected 'end' in query param, got nil instead") + } + if _, err := time.Parse(time.RFC3339, endTS); err != nil { + t.Fatalf("failed to parse 'end' query param: %s", err) + } + step := r.URL.Query().Get("step") + if step != "60s" { + t.Fatalf("expected 'step' query param to be 60s; got %q instead", step) + } + switch c { + case 1: + w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"total"},"values":[[1583786142,"10"]]}]}}`)) + } + }) srv := httptest.NewServer(mux) defer srv.Close() @@ -334,19 +423,19 @@ func TestVMRangeQuery(t *testing.T) { if err != nil { t.Fatalf("unexpected: %s", err) } - s := NewVMStorage(srv.URL, authCfg, *queryStep, false, srv.Client()) + s := NewPrometheusClient(srv.URL, 
authCfg, false, srv.Client()) pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus), EvaluationInterval: 15 * time.Second}) - _, err = pq.QueryRange(ctx, query, time.Now(), time.Time{}) + _, err = pq.QueryRange(ctx, vmQuery, time.Now(), time.Time{}) expectError(t, err, "is missing") - _, err = pq.QueryRange(ctx, query, time.Time{}, time.Now()) + _, err = pq.QueryRange(ctx, vmQuery, time.Time{}, time.Now()) expectError(t, err, "is missing") start, end := time.Now().Add(-time.Minute), time.Now() - res, err := pq.QueryRange(ctx, query, start, end) + res, err := pq.QueryRange(ctx, vmQuery, start, end) if err != nil { t.Fatalf("unexpected %s", err) } @@ -363,33 +452,66 @@ func TestVMRangeQuery(t *testing.T) { t.Fatalf("unexpected metric %+v want %+v", m[0], expected) } + // test unsupported graphite gq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceGraphite)}) _, err = gq.QueryRange(ctx, queryRender, start, end) expectError(t, err, "is not supported") + + // unsupported logsql + gq = s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceVLogs), EvaluationInterval: 60 * time.Second}) + + res, err = gq.QueryRange(ctx, vlogsRangeQuery, start, end) + expectError(t, err, "is not supported") + + // supported logsql + gq = s.BuildWithParams(QuerierParams{DataSourceType: string(datasourceVLogs), EvaluationInterval: 60 * time.Second, ApplyIntervalAsTimeFilter: true}) + res, err = gq.QueryRange(ctx, vlogsRangeQuery, start, end) + if err != nil { + t.Fatalf("unexpected %s", err) + } + m = res.Data + if len(m) != 1 { + t.Fatalf("expected 1 metric got %d in %+v", len(m), m) + } + expected = Metric{ + Labels: []Label{{Value: "total", Name: "stats_result"}}, + Timestamps: []int64{1583786142}, + Values: []float64{10}, + } + if !reflect.DeepEqual(m[0], expected) { + t.Fatalf("unexpected metric %+v want %+v", m[0], expected) + } } func TestRequestParams(t *testing.T) { query := "up" + vlogsQuery := "_time: 5m | stats count() 
total" timestamp := time.Date(2001, 2, 3, 4, 5, 6, 0, time.UTC) - f := func(isQueryRange bool, vm *VMStorage, checkFn func(t *testing.T, r *http.Request)) { + f := func(isQueryRange bool, c *Client, checkFn func(t *testing.T, r *http.Request)) { t.Helper() - req, err := vm.newRequest(ctx) + req, err := c.newRequest(ctx) if err != nil { t.Fatalf("error in newRequest: %s", err) } - switch vm.dataSourceType { - case "", datasourcePrometheus: + switch c.dataSourceType { + case datasourcePrometheus: if isQueryRange { - vm.setPrometheusRangeReqParams(req, query, timestamp, timestamp) + c.setPrometheusRangeReqParams(req, query, timestamp, timestamp) } else { - vm.setPrometheusInstantReqParams(req, query, timestamp) + c.setPrometheusInstantReqParams(req, query, timestamp) } case datasourceGraphite: - vm.setGraphiteReqParams(req, query) + c.setGraphiteReqParams(req, query) + case datasourceVLogs: + if isQueryRange { + c.setVLogsRangeReqParams(req, vlogsQuery, timestamp, timestamp) + } else { + c.setVLogsInstantReqParams(req, vlogsQuery, timestamp) + } } checkFn(t, req) @@ -399,19 +521,19 @@ func TestRequestParams(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %s", err) } - storage := VMStorage{ + storage := Client{ extraParams: url.Values{"round_digits": {"10"}}, } // prometheus path - f(false, &VMStorage{ + f(false, &Client{ dataSourceType: datasourcePrometheus, }, func(t *testing.T, r *http.Request) { checkEqualString(t, "/api/v1/query", r.URL.Path) }) // prometheus prefix - f(false, &VMStorage{ + f(false, &Client{ dataSourceType: datasourcePrometheus, appendTypePrefix: true, }, func(t *testing.T, r *http.Request) { @@ -419,14 +541,14 @@ func TestRequestParams(t *testing.T) { }) // prometheus range path - f(true, &VMStorage{ + f(true, &Client{ dataSourceType: datasourcePrometheus, }, func(t *testing.T, r *http.Request) { checkEqualString(t, "/api/v1/query_range", r.URL.Path) }) // prometheus range prefix - f(true, &VMStorage{ + f(true, &Client{ 
dataSourceType: datasourcePrometheus, appendTypePrefix: true, }, func(t *testing.T, r *http.Request) { @@ -434,14 +556,14 @@ func TestRequestParams(t *testing.T) { }) // graphite path - f(false, &VMStorage{ + f(false, &Client{ dataSourceType: datasourceGraphite, }, func(t *testing.T, r *http.Request) { checkEqualString(t, graphitePath, r.URL.Path) }) // graphite prefix - f(false, &VMStorage{ + f(false, &Client{ dataSourceType: datasourceGraphite, appendTypePrefix: true, }, func(t *testing.T, r *http.Request) { @@ -449,21 +571,27 @@ func TestRequestParams(t *testing.T) { }) // default params - f(false, &VMStorage{}, func(t *testing.T, r *http.Request) { + f(false, &Client{dataSourceType: datasourcePrometheus}, func(t *testing.T, r *http.Request) { + exp := url.Values{"query": {query}, "time": {timestamp.Format(time.RFC3339)}} + checkEqualString(t, exp.Encode(), r.URL.RawQuery) + }) + + f(false, &Client{dataSourceType: datasourcePrometheus, applyIntervalAsTimeFilter: true}, func(t *testing.T, r *http.Request) { exp := url.Values{"query": {query}, "time": {timestamp.Format(time.RFC3339)}} checkEqualString(t, exp.Encode(), r.URL.RawQuery) }) // default range params - f(true, &VMStorage{}, func(t *testing.T, r *http.Request) { + f(true, &Client{dataSourceType: datasourcePrometheus}, func(t *testing.T, r *http.Request) { ts := timestamp.Format(time.RFC3339) exp := url.Values{"query": {query}, "start": {ts}, "end": {ts}} checkEqualString(t, exp.Encode(), r.URL.RawQuery) }) // basic auth - f(false, &VMStorage{ - authCfg: authCfg, + f(false, &Client{ + dataSourceType: datasourcePrometheus, + authCfg: authCfg, }, func(t *testing.T, r *http.Request) { u, p, _ := r.BasicAuth() checkEqualString(t, "foo", u) @@ -471,8 +599,9 @@ func TestRequestParams(t *testing.T) { }) // basic auth range - f(true, &VMStorage{ - authCfg: authCfg, + f(true, &Client{ + dataSourceType: datasourcePrometheus, + authCfg: authCfg, }, func(t *testing.T, r *http.Request) { u, p, _ := r.BasicAuth() 
checkEqualString(t, "foo", u) @@ -480,7 +609,8 @@ func TestRequestParams(t *testing.T) { }) // evaluation interval - f(false, &VMStorage{ + f(false, &Client{ + dataSourceType: datasourcePrometheus, evaluationInterval: 15 * time.Second, }, func(t *testing.T, r *http.Request) { evalInterval := 15 * time.Second @@ -489,8 +619,9 @@ func TestRequestParams(t *testing.T) { }) // step override - f(false, &VMStorage{ - queryStep: time.Minute, + f(false, &Client{ + dataSourceType: datasourcePrometheus, + queryStep: time.Minute, }, func(t *testing.T, r *http.Request) { exp := url.Values{ "query": {query}, @@ -501,7 +632,8 @@ func TestRequestParams(t *testing.T) { }) // step to seconds - f(false, &VMStorage{ + f(false, &Client{ + dataSourceType: datasourcePrometheus, evaluationInterval: 3 * time.Hour, }, func(t *testing.T, r *http.Request) { evalInterval := 3 * time.Hour @@ -510,15 +642,17 @@ func TestRequestParams(t *testing.T) { }) // prometheus extra params - f(false, &VMStorage{ - extraParams: url.Values{"round_digits": {"10"}}, + f(false, &Client{ + dataSourceType: datasourcePrometheus, + extraParams: url.Values{"round_digits": {"10"}}, }, func(t *testing.T, r *http.Request) { exp := url.Values{"query": {query}, "round_digits": {"10"}, "time": {timestamp.Format(time.RFC3339)}} checkEqualString(t, exp.Encode(), r.URL.RawQuery) }) // prometheus extra params range - f(true, &VMStorage{ + f(true, &Client{ + dataSourceType: datasourcePrometheus, extraParams: url.Values{ "nocache": {"1"}, "max_lookback": {"1h"}, @@ -536,7 +670,8 @@ func TestRequestParams(t *testing.T) { // custom params overrides the original params f(false, storage.Clone().ApplyParams(QuerierParams{ - QueryParams: url.Values{"round_digits": {"2"}}, + DataSourceType: string(datasourcePrometheus), + QueryParams: url.Values{"round_digits": {"2"}}, }), func(t *testing.T, r *http.Request) { exp := url.Values{"query": {query}, "round_digits": {"2"}, "time": {timestamp.Format(time.RFC3339)}} checkEqualString(t, 
exp.Encode(), r.URL.RawQuery) @@ -544,14 +679,15 @@ func TestRequestParams(t *testing.T) { // allow duplicates in query params f(false, storage.Clone().ApplyParams(QuerierParams{ - QueryParams: url.Values{"extra_labels": {"env=dev", "foo=bar"}}, + DataSourceType: string(datasourcePrometheus), + QueryParams: url.Values{"extra_labels": {"env=dev", "foo=bar"}}, }), func(t *testing.T, r *http.Request) { exp := url.Values{"query": {query}, "round_digits": {"10"}, "extra_labels": {"env=dev", "foo=bar"}, "time": {timestamp.Format(time.RFC3339)}} checkEqualString(t, exp.Encode(), r.URL.RawQuery) }) // graphite extra params - f(false, &VMStorage{ + f(false, &Client{ dataSourceType: datasourceGraphite, extraParams: url.Values{ "nocache": {"1"}, @@ -563,7 +699,7 @@ func TestRequestParams(t *testing.T) { }) // graphite extra params allows to override from - f(false, &VMStorage{ + f(false, &Client{ dataSourceType: datasourceGraphite, extraParams: url.Values{ "from": {"-10m"}, @@ -572,10 +708,38 @@ func TestRequestParams(t *testing.T) { exp := fmt.Sprintf("format=json&from=-10m&target=%s&until=now", query) checkEqualString(t, exp, r.URL.RawQuery) }) + + // test vlogs + f(false, &Client{ + dataSourceType: datasourceVLogs, + evaluationInterval: time.Minute, + }, func(t *testing.T, r *http.Request) { + exp := url.Values{"query": {vlogsQuery}, "time": {timestamp.Format(time.RFC3339)}} + checkEqualString(t, exp.Encode(), r.URL.RawQuery) + }) + + f(false, &Client{ + dataSourceType: datasourceVLogs, + evaluationInterval: time.Minute, + applyIntervalAsTimeFilter: true, + }, func(t *testing.T, r *http.Request) { + ts := timestamp.Format(time.RFC3339) + exp := url.Values{"query": {vlogsQuery}, "time": {ts}, "start": {timestamp.Add(-time.Minute).Format(time.RFC3339)}, "end": {ts}} + checkEqualString(t, exp.Encode(), r.URL.RawQuery) + }) + + f(true, &Client{ + dataSourceType: datasourceVLogs, + evaluationInterval: time.Minute, + }, func(t *testing.T, r *http.Request) { + ts := 
timestamp.Format(time.RFC3339) + exp := url.Values{"query": {vlogsQuery}, "start": {ts}, "end": {ts}, "step": {"60s"}} + checkEqualString(t, exp.Encode(), r.URL.RawQuery) + }) } func TestHeaders(t *testing.T) { - f := func(vmFn func() *VMStorage, checkFn func(t *testing.T, r *http.Request)) { + f := func(vmFn func() *Client, checkFn func(t *testing.T, r *http.Request)) { t.Helper() vm := vmFn() @@ -587,12 +751,12 @@ func TestHeaders(t *testing.T) { } // basic auth - f(func() *VMStorage { + f(func() *Client { cfg, err := utils.AuthConfig(utils.WithBasicAuth("foo", "bar", "")) if err != nil { t.Fatalf("Error get auth config: %s", err) } - return &VMStorage{authCfg: cfg} + return NewPrometheusClient("", cfg, false, nil) }, func(t *testing.T, r *http.Request) { u, p, _ := r.BasicAuth() checkEqualString(t, "foo", u) @@ -600,12 +764,12 @@ func TestHeaders(t *testing.T) { }) // bearer auth - f(func() *VMStorage { + f(func() *Client { cfg, err := utils.AuthConfig(utils.WithBearer("foo", "")) if err != nil { t.Fatalf("Error get auth config: %s", err) } - return &VMStorage{authCfg: cfg} + return NewPrometheusClient("", cfg, false, nil) }, func(t *testing.T, r *http.Request) { reqToken := r.Header.Get("Authorization") splitToken := strings.Split(reqToken, "Bearer ") @@ -617,11 +781,13 @@ func TestHeaders(t *testing.T) { }) // custom extraHeaders - f(func() *VMStorage { - return &VMStorage{extraHeaders: []keyValue{ + f(func() *Client { + c := NewPrometheusClient("", nil, false, nil) + c.extraHeaders = []keyValue{ {key: "Foo", value: "bar"}, {key: "Baz", value: "qux"}, - }} + } + return c }, func(t *testing.T, r *http.Request) { h1 := r.Header.Get("Foo") checkEqualString(t, "bar", h1) @@ -630,17 +796,16 @@ func TestHeaders(t *testing.T) { }) // custom header overrides basic auth - f(func() *VMStorage { + f(func() *Client { cfg, err := utils.AuthConfig(utils.WithBasicAuth("foo", "bar", "")) if err != nil { t.Fatalf("Error get auth config: %s", err) } - return &VMStorage{ - 
authCfg: cfg, - extraHeaders: []keyValue{ - {key: "Authorization", value: "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="}, - }, + c := NewPrometheusClient("", cfg, false, nil) + c.extraHeaders = []keyValue{ + {key: "Authorization", value: "Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=="}, } + return c }, func(t *testing.T, r *http.Request) { u, p, _ := r.BasicAuth() checkEqualString(t, "Aladdin", u) diff --git a/app/vmalert/datasource/client_vlogs.go b/app/vmalert/datasource/client_vlogs.go new file mode 100644 index 000000000..cb6e8892c --- /dev/null +++ b/app/vmalert/datasource/client_vlogs.go @@ -0,0 +1,61 @@ +package datasource + +import ( + "fmt" + "net/http" + "time" +) + +func (s *Client) setVLogsInstantReqParams(r *http.Request, query string, timestamp time.Time) { + // there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix. + if !*disablePathAppend { + r.URL.Path += "/select/logsql/stats_query" + } + q := r.URL.Query() + // set `time` param explicitly, it will be used as the timestamp of query results. + q.Set("time", timestamp.Format(time.RFC3339)) + // set the `start` and `end` params if applyIntervalAsTimeFilter is enabled(time filter is missing in the rule expr), + // so the query will be executed in time range [timestamp - evaluationInterval, timestamp]. + if s.applyIntervalAsTimeFilter && s.evaluationInterval > 0 { + q.Set("start", timestamp.Add(-s.evaluationInterval).Format(time.RFC3339)) + q.Set("end", timestamp.Format(time.RFC3339)) + } + r.URL.RawQuery = q.Encode() + s.setReqParams(r, query) +} + +func (s *Client) setVLogsRangeReqParams(r *http.Request, query string, start, end time.Time) { + // there is no type path prefix in victorialogs APIs right now, ignore appendTypePrefix. 
+ if !*disablePathAppend { + r.URL.Path += "/select/logsql/stats_query_range" + } + q := r.URL.Query() + q.Add("start", start.Format(time.RFC3339)) + q.Add("end", end.Format(time.RFC3339)) + // set step as evaluationInterval by default + if s.evaluationInterval > 0 { + q.Set("step", fmt.Sprintf("%ds", int(s.evaluationInterval.Seconds()))) + } + r.URL.RawQuery = q.Encode() + s.setReqParams(r, query) +} + +func parseVLogsResponse(req *http.Request, resp *http.Response) (res Result, err error) { + res, err = parsePrometheusResponse(req, resp) + if err != nil { + return Result{}, err + } + for i := range res.Data { + m := &res.Data[i] + for j := range m.Labels { + // reserve the stats func result name with a new label `stats_result` instead of dropping it, + // since there could be multiple stats results in a single query, for instance: + // _time:5m | stats quantile(0.5, request_duration_seconds) p50, quantile(0.9, request_duration_seconds) p90 + if m.Labels[j].Name == "__name__" { + m.Labels[j].Name = "stats_result" + break + } + } + } + return +} diff --git a/app/vmalert/datasource/datasource.go b/app/vmalert/datasource/datasource.go index 31e4689c4..97a0f8d49 100644 --- a/app/vmalert/datasource/datasource.go +++ b/app/vmalert/datasource/datasource.go @@ -42,11 +42,15 @@ type QuerierBuilder interface { // QuerierParams params for Querier. type QuerierParams struct { - DataSourceType string - EvaluationInterval time.Duration - QueryParams url.Values - Headers map[string]string - Debug bool + DataSourceType string + // ApplyIntervalAsTimeFilter is only valid for vlogs datasource. + // Set to true if there is no [timeFilter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) in the rule expression, + // and we will add evaluation interval as an additional timeFilter when querying. 
+ ApplyIntervalAsTimeFilter bool + EvaluationInterval time.Duration + QueryParams url.Values + Headers map[string]string + Debug bool } // Metric is the basic entity which should be return by datasource diff --git a/app/vmalert/datasource/init.go b/app/vmalert/datasource/init.go index a99f130e8..4e163ab71 100644 --- a/app/vmalert/datasource/init.go +++ b/app/vmalert/datasource/init.go @@ -133,13 +133,12 @@ func Init(extraParams url.Values) (QuerierBuilder, error) { return nil, fmt.Errorf("failed to set request auth header to datasource %q: %w", *addr, err) } - return &VMStorage{ + return &Client{ c: &http.Client{Transport: tr}, authCfg: authCfg, datasourceURL: strings.TrimSuffix(*addr, "/"), appendTypePrefix: *appendTypePrefix, queryStep: *queryStep, - dataSourceType: datasourcePrometheus, extraParams: extraParams, }, nil } diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go deleted file mode 100644 index c8258fec4..000000000 --- a/app/vmalert/datasource/vm.go +++ /dev/null @@ -1,272 +0,0 @@ -package datasource - -import ( - "context" - "errors" - "fmt" - "io" - "net/http" - "net/url" - "strings" - "time" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" -) - -type datasourceType string - -const ( - datasourcePrometheus datasourceType = "prometheus" - datasourceGraphite datasourceType = "graphite" -) - -func toDatasourceType(s string) datasourceType { - if s == string(datasourceGraphite) { - return datasourceGraphite - } - return datasourcePrometheus -} - -// VMStorage represents vmstorage entity with ability to read and write metrics -// WARN: when adding a new field, remember to update Clone() method. 
-type VMStorage struct { - c *http.Client - authCfg *promauth.Config - datasourceURL string - appendTypePrefix bool - queryStep time.Duration - dataSourceType datasourceType - - // evaluationInterval will help setting request's `step` param. - evaluationInterval time.Duration - // extraParams contains params to be attached to each HTTP request - extraParams url.Values - // extraHeaders are headers to be attached to each HTTP request - extraHeaders []keyValue - - // whether to print additional log messages - // for each sent request - debug bool -} - -type keyValue struct { - key string - value string -} - -// Clone makes clone of VMStorage, shares http client. -func (s *VMStorage) Clone() *VMStorage { - ns := &VMStorage{ - c: s.c, - authCfg: s.authCfg, - datasourceURL: s.datasourceURL, - appendTypePrefix: s.appendTypePrefix, - queryStep: s.queryStep, - - dataSourceType: s.dataSourceType, - evaluationInterval: s.evaluationInterval, - - // init map so it can be populated below - extraParams: url.Values{}, - - debug: s.debug, - } - if len(s.extraHeaders) > 0 { - ns.extraHeaders = make([]keyValue, len(s.extraHeaders)) - copy(ns.extraHeaders, s.extraHeaders) - } - for k, v := range s.extraParams { - ns.extraParams[k] = v - } - - return ns -} - -// ApplyParams - changes given querier params. 
-func (s *VMStorage) ApplyParams(params QuerierParams) *VMStorage { - s.dataSourceType = toDatasourceType(params.DataSourceType) - s.evaluationInterval = params.EvaluationInterval - if params.QueryParams != nil { - if s.extraParams == nil { - s.extraParams = url.Values{} - } - for k, vl := range params.QueryParams { - // custom query params are prior to default ones - if s.extraParams.Has(k) { - s.extraParams.Del(k) - } - for _, v := range vl { - // don't use .Set() instead of Del/Add since it is allowed - // for GET params to be duplicated - // see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4908 - s.extraParams.Add(k, v) - } - } - } - if params.Headers != nil { - for key, value := range params.Headers { - kv := keyValue{key: key, value: value} - s.extraHeaders = append(s.extraHeaders, kv) - } - } - s.debug = params.Debug - return s -} - -// BuildWithParams - implements interface. -func (s *VMStorage) BuildWithParams(params QuerierParams) Querier { - return s.Clone().ApplyParams(params) -} - -// NewVMStorage is a constructor for VMStorage -func NewVMStorage(baseURL string, authCfg *promauth.Config, queryStep time.Duration, appendTypePrefix bool, c *http.Client) *VMStorage { - return &VMStorage{ - c: c, - authCfg: authCfg, - datasourceURL: strings.TrimSuffix(baseURL, "/"), - appendTypePrefix: appendTypePrefix, - queryStep: queryStep, - dataSourceType: datasourcePrometheus, - extraParams: url.Values{}, - } -} - -// Query executes the given query and returns parsed response -func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) { - req, err := s.newQueryRequest(ctx, query, ts) - if err != nil { - return Result{}, nil, err - } - resp, err := s.do(req) - if err != nil { - if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) { - // Return unexpected error to the caller. 
- return Result{}, nil, err - } - // Something in the middle between client and datasource might be closing - // the connection. So we do a one more attempt in hope request will succeed. - req, err = s.newQueryRequest(ctx, query, ts) - if err != nil { - return Result{}, nil, fmt.Errorf("second attempt: %w", err) - } - resp, err = s.do(req) - if err != nil { - return Result{}, nil, fmt.Errorf("second attempt: %w", err) - } - } - - // Process the received response. - parseFn := parsePrometheusResponse - if s.dataSourceType != datasourcePrometheus { - parseFn = parseGraphiteResponse - } - result, err := parseFn(req, resp) - _ = resp.Body.Close() - return result, req, err -} - -// QueryRange executes the given query on the given time range. -// For Prometheus type see https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries -// Graphite type isn't supported. -func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end time.Time) (res Result, err error) { - if s.dataSourceType != datasourcePrometheus { - return res, fmt.Errorf("%q is not supported for QueryRange", s.dataSourceType) - } - if start.IsZero() { - return res, fmt.Errorf("start param is missing") - } - if end.IsZero() { - return res, fmt.Errorf("end param is missing") - } - req, err := s.newQueryRangeRequest(ctx, query, start, end) - if err != nil { - return res, err - } - resp, err := s.do(req) - if err != nil { - if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) && !netutil.IsTrivialNetworkError(err) { - // Return unexpected error to the caller. - return res, err - } - // Something in the middle between client and datasource might be closing - // the connection. So we do a one more attempt in hope request will succeed. 
- req, err = s.newQueryRangeRequest(ctx, query, start, end) - if err != nil { - return res, fmt.Errorf("second attempt: %w", err) - } - resp, err = s.do(req) - if err != nil { - return res, fmt.Errorf("second attempt: %w", err) - } - } - - // Process the received response. - res, err = parsePrometheusResponse(req, resp) - _ = resp.Body.Close() - return res, err -} - -func (s *VMStorage) do(req *http.Request) (*http.Response, error) { - ru := req.URL.Redacted() - if *showDatasourceURL { - ru = req.URL.String() - } - if s.debug { - logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, ru) - } - resp, err := s.c.Do(req) - if err != nil { - return nil, fmt.Errorf("error getting response from %s: %w", ru, err) - } - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - _ = resp.Body.Close() - return nil, fmt.Errorf("unexpected response code %d for %s. Response body %s", resp.StatusCode, ru, body) - } - return resp, nil -} - -func (s *VMStorage) newQueryRangeRequest(ctx context.Context, query string, start, end time.Time) (*http.Request, error) { - req, err := s.newRequest(ctx) - if err != nil { - return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", s.datasourceURL, err) - } - s.setPrometheusRangeReqParams(req, query, start, end) - return req, nil -} - -func (s *VMStorage) newQueryRequest(ctx context.Context, query string, ts time.Time) (*http.Request, error) { - req, err := s.newRequest(ctx) - if err != nil { - return nil, fmt.Errorf("cannot create query request to datasource %q: %w", s.datasourceURL, err) - } - switch s.dataSourceType { - case "", datasourcePrometheus: - s.setPrometheusInstantReqParams(req, query, ts) - case datasourceGraphite: - s.setGraphiteReqParams(req, query) - default: - logger.Panicf("BUG: engine not found: %q", s.dataSourceType) - } - return req, nil -} - -func (s *VMStorage) newRequest(ctx context.Context) (*http.Request, error) { - req, err := 
http.NewRequestWithContext(ctx, http.MethodPost, s.datasourceURL, nil) - if err != nil { - logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err) - } - req.Header.Set("Content-Type", "application/json") - if s.authCfg != nil { - err = s.authCfg.SetHeaders(req, true) - if err != nil { - return nil, err - } - } - for _, h := range s.extraHeaders { - req.Header.Set(h.key, h.value) - } - return req, nil -} diff --git a/app/vmalert/main.go b/app/vmalert/main.go index 53af8f210..9e14d03dd 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -66,7 +66,7 @@ absolute path to all .tpl files in root. evaluationInterval = flag.Duration("evaluationInterval", time.Minute, "How often to evaluate the rules") validateTemplates = flag.Bool("rule.validateTemplates", true, "Whether to validate annotation and label templates") - validateExpressions = flag.Bool("rule.validateExpressions", true, "Whether to validate rules expressions via MetricsQL engine") + validateExpressions = flag.Bool("rule.validateExpressions", true, "Whether to validate rules expressions for different types.") externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier. 
By default, hostname is used as address.") externalAlertSource = flag.String("external.alert.source", "", `External Alert Source allows to override the Source link for alerts sent to AlertManager `+ diff --git a/app/vmalert/remoteread/init.go b/app/vmalert/remoteread/init.go index 612db0fad..b134dad42 100644 --- a/app/vmalert/remoteread/init.go +++ b/app/vmalert/remoteread/init.go @@ -86,5 +86,5 @@ func Init() (datasource.QuerierBuilder, error) { return nil, fmt.Errorf("failed to configure auth: %w", err) } c := &http.Client{Transport: tr} - return datasource.NewVMStorage(*addr, authCfg, 0, false, c), nil + return datasource.NewPrometheusClient(*addr, authCfg, false, c), nil } diff --git a/app/vmalert/rule/alerting.go b/app/vmalert/rule/alerting.go index 5c9a17393..f0d7101a9 100644 --- a/app/vmalert/rule/alerting.go +++ b/app/vmalert/rule/alerting.go @@ -72,11 +72,12 @@ func NewAlertingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rule EvalInterval: group.Interval, Debug: cfg.Debug, q: qb.BuildWithParams(datasource.QuerierParams{ - DataSourceType: group.Type.String(), - EvaluationInterval: group.Interval, - QueryParams: group.Params, - Headers: group.Headers, - Debug: cfg.Debug, + DataSourceType: group.Type.String(), + ApplyIntervalAsTimeFilter: setIntervalAsTimeFilter(group.Type.String(), cfg.Expr), + EvaluationInterval: group.Interval, + QueryParams: group.Params, + Headers: group.Headers, + Debug: cfg.Debug, }), alerts: make(map[uint64]*notifier.Alert), metrics: &alertingRuleMetrics{}, diff --git a/app/vmalert/rule/group.go b/app/vmalert/rule/group.go index 5051a6082..5f0e42329 100644 --- a/app/vmalert/rule/group.go +++ b/app/vmalert/rule/group.go @@ -213,7 +213,6 @@ func (g *Group) restore(ctx context.Context, qb datasource.QuerierBuilder, ts ti continue } q := qb.BuildWithParams(datasource.QuerierParams{ - DataSourceType: g.Type.String(), EvaluationInterval: g.Interval, QueryParams: g.Params, Headers: g.Headers, diff --git 
a/app/vmalert/rule/recording.go b/app/vmalert/rule/recording.go index c015dfe06..3de0524b5 100644 --- a/app/vmalert/rule/recording.go +++ b/app/vmalert/rule/recording.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/datasource" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/utils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" ) @@ -64,10 +65,11 @@ func NewRecordingRule(qb datasource.QuerierBuilder, group *Group, cfg config.Rul File: group.File, metrics: &recordingRuleMetrics{}, q: qb.BuildWithParams(datasource.QuerierParams{ - DataSourceType: group.Type.String(), - EvaluationInterval: group.Interval, - QueryParams: group.Params, - Headers: group.Headers, + DataSourceType: group.Type.String(), + ApplyIntervalAsTimeFilter: setIntervalAsTimeFilter(group.Type.String(), cfg.Expr), + EvaluationInterval: group.Interval, + QueryParams: group.Params, + Headers: group.Headers, }), } @@ -213,3 +215,12 @@ func (rr *RecordingRule) updateWith(r Rule) error { rr.q = nr.q return nil } + +// setIntervalAsTimeFilter returns true if the given LogsQL expr doesn't contain a time filter. 
+func setIntervalAsTimeFilter(dType, expr string) bool { + if dType != "vlogs" { + return false + } + q, _ := logstorage.ParseStatsQuery(expr) + return !q.ContainAnyTimeFilter() +} diff --git a/app/vmalert/rule/recording_test.go b/app/vmalert/rule/recording_test.go index 62b9e7265..469b4285a 100644 --- a/app/vmalert/rule/recording_test.go +++ b/app/vmalert/rule/recording_test.go @@ -266,3 +266,25 @@ func TestRecordingRuleExec_Negative(t *testing.T) { t.Fatalf("cannot execute recroding rule: %s", err) } } + +func TestSetIntervalAsTimeFilter(t *testing.T) { + f := func(s, dType string, expected bool) { + t.Helper() + + if setIntervalAsTimeFilter(dType, s) != expected { + t.Fatalf("unexpected result for setIntervalAsTimeFilter(%q); want %v", s, expected) + } + } + + f(`* | count()`, "prometheus", false) + + f(`* | count()`, "vlogs", true) + f(`error OR _time:5m | count()`, "vlogs", true) + f(`(_time: 5m AND error) OR (_time: 5m AND warn) | count()`, "vlogs", true) + f(`* | error OR _time:5m | count()`, "vlogs", true) + + f(`_time:5m | count()`, "vlogs", false) + f(`_time:2023-04-25T22:45:59Z | count()`, "vlogs", false) + f(`error AND _time:5m | count()`, "vlogs", false) + f(`* | error AND _time:5m | count()`, "vlogs", false) +} diff --git a/deployment/docker/README.md b/deployment/docker/README.md index cd012c09a..d9e136af7 100644 --- a/deployment/docker/README.md +++ b/deployment/docker/README.md @@ -105,7 +105,7 @@ vmauth config is available [here](ttps://github.com/VictoriaMetrics/VictoriaMetr ## vmalert -vmalert evaluates alerting rules [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts.yml) +vmalert evaluates alerting rules [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts.yml) to track VictoriaMetrics health state. It is connected with AlertManager for firing alerts, and with VictoriaMetrics for executing queries and storing alert's state. 
@@ -153,17 +153,17 @@ make docker-cluster-vm-datasource-down # shutdown cluster See below a list of recommended alerting rules for various VictoriaMetrics components for running in production. Some alerting rules thresholds are just recommendations and could require an adjustment. The list of alerting rules is the following: -* [alerts-health.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-health.yml): +* [alerts-health.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-health.yml): alerting rules related to all VictoriaMetrics components for tracking their "health" state; -* [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts.yml): +* [alerts.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts.yml): alerting rules related to [single-server VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) installation; -* [alerts-cluster.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-cluster.yml): +* [alerts-cluster.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-cluster.yml): alerting rules related to [cluster version of VictoriaMetrics](https://docs.victoriametrics.com/cluster-victoriametrics/); -* [alerts-vmagent.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmagent.yml): +* [alerts-vmagent.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-vmagent.yml): alerting rules related to [vmagent](https://docs.victoriametrics.com/vmagent/) component; -* [alerts-vmalert.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmalert.yml): +* 
[alerts-vmalert.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-vmalert.yml): alerting rules related to [vmalert](https://docs.victoriametrics.com/vmalert/) component; -* [alerts-vmauth.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml): +* [alerts-vmauth.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-vmauth.yml): alerting rules related to [vmauth](https://docs.victoriametrics.com/vmauth/) component; * [alerts-vlogs.yml](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vlogs.yml): alerting rules related to [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/); diff --git a/deployment/docker/auth-mixed-datasource.yml b/deployment/docker/auth-mixed-datasource.yml new file mode 100644 index 000000000..06eb47bbb --- /dev/null +++ b/deployment/docker/auth-mixed-datasource.yml @@ -0,0 +1,9 @@ +# route requests between VictoriaMetrics and VictoriaLogs +unauthorized_user: + url_map: + - src_paths: + - "/api/v1/query.*" + url_prefix: "http://victoriametrics:8428" + - src_paths: + - "/select/logsql/.*" + url_prefix: "http://victorialogs:9428" diff --git a/deployment/docker/docker-compose-cluster.yml b/deployment/docker/docker-compose-cluster.yml index 088bb8681..0ebe2e757 100644 --- a/deployment/docker/docker-compose-cluster.yml +++ b/deployment/docker/docker-compose-cluster.yml @@ -133,10 +133,10 @@ services: ports: - 8880:8880 volumes: - - ./alerts-cluster.yml:/etc/alerts/alerts.yml - - ./alerts-health.yml:/etc/alerts/alerts-health.yml - - ./alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml - - ./alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml + - ./rules/alerts-cluster.yml:/etc/alerts/alerts.yml + - ./rules/alerts-health.yml:/etc/alerts/alerts-health.yml + - ./rules/alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml + - 
./rules/alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml command: - "--datasource.url=http://vmauth:8427/select/0/prometheus" - "--remoteRead.url=http://vmauth:8427/select/0/prometheus" diff --git a/deployment/docker/docker-compose-victorialogs.yml b/deployment/docker/docker-compose-victorialogs.yml index b18c05718..091c3655d 100644 --- a/deployment/docker/docker-compose-victorialogs.yml +++ b/deployment/docker/docker-compose-victorialogs.yml @@ -26,7 +26,7 @@ services: # and forwards them to VictoriaLogs fluentbit: container_name: fluentbit - image: cr.fluentbit.io/fluent/fluent-bit:2.1.4 + image: fluent/fluent-bit:2.1.4 volumes: - /var/lib/docker/containers:/var/lib/docker/containers:ro - ./fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf @@ -40,7 +40,7 @@ services: # storing logs and serving read queries. victorialogs: container_name: victorialogs - image: docker.io/victoriametrics/victoria-logs:v0.37.0-victorialogs + image: victoriametrics/victoria-logs:v0.37.0-victorialogs command: - "--storageDataPath=/vlogs" - "--httpListenAddr=:9428" @@ -69,29 +69,50 @@ services: - vm_net restart: always - # vmalert executes alerting and recording rules + # vmauth is a router and balancer for HTTP requests. + # It proxies query requests from vmalert to either VictoriaMetrics or VictoriaLogs, + # depending on the requested path. + vmauth: + container_name: vmauth + image: victoriametrics/vmauth:v1.105.0 + depends_on: + - "victoriametrics" + - "victorialogs" + volumes: + - ./auth-mixed-datasource.yml:/etc/auth.yml + command: + - "--auth.config=/etc/auth.yml" + ports: + - 8427:8427 + networks: + - vm_net + restart: always + + # vmalert executes alerting and recording rules according to given rule type. 
vmalert: container_name: vmalert image: victoriametrics/vmalert:v1.105.0 depends_on: - - "victoriametrics" + - "vmauth" - "alertmanager" + - "victoriametrics" ports: - 8880:8880 volumes: - - ./alerts.yml:/etc/alerts/alerts.yml - - ./alerts-health.yml:/etc/alerts/alerts-health.yml - - ./alerts-vlogs.yml:/etc/alerts/alerts-vlogs.yml - - ./alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml + # disable log-related rules for now, until vmalert supports vlogs-type rules + # - ./rules/vlogs-example-alerts.yml:/etc/alerts/vlogs.yml + - ./rules/alerts.yml:/etc/alerts/alerts.yml + - ./rules/alerts-health.yml:/etc/alerts/alerts-health.yml + - ./rules/alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml + - ./rules/alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml command: - - "--datasource.url=http://victoriametrics:8428/" + - "--datasource.url=http://vmauth:8427/" - "--remoteRead.url=http://victoriametrics:8428/" - "--remoteWrite.url=http://victoriametrics:8428/" - "--notifier.url=http://alertmanager:9093/" - "--rule=/etc/alerts/*.yml" # display source of alerts in grafana - "--external.url=http://127.0.0.1:3000" #grafana outside container - - '--external.alert.source=explore?orgId=1&left={"datasource":"VictoriaMetrics","queries":[{"expr":{{.Expr|jsonEscape|queryEscape}},"refId":"A"}],"range":{"from":"{{ .ActiveAt.UnixMilli }}","to":"now"}}' networks: - vm_net restart: always diff --git a/deployment/docker/docker-compose.yml b/deployment/docker/docker-compose.yml index 341b3d321..8d279fba1 100644 --- a/deployment/docker/docker-compose.yml +++ b/deployment/docker/docker-compose.yml @@ -72,10 +72,10 @@ services: ports: - 8880:8880 volumes: - - ./alerts.yml:/etc/alerts/alerts.yml - - ./alerts-health.yml:/etc/alerts/alerts-health.yml - - ./alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml - - ./alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml + - ./rules/alerts.yml:/etc/alerts/alerts.yml + - ./rules/alerts-health.yml:/etc/alerts/alerts-health.yml + - 
./rules/alerts-vmagent.yml:/etc/alerts/alerts-vmagent.yml + - ./rules/alerts-vmalert.yml:/etc/alerts/alerts-vmalert.yml command: - "--datasource.url=http://victoriametrics:8428/" - "--remoteRead.url=http://victoriametrics:8428/" diff --git a/deployment/docker/alerts-cluster.yml b/deployment/docker/rules/alerts-cluster.yml similarity index 100% rename from deployment/docker/alerts-cluster.yml rename to deployment/docker/rules/alerts-cluster.yml diff --git a/deployment/docker/alerts-health.yml b/deployment/docker/rules/alerts-health.yml similarity index 100% rename from deployment/docker/alerts-health.yml rename to deployment/docker/rules/alerts-health.yml diff --git a/deployment/docker/alerts-vmagent.yml b/deployment/docker/rules/alerts-vmagent.yml similarity index 100% rename from deployment/docker/alerts-vmagent.yml rename to deployment/docker/rules/alerts-vmagent.yml diff --git a/deployment/docker/alerts-vmalert.yml b/deployment/docker/rules/alerts-vmalert.yml similarity index 100% rename from deployment/docker/alerts-vmalert.yml rename to deployment/docker/rules/alerts-vmalert.yml diff --git a/deployment/docker/alerts-vmauth.yml b/deployment/docker/rules/alerts-vmauth.yml similarity index 100% rename from deployment/docker/alerts-vmauth.yml rename to deployment/docker/rules/alerts-vmauth.yml diff --git a/deployment/docker/alerts.yml b/deployment/docker/rules/alerts.yml similarity index 100% rename from deployment/docker/alerts.yml rename to deployment/docker/rules/alerts.yml diff --git a/deployment/docker/rules/vlogs-example-alerts.yml b/deployment/docker/rules/vlogs-example-alerts.yml new file mode 100644 index 000000000..7ba25c9ea --- /dev/null +++ b/deployment/docker/rules/vlogs-example-alerts.yml @@ -0,0 +1,13 @@ +groups: + - name: TestGroup + type: vlogs + interval: 1m + rules: + - record: logCount + expr: '_time: 1m | stats by (path) count () as total' + annotations: + description: "path {{$labels.path}} generated {{$value}} logs in the last 1 minute" + - 
alert: tooManyLogs + expr: '_time: 1m | stats by (path) count () as total | filter total:>50' + annotations: + description: "path {{$labels.path}} generated more than 50 log entries in the last 1 minute: {{$value}}" diff --git a/docs/VictoriaLogs/vmalert.md b/docs/VictoriaLogs/vmalert.md new file mode 100644 index 000000000..db1feaa1c --- /dev/null +++ b/docs/VictoriaLogs/vmalert.md @@ -0,0 +1,207 @@ +--- +weight: 10 +title: vmalert +menu: + docs: + parent: "victorialogs" + weight: 10 +aliases: +- /VictoriaLogs/vmalert.html +--- + +_Available from [TODO](https://docs.victoriametrics.com/changelog/#TODO) vmalert version and [v0.36.0](https://docs.victoriametrics.com/victorialogs/changelog/#v0360) VictoriaLogs version._ + +[vmalert](https://docs.victoriametrics.com/vmalert/) integrates with VictoriaLogs via stats APIs [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats) +and [`/select/logsql/stats_query_range`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats). +These endpoints return the log stats in a format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries). +It allows using VictoriaLogs as the datasource in vmalert, creating alerting and recording rules via [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/). + +_Note: This page provides only integration instructions for vmalert and VictoriaLogs. See the full textbook for vmalert [here](https://docs.victoriametrics.com/vmalert)._ + +## Quick Start + +Run vmalert with `-rule.defaultRuleType=vlogs` cmd-line flag. +``` +./bin/vmalert -rule=alert.rules \ # Path to the files or http url with alerting and/or recording rules in YAML format. + -datasource.url=http://localhost:9428 \ # VictoriaLogs address. + -rule.defaultRuleType=vlogs \ # Set default rules type to VictoriaLogs. 
+ -notifier.url=http://localhost:9093 \ # AlertManager URL (required if alerting rules are used) + -remoteWrite.url=http://localhost:8428 \ # Remote write compatible storage to persist rules and alerts state info (required for recording rules) + -remoteRead.url=http://localhost:8428 \ # Prometheus HTTP API compatible datasource to restore alerts state from +``` + +> See the full list of configuration flags and their descriptions in [configuration](#configuration) section. + +> Each `-rule` file may contain arbitrary number of [groups](https://docs.victoriametrics.com/vmalert/#groups). +See examples in [Groups](#groups) section. + +With configuration example above, vmalert will perform the following interactions: +![vmalert](vmalert_victorialogs.webp) + +1. Rules listed in `-rule` file are executed against VictoriaLogs service configured via `-datasource.url`; +2. Triggered alerting notifications are sent to [Alertmanager](https://github.com/prometheus/alertmanager) service configured via `-notifier.url`; +3. Results of recording rules expressions and alerts state are persisted to Prometheus-compatible remote-write endpoint (i.e. VictoriaMetrics) configured via `-remoteWrite.url`; +4. On vmalert restarts, alerts state [can be restored](https://docs.victoriametrics.com/vmalert/#alerts-state-on-restarts) by querying Prometheus-compatible HTTP API endpoint (i.e. VictoriaMetrics) configured via `-remoteRead.url`. + +## Configuration + +### Flags + +For a complete list of command-line flags, visit https://docs.victoriametrics.com/vmalert/#flags or execute `./vmalert --help` command. +The following are key flags related to integration with VictoriaLogs: + +``` +-datasource.url string + Datasource address supporting log stats APIs, which can be a single VictoriaLogs node or a proxy in front of VictoriaLogs. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:9428) or DNS SRV record. +-notifier.url array + Prometheus Alertmanager URL, e.g. 
http://127.0.0.1:9093. List all Alertmanager URLs if it runs in the cluster mode to ensure high availability. + Supports an array of values separated by comma or specified via multiple flags. + Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. +-remoteWrite.url string + Optional URL to VictoriaMetrics or vminsert where to persist alerts state and recording rules results in form of timeseries. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. For example, if -remoteWrite.url=http://127.0.0.1:8428 is specified, then the alerts state will be written to http://127.0.0.1:8428/api/v1/write . See also -remoteWrite.disablePathAppend, '-remoteWrite.showURL'. +-remoteRead.url string + Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect.Remote read is used to restore alerts state.This configuration makes sense only if vmalert was configured with `remoteWrite.url` before and has been successfully persisted its state. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'. +-rule array + Path to the files or http url with alerting and/or recording rules in YAML format. + Supports hierarchical patterns and regexpes. + Examples: + -rule="/path/to/file". Path to a single file with alerting rules. + -rule="http://<some-server-addr>/path/to/rules". HTTP URL to a page with alerting rules. + -rule="dir/*.yaml" -rule="/*.yaml" -rule="gcs://vmalert-rules/tenant_%{TENANT_ID}/prod". + -rule="dir/**/*.yaml". Includes all the .yaml files in "dir" subfolders recursively. + Rule files support YAML multi-document. Files may contain %{ENV_VAR} placeholders, which are substituted by the corresponding env vars. + Enterprise version of vmalert supports S3 and GCS paths to rules. 
+ For example: gs://bucket/path/to/rules, s3://bucket/path/to/rules + S3 and GCS paths support only matching by prefix, e.g. s3://bucket/dir/rule_ matches + all files with prefix rule_ in folder dir. + Supports an array of values separated by comma or specified via multiple flags. + Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces. +-rule.defaultRuleType string + Default type for rule expressions, can be overridden by type parameter inside the rule group. Supported values: "graphite", "prometheus" and "vlogs". + Default is "prometheus", change it to "vlogs" if all of the rules are written with LogsQL. +-rule.evalDelay time + Adjustment of the time parameter for rule evaluation requests to compensate intentional data delay from the datasource. Normally, should be equal to `-search.latencyOffset` (cmd-line flag configured for VictoriaMetrics single-node or vmselect). + Since there is no intentional search delay in VictoriaLogs, `-rule.evalDelay` can be reduced to a few seconds to accommodate network and ingestion time. +``` + +For more configuration options, such as `notifiers`, visit https://docs.victoriametrics.com/vmalert/#configuration. + +### Groups + +Check the complete group attributes [here](https://docs.victoriametrics.com/vmalert/#groups). 
+ +#### Alerting rules + +Examples: +``` +groups: + - name: ServiceLog + interval: 5m + rules: + - alert: HasErrorLog + expr: 'env: "prod" AND status:~"error|warn" | stats by (service) count() as errorLog | filter errorLog:>0' + annotations: + description: "Service {{$labels.service}} generated {{$labels.errorLog}} error logs in the last 5 minutes" + + - name: ServiceRequest + interval: 5m + rules: + - alert: TooManyFailedRequest + expr: '* | extract "ip=<ip> " | extract "status_code=<code>;" | stats by (ip, code) count() if (code:~4.*) as failed, count() as total| math failed / total as failed_percentage| filter failed_percentage :> 0.01 | fields ip,failed_percentage' + annotations: + description: "Connection from address {{$labels.ip}} has {{$value}}% failed requests in last 5 minutes" +``` + +#### Recording rules + +Examples: +``` +groups: + - name: RequestCount + interval: 5m + rules: + - record: nginxRequestCount + expr: 'env: "test" AND service: "nginx" | stats count(*) as requests' + annotations: + description: "Service nginx on env test accepted {{$labels.requests}} requests in the last 5 minutes" + - record: prodRequestCount + expr: 'env: "prod" | stats by (service) count(*) as requests' + annotations: + description: "Service {{$labels.service}} on env prod accepted {{$labels.requests}} requests in the last 5 minutes" +``` + +## Time filter + +It's recommended to omit the [time filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) in rule expression. +By default, vmalert automatically appends the time filter `_time: <interval>` to the expression. 
+For instance, the rule below will be evaluated every 5 minutes, and will return the result with logs from the last 5 minutes: +``` +groups: + interval: 5m + rules: + - alert: TooManyFailedRequest + expr: '* | extract "ip=<ip> " | extract "status_code=<code>;" | stats by (ip, code) count() if (code:~4.*) as failed, count() as total| math failed / total as failed_percentage| filter failed_percentage :> 0.01 | fields ip,failed_percentage' + annotations: "Connection from address {{$labels.ip}} has {{$value}}% failed requests in last 5 minutes" +``` + +User can also specify a customized time filter if needed. For example, rule below will be evaluated every 5 minutes, +but will calculate result over the logs from the last 10 minutes. +``` +groups: + interval: 5m + rules: + - alert: TooManyFailedRequest + expr: '_time: 10m | extract "ip=<ip> " | extract "status_code=<code>;" | stats by (ip, code) count() if (code:~4.*) as failed, count() as total| math failed / total as failed_percentage| filter failed_percentage :> 0.01 | fields ip,failed_percentage' + annotations: "Connection from address {{$labels.ip}} has {{$value}}% failed requests in last 10 minutes" +``` + +Please note, vmalert doesn't support [backfilling](#rules-backfilling) for rules with a customized time filter now. (Might be added in future) + +## Rules backfilling + +vmalert supports alerting and recording rules backfilling (aka replay) against VictoriaLogs as the datasource. +``` +./bin/vmalert -rule=path/to/your.rules \ # path to files with rules you usually use with vmalert + -datasource.url=http://localhost:9428 \ # VictoriaLogs address. + -rule.defaultRuleType=vlogs \ # Set default rule type to VictoriaLogs. 
+ -remoteWrite.url=http://localhost:8428 \ # Remote write compatible storage to persist rules and alerts state info + -replay.timeFrom=2021-05-11T07:21:43Z \ # to start replay from + -replay.timeTo=2021-05-29T18:40:43Z # to finish replay by, is optional +``` + +See more details about backfilling [here](https://docs.victoriametrics.com/vmalert/#rules-backfilling). + +## Performance tip + +LogsQL allows users to obtain multiple stats from a single expression. +For instance, the following query calculates 50th, 90th and 99th percentiles for the `request_duration_seconds` field over logs for the last 5 minutes: + +``` +_time:5m | stats + quantile(0.5, request_duration_seconds) p50, + quantile(0.9, request_duration_seconds) p90, + quantile(0.99, request_duration_seconds) p99 +``` + +This expression can also be used in recording rules as follows: +``` +groups: + - name: requestDuration + interval: 5m + rules: + - record: requestDurationQuantile + expr: '_time:5m | stats by (service) quantile(0.5, request_duration_seconds) p50, quantile(0.9, request_duration_seconds) p90, quantile(0.99, request_duration_seconds) p99' +``` +This creates three metrics for each service: +``` +requestDurationQuantile{stats_result="p50", service="service-1"} +requestDurationQuantile{stats_result="p90", service="service-1"} +requestDurationQuantile{stats_result="p99", service="service-1"} + +requestDurationQuantile{stats_result="p50", service="service-2"} +requestDurationQuantile{stats_result="p90", service="service-2"} +requestDurationQuantile{stats_result="p99", service="service-2"} +... +``` + +For additional tips on writing LogsQL, refer to this [doc](https://docs.victoriametrics.com/victorialogs/logsql/#performance-tips). 
\ No newline at end of file diff --git a/docs/VictoriaLogs/vmalert_victorialogs.excalidraw b/docs/VictoriaLogs/vmalert_victorialogs.excalidraw new file mode 100644 index 000000000..a3bc8bbb5 --- /dev/null +++ b/docs/VictoriaLogs/vmalert_victorialogs.excalidraw @@ -0,0 +1,687 @@ +{ + "type": "excalidraw", + "version": 2, + "source": "https://excalidraw.com", + "elements": [ + { + "type": "rectangle", + "version": 803, + "versionNonce": 1128884469, + "index": "a0", + "isDeleted": false, + "id": "VgBUzo0blGR-Ijd2mQEEf", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 422.3502197265625, + "y": 215.55953979492188, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 123.7601318359375, + "height": 72.13211059570312, + "seed": 1194011660, + "groupIds": [ + "iBaXgbpyifSwPplm_GO5b" + ], + "frameId": null, + "roundness": null, + "boundElements": [ + { + "type": "arrow", + "id": "sxEhnxlbT7ldlSsmHDUHp" + }, + { + "id": "wRO0q9xKPHc8e8XPPsQWh", + "type": "arrow" + }, + { + "id": "Bpy5by47XGKB4yS99ZkuA", + "type": "arrow" + } + ], + "updated": 1728889265677, + "link": null, + "locked": false + }, + { + "type": "text", + "version": 660, + "versionNonce": 130510869, + "index": "a1", + "isDeleted": false, + "id": "e9TDm09y-GhPm84XWt0Jv", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 443.89678955078125, + "y": 236.64378356933594, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 82, + "height": 24, + "seed": 327273100, + "groupIds": [ + "iBaXgbpyifSwPplm_GO5b" + ], + "frameId": null, + "roundness": null, + "boundElements": [], + "updated": 1728889112138, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 3, + "text": "vmalert", + "textAlign": "center", + "verticalAlign": "middle", + "containerId": null, + "originalText": "vmalert", + "autoResize": true, + 
"lineHeight": 1.2 + }, + { + "type": "rectangle", + "version": 2608, + "versionNonce": 1050127035, + "index": "a2", + "isDeleted": false, + "id": "dd52BjHfPMPRji9Tws7U-", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 774.7067312730577, + "y": 231.9532470703125, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 275.7981470513237, + "height": 39.621179787868925, + "seed": 1779959692, + "groupIds": [ + "2Lijjn3PwPQW_8KrcDmdu" + ], + "frameId": null, + "roundness": null, + "boundElements": [ + { + "id": "Bpy5by47XGKB4yS99ZkuA", + "type": "arrow" + } + ], + "updated": 1728889420961, + "link": null, + "locked": false + }, + { + "type": "rectangle", + "version": 1099, + "versionNonce": 499029243, + "index": "a6", + "isDeleted": false, + "id": "8-XFSbd6Zw96EUSJbJXZv", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 371.7434387207031, + "y": 398.50787353515625, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 240.10644531249997, + "height": 44.74725341796875, + "seed": 99322124, + "groupIds": [ + "6obQBPHIfExBKfejeLLVO" + ], + "frameId": null, + "roundness": null, + "boundElements": [ + { + "type": "arrow", + "id": "sxEhnxlbT7ldlSsmHDUHp" + } + ], + "updated": 1728889112138, + "link": null, + "locked": false + }, + { + "type": "text", + "version": 865, + "versionNonce": 316509237, + "index": "a7", + "isDeleted": false, + "id": "GUs816aggGqUSdoEsSmea", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 393.73809814453125, + "y": 410.5976257324219, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 199, + "height": 24, + "seed": 1194745268, + "groupIds": [ + "6obQBPHIfExBKfejeLLVO" + ], + "frameId": null, + "roundness": null, + "boundElements": [], + "updated": 
1728889112138, + "link": null, + "locked": false, + "fontSize": 20, + "fontFamily": 3, + "text": "alertmanager:9093", + "textAlign": "center", + "verticalAlign": "top", + "containerId": null, + "originalText": "alertmanager:9093", + "autoResize": true, + "lineHeight": 1.2 + }, + { + "type": "arrow", + "version": 3377, + "versionNonce": 359177051, + "index": "a8", + "isDeleted": false, + "id": "Bpy5by47XGKB4yS99ZkuA", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 556.6860961914062, + "y": 252.95352770712083, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 202.02063508165145, + "height": 0.22881326742660235, + "seed": 357577356, + "groupIds": [], + "frameId": null, + "roundness": { + "type": 2 + }, + "boundElements": [], + "updated": 1728889420962, + "link": null, + "locked": false, + "startBinding": { + "elementId": "VgBUzo0blGR-Ijd2mQEEf", + "focus": 0.0344528515859526, + "gap": 10.57574462890625 + }, + "endBinding": { + "elementId": "dd52BjHfPMPRji9Tws7U-", + "focus": -0.039393828258510157, + "gap": 16 + }, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": "arrow", + "points": [ + [ + 0, + 0 + ], + [ + 202.02063508165145, + -0.22881326742660235 + ] + ] + }, + { + "type": "arrow", + "version": 1460, + "versionNonce": 492906299, + "index": "a9", + "isDeleted": false, + "id": "wRO0q9xKPHc8e8XPPsQWh", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 406.0439244722469, + "y": 246.6775563467225, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 161.00829839007181, + "height": 2.320722012761223, + "seed": 656189364, + "groupIds": [], + "frameId": null, + "roundness": { + "type": 2 + }, + "boundElements": [], + "updated": 1728889313672, + "link": null, + "locked": false, + "startBinding": { + "elementId": "VgBUzo0blGR-Ijd2mQEEf", + 
"focus": 0.13736472619498497, + "gap": 16.306295254315614 + }, + "endBinding": null, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": "arrow", + "points": [ + [ + 0, + 0 + ], + [ + -161.00829839007181, + -2.320722012761223 + ] + ] + }, + { + "type": "text", + "version": 567, + "versionNonce": 737159899, + "index": "aA", + "isDeleted": false, + "id": "RbVSa4PnOgAMtzoKb-DhW", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 552.4987182617188, + "y": 212.27996826171875, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 187.75, + "height": 95, + "seed": 1989838604, + "groupIds": [], + "frameId": null, + "roundness": null, + "boundElements": [ + { + "id": "ijEBAhsESSoR3zLPouUVM", + "type": "arrow" + } + ], + "updated": 1728889402055, + "link": null, + "locked": false, + "fontSize": 16, + "fontFamily": 3, + "text": "persist alerts state\nand recording rules\n\n\n", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "persist alerts state\nand recording rules\n\n\n", + "autoResize": true, + "lineHeight": 1.1875 + }, + { + "type": "text", + "version": 830, + "versionNonce": 1996455189, + "index": "aB", + "isDeleted": false, + "id": "ia2QzZNl_tuvfY3ymLjyJ", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 279.55224609375, + "y": 218.88568115234375, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 122, + "height": 19, + "seed": 157304972, + "groupIds": [], + "frameId": null, + "roundness": null, + "boundElements": [ + { + "type": "arrow", + "id": "wRO0q9xKPHc8e8XPPsQWh" + } + ], + "updated": 1728889440112, + "link": null, + "locked": false, + "fontSize": 16, + "fontFamily": 3, + "text": "execute rules", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "execute 
rules", + "autoResize": true, + "lineHeight": 1.1875 + }, + { + "type": "arrow", + "version": 1476, + "versionNonce": 1814378875, + "index": "aC", + "isDeleted": false, + "id": "sxEhnxlbT7ldlSsmHDUHp", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 484.18669893674246, + "y": 302.3424013553929, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 1.0484739253853945, + "height": 84.72775855671654, + "seed": 1818348300, + "groupIds": [], + "frameId": null, + "roundness": { + "type": 2 + }, + "boundElements": [], + "updated": 1728889265678, + "link": null, + "locked": false, + "startBinding": { + "elementId": "VgBUzo0blGR-Ijd2mQEEf", + "focus": 0.010768924644894236, + "gap": 14.650750964767894 + }, + "endBinding": { + "elementId": "8-XFSbd6Zw96EUSJbJXZv", + "focus": -0.051051952959743775, + "gap": 11.437713623046818 + }, + "lastCommittedPoint": null, + "startArrowhead": null, + "endArrowhead": "arrow", + "points": [ + [ + 0, + 0 + ], + [ + 1.0484739253853945, + 84.72775855671654 + ] + ] + }, + { + "type": "text", + "version": 631, + "versionNonce": 1909410773, + "index": "aD", + "isDeleted": false, + "id": "E9Run6wCm2chQ6JHrmc_y", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 504.27996826171875, + "y": 322.13031005859375, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 122, + "height": 38, + "seed": 1836541708, + "groupIds": [], + "frameId": null, + "roundness": null, + "boundElements": [ + { + "type": "arrow", + "id": "sxEhnxlbT7ldlSsmHDUHp" + } + ], + "updated": 1728889430719, + "link": null, + "locked": false, + "fontSize": 16, + "fontFamily": 3, + "text": "send alert \nnotifications", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "send alert \nnotifications", + "autoResize": true, + "lineHeight": 1.1875 + }, + { 
+ "type": "text", + "version": 579, + "versionNonce": 326648123, + "index": "aE", + "isDeleted": false, + "id": "ff5OkfgmkKLifS13_TFj3", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 591.5895843505859, + "y": 269.2361297607422, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 121.875, + "height": 19, + "seed": 264004620, + "groupIds": [], + "frameId": null, + "roundness": null, + "boundElements": [ + { + "type": "arrow", + "id": "wRO0q9xKPHc8e8XPPsQWh" + } + ], + "updated": 1728889436228, + "link": null, + "locked": false, + "fontSize": 16, + "fontFamily": 3, + "text": "restore state", + "textAlign": "left", + "verticalAlign": "top", + "containerId": null, + "originalText": "restore state", + "autoResize": true, + "lineHeight": 1.1875 + }, + { + "type": "text", + "version": 1141, + "versionNonce": 39140603, + "index": "aG", + "isDeleted": false, + "id": "J2AqHIHYjG3cvxrBLonQW", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 782.2813415527344, + "y": 238.312045541553, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 254.41375732421875, + "height": 26.05968577236269, + "seed": 254079515, + "groupIds": [ + "fw8b83Mw6tGXQ80jfC5Jx" + ], + "frameId": null, + "roundness": null, + "boundElements": [], + "updated": 1728889417069, + "link": null, + "locked": false, + "fontSize": 21.716404810302244, + "fontFamily": 3, + "text": "victoriametrics:8428", + "textAlign": "center", + "verticalAlign": "top", + "containerId": null, + "originalText": "victoriametrics:8428", + "autoResize": true, + "lineHeight": 1.2 + }, + { + "type": "rectangle", + "version": 2824, + "versionNonce": 1550880827, + "index": "aH", + "isDeleted": false, + "id": "Whj4hd3Al6CbvGs7cQuWk", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, 
+ "angle": 0, + "x": -11.824915810818197, + "y": 223.79106415879994, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 248.85674080132372, + "height": 40.562586037868925, + "seed": 1519267323, + "groupIds": [ + "skPAIqL9ijNA0WE5GV8Gv" + ], + "frameId": null, + "roundness": null, + "boundElements": [], + "updated": 1728889342561, + "link": null, + "locked": false + }, + { + "type": "text", + "version": 1290, + "versionNonce": 1222168987, + "index": "aI", + "isDeleted": false, + "id": "NJgvtn8_Kzy1quxMqyfAr", + "fillStyle": "hachure", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "angle": 0, + "x": 8.194007518174999, + "y": 231.4272063800404, + "strokeColor": "#000000", + "backgroundColor": "transparent", + "width": 216.25169372558594, + "height": 26.05968577236269, + "seed": 1311553179, + "groupIds": [ + "3JfeRMxXtVafxucZgxKNy" + ], + "frameId": null, + "roundness": null, + "boundElements": [], + "updated": 1728889339478, + "link": null, + "locked": false, + "fontSize": 21.716404810302244, + "fontFamily": 3, + "text": "victorialogs:9428", + "textAlign": "center", + "verticalAlign": "top", + "containerId": null, + "originalText": "victorialogs:9428", + "autoResize": true, + "lineHeight": 1.2 + }, + { + "id": "ijEBAhsESSoR3zLPouUVM", + "type": "arrow", + "x": 754.5486716336245, + "y": 263.63184005775634, + "width": 200.78701391878076, + "height": 0.03213913002196023, + "angle": 0, + "strokeColor": "#1e1e1e", + "backgroundColor": "transparent", + "fillStyle": "solid", + "strokeWidth": 1, + "strokeStyle": "solid", + "roughness": 0, + "opacity": 100, + "groupIds": [], + "frameId": null, + "index": "aJ", + "roundness": { + "type": 2 + }, + "seed": 1284919637, + "version": 349, + "versionNonce": 186313781, + "isDeleted": false, + "boundElements": null, + "updated": 1728889427809, + "link": null, + "locked": false, + "points": [ + [ + 0, + 0 + ], + [ + -200.78701391878076, + -0.03213913002196023 + ] + ], + 
"lastCommittedPoint": null, + "startBinding": { + "elementId": "RbVSa4PnOgAMtzoKb-DhW", + "focus": -0.0807019799085118, + "gap": 14.299953371905758, + "fixedPoint": null + }, + "endBinding": null, + "startArrowhead": null, + "endArrowhead": "arrow", + "elbowed": false + } + ], + "appState": { + "gridSize": 20, + "gridStep": 5, + "gridModeEnabled": false, + "viewBackgroundColor": "#ffffff" + }, + "files": {} +} \ No newline at end of file diff --git a/docs/VictoriaLogs/vmalert_victorialogs.webp b/docs/VictoriaLogs/vmalert_victorialogs.webp new file mode 100644 index 000000000..f199a054e Binary files /dev/null and b/docs/VictoriaLogs/vmalert_victorialogs.webp differ diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index 374a88eca..2b9d9b811 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -18,6 +18,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip +* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): support [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) as a datasource. See [this doc](https://docs.victoriametrics.com/victorialogs/vmalert/) for details. * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be useful when rules are retrieved via HTTP URL where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): support scraping from Kubernetes Native Sidecars. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7287). 
* FEATURE: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): add a separate cache type for storing sparse entries when performing large index scans. This significantly reduces memory usage when applying [downsampling filters](https://docs.victoriametrics.com/#downsampling) and [retention filters](https://docs.victoriametrics.com/#retention-filters) during background merge. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7182) for the details. diff --git a/docs/vmalert.md b/docs/vmalert.md index cce31e75a..94e03a03b 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -10,7 +10,7 @@ aliases: --- `vmalert` executes a list of the given [alerting](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) or [recording](https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/) -rules against configured `-datasource.url` compatible with Prometheus HTTP API. For sending alerting notifications +rules against configured `-datasource.url`. For sending alerting notifications `vmalert` relies on [Alertmanager](https://github.com/prometheus/alertmanager) configured via `-notifier.url` flag. Recording rules results are persisted via [remote write](https://prometheus.io/docs/prometheus/latest/storage/#remote-storage-integrations) protocol and require `-remoteWrite.url` to be configured. 
@@ -31,9 +31,8 @@ please refer to the [VictoriaMetrics Cloud documentation](https://docs.victoriam ## Features -* Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) TSDB; -* VictoriaMetrics [MetricsQL](https://docs.victoriametrics.com/metricsql/) - support and expressions validation; +* Integration with [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) and [MetricsQL](https://docs.victoriametrics.com/metricsql/); +* Integration with [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) and [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/). See [this doc](https://docs.victoriametrics.com/victorialogs/vmalert/); * Prometheus [alerting rules definition format](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/#defining-alerting-rules) support; * Integration with [Alertmanager](https://github.com/prometheus/alertmanager) starting from [Alertmanager v0.16.0-alpha](https://github.com/prometheus/alertmanager/releases/tag/v0.16.0-alpha.0); @@ -458,7 +457,7 @@ In this example, `-external.alert.source` will lead to Grafana's Explore page wi and time range will be selected starting from `"from":"{{ .ActiveAt.UnixMilli }}"` when alert became active. In addition to `source` link, some extra links could be added to alert's [annotations](https://docs.victoriametrics.com/vmalert/#alerting-rules) -field. See [how we use them](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/839596c00df123c639d1244b28ee8137dfc9609c/deployment/docker/alerts-cluster.yml#L43) +field. See [how we use them](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/839596c00df123c639d1244b28ee8137dfc9609c/deployment/docker/rules/alerts-cluster.yml#L43) to link alerting rule and the corresponding panel on Grafana dashboard. 
### Multitenancy @@ -728,6 +727,10 @@ implements [Graphite Render API](https://graphite.readthedocs.io/en/stable/rende When using vmalert with both `graphite` and `prometheus` rules configured against cluster version of VM do not forget to set `-datasource.appendTypePrefix` flag to `true`, so vmalert can adjust URL prefix automatically based on the query type. +## VictoriaLogs + +vmalert supports [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) as a datasource for writing alerting and recording rules using [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/). See [this doc](https://docs.victoriametrics.com/victorialogs/vmalert/) for details. + ## Rules backfilling vmalert supports alerting and recording rules backfilling (aka `replay`). In replay mode vmalert @@ -1323,7 +1326,7 @@ The shortlist of configuration flags is the following: -remoteRead.bearerTokenFile string Optional path to bearer token file to use for -remoteRead.url. -remoteRead.disablePathAppend - Whether to disable automatic appending of '/api/v1/query' path to the configured -datasource.url and -remoteRead.url + Whether to disable automatic appending of '/api/v1/query' or '/select/logsql/stats_query' path to the configured -datasource.url and -remoteRead.url -remoteRead.headers string Optional HTTP headers to send with each request to the corresponding -remoteRead.url. For example, -remoteRead.headers='My-Auth:foobar' would send 'My-Auth: foobar' HTTP header with every request to the corresponding -remoteRead.url. Multiple headers must be delimited by '^^': -remoteRead.headers='header1:value1^^header2:value2' -remoteRead.idleConnTimeout duration @@ -1356,7 +1359,7 @@ The shortlist of configuration flags is the following: Optional path to client-side TLS certificate key to use when connecting to -remoteRead.url -remoteRead.tlsServerName string Optional TLS server name to use for connections to -remoteRead.url. 
By default, the server name from -remoteRead.url is used
-  -remoteRead.url vmalert
+  -remoteRead.url string
     Optional URL to datasource compatible with Prometheus HTTP API. It can be single node VictoriaMetrics or vmselect.Remote read is used to restore alerts state.This configuration makes sense only if vmalert was configured with `remoteWrite.url` before and has been successfully persisted its state. Supports address in the form of IP address with a port (e.g., http://127.0.0.1:8428) or DNS SRV record. See also '-remoteRead.disablePathAppend', '-remoteRead.showURL'.
   -remoteWrite.basicAuth.password string
     Optional basic auth password for -remoteWrite.url
@@ -1441,6 +1444,8 @@ The shortlist of configuration flags is the following:
     all files with prefix rule_ in folder dir.
     Supports an array of values separated by comma or specified via multiple flags.
     Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
+  -rule.defaultRuleType string
+     Default type for rule expressions, can be overridden via "type" parameter on the group level, see https://docs.victoriametrics.com/vmalert/#groups. Supported values: "graphite", "prometheus" and "vlogs". (default "prometheus")
   -rule.evalDelay time
     Adjustment of the time parameter for rule evaluation requests to compensate intentional data delay from the datasource.Normally, should be equal to `-search.latencyOffset` (cmd-line flag configured for VictoriaMetrics single-node or vmselect). (default 30s)
   -rule.maxResolveDuration duration
diff --git a/docs/vmauth.md b/docs/vmauth.md
index f9e2c5614..9a478d3db 100644
--- a/docs/vmauth.md
+++ b/docs/vmauth.md
@@ -1059,7 +1059,7 @@ See also [security recommendations](#security).
 `vmauth` exports various metrics in Prometheus exposition format at `http://vmauth-host:8427/metrics` page.
 It is recommended setting up regular scraping of this page either via [vmagent](https://docs.victoriametrics.com/vmagent/) or via Prometheus-compatible scraper, so the exported metrics could be analyzed later.
-Use the official [Grafana dashboard](https://grafana.com/grafana/dashboards/21394) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) +Use the official [Grafana dashboard](https://grafana.com/grafana/dashboards/21394) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/rules/alerts-vmauth.yml) for `vmauth` monitoring. If you use Google Cloud Managed Prometheus for scraping metrics from VictoriaMetrics components, then pass `-metrics.exposeMetadata` diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 7dc787861..5691487fa 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -786,6 +786,38 @@ func ParseStatsQuery(s string) (*Query, error) { return q, nil } +// ContainAnyTimeFilter returns true when query contains a global time filter. +func (q *Query) ContainAnyTimeFilter() bool { + if hasTimeFilter(q.f) { + return true + } + for _, p := range q.pipes { + if pf, ok := p.(*pipeFilter); ok { + if hasTimeFilter(pf.f) { + return true + } + } + } + return false +} + +func hasTimeFilter(f filter) bool { + if f == nil { + return false + } + switch t := f.(type) { + case *filterAnd: + for _, subF := range t.filters { + if hasTimeFilter(subF) { + return true + } + } + case *filterTime: + return true + } + return false +} + // ParseQueryAtTimestamp parses s in the context of the given timestamp. // // E.g. _time:duration filters are adjusted according to the provided timestamp as _time:[timestamp-duration, duration]. 
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 642b88364..9cbcce2b8 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -2384,3 +2384,28 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
 	// format to the remaining metric field
 	f(`* | by (x) count() y | format 'foo' as y`)
 }
+
+func TestHasTimeFilter(t *testing.T) {
+	f := func(qStr string, expected bool) {
+		t.Helper()
+
+		q, err := ParseStatsQuery(qStr)
+		if err != nil {
+			t.Fatalf("cannot parse [%s]: %s", qStr, err)
+		}
+		if q.ContainAnyTimeFilter() != expected {
+			t.Fatalf("unexpected result for ContainAnyTimeFilter(%q); want %v", qStr, expected)
+		}
+	}
+
+	f(`* | count()`, false)
+	f(`error OR _time:5m | count()`, false)
+	f(`(_time: 5m AND error) OR (_time: 5m AND warn) | count()`, false)
+	f(`* | error OR _time:5m | count()`, false)
+
+	f(`_time:5m | count()`, true)
+	f(`_time:2023-04-25T22:45:59Z | count()`, true)
+	f(`error AND _time:5m | count()`, true)
+	f(`error AND (_time: 5m AND warn) | count()`, true)
+	f(`* | error AND _time:5m | count()`, true)
+}