package promql

import (
	"fmt"
	"math"
	"sort"
	"strconv"
	"strings"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/metrics"
)

// aggrFuncs maps lowercase aggregate function names to their implementations.
//
// Lookups must go through getAggrFunc, which lowercases the name first.
var aggrFuncs = map[string]aggrFunc{
	// See https://prometheus.io/docs/prometheus/latest/querying/operators/#aggregation-operators
	"sum":          newAggrFunc(aggrFuncSum),
	"min":          newAggrFunc(aggrFuncMin),
	"max":          newAggrFunc(aggrFuncMax),
	"avg":          newAggrFunc(aggrFuncAvg),
	"stddev":       newAggrFunc(aggrFuncStddev),
	"stdvar":       newAggrFunc(aggrFuncStdvar),
	"count":        newAggrFunc(aggrFuncCount),
	"count_values": aggrFuncCountValues,
	"bottomk":      newAggrFuncTopK(true),
	"topk":         newAggrFuncTopK(false),
	"quantile":     aggrFuncQuantile,

	// Extended PromQL funcs
	"median":    aggrFuncMedian,
	"limitk":    aggrFuncLimitK,
	"distinct":  newAggrFunc(aggrFuncDistinct),
	"sum2":      newAggrFunc(aggrFuncSum2),
	"geomean":   newAggrFunc(aggrFuncGeomean),
	"histogram": newAggrFunc(aggrFuncHistogram),
}

// aggrFunc is the signature of an aggregate function implementation.
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)

// aggrFuncArg bundles everything an aggrFunc needs for a single call.
type aggrFuncArg struct {
	// args holds the already evaluated arguments, one slice of series per argument.
	args [][]*timeseries

	// ae is the aggregate expression being evaluated; its Modifier is used for grouping.
	ae *aggrFuncExpr

	// ec is the evaluation config; in this file it is passed to evalNumber by aggrFuncMedian.
	ec *EvalConfig
}

// getAggrFunc returns the aggregate function registered under s (case-insensitive),
// or nil if there is no such function.
func getAggrFunc(s string) aggrFunc {
	s = strings.ToLower(s)
	return aggrFuncs[s]
}

// isAggrFunc reports whether s is the name of a registered aggregate function.
func isAggrFunc(s string) bool {
	return getAggrFunc(s) != nil
}

// isAggrFuncModifier reports whether s is a grouping modifier keyword
// (`by` or `without`, case-insensitive).
func isAggrFuncModifier(s string) bool {
	s = strings.ToLower(s)
	switch s {
	case "by", "without":
		return true
	default:
		return false
	}
}

// newAggrFunc wraps a per-group aggregation callback afe into an aggrFunc,
// which accepts exactly one argument and applies afe to every group
// produced by the expression modifier.
func newAggrFunc(afe func(tss []*timeseries) []*timeseries) aggrFunc {
	return func(afa *aggrFuncArg) ([]*timeseries, error) {
		args := afa.args
		if err := expectTransformArgsNum(args, 1); err != nil {
			return nil, err
		}
		return aggrFuncExt(afe, args[0], &afa.ae.Modifier, false)
	}
}

// removeGroupTags adjusts the tags of metricName according to the grouping modifier.
//
// An empty op and `by` delegate to metricName.RemoveTagsOn(modifier.Args),
// while `without` delegates to metricName.RemoveTagsIgnoring(modifier.Args).
// Any other op is reported via logger.Panicf as a bug.
func removeGroupTags(metricName *storage.MetricName, modifier *modifierExpr) {
	groupOp := strings.ToLower(modifier.Op)
	switch groupOp {
	case "", "by":
		metricName.RemoveTagsOn(modifier.Args)
	case "without":
		metricName.RemoveTagsIgnoring(modifier.Args)
	default:
		logger.Panicf("BUG: unknown group modifier: %q", groupOp)
	}
}

// aggrFuncExt groups argOrig according to modifier and calls afe for every group.
//
// Grouping is performed on copies of the series obtained from copyTimeseriesMetricNames,
// so metric names of argOrig stay intact. If keepOriginal is set, afe receives
// the corresponding series from argOrig instead of the copies with adjusted metric names.
//
// The results of all the afe calls are concatenated in unspecified order,
// since groups are visited in map iteration order.
// An error is returned if the number of produced series exceeds 2000 and is more than
// 16 times bigger than the number of source series processed so far.
func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *modifierExpr, keepOriginal bool) ([]*timeseries, error) {
	arg := copyTimeseriesMetricNames(argOrig)

	// Perform grouping.
	m := make(map[string][]*timeseries)
	bb := bbPool.Get()
	for i, ts := range arg {
		removeGroupTags(&ts.MetricName, modifier)
		bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
		if keepOriginal {
			ts = argOrig[i]
		}
		m[string(bb.B)] = append(m[string(bb.B)], ts)
	}
	bbPool.Put(bb)

	srcTssCount := 0
	dstTssCount := 0
	rvs := make([]*timeseries, 0, len(m))
	for _, tss := range m {
		rv := afe(tss)
		rvs = append(rvs, rv...)
		srcTssCount += len(tss)
		dstTssCount += len(rv)
		if dstTssCount > 2000 && dstTssCount > 16*srcTssCount {
			// This looks like count_values explosion.
			return nil, fmt.Errorf(`too many timeseries after aggragation; got %d; want less than %d`, dstTssCount, 16*srcTssCount)
		}
	}
	return rvs, nil
}

// aggrFuncSum calculates per-point sum over tss, skipping NaN values.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
// A point is NaN if all the source values at this point are NaN.
func aggrFuncSum(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - nothing to sum.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		sum := float64(0)
		count := 0
		for _, ts := range tss {
			if math.IsNaN(ts.Values[i]) {
				continue
			}
			sum += ts.Values[i]
			count++
		}
		if count == 0 {
			sum = nan
		}
		dst.Values[i] = sum
	}
	return tss[:1]
}

// aggrFuncSum2 calculates per-point sum of squares over tss, skipping NaN values.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
// There is no single-series fast path, since values must be squared anyway.
func aggrFuncSum2(tss []*timeseries) []*timeseries {
	dst := tss[0]
	for i := range dst.Values {
		sum2 := float64(0)
		count := 0
		for _, ts := range tss {
			v := ts.Values[i]
			if math.IsNaN(v) {
				continue
			}
			sum2 += v * v
			count++
		}
		if count == 0 {
			sum2 = nan
		}
		dst.Values[i] = sum2
	}
	return tss[:1]
}

// aggrFuncGeomean calculates per-point geometric mean over tss, skipping NaN values.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
func aggrFuncGeomean(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - nothing to geomean.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		p := 1.0
		count := 0
		for _, ts := range tss {
			v := ts.Values[i]
			if math.IsNaN(v) {
				continue
			}
			p *= v
			count++
		}
		if count == 0 {
			// math.Pow(NaN, +Inf) below yields NaN, so the point stays NaN.
			p = nan
		}
		dst.Values[i] = math.Pow(p, 1/float64(count))
	}
	return tss[:1]
}

// aggrFuncHistogram builds a metrics.Histogram from the values of tss at every point
// and returns a series per each non-zero `vmrange` bucket, passed through vmrangeBucketsToLE.
//
// Every returned series is created from tss[0] via CopyFromShallowTimestamps
// with the `vmrange` tag set to the bucket range. Points without hits in the bucket are zero.
func aggrFuncHistogram(tss []*timeseries) []*timeseries {
	m := make(map[string]*timeseries)
	for i := range tss[0].Values {
		var h metrics.Histogram
		for _, ts := range tss {
			v := ts.Values[i]
			h.Update(v)
		}
		h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
			ts := m[vmrange]
			if ts == nil {
				ts = &timeseries{}
				ts.CopyFromShallowTimestamps(tss[0])
				ts.MetricName.RemoveTag("vmrange")
				ts.MetricName.AddTag("vmrange", vmrange)
				// Start from zero hits at every point; hit counts are filled below.
				values := ts.Values
				for k := range values {
					values[k] = 0
				}
				m[vmrange] = ts
			}
			ts.Values[i] = float64(count)
		})
	}
	rvs := make([]*timeseries, 0, len(m))
	for _, ts := range m {
		rvs = append(rvs, ts)
	}
	return vmrangeBucketsToLE(rvs)
}

