app/vmselect/promql: add the ability to copy all the labels from one side of group_left()/group_right() operation

This is performed by specifying `*` inside group_left()/group_right().
Also allow specifying prefix for the copied labels via `group_left(...) prefix "..."` and `group_right(...) prefix "..."` syntax.
For example, the following query adds all the namespace-related labels to pod info, and prefixes all the copied label names with "ns_" prefix:

  kube_pod_info * on(namespace) group_left(*) prefix "ns_" kube_namespace_labels

This resolves the following StackOverflow questions:

- https://stackoverflow.com/questions/76661818/how-to-add-namespace-labels-to-pod-labels-in-prometheus
- https://stackoverflow.com/questions/76653997/how-can-i-make-a-new-copy-of-kube-namespace-labels-metric-with-a-different-name
This commit is contained in:
Aliaksandr Valialkin 2023-07-17 16:54:01 -07:00
parent cc54fa2a56
commit 5ace0701d3
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
10 changed files with 241 additions and 49 deletions

View file

@ -225,6 +225,14 @@ func ensureSingleTimeseries(side string, be *metricsql.BinaryOpExpr, tss []*time
func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft, rvsRight, tssLeft, tssRight []*timeseries) ([]*timeseries, []*timeseries, error) {
joinTags := be.JoinModifier.Args
var skipTags []string
if strings.ToLower(be.GroupModifier.Op) == "on" {
skipTags = be.GroupModifier.Args
}
joinPrefix := ""
if be.JoinModifierPrefix != nil {
joinPrefix = be.JoinModifierPrefix.S
}
type tsPair struct {
left *timeseries
right *timeseries
@ -234,7 +242,7 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
resetMetricGroupIfRequired(be, tsLeft)
if len(tssRight) == 1 {
// Easy case - right part contains only a single matching time series.
tsLeft.MetricName.SetTags(joinTags, &tssRight[0].MetricName)
tsLeft.MetricName.SetTags(joinTags, joinPrefix, skipTags, &tssRight[0].MetricName)
rvsLeft = append(rvsLeft, tsLeft)
rvsRight = append(rvsRight, tssRight[0])
continue
@ -249,7 +257,7 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
for _, tsRight := range tssRight {
var tsCopy timeseries
tsCopy.CopyFromShallowTimestamps(tsLeft)
tsCopy.MetricName.SetTags(joinTags, &tsRight.MetricName)
tsCopy.MetricName.SetTags(joinTags, joinPrefix, skipTags, &tsRight.MetricName)
bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName)
pair, ok := m[string(bb.B)]
if !ok {

View file

@ -3585,6 +3585,102 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`vector + vector on group_left(*)`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(
(label_set(time(), "t1", "v123", "t2", "v3"), label_set(10, "t2", "v3", "xxx", "yy"))
+ on (foo, t2) group_left (*)
(label_set(100, "t1", "v1"), label_set(time(), "t2", "v3", "noxxx", "aa"))
)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{2000, 2400, 2800, 3200, 3600, 4000},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("noxxx"),
Value: []byte("aa"),
},
{
Key: []byte("t1"),
Value: []byte("v123"),
},
{
Key: []byte("t2"),
Value: []byte("v3"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1010, 1210, 1410, 1610, 1810, 2010},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("noxxx"),
Value: []byte("aa"),
},
{
Key: []byte("t2"),
Value: []byte("v3"),
},
{
Key: []byte("xxx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`vector + vector on group_left(*) prefix`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(
(label_set(time(), "t1", "v123", "t2", "v3"), label_set(10, "t2", "v3", "xxx", "yy"))
+ on (foo, t2) group_left (*) prefix "abc_"
(label_set(100, "t1", "v1"), label_set(time(), "t2", "v3", "noxxx", "aa"))
)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{2000, 2400, 2800, 3200, 3600, 4000},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("abc_noxxx"),
Value: []byte("aa"),
},
{
Key: []byte("t1"),
Value: []byte("v123"),
},
{
Key: []byte("t2"),
Value: []byte("v3"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1010, 1210, 1410, 1610, 1810, 2010},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("abc_noxxx"),
Value: []byte("aa"),
},
{
Key: []byte("t2"),
Value: []byte("v3"),
},
{
Key: []byte("xxx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`vector + vector on group_left (__name__)`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(

View file

@ -30,6 +30,9 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). See [description for this change](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7094fa38bc207c7bd7330ea8a834310a310ce5e3) for details.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): allow selecting time series matching at least one of multiple `or` filters. For example, `{env="prod",job="a" or env="dev",job="b"}` selects series with either `{env="prod",job="a"}` or `{env="dev",job="b"}` labels. This functionality allows passing the selected series to [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) without the need to use [subqueries](https://docs.victoriametrics.com/MetricsQL.html#subqueries). See [these docs](https://docs.victoriametrics.com/keyConcepts.html#filtering-by-multiple-or-filters).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add ability to preserve metric names for binary operation results via `keep_metric_names` modifier. For example, `({__name__=~"foo|bar"} / 10) keep_metric_names` leaves `foo` and `bar` metric names in division results. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#keep_metric_names). This helps to address issues like [this one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3710).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add ability to copy all the labels from `one` side of [many-to-one operations](https://prometheus.io/docs/prometheus/latest/querying/operators/#many-to-one-and-one-to-many-vector-matches) by specifying `*` inside `group_left()` or `group_right()`. Also allow adding a prefix for copied label names via `group_left(*) prefix "..."` syntax. For example, the following query copies Kubernetes namespace labels to `kube_pod_info` series and adds `ns_` prefix for the copied label names: `kube_pod_info * on(namespace) group_left(*) prefix "ns_" kube_namespace_labels`. The labels from `on()` list aren't prefixed.
This feature resolves [this](https://stackoverflow.com/questions/76661818/how-to-add-namespace-labels-to-pod-labels-in-prometheus)
and [that](https://stackoverflow.com/questions/76653997/how-can-i-make-a-new-copy-of-kube-namespace-labels-metric-with-a-different-name) questions at StackOverflow.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): update backoff policy on retries to reduce probability of overloading for `source` or `destination` databases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4402).

View file

@ -56,7 +56,7 @@ Feel free [filing a feature request](https://github.com/VictoriaMetrics/Victoria
This functionality can be evaluated at [VictoriaMetrics playground](https://play.victoriametrics.com/select/accounting/1/6a716b0f-38bc-4856-90ce-448fd713e3fe/prometheus/graph/)
or at your own [VictoriaMetrics instance](https://docs.victoriametrics.com/#how-to-start-victoriametrics).
The list of MetricsQL features:
The list of MetricsQL features on top of PromQL:
* Graphite-compatible filters can be passed via `{__graphite__="foo.*.bar"}` syntax.
See [these docs](https://docs.victoriametrics.com/#selecting-graphite-metrics).
@ -70,6 +70,13 @@ The list of MetricsQL features:
* [Series selectors](https://docs.victoriametrics.com/keyConcepts.html#filtering) accept multiple `or` filters. For example, `{env="prod",job="a" or env="dev",job="b"}`
selects series with either `{env="prod",job="a"}` or `{env="dev",job="b"}` labels.
See [these docs](https://docs.victoriametrics.com/keyConcepts.html#filtering-by-multiple-or-filters) for details.
* Support for `group_left(*)` and `group_right(*)` for copying all the labels from time series on the `one` side
of [many-to-one operations](https://prometheus.io/docs/prometheus/latest/querying/operators/#many-to-one-and-one-to-many-vector-matches).
The copied label names may clash with the existing label names, so MetricsQL provides an ability to add prefix to the copied metric names
via `group_left(*) prefix "..."` syntax.
For example, the following query copies all the `namespace`-related labels from `kube_namespace_labels` to `kube_pod_info` series,
while adding `ns_` prefix to the copied labels: `kube_pod_info * on(namespace) group_left(*) prefix "ns_" kube_namespace_labels`.
Labels from the `on()` list aren't copied.
* [Aggregate functions](#aggregate-functions) accept arbitrary number of args.
For example, `avg(q1, q2, q3)` would return the average values for every point across time series returned by `q1`, `q2` and `q3`.
* [@ modifier](https://prometheus.io/docs/prometheus/latest/querying/basics/#modifier) can be put anywhere in the query.

2
go.mod
View file

@ -12,7 +12,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.2.0
github.com/VictoriaMetrics/metrics v1.24.0
github.com/VictoriaMetrics/metricsql v0.58.2
github.com/VictoriaMetrics/metricsql v0.59.0
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.27
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.71

4
go.sum
View file

@ -69,8 +69,8 @@ github.com/VictoriaMetrics/fasthttp v1.2.0 h1:nd9Wng4DlNtaI27WlYh5mGXCJOmee/2c2b
github.com/VictoriaMetrics/fasthttp v1.2.0/go.mod h1:zv5YSmasAoSyv8sBVexfArzFDIGGTN4TfCKAtAw7IfE=
github.com/VictoriaMetrics/metrics v1.24.0 h1:ILavebReOjYctAGY5QU2F9X0MYvkcrG3aEn2RKa1Zkw=
github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys=
github.com/VictoriaMetrics/metricsql v0.58.2 h1:xECDKK13ZR7LCpepVM7nuzJTnEkQI3r0qNxQIY6uv60=
github.com/VictoriaMetrics/metricsql v0.58.2/go.mod h1:k4UaP/+CjuZslIjd+kCigNG9TQmUqh5v0TP/nMEy90I=
github.com/VictoriaMetrics/metricsql v0.59.0 h1:0kX7hSCcFFciVEOfoJsIkmlaLJkgoP/lmAsqlJTtTgg=
github.com/VictoriaMetrics/metricsql v0.59.0/go.mod h1:k4UaP/+CjuZslIjd+kCigNG9TQmUqh5v0TP/nMEy90I=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

View file

@ -318,8 +318,20 @@ func (mn *MetricName) GetTagValue(tagKey string) []byte {
}
// SetTags sets tags from src with keys matching addTags.
func (mn *MetricName) SetTags(addTags []string, src *MetricName) {
//
// It adds prefix to copied label names.
// skipTags contains a list of tags, which must be skipped.
func (mn *MetricName) SetTags(addTags []string, prefix string, skipTags []string, src *MetricName) {
if len(addTags) == 1 && addTags[0] == "*" {
// Special case for copying all the tags except of skipTags from src to mn.
mn.setAllTags(prefix, skipTags, src)
return
}
bb := bbPool.Get()
for _, tagName := range addTags {
if containsString(skipTags, tagName) {
continue
}
if tagName == string(metricGroupTagKey) {
mn.MetricGroup = append(mn.MetricGroup[:0], src.MetricGroup...)
continue
@ -336,19 +348,47 @@ func (mn *MetricName) SetTags(addTags []string, src *MetricName) {
mn.RemoveTag(tagName)
continue
}
found := false
for i := range mn.Tags {
t := &mn.Tags[i]
if string(t.Key) == tagName {
t.Value = append(t.Value[:0], srcTag.Value...)
found = true
break
}
}
if !found {
mn.AddTagBytes(srcTag.Key, srcTag.Value)
bb.B = append(bb.B[:0], prefix...)
bb.B = append(bb.B, tagName...)
mn.SetTagBytes(bb.B, srcTag.Value)
}
bbPool.Put(bb)
}
var bbPool bytesutil.ByteBufferPool
// SetTagBytes sets tag with the given key to the given value.
func (mn *MetricName) SetTagBytes(key, value []byte) {
for i := range mn.Tags {
t := &mn.Tags[i]
if string(t.Key) == string(key) {
t.Value = append(t.Value[:0], value...)
return
}
}
mn.AddTagBytes(key, value)
}
func (mn *MetricName) setAllTags(prefix string, skipTags []string, src *MetricName) {
bb := bbPool.Get()
for _, tag := range src.Tags {
if containsString(skipTags, bytesutil.ToUnsafeString(tag.Key)) {
continue
}
bb.B = append(bb.B[:0], prefix...)
bb.B = append(bb.B, tag.Key...)
mn.SetTagBytes(bb.B, tag.Value)
}
bbPool.Put(bb)
}
func containsString(a []string, s string) bool {
for _, x := range a {
if x == s {
return true
}
}
return false
}
func hasTag(tags []string, key []byte) bool {

View file

@ -1343,12 +1343,3 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
t.Fatalf("cannot remove %q: %s", path, err)
}
}
func containsString(a []string, s string) bool {
for i := range a {
if a[i] == s {
return true
}
}
return false
}

View file

@ -311,7 +311,7 @@ func (p *parser) parseWithArgExpr() (*withArgExpr, error) {
}
if p.lex.Token == "(" {
// Parse func args.
args, err := p.parseIdentList()
args, err := p.parseIdentList(false)
if err != nil {
return nil, fmt.Errorf(`withArgExpr: cannot parse args for %q: %s`, wa.Name, err)
}
@ -365,16 +365,26 @@ func (p *parser) parseExpr() (Expr, error) {
}
}
if isBinaryOpGroupModifier(p.lex.Token) {
if err := p.parseModifierExpr(&be.GroupModifier); err != nil {
if err := p.parseModifierExpr(&be.GroupModifier, false); err != nil {
return nil, err
}
if isBinaryOpJoinModifier(p.lex.Token) {
if isBinaryOpLogicalSet(be.Op) {
return nil, fmt.Errorf(`modifier %q cannot be applied to %q`, p.lex.Token, be.Op)
}
if err := p.parseModifierExpr(&be.JoinModifier); err != nil {
if err := p.parseModifierExpr(&be.JoinModifier, true); err != nil {
return nil, err
}
if strings.ToLower(p.lex.Token) == "prefix" {
if err := p.lex.Next(); err != nil {
return nil, fmt.Errorf("cannot read prefix for %s: %w", be.JoinModifier.AppendString(nil), err)
}
se, err := p.parseStringExpr()
if err != nil {
return nil, fmt.Errorf("cannot parse prefix for %s: %w", be.JoinModifier.AppendString(nil), err)
}
be.JoinModifierPrefix = se
}
}
}
e2, err := p.parseSingleExpr()
@ -608,7 +618,7 @@ funcPrefixLabel:
if !isAggrFuncModifier(p.lex.Token) {
return nil, fmt.Errorf(`AggrFuncExpr: unexpected token %q; want aggregate func modifier`, p.lex.Token)
}
if err := p.parseModifierExpr(&ae.Modifier); err != nil {
if err := p.parseModifierExpr(&ae.Modifier, false); err != nil {
return nil, err
}
}
@ -623,7 +633,7 @@ funcArgsLabel:
// Verify whether func suffix exists.
if ae.Modifier.Op == "" && isAggrFuncModifier(p.lex.Token) {
if err := p.parseModifierExpr(&ae.Modifier); err != nil {
if err := p.parseModifierExpr(&ae.Modifier, false); err != nil {
return nil, err
}
}
@ -665,6 +675,18 @@ func expandWithExpr(was []*withArgExpr, e Expr) (Expr, error) {
if err != nil {
return nil, err
}
var joinModifierPrefix *StringExpr
if t.JoinModifierPrefix != nil {
jmp, err := expandWithExpr(was, t.JoinModifierPrefix)
if err != nil {
return nil, err
}
se, ok := jmp.(*StringExpr)
if !ok {
return nil, fmt.Errorf("unexpected prefix for %s; want quoted string; got %s", t.JoinModifier.AppendString(nil), jmp.AppendString(nil))
}
joinModifierPrefix = se
}
if t.Op == "+" {
lse, lok := left.(*StringExpr)
rse, rok := right.(*StringExpr)
@ -680,6 +702,7 @@ func expandWithExpr(was []*withArgExpr, e Expr) (Expr, error) {
be.Right = right
be.GroupModifier.Args = groupModifierArgs
be.JoinModifier.Args = joinModifierArgs
be.JoinModifierPrefix = joinModifierPrefix
pe := parensExpr{&be}
return &pe, nil
case *FuncExpr:
@ -1084,7 +1107,7 @@ func isKeepMetricNames(token string) bool {
return token == "keep_metric_names"
}
func (p *parser) parseModifierExpr(me *ModifierExpr) error {
func (p *parser) parseModifierExpr(me *ModifierExpr, allowStar bool) error {
if !isIdentPrefix(p.lex.Token) {
return fmt.Errorf(`ModifierExpr: unexpected token %q; want "ident"`, p.lex.Token)
}
@ -1098,25 +1121,40 @@ func (p *parser) parseModifierExpr(me *ModifierExpr) error {
// join modifier may miss ident list.
return nil
}
args, err := p.parseIdentList()
args, err := p.parseIdentList(allowStar)
if err != nil {
return err
return fmt.Errorf("ModifierExpr: %w", err)
}
me.Args = args
return nil
}
func (p *parser) parseIdentList() ([]string, error) {
func (p *parser) parseIdentList(allowStar bool) ([]string, error) {
if p.lex.Token != "(" {
return nil, fmt.Errorf(`identList: unexpected token %q; want "("`, p.lex.Token)
}
var idents []string
for {
if err := p.lex.Next(); err != nil {
return nil, err
}
if allowStar && p.lex.Token == "*" {
if err := p.lex.Next(); err != nil {
return nil, err
}
if p.lex.Token != ")" {
return nil, fmt.Errorf(`identList: unexpected token %q after "*"; want ")"`, p.lex.Token)
}
if err := p.lex.Next(); err != nil {
return nil, err
}
return []string{"*"}, nil
}
var idents []string
for {
if p.lex.Token == ")" {
goto closeParensLabel
if err := p.lex.Next(); err != nil {
return nil, err
}
return idents, nil
}
if !isIdentPrefix(p.lex.Token) {
return nil, fmt.Errorf(`identList: unexpected token %q; want "ident"`, p.lex.Token)
@ -1127,19 +1165,15 @@ func (p *parser) parseIdentList() ([]string, error) {
}
switch p.lex.Token {
case ",":
continue
if err := p.lex.Next(); err != nil {
return nil, err
}
case ")":
goto closeParensLabel
continue
default:
return nil, fmt.Errorf(`identList: unexpected token %q; want ",", ")"`, p.lex.Token)
}
}
closeParensLabel:
if err := p.lex.Next(); err != nil {
return nil, err
}
return idents, nil
}
func (p *parser) parseArgListExpr() ([]Expr, error) {
@ -1628,6 +1662,11 @@ type BinaryOpExpr struct {
// JoinModifier contains modifier such as "group_left" or "group_right".
JoinModifier ModifierExpr
// JoinModifierPrefix is an optional prefix to add to labels specified inside group_left() or group_right() lists.
//
// The syntax is `group_left(foo,bar) prefix "abc"`
JoinModifierPrefix *StringExpr
// If KeepMetricNames is set to true, then the operation should keep metric names.
KeepMetricNames bool
@ -1668,6 +1707,10 @@ func (be *BinaryOpExpr) appendStringNoKeepMetricNames(dst []byte) []byte {
if be.JoinModifier.Op != "" {
dst = append(dst, ' ')
dst = be.JoinModifier.AppendString(dst)
if prefix := be.JoinModifierPrefix; prefix != nil {
dst = append(dst, " prefix "...)
dst = prefix.AppendString(dst)
}
}
dst = append(dst, ' ')
if be.needRightParens() {
@ -1739,7 +1782,11 @@ func (me *ModifierExpr) AppendString(dst []byte) []byte {
dst = append(dst, me.Op...)
dst = append(dst, '(')
for i, arg := range me.Args {
dst = appendEscapedIdent(dst, arg)
if arg == "*" {
dst = append(dst, '*')
} else {
dst = appendEscapedIdent(dst, arg)
}
if i+1 < len(me.Args) {
dst = append(dst, ',')
}

2
vendor/modules.txt vendored
View file

@ -99,7 +99,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.24.0
## explicit; go 1.20
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.58.2
# github.com/VictoriaMetrics/metricsql v0.59.0
## explicit; go 1.13
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop