lib/promrelabel: add support for a list of series selectors at IfExpression

This makes possible specifying a list of series selectors at the following places:

- Inside `if` option at relabeling rules
- Inside `match` option at stream aggregation rules

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4635
This commit is contained in:
Aliaksandr Valialkin 2023-07-24 16:10:47 -07:00
parent 52c13e9515
commit 62651570bb
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
8 changed files with 196 additions and 19 deletions

View file

@ -514,6 +514,7 @@ The following articles contain useful information about Prometheus relabeling:
* An optional `if` filter can be used for conditional relabeling. The `if` filter may contain * An optional `if` filter can be used for conditional relabeling. The `if` filter may contain
arbitrary [time series selector](https://docs.victoriametrics.com/keyConcepts.html#filtering). arbitrary [time series selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
The `action` is performed only for [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples), which match the provided `if` filter.
For example, the following relabeling rule keeps metrics matching `foo{bar="baz"}` series selector, while dropping the rest of metrics: For example, the following relabeling rule keeps metrics matching `foo{bar="baz"}` series selector, while dropping the rest of metrics:
```yaml ```yaml
@ -529,6 +530,18 @@ The following articles contain useful information about Prometheus relabeling:
regex: 'foo;baz' regex: 'foo;baz'
``` ```
The `if` option may contain more than one filter. In this case the `action` is performed if at least a single filter
matches the given [sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
For example, the following relabeling rule adds `foo="bar"` label to samples with `job="foo"` or `instance="bar"` labels:
```yaml
- target_label: foo
replacement: bar
if:
- '{job="foo"}'
- '{instance="bar"}'
```
* The `regex` value can be split into multiple lines for improved readability and maintainability. * The `regex` value can be split into multiple lines for improved readability and maintainability.
These lines are automatically joined with `|` char when parsed. For example, the following configs are equivalent: These lines are automatically joined with `|` char when parsed. For example, the following configs are equivalent:

View file

@ -34,8 +34,7 @@ Previously only aggregated samples were written to the storage by default.
The previous behavior can be restored in the following ways: The previous behavior can be restored in the following ways:
- by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics; - by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics;
- by passing `-remoteWrite.streamAggr.dropInput` command-line flag per each configured `-remoteWrite.streamAggr.config` at `vmagent`. - by passing `-remoteWrite.streamAggr.dropInput` command-line flag per each configured `-remoteWrite.streamAggr.config` at `vmagent`.**
**
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html). * SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved). * SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
@ -51,6 +50,8 @@ The previous behavior can be restored in the following ways:
Thanks to @lujiajing1126 for the initial idea and [implementation](https://github.com/VictoriaMetrics/metricsql/pull/13). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4025). Thanks to @lujiajing1126 for the initial idea and [implementation](https://github.com/VictoriaMetrics/metricsql/pull/13). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4025).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page with the list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598) and [these docs](https://docs.victoriametrics.com/#active-queries). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page with the list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598) and [these docs](https://docs.victoriametrics.com/#active-queries).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow specifying a list of [series selectors](https://docs.victoriametrics.com/keyConcepts.html#filtering) inside `if` option of relabeling rules. The corresponding relabeling rule is executed when at least a single series selector matches. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): allow specifying a list of [series selectors](https://docs.victoriametrics.com/keyConcepts.html#filtering) inside `match` option of [stream aggregation configs](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). The input sample is aggregated when at least a single series selector matches. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4635).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): preserve input samples, which match zero `match` options from the [configured aggregations](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). Previously all the input samples were dropped by default, so only the aggregated samples are written to the output storage. The previous behavior can be restored by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics or by passing `-remoteWrite.streamAggr.dropInput` command-line flag to `vmagent`. * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): preserve input samples, which match zero `match` options from the [configured aggregations](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). Previously all the input samples were dropped by default, so only the aggregated samples are written to the output storage. The previous behavior can be restored by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics or by passing `-remoteWrite.streamAggr.dropInput` command-line flag to `vmagent`.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442). * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442).

View file

@ -32,7 +32,7 @@ These flags must point to a file containing [stream aggregation config](#stream-
By default, the following data is written to the storage when stream aggregation is enabled: By default, the following data is written to the storage when stream aggregation is enabled:
- the aggregated samples; - the aggregated samples;
- the raw input samples, which didn't match all the `match` options in the provided [config](#stream-aggregation-config). - the raw input samples, which didn't match any `match` option in the provided [config](#stream-aggregation-config).
This behaviour can be changed via the following command-line flags: This behaviour can be changed via the following command-line flags:
@ -240,7 +240,9 @@ per each incoming request, then the following [stream aggregation config](#strea
can be used for calculating 50th and 99th percentiles for these metrics every 30 seconds: can be used for calculating 50th and 99th percentiles for these metrics every 30 seconds:
```yaml ```yaml
- match: '{__name__=~"request_duration_seconds|response_size_bytes"}' - match:
- request_duration_seconds
- response_size_bytes
interval: 30s interval: 30s
outputs: ["quantiles(0.50, 0.99)"] outputs: ["quantiles(0.50, 0.99)"]
``` ```
@ -269,7 +271,9 @@ can be used for calculating [VictoriaMetrics histogram buckets](https://valyala.
for these metrics every 60 seconds: for these metrics every 60 seconds:
```yaml ```yaml
- match: '{__name__=~"request_duration_seconds|response_size_bytes"}' - match:
- request_duration_seconds
- response_size_bytes
interval: 60s interval: 60s
outputs: [histogram_bucket] outputs: [histogram_bucket]
``` ```
@ -600,10 +604,15 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
# It can contain arbitrary Prometheus series selector # It can contain arbitrary Prometheus series selector
# according to https://docs.victoriametrics.com/keyConcepts.html#filtering . # according to https://docs.victoriametrics.com/keyConcepts.html#filtering .
# If match isn't set, then all the incoming samples are aggregated. # If match isn't set, then all the incoming samples are aggregated.
#
# match also can contain a list of series selectors. Then the incoming samples are aggregated
# if they match at least a single series selector.
#
- match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}' - match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}'
# interval is the interval for the aggregation. # interval is the interval for the aggregation.
# The aggregated stats is sent to remote storage once per interval. # The aggregated stats is sent to remote storage once per interval.
#
interval: 1m interval: 1m
# staleness_interval defines an interval after which the series state will be reset if no samples have been sent during it. # staleness_interval defines an interval after which the series state will be reset if no samples have been sent during it.
@ -614,30 +623,36 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
# Increase this parameter if it is expected for matched metrics to be delayed or collected with irregular intervals exceeding the `interval` value. # Increase this parameter if it is expected for matched metrics to be delayed or collected with irregular intervals exceeding the `interval` value.
# By default, is equal to x2 of the `interval` field. # By default, is equal to x2 of the `interval` field.
# The parameter is only relevant for outputs: total, increase and histogram_bucket. # The parameter is only relevant for outputs: total, increase and histogram_bucket.
#
# staleness_interval: 2m # staleness_interval: 2m
# without is an optional list of labels, which must be removed from the output aggregation. # without is an optional list of labels, which must be removed from the output aggregation.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
#
without: [instance] without: [instance]
# by is an optional list of labels, which must be preserved in the output aggregation. # by is an optional list of labels, which must be preserved in the output aggregation.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
#
# by: [job, vmrange] # by: [job, vmrange]
# outputs is the list of aggregations to perform on the input data. # outputs is the list of aggregations to perform on the input data.
# See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs # See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs
#
outputs: [total] outputs: [total]
# input_relabel_configs is an optional relabeling rules, # input_relabel_configs is an optional relabeling rules,
# which are applied to the incoming samples after they pass the match filter # which are applied to the incoming samples after they pass the match filter
# and before being aggregated. # and before being aggregated.
# See https://docs.victoriametrics.com/stream-aggregation.html#relabeling # See https://docs.victoriametrics.com/stream-aggregation.html#relabeling
#
input_relabel_configs: input_relabel_configs:
- target_label: vmaggr - target_label: vmaggr
replacement: before replacement: before
# output_relabel_configs is an optional relabeling rules, # output_relabel_configs is an optional relabeling rules,
# which are applied to the aggregated output metrics. # which are applied to the aggregated output metrics.
#
output_relabel_configs: output_relabel_configs:
- target_label: vmaggr - target_label: vmaggr
replacement: after replacement: after

View file

@ -525,6 +525,7 @@ The following articles contain useful information about Prometheus relabeling:
* An optional `if` filter can be used for conditional relabeling. The `if` filter may contain * An optional `if` filter can be used for conditional relabeling. The `if` filter may contain
arbitrary [time series selector](https://docs.victoriametrics.com/keyConcepts.html#filtering). arbitrary [time series selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
The `action` is performed only for [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples), which match the provided `if` filter.
For example, the following relabeling rule keeps metrics matching `foo{bar="baz"}` series selector, while dropping the rest of metrics: For example, the following relabeling rule keeps metrics matching `foo{bar="baz"}` series selector, while dropping the rest of metrics:
```yaml ```yaml
@ -540,6 +541,18 @@ The following articles contain useful information about Prometheus relabeling:
regex: 'foo;baz' regex: 'foo;baz'
``` ```
The `if` option may contain more than one filter. In this case the `action` is performed if at least a single filter
matches the given [sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
For example, the following relabeling rule adds `foo="bar"` label to samples with `job="foo"` or `instance="bar"` labels:
```yaml
- target_label: foo
replacement: bar
if:
- '{job="foo"}'
- '{instance="bar"}'
```
* The `regex` value can be split into multiple lines for improved readability and maintainability. * The `regex` value can be split into multiple lines for improved readability and maintainability.
These lines are automatically joined with `|` char when parsed. For example, the following configs are equivalent: These lines are automatically joined with `|` char when parsed. For example, the following configs are equivalent:

View file

@ -10,24 +10,155 @@ import (
"github.com/VictoriaMetrics/metricsql" "github.com/VictoriaMetrics/metricsql"
) )
// IfExpression represents `if` expression at RelabelConfig. // IfExpression represents PromQL-like label filters such as `metric_name{filters...}`.
// //
// The `if` expression can contain arbitrary PromQL-like label filters such as `metric_name{filters...}` // It may contain either a single filter or multiple filters, which are executed with `or` operator.
//
// Examples:
//
// if: 'foo{bar="baz"}'
//
// if:
// - 'foo{bar="baz"}'
// - '{x=~"y"}'
type IfExpression struct { type IfExpression struct {
s string ies []*ifExpression
lfss [][]*labelFilter }
// Match returns true if labels match at least a single label filter inside ie.
//
// Match returns true for empty ie.
func (ie *IfExpression) Match(labels []prompbmarshal.Label) bool {
if ie == nil || len(ie.ies) == 0 {
return true
}
for _, ie := range ie.ies {
if ie.Match(labels) {
return true
}
}
return false
}
// Parse parses ie from s.
func (ie *IfExpression) Parse(s string) error {
ieLocal, err := newIfExpression(s)
if err != nil {
return err
}
ie.ies = []*ifExpression{ieLocal}
return nil
}
// UnmarshalJSON unmarshals ie from JSON data.
func (ie *IfExpression) UnmarshalJSON(data []byte) error {
var v interface{}
if err := json.Unmarshal(data, &v); err != nil {
return err
}
return ie.unmarshalFromInterface(v)
}
// MarshalJSON marshals ie to JSON.
func (ie *IfExpression) MarshalJSON() ([]byte, error) {
if ie == nil || len(ie.ies) == 0 {
return nil, nil
}
if len(ie.ies) == 1 {
return json.Marshal(ie.ies[0])
}
return json.Marshal(ie.ies)
}
// UnmarshalYAML unmarshals ie from YAML passed to f.
func (ie *IfExpression) UnmarshalYAML(f func(interface{}) error) error {
var v interface{}
if err := f(&v); err != nil {
return fmt.Errorf("cannot unmarshal `match` option: %w", err)
}
return ie.unmarshalFromInterface(v)
}
func (ie *IfExpression) unmarshalFromInterface(v interface{}) error {
logger.Infof("DEBUG: unmarshaling ifExpr from %#v", v)
ies := ie.ies[:0]
switch t := v.(type) {
case string:
ieLocal, err := newIfExpression(t)
if err != nil {
return fmt.Errorf("unexpected `match` option: %w", err)
}
ies = append(ies, ieLocal)
logger.Infof("DEBUG: unmarshaled ifExpr from %#v to %s", t, ieLocal)
case []interface{}:
for _, x := range t {
s, ok := x.(string)
if !ok {
return fmt.Errorf("unexpected `match` item type; got %#v; want string", x)
}
ieLocal, err := newIfExpression(s)
if err != nil {
return fmt.Errorf("unexpected `match` item: %w", err)
}
ies = append(ies, ieLocal)
}
logger.Infof("DEBUG: unmarshaled ifExpr from %#v to %s", t, ies)
default:
return fmt.Errorf("unexpected `match` type; got %#v; want string or an array of strings", t)
}
ie.ies = ies
return nil
}
// MarshalYAML marshals ie to YAML
func (ie *IfExpression) MarshalYAML() (interface{}, error) {
if ie == nil || len(ie.ies) == 0 {
return nil, nil
}
if len(ie.ies) == 1 {
return ie.ies[0].MarshalYAML()
}
a := make([]string, 0, len(ie.ies))
for _, ieLocal := range ie.ies {
v, err := ieLocal.MarshalYAML()
if err != nil {
logger.Panicf("BUG: unexpected error: %s", err)
}
s := v.(string)
a = append(a, s)
}
return a, nil
}
func newIfExpression(s string) (*ifExpression, error) {
var ie ifExpression
if err := ie.Parse(s); err != nil {
return nil, err
}
return &ie, nil
} }
// String returns string representation of ie. // String returns string representation of ie.
func (ie *IfExpression) String() string { func (ie *IfExpression) String() string {
if ie == nil {
return "[]"
}
return fmt.Sprintf("%s", ie.ies)
}
type ifExpression struct {
s string
lfss [][]*labelFilter
}
func (ie *ifExpression) String() string {
if ie == nil { if ie == nil {
return "" return ""
} }
return ie.s return ie.s
} }
// Parse parses `if` expression from s and stores it to ie. func (ie *ifExpression) Parse(s string) error {
func (ie *IfExpression) Parse(s string) error {
expr, err := metricsql.Parse(s) expr, err := metricsql.Parse(s)
if err != nil { if err != nil {
return err return err
@ -46,7 +177,7 @@ func (ie *IfExpression) Parse(s string) error {
} }
// UnmarshalJSON unmarshals ie from JSON data. // UnmarshalJSON unmarshals ie from JSON data.
func (ie *IfExpression) UnmarshalJSON(data []byte) error { func (ie *ifExpression) UnmarshalJSON(data []byte) error {
var s string var s string
if err := json.Unmarshal(data, &s); err != nil { if err := json.Unmarshal(data, &s); err != nil {
return err return err
@ -55,12 +186,12 @@ func (ie *IfExpression) UnmarshalJSON(data []byte) error {
} }
// MarshalJSON marshals ie to JSON. // MarshalJSON marshals ie to JSON.
func (ie *IfExpression) MarshalJSON() ([]byte, error) { func (ie *ifExpression) MarshalJSON() ([]byte, error) {
return json.Marshal(ie.s) return json.Marshal(ie.s)
} }
// UnmarshalYAML unmarshals ie from YAML passed to f. // UnmarshalYAML unmarshals ie from YAML passed to f.
func (ie *IfExpression) UnmarshalYAML(f func(interface{}) error) error { func (ie *ifExpression) UnmarshalYAML(f func(interface{}) error) error {
var s string var s string
if err := f(&s); err != nil { if err := f(&s); err != nil {
return fmt.Errorf("cannot unmarshal `if` option: %w", err) return fmt.Errorf("cannot unmarshal `if` option: %w", err)
@ -72,12 +203,12 @@ func (ie *IfExpression) UnmarshalYAML(f func(interface{}) error) error {
} }
// MarshalYAML marshals ie to YAML. // MarshalYAML marshals ie to YAML.
func (ie *IfExpression) MarshalYAML() (interface{}, error) { func (ie *ifExpression) MarshalYAML() (interface{}, error) {
return ie.s, nil return ie.s, nil
} }
// Match returns true if ie matches the given labels. // Match returns true if ie matches the given labels.
func (ie *IfExpression) Match(labels []prompbmarshal.Label) bool { func (ie *ifExpression) Match(labels []prompbmarshal.Label) bool {
if ie == nil { if ie == nil {
return true return true
} }

View file

@ -84,7 +84,7 @@ func TestIfExpressionUnmarshalFailure(t *testing.T) {
} }
f(`{`) f(`{`)
f(`{x:y}`) f(`{x:y}`)
f(`[]`) f(`[1]`)
f(`"{"`) f(`"{"`)
f(`'{'`) f(`'{'`)
f(`foo{bar`) f(`foo{bar`)
@ -122,6 +122,8 @@ func TestIfExpressionUnmarshalSuccess(t *testing.T) {
f(`'{a="b", c!="d", e=~"g", h!~"d"}'`) f(`'{a="b", c!="d", e=~"g", h!~"d"}'`)
f(`foo{bar="zs",a=~"b|c"}`) f(`foo{bar="zs",a=~"b|c"}`)
f(`foo{z="y" or bar="zs",a=~"b|c"}`) f(`foo{z="y" or bar="zs",a=~"b|c"}`)
f(`- foo
- bar{baz="abc"}`)
} }
func TestIfExpressionMatch(t *testing.T) { func TestIfExpressionMatch(t *testing.T) {

View file

@ -449,7 +449,7 @@ func TestParsedRelabelConfigsApply(t *testing.T) {
t.Run("keep-if-hit", func(t *testing.T) { t.Run("keep-if-hit", func(t *testing.T) {
f(` f(`
- action: keep - action: keep
if: '{foo="yyy"}' if: ['foobar', '{foo="yyy"}', '{a="b"}']
`, `{foo="yyy"}`, false, `{foo="yyy"}`) `, `{foo="yyy"}`, false, `{foo="yyy"}`)
}) })
t.Run("keep-hit", func(t *testing.T) { t.Run("keep-hit", func(t *testing.T) {

View file

@ -440,7 +440,9 @@ foo{abc="456",de="fg"} 8
- interval: 1m - interval: 1m
by: [abc] by: [abc]
outputs: [count_samples, sum_samples, count_series] outputs: [count_samples, sum_samples, count_series]
match: 'foo{abc=~".+"}' match:
- foo{abc=~".+"}
- '{non_existing_label!=""}'
`, ` `, `
foo{abc="123"} 4 foo{abc="123"} 4
bar 5 bar 5