// aggrFuncMin calculates per-point minimum over tss, ignoring NaN values.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
func aggrFuncMin(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - nothing to min.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		min := dst.Values[i]
		for _, ts := range tss {
			// The comparison is false for NaN values, so they never replace a real minimum.
			if math.IsNaN(min) || ts.Values[i] < min {
				min = ts.Values[i]
			}
		}
		dst.Values[i] = min
	}
	return tss[:1]
}

// aggrFuncMax calculates per-point maximum over tss, ignoring NaN values.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
func aggrFuncMax(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - nothing to max.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		max := dst.Values[i]
		for _, ts := range tss {
			// The comparison is false for NaN values, so they never replace a real maximum.
			if math.IsNaN(max) || ts.Values[i] > max {
				max = ts.Values[i]
			}
		}
		dst.Values[i] = max
	}
	return tss[:1]
}

// aggrFuncAvg calculates per-point arithmetic mean over tss, skipping NaN values.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
// A point is NaN if all the source values at this point are NaN.
func aggrFuncAvg(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - nothing to avg.
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		// Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation,
		// since it is slower and has no obvious benefits in increased precision.
		var sum float64
		count := 0
		for _, ts := range tss {
			v := ts.Values[i]
			if math.IsNaN(v) {
				continue
			}
			count++
			sum += v
		}
		avg := nan
		if count > 0 {
			avg = sum / float64(count)
		}
		dst.Values[i] = avg
	}
	return tss[:1]
}

// aggrFuncStddev calculates per-point standard deviation over tss
// as the square root of aggrFuncStdvar.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
func aggrFuncStddev(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - stddev over a single time series is zero
		values := tss[0].Values
		for i, v := range values {
			if !math.IsNaN(v) {
				values[i] = 0
			}
		}
		return tss
	}
	rvs := aggrFuncStdvar(tss)
	dst := rvs[0]
	for i, v := range dst.Values {
		dst.Values[i] = math.Sqrt(v)
	}
	return rvs
}

// aggrFuncStdvar calculates per-point variance over tss, skipping NaN values.
//
// The sum of squared deviations is divided by the number of non-NaN values at the point.
// The result is stored in place into tss[0], which is returned as a single-item slice.
func aggrFuncStdvar(tss []*timeseries) []*timeseries {
	if len(tss) == 1 {
		// Fast path - stdvar over a single time series is zero
		values := tss[0].Values
		for i, v := range values {
			if !math.IsNaN(v) {
				values[i] = 0
			}
		}
		return tss
	}
	dst := tss[0]
	for i := range dst.Values {
		// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
		var avg float64
		var count float64
		var q float64
		for _, ts := range tss {
			v := ts.Values[i]
			if math.IsNaN(v) {
				continue
			}
			count++
			avgNew := avg + (v-avg)/count
			q += (v - avg) * (v - avgNew)
			avg = avgNew
		}
		if count == 0 {
			q = nan
		}
		dst.Values[i] = q / count
	}
	return tss[:1]
}

// aggrFuncCount calculates per-point number of non-NaN values over tss.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
// A point is NaN instead of zero if all the source values at this point are NaN.
func aggrFuncCount(tss []*timeseries) []*timeseries {
	dst := tss[0]
	for i := range dst.Values {
		count := 0
		for _, ts := range tss {
			if math.IsNaN(ts.Values[i]) {
				continue
			}
			count++
		}
		v := float64(count)
		if count == 0 {
			v = nan
		}
		dst.Values[i] = v
	}
	return tss[:1]
}

// aggrFuncDistinct calculates per-point number of distinct non-NaN values over tss.
//
// The result is stored in place into tss[0], which is returned as a single-item slice.
// A point is NaN instead of zero if all the source values at this point are NaN.
func aggrFuncDistinct(tss []*timeseries) []*timeseries {
	dst := tss[0]
	m := make(map[float64]struct{}, len(tss))
	for i := range dst.Values {
		for _, ts := range tss {
			v := ts.Values[i]
			if math.IsNaN(v) {
				continue
			}
			m[v] = struct{}{}
		}
		n := float64(len(m))
		if n == 0 {
			n = nan
		}
		dst.Values[i] = n
		// Reuse the map for the next point.
		for k := range m {
			delete(m, k)
		}
	}
	return tss[:1]
}

