app/vmselect/promql: add hoeffding_bound_upper(phi, m[d]) and hoeffding_bound_lower(phi, m[d]) functions

These functions can be used for calculating Hoeffding bounds
for `m` over `d` time range and for the given `phi` in the range `[0..1]`.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/283
This commit is contained in:
Aliaksandr Valialkin 2020-01-11 14:40:32 +02:00
parent 8eaced8cae
commit 8b14572f70
6 changed files with 205 additions and 63 deletions

View file

@ -605,6 +605,14 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
rollupResultCacheMiss.Inc()
}
// Obtain rollup configs before fetching data from db,
// so type errors can be caught earlier.
sharedTimestamps := getTimestamps(start, ec.End, ec.Step)
preFunc, rcs, err := getRollupConfigs(name, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
if err != nil {
return nil, err
}
// Fetch the remaining part of the result.
tfs := toTagFilters(me.LabelFilters)
sq := &storage.SearchQuery{
@ -629,12 +637,6 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
tss = mergeTimeseries(tssCached, tss, start, ec)
return tss, nil
}
sharedTimestamps := getTimestamps(start, ec.End, ec.Step)
preFunc, rcs, err := getRollupConfigs(name, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
if err != nil {
rss.Cancel()
return nil, err
}
// Verify timeseries fit available memory after the rollup.
// Take into account points from tssCached.

View file

@ -4480,6 +4480,29 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`hoeffding_bound_lower()`, func(t *testing.T) {
t.Parallel()
q := `hoeffding_bound_lower(0.9, rand(0)[:10s])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.2516770508510652, 0.2830570387745462, 0.27716232108436645, 0.3679356319931767, 0.3168460474120903, 0.23156726248243734},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`hoeffding_bound_upper()`, func(t *testing.T) {
t.Parallel()
q := `hoeffding_bound_upper(0.9, alias(rand(0), "foobar")[:10s])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.6510581320042821, 0.7261021731890429, 0.7245290097397009, 0.8113950442584258, 0.7736122275568004, 0.6658564048254882},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("foobar")
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`aggr_over_time(single-func)`, func(t *testing.T) {
t.Parallel()
q := `aggr_over_time("increase", rand(0)[:10s])`
@ -5267,6 +5290,12 @@ func TestExecError(t *testing.T) {
f(`aggr_over_time()`)
f(`aggr_over_time(foo)`)
f(`aggr_over_time("foo", bar, 1)`)
f(`hoeffding_bound_lower()`)
f(`hoeffding_bound_lower(1)`)
f(`hoeffding_bound_lower(0.99, foo, 1)`)
f(`hoeffding_bound_upper()`)
f(`hoeffding_bound_upper(1)`)
f(`hoeffding_bound_upper(0.99, foo, 1)`)
// Invalid argument type
f(`median_over_time({}, 2)`)

View file

@ -65,6 +65,8 @@ var rollupFuncs = map[string]newRollupFunc{
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_candlestick": newRollupFuncOneArg(rollupFake),
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
"hoeffding_bound_upper": newRollupHoeffdingBoundUpper,
"hoeffding_bound_lower": newRollupHoeffdingBoundLower,
}
// rollupAggrFuncs are functions that can be passed to `aggr_over_time()`
@ -143,6 +145,8 @@ var rollupFuncsKeepMetricGroup = map[string]bool{
"quantile_over_time": true,
"rollup": true,
"geomean_over_time": true,
"hoeffding_bound_lower": true,
"hoeffding_bound_upper": true,
}
func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
@ -200,7 +204,8 @@ func getRollupArgIdx(funcName string) int {
logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName)
}
switch funcName {
case "quantile_over_time", "aggr_over_time":
case "quantile_over_time", "aggr_over_time",
"hoeffding_bound_lower", "hoeffding_bound_upper":
return 1
default:
return 0
@ -828,6 +833,66 @@ func newRollupShareFilter(args []interface{}, countFilter func(values []float64,
return rf, nil
}
func newRollupHoeffdingBoundLower(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
bound, avg := rollupHoeffdingBoundInternal(rfa, phis)
return avg - bound
}
return rf, nil
}
func newRollupHoeffdingBoundUpper(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
bound, avg := rollupHoeffdingBoundInternal(rfa, phis)
return avg + bound
}
return rf, nil
}
func rollupHoeffdingBoundInternal(rfa *rollupFuncArg, phis []float64) (float64, float64) {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan, nan
}
if len(values) == 1 {
return 0, values[0]
}
vMax := rollupMax(rfa)
vMin := rollupMin(rfa)
vAvg := rollupAvg(rfa)
vRange := vMax - vMin
if vRange <= 0 {
return 0, vAvg
}
phi := phis[rfa.idx]
if phi >= 1 {
return inf, vAvg
}
if phi <= 0 {
return 0, vAvg
}
// See https://en.wikipedia.org/wiki/Hoeffding%27s_inequality
// and https://www.youtube.com/watch?v=6UwcqiNsZ8U&feature=youtu.be&t=1237
bound := vRange * math.Sqrt(math.Log(1/(1-phi))/(2*float64(len(values))))
return bound, vAvg
}
func newRollupQuantile(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err

View file

@ -310,6 +310,48 @@ func TestRollupHoltWinters(t *testing.T) {
f(0.9, 0.9, 33.99637566941818)
}
func TestRollupHoeffdingBoundLower(t *testing.T) {
f := func(phi, vExpected float64) {
t.Helper()
phis := []*timeseries{{
Values: []float64{phi},
Timestamps: []int64{123},
}}
var me metricsql.MetricExpr
args := []interface{}{phis, &metricsql.RollupExpr{Expr: &me}}
testRollupFunc(t, "hoeffding_bound_lower", args, &me, vExpected)
}
f(0.5, 28.21949401521037)
f(-1, 47.083333333333336)
f(0, 47.083333333333336)
f(1, -inf)
f(2, -inf)
f(0.1, 39.72878000047643)
f(0.9, 12.701803086472331)
}
func TestRollupHoeffdingBoundUpper(t *testing.T) {
f := func(phi, vExpected float64) {
t.Helper()
phis := []*timeseries{{
Values: []float64{phi},
Timestamps: []int64{123},
}}
var me metricsql.MetricExpr
args := []interface{}{phis, &metricsql.RollupExpr{Expr: &me}}
testRollupFunc(t, "hoeffding_bound_upper", args, &me, vExpected)
}
f(0.5, 65.9471726514563)
f(-1, 47.083333333333336)
f(0, 47.083333333333336)
f(1, inf)
f(2, inf)
f(0.1, 54.43788666619024)
f(0.9, 81.46486358019433)
}
func TestRollupNewRollupFuncSuccess(t *testing.T) {
f := func(funcName string, vExpected float64) {
t.Helper()

View file

@ -103,3 +103,5 @@ This functionality can be tried at [an editable Grafana dashboard](http://play-g
- `aggr_over_time(("aggr_func1", "aggr_func2", ...), m[d])` - simultaneously calculates all the listed `aggr_func*` for `m` over `d` time range.
`aggr_func*` can contain any functions that accept range vector. For instance, `aggr_over_time(("min_over_time", "max_over_time", "rate"), m[d])`
would calculate `min_over_time`, `max_over_time` and `rate` for `m[d]`.
- `hoeffding_bound_upper(phi, m[d])` and `hoeffding_bound_lower(phi, m[d])` - return upper and lower [Hoeffding bounds](https://en.wikipedia.org/wiki/Hoeffding%27s_inequality)
for the given `phi` in the range `[0..1]`.

View file

@ -54,6 +54,8 @@ var rollupFuncs = map[string]bool{
"rollup_increase": true,
"rollup_candlestick": true,
"aggr_over_time": true,
"hoeffding_bound_upper": true,
"hoeffding_bound_lower": true,
}
// IsRollupFunc returns whether funcName is known rollup function.