From e7f1ceeb840fddbe7a98460b21a88642649a7354 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 31 Jan 2022 19:32:36 +0200 Subject: [PATCH] app/vmselect/promql: optimize queries, which join on `_info` metrics. Automatically add common filters from one side of binary operation to the other side before sending the query to storage subsystem. See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/ and https://www.robustperception.io/exposing-the-software-version-to-prometheus --- app/vmselect/promql/eval.go | 156 +++++++++++++----- app/vmselect/promql/eval_test.go | 50 ++++++ docs/CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 4 +- .../VictoriaMetrics/metricsql/optimizer.go | 120 ++++++++++---- vendor/modules.txt | 2 +- 7 files changed, 259 insertions(+), 76 deletions(-) create mode 100644 app/vmselect/promql/eval_test.go diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index f6ef72f0a..ce6a7ccb5 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "math" + "regexp" + "sort" "strings" "sync" @@ -275,55 +277,29 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { return rv, nil } if be, ok := e.(*metricsql.BinaryOpExpr); ok { - // Execute left and right sides of the binary operation in parallel. - // This should reduce execution times for heavy queries. - // On the other side this can increase CPU and RAM usage when executing heavy queries. - // TODO: think on how to limit CPU and RAM usage while leaving short execution times. - var left, right []*timeseries - var mu sync.Mutex - var wg sync.WaitGroup - var errGlobal error - wg.Add(1) - go func() { - defer wg.Done() - ecCopy := newEvalConfig(ec) - tss, err := evalExpr(ecCopy, be.Left) - mu.Lock() - if err != nil { - if errGlobal == nil { - errGlobal = err - } - } - left = tss - mu.Unlock() - }() - wg.Add(1) - go func() { - defer wg.Done() - ecCopy := newEvalConfig(ec) - tss, err := evalExpr(ecCopy, be.Right) - mu.Lock() - if err != nil { - if errGlobal == nil { - errGlobal = err - } - } - right = tss - mu.Unlock() - }() - wg.Wait() - if errGlobal != nil { - return nil, errGlobal - } - bf := getBinaryOpFunc(be.Op) if bf == nil { return nil, fmt.Errorf(`unknown binary op %q`, be.Op) } + var err error + var tssLeft, tssRight []*timeseries + switch be.Op { + case "and", "if": + // Fetch right-side series at first, since the left side of `and` and `if` operator + // usually contains lower number of time series. This should produce more specific label filters + // for the left side of the query. This, in turn, should reduce the time to select series + // for the left side of the query. + tssRight, tssLeft, err = execBinaryOpArgs(ec, be.Right, be.Left, be) + default: + tssLeft, tssRight, err = execBinaryOpArgs(ec, be.Left, be.Right, be) + } + if err != nil { + return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err) + } bfa := &binaryOpFuncArg{ be: be, - left: left, - right: right, + left: tssLeft, + right: tssRight, } rv, err := bf(bfa) if err != nil { @@ -348,6 +324,100 @@ func evalExpr(ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) { return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil)) } +func execBinaryOpArgs(ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) { + // Execute binary operation in the following way: + // + // 1) execute the exprFirst + // 2) get common label filters for series returned at step 1 + // 3) push down the found common label filters to exprSecond. This filters out unneeded series + // during exprSecond exection instead of spending compute resources on extracting and processing these series + // before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching + // 4) execute the exprSecond with possible additional filters found at step 3 + // + // Typical use cases: + // - Kubernetes-related: show pod creation time with the node name: + // + // kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info + // + // Without the optimization `kube_pod_info` would select and spend compute resources + // for more time series than needed. The selected time series would be dropped later + // when matching time series on the right and left sides of binary operand. + // + // - Generic alerting queries, which rely on `info` metrics. + // See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/ + // + // - Queries, which get additional labels from `info` metrics. + // See https://www.robustperception.io/exposing-the-software-version-to-prometheus + tssFirst, err := evalExpr(ec, exprFirst) + if err != nil { + return nil, nil, err + } + lfs := getCommonLabelFilters(tssFirst) + lfs = metricsql.TrimFiltersByGroupModifier(lfs, be) + exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs) + tssSecond, err := evalExpr(ec, exprSecond) + if err != nil { + return nil, nil, err + } + return tssFirst, tssSecond, nil +} + +func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter { + m := make(map[string][]string) + for _, ts := range tss { + for _, tag := range ts.MetricName.Tags { + m[string(tag.Key)] = append(m[string(tag.Key)], string(tag.Value)) + } + } + lfs := make([]metricsql.LabelFilter, 0, len(m)) + for key, values := range m { + if len(values) != len(tss) { + // Skip the tag, since it doesn't belong to all the time series. + continue + } + values = getUniqueValues(values) + lf := metricsql.LabelFilter{ + Label: key, + } + if len(values) == 1 { + lf.Value = values[0] + } else { + lf.Value = joinRegexpValues(values) + lf.IsRegexp = true + } + lfs = append(lfs, lf) + } + sort.Slice(lfs, func(i, j int) bool { + return lfs[i].Label < lfs[j].Label + }) + return lfs +} + +func getUniqueValues(a []string) []string { + m := make(map[string]struct{}, len(a)) + results := make([]string, 0, len(a)) + for _, s := range a { + if _, ok := m[s]; !ok { + results = append(results, s) + m[s] = struct{}{} + } + } + sort.Strings(results) + return results +} + +func joinRegexpValues(a []string) string { + var b []byte + for i, s := range a { + sQuoted := regexp.QuoteMeta(s) + b = append(b, sQuoted...) + if i < len(a)-1 { + b = append(b, '|') + } + } + return string(b) +} + func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) { if len(ae.Args) != 1 { return nil, nil diff --git a/app/vmselect/promql/eval_test.go b/app/vmselect/promql/eval_test.go new file mode 100644 index 000000000..43fb83154 --- /dev/null +++ b/app/vmselect/promql/eval_test.go @@ -0,0 +1,50 @@ +package promql + +import ( + "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/metricsql" +) + +func TestGetCommonLabelFilters(t *testing.T) { + f := func(metrics string, lfsExpected string) { + t.Helper() + var tss []*timeseries + var rows prometheus.Rows + rows.UnmarshalWithErrLogger(metrics, func(errStr string) { + t.Fatalf("unexpected error when parsing %s: %s", metrics, errStr) + }) + for _, row := range rows.Rows { + var tags []storage.Tag + for _, tag := range row.Tags { + tags = append(tags, storage.Tag{ + Key: []byte(tag.Key), + Value: []byte(tag.Value), + }) + } + var ts timeseries + ts.MetricName.Tags = tags + tss = append(tss, &ts) + } + lfs := getCommonLabelFilters(tss) + me := &metricsql.MetricExpr{ + LabelFilters: lfs, + } + lfsMarshaled := me.AppendString(nil) + if string(lfsMarshaled) != lfsExpected { + t.Fatalf("unexpected common label filters;\ngot\n%s\nwant\n%s", lfsMarshaled, lfsExpected) + } + } + f(``, `{}`) + f(`m 1`, `{}`) + f(`m{a="b"} 1`, `{a="b"}`) + f(`m{c="d",a="b"} 1`, `{a="b", c="d"}`) + f(`m1{a="foo"} 1 +m2{a="bar"} 1`, `{a=~"bar|foo"}`) + f(`m1{a="foo"} 1 +m2{b="bar"} 1`, `{}`) + f(`m1{a="foo",b="bar"} 1 +m2{b="bar",c="x"} 1`, `{b="bar"}`) +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 7810466d7..f7ede2820 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -11,6 +11,7 @@ sort: 15 * Binary operations with `on()`, `without()`, `group_left()` and `group_right()` modifiers. For example, `foo{a="b"} on (a) + bar` is now optimized to `foo{a="b"} on (a) + bar{a="b"}` * Multi-level binary operations. For example, `foo{a="b"} + bar{x="y"} + baz{z="q"}` is now optimized to `foo{a="b",x="y",z="q"} + bar{a="b",x="y",z="q"} + baz{a="b",x="y",z="q"}` * Aggregate functions. For example, `sum(foo{a="b"}) by (c) + bar{c="d"}` is now optimized to `sum(foo{a="b",c="d"}) by (c) + bar{c="d"}` +* FEATURE [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): optimize joining with `*_info` labels. For example: `kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info` now automatically adds the needed filters on `uid` label to `kube_pod_info` before selecting series for the right side of `*` operation. This may save CPU, RAM and disk IO resources. See [this article](https://www.robustperception.io/exposing-the-software-version-to-prometheus) for details on `*_info` labels. * BUGFIX: return proper results from `highestMax()` function at [Graphite render API](https://docs.victoriametrics.com/#graphite-render-api-usage). Previously it was incorrectly returning timeseries with min peaks instead of max peaks. * BUGFIX: properly limit indexdb cache sizes. Previously they could exceed values set via `-memory.allowedPercent` and/or `-memory.allowedBytes` when `indexdb` contained many data parts. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2007). diff --git a/go.mod b/go.mod index 151441482..68f0c3d64 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.1.0 github.com/VictoriaMetrics/metrics v1.18.1 - github.com/VictoriaMetrics/metricsql v0.38.0 + github.com/VictoriaMetrics/metricsql v0.39.0 github.com/aws/aws-sdk-go v1.42.44 github.com/cespare/xxhash/v2 v2.1.2 github.com/cheggaaa/pb/v3 v3.0.8 diff --git a/go.sum b/go.sum index ae760a6ce..ea6f537a7 100644 --- a/go.sum +++ b/go.sum @@ -115,8 +115,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0 h1:3crd4YWHsMwu60GUXRH6OstowiFvqrwS4a github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR2uydjiWvoLp5ZTqQ= github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0= github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= -github.com/VictoriaMetrics/metricsql v0.38.0 h1:YBAzxKyr2QLFXYap8Nd0bxIr0e8mE/aUIyBYgDFMpK4= -github.com/VictoriaMetrics/metricsql v0.38.0/go.mod h1:6pP1ZeLVJHqJrHlF6Ij3gmpQIznSsgktEcZgsAWYel0= +github.com/VictoriaMetrics/metricsql v0.39.0 h1:tm1hneyxVhm1oeJ/1T4007Y5Bn+LKN+Aw3l6XGwvgRM= +github.com/VictoriaMetrics/metricsql v0.39.0/go.mod h1:6pP1ZeLVJHqJrHlF6Ij3gmpQIznSsgktEcZgsAWYel0= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go b/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go index 4d76652b4..88a54876e 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/optimizer.go @@ -1,6 +1,7 @@ package metricsql import ( + "fmt" "sort" "strings" ) @@ -13,26 +14,64 @@ import ( // according to https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization // I.e. such query is converted to `foo{filters1, filters2} op bar{filters1, filters2}` func Optimize(e Expr) Expr { - switch t := e.(type) { - case *RollupExpr: - t.Expr = Optimize(t.Expr) - t.At = Optimize(t.At) - case *FuncExpr: - optimizeFuncArgs(t.Args) - case *AggrFuncExpr: - optimizeFuncArgs(t.Args) - case *BinaryOpExpr: - t.Left = Optimize(t.Left) - t.Right = Optimize(t.Right) - lfs := getCommonLabelFilters(t) - pushdownLabelFilters(t, lfs) + if !canOptimize(e) { + return e } - return e + eCopy := Clone(e) + optimizeInplace(eCopy) + return eCopy } -func optimizeFuncArgs(args []Expr) { - for i := range args { - args[i] = Optimize(args[i]) +func canOptimize(e Expr) bool { + switch t := e.(type) { + case *RollupExpr: + return canOptimize(t.Expr) || canOptimize(t.At) + case *FuncExpr: + for _, arg := range t.Args { + if canOptimize(arg) { + return true + } + } + case *AggrFuncExpr: + for _, arg := range t.Args { + if canOptimize(arg) { + return true + } + } + case *BinaryOpExpr: + return true + } + return false +} + +// Clone clones the given expression e and returns the cloned copy. +func Clone(e Expr) Expr { + s := e.AppendString(nil) + eCopy, err := Parse(string(s)) + if err != nil { + panic(fmt.Errorf("BUG: cannot parse the expression %q: %w", s, err)) + } + return eCopy +} + +func optimizeInplace(e Expr) { + switch t := e.(type) { + case *RollupExpr: + optimizeInplace(t.Expr) + optimizeInplace(t.At) + case *FuncExpr: + for _, arg := range t.Args { + optimizeInplace(arg) + } + case *AggrFuncExpr: + for _, arg := range t.Args { + optimizeInplace(arg) + } + case *BinaryOpExpr: + optimizeInplace(t.Left) + optimizeInplace(t.Right) + lfs := getCommonLabelFilters(t) + pushdownBinaryOpFiltersInplace(t, lfs) } } @@ -54,7 +93,7 @@ func getCommonLabelFilters(e Expr) []LabelFilter { return nil } lfs := getCommonLabelFilters(arg) - return filterLabelFiltersByAggrModifier(lfs, t) + return trimFiltersByAggrModifier(lfs, t) case *BinaryOpExpr: if !canOptimizeBinaryOp(t) { return nil @@ -62,13 +101,13 @@ func getCommonLabelFilters(e Expr) []LabelFilter { lfsLeft := getCommonLabelFilters(t.Left) lfsRight := getCommonLabelFilters(t.Right) lfs := unionLabelFilters(lfsLeft, lfsRight) - return filterLabelFiltersByGroupModifier(lfs, t) + return TrimFiltersByGroupModifier(lfs, t) default: return nil } } -func filterLabelFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []LabelFilter { +func trimFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []LabelFilter { switch strings.ToLower(afe.Modifier.Op) { case "by": return filterLabelFiltersOn(lfs, afe.Modifier.Args) @@ -79,7 +118,13 @@ func filterLabelFiltersByAggrModifier(lfs []LabelFilter, afe *AggrFuncExpr) []La } } -func filterLabelFiltersByGroupModifier(lfs []LabelFilter, be *BinaryOpExpr) []LabelFilter { +// TrimFiltersByGroupModifier trims lfs by the specified be.GroupModifier.Op (e.g. on() or ignoring()). +// +// The following cases are possible: +// - It returns lfs as is if be doesn't contain any group modifier +// - It returns only filters specified in on() +// - It drops filters specified inside ignoring() +func TrimFiltersByGroupModifier(lfs []LabelFilter, be *BinaryOpExpr) []LabelFilter { switch strings.ToLower(be.GroupModifier.Op) { case "on": return filterLabelFiltersOn(lfs, be.GroupModifier.Args) @@ -100,7 +145,24 @@ func getLabelFiltersWithoutMetricName(lfs []LabelFilter) []LabelFilter { return lfsNew } -func pushdownLabelFilters(e Expr, lfs []LabelFilter) { +// PushdownBinaryOpFilters pushes down the given commonFilters to e if possible. +// +// e must be a part of binary operation - either left or right. +// +// For example, if e contains `foo + sum(bar)` and commonFilters={x="y"}, +// then the returned expression will contain `foo{x="y"} + sum(bar)`. +// The `{x="y"}` cannot be pusehd down to `sum(bar)`, since this may change binary operation results. +func PushdownBinaryOpFilters(e Expr, commonFilters []LabelFilter) Expr { + if len(commonFilters) == 0 { + // Fast path - nothing to push down. + return e + } + eCopy := Clone(e) + pushdownBinaryOpFiltersInplace(eCopy, commonFilters) + return eCopy +} + +func pushdownBinaryOpFiltersInplace(e Expr, lfs []LabelFilter) { if len(lfs) == 0 { return } @@ -109,23 +171,23 @@ func pushdownLabelFilters(e Expr, lfs []LabelFilter) { t.LabelFilters = unionLabelFilters(t.LabelFilters, lfs) sortLabelFilters(t.LabelFilters) case *RollupExpr: - pushdownLabelFilters(t.Expr, lfs) + pushdownBinaryOpFiltersInplace(t.Expr, lfs) case *FuncExpr: arg := getFuncArgForOptimization(t.Name, t.Args) if arg != nil { - pushdownLabelFilters(arg, lfs) + pushdownBinaryOpFiltersInplace(arg, lfs) } case *AggrFuncExpr: - lfs = filterLabelFiltersByAggrModifier(lfs, t) + lfs = trimFiltersByAggrModifier(lfs, t) arg := getFuncArgForOptimization(t.Name, t.Args) if arg != nil { - pushdownLabelFilters(arg, lfs) + pushdownBinaryOpFiltersInplace(arg, lfs) } case *BinaryOpExpr: if canOptimizeBinaryOp(t) { - lfs = filterLabelFiltersByGroupModifier(lfs, t) - pushdownLabelFilters(t.Left, lfs) - pushdownLabelFilters(t.Right, lfs) + lfs = TrimFiltersByGroupModifier(lfs, t) + pushdownBinaryOpFiltersInplace(t.Left, lfs) + pushdownBinaryOpFiltersInplace(t.Right, lfs) } } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 3043f71d1..00c0597cd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -26,7 +26,7 @@ github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.18.1 ## explicit; go 1.12 github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.38.0 +# github.com/VictoriaMetrics/metricsql v0.39.0 ## explicit; go 1.13 github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop