From 9a3d0c43b5bad3bdd27fed752d20a799b8adb700 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 17 Sep 2021 23:33:15 +0300 Subject: [PATCH] app/vmselect/promql: add `quantiles_over_time("phiLabel", phi1, ..., phiN, m[d])` function for calculating multiple quantiles at once --- app/vmselect/promql/eval.go | 68 +++++++------- app/vmselect/promql/exec_test.go | 61 +++++++++++++ app/vmselect/promql/rollup.go | 91 +++++++++++++++---- app/vmselect/promql/rollup_test.go | 2 + docs/CHANGELOG.md | 3 +- docs/MetricsQL.md | 16 ++-- go.mod | 2 +- go.sum | 4 +- .../VictoriaMetrics/metricsql/rollup.go | 1 + vendor/modules.txt | 2 +- 10 files changed, 189 insertions(+), 61 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 0445b7d73..f4ac9dc78 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "math" + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" @@ -390,7 +391,7 @@ func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.F if nrf == nil { return nil, nil } - rollupArgIdx := getRollupArgIdx(fe.Name) + rollupArgIdx := getRollupArgIdx(fe) if rollupArgIdx >= len(fe.Args) { // Incorrect number of args for rollup func. return nil, nil @@ -430,7 +431,7 @@ func evalExprs(ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) { func evalRollupFuncArgs(ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) { var re *metricsql.RollupExpr - rollupArgIdx := getRollupArgIdx(fe.Name) + rollupArgIdx := getRollupArgIdx(fe) if len(fe.Args) <= rollupArgIdx { return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil)) } @@ -478,7 +479,8 @@ func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr { return &reNew } -func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { +func evalRollupFunc(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) { + funcName = strings.ToLower(funcName) ecNew := ec var offset int64 if re.Offset != nil { @@ -491,7 +493,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E // so cache hit rate should be quite good. // See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976 } - if name == "rollup_candlestick" { + if funcName == "rollup_candlestick" { // Automatically apply `offset -step` to `rollup_candlestick` function // in order to obtain expected OHLC results. 
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462 @@ -504,12 +506,12 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E var rvs []*timeseries var err error if me, ok := re.Expr.(*metricsql.MetricExpr); ok { - rvs, err = evalRollupFuncWithMetricExpr(ecNew, name, rf, expr, me, iafc, re.Window) + rvs, err = evalRollupFuncWithMetricExpr(ecNew, funcName, rf, expr, me, iafc, re.Window) } else { if iafc != nil { - logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", name, re.AppendString(nil)) + logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil)) } - rvs, err = evalRollupFuncWithSubquery(ecNew, name, rf, expr, re) + rvs, err = evalRollupFuncWithSubquery(ecNew, funcName, rf, expr, re) } if err != nil { return nil, err @@ -528,7 +530,7 @@ func evalRollupFunc(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.E return rvs, nil } -func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) { +func evalRollupFuncWithSubquery(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) { // TODO: determine whether to use rollupResultCacheV here. step := re.Step.Duration(ec.Step) if step == 0 { @@ -550,25 +552,24 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr return nil, err } if len(tssSQ) == 0 { - if name == "absent_over_time" { + if funcName == "absent_over_time" { tss := evalNumber(ec, 1) return tss, nil } return nil, nil } sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step) - preFunc, rcs, err := getRollupConfigs(name, rf, expr, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) + preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) if err != nil { return nil, err } tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) var tssLock sync.Mutex - removeMetricGroup := !rollupFuncsKeepMetricGroup[name] doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) { values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps) preFunc(values, timestamps) for _, rc := range rcs { - if tsm := newTimeseriesMap(name, sharedTimestamps, &tsSQ.MetricName); tsm != nil { + if tsm := newTimeseriesMap(funcName, sharedTimestamps, &tsSQ.MetricName); tsm != nil { rc.DoTimeseriesMap(tsm, values, timestamps) tssLock.Lock() tss = tsm.AppendTimeseriesTo(tss) @@ -576,7 +577,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, expr continue } var ts timeseries - doRollupForTimeseries(rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps, removeMetricGroup) + doRollupForTimeseries(funcName, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps) tssLock.Lock() tss = append(tss, &ts) tssLock.Unlock() @@ -642,7 +643,7 @@ var ( rollupResultCacheMiss = metrics.NewCounter(`vm_rollup_result_cache_miss_total`) ) -func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, +func evalRollupFuncWithMetricExpr(ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, me *metricsql.MetricExpr, iafc *incrementalAggrFuncContext, windowExpr *metricsql.DurationExpr) ([]*timeseries, error) { if me.IsEmpty() { return evalNumber(ec, nan), nil @@ -665,7 +666,7 @@ func 
evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, // Obtain rollup configs before fetching data from db, // so type errors can be caught earlier. sharedTimestamps := getTimestamps(start, ec.End, ec.Step) - preFunc, rcs, err := getRollupConfigs(name, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) + preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) if err != nil { return nil, err } @@ -689,7 +690,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, if rssLen == 0 { rss.Cancel() var tss []*timeseries - if name == "absent_over_time" { + if funcName == "absent_over_time" { tss = getAbsentTimeseries(ec, me) } // Add missing points until ec.End. @@ -735,12 +736,11 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, defer rml.Put(uint64(rollupMemorySize)) // Evaluate rollup - removeMetricGroup := !rollupFuncsKeepMetricGroup[name] var tss []*timeseries if iafc != nil { - tss, err = evalRollupWithIncrementalAggregate(name, iafc, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) + tss, err = evalRollupWithIncrementalAggregate(funcName, iafc, rss, rcs, preFunc, sharedTimestamps) } else { - tss, err = evalRollupNoIncrementalAggregate(name, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) + tss, err = evalRollupNoIncrementalAggregate(funcName, rss, rcs, preFunc, sharedTimestamps) } if err != nil { return nil, err @@ -762,15 +762,15 @@ func getRollupMemoryLimiter() *memoryLimiter { return &rollupMemoryLimiter } -func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, - preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { +func evalRollupWithIncrementalAggregate(funcName string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, + preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { - rs.Values, rs.Timestamps = dropStaleNaNs(name, rs.Values, rs.Timestamps) + rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) ts := getTimeseries() defer putTimeseries(ts) for _, rc := range rcs { - if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil { + if tsm := newTimeseriesMap(funcName, sharedTimestamps, &rs.MetricName); tsm != nil { rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) for _, ts := range tsm.m { iafc.updateTimeseries(ts, workerID) @@ -778,7 +778,7 @@ func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncCo continue } ts.Reset() - doRollupForTimeseries(rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup) + doRollupForTimeseries(funcName, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) iafc.updateTimeseries(ts, workerID) // ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used. 
@@ -794,15 +794,15 @@ func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncCo return tss, nil } -func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs []*rollupConfig, - preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { +func evalRollupNoIncrementalAggregate(funcName string, rss *netstorage.Results, rcs []*rollupConfig, + preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { tss := make([]*timeseries, 0, rss.Len()*len(rcs)) var tssLock sync.Mutex err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { - rs.Values, rs.Timestamps = dropStaleNaNs(name, rs.Values, rs.Timestamps) + rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) for _, rc := range rcs { - if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil { + if tsm := newTimeseriesMap(funcName, sharedTimestamps, &rs.MetricName); tsm != nil { rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) tssLock.Lock() tss = tsm.AppendTimeseriesTo(tss) @@ -810,7 +810,7 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs continue } var ts timeseries - doRollupForTimeseries(rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup) + doRollupForTimeseries(funcName, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) tssLock.Lock() tss = append(tss, &ts) tssLock.Unlock() @@ -823,13 +823,13 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs return tss, nil } -func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64, - sharedTimestamps []int64, removeMetricGroup bool) { +func doRollupForTimeseries(funcName string, rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64, + sharedTimestamps []int64) { tsDst.MetricName.CopyFrom(mnSrc) if len(rc.TagValue) > 0 { tsDst.MetricName.AddTag("rollup", rc.TagValue) } - if removeMetricGroup { + if !rollupFuncsKeepMetricGroup[funcName] { tsDst.MetricName.ResetMetricGroup() } tsDst.Values = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc) @@ -896,8 +896,8 @@ func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) { dst.IsNegative = src.IsNegative } -func dropStaleNaNs(name string, values []float64, timestamps []int64) ([]float64, []int64) { - if *noStaleMarkers || name == "default_rollup" { +func dropStaleNaNs(funcName string, values []float64, timestamps []int64) ([]float64, []int64) { + if *noStaleMarkers || funcName == "default_rollup" { // Do not drop Prometheus staleness marks (aka stale NaNs) for default_rollup() function, // since it uses them for Prometheus-style staleness detection. 
return values, timestamps diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 8c7081635..f0c6fe446 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -4615,6 +4615,67 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`quantile_over_time`, func(t *testing.T) { + t.Parallel() + q := `quantile_over_time(0.9, label_set(round(rand(0), 0.01), "__name__", "foo", "xx", "yy")[200s:5s])` + r := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.89, 0.89, 0.95, 0.87, 0.92, 0.89}, + Timestamps: timestampsExpected, + } + r.MetricName.MetricGroup = []byte("foo") + r.MetricName.Tags = []storage.Tag{ + { + Key: []byte("xx"), + Value: []byte("yy"), + }, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`quantiles_over_time`, func(t *testing.T) { + t.Parallel() + q := `sort_by_label( + quantiles_over_time("phi", 0.5, 0.9, + label_set(round(rand(0), 0.01), "__name__", "foo", "xx", "yy")[200s:5s] + ), + "phi", + )` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.47, 0.57, 0.49, 0.54, 0.56, 0.53}, + Timestamps: timestampsExpected, + } + r1.MetricName.MetricGroup = []byte("foo") + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("phi"), + Value: []byte("0.5"), + }, + { + Key: []byte("xx"), + Value: []byte("yy"), + }, + } + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{0.89, 0.89, 0.95, 0.87, 0.92, 0.89}, + Timestamps: timestampsExpected, + } + r2.MetricName.MetricGroup = []byte("foo") + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("phi"), + Value: []byte("0.9"), + }, + { + Key: []byte("xx"), + Value: []byte("yy"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) t.Run(`histogram_over_time`, func(t *testing.T) { t.Parallel() q := `sort_by_label(histogram_over_time(alias(label_set(rand(0)*1.3+1.1, "foo", "bar"), "xxx")[200s:5s]), "vmrange")` diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index aaeb95bc9..1be901f65 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -83,6 +83,7 @@ var rollupFuncs = map[string]newRollupFunc{ "ascent_over_time": newRollupFuncOneArg(rollupAscentOverTime), "descent_over_time": newRollupFuncOneArg(rollupDescentOverTime), "zscore_over_time": newRollupFuncOneArg(rollupZScoreOverTime), + "quantiles_over_time": newRollupQuantiles, // `timestamp` function must return timestamp for the last datapoint on the current window // in order to properly handle offset and timestamps unaligned to the current step. 
@@ -156,6 +157,7 @@ var rollupFuncsCannotAdjustWindow = map[string]bool{ "sum_over_time": true, "count_over_time": true, "quantile_over_time": true, + "quantiles_over_time": true, "stddev_over_time": true, "stdvar_over_time": true, "absent_over_time": true, @@ -191,6 +193,7 @@ var rollupFuncsKeepMetricGroup = map[string]bool{ "min_over_time": true, "max_over_time": true, "quantile_over_time": true, + "quantiles_over_time": true, "rollup": true, "geomean_over_time": true, "hoeffding_bound_lower": true, @@ -250,15 +253,17 @@ func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) { return aggrFuncNames, nil } -func getRollupArgIdx(funcName string) int { - funcName = strings.ToLower(funcName) +func getRollupArgIdx(fe *metricsql.FuncExpr) int { + funcName := strings.ToLower(fe.Name) if rollupFuncs[funcName] == nil { - logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName) + logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", fe.Name) } switch funcName { case "quantile_over_time", "aggr_over_time", "hoeffding_bound_lower", "hoeffding_bound_upper": return 1 + case "quantiles_over_time": + return len(fe.Args) - 1 default: return 0 } @@ -423,14 +428,16 @@ var ( const maxSilenceInterval = 5 * 60 * 1000 type timeseriesMap struct { - origin *timeseries - labelName string - h metrics.Histogram - m map[string]*timeseries + origin *timeseries + h metrics.Histogram + m map[string]*timeseries } func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage.MetricName) *timeseriesMap { - if strings.ToLower(funcName) != "histogram_over_time" { + funcName = strings.ToLower(funcName) + switch funcName { + case "histogram_over_time", "quantiles_over_time": + default: return nil } @@ -440,13 +447,14 @@ func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage. 
} var origin timeseries origin.MetricName.CopyFrom(mnSrc) - origin.MetricName.ResetMetricGroup() + if !rollupFuncsKeepMetricGroup[funcName] { + origin.MetricName.ResetMetricGroup() + } origin.Timestamps = sharedTimestamps origin.Values = values return ×eriesMap{ - origin: &origin, - labelName: "vmrange", - m: make(map[string]*timeseries), + origin: &origin, + m: make(map[string]*timeseries), } } @@ -457,15 +465,15 @@ func (tsm *timeseriesMap) AppendTimeseriesTo(dst []*timeseries) []*timeseries { return dst } -func (tsm *timeseriesMap) GetOrCreateTimeseries(labelValue string) *timeseries { +func (tsm *timeseriesMap) GetOrCreateTimeseries(labelName, labelValue string) *timeseries { ts := tsm.m[labelValue] if ts != nil { return ts } ts = ×eries{} ts.CopyFromShallowTimestamps(tsm.origin) - ts.MetricName.RemoveTag(tsm.labelName) - ts.MetricName.AddTag(tsm.labelName, labelValue) + ts.MetricName.RemoveTag(labelName) + ts.MetricName.AddTag(labelName, labelValue) tsm.m[labelValue] = ts return ts } @@ -1024,6 +1032,57 @@ func rollupHoeffdingBoundInternal(rfa *rollupFuncArg, phis []float64) (float64, return bound, vAvg } +func newRollupQuantiles(args []interface{}) (rollupFunc, error) { + if len(args) < 3 { + return nil, fmt.Errorf("unexpected number of args: %d; want at least 3 args", len(args)) + } + tssPhi, ok := args[0].([]*timeseries) + if !ok { + return nil, fmt.Errorf("unexpected type for phi arg: %T; want string", args[0]) + } + phiLabel, err := getString(tssPhi, 0) + if err != nil { + return nil, err + } + phiArgs := args[1 : len(args)-1] + phis := make([]float64, len(phiArgs)) + phiStrs := make([]string, len(phiArgs)) + for i, phiArg := range phiArgs { + phiValues, err := getScalar(phiArg, i+1) + if err != nil { + return nil, fmt.Errorf("cannot obtain phi from arg #%d: %w", i+1, err) + } + phis[i] = phiValues[0] + phiStrs[i] = fmt.Sprintf("%g", phiValues[0]) + } + rf := func(rfa *rollupFuncArg) float64 { + // There is no need in handling NaNs here, since they must be cleaned up + // before calling rollup funcs. + values := rfa.values + if len(values) == 0 { + return rfa.prevValue + } + if len(values) == 1 { + // Fast path - only a single value. 
+ return values[0] + } + hf := histogram.GetFast() + for _, v := range values { + hf.Update(v) + } + qs := hf.Quantiles(nil, phis) + histogram.PutFast(hf) + idx := rfa.idx + tsm := rfa.tsm + for i, phiStr := range phiStrs { + ts := tsm.GetOrCreateTimeseries(phiLabel, phiStr) + ts.Values[idx] = qs[i] + } + return nan + } + return rf, nil +} + func newRollupQuantile(args []interface{}) (rollupFunc, error) { if err := expectRollupArgsNum(args, 2); err != nil { return nil, err @@ -1064,7 +1123,7 @@ func rollupHistogram(rfa *rollupFuncArg) float64 { } idx := rfa.idx tsm.h.VisitNonZeroBuckets(func(vmrange string, count uint64) { - ts := tsm.GetOrCreateTimeseries(vmrange) + ts := tsm.GetOrCreateTimeseries("vmrange", vmrange) ts.Values[idx] = float64(count) }) return nan diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index 4b5eb540f..a74491ab5 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -508,6 +508,7 @@ func TestRollupNewRollupFuncError(t *testing.T) { f("holt_winters", nil) f("predict_linear", nil) f("quantile_over_time", nil) + f("quantiles_over_time", nil) // Invalid arg type scalarTs := []*timeseries{{ @@ -521,6 +522,7 @@ func TestRollupNewRollupFuncError(t *testing.T) { f("predict_linear", []interface{}{123, 123}) f("predict_linear", []interface{}{me, 123}) f("quantile_over_time", []interface{}{123, 123}) + f("quantiles_over_time", []interface{}{123, 123}) } func TestRollupNoWindowNoPoints(t *testing.T) { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 207e77dad..827e4f3ce 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -27,6 +27,7 @@ sort: 15 * FEATURE: add [mad(q)](https://docs.victoriametrics.com/MetricsQL.html#mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) for groups of points with identical timestamps across multiple time series. * FEATURE: add [outliers_mad(tolerance, q)](https://docs.victoriametrics.com/MetricsQL.html#outliers_mad) function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It returns time series with peaks outside the [Median absolute deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) multiplied by `tolerance`. * FEATURE: add `histogram_quantiles("phiLabel", phi1, ..., phiN, buckets)` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates the given `phi*`-quantiles over the given `buckets` and returns time series per each quantile with the corresponding `{phiLabel="phi*"}` label. +* FEATURE: add `quantiles_over_time("phiLabel", phi1, ..., phiN, series_selector[d])` function to [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html). It calculates the given `phi*`-quantiles over raw samples selected by `series_selector` on the given lookbehind window `d`. It returns time series per each quantile with the corresponding `{phiLabel="phi*"}` label. * FEATURE: [enterprise](https://victoriametrics.com/enterprise.html): do not ask for `-eula` flag if `-version` flag is passed to enteprise app. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1621). * BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. 
Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601). @@ -36,7 +37,7 @@ sort: 15 * BUGFIX: vmagent: properly use `https` scheme for wildcard TLS certificates for `role: ingress` targets in Kubernetes service discovery. See [this issue](https://github.com/prometheus/prometheus/issues/8902). * BUGFIX: vmagent: support host networking mode for `docker_sd_config`. See [this issue](https://github.com/prometheus/prometheus/issues/9116). * BUGFIX: fix non-repeatable results from `quantile_over_time()` function when the number of input samples exceeds 1000. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1612). -* BUGFIX: fix EC2 zone discovery when `filters` are specified in [ec2_sc_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1626). +* BUGFIX: vmagent: fix EC2 zone discovery when `filters` are specified in [ec2_sc_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1626). ## [v1.65.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.65.0) diff --git a/docs/MetricsQL.md b/docs/MetricsQL.md index affe22655..fac354a79 100644 --- a/docs/MetricsQL.md +++ b/docs/MetricsQL.md @@ -228,7 +228,11 @@ See also [implicit query conversions](#implicit-query-conversions). #### quantile_over_time -`quantile_over_time(phi, series_selector[d])` calculates `phi`-quantile over raw samples on the given lookbehind window `d` per each time series returned from the given [series_selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors). The `phi` value must be in the range `[0...1]`. This function is supported by PromQL. +`quantile_over_time(phi, series_selector[d])` calculates `phi`-quantile over raw samples on the given lookbehind window `d` per each time series returned from the given [series_selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors). The `phi` value must be in the range `[0...1]`. This function is supported by PromQL. See also [quantiles_over_time](#quantiles_over_time). + +#### quantiles_over_time + +`quantiles_over_time("phiLabel", phi1, ..., phiN, series_selector[d])` calculates `phi*`-quantiles over raw samples on the given lookbehind window `d` per each time series returned from the given [series_selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors). The function returns individual series per each `phi*` with `{phiLabel="phi*"}` label. `phi*` values must be in the range `[0...1]`. See also [quantile_over_time](#quantile_over_time). #### range_over_time @@ -413,11 +417,11 @@ See also [implicit query conversions](#implicit-query-conversions). #### histogram_quantile -`histogram_quantile(phi, buckets)` calculates `phi`-quantile over the given [histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). For example, `histogram_quantile(0.5, sum(rate(http_request_duration_seconds_bucket[5m]) by (le))` would return median request duration for all the requests during the last 5 minutes. It accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. 
See [this issue for details](https://github.com/prometheus/prometheus/issues/5706). This function is supported by PromQL (except of the `boundLabel` arg). See also [histogram_quantiles](#histogram_quantiles) and [histogram_share](#histogram_share). +`histogram_quantile(phi, buckets)` calculates `phi`-quantile over the given [histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). `phi` must be in the range `[0...1]`. For example, `histogram_quantile(0.5, sum(rate(http_request_duration_seconds_bucket[5m])) by (le))` would return median request duration for all the requests during the last 5 minutes. It accepts optional third arg - `boundsLabel`. In this case it returns `lower` and `upper` bounds for the estimated percentile. See [this issue for details](https://github.com/prometheus/prometheus/issues/5706). This function is supported by PromQL (except for the `boundsLabel` arg). See also [histogram_quantiles](#histogram_quantiles) and [histogram_share](#histogram_share). #### histogram_quantiles -`histogram_quantiles("phiLabel", phi1, ..., phiN, buckets)` calculates the given `phi*`-quantiles over the given [histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). Each calculated quantile is returned in a separate time series with the corresponding `{phiLabel="phi*"}` label. See also [histogram_quantile](#histogram_quantile). +`histogram_quantiles("phiLabel", phi1, ..., phiN, buckets)` calculates the given `phi*`-quantiles over the given [histogram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350). `phi*` must be in the range `[0...1]`. Each calculated quantile is returned in a separate time series with the corresponding `{phiLabel="phi*"}` label. See also [histogram_quantile](#histogram_quantile). #### histogram_share @@ -513,7 +517,7 @@ See also [implicit query conversions](#implicit-query-conversions). #### range_quantile -`range_quantile(phi, q)` returns `phi`-quantile across points per each time series returned by `q`. +`range_quantile(phi, q)` returns `phi`-quantile across points per each time series returned by `q`. `phi` must be in the range `[0...1]`. #### range_sum @@ -782,11 +786,11 @@ See also [implicit query conversions](#implicit-query-conversions). #### quantile -`quantile(phi, q) by (group_labels)` calculates `phi`-quantile per each `group_labels` for all the time series returned by `q`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL. See also [quantiles](#quantiles). +`quantile(phi, q) by (group_labels)` calculates `phi`-quantile per each `group_labels` for all the time series returned by `q`. `phi` must be in the range `[0...1]`. The aggregate is calculated individually per each group of points with the same timestamp. This function is supported by PromQL. See also [quantiles](#quantiles). #### quantiles -`quantiles("phiLabel", phi1, ..., phiN, q)` calculates `phi*`-quantiles for all the time series returned by `q` and return them in time series with `{phiLabel="phi*"}` label. The aggregate is calculated individually per each group of points with the same timestamp. See also [quantile](#quantile). +`quantiles("phiLabel", phi1, ..., phiN, q)` calculates `phi*`-quantiles for all the time series returned by `q` and returns them in time series with `{phiLabel="phi*"}` label. `phi*` must be in the range `[0...1]`. 
The aggregate is calculated individually per each group of points with the same timestamp. See also [quantile](#quantile). #### stddev diff --git a/go.mod b/go.mod index 7b9779d74..58058a4f6 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( // like https://github.com/valyala/fasthttp/commit/996610f021ff45fdc98c2ce7884d5fa4e7f9199b github.com/VictoriaMetrics/fasthttp v1.1.0 github.com/VictoriaMetrics/metrics v1.18.0 - github.com/VictoriaMetrics/metricsql v0.23.0 + github.com/VictoriaMetrics/metricsql v0.24.0 github.com/VividCortex/ewma v1.2.0 // indirect github.com/aws/aws-sdk-go v1.40.43 github.com/cespare/xxhash/v2 v2.1.2 diff --git a/go.sum b/go.sum index 5af185252..d53fcbbb1 100644 --- a/go.sum +++ b/go.sum @@ -109,8 +109,8 @@ github.com/VictoriaMetrics/fasthttp v1.1.0/go.mod h1:/7DMcogqd+aaD3G3Hg5kFgoFwlR github.com/VictoriaMetrics/metrics v1.12.2/go.mod h1:Z1tSfPfngDn12bTfZSCqArT3OPY3u88J12hSoOhuiRE= github.com/VictoriaMetrics/metrics v1.18.0 h1:vov5NxDHRSXFbdiH4dYLYEjKLoAXXSQ7hcnG8TSD9JQ= github.com/VictoriaMetrics/metrics v1.18.0/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= -github.com/VictoriaMetrics/metricsql v0.23.0 h1:NWqoCrL2kz864OlaDBEU7c2fA7AjKfFq/nXM6di4xz8= -github.com/VictoriaMetrics/metricsql v0.23.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= +github.com/VictoriaMetrics/metricsql v0.24.0 h1:1SOiuEaSgfS2CiQyCAlYQs3WPHzXNMPUSXtE1Zx6qDw= +github.com/VictoriaMetrics/metricsql v0.24.0/go.mod h1:ylO7YITho/Iw6P71oEaGyHbO94bGoGtzWfLGqFhMIg8= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= diff --git a/vendor/github.com/VictoriaMetrics/metricsql/rollup.go b/vendor/github.com/VictoriaMetrics/metricsql/rollup.go index a7d17216e..68b1c8929 100644 --- a/vendor/github.com/VictoriaMetrics/metricsql/rollup.go +++ b/vendor/github.com/VictoriaMetrics/metricsql/rollup.go @@ -68,6 +68,7 @@ var rollupFuncs = map[string]bool{ "ascent_over_time": true, "descent_over_time": true, "zscore_over_time": true, + "quantiles_over_time": true, // `timestamp` func has been moved here because it must work properly with offsets and samples unaligned to the current step. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details. diff --git a/vendor/modules.txt b/vendor/modules.txt index 659da9db3..3e5e92daa 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -22,7 +22,7 @@ github.com/VictoriaMetrics/fasthttp/stackless # github.com/VictoriaMetrics/metrics v1.18.0 ## explicit github.com/VictoriaMetrics/metrics -# github.com/VictoriaMetrics/metricsql v0.23.0 +# github.com/VictoriaMetrics/metricsql v0.24.0 ## explicit github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql/binaryop
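
For readers who want to see the new rollup logic in isolation: below is a minimal, self-contained sketch of the per-window computation that `newRollupQuantiles` performs, using the same `histogram.GetFast()` / `Quantiles()` helpers from `github.com/valyala/histogram` that the patched rollup.go calls. The sample window and phi values are made up for illustration; only the histogram API usage is taken from the patch.

```go
package main

import (
	"fmt"

	"github.com/valyala/histogram"
)

// quantilesOverWindow mirrors the per-window logic of newRollupQuantiles from the
// patch: feed every raw sample of one lookbehind window into a fast histogram and
// pull out all requested phi-quantiles in a single pass.
func quantilesOverWindow(values, phis []float64) []float64 {
	hf := histogram.GetFast()
	defer histogram.PutFast(hf)
	for _, v := range values {
		hf.Update(v)
	}
	// Quantiles appends one result per phi to the provided slice (nil here).
	return hf.Quantiles(nil, phis)
}

func main() {
	// Illustrative raw samples for a single window and the phi values that would
	// come from the quantiles_over_time() arguments; both are made up for this demo.
	window := []float64{0.12, 0.32, 0.47, 0.51, 0.77, 0.89, 0.95}
	phis := []float64{0.5, 0.9}
	qs := quantilesOverWindow(window, phis)
	for i, phi := range phis {
		// The real rollup writes each value into a separate output series tagged
		// {phiLabel="<phi>"} via tsm.GetOrCreateTimeseries(phiLabel, phiStr).
		fmt.Printf("phi=%g -> %g\n", phi, qs[i])
	}
}
```

As the MetricsQL.md entry and the exec_test.go case above indicate, with this patch applied a query such as `quantiles_over_time("phi", 0.5, 0.9, some_metric[5m])` (metric name illustrative) returns two series per input series, labeled `{phi="0.5"}` and `{phi="0.9"}`, instead of requiring two separate `quantile_over_time` calls over the same window.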