mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: add quantile("phiLabel", phi1, ..., phiN, q)
aggregate function to MetricsQL
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1573
This commit is contained in:
parent
bec18e4fe9
commit
5ea689d61b
9 changed files with 85 additions and 6 deletions
|
@ -48,6 +48,7 @@ var aggrFuncs = map[string]aggrFunc{
|
||||||
"outliersk": aggrFuncOutliersK,
|
"outliersk": aggrFuncOutliersK,
|
||||||
"mode": newAggrFunc(aggrFuncMode),
|
"mode": newAggrFunc(aggrFuncMode),
|
||||||
"zscore": aggrFuncZScore,
|
"zscore": aggrFuncZScore,
|
||||||
|
"quantiles": aggrFuncQuantiles,
|
||||||
}
|
}
|
||||||
|
|
||||||
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
|
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
|
||||||
|
@ -883,6 +884,42 @@ func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
|
||||||
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
|
return aggrFuncExt(afe, args[1], &afa.ae.Modifier, afa.ae.Limit, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func aggrFuncQuantiles(afa *aggrFuncArg) ([]*timeseries, error) {
|
||||||
|
args := afa.args
|
||||||
|
if len(args) < 3 {
|
||||||
|
return nil, fmt.Errorf("unexpected number of args: %d; expecting at least 3 args", len(args))
|
||||||
|
}
|
||||||
|
dstLabel, err := getString(args[0], 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot obtain dstLabel: %w", err)
|
||||||
|
}
|
||||||
|
phiArgs := args[1 : len(args)-1]
|
||||||
|
argOrig := args[len(args)-1]
|
||||||
|
var rvs []*timeseries
|
||||||
|
for i, phiArg := range phiArgs {
|
||||||
|
phis, err := getScalar(phiArg, i+1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(phis) == 0 {
|
||||||
|
logger.Panicf("BUG: expecting at least a single sample")
|
||||||
|
}
|
||||||
|
phi := phis[0]
|
||||||
|
afe := newAggrQuantileFunc(phis)
|
||||||
|
arg := copyTimeseries(argOrig)
|
||||||
|
tss, err := aggrFuncExt(afe, arg, &afa.ae.Modifier, afa.ae.Limit, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot calculate quantile %g: %w", phi, err)
|
||||||
|
}
|
||||||
|
for _, ts := range tss {
|
||||||
|
ts.MetricName.RemoveTag(dstLabel)
|
||||||
|
ts.MetricName.AddTag(dstLabel, fmt.Sprintf("%g", phi))
|
||||||
|
}
|
||||||
|
rvs = append(rvs, tss...)
|
||||||
|
}
|
||||||
|
return rvs, nil
|
||||||
|
}
|
||||||
|
|
||||||
func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) {
|
func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) {
|
||||||
args := afa.args
|
args := afa.args
|
||||||
if err := expectTransformArgsNum(args, 2); err != nil {
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
||||||
|
|
|
@ -5461,6 +5461,30 @@ func TestExecSuccess(t *testing.T) {
|
||||||
resultExpected := []netstorage.Result{r}
|
resultExpected := []netstorage.Result{r}
|
||||||
f(q, resultExpected)
|
f(q, resultExpected)
|
||||||
})
|
})
|
||||||
|
t.Run(`quantiles("phi", 0.2, 0.5)`, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
q := `sort(quantiles("phi", 0.2, 0.5, label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss")))`
|
||||||
|
r1 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{6.666666666666667, 8, 9.333333333333334, 10, 10, 10},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r1.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("phi"),
|
||||||
|
Value: []byte("0.2"),
|
||||||
|
}}
|
||||||
|
r2 := netstorage.Result{
|
||||||
|
MetricName: metricNameExpected,
|
||||||
|
Values: []float64{10, 10, 10, 10.666666666666666, 12, 13.333333333333334},
|
||||||
|
Timestamps: timestampsExpected,
|
||||||
|
}
|
||||||
|
r2.MetricName.Tags = []storage.Tag{{
|
||||||
|
Key: []byte("phi"),
|
||||||
|
Value: []byte("0.5"),
|
||||||
|
}}
|
||||||
|
resultExpected := []netstorage.Result{r1, r2}
|
||||||
|
f(q, resultExpected)
|
||||||
|
})
|
||||||
t.Run(`median()`, func(t *testing.T) {
|
t.Run(`median()`, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
q := `median(label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))`
|
q := `median(label_set(10, "foo", "bar") or label_set(time()/150, "baz", "sss"))`
|
||||||
|
@ -6961,6 +6985,7 @@ func TestExecError(t *testing.T) {
|
||||||
f(`bitmap_and()`)
|
f(`bitmap_and()`)
|
||||||
f(`bitmap_or()`)
|
f(`bitmap_or()`)
|
||||||
f(`bitmap_xor()`)
|
f(`bitmap_xor()`)
|
||||||
|
f(`quantiles()`)
|
||||||
|
|
||||||
// Invalid argument type
|
// Invalid argument type
|
||||||
f(`median_over_time({}, 2)`)
|
f(`median_over_time({}, 2)`)
|
||||||
|
|
|
@ -2016,6 +2016,17 @@ func transformEnd(tfa *transformFuncArg) float64 {
|
||||||
return float64(tfa.ec.End) / 1e3
|
return float64(tfa.ec.End) / 1e3
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copyTimeseries returns a copy of tss.
|
||||||
|
func copyTimeseries(tss []*timeseries) []*timeseries {
|
||||||
|
rvs := make([]*timeseries, len(tss))
|
||||||
|
for i, src := range tss {
|
||||||
|
var dst timeseries
|
||||||
|
dst.CopyFromShallowTimestamps(src)
|
||||||
|
rvs[i] = &dst
|
||||||
|
}
|
||||||
|
return rvs
|
||||||
|
}
|
||||||
|
|
||||||
// copyTimeseriesMetricNames returns a copy of tss with real copy of MetricNames,
|
// copyTimeseriesMetricNames returns a copy of tss with real copy of MetricNames,
|
||||||
// but with shallow copy of Timestamps and Values if makeCopy is set.
|
// but with shallow copy of Timestamps and Values if makeCopy is set.
|
||||||
//
|
//
|
||||||
|
@ -2033,7 +2044,7 @@ func copyTimeseriesMetricNames(tss []*timeseries, makeCopy bool) []*timeseries {
|
||||||
return rvs
|
return rvs
|
||||||
}
|
}
|
||||||
|
|
||||||
// copyShallow returns a copy of arg with shallow copies of MetricNames,
|
// copyTimeseriesShallow returns a copy of arg with shallow copies of MetricNames,
|
||||||
// Timestamps and Values.
|
// Timestamps and Values.
|
||||||
func copyTimeseriesShallow(arg []*timeseries) []*timeseries {
|
func copyTimeseriesShallow(arg []*timeseries) []*timeseries {
|
||||||
rvs := make([]*timeseries, len(arg))
|
rvs := make([]*timeseries, len(arg))
|
||||||
|
|
|
@ -13,6 +13,7 @@ sort: 15
|
||||||
* FEATURE: update Go builder from v1.16.7 to v1.17.0. This improves data ingestion and query performance by up to 5% according to benchmarks. See [the release post for Go1.17](https://go.dev/blog/go1.17).
|
* FEATURE: update Go builder from v1.16.7 to v1.17.0. This improves data ingestion and query performance by up to 5% according to benchmarks. See [the release post for Go1.17](https://go.dev/blog/go1.17).
|
||||||
* FEATURE: vmagent: expose `promscrape_discovery_http_errors_total` metric, which can be used for monitoring the number of failed discovery attempts per each `http_sd` config.
|
* FEATURE: vmagent: expose `promscrape_discovery_http_errors_total` metric, which can be used for monitoring the number of failed discovery attempts per each `http_sd` config.
|
||||||
* FEATURE: do not reset response cache when a sample with old timestamp is ingested into VictoriaMetrics if `-search.disableAutoCacheReset` command-line option is set. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1570).
|
* FEATURE: do not reset response cache when a sample with old timestamp is ingested into VictoriaMetrics if `-search.disableAutoCacheReset` command-line option is set. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1570).
|
||||||
|
* FEATURE: add `quantiles("quantileLabel", phi1, ..., phiN, q)` aggregate function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html), which calculates the given `phi*` quantiles over time series returned by `q`. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1573).
|
||||||
|
|
||||||
* BUGFIX: rename `sign` function to `sgn` in order to be consistent with PromQL. See [this pull request from Prometheus](https://github.com/prometheus/prometheus/pull/8457).
|
* BUGFIX: rename `sign` function to `sgn` in order to be consistent with PromQL. See [this pull request from Prometheus](https://github.com/prometheus/prometheus/pull/8457).
|
||||||
* BUGFIX: improve the detection of the needed free space for background merge operation. This should prevent from possible out of disk space crashes during big merges. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1560).
|
* BUGFIX: improve the detection of the needed free space for background merge operation. This should prevent from possible out of disk space crashes during big merges. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1560).
|
||||||
|
|
|
@ -770,7 +770,11 @@ See also [implicit query conversions](#implicit-query-conversions).
|
||||||
|
|
||||||
#### quantile
|
#### quantile
|
||||||
|
|
||||||
`quantile(phi, q) by (group_labels)` calculates `phi`-quantile per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL.
|
`quantile(phi, q) by (group_labels)` calculates `phi`-quantile per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL. See also [quantiles](#quantiles).
|
||||||
|
|
||||||
|
#### quantiles
|
||||||
|
|
||||||
|
`quantiles("quantileLabel", phi1, ..., phiN, q)` calculates `phi*`-quantiles for all the time series returned by `q` and return them in time series with `{quantileLabel="phi*"}` label. The aggregate is calculated individually per each group of points with the same timestamp. See also [quantile](#quantile).
|
||||||
|
|
||||||
#### stddev
|
#### stddev
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -9,7 +9,7 @@ require (
|
||||||
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
|
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
|
||||||
github.com/VictoriaMetrics/fasthttp v1.0.16
|
github.com/VictoriaMetrics/fasthttp v1.0.16
|
||||||
github.com/VictoriaMetrics/metrics v1.17.3
|
github.com/VictoriaMetrics/metrics v1.17.3
|
||||||
github.com/VictoriaMetrics/metricsql v0.20.0
|
github.com/VictoriaMetrics/metricsql v0.21.0
|
||||||
github.com/VividCortex/ewma v1.2.0 // indirect
|
github.com/VividCortex/ewma v1.2.0 // indirect
|
||||||
github.com/aws/aws-sdk-go v1.40.30
|
github.com/aws/aws-sdk-go v1.40.30
|
||||||
github.com/cespare/xxhash/v2 v2.1.2
|
github.com/cespare/xxhash/v2 v2.1.2
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -108,8 +108,8 @@ github.com/VictoriaMetrics/fasthttp v1.0.16/go.mod h1:s9o5H4T58Kt4CTrdyJp4RorBKC
|
||||||
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
|
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
|
||||||
github.com/VictoriaMetrics/metrics v1.17.3 h1:QPUakR6JRy8BhL2C2kOgYKLuoPDwtJQ+7iKIZSjt1A4=
|
github.com/VictoriaMetrics/metrics v1.17.3 h1:QPUakR6JRy8BhL2C2kOgYKLuoPDwtJQ+7iKIZSjt1A4=
|
||||||
github.com/VictoriaMetrics/metrics v1.17.3/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
|
github.com/VictoriaMetrics/metrics v1.17.3/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
|
||||||
github.com/VictoriaMetrics/metricsql v0.20.0 h1:1rL/naP4+PXdY3Hg5Oj8+Ql+NZCIHzl/OjTmOdg3+mM=
|
github.com/VictoriaMetrics/metricsql v0.21.0 h1:wA/IVfRFQaThy4bM1kAmPiCR0BkWv4tEXD9lBF+GPdU=
|
||||||
github.com/VictoriaMetrics/metricsql v0.20.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
|
github.com/VictoriaMetrics/metricsql v0.21.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
|
||||||
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
|
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
|
||||||
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
|
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
|
||||||
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
|
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
|
||||||
|
|
1
vendor/github.com/VictoriaMetrics/metricsql/aggr.go
generated
vendored
1
vendor/github.com/VictoriaMetrics/metricsql/aggr.go
generated
vendored
|
@ -38,6 +38,7 @@ var aggrFuncs = map[string]bool{
|
||||||
"outliersk": true,
|
"outliersk": true,
|
||||||
"mode": true,
|
"mode": true,
|
||||||
"zscore": true,
|
"zscore": true,
|
||||||
|
"quantiles": true,
|
||||||
}
|
}
|
||||||
|
|
||||||
func isAggrFunc(s string) bool {
|
func isAggrFunc(s string) bool {
|
||||||
|
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
|
@ -21,7 +21,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
|
||||||
# github.com/VictoriaMetrics/metrics v1.17.3
|
# github.com/VictoriaMetrics/metrics v1.17.3
|
||||||
## explicit
|
## explicit
|
||||||
github.com/VictoriaMetrics/metrics
|
github.com/VictoriaMetrics/metrics
|
||||||
# github.com/VictoriaMetrics/metricsql v0.20.0
|
# github.com/VictoriaMetrics/metricsql v0.21.0
|
||||||
## explicit
|
## explicit
|
||||||
github.com/VictoriaMetrics/metricsql
|
github.com/VictoriaMetrics/metricsql
|
||||||
github.com/VictoriaMetrics/metricsql/binaryop
|
github.com/VictoriaMetrics/metricsql/binaryop
|
||||||
|
|
Loading…
Reference in a new issue