From 04d6596298079eb79430ac36d8ef53e84d2eb7b3 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Mon, 31 Jan 2022 19:32:36 +0200
Subject: [PATCH] app/vmselect/promql: optimize queries, which join on `_info`
 metrics.

Automatically add common filters from one side of binary operation
to the other side before sending the query to storage subsystem.

See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
and https://www.robustperception.io/exposing-the-software-version-to-prometheus
---
 app/vmselect/promql/eval.go                   | 156 +++++++++++++-----
 app/vmselect/promql/eval_test.go              |  50 ++++++
 docs/CHANGELOG.md                             |   1 +
 go.mod                                        |   2 +-
 go.sum                                        |   4 +-
 .../VictoriaMetrics/metricsql/optimizer.go    | 120 ++++++++++----
 vendor/modules.txt                            |   2 +-
 7 files changed, 259 insertions(+), 76 deletions(-)
 create mode 100644 app/vmselect/promql/eval_test.go

diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index 57f64005f..36baaaa40 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -4,6 +4,8 @@ import (
 	"flag"
 	"fmt"
 	"math"
+	"regexp"
+	"sort"
 	"strings"
 	"sync"
 
@@ -292,55 +294,29 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
 		return rv, nil
 	}
 	if be, ok := e.(*metricsql.BinaryOpExpr); ok {
-		// Execute left and right sides of the binary operation in parallel.
-		// This should reduce execution times for heavy queries.
-		// On the other side this can increase CPU and RAM usage when executing heavy queries.
-		// TODO: think on how to limit CPU and RAM usage while leaving short execution times.
-		var left, right []*timeseries
-		var mu sync.Mutex
-		var wg sync.WaitGroup
-		var errGlobal error
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			ecCopy := newEvalConfig(ec)
-			tss, err := evalExpr(ecCopy, be.Left)
-			mu.Lock()
-			if err != nil {
-				if errGlobal == nil {
-					errGlobal = err
-				}
-			}
-			left = tss
-			mu.Unlock()
-		}()
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			ecCopy := newEvalConfig(ec)
-			tss, err := evalExpr(ecCopy, be.Right)
-			mu.Lock()
-			if err != nil {
-				if errGlobal == nil {
-					errGlobal = err
-				}
-			}
-			right = tss
-			mu.Unlock()
-		}()
-		wg.Wait()
-		if errGlobal != nil {
-			return nil, errGlobal
-		}
-
 		bf := getBinaryOpFunc(be.Op)
 		if bf == nil {
 			return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
 		}
+		var err error
+		var tssLeft, tssRight []*timeseries
+		switch be.Op {
+		case "and", "if":
+			// Fetch right-side series at first, since the right side of `and` and `if` operator
+			// usually contains lower number of time series. This should produce more specific label filters
+			// for the left side of the query. This, in turn, should reduce the time to select series
+			// for the left side of the query.
+			tssRight, tssLeft, err = execBinaryOpArgs(ec, be.Right, be.Left, be)
+		default:
+			tssLeft, tssRight, err = execBinaryOpArgs(ec, be.Left, be.Right, be)
+		}
+		if err != nil {
+			return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
+		}
 		bfa := &binaryOpFuncArg{
 			be:    be,
-			left:  left,
-			right: right,
+			left:  tssLeft,
+			right: tssRight,
 		}
 		rv, err := bf(bfa)
 		if err != nil {
@@ -365,6 +341,100 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
 	return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
 }
 
