app/vmselect/promql: add histogram_share(le, buckets) function

This commit is contained in:
Aliaksandr Valialkin 2020-01-04 12:44:09 +02:00
parent b1ded7cf9a
commit f409f2d050
4 changed files with 401 additions and 54 deletions

View file

@ -2308,18 +2308,36 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`histogram_share(scalar)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(123, time())`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`histogram_quantile(single-value-no-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.6, label_set(100, "foo", "bar"))`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-no-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(123, label_set(100, "foo", "bar"))`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`histogram_quantile(single-value-invalid-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.6, label_set(100, "le", "foobar"))`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-invalid-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(50, label_set(100, "le", "foobar"))`
resultExpected := []netstorage.Result{}
f(q, resultExpected)
})
t.Run(`histogram_quantile(single-value-valid-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.6, label_set(100, "le", "200"))`
@ -2331,6 +2349,39 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-valid-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(80, label_set(100, "le", "200"))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.4, 0.4, 0.4, 0.4, 0.4, 0.4},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-valid-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(200, label_set(100, "le", "200"))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-valid-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(300, label_set(100, "le", "200"))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_quantile(single-value-valid-le, boundsLabel)`, func(t *testing.T) {
t.Parallel()
q := `sort(histogram_quantile(0.6, label_set(100, "le", "200"), "foobar"))`
@ -2349,7 +2400,6 @@ func TestExecSuccess(t *testing.T) {
Timestamps: timestampsExpected,
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{200, 200, 200, 200, 200, 200},
Timestamps: timestampsExpected,
}
@ -2360,6 +2410,34 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`histogram_quantile(single-value-valid-le, boundsLabel)`, func(t *testing.T) {
t.Parallel()
q := `sort(histogram_share(120, label_set(100, "le", "200"), "foobar"))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0, 0, 0, 0, 0, 0},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foobar"),
Value: []byte("lower"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.6, 0.6, 0.6, 0.6, 0.6, 0.6},
Timestamps: timestampsExpected,
}
r3 := netstorage.Result{
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{{
Key: []byte("foobar"),
Value: []byte("upper"),
}}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`histogram_quantile(single-value-valid-le-max-phi)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(1, (
@ -2374,6 +2452,20 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-valid-le-max-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(200, (
label_set(100, "le", "200"),
label_set(0, "le", "55"),
))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_quantile(single-value-valid-le-min-phi)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0, (
@ -2388,6 +2480,48 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-valid-le-min-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(0, (
label_set(100, "le", "200"),
label_set(0, "le", "55"),
))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0, 0, 0, 0, 0, 0},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-valid-le-low-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(55, (
label_set(100, "le", "200"),
label_set(0, "le", "55"),
))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0, 0, 0, 0, 0, 0},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(single-value-valid-le-mid-le)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(105, (
label_set(100, "le", "200"),
label_set(0, "le", "55"),
))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.3448275862068966, 0.3448275862068966, 0.3448275862068966, 0.3448275862068966, 0.3448275862068966, 0.3448275862068966},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_quantile(single-value-valid-le-min-phi-no-zero-bucket)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0, label_set(100, "le", "200"))`
@ -2410,6 +2544,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(scalar-phi)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(time() / 8, label_set(100, "le", "200"))`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.625, 0.75, 0.875, 1, 1, 1},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_quantile(valid)`, func(t *testing.T) {
t.Parallel()
q := `sort(histogram_quantile(0.6,
@ -2440,6 +2585,36 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`histogram_share(valid)`, func(t *testing.T) {
t.Parallel()
q := `sort(histogram_share(25,
label_set(90, "foo", "bar", "le", "10")
or label_set(100, "foo", "bar", "le", "30")
or label_set(300, "foo", "bar", "le", "+Inf")
or label_set(200, "tag", "xx", "le", "10")
or label_set(300, "tag", "xx", "le", "30")
))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.325, 0.325, 0.325, 0.325, 0.325, 0.325},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.9166666666666666, 0.9166666666666666, 0.9166666666666666, 0.9166666666666666, 0.9166666666666666, 0.9166666666666666},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("tag"),
Value: []byte("xx"),
}}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`histogram_quantile(negative-bucket-count)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.6,
@ -2468,7 +2643,7 @@ func TestExecSuccess(t *testing.T) {
)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10, 10, 10, 10, 10, 10},
Values: []float64{30, 30, 30, 30, 30, 30},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
@ -2497,6 +2672,25 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_share(normal-bucket-count)`, func(t *testing.T) {
t.Parallel()
q := `histogram_share(22,
label_set(0, "foo", "bar", "le", "10")
or label_set(100, "foo", "bar", "le", "30")
or label_set(300, "foo", "bar", "le", "+Inf")
)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.2, 0.2, 0.2, 0.2, 0.2, 0.2},
Timestamps: timestampsExpected,
}
r.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram_quantile(normal-bucket-count, boundsLabel)`, func(t *testing.T) {
t.Parallel()
q := `sort(histogram_quantile(0.2,
@ -2547,6 +2741,56 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`histogram_share(normal-bucket-count, boundsLabel)`, func(t *testing.T) {
t.Parallel()
q := `sort(histogram_share(22,
label_set(0, "foo", "bar", "le", "10")
or label_set(100, "foo", "bar", "le", "30")
or label_set(300, "foo", "bar", "le", "+Inf"),
"xxx"
))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0, 0, 0, 0, 0, 0},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xxx"),
Value: []byte("lower"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.2, 0.2, 0.2, 0.2, 0.2, 0.2},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{{
Key: []byte("foo"),
Value: []byte("bar"),
}}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333, 0.3333333333333333},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("xxx"),
Value: []byte("upper"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`histogram_quantile(zero-bucket-count)`, func(t *testing.T) {
t.Parallel()
q := `histogram_quantile(0.6,

View file

@ -93,6 +93,7 @@ var transformFuncs = map[string]transformFunc{
"asin": newTransformFuncOneArg(transformAsin),
"acos": newTransformFuncOneArg(transformAcos),
"prometheus_buckets": transformPrometheusBuckets,
"histogram_share": transformHistogramShare,
}
func getTransformFunc(s string) transformFunc {
@ -399,6 +400,104 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
return rvs
}
func transformHistogramShare(tfa *transformFuncArg) ([]*timeseries, error) {
args := tfa.args
if len(args) < 2 || len(args) > 3 {
return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args))
}
les, err := getScalar(args[0], 0)
if err != nil {
return nil, fmt.Errorf("cannot parse le: %s", err)
}
// Convert buckets with `vmrange` labels to buckets with `le` labels.
tss := vmrangeBucketsToLE(args[1])
// Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details.
var boundsLabel string
if len(args) > 2 {
s, err := getString(args[2], 2)
if err != nil {
return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %s", err)
}
boundsLabel = s
}
// Group metrics by all tags excluding "le"
m := groupLeTimeseries(tss)
// Calculate share for les
share := func(i int, les []float64, xss []leTimeseries) (q, lower, upper float64) {
leReq := les[i]
if math.IsNaN(leReq) || len(xss) == 0 {
return nan, nan, nan
}
fixBrokenBuckets(i, xss)
if leReq < 0 {
return 0, 0, 0
}
if math.IsInf(leReq, 1) {
return 1, 1, 1
}
for j, xs := range xss {
v := xs.ts.Values[i]
le := xs.le
if leReq < le {
continue
}
// precondition: leReq >= le
if j+1 >= len(xss) {
return 1, 1, 1
}
vNext := xss[j+1].ts.Values[i]
leNext := xss[j+1].le
if math.IsInf(leNext, 1) {
return v / vNext, v / vNext, 1
}
vLast := xss[len(xss)-1].ts.Values[i]
lower = v / vLast
upper = vNext / vLast
q = lower + (vNext-v)/vLast*(leReq-le)/(leNext-le)
return q, lower, upper
}
leLast := xss[len(xss)-1].le
return leReq / leLast, 0, 1
}
rvs := make([]*timeseries, 0, len(m))
for _, xss := range m {
sort.Slice(xss, func(i, j int) bool {
return xss[i].le < xss[j].le
})
dst := xss[0].ts
var tsLower, tsUpper *timeseries
if len(boundsLabel) > 0 {
tsLower = &timeseries{}
tsLower.CopyFromShallowTimestamps(dst)
tsLower.MetricName.RemoveTag(boundsLabel)
tsLower.MetricName.AddTag(boundsLabel, "lower")
tsUpper = &timeseries{}
tsUpper.CopyFromShallowTimestamps(dst)
tsUpper.MetricName.RemoveTag(boundsLabel)
tsUpper.MetricName.AddTag(boundsLabel, "upper")
}
for i := range dst.Values {
q, lower, upper := share(i, les, xss)
dst.Values[i] = q
if len(boundsLabel) > 0 {
tsLower.Values[i] = lower
tsUpper.Values[i] = upper
}
}
rvs = append(rvs, dst)
if len(boundsLabel) > 0 {
rvs = append(rvs, tsLower)
rvs = append(rvs, tsUpper)
}
}
return rvs, nil
}
func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
args := tfa.args
if len(args) < 2 || len(args) > 3 {
@ -423,73 +522,35 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
}
// Group metrics by all tags excluding "le"
type x struct {
le float64
ts *timeseries
}
m := make(map[string][]x)
bb := bbPool.Get()
for _, ts := range tss {
tagValue := ts.MetricName.GetTagValue("le")
if len(tagValue) == 0 {
continue
}
le, err := strconv.ParseFloat(bytesutil.ToUnsafeString(tagValue), 64)
if err != nil {
continue
}
ts.MetricName.ResetMetricGroup()
ts.MetricName.RemoveTag("le")
bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName)
m[string(bb.B)] = append(m[string(bb.B)], x{
le: le,
ts: ts,
})
}
bbPool.Put(bb)
m := groupLeTimeseries(tss)
// Calculate quantile for each group in m
lastNonInf := func(i int, xss []x) float64 {
lastNonInf := func(i int, xss []leTimeseries) float64 {
for len(xss) > 0 {
xsLast := xss[len(xss)-1]
v := xsLast.ts.Values[i]
if v == 0 {
return nan
}
if !math.IsNaN(v) && !math.IsInf(xsLast.le, 0) {
if !math.IsInf(xsLast.le, 0) {
return xsLast.le
}
xss = xss[:len(xss)-1]
}
return nan
}
quantile := func(i int, phis []float64, xss []x) (q, lower, upper float64) {
quantile := func(i int, phis []float64, xss []leTimeseries) (q, lower, upper float64) {
phi := phis[i]
if math.IsNaN(phi) {
return nan, nan, nan
}
// Fix broken buckets.
// They are already sorted by le, so their values must be in ascending order,
// since the next bucket value includes all the previous buckets.
vPrev := float64(0)
for _, xs := range xss {
v := xs.ts.Values[i]
if v < vPrev {
xs.ts.Values[i] = vPrev
} else if !math.IsNaN(v) {
vPrev = v
}
}
vLast := nan
for len(xss) > 0 {
fixBrokenBuckets(i, xss)
vLast := float64(0)
if len(xss) > 0 {
vLast = xss[len(xss)-1].ts.Values[i]
if !math.IsNaN(vLast) {
break
}
xss = xss[:len(xss)-1]
}
if vLast == 0 || math.IsNaN(vLast) {
if vLast == 0 {
return nan, nan, nan
}
if phi < 0 {
@ -499,15 +560,10 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
return inf, vLast, inf
}
vReq := vLast * phi
vPrev = 0
vPrev := float64(0)
lePrev := float64(0)
for _, xs := range xss {
v := xs.ts.Values[i]
if math.IsNaN(v) {
// Skip NaNs - they may appear if the selected time range
// contains multiple different bucket sets.
continue
}
le := xs.le
if v <= 0 {
// Skip zero buckets.
@ -566,6 +622,50 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
return rvs, nil
}
type leTimeseries struct {
le float64
ts *timeseries
}
func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries {
m := make(map[string][]leTimeseries)
bb := bbPool.Get()
for _, ts := range tss {
tagValue := ts.MetricName.GetTagValue("le")
if len(tagValue) == 0 {
continue
}
le, err := strconv.ParseFloat(bytesutil.ToUnsafeString(tagValue), 64)
if err != nil {
continue
}
ts.MetricName.ResetMetricGroup()
ts.MetricName.RemoveTag("le")
bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName)
m[string(bb.B)] = append(m[string(bb.B)], leTimeseries{
le: le,
ts: ts,
})
}
bbPool.Put(bb)
return m
}
func fixBrokenBuckets(i int, xss []leTimeseries) {
// Fix broken buckets.
// They are already sorted by le, so their values must be in ascending order,
// since the next bucket includes all the previous buckets.
vPrev := float64(0)
for _, xs := range xss {
v := xs.ts.Values[i]
if v < vPrev || math.IsNaN(v) {
xs.ts.Values[i] = vPrev
} else {
vPrev = v
}
}
}
func transformHour(t time.Time) int {
return t.Hour()
}

View file

@ -82,6 +82,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`.
For example, the following query calculates median temperature by country over the last 24 hours:
`histogram_quantile(0.5, sum(histogram_over_time(temperature[24h])) by (vmbucket, country))`.
- `histogram_share(le, buckets)` - returns share (in the range 0..1) for `buckets`. Useful for calculating SLI and SLO.
For instance, the following query returns the share of requests which are performed under 1.5 seconds: `histogram_share(1.5, sum(request_duration_seconds_bucket) by (le))`.
- `topk_*` and `bottomk_*` aggregate functions, which return up to K time series. Note that the standard `topk` function may return more than K time series -
see [this article](https://www.robustperception.io/graph-top-n-time-series-in-grafana) for details.
- `topk_min(k, q)` - returns top K time series with the max minimums on the given time range

View file

@ -72,6 +72,7 @@ var transformFuncs = map[string]bool{
"asin": true,
"acos": true,
"prometheus_buckets": true,
"histogram_share": true,
}
func isTransformFunc(s string) bool {