// aggrFuncCountValues implements `count_values(dstLabel, q)`.
//
// It returns a series per each distinct non-NaN value seen in a group.
// The value is stored in the dstLabel tag, while every point contains the number of
// series in the group with this value at the point, or NaN if there are no such series.
func aggrFuncCountValues(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 2); err != nil {
		return nil, err
	}
	dstLabel, err := getString(args[0], 0)
	if err != nil {
		return nil, err
	}

	// Remove dstLabel from grouping like Prometheus does.
	//
	// Work on a private copy of the modifier with freshly allocated Args,
	// so the expression stored in afa.ae isn't modified by the evaluation.
	modifier := afa.ae.Modifier
	switch strings.ToLower(modifier.Op) {
	case "without":
		dstArgs := append(modifier.Args[:0:0], modifier.Args...)
		modifier.Args = append(dstArgs, dstLabel)
	case "by":
		dstArgs := modifier.Args[:0:0]
		for _, arg := range modifier.Args {
			if arg == dstLabel {
				continue
			}
			dstArgs = append(dstArgs, arg)
		}
		modifier.Args = dstArgs
	default:
		// Do nothing
	}

	afe := func(tss []*timeseries) []*timeseries {
		// Collect distinct non-NaN values across all the points of the group.
		m := make(map[float64]bool)
		for _, ts := range tss {
			for _, v := range ts.Values {
				if math.IsNaN(v) {
					continue
				}
				m[v] = true
			}
		}
		values := make([]float64, 0, len(m))
		for v := range m {
			values = append(values, v)
		}
		// Sort values, so the order of the returned series doesn't depend on map iteration order.
		sort.Float64s(values)

		var rvs []*timeseries
		for _, v := range values {
			var dst timeseries
			dst.CopyFromShallowTimestamps(tss[0])
			dst.MetricName.RemoveTag(dstLabel)
			dst.MetricName.AddTag(dstLabel, strconv.FormatFloat(v, 'g', -1, 64))
			for i := range dst.Values {
				count := 0
				for _, ts := range tss {
					if ts.Values[i] == v {
						count++
					}
				}
				n := float64(count)
				if n == 0 {
					n = nan
				}
				dst.Values[i] = n
			}
			rvs = append(rvs, &dst)
		}
		return rvs
	}
	return aggrFuncExt(afe, args[1], &modifier, false)
}

// newAggrFuncTopK returns an aggrFunc implementing `topk(k, q)`,
// or `bottomk(k, q)` if isReverse is set.
//
// For every point and every group only k series with the biggest (topk) or the smallest
// (bottomk) non-NaN values retain their value, while the rest of values at the point
// are replaced by NaN. The k is taken from the first argument at the same point;
// NaN and negative k are treated as zero.
//
// Values are modified in place in the original series, which are then passed to removeNaNs.
func newAggrFuncTopK(isReverse bool) aggrFunc {
	return func(afa *aggrFuncArg) ([]*timeseries, error) {
		args := afa.args
		if err := expectTransformArgsNum(args, 2); err != nil {
			return nil, err
		}
		ks, err := getScalar(args[0], 0)
		if err != nil {
			return nil, err
		}
		afe := func(tss []*timeseries) []*timeseries {
			rvs := tss
			for n := range rvs[0].Values {
				// Sort series, so the ones to keep are located at the tail.
				// NaNs must go first in both directions; otherwise they would take
				// the place of real values among the kept series.
				sort.Slice(rvs, func(i, j int) bool {
					a := rvs[i].Values[n]
					b := rvs[j].Values[n]
					if !isReverse {
						return lessWithNaNs(a, b)
					}
					if math.IsNaN(a) {
						return !math.IsNaN(b)
					}
					if math.IsNaN(b) {
						return false
					}
					return a > b
				})

				// Clamp k to [0 .. len(rvs)] before converting it to int, since the result
				// of out-of-range float-to-int conversion is implementation-defined.
				k := 0
				if kf := ks[n]; kf >= float64(len(rvs)) {
					k = len(rvs)
				} else if kf > 0 {
					k = int(kf)
				}
				for _, ts := range rvs[:len(rvs)-k] {
					ts.Values[n] = nan
				}
			}
			return removeNaNs(rvs)
		}
		return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true)
	}
}