+func execBinaryOpArgs(ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
+	// Execute binary operation in the following way:
+	//
+	// 1) execute the exprFirst
+	// 2) get common label filters for series returned at step 1
+	// 3) push down the found common label filters to exprSecond. This filters out unneeded series
+	//    during exprSecond execution instead of spending compute resources on extracting and processing these series
+	//    before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
+	// 4) execute the exprSecond with possible additional filters found at step 3
+	//
+	// Typical use cases:
+	// - Kubernetes-related: show pod creation time with the node name:
+	//
+	//     kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
+	//
+	//   Without the optimization `kube_pod_info` would select and spend compute resources
+	//   for more time series than needed. The selected time series would be dropped later
+	//   when matching time series on the right and left sides of the binary operation.
+	//
+	// - Generic alerting queries, which rely on `info` metrics.
+	//   See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
+	//
+	// - Queries, which get additional labels from `info` metrics.
+	//   See https://www.robustperception.io/exposing-the-software-version-to-prometheus
+	tssFirst, err := evalExpr(ec, exprFirst)
+	if err != nil {
+		return nil, nil, err
+	}
+	lfs := getCommonLabelFilters(tssFirst)
+	lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
+	exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
+	tssSecond, err := evalExpr(ec, exprSecond)
+	if err != nil {
+		return nil, nil, err
+	}
+	return tssFirst, tssSecond, nil
+}
+
+func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
+	m := make(map[string][]string)
+	for _, ts := range tss {
+		for _, tag := range ts.MetricName.Tags {
+			m[string(tag.Key)] = append(m[string(tag.Key)], string(tag.Value))
+		}
+	}
+	lfs := make([]metricsql.LabelFilter, 0, len(m))
+	for key, values := range m {
+		if len(values) != len(tss) {
+			// Skip the tag, since it doesn't belong to all the time series.
+			continue
+		}
+		values = getUniqueValues(values)
+		lf := metricsql.LabelFilter{
+			Label: key,
+		}
+		if len(values) == 1 {
+			lf.Value = values[0]
+		} else {
+			lf.Value = joinRegexpValues(values)
+			lf.IsRegexp = true
+		}
+		lfs = append(lfs, lf)
+	}
+	sort.Slice(lfs, func(i, j int) bool {
+		return lfs[i].Label < lfs[j].Label
+	})
+	return lfs
+}
+
+func getUniqueValues(a []string) []string {
+	m := make(map[string]struct{}, len(a))
+	results := make([]string, 0, len(a))
+	for _, s := range a {
+		if _, ok := m[s]; !ok {
+			results = append(results, s)
+			m[s] = struct{}{}
+		}
+	}
+	sort.Strings(results)
+	return results
+}
+
+func joinRegexpValues(a []string) string {
+	var b []byte
+	for i, s := range a {
+		sQuoted := regexp.QuoteMeta(s)
+		b = append(b, sQuoted...)
+		if i < len(a)-1 {
+			b = append(b, '|')
+		}
+	}
+	return string(b)
+}
+
 func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
 	if len(ae.Args) != 1 {
 		return nil, nil
diff --git a/app/vmselect/promql/eval_test.go b/app/vmselect/promql/eval_test.go
new file mode 100644
index 000000000..43fb83154
--- /dev/null
+++ b/app/vmselect/promql/eval_test.go
@@ -0,0 +1,50 @@
+package promql
+
+import (
+	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
+	"github.com/VictoriaMetrics/metricsql"
+)
+
+func TestGetCommonLabelFilters(t *testing.T) {
+	f := func(metrics string, lfsExpected string) {
+		t.Helper()
+		var tss []*timeseries
+		var rows prometheus.Rows
+		rows.UnmarshalWithErrLogger(metrics, func(errStr string) {
+			t.Fatalf("unexpected error when parsing %s: %s", metrics, errStr)
+		})
+		for _, row := range rows.Rows {
+			var tags []storage.Tag
+			for _, tag := range row.Tags {
+				tags = append(tags, storage.Tag{
+					Key:   []byte(tag.Key),
+					Value: []byte(tag.Value),
+				})
+			}
+			var ts timeseries
+			ts.MetricName.Tags = tags
+			tss = append(tss, &ts)
+		}
+		lfs := getCommonLabelFilters(tss)
+		me := &metricsql.MetricExpr{
+			LabelFilters: lfs,
+		}
+		lfsMarshaled := me.AppendString(nil)
+		if string(lfsMarshaled) != lfsExpected {
+			t.Fatalf("unexpected common label filters;\ngot\n%s\nwant\n%s", lfsMarshaled, lfsExpected)
+		}
+	}
+	f(``, `{}`)
+	f(`m 1`, `{}`)
+	f(`m{a="b"} 1`, `{a="b"}`)
+	f(`m{c="d",a="b"} 1`, `{a="b", c="d"}`)
+	f(`m1{a="foo"} 1
+m2{a="bar"} 1`, `{a=~"bar|foo"}`)
+	f(`m1{a="foo"} 1
+m2{b="bar"} 1`, `{}`)
+	f(`m1{a="foo",b="bar"} 1
+m2{b="bar",c="x"} 1`, `{b="bar"}`)
+}
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 7810466d7..f7ede2820 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -11,6 +11,7 @@ sort: 15
   * Binary operations with `on()`, `without()`, `group_left()` and `group_right()` modifiers. For example, `foo{a="b"} on (a) + bar` is now optimized to `foo{a="b"} on (a) + bar{a="b"}`
   * Multi-level binary operations. For example, `foo{a="b"} + bar{x="y"} + baz{z="q"}` is now optimized to `foo{a="b",x="y",z="q"} + bar{a="b",x="y",z="q"} + baz{a="b",x="y",z="q"}`
   * Aggregate functions. For example, `sum(foo{a="b"}) by (c) + bar{c="d"}` is now optimized to `sum(foo{a="b",c="d"}) by (c) + bar{c="d"}`
+* FEATURE [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): optimize joining with `*_info` metrics. For example: `kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info` now automatically adds the needed filters on `uid` label to `kube_pod_info` before selecting series for the right side of `*` operation. This may save CPU, RAM and disk IO resources. See [this article](https://www.robustperception.io/exposing-the-software-version-to-prometheus) for details on `*_info` metrics.
 
 * BUGFIX: return proper results from `highestMax()` function at [Graphite render API](https://docs.victoriametrics.com/#graphite-render-api-usage). Previously it was incorrectly returning timeseries with min peaks instead of max peaks.
 * BUGFIX: properly limit indexdb cache sizes. Previously they could exceed values set via `-memory.allowedPercent` and/or `-memory.allowedBytes` when `indexdb` contained many data parts. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007).
diff --git a/go.mod b/go.mod
index 511cb243d..cd4a64c71 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
 	// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
 	github.com/VictoriaMetrics/fasthttp v1.1.0
 	github.com/VictoriaMetrics/metrics v1.18.1
-	github.com/VictoriaMetrics/metricsql v0.38.0
+	github.com/VictoriaMetrics/metricsql v0.39.0
 	github.com/aws/aws-sdk-go v1.42.44
 	github.com/cespare/xxhash/v2 v2.1.2
 	github.com/cheggaaa/pb/v3 v3.0.8
diff --git a/go.sum b/go.sum
index e7e8e6c62..09adac331 100644
--- a/go.sum
+++ b/go.sum
@@ -115,8 +115,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a
 github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ=
 github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0=
 github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
-github.com/VictoriaMetrics/metricsql v0.38.0 h1:YBAzxKyr2QLFXYap8Nd0bxIr0e8mE/aUIyBYgDFMpK4=
-github.com/VictoriaMetrics/metricsql v0.38.0/go.mod h1:6pP1ZeLVJHqJrHlF6Ij3gmpQIznSsgktEcZgsAWYel0=
+github.com/VictoriaMetrics/metricsql v0.39.0 h1:tm1hneyxVhm1oeJ/1T4007Y5Bn+LKN+Aw3l6XGwvgRM=
+github.com/VictoriaMetrics/metricsql v0.39.0/go.mod h1:6pP1ZeLVJHqJrHlF6Ij3gmpQIznSsgktEcZgsAWYel0=
 github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
 github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
 github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
diff --git a/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go b/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go
index 4d76652b4..88a54876e 100644
--- a/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go
+++ b/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go
@@ -1,6 +1,7 @@
 package metricsql
 
 import (
+	"fmt"
 	"sort"
 	"strings"
 )
@@ -13,26 +14,64 @@ import (
 //   according to https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization
 //   I.e. such query is converted to `foo{filters1, filters2} op bar{filters1, filters2}`
 func Optimize(e Expr) Expr {
-	switch t := e.(type) {
-	case *RollupExpr:
-		t.Expr = Optimize(t.Expr)
-		t.At = Optimize(t.At)
-	case *FuncExpr:
-		optimizeFuncArgs(t.Args)
-	case *AggrFuncExpr:
-		optimizeFuncArgs(t.Args)
-	case *BinaryOpExpr:
-		t.Left = Optimize(t.Left)
-		t.Right = Optimize(t.Right)
-		lfs := getCommonLabelFilters(t)
-		pushdownLabelFilters(t, lfs)
+	if !canOptimize(e) {
+		return e
 	}
-	return e
+	eCopy := Clone(e)
+	optimizeInplace(eCopy)
+	return eCopy
 }
 
-func optimizeFuncArgs(args []Expr) {
-	for i := range args {
-		args[i] = Optimize(args[i])
+func canOptimize(e Expr) bool {
+	switch t := e.(type) {
+	case *RollupExpr:
+		return canOptimize(t.Expr) || canOptimize(t.At)
+	case *FuncExpr:
+		for _, arg := range t.Args {
+			if canOptimize(arg) {
+				return true
+			}
+		}
+	case *AggrFuncExpr:
+		for _, arg := range t.Args {
+			if canOptimize(arg) {
+				return true
+			}
+		}
+	case *BinaryOpExpr:
+		return true
+	}
+	return false
+}
+
+// Clone clones the given expression e and returns the cloned copy.
+func Clone(e Expr) Expr {
+	s := e.AppendString(nil)
+	eCopy, err := Parse(string(s))
+	if err != nil {
+		panic(fmt.Errorf("BUG: cannot parse the expression %q: %w", s, err))
+	}
+	return eCopy
+}
+
+func optimizeInplace(e Expr) {
+	switch t := e.(type) {
+	case *RollupExpr:
+		optimizeInplace(t.Expr)
+		optimizeInplace(t.At)
+	case *FuncExpr:
+		for _, arg := range t.Args {
+			optimizeInplace(arg)
+		}
+	case *AggrFuncExpr:
+		for _, arg := range t.Args {
+			optimizeInplace(arg)
+		}
+	case *BinaryOpExpr:
+		optimizeInplace(t.Left)
+		optimizeInplace(t.Right)
+		lfs := getCommonLabelFilters(t)
+		pushdownBinaryOpFiltersInplace(t, lfs)
 	}
 }
 
@@ -54,7 +93,7 @@ func getCommonLabelFilters(e Expr) []LabelFilter {
 			return nil
 		}
 		lfs := getCommonLabelFilters(arg)
-		return filterLabelFiltersByAggrModifier(lfs, t)
+		return trimFiltersByAggrModifier(lfs, t)
 	case *BinaryOpExpr:
 		if !canOptimizeBinaryOp(t) {
 			return nil
@@ -62,13 +101,13 @@ func getCommonLabelFilters(e Expr) []LabelFilter {
 		lfsLeft := getCommonLabelFilters(t.Left)
 		lfsRight := getCommonLabelFilters(t.Right)
 		lfs := unionLabelFilters(lfsLeft, lfsRight)
-		return filterLabelFiltersByGroupModifier(lfs, t)
+		return TrimFiltersByGroupModifier(lfs, t)
 	default:
 		return nil
 	}
 }
 
-func filterLabelFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []LabelFilter {
+func trimFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []LabelFilter {
 	switch strings.ToLower(afe.Modifier.Op) {
 	case "by":
 		return filterLabelFiltersOn(lfs, afe.Modifier.Args)
@@ -79,7 +118,13 @@ func filterLabelFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []La
 	}
 }
 
-func filterLabelFiltersByGroupModifier(lfs []LabelFilter, be *BinaryOpExpr) []LabelFilter {
+// TrimFiltersByGroupModifier trims lfs by the specified be.GroupModifier.Op (e.g. on() or ignoring()).
+//
+// The following cases are possible:
+// - It returns lfs as is if be doesn't contain any group modifier
+// - It returns only filters specified in on()
+// - It drops filters specified inside ignoring()
+func TrimFiltersByGroupModifier(lfs []LabelFilter, be *BinaryOpExpr) []LabelFilter {
 	switch strings.ToLower(be.GroupModifier.Op) {
 	case "on":
 		return filterLabelFiltersOn(lfs, be.GroupModifier.Args)
@@ -100,7 +145,24 @@ func getLabelFiltersWithoutMetricName(lfs []LabelFilter) []LabelFilter {
 	return lfsNew
 }
 
-func pushdownLabelFilters(e Expr, lfs []LabelFilter) {
+// PushdownBinaryOpFilters pushes down the given commonFilters to e if possible.
+//
+// e must be a part of binary operation - either left or right.
+//
+// For example, if e contains `foo + sum(bar)` and commonFilters={x="y"},
+// then the returned expression will contain `foo{x="y"} + sum(bar)`.
+// The `{x="y"}` cannot be pushed down to `sum(bar)`, since this may change binary operation results.
+func PushdownBinaryOpFilters(e Expr, commonFilters []LabelFilter) Expr {
+	if len(commonFilters) == 0 {
+		// Fast path - nothing to push down.
+		return e
+	}
+	eCopy := Clone(e)
+	pushdownBinaryOpFiltersInplace(eCopy, commonFilters)
+	return eCopy
+}
+
+func pushdownBinaryOpFiltersInplace(e Expr, lfs []LabelFilter) {
 	if len(lfs) == 0 {
 		return
 	}
@@ -109,23 +171,23 @@ func pushdownLabelFilters(e Expr, lfs []LabelFilter) {
 		t.LabelFilters = unionLabelFilters(t.LabelFilters, lfs)
 		sortLabelFilters(t.LabelFilters)
 	case *RollupExpr:
-		pushdownLabelFilters(t.Expr, lfs)
+		pushdownBinaryOpFiltersInplace(t.Expr, lfs)
 	case *FuncExpr:
 		arg := getFuncArgForOptimization(t.Name, t.Args)
 		if arg != nil {
-			pushdownLabelFilters(arg, lfs)
+			pushdownBinaryOpFiltersInplace(arg, lfs)
 		}
 	case *AggrFuncExpr:
-		lfs = filterLabelFiltersByAggrModifier(lfs, t)
+		lfs = trimFiltersByAggrModifier(lfs, t)
 		arg := getFuncArgForOptimization(t.Name, t.Args)
 		if arg != nil {
-			pushdownLabelFilters(arg, lfs)
+			pushdownBinaryOpFiltersInplace(arg, lfs)
 		}
 	case *BinaryOpExpr:
 		if canOptimizeBinaryOp(t) {
-			lfs = filterLabelFiltersByGroupModifier(lfs, t)
-			pushdownLabelFilters(t.Left, lfs)
-			pushdownLabelFilters(t.Right, lfs)
+			lfs = TrimFiltersByGroupModifier(lfs, t)
+			pushdownBinaryOpFiltersInplace(t.Left, lfs)
+			pushdownBinaryOpFiltersInplace(t.Right, lfs)
 		}
 	}
 }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index ba5677316..d49c255e8 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -26,7 +26,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
 # github.com/VictoriaMetrics/metrics v1.18.1
 ## explicit; go 1.12
 github.com/VictoriaMetrics/metrics
-# github.com/VictoriaMetrics/metricsql v0.38.0
+# github.com/VictoriaMetrics/metricsql v0.39.0
 ## explicit; go 1.13
 github.com/VictoriaMetrics/metricsql
 github.com/VictoriaMetrics/metricsql/binaryop