mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vmselect/promql: add prometheus_buckets
function for converting the upcoming histogram buckets from github.com/VictoriaMetrics/metrics
to Prometheus-compatible buckets
This commit is contained in:
parent
17d08c1fe0
commit
5f6f03c692
2 changed files with 199 additions and 0 deletions
|
@ -2417,6 +2417,143 @@ func TestExecSuccess(t *testing.T) {
|
||||||
resultExpected := []netstorage.Result{}
|
resultExpected := []netstorage.Result{}
|
||||||
f(q, resultExpected)
|
f(q, resultExpected)
|
||||||
})
|
})
|
||||||
|
t.Run(`prometheus_buckets(missing-vmrange)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(prometheus_buckets((
|
||||||
|
alias(label_set(90, "foo", "bar", "le", "0"), "xxx"),
|
||||||
|
alias(label_set(time()/20, "foo", "bar", "le", "0.2"), "xxx"),
|
||||||
|
alias(label_set(time()/100, "foo", "bar", "vmrange", "foobar"), "xxx"),
|
||||||
|
alias(label_set(time()/100, "foo", "bar", "vmrange", "30...foobar"), "xxx"),
|
||||||
|
alias(label_set(time()/100, "foo", "bar", "vmrange", "30...40"), "xxx"),
|
||||||
|
alias(label_set(time()/40, "foo", "bar", "vmrange", "900...1000", "le", "2343"), "yyy"),
|
||||||
|
alias(label_set(time()/10, "foo", "bar", "vmrange", "1000...Inf", "le", "2343"), "yyy"),
|
||||||
|
)))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{10, 12, 14, 16, 18, 20},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.MetricGroup = []byte("xxx")
|
||||||
|
r1.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("le"),
|
||||||
|
Value: []byte("40"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r2 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{25, 30, 35, 40, 45, 50},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r2.MetricName.MetricGroup = []byte("yyy")
|
||||||
|
r2.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("le"),
|
||||||
|
Value: []byte("1000"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r3 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{125, 150, 175, 200, 225, 250},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r3.MetricName.MetricGroup = []byte("yyy")
|
||||||
|
r3.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("le"),
|
||||||
|
Value: []byte("Inf"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
resultExpected := []netstorage.Result{r1, r2, r3}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
|
t.Run(`prometheus_buckets(valid)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(prometheus_buckets((
|
||||||
|
alias(label_set(90, "foo", "bar", "vmrange", "0...0"), "xxx"),
|
||||||
|
alias(label_set(time()/20, "foo", "bar", "vmrange", "0.1...0.2"), "xxx"),
|
||||||
|
alias(label_set(time()/100, "foo", "bar", "vmrange", "30...40"), "xxx"),
|
||||||
|
alias(label_set(time()/10, "foo", "bar", "vmrange", "1000...Inf"), "xxx"),
|
||||||
|
)))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{90, 90, 90, 90, 90, 90},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.MetricGroup = []byte("xxx")
|
||||||
|
r1.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("le"),
|
||||||
|
Value: []byte("0"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r2 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{140, 150, 160, 170, 180, 190},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r2.MetricName.MetricGroup = []byte("xxx")
|
||||||
|
r2.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("le"),
|
||||||
|
Value: []byte("0.2"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r3 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{150, 162, 174, 186, 198, 210},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r3.MetricName.MetricGroup = []byte("xxx")
|
||||||
|
r3.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("le"),
|
||||||
|
Value: []byte("40"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r4 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{250, 282, 314, 346, 378, 410},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r4.MetricName.MetricGroup = []byte("xxx")
|
||||||
|
r4.MetricName.Tags = []storage.Tag{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("le"),
|
||||||
|
Value: []byte("Inf"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
resultExpected := []netstorage.Result{r1, r2, r3, r4}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
t.Run(`median_over_time()`, func(t *testing.T) {
|
t.Run(`median_over_time()`, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := `median_over_time({})`
|
q := `median_over_time({})`
|
||||||
|
|
|
@ -91,6 +91,7 @@ var transformFuncs = map[string]transformFunc{
|
||||||
"cos": newTransformFuncOneArg(transformCos),
|
"cos": newTransformFuncOneArg(transformCos),
|
||||||
"asin": newTransformFuncOneArg(transformAsin),
|
"asin": newTransformFuncOneArg(transformAsin),
|
||||||
"acos": newTransformFuncOneArg(transformAcos),
|
"acos": newTransformFuncOneArg(transformAcos),
|
||||||
|
"prometheus_buckets": transformPrometheusBuckets,
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTransformFunc(s string) transformFunc {
|
func getTransformFunc(s string) transformFunc {
|
||||||
|
@ -272,6 +273,67 @@ func transformFloor(v float64) float64 {
|
||||||
return math.Floor(v)
|
return math.Floor(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
|
args := tfa.args
|
||||||
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Group timeseries by MetricGroup+tags excluding `vmrange` tag.
|
||||||
|
type x struct {
|
||||||
|
leStr string
|
||||||
|
le float64
|
||||||
|
ts *timeseries
|
||||||
|
}
|
||||||
|
m := make(map[string][]x)
|
||||||
|
bb := bbPool.Get()
|
||||||
|
defer bbPool.Put(bb)
|
||||||
|
for _, ts := range args[0] {
|
||||||
|
vmrange := ts.MetricName.GetTagValue("vmrange")
|
||||||
|
if len(vmrange) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
n := strings.Index(bytesutil.ToUnsafeString(vmrange), "...")
|
||||||
|
if n < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
leStr := string(vmrange[n+len("..."):])
|
||||||
|
le, err := strconv.ParseFloat(leStr, 64)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ts.MetricName.RemoveTag("vmrange")
|
||||||
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
||||||
|
m[string(bb.B)] = append(m[string(bb.B)], x{
|
||||||
|
leStr: leStr,
|
||||||
|
le: le,
|
||||||
|
ts: ts,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
rvs := make([]*timeseries, 0, len(args[0]))
|
||||||
|
for _, xss := range m {
|
||||||
|
sort.Slice(xss, func(i, j int) bool { return xss[i].le < xss[j].le })
|
||||||
|
for i := range xss[0].ts.Values {
|
||||||
|
count := float64(0)
|
||||||
|
for _, xs := range xss {
|
||||||
|
ts := xs.ts
|
||||||
|
v := ts.Values[i]
|
||||||
|
if !math.IsNaN(v) {
|
||||||
|
count += v
|
||||||
|
}
|
||||||
|
ts.MetricName.RemoveTag("le")
|
||||||
|
ts.MetricName.AddTag("le", xs.leStr)
|
||||||
|
ts.Values[i] = count
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := range xss {
|
||||||
|
rvs = append(rvs, xss[i].ts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rvs, nil
|
||||||
|
}
|
||||||
|
|
||||||
func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||||
args := tfa.args
|
args := tfa.args
|
||||||
if err := expectTransformArgsNum(args, 2); err != nil {
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue