Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
hagen1778 2024-06-14 13:20:44 +02:00
parent 51d19485bb
commit 1181763641
No known key found for this signature in database
GPG key ID: 3BF75F3741CA9640
3 changed files with 363 additions and 1 deletions

View file

@ -52,6 +52,7 @@ var transformFuncs = map[string]transformFunc{
"floor": newTransformFuncOneArg(transformFloor),
"histogram_avg": transformHistogramAvg,
"histogram_quantile": transformHistogramQuantile,
"histogram_quantile2": transformHistogramQuantile2,
"histogram_quantiles": transformHistogramQuantiles,
"histogram_share": transformHistogramShare,
"histogram_stddev": transformHistogramStddev,
@ -890,7 +891,6 @@ func transformHistogramQuantiles(tfa *transformFuncArg) ([]*timeseries, error) {
}
return rvs, nil
}
func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
args := tfa.args
if len(args) < 2 || len(args) > 3 {
@ -980,6 +980,11 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
sort.Slice(xss, func(i, j int) bool {
return xss[i].le < xss[j].le
})
//fmt.Println(">>")
//for i := range xss {
// xs := xss[i]
// fmt.Println(xs.le, xs.ts)
//}
xss = mergeSameLE(xss)
dst := xss[0].ts
var tsLower, tsUpper *timeseries
@ -995,6 +1000,136 @@ func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
}
for i := range dst.Values {
v, lower, upper := quantile(i, phis, xss)
//if v > 8 {
// fmt.Println(">> HERE!")
//}
dst.Values[i] = v
if len(boundsLabel) > 0 {
tsLower.Values[i] = lower
tsUpper.Values[i] = upper
}
}
rvs = append(rvs, dst)
if len(boundsLabel) > 0 {
rvs = append(rvs, tsLower)
rvs = append(rvs, tsUpper)
}
}
return rvs, nil
}
func transformHistogramQuantile2(tfa *transformFuncArg) ([]*timeseries, error) {
args := tfa.args
if len(args) < 2 || len(args) > 3 {
return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args))
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, fmt.Errorf("cannot parse phi: %w", err)
}
// Convert buckets with `vmrange` labels to buckets with `le` labels.
tss := vmrangeBucketsToLE(args[1])
// Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details.
var boundsLabel string
if len(args) > 2 {
s, err := getString(args[2], 2)
if err != nil {
return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %w", err)
}
boundsLabel = s
}
// Group metrics by all tags excluding "le"
m := groupLeTimeseries(tss)
// Calculate quantile for each group in m
lastNonInf := func(_ int, xss []leTimeseries) float64 {
for len(xss) > 0 {
xsLast := xss[len(xss)-1]
if !math.IsInf(xsLast.le, 0) {
return xsLast.le
}
xss = xss[:len(xss)-1]
}
return nan
}
quantile := func(i int, phis []float64, xss []leTimeseries) (q, lower, upper float64) {
phi := phis[i]
if math.IsNaN(phi) {
return nan, nan, nan
}
fixBrokenBuckets2(i, xss)
vLast := float64(0)
if len(xss) > 0 {
vLast = xss[len(xss)-1].ts.Values[i]
}
if vLast == 0 {
return nan, nan, nan
}
if phi < 0 {
return -inf, -inf, xss[0].ts.Values[i]
}
if phi > 1 {
return inf, vLast, inf
}
vReq := vLast * phi
vPrev := float64(0)
lePrev := float64(0)
for _, xs := range xss {
v := xs.ts.Values[i]
le := xs.le
if v <= 0 {
// Skip zero buckets.
lePrev = le
continue
}
if v < vReq {
vPrev = v
lePrev = le
continue
}
if math.IsInf(le, 0) {
break
}
if v == vPrev {
return lePrev, lePrev, v
}
vv := lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev)
return vv, lePrev, le
}
vv := lastNonInf(i, xss)
return vv, vv, inf
}
rvs := make([]*timeseries, 0, len(m))
for _, xss := range m {
sort.Slice(xss, func(i, j int) bool {
return xss[i].le < xss[j].le
})
//fmt.Println(">>")
//for i := range xss {
// xs := xss[i]
// fmt.Println(xs.le, xs.ts)
//}
xss = mergeSameLE(xss)
dst := xss[0].ts
var tsLower, tsUpper *timeseries
if len(boundsLabel) > 0 {
tsLower = &timeseries{}
tsLower.CopyFromShallowTimestamps(dst)
tsLower.MetricName.RemoveTag(boundsLabel)
tsLower.MetricName.AddTag(boundsLabel, "lower")
tsUpper = &timeseries{}
tsUpper.CopyFromShallowTimestamps(dst)
tsUpper.MetricName.RemoveTag(boundsLabel)
tsUpper.MetricName.AddTag(boundsLabel, "upper")
}
for i := range dst.Values {
v, lower, upper := quantile(i, phis, xss)
//if v > 8 {
// fmt.Println(">> HERE!")
//}
dst.Values[i] = v
if len(boundsLabel) > 0 {
tsLower.Values[i] = lower
@ -1074,6 +1209,53 @@ func fixBrokenBuckets(i int, xss []leTimeseries) {
}
}
func fixBrokenBuckets2(i int, xss []leTimeseries) {
// Buckets are already sorted by le, so their values must be in ascending order,
// since the next bucket includes all the previous buckets.
// If the next bucket has lower value than the current bucket,
// then the current bucket must be substituted with the next bucket value.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2819
if len(xss) < 2 {
return
}
// Fill NaN in upper buckets with the first non-NaN value found in lower buckets.
for j := len(xss) - 1; j >= 0; j-- {
v := xss[j].ts.Values[i]
if !math.IsNaN(v) {
j++
for j < len(xss) {
xss[j].ts.Values[i] = v
j++
}
break
}
}
v := xss[0].ts.Values[0]
for j := 1; j < len(xss); j++ {
vNext := xss[j].ts.Values[i]
if math.IsNaN(vNext) || v > vNext {
xss[j].ts.Values[i] = v
} else {
v = vNext
}
}
return
// Substitute lower bucket values with upper values if the lower values are NaN
// or are bigger than the upper bucket values.
vNext := xss[len(xss)-1].ts.Values[i]
for j := len(xss) - 2; j >= 0; j-- {
v := xss[j].ts.Values[i]
if math.IsNaN(v) || v > vNext {
xss[j].ts.Values[i] = vNext
} else {
vNext = v
}
}
}
func mergeSameLE(xss []leTimeseries) []leTimeseries {
// Merge buckets with identical le values.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225

View file

@ -2,6 +2,7 @@ package promql
import (
"fmt"
"math"
"reflect"
"strconv"
"strings"
@ -11,6 +12,169 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
func TestQunatile(t *testing.T) {
// Calculate quantile for each group in m
lastNonInf := func(_ int, xss []leTimeseries) float64 {
for len(xss) > 0 {
xsLast := xss[len(xss)-1]
if !math.IsInf(xsLast.le, 0) {
return xsLast.le
}
xss = xss[:len(xss)-1]
}
return nan
}
quantile := func(i int, phis []float64, xss []leTimeseries) (q, lower, upper float64) {
phi := phis[i]
if math.IsNaN(phi) {
return nan, nan, nan
}
fixBrokenBuckets(i, xss)
vLast := float64(0)
if len(xss) > 0 {
vLast = xss[len(xss)-1].ts.Values[i]
}
if vLast == 0 {
return nan, nan, nan
}
if phi < 0 {
return -inf, -inf, xss[0].ts.Values[i]
}
if phi > 1 {
return inf, vLast, inf
}
vReq := vLast * phi
vPrev := float64(0)
lePrev := float64(0)
for _, xs := range xss {
v := xs.ts.Values[i]
le := xs.le
if v <= 0 {
// Skip zero buckets.
lePrev = le
continue
}
if v < vReq {
vPrev = v
lePrev = le
continue
}
if math.IsInf(le, 0) {
break
}
if v == vPrev {
return lePrev, lePrev, v
}
vv := lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev)
return vv, lePrev, le
}
vv := lastNonInf(i, xss)
return vv, vv, inf
}
f := func(les, values []float64) {
xss := make([]leTimeseries, len(values))
for i, v := range values {
xss[i].ts = &timeseries{
Values: []float64{v},
}
xss[i].le = les[i]
}
fixBrokenBuckets(0, xss)
res, _, _ := quantile(0, []float64{0.99}, xss)
fmt.Println("<<", res)
}
f([]float64{
0.005,
0.01,
0.025,
0.05,
0.075,
0.1,
0.25,
0.5,
0.75,
1,
2.5,
5,
7.5,
10,
math.Inf(0),
},
[]float64{2,
2,
3.45484949832,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.41806020066,
3.47157190635,
3.47157190635,
3.41806020066,
3.41806020066,
3.41806020066,
3.47157190635})
}
func TestFixBrokenBuckets2(t *testing.T) {
f := func(values, expectedResult []float64) {
t.Helper()
xss := make([]leTimeseries, len(values))
for i, v := range values {
xss[i].ts = &timeseries{
Values: []float64{v},
}
}
fixBrokenBuckets2(0, xss)
result := make([]float64, len(values))
for i, xs := range xss {
result[i] = xs.ts.Values[0]
}
if !reflect.DeepEqual(result, expectedResult) {
t.Fatalf("unexpected result for values=%v\ngot\n%v\nwant\n%v", values, result, expectedResult)
}
}
f(nil, []float64{})
f([]float64{1}, []float64{1})
f([]float64{1, 2}, []float64{1, 2})
f([]float64{2, 1}, []float64{2, 2})
f([]float64{1, 2, 3, nan, nan}, []float64{1, 2, 3, 3, 3})
f([]float64{5, 1, 2, 3, nan}, []float64{5, 5, 5, 5, 5})
f([]float64{1, 5, 2, nan, 6, 3}, []float64{1, 5, 5, 5, 6, 6})
f([]float64{5, 10, 4, 3}, []float64{5, 10, 10, 10})
f([]float64{2,
2,
3.45484949832,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.41806020066,
3.47157190635,
3.47157190635,
3.41806020066,
3.41806020066,
3.41806020066,
3.47157190635}, []float64{2,
2,
3.45484949832,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635})
}
func TestFixBrokenBuckets(t *testing.T) {
f := func(values, expectedResult []float64) {
t.Helper()
@ -37,6 +201,21 @@ func TestFixBrokenBuckets(t *testing.T) {
f([]float64{5, 1, 2, 3, nan}, []float64{1, 1, 2, 3, 3})
f([]float64{1, 5, 2, nan, 6, 3}, []float64{1, 2, 2, 3, 3, 3})
f([]float64{5, 10, 4, 3}, []float64{3, 3, 3, 3})
f([]float64{2,
2,
3.45484949832,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.47157190635,
3.41806020066,
3.47157190635,
3.47157190635,
3.41806020066,
3.41806020066,
3.41806020066,
3.47157190635}, []float64{1, 2, 3, 3, 3})
}
func TestVmrangeBucketsToLE(t *testing.T) {

View file

@ -36,6 +36,7 @@ var transformFuncs = map[string]bool{
"floor": true,
"histogram_avg": true,
"histogram_quantile": true,
"histogram_quantile2": true,
"histogram_quantiles": true,
"histogram_share": true,
"histogram_stddev": true,