package promql import ( "fmt" "math" "math/rand" "regexp" "sort" "strconv" "strings" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metricsql" "github.com/valyala/histogram" ) var transformFuncsKeepMetricGroup = map[string]bool{ "ceil": true, "clamp_max": true, "clamp_min": true, "floor": true, "round": true, "keep_last_value": true, "keep_next_value": true, "interpolate": true, "running_min": true, "running_max": true, "running_avg": true, "range_min": true, "range_max": true, "range_avg": true, "range_first": true, "range_last": true, "range_quantile": true, "smooth_exponential": true, } var transformFuncs = map[string]transformFunc{ // Standard promql funcs // See funcs accepting instant-vector on https://prometheus.io/docs/prometheus/latest/querying/functions/ . "abs": newTransformFuncOneArg(transformAbs), "absent": transformAbsent, "ceil": newTransformFuncOneArg(transformCeil), "clamp_max": transformClampMax, "clamp_min": transformClampMin, "day_of_month": newTransformFuncDateTime(transformDayOfMonth), "day_of_week": newTransformFuncDateTime(transformDayOfWeek), "days_in_month": newTransformFuncDateTime(transformDaysInMonth), "exp": newTransformFuncOneArg(transformExp), "floor": newTransformFuncOneArg(transformFloor), "histogram_quantile": transformHistogramQuantile, "hour": newTransformFuncDateTime(transformHour), "label_join": transformLabelJoin, "label_replace": transformLabelReplace, "ln": newTransformFuncOneArg(transformLn), "log2": newTransformFuncOneArg(transformLog2), "log10": newTransformFuncOneArg(transformLog10), "minute": newTransformFuncDateTime(transformMinute), "month": newTransformFuncDateTime(transformMonth), "round": transformRound, "scalar": transformScalar, "sort": newTransformFuncSort(false), "sort_desc": newTransformFuncSort(true), "sqrt": newTransformFuncOneArg(transformSqrt), "time": transformTime, // "timestamp" has been moved to rollup funcs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 "vector": transformVector, "year": newTransformFuncDateTime(transformYear), // New funcs "label_set": transformLabelSet, "label_map": transformLabelMap, "label_del": transformLabelDel, "label_keep": transformLabelKeep, "label_copy": transformLabelCopy, "label_move": transformLabelMove, "label_transform": transformLabelTransform, "label_value": transformLabelValue, "label_match": transformLabelMatch, "label_mismatch": transformLabelMismatch, "union": transformUnion, "": transformUnion, // empty func is a synonim to union "keep_last_value": transformKeepLastValue, "keep_next_value": transformKeepNextValue, "interpolate": transformInterpolate, "start": newTransformFuncZeroArgs(transformStart), "end": newTransformFuncZeroArgs(transformEnd), "step": newTransformFuncZeroArgs(transformStep), "running_sum": newTransformFuncRunning(runningSum), "running_max": newTransformFuncRunning(runningMax), "running_min": newTransformFuncRunning(runningMin), "running_avg": newTransformFuncRunning(runningAvg), "range_sum": newTransformFuncRange(runningSum), "range_max": newTransformFuncRange(runningMax), "range_min": newTransformFuncRange(runningMin), "range_avg": newTransformFuncRange(runningAvg), "range_first": transformRangeFirst, "range_last": transformRangeLast, "range_quantile": transformRangeQuantile, "smooth_exponential": transformSmoothExponential, "remove_resets": transformRemoveResets, "rand": newTransformRand(newRandFloat64), "rand_normal": newTransformRand(newRandNormFloat64), "rand_exponential": newTransformRand(newRandExpFloat64), "pi": transformPi, "sin": newTransformFuncOneArg(transformSin), "cos": newTransformFuncOneArg(transformCos), "asin": newTransformFuncOneArg(transformAsin), "acos": newTransformFuncOneArg(transformAcos), "prometheus_buckets": transformPrometheusBuckets, "buckets_limit": transformBucketsLimit, "histogram_share": transformHistogramShare, "sort_by_label": newTransformFuncSortByLabel(false), "sort_by_label_desc": newTransformFuncSortByLabel(true), } func getTransformFunc(s string) transformFunc { s = strings.ToLower(s) return transformFuncs[s] } type transformFuncArg struct { ec *EvalConfig fe *metricsql.FuncExpr args [][]*timeseries } type transformFunc func(tfa *transformFuncArg) ([]*timeseries, error) func newTransformFuncOneArg(tf func(v float64) float64) transformFunc { tfe := func(values []float64) { for i, v := range values { values[i] = tf(v) } } return func(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } return doTransformValues(args[0], tfe, tfa.fe) } } func doTransformValues(arg []*timeseries, tf func(values []float64), fe *metricsql.FuncExpr) ([]*timeseries, error) { name := strings.ToLower(fe.Name) keepMetricGroup := transformFuncsKeepMetricGroup[name] for _, ts := range arg { if !keepMetricGroup { ts.MetricName.ResetMetricGroup() } tf(ts.Values) } return arg, nil } func transformAbs(v float64) float64 { return math.Abs(v) } func transformAbsent(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } arg := args[0] if len(arg) == 0 { rvs := getAbsentTimeseries(tfa.ec, tfa.fe.Args[0]) return rvs, nil } for _, ts := range arg { ts.MetricName.ResetMetricGroup() for i, v := range ts.Values { if !math.IsNaN(v) { v = nan } else { v = 1 } ts.Values[i] = v } } return arg, nil } func getAbsentTimeseries(ec *EvalConfig, arg metricsql.Expr) []*timeseries { // Copy tags from arg rvs := evalNumber(ec, 1) rv := rvs[0] me, ok := arg.(*metricsql.MetricExpr) if !ok { return rvs } tfs := toTagFilters(me.LabelFilters) for i := range tfs { tf := &tfs[i] if len(tf.Key) == 0 { continue } if tf.IsRegexp || tf.IsNegative { continue } rv.MetricName.AddTagBytes(tf.Key, tf.Value) } return rvs } func transformCeil(v float64) float64 { return math.Ceil(v) } func transformClampMax(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 2); err != nil { return nil, err } maxs, err := getScalar(args[1], 1) if err != nil { return nil, err } tf := func(values []float64) { for i, v := range values { if v > maxs[i] { values[i] = maxs[i] } } } return doTransformValues(args[0], tf, tfa.fe) } func transformClampMin(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 2); err != nil { return nil, err } mins, err := getScalar(args[1], 1) if err != nil { return nil, err } tf := func(values []float64) { for i, v := range values { if v < mins[i] { values[i] = mins[i] } } } return doTransformValues(args[0], tf, tfa.fe) } func newTransformFuncDateTime(f func(t time.Time) int) transformFunc { return func(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) > 1 { return nil, fmt.Errorf(`too many args; got %d; want up to %d`, len(args), 1) } var arg []*timeseries if len(args) == 0 { arg = evalTime(tfa.ec) } else { arg = args[0] } tf := func(values []float64) { for i, v := range values { t := time.Unix(int64(v), 0).UTC() values[i] = float64(f(t)) } } return doTransformValues(arg, tf, tfa.fe) } } func transformDayOfMonth(t time.Time) int { return t.Day() } func transformDayOfWeek(t time.Time) int { return int(t.Weekday()) } func transformDaysInMonth(t time.Time) int { m := t.Month() if m == 2 && isLeapYear(uint32(t.Year())) { return 29 } return daysInMonth[m] } func transformExp(v float64) float64 { return math.Exp(v) } func transformFloor(v float64) float64 { return math.Floor(v) } func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 2); err != nil { return nil, err } limits, err := getScalar(args[0], 1) if err != nil { return nil, err } limit := int(limits[0]) if limit <= 0 { return nil, nil } tss := vmrangeBucketsToLE(args[1]) if len(tss) == 0 { return nil, nil } // Group timeseries by all MetricGroup+tags excluding `le` tag. type x struct { le float64 hits float64 ts *timeseries } m := make(map[string][]x) var b []byte var mn storage.MetricName for _, ts := range tss { leStr := ts.MetricName.GetTagValue("le") if len(leStr) == 0 { // Skip time series without `le` tag. continue } le, err := strconv.ParseFloat(string(leStr), 64) if err != nil { // Skip time series with invalid `le` tag. continue } mn.CopyFrom(&ts.MetricName) mn.RemoveTag("le") b = marshalMetricNameSorted(b[:0], &mn) m[string(b)] = append(m[string(b)], x{ le: le, ts: ts, }) } // Remove buckets with the smallest counters. rvs := make([]*timeseries, 0, len(tss)) for _, leGroup := range m { if len(leGroup) <= limit { // Fast path - the number of buckets doesn't exceed the given limit. // Keep all the buckets as is. for _, xx := range leGroup { rvs = append(rvs, xx.ts) } continue } // Slow path - remove buckets with the smallest number of hits until their count reaches the limit. // Calculate per-bucket hits. sort.Slice(leGroup, func(i, j int) bool { return leGroup[i].le < leGroup[j].le }) for n := range limits { prevValue := float64(0) for i := range leGroup { xx := &leGroup[i] value := xx.ts.Values[n] xx.hits += value - prevValue prevValue = value } } for len(leGroup) > limit { xxMinIdx := 0 for i, xx := range leGroup { if xx.hits < leGroup[xxMinIdx].hits { xxMinIdx = i } } // Merge the leGroup[xxMinIdx] bucket with the smallest adjacent bucket in order to preserve // the maximum accuracy. if xxMinIdx+1 == len(leGroup) || (xxMinIdx > 0 && leGroup[xxMinIdx-1].hits < leGroup[xxMinIdx+1].hits) { xxMinIdx-- } leGroup[xxMinIdx+1].hits += leGroup[xxMinIdx].hits leGroup = append(leGroup[:xxMinIdx], leGroup[xxMinIdx+1:]...) } for _, xx := range leGroup { rvs = append(rvs, xx.ts) } } return rvs, nil } func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := vmrangeBucketsToLE(args[0]) return rvs, nil } func vmrangeBucketsToLE(tss []*timeseries) []*timeseries { rvs := make([]*timeseries, 0, len(tss)) // Group timeseries by MetricGroup+tags excluding `vmrange` tag. type x struct { startStr string endStr string start float64 end float64 ts *timeseries } m := make(map[string][]x) bb := bbPool.Get() defer bbPool.Put(bb) for _, ts := range tss { vmrange := ts.MetricName.GetTagValue("vmrange") if len(vmrange) == 0 { if le := ts.MetricName.GetTagValue("le"); len(le) > 0 { // Keep Prometheus-compatible buckets. rvs = append(rvs, ts) } continue } n := strings.Index(bytesutil.ToUnsafeString(vmrange), "...") if n < 0 { continue } startStr := string(vmrange[:n]) start, err := strconv.ParseFloat(startStr, 64) if err != nil { continue } endStr := string(vmrange[n+len("..."):]) end, err := strconv.ParseFloat(endStr, 64) if err != nil { continue } ts.MetricName.RemoveTag("le") ts.MetricName.RemoveTag("vmrange") bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) m[string(bb.B)] = append(m[string(bb.B)], x{ startStr: startStr, endStr: endStr, start: start, end: end, ts: ts, }) } // Convert `vmrange` label in each group of time series to `le` label. copyTS := func(src *timeseries, leStr string) *timeseries { var ts timeseries ts.CopyFromShallowTimestamps(src) values := ts.Values for i := range values { values[i] = 0 } ts.MetricName.RemoveTag("le") ts.MetricName.AddTag("le", leStr) return &ts } isZeroTS := func(ts *timeseries) bool { for _, v := range ts.Values { if v > 0 { return false } } return true } for _, xss := range m { sort.Slice(xss, func(i, j int) bool { return xss[i].end < xss[j].end }) xssNew := make([]x, 0, len(xss)+2) var xsPrev x for _, xs := range xss { ts := xs.ts if isZeroTS(ts) { // Skip time series with zeros. They are substituted by xssNew below. xsPrev = xs continue } if xs.start != xsPrev.end { xssNew = append(xssNew, x{ endStr: xs.startStr, end: xs.start, ts: copyTS(ts, xs.startStr), }) } ts.MetricName.AddTag("le", xs.endStr) xssNew = append(xssNew, xs) xsPrev = xs } if !math.IsInf(xsPrev.end, 1) { xssNew = append(xssNew, x{ endStr: "+Inf", end: math.Inf(1), ts: copyTS(xsPrev.ts, "+Inf"), }) } xss = xssNew for i := range xss[0].ts.Values { count := float64(0) for _, xs := range xss { ts := xs.ts v := ts.Values[i] if !math.IsNaN(v) && v > 0 { count += v } ts.Values[i] = count } } for _, xs := range xss { rvs = append(rvs, xs.ts) } } return rvs } func transformHistogramShare(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 2 || len(args) > 3 { return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args)) } les, err := getScalar(args[0], 0) if err != nil { return nil, fmt.Errorf("cannot parse le: %w", err) } // Convert buckets with `vmrange` labels to buckets with `le` labels. tss := vmrangeBucketsToLE(args[1]) // Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details. var boundsLabel string if len(args) > 2 { s, err := getString(args[2], 2) if err != nil { return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %w", err) } boundsLabel = s } // Group metrics by all tags excluding "le" m := groupLeTimeseries(tss) // Calculate share for les share := func(i int, les []float64, xss []leTimeseries) (q, lower, upper float64) { leReq := les[i] if math.IsNaN(leReq) || len(xss) == 0 { return nan, nan, nan } fixBrokenBuckets(i, xss) if leReq < 0 { return 0, 0, 0 } if math.IsInf(leReq, 1) { return 1, 1, 1 } var vPrev, lePrev float64 for _, xs := range xss { v := xs.ts.Values[i] le := xs.le if leReq >= le { vPrev = v lePrev = le continue } // precondition: lePrev <= leReq < le vLast := xss[len(xss)-1].ts.Values[i] lower = vPrev / vLast if math.IsInf(le, 1) { return lower, lower, 1 } if lePrev == leReq { return lower, lower, lower } upper = v / vLast q = lower + (v-vPrev)/vLast*(leReq-lePrev)/(le-lePrev) return q, lower, upper } // precondition: leReq > leLast return 1, 1, 1 } rvs := make([]*timeseries, 0, len(m)) for _, xss := range m { sort.Slice(xss, func(i, j int) bool { return xss[i].le < xss[j].le }) dst := xss[0].ts var tsLower, tsUpper *timeseries if len(boundsLabel) > 0 { tsLower = ×eries{} tsLower.CopyFromShallowTimestamps(dst) tsLower.MetricName.RemoveTag(boundsLabel) tsLower.MetricName.AddTag(boundsLabel, "lower") tsUpper = ×eries{} tsUpper.CopyFromShallowTimestamps(dst) tsUpper.MetricName.RemoveTag(boundsLabel) tsUpper.MetricName.AddTag(boundsLabel, "upper") } for i := range dst.Values { q, lower, upper := share(i, les, xss) dst.Values[i] = q if len(boundsLabel) > 0 { tsLower.Values[i] = lower tsUpper.Values[i] = upper } } rvs = append(rvs, dst) if len(boundsLabel) > 0 { rvs = append(rvs, tsLower) rvs = append(rvs, tsUpper) } } return rvs, nil } func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 2 || len(args) > 3 { return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args)) } phis, err := getScalar(args[0], 0) if err != nil { return nil, fmt.Errorf("cannot parse phi: %w", err) } // Convert buckets with `vmrange` labels to buckets with `le` labels. tss := vmrangeBucketsToLE(args[1]) // Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details. var boundsLabel string if len(args) > 2 { s, err := getString(args[2], 2) if err != nil { return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %w", err) } boundsLabel = s } // Group metrics by all tags excluding "le" m := groupLeTimeseries(tss) // Calculate quantile for each group in m lastNonInf := func(i int, xss []leTimeseries) float64 { for len(xss) > 0 { xsLast := xss[len(xss)-1] v := xsLast.ts.Values[i] if v == 0 { return nan } if !math.IsInf(xsLast.le, 0) { return xsLast.le } xss = xss[:len(xss)-1] } return nan } quantile := func(i int, phis []float64, xss []leTimeseries) (q, lower, upper float64) { phi := phis[i] if math.IsNaN(phi) { return nan, nan, nan } fixBrokenBuckets(i, xss) vLast := float64(0) if len(xss) > 0 { vLast = xss[len(xss)-1].ts.Values[i] } if vLast == 0 { return nan, nan, nan } if phi < 0 { return -inf, -inf, xss[0].ts.Values[i] } if phi > 1 { return inf, vLast, inf } vReq := vLast * phi vPrev := float64(0) lePrev := float64(0) for _, xs := range xss { v := xs.ts.Values[i] le := xs.le if v <= 0 { // Skip zero buckets. lePrev = le continue } if v < vReq { vPrev = v lePrev = le continue } if math.IsInf(le, 0) { vv := lastNonInf(i, xss) return vv, vv, inf } if v == vPrev { return lePrev, lePrev, v } vv := lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev) return vv, lePrev, le } vv := lastNonInf(i, xss) return vv, vv, inf } rvs := make([]*timeseries, 0, len(m)) for _, xss := range m { sort.Slice(xss, func(i, j int) bool { return xss[i].le < xss[j].le }) dst := xss[0].ts var tsLower, tsUpper *timeseries if len(boundsLabel) > 0 { tsLower = ×eries{} tsLower.CopyFromShallowTimestamps(dst) tsLower.MetricName.RemoveTag(boundsLabel) tsLower.MetricName.AddTag(boundsLabel, "lower") tsUpper = ×eries{} tsUpper.CopyFromShallowTimestamps(dst) tsUpper.MetricName.RemoveTag(boundsLabel) tsUpper.MetricName.AddTag(boundsLabel, "upper") } for i := range dst.Values { v, lower, upper := quantile(i, phis, xss) dst.Values[i] = v if len(boundsLabel) > 0 { tsLower.Values[i] = lower tsUpper.Values[i] = upper } } rvs = append(rvs, dst) if len(boundsLabel) > 0 { rvs = append(rvs, tsLower) rvs = append(rvs, tsUpper) } } return rvs, nil } type leTimeseries struct { le float64 ts *timeseries } func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries { m := make(map[string][]leTimeseries) bb := bbPool.Get() for _, ts := range tss { tagValue := ts.MetricName.GetTagValue("le") if len(tagValue) == 0 { continue } le, err := strconv.ParseFloat(bytesutil.ToUnsafeString(tagValue), 64) if err != nil { continue } ts.MetricName.ResetMetricGroup() ts.MetricName.RemoveTag("le") bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName) m[string(bb.B)] = append(m[string(bb.B)], leTimeseries{ le: le, ts: ts, }) } bbPool.Put(bb) return m } func fixBrokenBuckets(i int, xss []leTimeseries) { // Fix broken buckets. // They are already sorted by le, so their values must be in ascending order, // since the next bucket includes all the previous buckets. vPrev := float64(0) for _, xs := range xss { v := xs.ts.Values[i] if v < vPrev || math.IsNaN(v) { xs.ts.Values[i] = vPrev } else { vPrev = v } } } func transformHour(t time.Time) int { return t.Hour() } func runningSum(a, b float64, idx int) float64 { return a + b } func runningMax(a, b float64, idx int) float64 { if a > b { return a } return b } func runningMin(a, b float64, idx int) float64 { if a < b { return a } return b } func runningAvg(a, b float64, idx int) float64 { // See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation return a + (b-a)/float64(idx+1) } func skipLeadingNaNs(values []float64) []float64 { i := 0 for i < len(values) && math.IsNaN(values[i]) { i++ } return values[i:] } func skipTrailingNaNs(values []float64) []float64 { i := len(values) - 1 for i >= 0 && math.IsNaN(values[i]) { i-- } return values[:i+1] } func transformKeepLastValue(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { values := ts.Values if len(values) == 0 { continue } lastValue := values[0] for i, v := range values { if !math.IsNaN(v) { lastValue = v continue } values[i] = lastValue } } return rvs, nil } func transformKeepNextValue(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { values := ts.Values if len(values) == 0 { continue } nextValue := values[len(values)-1] for i := len(values) - 1; i >= 0; i-- { v := values[i] if !math.IsNaN(v) { nextValue = v continue } values[i] = nextValue } } return rvs, nil } func transformInterpolate(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { values := ts.Values if len(values) == 0 { continue } prevValue := nan var nextValue float64 for i := 0; i < len(values); i++ { if !math.IsNaN(values[i]) { continue } if i > 0 { prevValue = values[i-1] } j := i + 1 for j < len(values) { if !math.IsNaN(values[j]) { break } j++ } if j >= len(values) { nextValue = prevValue } else { nextValue = values[j] } if math.IsNaN(prevValue) { prevValue = nextValue } delta := (nextValue - prevValue) / float64(j-i+1) for i < j { prevValue += delta values[i] = prevValue i++ } } } return rvs, nil } func newTransformFuncRunning(rf func(a, b float64, idx int) float64) transformFunc { return func(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { ts.MetricName.ResetMetricGroup() values := skipLeadingNaNs(ts.Values) if len(values) == 0 { continue } prevValue := values[0] values = values[1:] for i, v := range values { if !math.IsNaN(v) { prevValue = rf(prevValue, v, i+1) } values[i] = prevValue } } return rvs, nil } } func newTransformFuncRange(rf func(a, b float64, idx int) float64) transformFunc { tfr := newTransformFuncRunning(rf) return func(tfa *transformFuncArg) ([]*timeseries, error) { rvs, err := tfr(tfa) if err != nil { return nil, err } setLastValues(rvs) return rvs, nil } } func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 2); err != nil { return nil, err } phis, err := getScalar(args[0], 0) if err != nil { return nil, err } if len(phis) == 0 { return nil, nil } phi := phis[0] rvs := args[1] hf := histogram.GetFast() for _, ts := range rvs { hf.Reset() lastIdx := -1 values := ts.Values for i, v := range values { if math.IsNaN(v) { continue } hf.Update(v) lastIdx = i } if lastIdx >= 0 { values[lastIdx] = hf.Quantile(phi) } } histogram.PutFast(hf) setLastValues(rvs) return rvs, nil } func transformRangeFirst(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { values := skipLeadingNaNs(ts.Values) if len(values) == 0 { continue } vFirst := values[0] for i, v := range values { if math.IsNaN(v) { continue } values[i] = vFirst } } return rvs, nil } func transformRangeLast(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] setLastValues(rvs) return rvs, nil } func setLastValues(tss []*timeseries) { for _, ts := range tss { values := skipTrailingNaNs(ts.Values) if len(values) == 0 { continue } vLast := values[len(values)-1] for i, v := range values { if math.IsNaN(v) { continue } values[i] = vLast } } } func transformSmoothExponential(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 2); err != nil { return nil, err } sfs, err := getScalar(args[1], 1) if err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { values := skipLeadingNaNs(ts.Values) for i, v := range values { if !math.IsInf(v, 0) { values = values[i:] break } } if len(values) == 0 { continue } avg := values[0] values = values[1:] sfsX := sfs[len(ts.Values)-len(values):] for i, v := range values { if math.IsNaN(v) { continue } if math.IsInf(v, 0) { values[i] = avg continue } sf := sfsX[i] if math.IsNaN(sf) { sf = 1 } if sf < 0 { sf = 0 } if sf > 1 { sf = 1 } avg = avg*(1-sf) + v*sf values[i] = avg } } return rvs, nil } func transformRemoveResets(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { removeCounterResetsMaybeNaNs(ts.Values) } return rvs, nil } func transformUnion(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 1 { return evalNumber(tfa.ec, nan), nil } rvs := make([]*timeseries, 0, len(args[0])) m := make(map[string]bool, len(args[0])) bb := bbPool.Get() for _, arg := range args { for _, ts := range arg { bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) if m[string(bb.B)] { continue } m[string(bb.B)] = true rvs = append(rvs, ts) } } bbPool.Put(bb) return rvs, nil } func transformLabelKeep(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 1 { return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1) } var keepLabels []string for i := 1; i < len(args); i++ { keepLabel, err := getString(args[i], i) if err != nil { return nil, err } keepLabels = append(keepLabels, keepLabel) } rvs := args[0] for _, ts := range rvs { ts.MetricName.RemoveTagsOn(keepLabels) } return rvs, nil } func transformLabelDel(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 1 { return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1) } var delLabels []string for i := 1; i < len(args); i++ { delLabel, err := getString(args[i], i) if err != nil { return nil, err } delLabels = append(delLabels, delLabel) } rvs := args[0] for _, ts := range rvs { ts.MetricName.RemoveTagsIgnoring(delLabels) } return rvs, nil } func transformLabelSet(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 1 { return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1) } dstLabels, dstValues, err := getStringPairs(args[1:]) if err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { mn := &ts.MetricName for i, dstLabel := range dstLabels { value := dstValues[i] dstValue := getDstValue(mn, dstLabel) *dstValue = append((*dstValue)[:0], value...) if len(value) == 0 { mn.RemoveTag(dstLabel) } } } return rvs, nil } func transformLabelMap(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 2 { return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 2) } label, err := getString(args[1], 1) if err != nil { return nil, fmt.Errorf("cannot read label name: %w", err) } srcValues, dstValues, err := getStringPairs(args[2:]) if err != nil { return nil, err } m := make(map[string]string, len(srcValues)) for i, srcValue := range srcValues { m[srcValue] = dstValues[i] } rvs := args[0] for _, ts := range rvs { mn := &ts.MetricName dstValue := getDstValue(mn, label) value, ok := m[string(*dstValue)] if ok { *dstValue = append((*dstValue)[:0], value...) } if len(*dstValue) == 0 { mn.RemoveTag(label) } } return rvs, nil } func transformLabelCopy(tfa *transformFuncArg) ([]*timeseries, error) { return transformLabelCopyExt(tfa, false) } func transformLabelMove(tfa *transformFuncArg) ([]*timeseries, error) { return transformLabelCopyExt(tfa, true) } func transformLabelCopyExt(tfa *transformFuncArg, removeSrcLabels bool) ([]*timeseries, error) { args := tfa.args if len(args) < 1 { return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1) } srcLabels, dstLabels, err := getStringPairs(args[1:]) if err != nil { return nil, err } rvs := args[0] for _, ts := range rvs { mn := &ts.MetricName for i, srcLabel := range srcLabels { dstLabel := dstLabels[i] value := mn.GetTagValue(srcLabel) dstValue := getDstValue(mn, dstLabel) *dstValue = append((*dstValue)[:0], value...) if len(value) == 0 { mn.RemoveTag(dstLabel) } if removeSrcLabels && srcLabel != dstLabel { mn.RemoveTag(srcLabel) } } } return rvs, nil } func getStringPairs(args [][]*timeseries) ([]string, []string, error) { if len(args)%2 != 0 { return nil, nil, fmt.Errorf(`the number of string args must be even; got %d`, len(args)) } var ks, vs []string for i := 0; i < len(args); i += 2 { k, err := getString(args[i], i) if err != nil { return nil, nil, err } ks = append(ks, k) v, err := getString(args[i+1], i+1) if err != nil { return nil, nil, err } vs = append(vs, v) } return ks, vs, nil } func transformLabelJoin(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) < 3 { return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 3) } dstLabel, err := getString(args[1], 1) if err != nil { return nil, err } separator, err := getString(args[2], 2) if err != nil { return nil, err } var srcLabels []string for i := 3; i < len(args); i++ { srcLabel, err := getString(args[i], i) if err != nil { return nil, err } srcLabels = append(srcLabels, srcLabel) } rvs := args[0] for _, ts := range rvs { mn := &ts.MetricName dstValue := getDstValue(mn, dstLabel) b := *dstValue b = b[:0] for j, srcLabel := range srcLabels { srcValue := mn.GetTagValue(srcLabel) b = append(b, srcValue...) if j+1 < len(srcLabels) { b = append(b, separator...) } } *dstValue = b if len(b) == 0 { mn.RemoveTag(dstLabel) } } return rvs, nil } func transformLabelTransform(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 4); err != nil { return nil, err } label, err := getString(args[1], 1) if err != nil { return nil, err } regex, err := getString(args[2], 2) if err != nil { return nil, err } replacement, err := getString(args[3], 3) if err != nil { return nil, err } r, err := metricsql.CompileRegexp(regex) if err != nil { return nil, fmt.Errorf(`cannot compile regex %q: %w`, regex, err) } return labelReplace(args[0], label, r, label, replacement) } func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 5); err != nil { return nil, err } dstLabel, err := getString(args[1], 1) if err != nil { return nil, err } replacement, err := getString(args[2], 2) if err != nil { return nil, err } srcLabel, err := getString(args[3], 3) if err != nil { return nil, err } regex, err := getString(args[4], 4) if err != nil { return nil, err } r, err := metricsql.CompileRegexpAnchored(regex) if err != nil { return nil, fmt.Errorf(`cannot compile regex %q: %w`, regex, err) } return labelReplace(args[0], srcLabel, r, dstLabel, replacement) } func labelReplace(tss []*timeseries, srcLabel string, r *regexp.Regexp, dstLabel, replacement string) ([]*timeseries, error) { replacementBytes := []byte(replacement) for _, ts := range tss { mn := &ts.MetricName dstValue := getDstValue(mn, dstLabel) srcValue := mn.GetTagValue(srcLabel) if !r.Match(srcValue) { continue } b := r.ReplaceAll(srcValue, replacementBytes) *dstValue = append((*dstValue)[:0], b...) if len(b) == 0 { mn.RemoveTag(dstLabel) } } return tss, nil } func transformLabelValue(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 2); err != nil { return nil, err } labelName, err := getString(args[1], 1) if err != nil { return nil, fmt.Errorf("cannot get label name: %w", err) } rvs := args[0] for _, ts := range rvs { ts.MetricName.ResetMetricGroup() labelValue := ts.MetricName.GetTagValue(labelName) v, err := strconv.ParseFloat(string(labelValue), 64) if err != nil { v = nan } values := ts.Values for i := range values { values[i] = v } } // Do not remove timeseries with only NaN values, so `default` could be applied to them: // label_value(q, "label") default 123 return rvs, nil } func transformLabelMatch(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 3); err != nil { return nil, err } labelName, err := getString(args[1], 1) if err != nil { return nil, fmt.Errorf("cannot get label name: %w", err) } labelRe, err := getString(args[2], 2) if err != nil { return nil, fmt.Errorf("cannot get regexp: %w", err) } r, err := metricsql.CompileRegexpAnchored(labelRe) if err != nil { return nil, fmt.Errorf(`cannot compile regexp %q: %w`, labelRe, err) } tss := args[0] rvs := tss[:0] for _, ts := range tss { labelValue := ts.MetricName.GetTagValue(labelName) if r.Match(labelValue) { rvs = append(rvs, ts) } } return rvs, nil } func transformLabelMismatch(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 3); err != nil { return nil, err } labelName, err := getString(args[1], 1) if err != nil { return nil, fmt.Errorf("cannot get label name: %w", err) } labelRe, err := getString(args[2], 2) if err != nil { return nil, fmt.Errorf("cannot get regexp: %w", err) } r, err := metricsql.CompileRegexpAnchored(labelRe) if err != nil { return nil, fmt.Errorf(`cannot compile regexp %q: %w`, labelRe, err) } tss := args[0] rvs := tss[:0] for _, ts := range tss { labelValue := ts.MetricName.GetTagValue(labelName) if !r.Match(labelValue) { rvs = append(rvs, ts) } } return rvs, nil } func transformLn(v float64) float64 { return math.Log(v) } func transformLog2(v float64) float64 { return math.Log2(v) } func transformLog10(v float64) float64 { return math.Log10(v) } func transformMinute(t time.Time) int { return t.Minute() } func transformMonth(t time.Time) int { return int(t.Month()) } func transformRound(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) != 1 && len(args) != 2 { return nil, fmt.Errorf(`unexpected number of args: %d; want 1 or 2`, len(args)) } var nearestArg []*timeseries if len(args) == 1 { nearestArg = evalNumber(tfa.ec, 1) } else { nearestArg = args[1] } nearest, err := getScalar(nearestArg, 1) if err != nil { return nil, err } tf := func(values []float64) { var nPrev float64 var p10 float64 for i, v := range values { n := nearest[i] if n != nPrev { nPrev = n _, e := decimal.FromFloat(n) p10 = math.Pow10(int(-e)) } v += 0.5 * math.Copysign(n, v) v -= math.Mod(v, n) v, _ = math.Modf(v * p10) values[i] = v / p10 } } return doTransformValues(args[0], tf, tfa.fe) } func transformScalar(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } // Verify whether the arg is a string. // Then try converting the string to number. if se, ok := tfa.fe.Args[0].(*metricsql.StringExpr); ok { n, err := strconv.ParseFloat(se.S, 64) if err != nil { n = nan } return evalNumber(tfa.ec, n), nil } // The arg isn't a string. Extract scalar from it. arg := args[0] if len(arg) != 1 { return evalNumber(tfa.ec, nan), nil } arg[0].MetricName.Reset() return arg, nil } func newTransformFuncSortByLabel(isDesc bool) transformFunc { return func(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 2); err != nil { return nil, err } label, err := getString(args[1], 1) if err != nil { return nil, fmt.Errorf("cannot parse label name for sorting: %w", err) } rvs := args[0] sort.SliceStable(rvs, func(i, j int) bool { a := rvs[i].MetricName.GetTagValue(label) b := rvs[j].MetricName.GetTagValue(label) if isDesc { return string(b) < string(a) } return string(a) < string(b) }) return rvs, nil } } func newTransformFuncSort(isDesc bool) transformFunc { return func(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] sort.Slice(rvs, func(i, j int) bool { a := rvs[i].Values b := rvs[j].Values n := len(a) - 1 for n >= 0 { if !math.IsNaN(a[n]) && !math.IsNaN(b[n]) && a[n] != b[n] { break } n-- } if n < 0 { return false } if isDesc { return b[n] < a[n] } return a[n] < b[n] }) return rvs, nil } } func transformSqrt(v float64) float64 { return math.Sqrt(v) } func transformSin(v float64) float64 { return math.Sin(v) } func transformCos(v float64) float64 { return math.Cos(v) } func transformAsin(v float64) float64 { return math.Asin(v) } func transformAcos(v float64) float64 { return math.Acos(v) } func newTransformRand(newRandFunc func(r *rand.Rand) func() float64) transformFunc { return func(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if len(args) > 1 { return nil, fmt.Errorf(`unexpected number of args; got %d; want 0 or 1`, len(args)) } var seed int64 if len(args) == 1 { tmp, err := getScalar(args[0], 0) if err != nil { return nil, err } seed = int64(tmp[0]) } else { seed = time.Now().UnixNano() } source := rand.NewSource(seed) r := rand.New(source) randFunc := newRandFunc(r) tss := evalNumber(tfa.ec, 0) values := tss[0].Values for i := range values { values[i] = randFunc() } return tss, nil } } func newRandFloat64(r *rand.Rand) func() float64 { return r.Float64 } func newRandNormFloat64(r *rand.Rand) func() float64 { return r.NormFloat64 } func newRandExpFloat64(r *rand.Rand) func() float64 { return r.ExpFloat64 } func transformPi(tfa *transformFuncArg) ([]*timeseries, error) { if err := expectTransformArgsNum(tfa.args, 0); err != nil { return nil, err } return evalNumber(tfa.ec, math.Pi), nil } func transformTime(tfa *transformFuncArg) ([]*timeseries, error) { if err := expectTransformArgsNum(tfa.args, 0); err != nil { return nil, err } return evalTime(tfa.ec), nil } func transformVector(tfa *transformFuncArg) ([]*timeseries, error) { args := tfa.args if err := expectTransformArgsNum(args, 1); err != nil { return nil, err } rvs := args[0] return rvs, nil } func transformYear(t time.Time) int { return t.Year() } func newTransformFuncZeroArgs(f func(tfa *transformFuncArg) float64) transformFunc { return func(tfa *transformFuncArg) ([]*timeseries, error) { if err := expectTransformArgsNum(tfa.args, 0); err != nil { return nil, err } v := f(tfa) return evalNumber(tfa.ec, v), nil } } func transformStep(tfa *transformFuncArg) float64 { return float64(tfa.ec.Step) / 1e3 } func transformStart(tfa *transformFuncArg) float64 { return float64(tfa.ec.Start) / 1e3 } func transformEnd(tfa *transformFuncArg) float64 { return float64(tfa.ec.End) / 1e3 } // copyTimeseriesMetricNames returns a copy of tss with real copy of MetricNames, // but with shallow copy of Timestamps and Values if makeCopy is set. // // Otherwise tss is returned. func copyTimeseriesMetricNames(tss []*timeseries, makeCopy bool) []*timeseries { if !makeCopy { return tss } rvs := make([]*timeseries, len(tss)) for i, src := range tss { var dst timeseries dst.CopyFromMetricNames(src) rvs[i] = &dst } return rvs } // copyShallow returns a copy of arg with shallow copies of MetricNames, // Timestamps and Values. func copyTimeseriesShallow(arg []*timeseries) []*timeseries { rvs := make([]*timeseries, len(arg)) for i, src := range arg { var dst timeseries dst.CopyShallow(src) rvs[i] = &dst } return rvs } func getDstValue(mn *storage.MetricName, dstLabel string) *[]byte { if dstLabel == "__name__" { return &mn.MetricGroup } tags := mn.Tags for i := range tags { tag := &tags[i] if string(tag.Key) == dstLabel { return &tag.Value } } if len(tags) < cap(tags) { tags = tags[:len(tags)+1] } else { tags = append(tags, storage.Tag{}) } mn.Tags = tags tag := &tags[len(tags)-1] tag.Key = append(tag.Key[:0], dstLabel...) return &tag.Value } func isLeapYear(y uint32) bool { if y%4 != 0 { return false } if y%100 != 0 { return true } return y%400 == 0 } var daysInMonth = [...]int{ time.January: 31, time.February: 28, time.March: 31, time.April: 30, time.May: 31, time.June: 30, time.July: 31, time.August: 31, time.September: 30, time.October: 31, time.November: 30, time.December: 31, } func expectTransformArgsNum(args [][]*timeseries, expectedNum int) error { if len(args) == expectedNum { return nil } return fmt.Errorf(`unexpected number of args; got %d; want %d`, len(args), expectedNum) } func removeCounterResetsMaybeNaNs(values []float64) { values = skipLeadingNaNs(values) if len(values) == 0 { return } var correction float64 prevValue := values[0] for i, v := range values { if math.IsNaN(v) { continue } d := v - prevValue if d < 0 { if (-d * 8) < prevValue { // This is likely jitter from `Prometheus HA pairs`. // Just substitute v with prevValue. v = prevValue } else { correction += prevValue } } prevValue = v values[i] = v + correction } }