app/vmselect/promql: add quantiles_over_time("phiLabel", phi1, ..., phiN, m[d]) function for calculating multiple quantiles at once

This commit is contained in:
Aliaksandr Valialkin 2021-09-17 23:33:15 +03:00
parent e1e5a20b36
commit 9a3d0c43b5
10 changed files with 189 additions and 61 deletions

View file

@ -4,6 +4,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"math" "math"
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
@ -390,7 +391,7 @@ func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.F
if nrf == nil { if nrf == nil {
return nil, nil return nil, nil
} }
rollupArgIdx := getRollupArgIdx(fe.Name) rollupArgIdx := getRollupArgIdx(fe)
if rollupArgIdx >= len(fe.Args) { if rollupArgIdx >= len(fe.Args) {
// Incorrect number of args for rollup func. // Incorrect number of args for rollup func.
return nil, nil return nil, nil
@ -430,7 +431,7 @@ func evalExprs(ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
func evalRollupFuncArgs(ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) { func evalRollupFuncArgs(ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
var re *metricsql.RollupExpr var re *metricsql.RollupExpr
rollupArgIdx := getRollupArgIdx(fe.Name) rollupArgIdx := getRollupArgIdx(fe)
if len(fe.Args) <= rollupArgIdx { if len(fe.Args) <= rollupArgIdx {
return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil)) return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
} }
@ -478,7 +479,8 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
return &reNew return &reNew
} }
func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { func evalRollupFunc(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
funcName = strings.ToLower(funcName)
ecNew := ec ecNew := ec
var offset int64 var offset int64
if re.Offset != nil { if re.Offset != nil {
@ -491,7 +493,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E
// so cache hit rate should be quite good. // so cache hit rate should be quite good.
// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976 // See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
} }
if name == "rollup_candlestick" { if funcName == "rollup_candlestick" {
// Automatically apply `offset -step` to `rollup_candlestick` function // Automatically apply `offset -step` to `rollup_candlestick` function
// in order to obtain expected OHLC results. // in order to obtain expected OHLC results.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
@ -504,12 +506,12 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E
var rvs []*timeseries var rvs []*timeseries
var err error var err error
if me, ok := re.Expr.(*metricsql.MetricExpr); ok { if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, expr, me, iafc, re.Window) rvs, err = evalRollupFuncWithMetricExpr(ecNew, funcName, rf, expr, me, iafc, re.Window)
} else { } else {
if iafc != nil { if iafc != nil {
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil)) logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil))
} }
rvs, err = evalRollupFuncWithSubquery(ecNew, name, rf, expr, re) rvs, err = evalRollupFuncWithSubquery(ecNew, funcName, rf, expr, re)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -528,7 +530,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E
return rvs, nil return rvs, nil
} }
func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) { func evalRollupFuncWithSubquery(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
// TODO: determine whether to use rollupResultCacheV here. // TODO: determine whether to use rollupResultCacheV here.
step := re.Step.Duration(ec.Step) step := re.Step.Duration(ec.Step)
if step == 0 { if step == 0 {
@ -550,25 +552,24 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr
return nil, err return nil, err
} }
if len(tssSQ) == 0 { if len(tssSQ) == 0 {
if name == "absent_over_time" { if funcName == "absent_over_time" {
tss := evalNumber(ec, 1) tss := evalNumber(ec, 1)
return tss, nil return tss, nil
} }
return nil, nil return nil, nil
} }
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step) sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step)
preFunc, rcs, err := getRollupConfigs(name, rf, expr, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
var tssLock sync.Mutex var tssLock sync.Mutex
removeMetricGroup := !rollupFuncsKeepMetricGroup[name]
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) { doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) {
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps) values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
preFunc(values, timestamps) preFunc(values, timestamps)
for _, rc := range rcs { for _, rc := range rcs {
if tsm := newTimeseriesMap(name, sharedTimestamps, &tsSQ.MetricName); tsm != nil { if tsm := newTimeseriesMap(funcName, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
rc.DoTimeseriesMap(tsm, values, timestamps) rc.DoTimeseriesMap(tsm, values, timestamps)
tssLock.Lock() tssLock.Lock()
tss = tsm.AppendTimeseriesTo(tss) tss = tsm.AppendTimeseriesTo(tss)
@ -576,7 +577,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr
continue continue
} }
var ts timeseries var ts timeseries
doRollupForTimeseries(rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps, removeMetricGroup) doRollupForTimeseries(funcName, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
tssLock.Lock() tssLock.Lock()
tss = append(tss, &ts) tss = append(tss, &ts)
tssLock.Unlock() tssLock.Unlock()
@ -642,7 +643,7 @@ var (
rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`) rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`)
) )
func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, func evalRollupFuncWithMetricExpr(ec *EvalConfig, funcName string, rf rollupFunc,
expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) { expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) {
if me.IsEmpty() { if me.IsEmpty() {
return evalNumber(ec, nan), nil return evalNumber(ec, nan), nil
@ -665,7 +666,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
// Obtain rollup configs before fetching data from db, // Obtain rollup configs before fetching data from db,
// so type errors can be caught earlier. // so type errors can be caught earlier.
sharedTimestamps := getTimestamps(start, ec.End, ec.Step) sharedTimestamps := getTimestamps(start, ec.End, ec.Step)
preFunc, rcs, err := getRollupConfigs(name, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -689,7 +690,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
if rssLen == 0 { if rssLen == 0 {
rss.Cancel() rss.Cancel()
var tss []*timeseries var tss []*timeseries
if name == "absent_over_time" { if funcName == "absent_over_time" {
tss = getAbsentTimeseries(ec, me) tss = getAbsentTimeseries(ec, me)
} }
// Add missing points until ec.End. // Add missing points until ec.End.
@ -735,12 +736,11 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
defer rml.Put(uint64(rollupMemorySize)) defer rml.Put(uint64(rollupMemorySize))
// Evaluate rollup // Evaluate rollup
removeMetricGroup := !rollupFuncsKeepMetricGroup[name]
var tss []*timeseries var tss []*timeseries
if iafc != nil { if iafc != nil {
tss, err = evalRollupWithIncrementalAggregate(name, iafc, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) tss, err = evalRollupWithIncrementalAggregate(funcName, iafc, rss, rcs, preFunc, sharedTimestamps)
} else { } else {
tss, err = evalRollupNoIncrementalAggregate(name, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) tss, err = evalRollupNoIncrementalAggregate(funcName, rss, rcs, preFunc, sharedTimestamps)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -762,15 +762,15 @@ func getRollupMemoryLimiter() *memoryLimiter {
return &rollupMemoryLimiter return &rollupMemoryLimiter
} }
func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, func evalRollupWithIncrementalAggregate(funcName string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
rs.Values, rs.Timestamps = dropStaleNaNs(name, rs.Values, rs.Timestamps) rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
preFunc(rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps)
ts := getTimeseries() ts := getTimeseries()
defer putTimeseries(ts) defer putTimeseries(ts)
for _, rc := range rcs { for _, rc := range rcs {
if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil { if tsm := newTimeseriesMap(funcName, sharedTimestamps, &rs.MetricName); tsm != nil {
rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
for _, ts := range tsm.m { for _, ts := range tsm.m {
iafc.updateTimeseries(ts, workerID) iafc.updateTimeseries(ts, workerID)
@ -778,7 +778,7 @@ func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncCo
continue continue
} }
ts.Reset() ts.Reset()
doRollupForTimeseries(rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup) doRollupForTimeseries(funcName, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
iafc.updateTimeseries(ts, workerID) iafc.updateTimeseries(ts, workerID)
// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used. // ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
@ -794,15 +794,15 @@ func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncCo
return tss, nil return tss, nil
} }
func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs []*rollupConfig, func evalRollupNoIncrementalAggregate(funcName string, rss *netstorage.Results, rcs []*rollupConfig,
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
tss := make([]*timeseries, 0, rss.Len()*len(rcs)) tss := make([]*timeseries, 0, rss.Len()*len(rcs))
var tssLock sync.Mutex var tssLock sync.Mutex
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
rs.Values, rs.Timestamps = dropStaleNaNs(name, rs.Values, rs.Timestamps) rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
preFunc(rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps)
for _, rc := range rcs { for _, rc := range rcs {
if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil { if tsm := newTimeseriesMap(funcName, sharedTimestamps, &rs.MetricName); tsm != nil {
rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
tssLock.Lock() tssLock.Lock()
tss = tsm.AppendTimeseriesTo(tss) tss = tsm.AppendTimeseriesTo(tss)
@ -810,7 +810,7 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs
continue continue
} }
var ts timeseries var ts timeseries
doRollupForTimeseries(rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup) doRollupForTimeseries(funcName, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
tssLock.Lock() tssLock.Lock()
tss = append(tss, &ts) tss = append(tss, &ts)
tssLock.Unlock() tssLock.Unlock()
@ -823,13 +823,13 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs
return tss, nil return tss, nil
} }
func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64, func doRollupForTimeseries(funcName string, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64,
sharedTimestamps []int64, removeMetricGroup bool) { sharedTimestamps []int64) {
tsDst.MetricName.CopyFrom(mnSrc) tsDst.MetricName.CopyFrom(mnSrc)
if len(rc.TagValue) > 0 { if len(rc.TagValue) > 0 {
tsDst.MetricName.AddTag("rollup", rc.TagValue) tsDst.MetricName.AddTag("rollup", rc.TagValue)
} }
if removeMetricGroup { if !rollupFuncsKeepMetricGroup[funcName] {
tsDst.MetricName.ResetMetricGroup() tsDst.MetricName.ResetMetricGroup()
} }
tsDst.Values = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc) tsDst.Values = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
@ -896,8 +896,8 @@ func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) {
dst.IsNegative = src.IsNegative dst.IsNegative = src.IsNegative
} }
func dropStaleNaNs(name string, values []float64, timestamps []int64) ([]float64, []int64) { func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) {
if *noStaleMarkers || name == "default_rollup" { if *noStaleMarkers || funcName == "default_rollup" {
// Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function, // Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function,
// since it uses them for Prometheus-style staleness detection. // since it uses them for Prometheus-style staleness detection.
return values, timestamps return values, timestamps

View file

@ -4615,6 +4615,67 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`quantile_over_time`, func(t *testing.T) {
t.Parallel()
q := `quantile_over_time(0.9, label_set(round(rand(0), 0.01), "__name__", "foo", "xx", "yy")[200s:5s])`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.89, 0.89, 0.95, 0.87, 0.92, 0.89},
Timestamps: timestampsExpected,
}
r.MetricName.MetricGroup = []byte("foo")
r.MetricName.Tags = []storage.Tag{
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`quantiles_over_time`, func(t *testing.T) {
t.Parallel()
q := `sort_by_label(
quantiles_over_time("phi", 0.5, 0.9,
label_set(round(rand(0), 0.01), "__name__", "foo", "xx", "yy")[200s:5s]
),
"phi",
)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.47, 0.57, 0.49, 0.54, 0.56, 0.53},
Timestamps: timestampsExpected,
}
r1.MetricName.MetricGroup = []byte("foo")
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("phi"),
Value: []byte("0.5"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0.89, 0.89, 0.95, 0.87, 0.92, 0.89},
Timestamps: timestampsExpected,
}
r2.MetricName.MetricGroup = []byte("foo")
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("phi"),
Value: []byte("0.9"),
},
{
Key: []byte("xx"),
Value: []byte("yy"),
},
}
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`histogram_over_time`, func(t *testing.T) { t.Run(`histogram_over_time`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sort_by_label(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]), "vmrange")` q := `sort_by_label(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]), "vmrange")`

View file

@ -83,6 +83,7 @@ var rollupFuncs = map[string]newRollupFunc{
"ascent_over_time": newRollupFuncOneArg(rollupAscentOverTime), "ascent_over_time": newRollupFuncOneArg(rollupAscentOverTime),
"descent_over_time": newRollupFuncOneArg(rollupDescentOverTime), "descent_over_time": newRollupFuncOneArg(rollupDescentOverTime),
"zscore_over_time": newRollupFuncOneArg(rollupZScoreOverTime), "zscore_over_time": newRollupFuncOneArg(rollupZScoreOverTime),
"quantiles_over_time": newRollupQuantiles,
// `timestamp` function must return timestamp for the last datapoint on the current window // `timestamp` function must return timestamp for the last datapoint on the current window
// in order to properly handle offset and timestamps unaligned to the current step. // in order to properly handle offset and timestamps unaligned to the current step.
@ -156,6 +157,7 @@ var rollupFuncsCannotAdjustWindow = map[string]bool{
"sum_over_time": true, "sum_over_time": true,
"count_over_time": true, "count_over_time": true,
"quantile_over_time": true, "quantile_over_time": true,
"quantiles_over_time": true,
"stddev_over_time": true, "stddev_over_time": true,
"stdvar_over_time": true, "stdvar_over_time": true,
"absent_over_time": true, "absent_over_time": true,
@ -191,6 +193,7 @@ var rollupFuncsKeepMetricGroup = map[string]bool{
"min_over_time": true, "min_over_time": true,
"max_over_time": true, "max_over_time": true,
"quantile_over_time": true, "quantile_over_time": true,
"quantiles_over_time": true,
"rollup": true, "rollup": true,
"geomean_over_time": true, "geomean_over_time": true,
"hoeffding_bound_lower": true, "hoeffding_bound_lower": true,
@ -250,15 +253,17 @@ func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
return aggrFuncNames, nil return aggrFuncNames, nil
} }
func getRollupArgIdx(funcName string) int { func getRollupArgIdx(fe *metricsql.FuncExpr) int {
funcName = strings.ToLower(funcName) funcName := strings.ToLower(fe.Name)
if rollupFuncs[funcName] == nil { if rollupFuncs[funcName] == nil {
logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName) logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", fe.Name)
} }
switch funcName { switch funcName {
case "quantile_over_time", "aggr_over_time", case "quantile_over_time", "aggr_over_time",
"hoeffding_bound_lower", "hoeffding_bound_upper": "hoeffding_bound_lower", "hoeffding_bound_upper":
return 1 return 1
case "quantiles_over_time":
return len(fe.Args) - 1
default: default:
return 0 return 0
} }
@ -423,14 +428,16 @@ var (
const maxSilenceInterval = 5 * 60 * 1000 const maxSilenceInterval = 5 * 60 * 1000
type timeseriesMap struct { type timeseriesMap struct {
origin *timeseries origin *timeseries
labelName string h metrics.Histogram
h metrics.Histogram m map[string]*timeseries
m map[string]*timeseries
} }
func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage.MetricName) *timeseriesMap { func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage.MetricName) *timeseriesMap {
if strings.ToLower(funcName) != "histogram_over_time" { funcName = strings.ToLower(funcName)
switch funcName {
case "histogram_over_time", "quantiles_over_time":
default:
return nil return nil
} }
@ -440,13 +447,14 @@ func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage.
} }
var origin timeseries var origin timeseries
origin.MetricName.CopyFrom(mnSrc) origin.MetricName.CopyFrom(mnSrc)
origin.MetricName.ResetMetricGroup() if !rollupFuncsKeepMetricGroup[funcName] {
origin.MetricName.ResetMetricGroup()
}
origin.Timestamps = sharedTimestamps origin.Timestamps = sharedTimestamps
origin.Values = values origin.Values = values
return &timeseriesMap{ return &timeseriesMap{
origin: &origin, origin: &origin,
labelName: "vmrange", m: make(map[string]*timeseries),
m: make(map[string]*timeseries),
} }
} }
@ -457,15 +465,15 @@ func (tsm *timeseriesMap) AppendTimeseriesTo(dst []*timeseries) []*timeseries {
return dst return dst
} }
func (tsm *timeseriesMap) GetOrCreateTimeseries(labelValue string) *timeseries { func (tsm *timeseriesMap) GetOrCreateTimeseries(labelName, labelValue string) *timeseries {
ts := tsm.m[labelValue] ts := tsm.m[labelValue]
if ts != nil { if ts != nil {
return ts return ts
} }
ts = &timeseries{} ts = &timeseries{}
ts.CopyFromShallowTimestamps(tsm.origin) ts.CopyFromShallowTimestamps(tsm.origin)
ts.MetricName.RemoveTag(tsm.labelName) ts.MetricName.RemoveTag(labelName)
ts.MetricName.AddTag(tsm.labelName, labelValue) ts.MetricName.AddTag(labelName, labelValue)
tsm.m[labelValue] = ts tsm.m[labelValue] = ts
return ts return ts
} }
@ -1024,6 +1032,57 @@ func rollupHoeffdingBoundInternal(rfa *rollupFuncArg, phis []float64) (float64,
return bound, vAvg return bound, vAvg
} }
func newRollupQuantiles(args []interface{}) (rollupFunc, error) {
if len(args) < 3 {
return nil, fmt.Errorf("unexpected number of args: %d; want at least 3 args", len(args))
}
tssPhi, ok := args[0].([]*timeseries)
if !ok {
return nil, fmt.Errorf("unexpected type for phi arg: %T; want string", args[0])
}
phiLabel, err := getString(tssPhi, 0)
if err != nil {
return nil, err
}
phiArgs := args[1 : len(args)-1]
phis := make([]float64, len(phiArgs))
phiStrs := make([]string, len(phiArgs))
for i, phiArg := range phiArgs {
phiValues, err := getScalar(phiArg, i+1)
if err != nil {
return nil, fmt.Errorf("cannot obtain phi from arg #%d: %w", i+1, err)
}
phis[i] = phiValues[0]
phiStrs[i] = fmt.Sprintf("%g", phiValues[0])
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
if len(values) == 1 {
// Fast path - only a single value.
return values[0]
}
hf := histogram.GetFast()
for _, v := range values {
hf.Update(v)
}
qs := hf.Quantiles(nil, phis)
histogram.PutFast(hf)
idx := rfa.idx
tsm := rfa.tsm
for i, phiStr := range phiStrs {
ts := tsm.GetOrCreateTimeseries(phiLabel, phiStr)
ts.Values[idx] = qs[i]
}
return nan
}
return rf, nil
}
func newRollupQuantile(args []interface{}) (rollupFunc, error) { func newRollupQuantile(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil { if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err return nil, err
@ -1064,7 +1123,7 @@ func rollupHistogram(rfa *rollupFuncArg) float64 {
} }
idx := rfa.idx idx := rfa.idx
tsm.h.VisitNonZeroBuckets(func(vmrange string, count uint64) { tsm.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ts := tsm.GetOrCreateTimeseries(vmrange) ts := tsm.GetOrCreateTimeseries("vmrange", vmrange)
ts.Values[idx] = float64(count) ts.Values[idx] = float64(count)
}) })
return nan return nan

View file

@ -508,6 +508,7 @@ func TestRollupNewRollupFuncError(t *testing.T) {
f("holt_winters", nil) f("holt_winters", nil)
f("predict_linear", nil) f("predict_linear", nil)
f("quantile_over_time", nil) f("quantile_over_time", nil)
f("quantiles_over_time", nil)
// Invalid arg type // Invalid arg type
scalarTs := []*timeseries{{ scalarTs := []*timeseries{{
@ -521,6 +522,7 @@ func TestRollupNewRollupFuncError(t *testing.T) {
f("predict_linear", []interface{}{123, 123}) f("predict_linear", []interface{}{123, 123})
f("predict_linear", []interface{}{me, 123}) f("predict_linear", []interface{}{me, 123})
f("quantile_over_time", []interface{}{123, 123}) f("quantile_over_time", []interface{}{123, 123})
f("quantiles_over_time", []interface{}{123, 123})
} }
func TestRollupNoWindowNoPoints(t *testing.T) { func TestRollupNoWindowNoPoints(t *testing.T) {

View file

@ -27,6 +27,7 @@ sort: 15
* FEATURE: add [mad(q)](https://docs.victoriametrics.com/MetricsQL.html#mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) for groups of points with identical timestamps across multiple time series. * FEATURE: add [mad(q)](https://docs.victoriametrics.com/MetricsQL.html#mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) for groups of points with identical timestamps across multiple time series.
* FEATURE: add [outliers_mad(tolerance, q)](https://docs.victoriametrics.com/MetricsQL.html#outliers_mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It returns time series with peaks outside the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) multiplied by `tolerance`. * FEATURE: add [outliers_mad(tolerance, q)](https://docs.victoriametrics.com/MetricsQL.html#outliers_mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It returns time series with peaks outside the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) multiplied by `tolerance`.
* FEATURE: add `histogram_quantiles("phiLabel", phi1, ..., phiN, buckets)` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates the given `phi*`-quantiles over the given `buckets` and returns time series per each quantile with the corresponding `{phiLabel="phi*"}` label. * FEATURE: add `histogram_quantiles("phiLabel", phi1, ..., phiN, buckets)` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates the given `phi*`-quantiles over the given `buckets` and returns time series per each quantile with the corresponding `{phiLabel="phi*"}` label.
* FEATURE: add `quantiles_over_time("phiLabel", phi1, ..., phiN, series_selector[d])` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates the given `phi*`-quantiles over raw samples selected by `series_selector` on the given lookbehind window `d`. It returns time series per each quantile with the corresponding `{phiLabel="phi*"}` label.
* FEATURE: [enterprise](https://victoriametrics.com/enterprise.html): do not ask for `-eula` flag if `-version` flag is passed to enteprise app. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1621). * FEATURE: [enterprise](https://victoriametrics.com/enterprise.html): do not ask for `-eula` flag if `-version` flag is passed to enteprise app. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1621).
* BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601). * BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601).
@ -36,7 +37,7 @@ sort: 15
* BUGFIX: vmagent: properly use `https` scheme for wildcard TLS certificates for `role: ingress` targets in Kubernetes service discovery. See [this issue](https://github.com/prometheus/prometheus/issues/8902). * BUGFIX: vmagent: properly use `https` scheme for wildcard TLS certificates for `role: ingress` targets in Kubernetes service discovery. See [this issue](https://github.com/prometheus/prometheus/issues/8902).
* BUGFIX: vmagent: support host networking mode for `docker_sd_config`. See [this issue](https://github.com/prometheus/prometheus/issues/9116). * BUGFIX: vmagent: support host networking mode for `docker_sd_config`. See [this issue](https://github.com/prometheus/prometheus/issues/9116).
* BUGFIX: fix non-repeatable results from `quantile_over_time()` function when the number of input samples exceeds 1000. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612). * BUGFIX: fix non-repeatable results from `quantile_over_time()` function when the number of input samples exceeds 1000. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612).
* BUGFIX: fix EC2 zone discovery when `filters` are specified in [ec2_sc_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1626). * BUGFIX: vmagent: fix EC2 zone discovery when `filters` are specified in [ec2_sc_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1626).
## [v1.65.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.65.0) ## [v1.65.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.65.0)

View file

@ -228,7 +228,11 @@ See also [implicit query conversions](#implicit-query-conversions).
#### quantile_over_time #### quantile_over_time
`quantile_over_time(phi, series_selector[d])` calculates `phi`-quantile over raw samples on the given lookbehind window `d` per each time series returned from the given [series_selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors). The `phi` value must be in the range `[0...1]`. This function is supported by PromQL. `quantile_over_time(phi, series_selector[d])` calculates `phi`-quantile over raw samples on the given lookbehind window `d` per each time series returned from the given [series_selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors). The `phi` value must be in the range `[0...1]`. This function is supported by PromQL. See also [quantiles_over_time](#quantiles_over_time).
#### quantiles_over_time
`quantiles_over_time("phiLabel", phi1, ..., phiN, series_selector[d])` calculates `phi*`-quantiles over raw samples on the given lookbehind window `d` per each time series returned from the given [series_selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors). The function returns individual series per each `phi*` with `{phiLabel="phi*"}` label. `phi*` values must be in the range `[0...1]`. See also [quantile_over_time](#quantile_over_time).
#### range_over_time #### range_over_time
@ -413,11 +417,11 @@ See also [implicit query conversions](#implicit-query-conversions).
#### histogram_quantile #### histogram_quantile
`histogram_quantile(phi, buckets)` calculates `phi`-quantile over the given [histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). For example, `histogram_quantile(0.5, sum(rate(http_request_duration_seconds_bucket[5m]) by (le))` would return median request duration for all the requests during the last 5 minutes. It accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. See [this issue for details](https://github.com/prometheus/prometheus/issues/5706). This function is supported by PromQL (except of the `boundLabel` arg). See also [histogram_quantiles](#histogram_quantiles) and [histogram_share](#histogram_share). `histogram_quantile(phi, buckets)` calculates `phi`-quantile over the given [histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). `phi` must be in the range `[0...1]`. For example, `histogram_quantile(0.5, sum(rate(http_request_duration_seconds_bucket[5m]) by (le))` would return median request duration for all the requests during the last 5 minutes. It accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. See [this issue for details](https://github.com/prometheus/prometheus/issues/5706). This function is supported by PromQL (except of the `boundLabel` arg). See also [histogram_quantiles](#histogram_quantiles) and [histogram_share](#histogram_share).
#### histogram_quantiles #### histogram_quantiles
`histogram_quantiles("phiLabel", phi1, ..., phiN, buckets)` calculates the given `phi*`-quantiles over the given [histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). Each calculated quantile is returned in a separate time series with the corresponding `{phiLabel="phi*"}` label. See also [histogram_quantile](#histogram_quantile). `histogram_quantiles("phiLabel", phi1, ..., phiN, buckets)` calculates the given `phi*`-quantiles over the given [histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). `phi*` must be in the range `[0...1]`. Each calculated quantile is returned in a separate time series with the corresponding `{phiLabel="phi*"}` label. See also [histogram_quantile](#histogram_quantile).
#### histogram_share #### histogram_share
@ -513,7 +517,7 @@ See also [implicit query conversions](#implicit-query-conversions).
#### range_quantile #### range_quantile
`range_quantile(phi, q)` returns `phi`-quantile across points per each time series returned by `q`. `range_quantile(phi, q)` returns `phi`-quantile across points per each time series returned by `q`. `phi` must be in the range `[0...1]`.
#### range_sum #### range_sum
@ -782,11 +786,11 @@ See also [implicit query conversions](#implicit-query-conversions).
#### quantile #### quantile
`quantile(phi, q) by (group_labels)` calculates `phi`-quantile per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL. See also [quantiles](#quantiles). `quantile(phi, q) by (group_labels)` calculates `phi`-quantile per each `group_labels` for all the time series returned by `q`. `phi` must be in the range `[0...1]`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL. See also [quantiles](#quantiles).
#### quantiles #### quantiles
`quantiles("phiLabel", phi1, ..., phiN, q)` calculates `phi*`-quantiles for all the time series returned by `q` and return them in time series with `{phiLabel="phi*"}` label. The aggregate is calculated individually per each group of points with the same timestamp. See also [quantile](#quantile). `quantiles("phiLabel", phi1, ..., phiN, q)` calculates `phi*`-quantiles for all the time series returned by `q` and return them in time series with `{phiLabel="phi*"}` label. `phi*` must be in the range `[0...1]`. The aggregate is calculated individually per each group of points with the same timestamp. See also [quantile](#quantile).
#### stddev #### stddev

2
go.mod
View file

@ -9,7 +9,7 @@ require (
// like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b
github.com/VictoriaMetrics/fasthttp v1.1.0 github.com/VictoriaMetrics/fasthttp v1.1.0
github.com/VictoriaMetrics/metrics v1.18.0 github.com/VictoriaMetrics/metrics v1.18.0
github.com/VictoriaMetrics/metricsql v0.23.0 github.com/VictoriaMetrics/metricsql v0.24.0
github.com/VividCortex/ewma v1.2.0 // indirect github.com/VividCortex/ewma v1.2.0 // indirect
github.com/aws/aws-sdk-go v1.40.43 github.com/aws/aws-sdk-go v1.40.43
github.com/cespare/xxhash/v2 v2.1.2 github.com/cespare/xxhash/v2 v2.1.2

4
go.sum
View file

@ -109,8 +109,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR
github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE=
github.com/VictoriaMetrics/metrics v1.18.0 h1:vov5NxDHRSXFbdiH4dYLYEjKLoAXXSQ7hcnG8TSD9JQ= github.com/VictoriaMetrics/metrics v1.18.0 h1:vov5NxDHRSXFbdiH4dYLYEjKLoAXXSQ7hcnG8TSD9JQ=
github.com/VictoriaMetrics/metrics v1.18.0/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= github.com/VictoriaMetrics/metrics v1.18.0/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA=
github.com/VictoriaMetrics/metricsql v0.23.0 h1:NWqoCrL2kz864OlaDBEU7c2fA7AjKfFq/nXM6di4xz8= github.com/VictoriaMetrics/metricsql v0.24.0 h1:1SOiuEaSgfS2CiQyCAlYQs3WPHzXNMPUSXtE1Zx6qDw=
github.com/VictoriaMetrics/metricsql v0.23.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= github.com/VictoriaMetrics/metricsql v0.24.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=

View file

@ -68,6 +68,7 @@ var rollupFuncs = map[string]bool{
"ascent_over_time": true, "ascent_over_time": true,
"descent_over_time": true, "descent_over_time": true,
"zscore_over_time": true, "zscore_over_time": true,
"quantiles_over_time": true,
// `timestamp` func has been moved here because it must work properly with offsets and samples unaligned to the current step. // `timestamp` func has been moved here because it must work properly with offsets and samples unaligned to the current step.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details.

2
vendor/modules.txt vendored
View file

@ -22,7 +22,7 @@ github.com/VictoriaMetrics/fasthttp/stackless
# github.com/VictoriaMetrics/metrics v1.18.0 # github.com/VictoriaMetrics/metrics v1.18.0
## explicit ## explicit
github.com/VictoriaMetrics/metrics github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.23.0 # github.com/VictoriaMetrics/metricsql v0.24.0
## explicit ## explicit
github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop github.com/VictoriaMetrics/metricsql/binaryop