mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
app/vmselect/promql: add label_transform(q, label, regexp, replacement)
function for replacing all the occurences of regexp with replacement in the given label for q
This commit is contained in:
parent
398ec4383e
commit
fb140eda33
3 changed files with 71 additions and 7 deletions
|
@ -1266,6 +1266,34 @@ func TestExecSuccess(t *testing.T) {
|
||||||
resultExpected := []netstorage.Result{r}
|
resultExpected := []netstorage.Result{r}
|
||||||
f(q, resultExpected)
|
f(q, resultExpected)
|
||||||
})
|
})
|
||||||
|
t.Run(`label_transform(mismatch)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `label_transform(time(), "__name__", "foobar", "xx")`
|
||||||
|
r := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
resultExpected := []netstorage.Result{r}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`label_transform(match)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `label_transform(
|
||||||
|
label_set(time(), "foo", "a.bar.baz"),
|
||||||
|
"foo", "\\.", "-")`
|
||||||
|
r := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("a-bar-baz"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
t.Run(`label_replace(mismatch)`, func(t *testing.T) {
|
t.Run(`label_replace(mismatch)`, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := `label_replace(time(), "__name__", "x${1}y", "foo", ".+")`
|
q := `label_replace(time(), "__name__", "x${1}y", "foo", ".+")`
|
||||||
|
@ -3461,6 +3489,7 @@ func TestExecError(t *testing.T) {
|
||||||
f(`hour(1,2)`)
|
f(`hour(1,2)`)
|
||||||
f(`label_join()`)
|
f(`label_join()`)
|
||||||
f(`label_replace(1)`)
|
f(`label_replace(1)`)
|
||||||
|
f(`label_transform(1)`)
|
||||||
f(`label_set()`)
|
f(`label_set()`)
|
||||||
f(`label_set(1, "foo")`)
|
f(`label_set(1, "foo")`)
|
||||||
f(`label_del()`)
|
f(`label_del()`)
|
||||||
|
@ -3535,6 +3564,10 @@ func TestExecError(t *testing.T) {
|
||||||
f(`label_replace(1, "foo", "bar", 4, 5)`)
|
f(`label_replace(1, "foo", "bar", 4, 5)`)
|
||||||
f(`label_replace(1, "foo", "bar", "baz", 5)`)
|
f(`label_replace(1, "foo", "bar", "baz", 5)`)
|
||||||
f(`label_replace(1, "foo", "bar", "baz", "invalid(regexp")`)
|
f(`label_replace(1, "foo", "bar", "baz", "invalid(regexp")`)
|
||||||
|
f(`label_transform(1, 2, 3, 4)`)
|
||||||
|
f(`label_transform(1, "foo", 3, 4)`)
|
||||||
|
f(`label_transform(1, "foo", "bar", 4)`)
|
||||||
|
f(`label_transform(1, "foo", "invalid(regexp", "baz`)
|
||||||
|
|
||||||
// Duplicate timeseries
|
// Duplicate timeseries
|
||||||
f(`(label_set(1, "foo", "bar") or label_set(2, "foo", "baz"))
|
f(`(label_set(1, "foo", "bar") or label_set(2, "foo", "baz"))
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package promql
|
package promql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -10,12 +9,16 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func compileRegexpAnchored(re string) (*regexp.Regexp, error) {
|
func compileRegexpAnchored(re string) (*regexp.Regexp, error) {
|
||||||
|
reAnchored := "^(?:" + re + ")$"
|
||||||
|
return compileRegexp(reAnchored)
|
||||||
|
}
|
||||||
|
|
||||||
|
func compileRegexp(re string) (*regexp.Regexp, error) {
|
||||||
rcv := regexpCacheV.Get(re)
|
rcv := regexpCacheV.Get(re)
|
||||||
if rcv != nil {
|
if rcv != nil {
|
||||||
return rcv.r, rcv.err
|
return rcv.r, rcv.err
|
||||||
}
|
}
|
||||||
regexAnchored := fmt.Sprintf("^(?:%s)$", re)
|
r, err := regexp.Compile(re)
|
||||||
r, err := regexp.Compile(regexAnchored)
|
|
||||||
rcv = ®expCacheValue{
|
rcv = ®expCacheValue{
|
||||||
r: r,
|
r: r,
|
||||||
err: err,
|
err: err,
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -61,6 +62,7 @@ var transformFuncs = map[string]transformFunc{
|
||||||
"label_keep": transformLabelKeep,
|
"label_keep": transformLabelKeep,
|
||||||
"label_copy": transformLabelCopy,
|
"label_copy": transformLabelCopy,
|
||||||
"label_move": transformLabelMove,
|
"label_move": transformLabelMove,
|
||||||
|
"label_transform": transformLabelTransform,
|
||||||
"union": transformUnion,
|
"union": transformUnion,
|
||||||
"": transformUnion, // empty func is a synonim to union
|
"": transformUnion, // empty func is a synonim to union
|
||||||
"keep_last_value": transformKeepLastValue,
|
"keep_last_value": transformKeepLastValue,
|
||||||
|
@ -816,6 +818,31 @@ func transformLabelJoin(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
return rvs, nil
|
return rvs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func transformLabelTransform(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
|
args := tfa.args
|
||||||
|
if err := expectTransformArgsNum(args, 4); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
label, err := getString(args[1], 1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
regex, err := getString(args[2], 2)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
replacement, err := getString(args[3], 3)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
r, err := compileRegexp(regex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf(`cannot compile regex %q: %s`, regex, err)
|
||||||
|
}
|
||||||
|
return labelReplace(args[0], label, r, label, replacement)
|
||||||
|
}
|
||||||
|
|
||||||
func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
|
func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
args := tfa.args
|
args := tfa.args
|
||||||
if err := expectTransformArgsNum(args, 5); err != nil {
|
if err := expectTransformArgsNum(args, 5); err != nil {
|
||||||
|
@ -842,11 +869,12 @@ func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf(`cannot compile regex %q: %s`, regex, err)
|
return nil, fmt.Errorf(`cannot compile regex %q: %s`, regex, err)
|
||||||
}
|
}
|
||||||
|
return labelReplace(args[0], srcLabel, r, dstLabel, replacement)
|
||||||
|
}
|
||||||
|
|
||||||
|
func labelReplace(tss []*timeseries, srcLabel string, r *regexp.Regexp, dstLabel, replacement string) ([]*timeseries, error) {
|
||||||
replacementBytes := []byte(replacement)
|
replacementBytes := []byte(replacement)
|
||||||
|
for _, ts := range tss {
|
||||||
rvs := args[0]
|
|
||||||
for _, ts := range rvs {
|
|
||||||
mn := &ts.MetricName
|
mn := &ts.MetricName
|
||||||
dstValue := getDstValue(mn, dstLabel)
|
dstValue := getDstValue(mn, dstLabel)
|
||||||
srcValue := mn.GetTagValue(srcLabel)
|
srcValue := mn.GetTagValue(srcLabel)
|
||||||
|
@ -856,7 +884,7 @@ func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
mn.RemoveTag(dstLabel)
|
mn.RemoveTag(dstLabel)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return rvs, nil
|
return tss, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func transformLn(v float64) float64 {
|
func transformLn(v float64) float64 {
|
||||||
|
|
Loading…
Reference in a new issue