mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
b75f1854c5
* vmselect/promql: add alphanumeric sort by label (sort_by_label_numeric) * vmselect/promql: fix tests, add documentation * vmselect/promql: update test * vmselect/promql: update for alphanumeric sorting, fix tests * vmselect/promql: remove comments * vmselect/promql: cleanup * vmselect/promql: avoid memory allocations, update functions descriptions * vmselect/promql: make linter happy (remove ineffectual assignment) * vmselect/promql: add test case, fix behavior when strings are equal * vendor: update github.com/VictoriaMetrics/metricsql from v0.44.1 to v0.45.0 this adds support for sort_by_label_numeric and sort_by_label_numeric_desc functions * wip * lib/promscrape: read response body into memory in stream parsing mode before parsing it This reduces scrape duration for targets returning big responses. The response body was already read into memory in stream parsing mode before this change, so this commit shouldn't increase memory usage. * wip Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
2489 lines
59 KiB
Go
2489 lines
59 KiB
Go
package promql
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
)
|
|
|
|
var transformFuncs = map[string]transformFunc{
|
|
"": transformUnion, // empty func is a synonym to union
|
|
"abs": newTransformFuncOneArg(transformAbs),
|
|
"absent": transformAbsent,
|
|
"acos": newTransformFuncOneArg(transformAcos),
|
|
"acosh": newTransformFuncOneArg(transformAcosh),
|
|
"asin": newTransformFuncOneArg(transformAsin),
|
|
"asinh": newTransformFuncOneArg(transformAsinh),
|
|
"atan": newTransformFuncOneArg(transformAtan),
|
|
"atanh": newTransformFuncOneArg(transformAtanh),
|
|
"bitmap_and": newTransformBitmap(bitmapAnd),
|
|
"bitmap_or": newTransformBitmap(bitmapOr),
|
|
"bitmap_xor": newTransformBitmap(bitmapXor),
|
|
"buckets_limit": transformBucketsLimit,
|
|
"ceil": newTransformFuncOneArg(transformCeil),
|
|
"clamp": transformClamp,
|
|
"clamp_max": transformClampMax,
|
|
"clamp_min": transformClampMin,
|
|
"cos": newTransformFuncOneArg(transformCos),
|
|
"cosh": newTransformFuncOneArg(transformCosh),
|
|
"day_of_month": newTransformFuncDateTime(transformDayOfMonth),
|
|
"day_of_week": newTransformFuncDateTime(transformDayOfWeek),
|
|
"days_in_month": newTransformFuncDateTime(transformDaysInMonth),
|
|
"deg": newTransformFuncOneArg(transformDeg),
|
|
"drop_common_labels": transformDropCommonLabels,
|
|
"end": newTransformFuncZeroArgs(transformEnd),
|
|
"exp": newTransformFuncOneArg(transformExp),
|
|
"floor": newTransformFuncOneArg(transformFloor),
|
|
"histogram_avg": transformHistogramAvg,
|
|
"histogram_quantile": transformHistogramQuantile,
|
|
"histogram_quantiles": transformHistogramQuantiles,
|
|
"histogram_share": transformHistogramShare,
|
|
"histogram_stddev": transformHistogramStddev,
|
|
"histogram_stdvar": transformHistogramStdvar,
|
|
"hour": newTransformFuncDateTime(transformHour),
|
|
"interpolate": transformInterpolate,
|
|
"keep_last_value": transformKeepLastValue,
|
|
"keep_next_value": transformKeepNextValue,
|
|
"label_copy": transformLabelCopy,
|
|
"label_del": transformLabelDel,
|
|
"label_graphite_group": transformLabelGraphiteGroup,
|
|
"label_join": transformLabelJoin,
|
|
"label_keep": transformLabelKeep,
|
|
"label_lowercase": transformLabelLowercase,
|
|
"label_map": transformLabelMap,
|
|
"label_match": transformLabelMatch,
|
|
"label_mismatch": transformLabelMismatch,
|
|
"label_move": transformLabelMove,
|
|
"label_replace": transformLabelReplace,
|
|
"label_set": transformLabelSet,
|
|
"label_transform": transformLabelTransform,
|
|
"label_uppercase": transformLabelUppercase,
|
|
"label_value": transformLabelValue,
|
|
"limit_offset": transformLimitOffset,
|
|
"ln": newTransformFuncOneArg(transformLn),
|
|
"log2": newTransformFuncOneArg(transformLog2),
|
|
"log10": newTransformFuncOneArg(transformLog10),
|
|
"minute": newTransformFuncDateTime(transformMinute),
|
|
"month": newTransformFuncDateTime(transformMonth),
|
|
"now": transformNow,
|
|
"pi": transformPi,
|
|
"prometheus_buckets": transformPrometheusBuckets,
|
|
"rad": newTransformFuncOneArg(transformRad),
|
|
"rand": newTransformRand(newRandFloat64),
|
|
"rand_exponential": newTransformRand(newRandExpFloat64),
|
|
"rand_normal": newTransformRand(newRandNormFloat64),
|
|
"range_avg": newTransformFuncRange(runningAvg),
|
|
"range_first": transformRangeFirst,
|
|
"range_last": transformRangeLast,
|
|
"range_max": newTransformFuncRange(runningMax),
|
|
"range_min": newTransformFuncRange(runningMin),
|
|
"range_quantile": transformRangeQuantile,
|
|
"range_sum": newTransformFuncRange(runningSum),
|
|
"remove_resets": transformRemoveResets,
|
|
"round": transformRound,
|
|
"running_avg": newTransformFuncRunning(runningAvg),
|
|
"running_max": newTransformFuncRunning(runningMax),
|
|
"running_min": newTransformFuncRunning(runningMin),
|
|
"running_sum": newTransformFuncRunning(runningSum),
|
|
"scalar": transformScalar,
|
|
"sgn": transformSgn,
|
|
"sin": newTransformFuncOneArg(transformSin),
|
|
"sinh": newTransformFuncOneArg(transformSinh),
|
|
"smooth_exponential": transformSmoothExponential,
|
|
"sort": newTransformFuncSort(false),
|
|
"sort_by_label": newTransformFuncSortByLabel(false),
|
|
"sort_by_label_desc": newTransformFuncSortByLabel(true),
|
|
"sort_by_label_numeric": newTransformFuncNumericSort(false),
|
|
"sort_by_label_numeric_desc": newTransformFuncNumericSort(true),
|
|
"sort_desc": newTransformFuncSort(true),
|
|
"sqrt": newTransformFuncOneArg(transformSqrt),
|
|
"start": newTransformFuncZeroArgs(transformStart),
|
|
"step": newTransformFuncZeroArgs(transformStep),
|
|
"tan": newTransformFuncOneArg(transformTan),
|
|
"tanh": newTransformFuncOneArg(transformTanh),
|
|
"time": transformTime,
|
|
// "timestamp" has been moved to rollup funcs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415
|
|
"timezone_offset": transformTimezoneOffset,
|
|
"union": transformUnion,
|
|
"vector": transformVector,
|
|
"year": newTransformFuncDateTime(transformYear),
|
|
}
|
|
|
|
// transformFuncsKeepMetricName lists the transform functions that leave
// the physical meaning of the input time series intact. doTransformValues
// consults this set and does not reset the metric group for these names.
var transformFuncsKeepMetricName = map[string]bool{
	"ceil":               true,
	"clamp":              true,
	"clamp_max":          true,
	"clamp_min":          true,
	"floor":              true,
	"interpolate":        true,
	"keep_last_value":    true,
	"keep_next_value":    true,
	"range_avg":          true,
	"range_first":        true,
	"range_last":         true,
	"range_max":          true,
	"range_min":          true,
	"range_quantile":     true,
	"round":              true,
	"running_avg":        true,
	"running_max":        true,
	"running_min":        true,
	"smooth_exponential": true,
}
|
|
|
|
func getTransformFunc(s string) transformFunc {
|
|
s = strings.ToLower(s)
|
|
return transformFuncs[s]
|
|
}
|
|
|
|
type transformFuncArg struct {
|
|
ec *EvalConfig
|
|
fe *metricsql.FuncExpr
|
|
args [][]*timeseries
|
|
}
|
|
|
|
// transformFunc is the signature shared by all transform function
// implementations registered in transformFuncs.
type transformFunc func(tfa *transformFuncArg) ([]*timeseries, error)
|
|
|
|
func newTransformFuncOneArg(tf func(v float64) float64) transformFunc {
|
|
tfe := func(values []float64) {
|
|
for i, v := range values {
|
|
values[i] = tf(v)
|
|
}
|
|
}
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
return doTransformValues(args[0], tfe, tfa.fe)
|
|
}
|
|
}
|
|
|
|
func doTransformValues(arg []*timeseries, tf func(values []float64), fe *metricsql.FuncExpr) ([]*timeseries, error) {
|
|
name := strings.ToLower(fe.Name)
|
|
keepMetricNames := fe.KeepMetricNames
|
|
if transformFuncsKeepMetricName[name] {
|
|
keepMetricNames = true
|
|
}
|
|
for _, ts := range arg {
|
|
if !keepMetricNames {
|
|
ts.MetricName.ResetMetricGroup()
|
|
}
|
|
tf(ts.Values)
|
|
}
|
|
return arg, nil
|
|
}
|
|
|
|
// transformAbs returns the absolute value of v.
func transformAbs(v float64) float64 {
	magnitude := math.Abs(v)
	return magnitude
}
|
|
|
|
func transformAbsent(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tss := args[0]
|
|
rvs := getAbsentTimeseries(tfa.ec, tfa.fe.Args[0])
|
|
if len(tss) == 0 {
|
|
return rvs, nil
|
|
}
|
|
for i := range tss[0].Values {
|
|
isAbsent := true
|
|
for _, ts := range tss {
|
|
if !math.IsNaN(ts.Values[i]) {
|
|
isAbsent = false
|
|
break
|
|
}
|
|
}
|
|
if !isAbsent {
|
|
rvs[0].Values[i] = nan
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func getAbsentTimeseries(ec *EvalConfig, arg metricsql.Expr) []*timeseries {
|
|
// Copy tags from arg
|
|
rvs := evalNumber(ec, 1)
|
|
rv := rvs[0]
|
|
me, ok := arg.(*metricsql.MetricExpr)
|
|
if !ok {
|
|
return rvs
|
|
}
|
|
tfs := searchutils.ToTagFilters(me.LabelFilters)
|
|
for i := range tfs {
|
|
tf := &tfs[i]
|
|
if len(tf.Key) == 0 {
|
|
continue
|
|
}
|
|
if tf.IsRegexp || tf.IsNegative {
|
|
continue
|
|
}
|
|
rv.MetricName.AddTagBytes(tf.Key, tf.Value)
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
// transformCeil rounds v up to the nearest integer value.
func transformCeil(v float64) float64 {
	rounded := math.Ceil(v)
	return rounded
}
|
|
|
|
func transformClamp(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
mins, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
maxs, err := getScalar(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
if v > maxs[i] {
|
|
values[i] = maxs[i]
|
|
} else if v < mins[i] {
|
|
values[i] = mins[i]
|
|
}
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func transformClampMax(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
maxs, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
if v > maxs[i] {
|
|
values[i] = maxs[i]
|
|
}
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func transformClampMin(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
mins, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
if v < mins[i] {
|
|
values[i] = mins[i]
|
|
}
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func newTransformFuncDateTime(f func(t time.Time) int) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) > 1 {
|
|
return nil, fmt.Errorf(`too many args; got %d; want up to %d`, len(args), 1)
|
|
}
|
|
var arg []*timeseries
|
|
if len(args) == 0 {
|
|
arg = evalTime(tfa.ec)
|
|
} else {
|
|
arg = args[0]
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
t := time.Unix(int64(v), 0).UTC()
|
|
values[i] = float64(f(t))
|
|
}
|
|
}
|
|
return doTransformValues(arg, tf, tfa.fe)
|
|
}
|
|
}
|
|
|
|
// transformDayOfMonth returns the day of the month of t.
func transformDayOfMonth(t time.Time) int {
	day := t.Day()
	return day
}
|
|
|
|
// transformDayOfWeek returns the weekday of t as the integer value of
// time.Weekday.
func transformDayOfWeek(t time.Time) int {
	weekday := t.Weekday()
	return int(weekday)
}
|
|
|
|
func transformDaysInMonth(t time.Time) int {
|
|
m := t.Month()
|
|
if m == 2 && isLeapYear(uint32(t.Year())) {
|
|
return 29
|
|
}
|
|
return daysInMonth[m]
|
|
}
|
|
|
|
// transformExp returns e raised to the power v.
func transformExp(v float64) float64 {
	result := math.Exp(v)
	return result
}
|
|
|
|
// transformFloor rounds v down to the nearest integer value.
func transformFloor(v float64) float64 {
	rounded := math.Floor(v)
	return rounded
}
|
|
|
|
func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
limits, err := getScalar(args[0], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
limit := 0
|
|
if len(limits) > 0 {
|
|
limit = int(limits[0])
|
|
}
|
|
if limit <= 0 {
|
|
return nil, nil
|
|
}
|
|
if limit < 3 {
|
|
// Preserve the first and the last bucket for better accuracy for min and max values.
|
|
limit = 3
|
|
}
|
|
tss := vmrangeBucketsToLE(args[1])
|
|
if len(tss) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Group timeseries by all MetricGroup+tags excluding `le` tag.
|
|
type x struct {
|
|
le float64
|
|
hits float64
|
|
ts *timeseries
|
|
}
|
|
m := make(map[string][]x)
|
|
var b []byte
|
|
var mn storage.MetricName
|
|
for _, ts := range tss {
|
|
leStr := ts.MetricName.GetTagValue("le")
|
|
if len(leStr) == 0 {
|
|
// Skip time series without `le` tag.
|
|
continue
|
|
}
|
|
le, err := strconv.ParseFloat(string(leStr), 64)
|
|
if err != nil {
|
|
// Skip time series with invalid `le` tag.
|
|
continue
|
|
}
|
|
mn.CopyFrom(&ts.MetricName)
|
|
mn.RemoveTag("le")
|
|
b = marshalMetricNameSorted(b[:0], &mn)
|
|
m[string(b)] = append(m[string(b)], x{
|
|
le: le,
|
|
ts: ts,
|
|
})
|
|
}
|
|
|
|
// Remove buckets with the smallest counters.
|
|
rvs := make([]*timeseries, 0, len(tss))
|
|
for _, leGroup := range m {
|
|
if len(leGroup) <= limit {
|
|
// Fast path - the number of buckets doesn't exceed the given limit.
|
|
// Keep all the buckets as is.
|
|
for _, xx := range leGroup {
|
|
rvs = append(rvs, xx.ts)
|
|
}
|
|
continue
|
|
}
|
|
// Slow path - remove buckets with the smallest number of hits until their count reaches the limit.
|
|
|
|
// Calculate per-bucket hits.
|
|
sort.Slice(leGroup, func(i, j int) bool {
|
|
return leGroup[i].le < leGroup[j].le
|
|
})
|
|
for n := range limits {
|
|
prevValue := float64(0)
|
|
for i := range leGroup {
|
|
xx := &leGroup[i]
|
|
value := xx.ts.Values[n]
|
|
xx.hits += value - prevValue
|
|
prevValue = value
|
|
}
|
|
}
|
|
for len(leGroup) > limit {
|
|
// Preserve the first and the last bucket for better accuracy for min and max values
|
|
xxMinIdx := 1
|
|
minMergeHits := leGroup[1].hits + leGroup[2].hits
|
|
for i := range leGroup[1 : len(leGroup)-2] {
|
|
mergeHits := leGroup[i+1].hits + leGroup[i+2].hits
|
|
if mergeHits < minMergeHits {
|
|
xxMinIdx = i + 1
|
|
minMergeHits = mergeHits
|
|
}
|
|
}
|
|
leGroup[xxMinIdx+1].hits += leGroup[xxMinIdx].hits
|
|
leGroup = append(leGroup[:xxMinIdx], leGroup[xxMinIdx+1:]...)
|
|
}
|
|
for _, xx := range leGroup {
|
|
rvs = append(rvs, xx.ts)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := vmrangeBucketsToLE(args[0])
|
|
return rvs, nil
|
|
}
|
|
|
|
func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
|
|
rvs := make([]*timeseries, 0, len(tss))
|
|
|
|
// Group timeseries by MetricGroup+tags excluding `vmrange` tag.
|
|
type x struct {
|
|
startStr string
|
|
endStr string
|
|
start float64
|
|
end float64
|
|
ts *timeseries
|
|
}
|
|
m := make(map[string][]x)
|
|
bb := bbPool.Get()
|
|
defer bbPool.Put(bb)
|
|
for _, ts := range tss {
|
|
vmrange := ts.MetricName.GetTagValue("vmrange")
|
|
if len(vmrange) == 0 {
|
|
if le := ts.MetricName.GetTagValue("le"); len(le) > 0 {
|
|
// Keep Prometheus-compatible buckets.
|
|
rvs = append(rvs, ts)
|
|
}
|
|
continue
|
|
}
|
|
n := strings.Index(bytesutil.ToUnsafeString(vmrange), "...")
|
|
if n < 0 {
|
|
continue
|
|
}
|
|
startStr := string(vmrange[:n])
|
|
start, err := strconv.ParseFloat(startStr, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
endStr := string(vmrange[n+len("..."):])
|
|
end, err := strconv.ParseFloat(endStr, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
ts.MetricName.RemoveTag("le")
|
|
ts.MetricName.RemoveTag("vmrange")
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
m[string(bb.B)] = append(m[string(bb.B)], x{
|
|
startStr: startStr,
|
|
endStr: endStr,
|
|
start: start,
|
|
end: end,
|
|
ts: ts,
|
|
})
|
|
}
|
|
|
|
// Convert `vmrange` label in each group of time series to `le` label.
|
|
copyTS := func(src *timeseries, leStr string) *timeseries {
|
|
var ts timeseries
|
|
ts.CopyFromShallowTimestamps(src)
|
|
values := ts.Values
|
|
for i := range values {
|
|
values[i] = 0
|
|
}
|
|
ts.MetricName.RemoveTag("le")
|
|
ts.MetricName.AddTag("le", leStr)
|
|
return &ts
|
|
}
|
|
isZeroTS := func(ts *timeseries) bool {
|
|
for _, v := range ts.Values {
|
|
if v > 0 {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool { return xss[i].end < xss[j].end })
|
|
xssNew := make([]x, 0, len(xss)+2)
|
|
var xsPrev x
|
|
uniqTs := make(map[string]*timeseries, len(xss))
|
|
for _, xs := range xss {
|
|
ts := xs.ts
|
|
if isZeroTS(ts) {
|
|
// Skip time series with zeros. They are substituted by xssNew below.
|
|
xsPrev = xs
|
|
continue
|
|
}
|
|
if xs.start != xsPrev.end && uniqTs[xs.startStr] == nil {
|
|
uniqTs[xs.startStr] = xs.ts
|
|
xssNew = append(xssNew, x{
|
|
endStr: xs.startStr,
|
|
end: xs.start,
|
|
ts: copyTS(ts, xs.startStr),
|
|
})
|
|
}
|
|
ts.MetricName.AddTag("le", xs.endStr)
|
|
prevTs := uniqTs[xs.endStr]
|
|
if prevTs != nil {
|
|
// the end of the current bucket is not unique, need to merge it with the existing bucket.
|
|
_ = mergeNonOverlappingTimeseries(prevTs, xs.ts)
|
|
} else {
|
|
xssNew = append(xssNew, xs)
|
|
uniqTs[xs.endStr] = xs.ts
|
|
}
|
|
xsPrev = xs
|
|
}
|
|
if !math.IsInf(xsPrev.end, 1) && !isZeroTS(xsPrev.ts) {
|
|
xssNew = append(xssNew, x{
|
|
endStr: "+Inf",
|
|
end: math.Inf(1),
|
|
ts: copyTS(xsPrev.ts, "+Inf"),
|
|
})
|
|
}
|
|
xss = xssNew
|
|
if len(xss) == 0 {
|
|
continue
|
|
}
|
|
for i := range xss[0].ts.Values {
|
|
count := float64(0)
|
|
for _, xs := range xss {
|
|
ts := xs.ts
|
|
v := ts.Values[i]
|
|
if !math.IsNaN(v) && v > 0 {
|
|
count += v
|
|
}
|
|
ts.Values[i] = count
|
|
}
|
|
}
|
|
for _, xs := range xss {
|
|
rvs = append(rvs, xs.ts)
|
|
}
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
func transformHistogramShare(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 || len(args) > 3 {
|
|
return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args))
|
|
}
|
|
les, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse le: %w", err)
|
|
}
|
|
|
|
// Convert buckets with `vmrange` labels to buckets with `le` labels.
|
|
tss := vmrangeBucketsToLE(args[1])
|
|
|
|
// Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details.
|
|
var boundsLabel string
|
|
if len(args) > 2 {
|
|
s, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %w", err)
|
|
}
|
|
boundsLabel = s
|
|
}
|
|
|
|
// Group metrics by all tags excluding "le"
|
|
m := groupLeTimeseries(tss)
|
|
|
|
// Calculate share for les
|
|
share := func(i int, les []float64, xss []leTimeseries) (q, lower, upper float64) {
|
|
leReq := les[i]
|
|
if math.IsNaN(leReq) || len(xss) == 0 {
|
|
return nan, nan, nan
|
|
}
|
|
fixBrokenBuckets(i, xss)
|
|
if leReq < 0 {
|
|
return 0, 0, 0
|
|
}
|
|
if math.IsInf(leReq, 1) {
|
|
return 1, 1, 1
|
|
}
|
|
var vPrev, lePrev float64
|
|
for _, xs := range xss {
|
|
v := xs.ts.Values[i]
|
|
le := xs.le
|
|
if leReq >= le {
|
|
vPrev = v
|
|
lePrev = le
|
|
continue
|
|
}
|
|
// precondition: lePrev <= leReq < le
|
|
vLast := xss[len(xss)-1].ts.Values[i]
|
|
lower = vPrev / vLast
|
|
if math.IsInf(le, 1) {
|
|
return lower, lower, 1
|
|
}
|
|
if lePrev == leReq {
|
|
return lower, lower, lower
|
|
}
|
|
upper = v / vLast
|
|
q = lower + (v-vPrev)/vLast*(leReq-lePrev)/(le-lePrev)
|
|
return q, lower, upper
|
|
}
|
|
// precondition: leReq > leLast
|
|
return 1, 1, 1
|
|
}
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
var tsLower, tsUpper *timeseries
|
|
if len(boundsLabel) > 0 {
|
|
tsLower = ×eries{}
|
|
tsLower.CopyFromShallowTimestamps(dst)
|
|
tsLower.MetricName.RemoveTag(boundsLabel)
|
|
tsLower.MetricName.AddTag(boundsLabel, "lower")
|
|
tsUpper = ×eries{}
|
|
tsUpper.CopyFromShallowTimestamps(dst)
|
|
tsUpper.MetricName.RemoveTag(boundsLabel)
|
|
tsUpper.MetricName.AddTag(boundsLabel, "upper")
|
|
}
|
|
for i := range dst.Values {
|
|
q, lower, upper := share(i, les, xss)
|
|
dst.Values[i] = q
|
|
if len(boundsLabel) > 0 {
|
|
tsLower.Values[i] = lower
|
|
tsUpper.Values[i] = upper
|
|
}
|
|
}
|
|
rvs = append(rvs, dst)
|
|
if len(boundsLabel) > 0 {
|
|
rvs = append(rvs, tsLower)
|
|
rvs = append(rvs, tsUpper)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformHistogramAvg(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tss := vmrangeBucketsToLE(args[0])
|
|
m := groupLeTimeseries(tss)
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
for i := range dst.Values {
|
|
dst.Values[i] = avgForLeTimeseries(i, xss)
|
|
}
|
|
rvs = append(rvs, dst)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformHistogramStddev(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tss := vmrangeBucketsToLE(args[0])
|
|
m := groupLeTimeseries(tss)
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
for i := range dst.Values {
|
|
v := stdvarForLeTimeseries(i, xss)
|
|
dst.Values[i] = math.Sqrt(v)
|
|
}
|
|
rvs = append(rvs, dst)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformHistogramStdvar(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tss := vmrangeBucketsToLE(args[0])
|
|
m := groupLeTimeseries(tss)
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
for i := range dst.Values {
|
|
dst.Values[i] = stdvarForLeTimeseries(i, xss)
|
|
}
|
|
rvs = append(rvs, dst)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func avgForLeTimeseries(i int, xss []leTimeseries) float64 {
|
|
lePrev := float64(0)
|
|
vPrev := float64(0)
|
|
sum := float64(0)
|
|
weightTotal := float64(0)
|
|
for _, xs := range xss {
|
|
if math.IsInf(xs.le, 0) {
|
|
continue
|
|
}
|
|
le := xs.le
|
|
n := (le + lePrev) / 2
|
|
v := xs.ts.Values[i]
|
|
weight := v - vPrev
|
|
sum += n * weight
|
|
weightTotal += weight
|
|
lePrev = le
|
|
vPrev = v
|
|
}
|
|
if weightTotal == 0 {
|
|
return nan
|
|
}
|
|
return sum / weightTotal
|
|
}
|
|
|
|
func stdvarForLeTimeseries(i int, xss []leTimeseries) float64 {
|
|
lePrev := float64(0)
|
|
vPrev := float64(0)
|
|
sum := float64(0)
|
|
sum2 := float64(0)
|
|
weightTotal := float64(0)
|
|
for _, xs := range xss {
|
|
if math.IsInf(xs.le, 0) {
|
|
continue
|
|
}
|
|
le := xs.le
|
|
n := (le + lePrev) / 2
|
|
v := xs.ts.Values[i]
|
|
weight := v - vPrev
|
|
sum += n * weight
|
|
sum2 += n * n * weight
|
|
weightTotal += weight
|
|
lePrev = le
|
|
vPrev = v
|
|
}
|
|
if weightTotal == 0 {
|
|
return nan
|
|
}
|
|
avg := sum / weightTotal
|
|
avg2 := sum2 / weightTotal
|
|
stdvar := avg2 - avg*avg
|
|
if stdvar < 0 {
|
|
// Correct possible calculation error.
|
|
stdvar = 0
|
|
}
|
|
return stdvar
|
|
}
|
|
|
|
func transformHistogramQuantiles(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 3 {
|
|
return nil, fmt.Errorf("unexpected number of args: %d; expecting at least 3 args", len(args))
|
|
}
|
|
dstLabel, err := getString(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot obtain dstLabel: %w", err)
|
|
}
|
|
phiArgs := args[1 : len(args)-1]
|
|
tssOrig := args[len(args)-1]
|
|
// Calculate quantile individually per each phi.
|
|
var rvs []*timeseries
|
|
for i, phiArg := range phiArgs {
|
|
phis, err := getScalar(phiArg, i)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse phi: %w", err)
|
|
}
|
|
phiStr := fmt.Sprintf("%g", phis[0])
|
|
tss := copyTimeseries(tssOrig)
|
|
tfaTmp := &transformFuncArg{
|
|
ec: tfa.ec,
|
|
fe: tfa.fe,
|
|
args: [][]*timeseries{
|
|
phiArg,
|
|
tss,
|
|
},
|
|
}
|
|
tssTmp, err := transformHistogramQuantile(tfaTmp)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot calculate quantile %s: %w", phiStr, err)
|
|
}
|
|
for _, ts := range tssTmp {
|
|
ts.MetricName.RemoveTag(dstLabel)
|
|
ts.MetricName.AddTag(dstLabel, phiStr)
|
|
}
|
|
rvs = append(rvs, tssTmp...)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 || len(args) > 3 {
|
|
return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args))
|
|
}
|
|
phis, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse phi: %w", err)
|
|
}
|
|
|
|
// Convert buckets with `vmrange` labels to buckets with `le` labels.
|
|
tss := vmrangeBucketsToLE(args[1])
|
|
|
|
// Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details.
|
|
var boundsLabel string
|
|
if len(args) > 2 {
|
|
s, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %w", err)
|
|
}
|
|
boundsLabel = s
|
|
}
|
|
|
|
// Group metrics by all tags excluding "le"
|
|
m := groupLeTimeseries(tss)
|
|
|
|
// Calculate quantile for each group in m
|
|
lastNonInf := func(i int, xss []leTimeseries) float64 {
|
|
for len(xss) > 0 {
|
|
xsLast := xss[len(xss)-1]
|
|
if !math.IsInf(xsLast.le, 0) {
|
|
return xsLast.le
|
|
}
|
|
xss = xss[:len(xss)-1]
|
|
}
|
|
return nan
|
|
}
|
|
quantile := func(i int, phis []float64, xss []leTimeseries) (q, lower, upper float64) {
|
|
phi := phis[i]
|
|
if math.IsNaN(phi) {
|
|
return nan, nan, nan
|
|
}
|
|
fixBrokenBuckets(i, xss)
|
|
vLast := float64(0)
|
|
if len(xss) > 0 {
|
|
vLast = xss[len(xss)-1].ts.Values[i]
|
|
}
|
|
if vLast == 0 {
|
|
return nan, nan, nan
|
|
}
|
|
if phi < 0 {
|
|
return -inf, -inf, xss[0].ts.Values[i]
|
|
}
|
|
if phi > 1 {
|
|
return inf, vLast, inf
|
|
}
|
|
vReq := vLast * phi
|
|
vPrev := float64(0)
|
|
lePrev := float64(0)
|
|
for _, xs := range xss {
|
|
v := xs.ts.Values[i]
|
|
le := xs.le
|
|
if v <= 0 {
|
|
// Skip zero buckets.
|
|
lePrev = le
|
|
continue
|
|
}
|
|
if v < vReq {
|
|
vPrev = v
|
|
lePrev = le
|
|
continue
|
|
}
|
|
if math.IsInf(le, 0) {
|
|
break
|
|
}
|
|
if v == vPrev {
|
|
return lePrev, lePrev, v
|
|
}
|
|
vv := lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev)
|
|
return vv, lePrev, le
|
|
}
|
|
vv := lastNonInf(i, xss)
|
|
return vv, vv, inf
|
|
}
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
var tsLower, tsUpper *timeseries
|
|
if len(boundsLabel) > 0 {
|
|
tsLower = ×eries{}
|
|
tsLower.CopyFromShallowTimestamps(dst)
|
|
tsLower.MetricName.RemoveTag(boundsLabel)
|
|
tsLower.MetricName.AddTag(boundsLabel, "lower")
|
|
tsUpper = ×eries{}
|
|
tsUpper.CopyFromShallowTimestamps(dst)
|
|
tsUpper.MetricName.RemoveTag(boundsLabel)
|
|
tsUpper.MetricName.AddTag(boundsLabel, "upper")
|
|
}
|
|
for i := range dst.Values {
|
|
v, lower, upper := quantile(i, phis, xss)
|
|
dst.Values[i] = v
|
|
if len(boundsLabel) > 0 {
|
|
tsLower.Values[i] = lower
|
|
tsUpper.Values[i] = upper
|
|
}
|
|
}
|
|
rvs = append(rvs, dst)
|
|
if len(boundsLabel) > 0 {
|
|
rvs = append(rvs, tsLower)
|
|
rvs = append(rvs, tsUpper)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
type leTimeseries struct {
|
|
le float64
|
|
ts *timeseries
|
|
}
|
|
|
|
func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries {
|
|
m := make(map[string][]leTimeseries)
|
|
bb := bbPool.Get()
|
|
for _, ts := range tss {
|
|
tagValue := ts.MetricName.GetTagValue("le")
|
|
if len(tagValue) == 0 {
|
|
continue
|
|
}
|
|
le, err := strconv.ParseFloat(bytesutil.ToUnsafeString(tagValue), 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
ts.MetricName.ResetMetricGroup()
|
|
ts.MetricName.RemoveTag("le")
|
|
bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName)
|
|
m[string(bb.B)] = append(m[string(bb.B)], leTimeseries{
|
|
le: le,
|
|
ts: ts,
|
|
})
|
|
}
|
|
bbPool.Put(bb)
|
|
return m
|
|
}
|
|
|
|
func fixBrokenBuckets(i int, xss []leTimeseries) {
|
|
// Buckets are already sorted by le, so their values must be in ascending order,
|
|
// since the next bucket includes all the previous buckets.
|
|
// If the next bucket has lower value than the current bucket,
|
|
// then the current bucket must be substituted with the next bucket value.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2819
|
|
if len(xss) < 2 {
|
|
return
|
|
}
|
|
for j := len(xss) - 1; j >= 0; j-- {
|
|
v := xss[j].ts.Values[i]
|
|
if !math.IsNaN(v) {
|
|
j++
|
|
for j < len(xss) {
|
|
xss[j].ts.Values[i] = v
|
|
j++
|
|
}
|
|
break
|
|
}
|
|
}
|
|
vNext := xss[len(xss)-1].ts.Values[i]
|
|
for j := len(xss) - 2; j >= 0; j-- {
|
|
v := xss[j].ts.Values[i]
|
|
if math.IsNaN(v) || v > vNext {
|
|
xss[j].ts.Values[i] = vNext
|
|
} else {
|
|
vNext = v
|
|
}
|
|
}
|
|
}
|
|
|
|
// transformHour returns the hour within the day of t.
func transformHour(t time.Time) int {
	hour := t.Hour()
	return hour
}
|
|
|
|
// runningSum returns the sum of the accumulator a and the next value b.
// idx is unused.
func runningSum(a, b float64, idx int) float64 {
	total := a + b
	return total
}
|
|
|
|
// runningMax returns a when a > b and b otherwise, so b is returned
// when either of them is NaN. idx is unused.
func runningMax(a, b float64, idx int) float64 {
	result := b
	if a > b {
		result = a
	}
	return result
}
|
|
|
|
// runningMin returns a when a < b and b otherwise, so b is returned
// when either of them is NaN. idx is unused.
func runningMin(a, b float64, idx int) float64 {
	result := b
	if a < b {
		result = a
	}
	return result
}
|
|
|
|
// runningAvg folds the next value b into the running average a,
// where idx+1 is used as the number of values seen so far including b.
//
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
func runningAvg(a, b float64, idx int) float64 {
	count := float64(idx + 1)
	return a + (b-a)/count
}
|
|
|
|
// skipLeadingNaNs returns the sub-slice of values that starts at the
// first non-NaN value. The result is empty when all the values are NaN.
// The returned slice shares memory with values.
func skipLeadingNaNs(values []float64) []float64 {
	for i, v := range values {
		if !math.IsNaN(v) {
			return values[i:]
		}
	}
	return values[len(values):]
}
|
|
|
|
// skipTrailingNaNs returns the prefix of values that ends at the last
// non-NaN value. The result is empty when all the values are NaN.
// The returned slice shares memory with values.
func skipTrailingNaNs(values []float64) []float64 {
	end := len(values)
	for end > 0 && math.IsNaN(values[end-1]) {
		end--
	}
	return values[:end]
}
|
|
|
|
func transformKeepLastValue(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := ts.Values
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
lastValue := values[0]
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
lastValue = v
|
|
continue
|
|
}
|
|
values[i] = lastValue
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformKeepNextValue(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := ts.Values
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
nextValue := values[len(values)-1]
|
|
for i := len(values) - 1; i >= 0; i-- {
|
|
v := values[i]
|
|
if !math.IsNaN(v) {
|
|
nextValue = v
|
|
continue
|
|
}
|
|
values[i] = nextValue
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformInterpolate(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := ts.Values
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
prevValue := nan
|
|
var nextValue float64
|
|
for i := 0; i < len(values); i++ {
|
|
if !math.IsNaN(values[i]) {
|
|
continue
|
|
}
|
|
if i > 0 {
|
|
prevValue = values[i-1]
|
|
}
|
|
j := i + 1
|
|
for j < len(values) {
|
|
if !math.IsNaN(values[j]) {
|
|
break
|
|
}
|
|
j++
|
|
}
|
|
if j >= len(values) {
|
|
nextValue = prevValue
|
|
} else {
|
|
nextValue = values[j]
|
|
}
|
|
if math.IsNaN(prevValue) {
|
|
prevValue = nextValue
|
|
}
|
|
delta := (nextValue - prevValue) / float64(j-i+1)
|
|
for i < j {
|
|
prevValue += delta
|
|
values[i] = prevValue
|
|
i++
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func newTransformFuncRunning(rf func(a, b float64, idx int) float64) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
ts.MetricName.ResetMetricGroup()
|
|
values := skipLeadingNaNs(ts.Values)
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
prevValue := values[0]
|
|
values = values[1:]
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
prevValue = rf(prevValue, v, i+1)
|
|
}
|
|
values[i] = prevValue
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
func newTransformFuncRange(rf func(a, b float64, idx int) float64) transformFunc {
|
|
tfr := newTransformFuncRunning(rf)
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
rvs, err := tfr(tfa)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
setLastValues(rvs)
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
phis, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
phi := float64(0)
|
|
if len(phis) > 0 {
|
|
phi = phis[0]
|
|
}
|
|
rvs := args[1]
|
|
a := getFloat64s()
|
|
values := a.A[:0]
|
|
for _, ts := range rvs {
|
|
lastIdx := -1
|
|
originValues := ts.Values
|
|
values = values[:0]
|
|
for i, v := range originValues {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
values = append(values, v)
|
|
lastIdx = i
|
|
}
|
|
if lastIdx >= 0 {
|
|
sort.Float64s(values)
|
|
originValues[lastIdx] = quantileSorted(phi, values)
|
|
}
|
|
}
|
|
a.A = values
|
|
putFloat64s(a)
|
|
setLastValues(rvs)
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformRangeFirst(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := skipLeadingNaNs(ts.Values)
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
vFirst := values[0]
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
values[i] = vFirst
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformRangeLast(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
setLastValues(rvs)
|
|
return rvs, nil
|
|
}
|
|
|
|
func setLastValues(tss []*timeseries) {
|
|
for _, ts := range tss {
|
|
values := skipTrailingNaNs(ts.Values)
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
vLast := values[len(values)-1]
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
values[i] = vLast
|
|
}
|
|
}
|
|
}
|
|
|
|
func transformSmoothExponential(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
sfs, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := skipLeadingNaNs(ts.Values)
|
|
for i, v := range values {
|
|
if !math.IsInf(v, 0) {
|
|
values = values[i:]
|
|
break
|
|
}
|
|
}
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
avg := values[0]
|
|
values = values[1:]
|
|
sfsX := sfs[len(ts.Values)-len(values):]
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
if math.IsInf(v, 0) {
|
|
values[i] = avg
|
|
continue
|
|
}
|
|
sf := sfsX[i]
|
|
if math.IsNaN(sf) {
|
|
sf = 1
|
|
}
|
|
if sf < 0 {
|
|
sf = 0
|
|
}
|
|
if sf > 1 {
|
|
sf = 1
|
|
}
|
|
avg = avg*(1-sf) + v*sf
|
|
values[i] = avg
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformRemoveResets(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
removeCounterResetsMaybeNaNs(ts.Values)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformUnion(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return evalNumber(tfa.ec, nan), nil
|
|
}
|
|
|
|
rvs := make([]*timeseries, 0, len(args[0]))
|
|
m := make(map[string]bool, len(args[0]))
|
|
bb := bbPool.Get()
|
|
for _, arg := range args {
|
|
for _, ts := range arg {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
if m[string(bb.B)] {
|
|
continue
|
|
}
|
|
m[string(bb.B)] = true
|
|
rvs = append(rvs, ts)
|
|
}
|
|
}
|
|
bbPool.Put(bb)
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelKeep(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
var keepLabels []string
|
|
for i := 1; i < len(args); i++ {
|
|
keepLabel, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
keepLabels = append(keepLabels, keepLabel)
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
ts.MetricName.RemoveTagsOn(keepLabels)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelDel(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
var delLabels []string
|
|
for i := 1; i < len(args); i++ {
|
|
delLabel, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
delLabels = append(delLabels, delLabel)
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
ts.MetricName.RemoveTagsIgnoring(delLabels)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelSet(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
dstLabels, dstValues, err := getStringPairs(args[1:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
for i, dstLabel := range dstLabels {
|
|
value := dstValues[i]
|
|
dstValue := getDstValue(mn, dstLabel)
|
|
*dstValue = append((*dstValue)[:0], value...)
|
|
if len(value) == 0 {
|
|
mn.RemoveTag(dstLabel)
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelUppercase(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return transformLabelValueFunc(tfa, strings.ToUpper)
|
|
}
|
|
|
|
func transformLabelLowercase(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return transformLabelValueFunc(tfa, strings.ToLower)
|
|
}
|
|
|
|
func transformLabelValueFunc(tfa *transformFuncArg, f func(string) string) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 2)
|
|
}
|
|
labels := make([]string, 0, len(args)-1)
|
|
for i := 1; i < len(args); i++ {
|
|
label, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
labels = append(labels, label)
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
for _, label := range labels {
|
|
dstValue := getDstValue(mn, label)
|
|
*dstValue = append((*dstValue)[:0], f(string(*dstValue))...)
|
|
if len(*dstValue) == 0 {
|
|
mn.RemoveTag(label)
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelMap(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 2)
|
|
}
|
|
label, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read label name: %w", err)
|
|
}
|
|
srcValues, dstValues, err := getStringPairs(args[2:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m := make(map[string]string, len(srcValues))
|
|
for i, srcValue := range srcValues {
|
|
m[srcValue] = dstValues[i]
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
dstValue := getDstValue(mn, label)
|
|
value, ok := m[string(*dstValue)]
|
|
if ok {
|
|
*dstValue = append((*dstValue)[:0], value...)
|
|
}
|
|
if len(*dstValue) == 0 {
|
|
mn.RemoveTag(label)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformDropCommonLabels(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
rvs := args[0]
|
|
for _, tss := range args[1:] {
|
|
rvs = append(rvs, tss...)
|
|
}
|
|
m := make(map[string]map[string]int)
|
|
countLabel := func(name, value string) {
|
|
x := m[name]
|
|
if x == nil {
|
|
x = make(map[string]int)
|
|
m[name] = x
|
|
}
|
|
x[value]++
|
|
}
|
|
for _, ts := range rvs {
|
|
countLabel("__name__", string(ts.MetricName.MetricGroup))
|
|
for _, tag := range ts.MetricName.Tags {
|
|
countLabel(string(tag.Key), string(tag.Value))
|
|
}
|
|
}
|
|
for labelName, x := range m {
|
|
for _, count := range x {
|
|
if count != len(rvs) {
|
|
continue
|
|
}
|
|
for _, ts := range rvs {
|
|
ts.MetricName.RemoveTag(labelName)
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelCopy(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return transformLabelCopyExt(tfa, false)
|
|
}
|
|
|
|
func transformLabelMove(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return transformLabelCopyExt(tfa, true)
|
|
}
|
|
|
|
func transformLabelCopyExt(tfa *transformFuncArg, removeSrcLabels bool) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
srcLabels, dstLabels, err := getStringPairs(args[1:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
for i, srcLabel := range srcLabels {
|
|
dstLabel := dstLabels[i]
|
|
value := mn.GetTagValue(srcLabel)
|
|
if len(value) == 0 {
|
|
// Do not remove destination label if the source label doesn't exist.
|
|
continue
|
|
}
|
|
dstValue := getDstValue(mn, dstLabel)
|
|
*dstValue = append((*dstValue)[:0], value...)
|
|
if removeSrcLabels && srcLabel != dstLabel {
|
|
mn.RemoveTag(srcLabel)
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func getStringPairs(args [][]*timeseries) ([]string, []string, error) {
|
|
if len(args)%2 != 0 {
|
|
return nil, nil, fmt.Errorf(`the number of string args must be even; got %d`, len(args))
|
|
}
|
|
var ks, vs []string
|
|
for i := 0; i < len(args); i += 2 {
|
|
k, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
ks = append(ks, k)
|
|
|
|
v, err := getString(args[i+1], i+1)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
vs = append(vs, v)
|
|
}
|
|
return ks, vs, nil
|
|
}
|
|
|
|
func transformLabelJoin(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 3 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 3)
|
|
}
|
|
dstLabel, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
separator, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var srcLabels []string
|
|
for i := 3; i < len(args); i++ {
|
|
srcLabel, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
srcLabels = append(srcLabels, srcLabel)
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
dstValue := getDstValue(mn, dstLabel)
|
|
b := *dstValue
|
|
b = b[:0]
|
|
for j, srcLabel := range srcLabels {
|
|
srcValue := mn.GetTagValue(srcLabel)
|
|
b = append(b, srcValue...)
|
|
if j+1 < len(srcLabels) {
|
|
b = append(b, separator...)
|
|
}
|
|
}
|
|
*dstValue = b
|
|
if len(b) == 0 {
|
|
mn.RemoveTag(dstLabel)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelTransform(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 4); err != nil {
|
|
return nil, err
|
|
}
|
|
label, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
regex, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
replacement, err := getString(args[3], 3)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r, err := metricsql.CompileRegexp(regex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot compile regex %q: %w`, regex, err)
|
|
}
|
|
return labelReplace(args[0], label, r, label, replacement)
|
|
}
|
|
|
|
func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 5); err != nil {
|
|
return nil, err
|
|
}
|
|
dstLabel, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
replacement, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
srcLabel, err := getString(args[3], 3)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
regex, err := getString(args[4], 4)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r, err := metricsql.CompileRegexpAnchored(regex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot compile regex %q: %w`, regex, err)
|
|
}
|
|
return labelReplace(args[0], srcLabel, r, dstLabel, replacement)
|
|
}
|
|
|
|
func labelReplace(tss []*timeseries, srcLabel string, r *regexp.Regexp, dstLabel, replacement string) ([]*timeseries, error) {
|
|
replacementBytes := []byte(replacement)
|
|
for _, ts := range tss {
|
|
mn := &ts.MetricName
|
|
dstValue := getDstValue(mn, dstLabel)
|
|
srcValue := mn.GetTagValue(srcLabel)
|
|
if !r.Match(srcValue) {
|
|
continue
|
|
}
|
|
b := r.ReplaceAll(srcValue, replacementBytes)
|
|
*dstValue = append((*dstValue)[:0], b...)
|
|
if len(b) == 0 {
|
|
mn.RemoveTag(dstLabel)
|
|
}
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
func transformLabelValue(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
labelName, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get label name: %w", err)
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
ts.MetricName.ResetMetricGroup()
|
|
labelValue := ts.MetricName.GetTagValue(labelName)
|
|
v, err := strconv.ParseFloat(string(labelValue), 64)
|
|
if err != nil {
|
|
v = nan
|
|
}
|
|
values := ts.Values
|
|
for i, vOrig := range values {
|
|
if !math.IsNaN(vOrig) {
|
|
values[i] = v
|
|
}
|
|
}
|
|
}
|
|
// Do not remove timeseries with only NaN values, so `default` could be applied to them:
|
|
// label_value(q, "label") default 123
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelMatch(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
labelName, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get label name: %w", err)
|
|
}
|
|
labelRe, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get regexp: %w", err)
|
|
}
|
|
r, err := metricsql.CompileRegexpAnchored(labelRe)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot compile regexp %q: %w`, labelRe, err)
|
|
}
|
|
tss := args[0]
|
|
rvs := tss[:0]
|
|
for _, ts := range tss {
|
|
labelValue := ts.MetricName.GetTagValue(labelName)
|
|
if r.Match(labelValue) {
|
|
rvs = append(rvs, ts)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelMismatch(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
labelName, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get label name: %w", err)
|
|
}
|
|
labelRe, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get regexp: %w", err)
|
|
}
|
|
r, err := metricsql.CompileRegexpAnchored(labelRe)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot compile regexp %q: %w`, labelRe, err)
|
|
}
|
|
tss := args[0]
|
|
rvs := tss[:0]
|
|
for _, ts := range tss {
|
|
labelValue := ts.MetricName.GetTagValue(labelName)
|
|
if !r.Match(labelValue) {
|
|
rvs = append(rvs, ts)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelGraphiteGroup(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf("unexpected number of args: %d; want at least 2 args", len(args))
|
|
}
|
|
tss := args[0]
|
|
groupArgs := args[1:]
|
|
groupIDs := make([]int, len(groupArgs))
|
|
for i, arg := range groupArgs {
|
|
groupID, err := getIntNumber(arg, i+1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get group name from arg #%d: %w", i+1, err)
|
|
}
|
|
groupIDs[i] = groupID
|
|
}
|
|
for _, ts := range tss {
|
|
groups := bytes.Split(ts.MetricName.MetricGroup, dotSeparator)
|
|
groupName := ts.MetricName.MetricGroup[:0]
|
|
for j, groupID := range groupIDs {
|
|
if groupID >= 0 && groupID < len(groups) {
|
|
groupName = append(groupName, groups[groupID]...)
|
|
}
|
|
if j < len(groupIDs)-1 {
|
|
groupName = append(groupName, '.')
|
|
}
|
|
}
|
|
ts.MetricName.MetricGroup = groupName
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
// dotSeparator is the delimiter used by transformLabelGraphiteGroup
// for splitting the metric group into parts.
var dotSeparator = []byte(".")
|
|
|
|
func transformLimitOffset(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
limit, err := getIntNumber(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot obtain limit arg: %w", err)
|
|
}
|
|
offset, err := getIntNumber(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot obtain offset arg: %w", err)
|
|
}
|
|
rvs := args[2]
|
|
if len(rvs) >= offset {
|
|
rvs = rvs[offset:]
|
|
}
|
|
if len(rvs) > limit {
|
|
rvs = rvs[:limit]
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
// transformLn returns the natural logarithm of v.
func transformLn(v float64) float64 {
	result := math.Log(v)
	return result
}
|
|
|
|
// transformLog2 returns the binary logarithm of v.
func transformLog2(v float64) float64 {
	result := math.Log2(v)
	return result
}
|
|
|
|
// transformLog10 returns the decimal logarithm of v.
func transformLog10(v float64) float64 {
	result := math.Log10(v)
	return result
}
|
|
|
|
// transformMinute returns the minute component of t, as reported by t.Minute().
func transformMinute(t time.Time) int {
	minute := t.Minute()
	return minute
}
|
|
|
|
// transformMonth returns the month of t as an integer, where January is 1.
func transformMonth(t time.Time) int {
	month := t.Month()
	return int(month)
}
|
|
|
|
func transformRound(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) != 1 && len(args) != 2 {
|
|
return nil, fmt.Errorf(`unexpected number of args: %d; want 1 or 2`, len(args))
|
|
}
|
|
var nearestArg []*timeseries
|
|
if len(args) == 1 {
|
|
nearestArg = evalNumber(tfa.ec, 1)
|
|
} else {
|
|
nearestArg = args[1]
|
|
}
|
|
nearest, err := getScalar(nearestArg, 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
var nPrev float64
|
|
var p10 float64
|
|
for i, v := range values {
|
|
n := nearest[i]
|
|
if n != nPrev {
|
|
nPrev = n
|
|
_, e := decimal.FromFloat(n)
|
|
p10 = math.Pow10(int(-e))
|
|
}
|
|
v += 0.5 * math.Copysign(n, v)
|
|
v -= math.Mod(v, n)
|
|
v, _ = math.Modf(v * p10)
|
|
values[i] = v / p10
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func transformSgn(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
sign := float64(0)
|
|
if v < 0 {
|
|
sign = -1
|
|
} else if v > 0 {
|
|
sign = 1
|
|
}
|
|
values[i] = sign
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func transformScalar(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Verify whether the arg is a string.
|
|
// Then try converting the string to number.
|
|
if se, ok := tfa.fe.Args[0].(*metricsql.StringExpr); ok {
|
|
n, err := strconv.ParseFloat(se.S, 64)
|
|
if err != nil {
|
|
n = nan
|
|
}
|
|
return evalNumber(tfa.ec, n), nil
|
|
}
|
|
|
|
// The arg isn't a string. Extract scalar from it.
|
|
arg := args[0]
|
|
if len(arg) != 1 {
|
|
return evalNumber(tfa.ec, nan), nil
|
|
}
|
|
arg[0].MetricName.Reset()
|
|
return arg, nil
|
|
}
|
|
|
|
func newTransformFuncSortByLabel(isDesc bool) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf("expecting at least 2 args; got %d args", len(args))
|
|
}
|
|
var labels []string
|
|
for i, arg := range args[1:] {
|
|
label, err := getString(arg, 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse label #%d for sorting: %w", i+1, err)
|
|
}
|
|
labels = append(labels, label)
|
|
}
|
|
rvs := args[0]
|
|
sort.SliceStable(rvs, func(i, j int) bool {
|
|
for _, label := range labels {
|
|
a := rvs[i].MetricName.GetTagValue(label)
|
|
b := rvs[j].MetricName.GetTagValue(label)
|
|
if string(a) == string(b) {
|
|
continue
|
|
}
|
|
if isDesc {
|
|
return string(b) < string(a)
|
|
}
|
|
return string(a) < string(b)
|
|
}
|
|
return false
|
|
})
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
func newTransformFuncNumericSort(isDesc bool) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf("expecting at least 2 args; got %d args", len(args))
|
|
}
|
|
var labels []string
|
|
for i, arg := range args[1:] {
|
|
label, err := getString(arg, i+1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse label #%d for sorting: %w", i+1, err)
|
|
}
|
|
labels = append(labels, label)
|
|
}
|
|
rvs := args[0]
|
|
sort.SliceStable(rvs, func(i, j int) bool {
|
|
for _, label := range labels {
|
|
a := rvs[i].MetricName.GetTagValue(label)
|
|
b := rvs[j].MetricName.GetTagValue(label)
|
|
if string(a) == string(b) {
|
|
continue
|
|
}
|
|
aStr := bytesutil.ToUnsafeString(a)
|
|
bStr := bytesutil.ToUnsafeString(b)
|
|
if isDesc {
|
|
return numericLess(bStr, aStr)
|
|
}
|
|
return numericLess(aStr, bStr)
|
|
}
|
|
return false
|
|
})
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
func numericLess(a, b string) bool {
|
|
for {
|
|
if len(b) == 0 {
|
|
return false
|
|
}
|
|
if len(a) == 0 {
|
|
return true
|
|
}
|
|
aPrefix := getNumPrefix(a)
|
|
bPrefix := getNumPrefix(b)
|
|
a = a[len(aPrefix):]
|
|
b = b[len(bPrefix):]
|
|
if len(aPrefix) > 0 || len(bPrefix) > 0 {
|
|
if len(aPrefix) == 0 {
|
|
return false
|
|
}
|
|
if len(bPrefix) == 0 {
|
|
return true
|
|
}
|
|
aNum := mustParseNum(aPrefix)
|
|
bNum := mustParseNum(bPrefix)
|
|
if aNum != bNum {
|
|
return aNum < bNum
|
|
}
|
|
}
|
|
aPrefix = getNonNumPrefix(a)
|
|
bPrefix = getNonNumPrefix(b)
|
|
a = a[len(aPrefix):]
|
|
b = b[len(bPrefix):]
|
|
if aPrefix != bPrefix {
|
|
return aPrefix < bPrefix
|
|
}
|
|
}
|
|
}
|
|
|
|
func getNumPrefix(s string) string {
|
|
i := 0
|
|
if len(s) > 0 {
|
|
switch s[0] {
|
|
case '-', '+':
|
|
i++
|
|
}
|
|
}
|
|
hasNum := false
|
|
hasDot := false
|
|
for i < len(s) {
|
|
if !isDecimalChar(s[i]) {
|
|
if !hasDot && s[i] == '.' {
|
|
hasDot = true
|
|
i++
|
|
continue
|
|
}
|
|
if !hasNum {
|
|
return ""
|
|
}
|
|
return s[:i]
|
|
}
|
|
hasNum = true
|
|
i++
|
|
}
|
|
if !hasNum {
|
|
return ""
|
|
}
|
|
return s
|
|
}
|
|
|
|
func getNonNumPrefix(s string) string {
|
|
i := 0
|
|
for i < len(s) {
|
|
if isDecimalChar(s[i]) {
|
|
return s[:i]
|
|
}
|
|
i++
|
|
}
|
|
return s
|
|
}
|
|
|
|
// isDecimalChar reports whether ch is an ASCII decimal digit.
func isDecimalChar(ch byte) bool {
	return '0' <= ch && ch <= '9'
}
|
|
|
|
func mustParseNum(s string) float64 {
|
|
f, err := strconv.ParseFloat(s, 64)
|
|
if err != nil {
|
|
logger.Panicf("BUG: unexpected error when parsing the number %q: %s", s, err)
|
|
}
|
|
return f
|
|
}
|
|
|
|
func newTransformFuncSort(isDesc bool) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
sort.Slice(rvs, func(i, j int) bool {
|
|
a := rvs[i].Values
|
|
b := rvs[j].Values
|
|
n := len(a) - 1
|
|
for n >= 0 {
|
|
if !math.IsNaN(a[n]) {
|
|
if math.IsNaN(b[n]) {
|
|
return false
|
|
}
|
|
if a[n] != b[n] {
|
|
break
|
|
}
|
|
} else if !math.IsNaN(b[n]) {
|
|
return true
|
|
}
|
|
n--
|
|
}
|
|
if n < 0 {
|
|
return false
|
|
}
|
|
if isDesc {
|
|
return b[n] < a[n]
|
|
}
|
|
return a[n] < b[n]
|
|
})
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
// transformSqrt returns the square root of v.
func transformSqrt(v float64) float64 {
	result := math.Sqrt(v)
	return result
}
|
|
|
|
// transformSin returns the sine of v given in radians.
func transformSin(v float64) float64 {
	result := math.Sin(v)
	return result
}
|
|
|
|
// transformSinh returns the hyperbolic sine of v.
func transformSinh(v float64) float64 {
	result := math.Sinh(v)
	return result
}
|
|
|
|
// transformCos returns the cosine of v given in radians.
func transformCos(v float64) float64 {
	result := math.Cos(v)
	return result
}
|
|
|
|
// transformCosh returns the hyperbolic cosine of v.
func transformCosh(v float64) float64 {
	result := math.Cosh(v)
	return result
}
|
|
|
|
// transformTan returns the tangent of v given in radians.
func transformTan(v float64) float64 {
	result := math.Tan(v)
	return result
}
|
|
|
|
// transformTanh returns the hyperbolic tangent of v.
func transformTanh(v float64) float64 {
	result := math.Tanh(v)
	return result
}
|
|
|
|
// transformAsin returns the arcsine of v in radians.
func transformAsin(v float64) float64 {
	result := math.Asin(v)
	return result
}
|
|
|
|
// transformAsinh returns the inverse hyperbolic sine of v.
func transformAsinh(v float64) float64 {
	result := math.Asinh(v)
	return result
}
|
|
|
|
// transformAtan returns the arctangent of v in radians.
func transformAtan(v float64) float64 {
	result := math.Atan(v)
	return result
}
|
|
|
|
// transformAtanh returns the inverse hyperbolic tangent of v.
func transformAtanh(v float64) float64 {
	result := math.Atanh(v)
	return result
}
|
|
|
|
// transformAcos returns the arccosine of v in radians.
func transformAcos(v float64) float64 {
	result := math.Acos(v)
	return result
}
|
|
|
|
// transformAcosh returns the inverse hyperbolic cosine of v.
func transformAcosh(v float64) float64 {
	result := math.Acosh(v)
	return result
}
|
|
|
|
// transformDeg converts v from radians to degrees.
func transformDeg(v float64) float64 {
	scaled := v * 180
	return scaled / math.Pi
}
|
|
|
|
// transformRad converts v from degrees to radians.
func transformRad(v float64) float64 {
	scaled := v * math.Pi
	return scaled / 180
}
|
|
|
|
func newTransformRand(newRandFunc func(r *rand.Rand) func() float64) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) > 1 {
|
|
return nil, fmt.Errorf(`unexpected number of args; got %d; want 0 or 1`, len(args))
|
|
}
|
|
var seed int64
|
|
if len(args) == 1 {
|
|
tmp, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(tmp) > 0 {
|
|
seed = int64(tmp[0])
|
|
}
|
|
} else {
|
|
seed = time.Now().UnixNano()
|
|
}
|
|
source := rand.NewSource(seed)
|
|
r := rand.New(source)
|
|
randFunc := newRandFunc(r)
|
|
tss := evalNumber(tfa.ec, 0)
|
|
values := tss[0].Values
|
|
for i := range values {
|
|
values[i] = randFunc()
|
|
}
|
|
return tss, nil
|
|
}
|
|
}
|
|
|
|
// newRandFloat64 returns the generator of pseudo-random numbers
// backed by r.Float64.
func newRandFloat64(r *rand.Rand) func() float64 {
	gen := r.Float64
	return gen
}
|
|
|
|
// newRandNormFloat64 returns the generator of pseudo-random numbers
// backed by r.NormFloat64.
func newRandNormFloat64(r *rand.Rand) func() float64 {
	gen := r.NormFloat64
	return gen
}
|
|
|
|
// newRandExpFloat64 returns the generator of pseudo-random numbers
// backed by r.ExpFloat64.
func newRandExpFloat64(r *rand.Rand) func() float64 {
	gen := r.ExpFloat64
	return gen
}
|
|
|
|
func transformPi(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
if err := expectTransformArgsNum(tfa.args, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
return evalNumber(tfa.ec, math.Pi), nil
|
|
}
|
|
|
|
func transformNow(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
if err := expectTransformArgsNum(tfa.args, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
now := float64(time.Now().UnixNano()) / 1e9
|
|
return evalNumber(tfa.ec, now), nil
|
|
}
|
|
|
|
// bitmapAnd returns the bitwise AND of a and b.
func bitmapAnd(a, b uint64) uint64 {
	result := a
	result &= b
	return result
}
|
|
|
|
// bitmapOr returns the bitwise OR of a and b.
func bitmapOr(a, b uint64) uint64 {
	result := a
	result |= b
	return result
}
|
|
|
|
// bitmapXor returns the bitwise XOR of a and b.
func bitmapXor(a, b uint64) uint64 {
	result := a
	result ^= b
	return result
}
|
|
|
|
func newTransformBitmap(bitmapFunc func(a, b uint64) uint64) func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
ns, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
values[i] = float64(bitmapFunc(uint64(v), uint64(ns[i])))
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
}
|
|
|
|
func transformTimezoneOffset(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tzString, err := getString(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get timezone name: %w", err)
|
|
}
|
|
loc, err := time.LoadLocation(tzString)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load timezone %q: %w", tzString, err)
|
|
}
|
|
|
|
tss := evalNumber(tfa.ec, nan)
|
|
ts := tss[0]
|
|
for i, timestamp := range ts.Timestamps {
|
|
_, offset := time.Unix(timestamp/1000, 0).In(loc).Zone()
|
|
ts.Values[i] = float64(offset)
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
func transformTime(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
if err := expectTransformArgsNum(tfa.args, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
return evalTime(tfa.ec), nil
|
|
}
|
|
|
|
func transformVector(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
return rvs, nil
|
|
}
|
|
|
|
// transformYear returns the year of t, as reported by t.Year().
func transformYear(t time.Time) int {
	year := t.Year()
	return year
}
|
|
|
|
func newTransformFuncZeroArgs(f func(tfa *transformFuncArg) float64) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
if err := expectTransformArgsNum(tfa.args, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
v := f(tfa)
|
|
return evalNumber(tfa.ec, v), nil
|
|
}
|
|
}
|
|
|
|
func transformStep(tfa *transformFuncArg) float64 {
|
|
return float64(tfa.ec.Step) / 1e3
|
|
}
|
|
|
|
func transformStart(tfa *transformFuncArg) float64 {
|
|
return float64(tfa.ec.Start) / 1e3
|
|
}
|
|
|
|
func transformEnd(tfa *transformFuncArg) float64 {
|
|
return float64(tfa.ec.End) / 1e3
|
|
}
|
|
|
|
// copyTimeseries returns a copy of tss.
|
|
func copyTimeseries(tss []*timeseries) []*timeseries {
|
|
rvs := make([]*timeseries, len(tss))
|
|
for i, src := range tss {
|
|
var dst timeseries
|
|
dst.CopyFromShallowTimestamps(src)
|
|
rvs[i] = &dst
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
// copyTimeseriesMetricNames returns a copy of tss with real copy of MetricNames,
|
|
// but with shallow copy of Timestamps and Values if makeCopy is set.
|
|
//
|
|
// Otherwise tss is returned.
|
|
func copyTimeseriesMetricNames(tss []*timeseries, makeCopy bool) []*timeseries {
|
|
if !makeCopy {
|
|
return tss
|
|
}
|
|
rvs := make([]*timeseries, len(tss))
|
|
for i, src := range tss {
|
|
var dst timeseries
|
|
dst.CopyFromMetricNames(src)
|
|
rvs[i] = &dst
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
// copyTimeseriesShallow returns a copy of arg with shallow copies of MetricNames,
|
|
// Timestamps and Values.
|
|
func copyTimeseriesShallow(arg []*timeseries) []*timeseries {
|
|
rvs := make([]*timeseries, len(arg))
|
|
for i, src := range arg {
|
|
var dst timeseries
|
|
dst.CopyShallow(src)
|
|
rvs[i] = &dst
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
func getDstValue(mn *storage.MetricName, dstLabel string) *[]byte {
|
|
if dstLabel == "__name__" {
|
|
return &mn.MetricGroup
|
|
}
|
|
tags := mn.Tags
|
|
for i := range tags {
|
|
tag := &tags[i]
|
|
if string(tag.Key) == dstLabel {
|
|
return &tag.Value
|
|
}
|
|
}
|
|
if len(tags) < cap(tags) {
|
|
tags = tags[:len(tags)+1]
|
|
} else {
|
|
tags = append(tags, storage.Tag{})
|
|
}
|
|
mn.Tags = tags
|
|
tag := &tags[len(tags)-1]
|
|
tag.Key = append(tag.Key[:0], dstLabel...)
|
|
return &tag.Value
|
|
}
|
|
|
|
// isLeapYear reports whether y is a leap year: it is divisible by 4
// and is either not divisible by 100 or divisible by 400.
func isLeapYear(y uint32) bool {
	switch {
	case y%400 == 0:
		return true
	case y%100 == 0:
		return false
	default:
		return y%4 == 0
	}
}
|
|
|
|
// daysInMonth holds the number of days for every month in a non-leap year.
//
// It is indexed by time.Month, so the item at index 0 is unused.
var daysInMonth = [...]int{
	time.January:   31,
	time.February:  28,
	time.March:     31,
	time.April:     30,
	time.May:       31,
	time.June:      30,
	time.July:      31,
	time.August:    31,
	time.September: 30,
	time.October:   31,
	time.November:  30,
	time.December:  31,
}
|
|
|
|
func expectTransformArgsNum(args [][]*timeseries, expectedNum int) error {
|
|
if len(args) == expectedNum {
|
|
return nil
|
|
}
|
|
return fmt.Errorf(`unexpected number of args; got %d; want %d`, len(args), expectedNum)
|
|
}
|
|
|
|
func removeCounterResetsMaybeNaNs(values []float64) {
|
|
values = skipLeadingNaNs(values)
|
|
if len(values) == 0 {
|
|
return
|
|
}
|
|
var correction float64
|
|
prevValue := values[0]
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
d := v - prevValue
|
|
if d < 0 {
|
|
if (-d * 8) < prevValue {
|
|
// This is likely a partial counter reset.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2787
|
|
correction += prevValue - v
|
|
} else {
|
|
correction += prevValue
|
|
}
|
|
}
|
|
prevValue = v
|
|
values[i] = v + correction
|
|
}
|
|
}
|