// aggrFuncLimitK implements `limitk(k, q)`.
//
// It returns up to max(k) series per group. For every point only the first k series
// of the group retain their value, where k is taken from the first argument at the same
// point; values of the remaining series at the point are replaced by NaN in place.
// NaN and negative k are treated as zero.
func aggrFuncLimitK(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 2); err != nil {
		return nil, err
	}
	ks, err := getScalar(args[0], 0)
	if err != nil {
		return nil, err
	}

	// Track the maximum k as float64 in order to avoid implementation-defined
	// conversion of NaN and Inf to int. NaN never passes the comparison below.
	maxK := float64(0)
	for _, kf := range ks {
		if kf > maxK {
			maxK = kf
		}
	}
	afe := func(tss []*timeseries) []*timeseries {
		if float64(len(tss)) > maxK {
			// maxK is in the range [0 .. len(tss)) here, so the conversion is safe.
			tss = tss[:int(maxK)]
		}
		for i, kf := range ks {
			k := 0
			if kf >= float64(len(tss)) {
				k = len(tss)
			} else if kf > 0 {
				k = int(kf)
			}
			for j := k; j < len(tss); j++ {
				tss[j].Values[i] = nan
			}
		}
		return tss
	}
	return aggrFuncExt(afe, args[1], &afa.ae.Modifier, true)
}

// aggrFuncQuantile implements `quantile(phi, q)`.
//
// See newAggrQuantileFunc for details.
func aggrFuncQuantile(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 2); err != nil {
		return nil, err
	}
	phis, err := getScalar(args[0], 0)
	if err != nil {
		return nil, err
	}
	afe := newAggrQuantileFunc(phis)
	return aggrFuncExt(afe, args[1], &afa.ae.Modifier, false)
}

// aggrFuncMedian implements `median(q)`, which is `quantile(0.5, q)`.
func aggrFuncMedian(afa *aggrFuncArg) ([]*timeseries, error) {
	args := afa.args
	if err := expectTransformArgsNum(args, 1); err != nil {
		return nil, err
	}
	phis := evalNumber(afa.ec, 0.5)[0].Values
	afe := newAggrQuantileFunc(phis)
	return aggrFuncExt(afe, args[0], &afa.ae.Modifier, false)
}

// newAggrQuantileFunc returns per-group callback, which calculates phis[n]-quantile
// over non-NaN values of the group at every point n.
//
// The phi is clamped to [0 .. 1]; NaN phi is treated as 1. The quantile is the value
// at the nearest rank among the sorted non-NaN values, without interpolation.
// A point is NaN if all the source values at this point are NaN.
//
// The result is stored in place into the first series of the group,
// which is returned as a single-item slice.
func newAggrQuantileFunc(phis []float64) func(tss []*timeseries) []*timeseries {
	return func(tss []*timeseries) []*timeseries {
		// Remember the destination series, since tss is re-ordered on every iteration below.
		dst := tss[0]
		for n := range dst.Values {
			sort.Slice(tss, func(i, j int) bool {
				a := tss[i].Values[n]
				b := tss[j].Values[n]
				return lessWithNaNs(a, b)
			})

			// NaNs are sorted first. Skip them, since they denote missing values
			// and mustn't take part in the rank calculation.
			start := 0
			for start < len(tss) && math.IsNaN(tss[start].Values[n]) {
				start++
			}
			if start == len(tss) {
				dst.Values[n] = nan
				continue
			}

			phi := phis[n]
			if math.IsNaN(phi) {
				phi = 1
			}
			if phi < 0 {
				phi = 0
			}
			if phi > 1 {
				phi = 1
			}
			idx := start + int(math.Round(float64(len(tss)-start-1)*phi))
			dst.Values[n] = tss[idx].Values[n]
		}
		tss[0] = dst
		return tss[:1]
	}
}

// lessWithNaNs reports whether a is less than b, treating NaN as smaller than any non-NaN value.
//
// Two NaNs are considered equal, i.e. neither one is less than the other.
func lessWithNaNs(a, b float64) bool {
	if math.IsNaN(a) {
		return !math.IsNaN(b)
	}
	return a < b
}