Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-21 14:44:00 +00:00)
app/vmselect/promql: add buckets_limit(k, buckets) function, which limits the number of buckets per time series to k

This function works with both Prometheus-style and VictoriaMetrics-style buckets. It removes the buckets with the lowest values in order to preserve the highest precision, and is useful for building heatmaps in Grafana when the number of source buckets is too big.
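For example, a Grafana heatmap panel could cap each histogram at 10 buckets with a query along these lines (the metric name is illustrative and not part of this commit):

    buckets_limit(10, sum(rate(http_request_duration_seconds_bucket[5m])) by (le))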
parent 45334f61de
commit cf69b1ea6f
7 changed files with 186 additions and 4 deletions
@@ -3012,6 +3012,101 @@ func TestExecSuccess(t *testing.T) {
        resultExpected := []netstorage.Result{}
        f(q, resultExpected)
    })
    t.Run(`buckets_limit(zero)`, func(t *testing.T) {
        t.Parallel()
        q := `buckets_limit(0, (
            alias(label_set(100, "le", "inf", "x", "y"), "metric"),
            alias(label_set(50, "le", "120", "x", "y"), "metric"),
        ))`
        resultExpected := []netstorage.Result{}
        f(q, resultExpected)
    })
    t.Run(`buckets_limit(unused)`, func(t *testing.T) {
        t.Parallel()
        q := `sort(buckets_limit(5, (
            alias(label_set(100, "le", "inf", "x", "y"), "metric"),
            alias(label_set(50, "le", "120", "x", "y"), "metric"),
        )))`
        r1 := netstorage.Result{
            MetricName: metricNameExpected,
            Values:     []float64{50, 50, 50, 50, 50, 50},
            Timestamps: timestampsExpected,
        }
        r1.MetricName.MetricGroup = []byte("metric")
        r1.MetricName.Tags = []storage.Tag{
            {
                Key:   []byte("le"),
                Value: []byte("120"),
            },
            {
                Key:   []byte("x"),
                Value: []byte("y"),
            },
        }
        r2 := netstorage.Result{
            MetricName: metricNameExpected,
            Values:     []float64{100, 100, 100, 100, 100, 100},
            Timestamps: timestampsExpected,
        }
        r2.MetricName.MetricGroup = []byte("metric")
        r2.MetricName.Tags = []storage.Tag{
            {
                Key:   []byte("le"),
                Value: []byte("inf"),
            },
            {
                Key:   []byte("x"),
                Value: []byte("y"),
            },
        }
        resultExpected := []netstorage.Result{r1, r2}
        f(q, resultExpected)
    })
    t.Run(`buckets_limit(used)`, func(t *testing.T) {
        t.Parallel()
        q := `sort(buckets_limit(2, (
            alias(label_set(100, "le", "inf", "x", "y"), "metric"),
            alias(label_set(52, "le", "200", "x", "y"), "metric"),
            alias(label_set(50, "le", "120", "x", "y"), "metric"),
            alias(label_set(20, "le", "70", "x", "y"), "metric"),
            alias(label_set(10, "le", "30", "x", "y"), "metric"),
            alias(label_set(9, "le", "10", "x", "y"), "metric"),
        )))`
        r1 := netstorage.Result{
            MetricName: metricNameExpected,
            Values:     []float64{50, 50, 50, 50, 50, 50},
            Timestamps: timestampsExpected,
        }
        r1.MetricName.MetricGroup = []byte("metric")
        r1.MetricName.Tags = []storage.Tag{
            {
                Key:   []byte("le"),
                Value: []byte("120"),
            },
            {
                Key:   []byte("x"),
                Value: []byte("y"),
            },
        }
        r2 := netstorage.Result{
            MetricName: metricNameExpected,
            Values:     []float64{100, 100, 100, 100, 100, 100},
            Timestamps: timestampsExpected,
        }
        r2.MetricName.MetricGroup = []byte("metric")
        r2.MetricName.Tags = []storage.Tag{
            {
                Key:   []byte("le"),
                Value: []byte("inf"),
            },
            {
                Key:   []byte("x"),
                Value: []byte("y"),
            },
        }
        resultExpected := []netstorage.Result{r1, r2}
        f(q, resultExpected)
    })
    t.Run(`prometheus_buckets(missing-vmrange)`, func(t *testing.T) {
        t.Parallel()
        q := `sort(prometheus_buckets((
@@ -5776,6 +5871,9 @@ func TestExecError(t *testing.T) {
    f(`mode_over_time()`)
    f(`rate_over_sum()`)
    f(`mode()`)
    f(`prometheus_buckets()`)
    f(`buckets_limit()`)
    f(`buckets_limit(1)`)

    // Invalid argument type
    f(`median_over_time({}, 2)`)
@@ -98,6 +98,7 @@ var transformFuncs = map[string]transformFunc{
    "asin":               newTransformFuncOneArg(transformAsin),
    "acos":               newTransformFuncOneArg(transformAcos),
    "prometheus_buckets": transformPrometheusBuckets,
    "buckets_limit":      transformBucketsLimit,
    "histogram_share":    transformHistogramShare,
    "sort_by_label":      newTransformFuncSortByLabel(false),
    "sort_by_label_desc": newTransformFuncSortByLabel(true),
@@ -282,6 +283,86 @@ func transformFloor(v float64) float64 {
    return math.Floor(v)
}

func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
    args := tfa.args
    if err := expectTransformArgsNum(args, 2); err != nil {
        return nil, err
    }
    limits, err := getScalar(args[0], 1)
    if err != nil {
        return nil, err
    }
    limit := int(limits[0])
    if limit <= 0 {
        return nil, nil
    }
    tss := vmrangeBucketsToLE(args[1])
    if len(tss) == 0 {
        return nil, nil
    }

    // Group timeseries by all MetricGroup+tags excluding `le` tag.
    type x struct {
        le    float64
        delta float64
        ts    *timeseries
    }
    m := make(map[string][]x)
    var b []byte
    var mn storage.MetricName
    for _, ts := range tss {
        leStr := ts.MetricName.GetTagValue("le")
        if len(leStr) == 0 {
            // Skip time series without `le` tag.
            continue
        }
        le, err := strconv.ParseFloat(string(leStr), 64)
        if err != nil {
            // Skip time series with invalid `le` tag.
            continue
        }
        mn.CopyFrom(&ts.MetricName)
        mn.RemoveTag("le")
        b = marshalMetricNameSorted(b[:0], &mn)
        m[string(b)] = append(m[string(b)], x{
            le: le,
            ts: ts,
        })
    }

    // Remove buckets with the smallest counters.
    rvs := make([]*timeseries, 0, len(tss))
    for _, leGroup := range m {
        if len(leGroup) <= limit {
            // The number of buckets in leGroup doesn't exceed the limit.
            for _, xx := range leGroup {
                rvs = append(rvs, xx.ts)
            }
            continue
        }
        // The number of buckets in leGroup exceeds the limit. Remove buckets with the smallest sums.
        sort.Slice(leGroup, func(i, j int) bool {
            return leGroup[i].le < leGroup[j].le
        })
        for n := range tss[0].Values {
            prevValue := float64(0)
            for i := range leGroup {
                xx := &leGroup[i]
                value := xx.ts.Values[n]
                xx.delta += value - prevValue
                prevValue = value
            }
        }
        sort.Slice(leGroup, func(i, j int) bool {
            return leGroup[i].delta < leGroup[j].delta
        })
        for _, xx := range leGroup[len(leGroup)-limit:] {
            rvs = append(rvs, xx.ts)
        }
    }
    return rvs, nil
}

func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) {
    args := tfa.args
    if err := expectTransformArgsNum(args, 1); err != nil {
@@ -90,6 +90,8 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `rand()`, `rand_normal()` and `rand_exponential()` functions - for generating pseudo-random series with even, normal and exponential distribution.
- `increases_over_time(m[d])` and `decreases_over_time(m[d])` - returns the number of `m` increases or decreases over the given duration `d`.
- `prometheus_buckets(q)` - converts [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) buckets to Prometheus buckets with `le` labels.
- `buckets_limit(k, q)` - limits the number of buckets (Prometheus-style or [VictoriaMetrics-style](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram))
  per each metric returned by `q` to `k`. It also converts VictoriaMetrics-style buckets to Prometheus-style buckets, i.e. the end result is buckets with `le` labels.
- `histogram(q)` - calculates aggregate histogram over `q` time series for each point on the graph. See [this article](https://medium.com/@valyala/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) for more details.
- `histogram_over_time(m[d])` - calculates [VictoriaMetrics histogram](https://godoc.org/github.com/VictoriaMetrics/metrics#Histogram) for `m` over `d`.
  For example, the following query calculates median temperature by country over the last 24 hours:
go.mod (2 changed lines)
@@ -9,7 +9,7 @@ require (
    // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
    github.com/VictoriaMetrics/fasthttp v1.0.1
    github.com/VictoriaMetrics/metrics v1.12.0
-   github.com/VictoriaMetrics/metricsql v0.2.8
+   github.com/VictoriaMetrics/metricsql v0.2.9
    github.com/aws/aws-sdk-go v1.33.9
    github.com/cespare/xxhash/v2 v2.1.1
    github.com/golang/snappy v0.0.1
go.sum (4 changed lines)
@@ -53,8 +53,8 @@ github.com/VictoriaMetrics/metrics v1.11.2 h1:t/ceLP6SvagUqypCKU7cI7+tQn54+TIV/t
 github.com/VictoriaMetrics/metrics v1.11.2/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
 github.com/VictoriaMetrics/metrics v1.12.0 h1:BudxtRYSA6j8H9mzjhXNEIsCPIEUPCb76QwFEptQzvQ=
 github.com/VictoriaMetrics/metrics v1.12.0/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
-github.com/VictoriaMetrics/metricsql v0.2.8 h1:RET+5ZKSHFpcm7RNEEHFMiSNYtd6GlGKyNn/ZO53zhA=
-github.com/VictoriaMetrics/metricsql v0.2.8/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
+github.com/VictoriaMetrics/metricsql v0.2.9 h1:RHLEmt4VNZ2RAqZjmXyRtKpCrtSuYUS1+TyOqfXbHWs=
+github.com/VictoriaMetrics/metricsql v0.2.9/go.mod h1:UIjd9S0W1UnTWlJdM0wLS+2pfuPqjwqKoK8yTos+WyE=
 github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
 github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
 github.com/aws/aws-sdk-go v1.33.9 h1:nkC8YxL1nxwshIoO3UM2486Ph+zs7IZWjhRHjmXeCPw=
vendor/github.com/VictoriaMetrics/metricsql/transform.go (generated, vendored, 1 changed line)
@@ -77,6 +77,7 @@ var transformFuncs = map[string]bool{
    "asin":               true,
    "acos":               true,
    "prometheus_buckets": true,
    "buckets_limit":      true,
    "histogram_share":    true,
    "sort_by_label":      true,
    "sort_by_label_desc": true,
vendor/modules.txt (vendored, 2 changed lines)
@@ -16,7 +16,7 @@ github.com/VictoriaMetrics/fasthttp/fasthttputil
 github.com/VictoriaMetrics/fasthttp/stackless
 # github.com/VictoriaMetrics/metrics v1.12.0
 github.com/VictoriaMetrics/metrics
-# github.com/VictoriaMetrics/metricsql v0.2.8
+# github.com/VictoriaMetrics/metricsql v0.2.9
 github.com/VictoriaMetrics/metricsql
 github.com/VictoriaMetrics/metricsql/binaryop
 # github.com/aws/aws-sdk-go v1.33.9