From 6e43664e24af0e5c42b788060e1746622dcecc0a Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Mon, 24 Jul 2023 16:10:47 -0700
Subject: [PATCH] lib/promrelabel: add support for a list of series selectors
 at IfExpression

This makes it possible to specify a list of series selectors in the following places:

- Inside `if` option at relabeling rules
- Inside `match` option at stream aggregation rules

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4635
---
 app/vmagent/README.md                 |  13 +++
 docs/CHANGELOG.md                     |   5 +-
 docs/stream-aggregation.md            |  21 +++-
 docs/vmagent.md                       |  13 +++
 lib/promrelabel/if_expression.go      | 153 ++++++++++++++++++++++++--
 lib/promrelabel/if_expression_test.go |   4 +-
 lib/promrelabel/relabel_test.go       |   2 +-
 lib/streamaggr/streamaggr_test.go     |   4 +-
 8 files changed, 196 insertions(+), 19 deletions(-)

diff --git a/app/vmagent/README.md b/app/vmagent/README.md
index bac17b84f4..fe52a83d3a 100644
--- a/app/vmagent/README.md
+++ b/app/vmagent/README.md
@@ -514,6 +514,7 @@ The following articles contain useful information about Prometheus relabeling:
 
 * An optional `if` filter can be used for conditional relabeling. The `if` filter may contain
   arbitrary [time series selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
+  The `action` is performed only for [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples), which match the provided `if` filter.
   For example, the following relabeling rule keeps metrics matching `foo{bar="baz"}` series selector, while dropping the rest of metrics:
 
   ```yaml
@@ -529,6 +530,18 @@ The following articles contain useful information about Prometheus relabeling:
     regex: 'foo;baz'
   ```
 
+  The `if` option may contain more than one filter. In this case the `action` is performed if at least a single filter
+  matches the given [sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+  For example, the following relabeling rule adds `foo="bar"` label to samples with `job="foo"` or `instance="bar"` labels:
+
+  ```yaml
+  - target_label: foo
+    replacement: bar
+    if:
+    - '{job="foo"}'
+    - '{instance="bar"}'
+  ```
+
 * The `regex` value can be split into multiple lines for improved readability and maintainability.
   These lines are automatically joined with `|` char when parsed. For example, the following configs are equivalent:
 
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 6651ffc537..7c7d4e11c0 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -34,8 +34,7 @@ Previously only aggregated samples were written to the storage by default.
 The previous behavior can be restored in the following ways:
 
 - by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics;
-- by passing `-remoteWrite.streamAggr.dropInput` command-line flag per each configured `-remoteWrite.streamAggr.config` at `vmagent`.
-**
+- by passing `-remoteWrite.streamAggr.dropInput` command-line flag per each configured `-remoteWrite.streamAggr.config` at `vmagent`.**
 
 * SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
 * SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
@@ -51,6 +50,8 @@ The previous behavior can be restored in the following ways:
   Thanks to @lujiajing1126 for the initial idea and [implementation](https://github.com/VictoriaMetrics/metricsql/pull/13). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4025).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page with the list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598) and [these docs](https://docs.victoriametrics.com/#active-queries).
 * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details.
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow specifying a list of [series selectors](https://docs.victoriametrics.com/keyConcepts.html#filtering) inside `if` option of relabeling rules. The corresponding relabeling rule is executed when at least a single series selector matches. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling-enhancements).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): allow specifying a list of [series selectors](https://docs.victoriametrics.com/keyConcepts.html#filtering) inside `match` option of [stream aggregation configs](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). The input sample is aggregated when at least a single series selector matches. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4635).
 * FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): preserve input samples, which match zero `match` options from the [configured aggregations](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). Previously all the input samples were dropped by default, so only the aggregated samples are written to the output storage. The previous behavior can be restored by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics or by passing `-remoteWrite.streamAggr.dropInput` command-line flag to `vmagent`.
 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442).
diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md
index c281e74e63..8f9becf0ca 100644
--- a/docs/stream-aggregation.md
+++ b/docs/stream-aggregation.md
@@ -32,7 +32,7 @@ These flags must point to a file containing [stream aggregation config](#stream-
 By default, the following data is written to the storage when stream aggregation is enabled:
 
 - the aggregated samples;
-- the raw input samples, which didn't match all the `match` options in the provided [config](#stream-aggregation-config).
+- the raw input samples, which didn't match any `match` option in the provided [config](#stream-aggregation-config).
 
 This behaviour can be changed via the following command-line flags:
 
@@ -240,7 +240,9 @@ per each incoming request, then the following [stream aggregation config](#strea
 can be used for calculating 50th and 99th percentiles for these metrics every 30 seconds:
 
 ```yaml
-- match: '{__name__=~"request_duration_seconds|response_size_bytes"}'
+- match:
+  - request_duration_seconds
+  - response_size_bytes
   interval: 30s
   outputs: ["quantiles(0.50, 0.99)"]
 ```
@@ -269,7 +271,9 @@ can be used for calculating [VictoriaMetrics histogram buckets](https://valyala.
 for these metrics every 60 seconds:
 
 ```yaml
-- match: '{__name__=~"request_duration_seconds|response_size_bytes"}'
+- match:
+  - request_duration_seconds
+  - response_size_bytes
   interval: 60s
   outputs: [histogram_bucket]
 ```
@@ -600,10 +604,15 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
   # It can contain arbitrary Prometheus series selector
   # according to https://docs.victoriametrics.com/keyConcepts.html#filtering .
   # If match isn't set, then all the incoming samples are aggregated.
+  #
+  # match can also contain a list of series selectors. In this case the incoming samples are aggregated
+  # if they match at least a single series selector.
+  #
 - match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}'
 
   # interval is the interval for the aggregation.
   # The aggregated stats is sent to remote storage once per interval.
+  #
   interval: 1m
 
   # staleness_interval defines an interval after which the series state will be reset if no samples have been sent during it.
@@ -614,30 +623,36 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
   # Increase this parameter if it is expected for matched metrics to be delayed or collected with irregular intervals exceeding the `interval` value.
   # By default, is equal to x2 of the `interval` field.
   # The parameter is only relevant for outputs: total, increase and histogram_bucket.
+  #
   # staleness_interval: 2m
 
   # without is an optional list of labels, which must be removed from the output aggregation.
   # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
+  #
   without: [instance]
 
   # by is an optional list of labels, which must be preserved in the output aggregation.
   # See https://docs.victoriametrics.com/stream-aggregation.html#aggregating-by-labels
+  #
   # by: [job, vmrange]
 
   # outputs is the list of aggregations to perform on the input data.
   # See https://docs.victoriametrics.com/stream-aggregation.html#aggregation-outputs
+  #
   outputs: [total]
 
   # input_relabel_configs is an optional relabeling rules,
   # which are applied to the incoming samples after they pass the match filter
   # and before being aggregated.
   # See https://docs.victoriametrics.com/stream-aggregation.html#relabeling
+  #
   input_relabel_configs:
   - target_label: vmaggr
     replacement: before
 
   # output_relabel_configs is an optional relabeling rules,
   # which are applied to the aggregated output metrics.
+  #
   output_relabel_configs:
   - target_label: vmaggr
     replacement: after
diff --git a/docs/vmagent.md b/docs/vmagent.md
index ae143f164e..de561a18a4 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -525,6 +525,7 @@ The following articles contain useful information about Prometheus relabeling:
 
 * An optional `if` filter can be used for conditional relabeling. The `if` filter may contain
   arbitrary [time series selector](https://docs.victoriametrics.com/keyConcepts.html#filtering).
+  The `action` is performed only for [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples), which match the provided `if` filter.
   For example, the following relabeling rule keeps metrics matching `foo{bar="baz"}` series selector, while dropping the rest of metrics:
 
   ```yaml
@@ -540,6 +541,18 @@ The following articles contain useful information about Prometheus relabeling:
     regex: 'foo;baz'
   ```
 
+  The `if` option may contain more than one filter. In this case the `action` is performed if at least a single filter
+  matches the given [sample](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
+  For example, the following relabeling rule adds `foo="bar"` label to samples with `job="foo"` or `instance="bar"` labels:
+
+  ```yaml
+  - target_label: foo
+    replacement: bar
+    if:
+    - '{job="foo"}'
+    - '{instance="bar"}'
+  ```
+
 * The `regex` value can be split into multiple lines for improved readability and maintainability.
   These lines are automatically joined with `|` char when parsed. For example, the following configs are equivalent:
 
diff --git a/lib/promrelabel/if_expression.go b/lib/promrelabel/if_expression.go
index e001c91ab7..6ae413efa1 100644
--- a/lib/promrelabel/if_expression.go
+++ b/lib/promrelabel/if_expression.go
@@ -10,24 +10,155 @@ import (
 	"github.com/VictoriaMetrics/metricsql"
 )
 
-// IfExpression represents `if` expression at RelabelConfig.
+// IfExpression represents PromQL-like label filters such as `metric_name{filters...}`.
 //
-// The `if` expression can contain arbitrary PromQL-like label filters such as `metric_name{filters...}`
+// It may contain either a single filter or multiple filters, which are executed with `or` operator.
+//
+// Examples:
+//
+// if: 'foo{bar="baz"}'
+//
+// if:
+// - 'foo{bar="baz"}'
+// - '{x=~"y"}'
 type IfExpression struct {
-	s    string
-	lfss [][]*labelFilter
+	ies []*ifExpression // parsed filters; Match returns true when any of them matches or when the list is empty
+}
+
+// Match reports whether labels match at least one of the filters inside ie.
+//
+// A nil ie and an ie without filters match any labels.
+func (ie *IfExpression) Match(labels []prompbmarshal.Label) bool {
+	if ie == nil {
+		return true
+	}
+	// No filters means "match everything"; otherwise stop at the first matching filter.
+	matched := len(ie.ies) == 0
+	for i := 0; i < len(ie.ies) && !matched; i++ {
+		matched = ie.ies[i].Match(labels)
+	}
+	return matched
+}
+
+// Parse replaces the contents of ie with the single series selector parsed from s.
+func (ie *IfExpression) Parse(s string) error {
+	var parsed ifExpression
+	if err := parsed.Parse(s); err != nil {
+		return err // ie is left untouched when s cannot be parsed
+	}
+	ie.ies = []*ifExpression{&parsed}
+	return nil
+}
+
+// UnmarshalJSON decodes JSON data into a generic value and initializes ie from it via unmarshalFromInterface.
+func (ie *IfExpression) UnmarshalJSON(data []byte) error {
+	var decoded interface{}
+	if err := json.Unmarshal(data, &decoded); err != nil {
+		return err
+	}
+	return ie.unmarshalFromInterface(decoded)
+}
+
+// MarshalJSON marshals ie to JSON: `null` for an empty ie, a string for one filter, an array of strings otherwise.
+func (ie *IfExpression) MarshalJSON() ([]byte, error) {
+	if ie == nil || len(ie.ies) == 0 {
+		return []byte("null"), nil // json.Marshaler must return valid JSON; empty output makes json.Marshal fail
+	}
+	if len(ie.ies) == 1 {
+		return json.Marshal(ie.ies[0])
+	}
+	return json.Marshal(ie.ies)
+}
+
+// UnmarshalYAML decodes a generic YAML value via f and initializes ie from it via unmarshalFromInterface.
+func (ie *IfExpression) UnmarshalYAML(f func(interface{}) error) error {
+	var raw interface{}
+	if err := f(&raw); err != nil {
+		return fmt.Errorf("cannot unmarshal `match` option: %w", err)
+	}
+	return ie.unmarshalFromInterface(raw)
+}
+
+// unmarshalFromInterface initializes ie from v, which must be either a string or a list of strings.
+//
+// Every string is parsed as a series selector. ie is left unchanged on error.
+func (ie *IfExpression) unmarshalFromInterface(v interface{}) error {
+	var ies []*ifExpression // build into a fresh slice so a failed parse cannot clobber ie.ies
+	switch t := v.(type) {
+	case string:
+		ieLocal, err := newIfExpression(t)
+		if err != nil {
+			return fmt.Errorf("unexpected `match` option: %w", err)
+		}
+		ies = append(ies, ieLocal)
+	case []interface{}:
+		for _, x := range t {
+			s, ok := x.(string)
+			if !ok {
+				return fmt.Errorf("unexpected `match` item type; got %#v; want string", x)
+			}
+			ieLocal, err := newIfExpression(s)
+			if err != nil {
+				return fmt.Errorf("unexpected `match` item: %w", err)
+			}
+			ies = append(ies, ieLocal)
+		}
+	default:
+		return fmt.Errorf("unexpected `match` type; got %#v; want string or an array of strings", t)
+	}
+	ie.ies = ies
+	return nil
+}
+
+// MarshalYAML marshals ie to YAML: nil for an empty ie, a string for one filter, a list of strings otherwise.
+func (ie *IfExpression) MarshalYAML() (interface{}, error) {
+	if ie == nil || len(ie.ies) == 0 {
+		return nil, nil
+	}
+	if len(ie.ies) == 1 {
+		return ie.ies[0].MarshalYAML()
+	}
+	selectors := make([]string, len(ie.ies))
+	for i, item := range ie.ies {
+		marshaled, err := item.MarshalYAML()
+		if err != nil {
+			// ifExpression.MarshalYAML never fails, so an error here is a programming bug.
+			logger.Panicf("BUG: unexpected error: %s", err)
+		}
+		selectors[i] = marshaled.(string)
+	}
+	return selectors, nil
+}
+
+func newIfExpression(s string) (*ifExpression, error) {
+	parsed := &ifExpression{}
+	if err := parsed.Parse(s); err != nil {
+		return nil, err // the partially initialized expression is never exposed on failure
+	}
+	return parsed, nil
 }
 
 // String returns string representation of ie.
 func (ie *IfExpression) String() string {
+	if ie == nil {
+		return "[]" // same text as fmt produces for an empty list of filters
+	}
+	return fmt.Sprintf("%s", ie.ies) // each filter is rendered via ifExpression.String
+}
+
+type ifExpression struct {
+	s    string           // selector text returned by String, MarshalJSON and MarshalYAML
+	lfss [][]*labelFilter // parsed label filters; NOTE(review): presumably outer groups are ORed and inner filters ANDed - confirm
+}
+
+func (ie *ifExpression) String() string { // returns the stored selector text; "" for nil ie
 	if ie == nil {
 		return ""
 	}
 	return ie.s
 }
 
-// Parse parses `if` expression from s and stores it to ie.
-func (ie *IfExpression) Parse(s string) error {
+func (ie *ifExpression) Parse(s string) error {
 	expr, err := metricsql.Parse(s)
 	if err != nil {
 		return err
@@ -46,7 +177,7 @@ func (ie *IfExpression) Parse(s string) error {
 }
 
 // UnmarshalJSON unmarshals ie from JSON data.
-func (ie *IfExpression) UnmarshalJSON(data []byte) error {
+func (ie *ifExpression) UnmarshalJSON(data []byte) error {
 	var s string
 	if err := json.Unmarshal(data, &s); err != nil {
 		return err
@@ -55,12 +186,12 @@ func (ie *IfExpression) UnmarshalJSON(data []byte) error {
 }
 
 // MarshalJSON marshals ie to JSON.
-func (ie *IfExpression) MarshalJSON() ([]byte, error) {
+func (ie *ifExpression) MarshalJSON() ([]byte, error) {
 	return json.Marshal(ie.s)
 }
 
 // UnmarshalYAML unmarshals ie from YAML passed to f.
-func (ie *IfExpression) UnmarshalYAML(f func(interface{}) error) error {
+func (ie *ifExpression) UnmarshalYAML(f func(interface{}) error) error {
 	var s string
 	if err := f(&s); err != nil {
 		return fmt.Errorf("cannot unmarshal `if` option: %w", err)
@@ -72,12 +203,12 @@ func (ie *IfExpression) UnmarshalYAML(f func(interface{}) error) error {
 }
 
 // MarshalYAML marshals ie to YAML.
-func (ie *IfExpression) MarshalYAML() (interface{}, error) {
+func (ie *ifExpression) MarshalYAML() (interface{}, error) {
 	return ie.s, nil
 }
 
 // Match returns true if ie matches the given labels.
-func (ie *IfExpression) Match(labels []prompbmarshal.Label) bool {
+func (ie *ifExpression) Match(labels []prompbmarshal.Label) bool {
 	if ie == nil {
 		return true
 	}
diff --git a/lib/promrelabel/if_expression_test.go b/lib/promrelabel/if_expression_test.go
index 88296fe2dd..434dfa6ad9 100644
--- a/lib/promrelabel/if_expression_test.go
+++ b/lib/promrelabel/if_expression_test.go
@@ -84,7 +84,7 @@ func TestIfExpressionUnmarshalFailure(t *testing.T) {
 	}
 	f(`{`)
 	f(`{x:y}`)
-	f(`[]`)
+	f(`[1]`)
 	f(`"{"`)
 	f(`'{'`)
 	f(`foo{bar`)
@@ -122,6 +122,8 @@ func TestIfExpressionUnmarshalSuccess(t *testing.T) {
 	f(`'{a="b", c!="d", e=~"g", h!~"d"}'`)
 	f(`foo{bar="zs",a=~"b|c"}`)
 	f(`foo{z="y" or bar="zs",a=~"b|c"}`)
+	f(`- foo
+- bar{baz="abc"}`)
 }
 
 func TestIfExpressionMatch(t *testing.T) {
diff --git a/lib/promrelabel/relabel_test.go b/lib/promrelabel/relabel_test.go
index 35e60ff415..a71bb5a6c0 100644
--- a/lib/promrelabel/relabel_test.go
+++ b/lib/promrelabel/relabel_test.go
@@ -449,7 +449,7 @@ func TestParsedRelabelConfigsApply(t *testing.T) {
 	t.Run("keep-if-hit", func(t *testing.T) {
 		f(`
 - action: keep
-  if: '{foo="yyy"}'
+  if: ['foobar', '{foo="yyy"}', '{a="b"}']
 `, `{foo="yyy"}`, false, `{foo="yyy"}`)
 	})
 	t.Run("keep-hit", func(t *testing.T) {
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index ae987444b5..91ced47e1c 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -440,7 +440,9 @@ foo{abc="456",de="fg"} 8
 - interval: 1m
   by: [abc]
   outputs: [count_samples, sum_samples, count_series]
-  match: 'foo{abc=~".+"}'
+  match:
+  - foo{abc=~".+"}
+  - '{non_existing_label!=""}'
 `, `
 foo{abc="123"} 4
 bar 5