mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
f68333a8ce
* app/vmselect: ignore empty series for `limit_offset` VictoriaMetrics doesn't return empty series (with all NaN values) to the user. But such series are filtered after transform functions. It means `limit_offset` will account for empty series as well. For example, let's consider following data set: ``` time series: foo{label="1"} NaN, NaN, NaN, NaN // empty series foo{label="2"} 1, 2, 3, 4 foo{label="3"} 4, 3, 2, 1 ``` When user requests all series for metric `foo` the empty series will be filtered out: ``` /query=foo: foo{label="v2"} 1, 2, 3, 4 foo{label="v3"} 4, 3, 2, 1 ``` But `limit_offset(1, 1, foo)` is applied to original series, not filtered yet. So it will return `foo{label="v2"}` (skips the first in list) ``` /query=limit_offset(1, 1, foo): foo{label="v2"} 1, 2, 3, 4 ``` Expected result would be to apply `limit_offset` to already filtered list, so in result we receive `foo{label="v3"}`: ``` /query=limit_offset(1, 1, foo): foo{label="v3"} 4, 3, 2, 1 ``` The change does exactly that - filters empty series before applying `limit_offset`. Signed-off-by: hagen1778 <roman@victoriametrics.com> * app/vmselect: ignore empty series for `limit_offset` Signed-off-by: hagen1778 <roman@victoriametrics.com> Signed-off-by: hagen1778 <roman@victoriametrics.com>
2361 lines
56 KiB
Go
2361 lines
56 KiB
Go
package promql
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metricsql"
|
|
)
|
|
|
|
var transformFuncs = map[string]transformFunc{
|
|
"": transformUnion, // empty func is a synonym to union
|
|
"abs": newTransformFuncOneArg(transformAbs),
|
|
"absent": transformAbsent,
|
|
"acos": newTransformFuncOneArg(transformAcos),
|
|
"acosh": newTransformFuncOneArg(transformAcosh),
|
|
"asin": newTransformFuncOneArg(transformAsin),
|
|
"asinh": newTransformFuncOneArg(transformAsinh),
|
|
"atan": newTransformFuncOneArg(transformAtan),
|
|
"atanh": newTransformFuncOneArg(transformAtanh),
|
|
"bitmap_and": newTransformBitmap(bitmapAnd),
|
|
"bitmap_or": newTransformBitmap(bitmapOr),
|
|
"bitmap_xor": newTransformBitmap(bitmapXor),
|
|
"buckets_limit": transformBucketsLimit,
|
|
"ceil": newTransformFuncOneArg(transformCeil),
|
|
"clamp": transformClamp,
|
|
"clamp_max": transformClampMax,
|
|
"clamp_min": transformClampMin,
|
|
"cos": newTransformFuncOneArg(transformCos),
|
|
"cosh": newTransformFuncOneArg(transformCosh),
|
|
"day_of_month": newTransformFuncDateTime(transformDayOfMonth),
|
|
"day_of_week": newTransformFuncDateTime(transformDayOfWeek),
|
|
"days_in_month": newTransformFuncDateTime(transformDaysInMonth),
|
|
"deg": newTransformFuncOneArg(transformDeg),
|
|
"drop_common_labels": transformDropCommonLabels,
|
|
"end": newTransformFuncZeroArgs(transformEnd),
|
|
"exp": newTransformFuncOneArg(transformExp),
|
|
"floor": newTransformFuncOneArg(transformFloor),
|
|
"histogram_avg": transformHistogramAvg,
|
|
"histogram_quantile": transformHistogramQuantile,
|
|
"histogram_quantiles": transformHistogramQuantiles,
|
|
"histogram_share": transformHistogramShare,
|
|
"histogram_stddev": transformHistogramStddev,
|
|
"histogram_stdvar": transformHistogramStdvar,
|
|
"hour": newTransformFuncDateTime(transformHour),
|
|
"interpolate": transformInterpolate,
|
|
"keep_last_value": transformKeepLastValue,
|
|
"keep_next_value": transformKeepNextValue,
|
|
"label_copy": transformLabelCopy,
|
|
"label_del": transformLabelDel,
|
|
"label_graphite_group": transformLabelGraphiteGroup,
|
|
"label_join": transformLabelJoin,
|
|
"label_keep": transformLabelKeep,
|
|
"label_lowercase": transformLabelLowercase,
|
|
"label_map": transformLabelMap,
|
|
"label_match": transformLabelMatch,
|
|
"label_mismatch": transformLabelMismatch,
|
|
"label_move": transformLabelMove,
|
|
"label_replace": transformLabelReplace,
|
|
"label_set": transformLabelSet,
|
|
"label_transform": transformLabelTransform,
|
|
"label_uppercase": transformLabelUppercase,
|
|
"label_value": transformLabelValue,
|
|
"limit_offset": transformLimitOffset,
|
|
"ln": newTransformFuncOneArg(transformLn),
|
|
"log2": newTransformFuncOneArg(transformLog2),
|
|
"log10": newTransformFuncOneArg(transformLog10),
|
|
"minute": newTransformFuncDateTime(transformMinute),
|
|
"month": newTransformFuncDateTime(transformMonth),
|
|
"now": transformNow,
|
|
"pi": transformPi,
|
|
"prometheus_buckets": transformPrometheusBuckets,
|
|
"rad": newTransformFuncOneArg(transformRad),
|
|
"rand": newTransformRand(newRandFloat64),
|
|
"rand_exponential": newTransformRand(newRandExpFloat64),
|
|
"rand_normal": newTransformRand(newRandNormFloat64),
|
|
"range_avg": newTransformFuncRange(runningAvg),
|
|
"range_first": transformRangeFirst,
|
|
"range_last": transformRangeLast,
|
|
"range_max": newTransformFuncRange(runningMax),
|
|
"range_min": newTransformFuncRange(runningMin),
|
|
"range_quantile": transformRangeQuantile,
|
|
"range_sum": newTransformFuncRange(runningSum),
|
|
"remove_resets": transformRemoveResets,
|
|
"round": transformRound,
|
|
"running_avg": newTransformFuncRunning(runningAvg),
|
|
"running_max": newTransformFuncRunning(runningMax),
|
|
"running_min": newTransformFuncRunning(runningMin),
|
|
"running_sum": newTransformFuncRunning(runningSum),
|
|
"scalar": transformScalar,
|
|
"sgn": transformSgn,
|
|
"sin": newTransformFuncOneArg(transformSin),
|
|
"sinh": newTransformFuncOneArg(transformSinh),
|
|
"smooth_exponential": transformSmoothExponential,
|
|
"sort": newTransformFuncSort(false),
|
|
"sort_by_label": newTransformFuncSortByLabel(false),
|
|
"sort_by_label_desc": newTransformFuncSortByLabel(true),
|
|
"sort_desc": newTransformFuncSort(true),
|
|
"sqrt": newTransformFuncOneArg(transformSqrt),
|
|
"start": newTransformFuncZeroArgs(transformStart),
|
|
"step": newTransformFuncZeroArgs(transformStep),
|
|
"tan": newTransformFuncOneArg(transformTan),
|
|
"tanh": newTransformFuncOneArg(transformTanh),
|
|
"time": transformTime,
|
|
// "timestamp" has been moved to rollup funcs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415
|
|
"timezone_offset": transformTimezoneOffset,
|
|
"union": transformUnion,
|
|
"vector": transformVector,
|
|
"year": newTransformFuncDateTime(transformYear),
|
|
}
|
|
|
|
// transformFuncsKeepMetricName lists the transform functions whose result has
// the same physical meaning as their input series. doTransformValues keeps
// the metric name for these functions instead of resetting it.
var transformFuncsKeepMetricName = map[string]bool{
	"ceil":               true,
	"clamp":              true,
	"clamp_max":          true,
	"clamp_min":          true,
	"floor":              true,
	"interpolate":        true,
	"keep_last_value":    true,
	"keep_next_value":    true,
	"range_avg":          true,
	"range_first":        true,
	"range_last":         true,
	"range_max":          true,
	"range_min":          true,
	"range_quantile":     true,
	"round":              true,
	"running_avg":        true,
	"running_max":        true,
	"running_min":        true,
	"smooth_exponential": true,
}
|
|
|
|
func getTransformFunc(s string) transformFunc {
|
|
s = strings.ToLower(s)
|
|
return transformFuncs[s]
|
|
}
|
|
|
|
type transformFuncArg struct {
|
|
ec *EvalConfig
|
|
fe *metricsql.FuncExpr
|
|
args [][]*timeseries
|
|
}
|
|
|
|
// transformFunc is the signature shared by all entries of transformFuncs:
// it receives the call arguments in tfa and returns the resulting series or an error.
type transformFunc func(tfa *transformFuncArg) ([]*timeseries, error)
|
|
|
|
func newTransformFuncOneArg(tf func(v float64) float64) transformFunc {
|
|
tfe := func(values []float64) {
|
|
for i, v := range values {
|
|
values[i] = tf(v)
|
|
}
|
|
}
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
return doTransformValues(args[0], tfe, tfa.fe)
|
|
}
|
|
}
|
|
|
|
func doTransformValues(arg []*timeseries, tf func(values []float64), fe *metricsql.FuncExpr) ([]*timeseries, error) {
|
|
name := strings.ToLower(fe.Name)
|
|
keepMetricNames := fe.KeepMetricNames
|
|
if transformFuncsKeepMetricName[name] {
|
|
keepMetricNames = true
|
|
}
|
|
for _, ts := range arg {
|
|
if !keepMetricNames {
|
|
ts.MetricName.ResetMetricGroup()
|
|
}
|
|
tf(ts.Values)
|
|
}
|
|
return arg, nil
|
|
}
|
|
|
|
// transformAbs returns the absolute value of v.
func transformAbs(v float64) float64 {
	return math.Abs(v)
}
|
|
|
|
func transformAbsent(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tss := args[0]
|
|
rvs := getAbsentTimeseries(tfa.ec, tfa.fe.Args[0])
|
|
if len(tss) == 0 {
|
|
return rvs, nil
|
|
}
|
|
for i := range tss[0].Values {
|
|
isAbsent := true
|
|
for _, ts := range tss {
|
|
if !math.IsNaN(ts.Values[i]) {
|
|
isAbsent = false
|
|
break
|
|
}
|
|
}
|
|
if !isAbsent {
|
|
rvs[0].Values[i] = nan
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func getAbsentTimeseries(ec *EvalConfig, arg metricsql.Expr) []*timeseries {
|
|
// Copy tags from arg
|
|
rvs := evalNumber(ec, 1)
|
|
rv := rvs[0]
|
|
me, ok := arg.(*metricsql.MetricExpr)
|
|
if !ok {
|
|
return rvs
|
|
}
|
|
tfs := searchutils.ToTagFilters(me.LabelFilters)
|
|
for i := range tfs {
|
|
tf := &tfs[i]
|
|
if len(tf.Key) == 0 {
|
|
continue
|
|
}
|
|
if tf.IsRegexp || tf.IsNegative {
|
|
continue
|
|
}
|
|
rv.MetricName.AddTagBytes(tf.Key, tf.Value)
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
// transformCeil rounds v up to the nearest integer value.
func transformCeil(v float64) float64 {
	return math.Ceil(v)
}
|
|
|
|
func transformClamp(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
mins, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
maxs, err := getScalar(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
if v > maxs[i] {
|
|
values[i] = maxs[i]
|
|
} else if v < mins[i] {
|
|
values[i] = mins[i]
|
|
}
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func transformClampMax(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
maxs, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
if v > maxs[i] {
|
|
values[i] = maxs[i]
|
|
}
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func transformClampMin(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
mins, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
if v < mins[i] {
|
|
values[i] = mins[i]
|
|
}
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func newTransformFuncDateTime(f func(t time.Time) int) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) > 1 {
|
|
return nil, fmt.Errorf(`too many args; got %d; want up to %d`, len(args), 1)
|
|
}
|
|
var arg []*timeseries
|
|
if len(args) == 0 {
|
|
arg = evalTime(tfa.ec)
|
|
} else {
|
|
arg = args[0]
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
t := time.Unix(int64(v), 0).UTC()
|
|
values[i] = float64(f(t))
|
|
}
|
|
}
|
|
return doTransformValues(arg, tf, tfa.fe)
|
|
}
|
|
}
|
|
|
|
// transformDayOfMonth returns the day of the month for t.
func transformDayOfMonth(t time.Time) int {
	return t.Day()
}
|
|
|
|
// transformDayOfWeek returns the day of the week for t as an integer,
// where Sunday is 0 (the numeric value of time.Weekday).
func transformDayOfWeek(t time.Time) int {
	weekday := t.Weekday()
	return int(weekday)
}
|
|
|
|
func transformDaysInMonth(t time.Time) int {
|
|
m := t.Month()
|
|
if m == 2 && isLeapYear(uint32(t.Year())) {
|
|
return 29
|
|
}
|
|
return daysInMonth[m]
|
|
}
|
|
|
|
// transformExp returns e raised to the power of v.
func transformExp(v float64) float64 {
	return math.Exp(v)
}
|
|
|
|
// transformFloor rounds v down to the nearest integer value.
func transformFloor(v float64) float64 {
	return math.Floor(v)
}
|
|
|
|
func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
limits, err := getScalar(args[0], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
limit := 0
|
|
if len(limits) > 0 {
|
|
limit = int(limits[0])
|
|
}
|
|
if limit <= 0 {
|
|
return nil, nil
|
|
}
|
|
if limit < 3 {
|
|
// Preserve the first and the last bucket for better accuracy for min and max values.
|
|
limit = 3
|
|
}
|
|
tss := vmrangeBucketsToLE(args[1])
|
|
if len(tss) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Group timeseries by all MetricGroup+tags excluding `le` tag.
|
|
type x struct {
|
|
le float64
|
|
hits float64
|
|
ts *timeseries
|
|
}
|
|
m := make(map[string][]x)
|
|
var b []byte
|
|
var mn storage.MetricName
|
|
for _, ts := range tss {
|
|
leStr := ts.MetricName.GetTagValue("le")
|
|
if len(leStr) == 0 {
|
|
// Skip time series without `le` tag.
|
|
continue
|
|
}
|
|
le, err := strconv.ParseFloat(string(leStr), 64)
|
|
if err != nil {
|
|
// Skip time series with invalid `le` tag.
|
|
continue
|
|
}
|
|
mn.CopyFrom(&ts.MetricName)
|
|
mn.RemoveTag("le")
|
|
b = marshalMetricNameSorted(b[:0], &mn)
|
|
m[string(b)] = append(m[string(b)], x{
|
|
le: le,
|
|
ts: ts,
|
|
})
|
|
}
|
|
|
|
// Remove buckets with the smallest counters.
|
|
rvs := make([]*timeseries, 0, len(tss))
|
|
for _, leGroup := range m {
|
|
if len(leGroup) <= limit {
|
|
// Fast path - the number of buckets doesn't exceed the given limit.
|
|
// Keep all the buckets as is.
|
|
for _, xx := range leGroup {
|
|
rvs = append(rvs, xx.ts)
|
|
}
|
|
continue
|
|
}
|
|
// Slow path - remove buckets with the smallest number of hits until their count reaches the limit.
|
|
|
|
// Calculate per-bucket hits.
|
|
sort.Slice(leGroup, func(i, j int) bool {
|
|
return leGroup[i].le < leGroup[j].le
|
|
})
|
|
for n := range limits {
|
|
prevValue := float64(0)
|
|
for i := range leGroup {
|
|
xx := &leGroup[i]
|
|
value := xx.ts.Values[n]
|
|
xx.hits += value - prevValue
|
|
prevValue = value
|
|
}
|
|
}
|
|
for len(leGroup) > limit {
|
|
// Preserve the first and the last bucket for better accuracy for min and max values
|
|
xxMinIdx := 1
|
|
minMergeHits := leGroup[1].hits + leGroup[2].hits
|
|
for i := range leGroup[1 : len(leGroup)-2] {
|
|
mergeHits := leGroup[i+1].hits + leGroup[i+2].hits
|
|
if mergeHits < minMergeHits {
|
|
xxMinIdx = i + 1
|
|
minMergeHits = mergeHits
|
|
}
|
|
}
|
|
leGroup[xxMinIdx+1].hits += leGroup[xxMinIdx].hits
|
|
leGroup = append(leGroup[:xxMinIdx], leGroup[xxMinIdx+1:]...)
|
|
}
|
|
for _, xx := range leGroup {
|
|
rvs = append(rvs, xx.ts)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformPrometheusBuckets(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := vmrangeBucketsToLE(args[0])
|
|
return rvs, nil
|
|
}
|
|
|
|
func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
|
|
rvs := make([]*timeseries, 0, len(tss))
|
|
|
|
// Group timeseries by MetricGroup+tags excluding `vmrange` tag.
|
|
type x struct {
|
|
startStr string
|
|
endStr string
|
|
start float64
|
|
end float64
|
|
ts *timeseries
|
|
}
|
|
m := make(map[string][]x)
|
|
bb := bbPool.Get()
|
|
defer bbPool.Put(bb)
|
|
for _, ts := range tss {
|
|
vmrange := ts.MetricName.GetTagValue("vmrange")
|
|
if len(vmrange) == 0 {
|
|
if le := ts.MetricName.GetTagValue("le"); len(le) > 0 {
|
|
// Keep Prometheus-compatible buckets.
|
|
rvs = append(rvs, ts)
|
|
}
|
|
continue
|
|
}
|
|
n := strings.Index(bytesutil.ToUnsafeString(vmrange), "...")
|
|
if n < 0 {
|
|
continue
|
|
}
|
|
startStr := string(vmrange[:n])
|
|
start, err := strconv.ParseFloat(startStr, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
endStr := string(vmrange[n+len("..."):])
|
|
end, err := strconv.ParseFloat(endStr, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
ts.MetricName.RemoveTag("le")
|
|
ts.MetricName.RemoveTag("vmrange")
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
m[string(bb.B)] = append(m[string(bb.B)], x{
|
|
startStr: startStr,
|
|
endStr: endStr,
|
|
start: start,
|
|
end: end,
|
|
ts: ts,
|
|
})
|
|
}
|
|
|
|
// Convert `vmrange` label in each group of time series to `le` label.
|
|
copyTS := func(src *timeseries, leStr string) *timeseries {
|
|
var ts timeseries
|
|
ts.CopyFromShallowTimestamps(src)
|
|
values := ts.Values
|
|
for i := range values {
|
|
values[i] = 0
|
|
}
|
|
ts.MetricName.RemoveTag("le")
|
|
ts.MetricName.AddTag("le", leStr)
|
|
return &ts
|
|
}
|
|
isZeroTS := func(ts *timeseries) bool {
|
|
for _, v := range ts.Values {
|
|
if v > 0 {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool { return xss[i].end < xss[j].end })
|
|
xssNew := make([]x, 0, len(xss)+2)
|
|
var xsPrev x
|
|
uniqTs := make(map[string]*timeseries, len(xss))
|
|
for _, xs := range xss {
|
|
ts := xs.ts
|
|
if isZeroTS(ts) {
|
|
// Skip time series with zeros. They are substituted by xssNew below.
|
|
xsPrev = xs
|
|
continue
|
|
}
|
|
if xs.start != xsPrev.end && uniqTs[xs.startStr] == nil {
|
|
uniqTs[xs.startStr] = xs.ts
|
|
xssNew = append(xssNew, x{
|
|
endStr: xs.startStr,
|
|
end: xs.start,
|
|
ts: copyTS(ts, xs.startStr),
|
|
})
|
|
}
|
|
ts.MetricName.AddTag("le", xs.endStr)
|
|
prevTs := uniqTs[xs.endStr]
|
|
if prevTs != nil {
|
|
// the end of the current bucket is not unique, need to merge it with the existing bucket.
|
|
mergeNonOverlappingTimeseries(prevTs, xs.ts)
|
|
} else {
|
|
xssNew = append(xssNew, xs)
|
|
uniqTs[xs.endStr] = xs.ts
|
|
}
|
|
xsPrev = xs
|
|
}
|
|
if !math.IsInf(xsPrev.end, 1) {
|
|
xssNew = append(xssNew, x{
|
|
endStr: "+Inf",
|
|
end: math.Inf(1),
|
|
ts: copyTS(xsPrev.ts, "+Inf"),
|
|
})
|
|
}
|
|
xss = xssNew
|
|
for i := range xss[0].ts.Values {
|
|
count := float64(0)
|
|
for _, xs := range xss {
|
|
ts := xs.ts
|
|
v := ts.Values[i]
|
|
if !math.IsNaN(v) && v > 0 {
|
|
count += v
|
|
}
|
|
ts.Values[i] = count
|
|
}
|
|
}
|
|
for _, xs := range xss {
|
|
rvs = append(rvs, xs.ts)
|
|
}
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
func transformHistogramShare(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 || len(args) > 3 {
|
|
return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args))
|
|
}
|
|
les, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse le: %w", err)
|
|
}
|
|
|
|
// Convert buckets with `vmrange` labels to buckets with `le` labels.
|
|
tss := vmrangeBucketsToLE(args[1])
|
|
|
|
// Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details.
|
|
var boundsLabel string
|
|
if len(args) > 2 {
|
|
s, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %w", err)
|
|
}
|
|
boundsLabel = s
|
|
}
|
|
|
|
// Group metrics by all tags excluding "le"
|
|
m := groupLeTimeseries(tss)
|
|
|
|
// Calculate share for les
|
|
share := func(i int, les []float64, xss []leTimeseries) (q, lower, upper float64) {
|
|
leReq := les[i]
|
|
if math.IsNaN(leReq) || len(xss) == 0 {
|
|
return nan, nan, nan
|
|
}
|
|
fixBrokenBuckets(i, xss)
|
|
if leReq < 0 {
|
|
return 0, 0, 0
|
|
}
|
|
if math.IsInf(leReq, 1) {
|
|
return 1, 1, 1
|
|
}
|
|
var vPrev, lePrev float64
|
|
for _, xs := range xss {
|
|
v := xs.ts.Values[i]
|
|
le := xs.le
|
|
if leReq >= le {
|
|
vPrev = v
|
|
lePrev = le
|
|
continue
|
|
}
|
|
// precondition: lePrev <= leReq < le
|
|
vLast := xss[len(xss)-1].ts.Values[i]
|
|
lower = vPrev / vLast
|
|
if math.IsInf(le, 1) {
|
|
return lower, lower, 1
|
|
}
|
|
if lePrev == leReq {
|
|
return lower, lower, lower
|
|
}
|
|
upper = v / vLast
|
|
q = lower + (v-vPrev)/vLast*(leReq-lePrev)/(le-lePrev)
|
|
return q, lower, upper
|
|
}
|
|
// precondition: leReq > leLast
|
|
return 1, 1, 1
|
|
}
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
var tsLower, tsUpper *timeseries
|
|
if len(boundsLabel) > 0 {
|
|
tsLower = ×eries{}
|
|
tsLower.CopyFromShallowTimestamps(dst)
|
|
tsLower.MetricName.RemoveTag(boundsLabel)
|
|
tsLower.MetricName.AddTag(boundsLabel, "lower")
|
|
tsUpper = ×eries{}
|
|
tsUpper.CopyFromShallowTimestamps(dst)
|
|
tsUpper.MetricName.RemoveTag(boundsLabel)
|
|
tsUpper.MetricName.AddTag(boundsLabel, "upper")
|
|
}
|
|
for i := range dst.Values {
|
|
q, lower, upper := share(i, les, xss)
|
|
dst.Values[i] = q
|
|
if len(boundsLabel) > 0 {
|
|
tsLower.Values[i] = lower
|
|
tsUpper.Values[i] = upper
|
|
}
|
|
}
|
|
rvs = append(rvs, dst)
|
|
if len(boundsLabel) > 0 {
|
|
rvs = append(rvs, tsLower)
|
|
rvs = append(rvs, tsUpper)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformHistogramAvg(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tss := vmrangeBucketsToLE(args[0])
|
|
m := groupLeTimeseries(tss)
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
for i := range dst.Values {
|
|
dst.Values[i] = avgForLeTimeseries(i, xss)
|
|
}
|
|
rvs = append(rvs, dst)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformHistogramStddev(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tss := vmrangeBucketsToLE(args[0])
|
|
m := groupLeTimeseries(tss)
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
for i := range dst.Values {
|
|
v := stdvarForLeTimeseries(i, xss)
|
|
dst.Values[i] = math.Sqrt(v)
|
|
}
|
|
rvs = append(rvs, dst)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformHistogramStdvar(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tss := vmrangeBucketsToLE(args[0])
|
|
m := groupLeTimeseries(tss)
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
for i := range dst.Values {
|
|
dst.Values[i] = stdvarForLeTimeseries(i, xss)
|
|
}
|
|
rvs = append(rvs, dst)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func avgForLeTimeseries(i int, xss []leTimeseries) float64 {
|
|
lePrev := float64(0)
|
|
vPrev := float64(0)
|
|
sum := float64(0)
|
|
weightTotal := float64(0)
|
|
for _, xs := range xss {
|
|
if math.IsInf(xs.le, 0) {
|
|
continue
|
|
}
|
|
le := xs.le
|
|
n := (le + lePrev) / 2
|
|
v := xs.ts.Values[i]
|
|
weight := v - vPrev
|
|
sum += n * weight
|
|
weightTotal += weight
|
|
lePrev = le
|
|
vPrev = v
|
|
}
|
|
if weightTotal == 0 {
|
|
return nan
|
|
}
|
|
return sum / weightTotal
|
|
}
|
|
|
|
func stdvarForLeTimeseries(i int, xss []leTimeseries) float64 {
|
|
lePrev := float64(0)
|
|
vPrev := float64(0)
|
|
sum := float64(0)
|
|
sum2 := float64(0)
|
|
weightTotal := float64(0)
|
|
for _, xs := range xss {
|
|
if math.IsInf(xs.le, 0) {
|
|
continue
|
|
}
|
|
le := xs.le
|
|
n := (le + lePrev) / 2
|
|
v := xs.ts.Values[i]
|
|
weight := v - vPrev
|
|
sum += n * weight
|
|
sum2 += n * n * weight
|
|
weightTotal += weight
|
|
lePrev = le
|
|
vPrev = v
|
|
}
|
|
if weightTotal == 0 {
|
|
return nan
|
|
}
|
|
avg := sum / weightTotal
|
|
avg2 := sum2 / weightTotal
|
|
stdvar := avg2 - avg*avg
|
|
if stdvar < 0 {
|
|
// Correct possible calculation error.
|
|
stdvar = 0
|
|
}
|
|
return stdvar
|
|
}
|
|
|
|
func transformHistogramQuantiles(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 3 {
|
|
return nil, fmt.Errorf("unexpected number of args: %d; expecting at least 3 args", len(args))
|
|
}
|
|
dstLabel, err := getString(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot obtain dstLabel: %w", err)
|
|
}
|
|
phiArgs := args[1 : len(args)-1]
|
|
tssOrig := args[len(args)-1]
|
|
// Calculate quantile individually per each phi.
|
|
var rvs []*timeseries
|
|
for i, phiArg := range phiArgs {
|
|
phis, err := getScalar(phiArg, i)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse phi: %w", err)
|
|
}
|
|
phiStr := fmt.Sprintf("%g", phis[0])
|
|
tss := copyTimeseries(tssOrig)
|
|
tfaTmp := &transformFuncArg{
|
|
ec: tfa.ec,
|
|
fe: tfa.fe,
|
|
args: [][]*timeseries{
|
|
phiArg,
|
|
tss,
|
|
},
|
|
}
|
|
tssTmp, err := transformHistogramQuantile(tfaTmp)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot calculate quantile %s: %w", phiStr, err)
|
|
}
|
|
for _, ts := range tssTmp {
|
|
ts.MetricName.RemoveTag(dstLabel)
|
|
ts.MetricName.AddTag(dstLabel, phiStr)
|
|
}
|
|
rvs = append(rvs, tssTmp...)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformHistogramQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 || len(args) > 3 {
|
|
return nil, fmt.Errorf("unexpected number of args; got %d; want 2...3", len(args))
|
|
}
|
|
phis, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse phi: %w", err)
|
|
}
|
|
|
|
// Convert buckets with `vmrange` labels to buckets with `le` labels.
|
|
tss := vmrangeBucketsToLE(args[1])
|
|
|
|
// Parse boundsLabel. See https://github.com/prometheus/prometheus/issues/5706 for details.
|
|
var boundsLabel string
|
|
if len(args) > 2 {
|
|
s, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse boundsLabel (arg #3): %w", err)
|
|
}
|
|
boundsLabel = s
|
|
}
|
|
|
|
// Group metrics by all tags excluding "le"
|
|
m := groupLeTimeseries(tss)
|
|
|
|
// Calculate quantile for each group in m
|
|
lastNonInf := func(i int, xss []leTimeseries) float64 {
|
|
for len(xss) > 0 {
|
|
xsLast := xss[len(xss)-1]
|
|
if !math.IsInf(xsLast.le, 0) {
|
|
return xsLast.le
|
|
}
|
|
xss = xss[:len(xss)-1]
|
|
}
|
|
return nan
|
|
}
|
|
quantile := func(i int, phis []float64, xss []leTimeseries) (q, lower, upper float64) {
|
|
phi := phis[i]
|
|
if math.IsNaN(phi) {
|
|
return nan, nan, nan
|
|
}
|
|
fixBrokenBuckets(i, xss)
|
|
vLast := float64(0)
|
|
if len(xss) > 0 {
|
|
vLast = xss[len(xss)-1].ts.Values[i]
|
|
}
|
|
if vLast == 0 {
|
|
return nan, nan, nan
|
|
}
|
|
if phi < 0 {
|
|
return -inf, -inf, xss[0].ts.Values[i]
|
|
}
|
|
if phi > 1 {
|
|
return inf, vLast, inf
|
|
}
|
|
vReq := vLast * phi
|
|
vPrev := float64(0)
|
|
lePrev := float64(0)
|
|
for _, xs := range xss {
|
|
v := xs.ts.Values[i]
|
|
le := xs.le
|
|
if v <= 0 {
|
|
// Skip zero buckets.
|
|
lePrev = le
|
|
continue
|
|
}
|
|
if v < vReq {
|
|
vPrev = v
|
|
lePrev = le
|
|
continue
|
|
}
|
|
if math.IsInf(le, 0) {
|
|
break
|
|
}
|
|
if v == vPrev {
|
|
return lePrev, lePrev, v
|
|
}
|
|
vv := lePrev + (le-lePrev)*(vReq-vPrev)/(v-vPrev)
|
|
return vv, lePrev, le
|
|
}
|
|
vv := lastNonInf(i, xss)
|
|
return vv, vv, inf
|
|
}
|
|
rvs := make([]*timeseries, 0, len(m))
|
|
for _, xss := range m {
|
|
sort.Slice(xss, func(i, j int) bool {
|
|
return xss[i].le < xss[j].le
|
|
})
|
|
dst := xss[0].ts
|
|
var tsLower, tsUpper *timeseries
|
|
if len(boundsLabel) > 0 {
|
|
tsLower = ×eries{}
|
|
tsLower.CopyFromShallowTimestamps(dst)
|
|
tsLower.MetricName.RemoveTag(boundsLabel)
|
|
tsLower.MetricName.AddTag(boundsLabel, "lower")
|
|
tsUpper = ×eries{}
|
|
tsUpper.CopyFromShallowTimestamps(dst)
|
|
tsUpper.MetricName.RemoveTag(boundsLabel)
|
|
tsUpper.MetricName.AddTag(boundsLabel, "upper")
|
|
}
|
|
for i := range dst.Values {
|
|
v, lower, upper := quantile(i, phis, xss)
|
|
dst.Values[i] = v
|
|
if len(boundsLabel) > 0 {
|
|
tsLower.Values[i] = lower
|
|
tsUpper.Values[i] = upper
|
|
}
|
|
}
|
|
rvs = append(rvs, dst)
|
|
if len(boundsLabel) > 0 {
|
|
rvs = append(rvs, tsLower)
|
|
rvs = append(rvs, tsUpper)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
type leTimeseries struct {
|
|
le float64
|
|
ts *timeseries
|
|
}
|
|
|
|
func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries {
|
|
m := make(map[string][]leTimeseries)
|
|
bb := bbPool.Get()
|
|
for _, ts := range tss {
|
|
tagValue := ts.MetricName.GetTagValue("le")
|
|
if len(tagValue) == 0 {
|
|
continue
|
|
}
|
|
le, err := strconv.ParseFloat(bytesutil.ToUnsafeString(tagValue), 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
ts.MetricName.ResetMetricGroup()
|
|
ts.MetricName.RemoveTag("le")
|
|
bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName)
|
|
m[string(bb.B)] = append(m[string(bb.B)], leTimeseries{
|
|
le: le,
|
|
ts: ts,
|
|
})
|
|
}
|
|
bbPool.Put(bb)
|
|
return m
|
|
}
|
|
|
|
func fixBrokenBuckets(i int, xss []leTimeseries) {
|
|
// Buckets are already sorted by le, so their values must be in ascending order,
|
|
// since the next bucket includes all the previous buckets.
|
|
// If the next bucket has lower value than the current bucket,
|
|
// then the current bucket must be substituted with the next bucket value.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2819
|
|
if len(xss) < 2 {
|
|
return
|
|
}
|
|
for j := len(xss) - 1; j >= 0; j-- {
|
|
v := xss[j].ts.Values[i]
|
|
if !math.IsNaN(v) {
|
|
j++
|
|
for j < len(xss) {
|
|
xss[j].ts.Values[i] = v
|
|
j++
|
|
}
|
|
break
|
|
}
|
|
}
|
|
vNext := xss[len(xss)-1].ts.Values[i]
|
|
for j := len(xss) - 2; j >= 0; j-- {
|
|
v := xss[j].ts.Values[i]
|
|
if math.IsNaN(v) || v > vNext {
|
|
xss[j].ts.Values[i] = vNext
|
|
} else {
|
|
vNext = v
|
|
}
|
|
}
|
|
}
|
|
|
|
// transformHour returns the hour within the day for t, in the range [0, 23].
func transformHour(t time.Time) int {
	return t.Hour()
}
|
|
|
|
// runningSum folds the next value b into the accumulated sum a.
// idx is unused; it is present to match the signature shared with runningAvg.
func runningSum(a, b float64, idx int) float64 {
	total := a + b
	return total
}
|
|
|
|
// runningMax returns a when a > b and b otherwise.
// idx is unused; it is present to match the signature shared with runningAvg.
func runningMax(a, b float64, idx int) float64 {
	if !(a > b) {
		return b
	}
	return a
}
|
|
|
|
// runningMin returns a when a < b and b otherwise.
// idx is unused; it is present to match the signature shared with runningAvg.
func runningMin(a, b float64, idx int) float64 {
	if !(a < b) {
		return b
	}
	return a
}
|
|
|
|
// runningAvg folds the next value b into the running average a, where idx is
// the zero-based position of b, so that idx+1 values are averaged in total.
func runningAvg(a, b float64, idx int) float64 {
	// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
	count := float64(idx + 1)
	return a + (b-a)/count
}
|
|
|
|
// skipLeadingNaNs returns the tail of values starting at the first non-NaN
// item. The result shares memory with values and is empty when values
// contains only NaNs.
func skipLeadingNaNs(values []float64) []float64 {
	for i, v := range values {
		if !math.IsNaN(v) {
			return values[i:]
		}
	}
	return values[len(values):]
}
|
|
|
|
// skipTrailingNaNs returns the head of values ending at the last non-NaN
// item. The result shares memory with values and is empty when values
// contains only NaNs.
func skipTrailingNaNs(values []float64) []float64 {
	n := len(values)
	for n > 0 && math.IsNaN(values[n-1]) {
		n--
	}
	return values[:n]
}
|
|
|
|
func transformKeepLastValue(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := ts.Values
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
lastValue := values[0]
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
lastValue = v
|
|
continue
|
|
}
|
|
values[i] = lastValue
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformKeepNextValue(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := ts.Values
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
nextValue := values[len(values)-1]
|
|
for i := len(values) - 1; i >= 0; i-- {
|
|
v := values[i]
|
|
if !math.IsNaN(v) {
|
|
nextValue = v
|
|
continue
|
|
}
|
|
values[i] = nextValue
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformInterpolate(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := ts.Values
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
prevValue := nan
|
|
var nextValue float64
|
|
for i := 0; i < len(values); i++ {
|
|
if !math.IsNaN(values[i]) {
|
|
continue
|
|
}
|
|
if i > 0 {
|
|
prevValue = values[i-1]
|
|
}
|
|
j := i + 1
|
|
for j < len(values) {
|
|
if !math.IsNaN(values[j]) {
|
|
break
|
|
}
|
|
j++
|
|
}
|
|
if j >= len(values) {
|
|
nextValue = prevValue
|
|
} else {
|
|
nextValue = values[j]
|
|
}
|
|
if math.IsNaN(prevValue) {
|
|
prevValue = nextValue
|
|
}
|
|
delta := (nextValue - prevValue) / float64(j-i+1)
|
|
for i < j {
|
|
prevValue += delta
|
|
values[i] = prevValue
|
|
i++
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func newTransformFuncRunning(rf func(a, b float64, idx int) float64) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
ts.MetricName.ResetMetricGroup()
|
|
values := skipLeadingNaNs(ts.Values)
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
prevValue := values[0]
|
|
values = values[1:]
|
|
for i, v := range values {
|
|
if !math.IsNaN(v) {
|
|
prevValue = rf(prevValue, v, i+1)
|
|
}
|
|
values[i] = prevValue
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
func newTransformFuncRange(rf func(a, b float64, idx int) float64) transformFunc {
|
|
tfr := newTransformFuncRunning(rf)
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
rvs, err := tfr(tfa)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
setLastValues(rvs)
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
func transformRangeQuantile(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
phis, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
phi := float64(0)
|
|
if len(phis) > 0 {
|
|
phi = phis[0]
|
|
}
|
|
rvs := args[1]
|
|
a := getFloat64s()
|
|
values := a.A[:0]
|
|
for _, ts := range rvs {
|
|
lastIdx := -1
|
|
originValues := ts.Values
|
|
values = values[:0]
|
|
for i, v := range originValues {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
values = append(values, v)
|
|
lastIdx = i
|
|
}
|
|
if lastIdx >= 0 {
|
|
sort.Float64s(values)
|
|
originValues[lastIdx] = quantileSorted(phi, values)
|
|
}
|
|
}
|
|
a.A = values
|
|
putFloat64s(a)
|
|
setLastValues(rvs)
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformRangeFirst(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := skipLeadingNaNs(ts.Values)
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
vFirst := values[0]
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
values[i] = vFirst
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformRangeLast(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
setLastValues(rvs)
|
|
return rvs, nil
|
|
}
|
|
|
|
func setLastValues(tss []*timeseries) {
|
|
for _, ts := range tss {
|
|
values := skipTrailingNaNs(ts.Values)
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
vLast := values[len(values)-1]
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
values[i] = vLast
|
|
}
|
|
}
|
|
}
|
|
|
|
func transformSmoothExponential(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
sfs, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
values := skipLeadingNaNs(ts.Values)
|
|
for i, v := range values {
|
|
if !math.IsInf(v, 0) {
|
|
values = values[i:]
|
|
break
|
|
}
|
|
}
|
|
if len(values) == 0 {
|
|
continue
|
|
}
|
|
avg := values[0]
|
|
values = values[1:]
|
|
sfsX := sfs[len(ts.Values)-len(values):]
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
if math.IsInf(v, 0) {
|
|
values[i] = avg
|
|
continue
|
|
}
|
|
sf := sfsX[i]
|
|
if math.IsNaN(sf) {
|
|
sf = 1
|
|
}
|
|
if sf < 0 {
|
|
sf = 0
|
|
}
|
|
if sf > 1 {
|
|
sf = 1
|
|
}
|
|
avg = avg*(1-sf) + v*sf
|
|
values[i] = avg
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformRemoveResets(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
removeCounterResetsMaybeNaNs(ts.Values)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformUnion(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return evalNumber(tfa.ec, nan), nil
|
|
}
|
|
|
|
rvs := make([]*timeseries, 0, len(args[0]))
|
|
m := make(map[string]bool, len(args[0]))
|
|
bb := bbPool.Get()
|
|
for _, arg := range args {
|
|
for _, ts := range arg {
|
|
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
|
|
if m[string(bb.B)] {
|
|
continue
|
|
}
|
|
m[string(bb.B)] = true
|
|
rvs = append(rvs, ts)
|
|
}
|
|
}
|
|
bbPool.Put(bb)
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelKeep(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
var keepLabels []string
|
|
for i := 1; i < len(args); i++ {
|
|
keepLabel, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
keepLabels = append(keepLabels, keepLabel)
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
ts.MetricName.RemoveTagsOn(keepLabels)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelDel(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
var delLabels []string
|
|
for i := 1; i < len(args); i++ {
|
|
delLabel, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
delLabels = append(delLabels, delLabel)
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
ts.MetricName.RemoveTagsIgnoring(delLabels)
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelSet(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
dstLabels, dstValues, err := getStringPairs(args[1:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
for i, dstLabel := range dstLabels {
|
|
value := dstValues[i]
|
|
dstValue := getDstValue(mn, dstLabel)
|
|
*dstValue = append((*dstValue)[:0], value...)
|
|
if len(value) == 0 {
|
|
mn.RemoveTag(dstLabel)
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelUppercase(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return transformLabelValueFunc(tfa, strings.ToUpper)
|
|
}
|
|
|
|
func transformLabelLowercase(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return transformLabelValueFunc(tfa, strings.ToLower)
|
|
}
|
|
|
|
func transformLabelValueFunc(tfa *transformFuncArg, f func(string) string) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 2)
|
|
}
|
|
labels := make([]string, 0, len(args)-1)
|
|
for i := 1; i < len(args); i++ {
|
|
label, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
labels = append(labels, label)
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
for _, label := range labels {
|
|
dstValue := getDstValue(mn, label)
|
|
*dstValue = append((*dstValue)[:0], f(string(*dstValue))...)
|
|
if len(*dstValue) == 0 {
|
|
mn.RemoveTag(label)
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelMap(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 2)
|
|
}
|
|
label, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot read label name: %w", err)
|
|
}
|
|
srcValues, dstValues, err := getStringPairs(args[2:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m := make(map[string]string, len(srcValues))
|
|
for i, srcValue := range srcValues {
|
|
m[srcValue] = dstValues[i]
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
dstValue := getDstValue(mn, label)
|
|
value, ok := m[string(*dstValue)]
|
|
if ok {
|
|
*dstValue = append((*dstValue)[:0], value...)
|
|
}
|
|
if len(*dstValue) == 0 {
|
|
mn.RemoveTag(label)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformDropCommonLabels(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
rvs := args[0]
|
|
for _, tss := range args[1:] {
|
|
rvs = append(rvs, tss...)
|
|
}
|
|
m := make(map[string]map[string]int)
|
|
countLabel := func(name, value string) {
|
|
x := m[name]
|
|
if x == nil {
|
|
x = make(map[string]int)
|
|
m[name] = x
|
|
}
|
|
x[value]++
|
|
}
|
|
for _, ts := range rvs {
|
|
countLabel("__name__", string(ts.MetricName.MetricGroup))
|
|
for _, tag := range ts.MetricName.Tags {
|
|
countLabel(string(tag.Key), string(tag.Value))
|
|
}
|
|
}
|
|
for labelName, x := range m {
|
|
for _, count := range x {
|
|
if count != len(rvs) {
|
|
continue
|
|
}
|
|
for _, ts := range rvs {
|
|
ts.MetricName.RemoveTag(labelName)
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelCopy(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return transformLabelCopyExt(tfa, false)
|
|
}
|
|
|
|
func transformLabelMove(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return transformLabelCopyExt(tfa, true)
|
|
}
|
|
|
|
func transformLabelCopyExt(tfa *transformFuncArg, removeSrcLabels bool) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 1 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 1)
|
|
}
|
|
srcLabels, dstLabels, err := getStringPairs(args[1:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
for i, srcLabel := range srcLabels {
|
|
dstLabel := dstLabels[i]
|
|
value := mn.GetTagValue(srcLabel)
|
|
if len(value) == 0 {
|
|
// Do not remove destination label if the source label doesn't exist.
|
|
continue
|
|
}
|
|
dstValue := getDstValue(mn, dstLabel)
|
|
*dstValue = append((*dstValue)[:0], value...)
|
|
if removeSrcLabels && srcLabel != dstLabel {
|
|
mn.RemoveTag(srcLabel)
|
|
}
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func getStringPairs(args [][]*timeseries) ([]string, []string, error) {
|
|
if len(args)%2 != 0 {
|
|
return nil, nil, fmt.Errorf(`the number of string args must be even; got %d`, len(args))
|
|
}
|
|
var ks, vs []string
|
|
for i := 0; i < len(args); i += 2 {
|
|
k, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
ks = append(ks, k)
|
|
|
|
v, err := getString(args[i+1], i+1)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
vs = append(vs, v)
|
|
}
|
|
return ks, vs, nil
|
|
}
|
|
|
|
func transformLabelJoin(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 3 {
|
|
return nil, fmt.Errorf(`not enough args; got %d; want at least %d`, len(args), 3)
|
|
}
|
|
dstLabel, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
separator, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var srcLabels []string
|
|
for i := 3; i < len(args); i++ {
|
|
srcLabel, err := getString(args[i], i)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
srcLabels = append(srcLabels, srcLabel)
|
|
}
|
|
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
mn := &ts.MetricName
|
|
dstValue := getDstValue(mn, dstLabel)
|
|
b := *dstValue
|
|
b = b[:0]
|
|
for j, srcLabel := range srcLabels {
|
|
srcValue := mn.GetTagValue(srcLabel)
|
|
b = append(b, srcValue...)
|
|
if j+1 < len(srcLabels) {
|
|
b = append(b, separator...)
|
|
}
|
|
}
|
|
*dstValue = b
|
|
if len(b) == 0 {
|
|
mn.RemoveTag(dstLabel)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelTransform(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 4); err != nil {
|
|
return nil, err
|
|
}
|
|
label, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
regex, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
replacement, err := getString(args[3], 3)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r, err := metricsql.CompileRegexp(regex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot compile regex %q: %w`, regex, err)
|
|
}
|
|
return labelReplace(args[0], label, r, label, replacement)
|
|
}
|
|
|
|
func transformLabelReplace(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 5); err != nil {
|
|
return nil, err
|
|
}
|
|
dstLabel, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
replacement, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
srcLabel, err := getString(args[3], 3)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
regex, err := getString(args[4], 4)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r, err := metricsql.CompileRegexpAnchored(regex)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot compile regex %q: %w`, regex, err)
|
|
}
|
|
return labelReplace(args[0], srcLabel, r, dstLabel, replacement)
|
|
}
|
|
|
|
func labelReplace(tss []*timeseries, srcLabel string, r *regexp.Regexp, dstLabel, replacement string) ([]*timeseries, error) {
|
|
replacementBytes := []byte(replacement)
|
|
for _, ts := range tss {
|
|
mn := &ts.MetricName
|
|
dstValue := getDstValue(mn, dstLabel)
|
|
srcValue := mn.GetTagValue(srcLabel)
|
|
if !r.Match(srcValue) {
|
|
continue
|
|
}
|
|
b := r.ReplaceAll(srcValue, replacementBytes)
|
|
*dstValue = append((*dstValue)[:0], b...)
|
|
if len(b) == 0 {
|
|
mn.RemoveTag(dstLabel)
|
|
}
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
func transformLabelValue(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
labelName, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get label name: %w", err)
|
|
}
|
|
rvs := args[0]
|
|
for _, ts := range rvs {
|
|
ts.MetricName.ResetMetricGroup()
|
|
labelValue := ts.MetricName.GetTagValue(labelName)
|
|
v, err := strconv.ParseFloat(string(labelValue), 64)
|
|
if err != nil {
|
|
v = nan
|
|
}
|
|
values := ts.Values
|
|
for i, vOrig := range values {
|
|
if !math.IsNaN(vOrig) {
|
|
values[i] = v
|
|
}
|
|
}
|
|
}
|
|
// Do not remove timeseries with only NaN values, so `default` could be applied to them:
|
|
// label_value(q, "label") default 123
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelMatch(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
labelName, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get label name: %w", err)
|
|
}
|
|
labelRe, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get regexp: %w", err)
|
|
}
|
|
r, err := metricsql.CompileRegexpAnchored(labelRe)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot compile regexp %q: %w`, labelRe, err)
|
|
}
|
|
tss := args[0]
|
|
rvs := tss[:0]
|
|
for _, ts := range tss {
|
|
labelValue := ts.MetricName.GetTagValue(labelName)
|
|
if r.Match(labelValue) {
|
|
rvs = append(rvs, ts)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelMismatch(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
labelName, err := getString(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get label name: %w", err)
|
|
}
|
|
labelRe, err := getString(args[2], 2)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get regexp: %w", err)
|
|
}
|
|
r, err := metricsql.CompileRegexpAnchored(labelRe)
|
|
if err != nil {
|
|
return nil, fmt.Errorf(`cannot compile regexp %q: %w`, labelRe, err)
|
|
}
|
|
tss := args[0]
|
|
rvs := tss[:0]
|
|
for _, ts := range tss {
|
|
labelValue := ts.MetricName.GetTagValue(labelName)
|
|
if !r.Match(labelValue) {
|
|
rvs = append(rvs, ts)
|
|
}
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
func transformLabelGraphiteGroup(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf("unexpected number of args: %d; want at least 2 args", len(args))
|
|
}
|
|
tss := args[0]
|
|
groupArgs := args[1:]
|
|
groupIDs := make([]int, len(groupArgs))
|
|
for i, arg := range groupArgs {
|
|
groupID, err := getIntNumber(arg, i+1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get group name from arg #%d: %w", i+1, err)
|
|
}
|
|
groupIDs[i] = groupID
|
|
}
|
|
for _, ts := range tss {
|
|
groups := bytes.Split(ts.MetricName.MetricGroup, dotSeparator)
|
|
groupName := ts.MetricName.MetricGroup[:0]
|
|
for j, groupID := range groupIDs {
|
|
if groupID >= 0 && groupID < len(groups) {
|
|
groupName = append(groupName, groups[groupID]...)
|
|
}
|
|
if j < len(groupIDs)-1 {
|
|
groupName = append(groupName, '.')
|
|
}
|
|
}
|
|
ts.MetricName.MetricGroup = groupName
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
// dotSeparator is the separator passed to bytes.Split by transformLabelGraphiteGroup
// in order to split a metric name into dot-delimited groups.
var dotSeparator = []byte(".")
|
|
|
|
func transformLimitOffset(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
limit, err := getIntNumber(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot obtain limit arg: %w", err)
|
|
}
|
|
offset, err := getIntNumber(args[1], 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot obtain offset arg: %w", err)
|
|
}
|
|
// removeEmptySeries so offset will be calculated after empty series
|
|
// were filtered out.
|
|
rvs := removeEmptySeries(args[2])
|
|
if len(rvs) >= offset {
|
|
rvs = rvs[offset:]
|
|
}
|
|
if len(rvs) > limit {
|
|
rvs = rvs[:limit]
|
|
}
|
|
return rvs, nil
|
|
}
|
|
|
|
// transformLn returns the natural logarithm of v calculated by math.Log.
func transformLn(v float64) float64 {
	ln := math.Log(v)
	return ln
}
|
|
|
|
// transformLog2 returns the binary logarithm of v calculated by math.Log2.
func transformLog2(v float64) float64 {
	log2 := math.Log2(v)
	return log2
}
|
|
|
|
// transformLog10 returns the decimal logarithm of v calculated by math.Log10.
func transformLog10(v float64) float64 {
	log10 := math.Log10(v)
	return log10
}
|
|
|
|
// transformMinute returns the minute component of t as reported by t.Minute().
func transformMinute(t time.Time) int {
	minute := t.Minute()
	return minute
}
|
|
|
|
// transformMonth returns the month of t converted to int,
// so time.January corresponds to 1.
func transformMonth(t time.Time) int {
	month := t.Month()
	return int(month)
}
|
|
|
|
func transformRound(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) != 1 && len(args) != 2 {
|
|
return nil, fmt.Errorf(`unexpected number of args: %d; want 1 or 2`, len(args))
|
|
}
|
|
var nearestArg []*timeseries
|
|
if len(args) == 1 {
|
|
nearestArg = evalNumber(tfa.ec, 1)
|
|
} else {
|
|
nearestArg = args[1]
|
|
}
|
|
nearest, err := getScalar(nearestArg, 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
var nPrev float64
|
|
var p10 float64
|
|
for i, v := range values {
|
|
n := nearest[i]
|
|
if n != nPrev {
|
|
nPrev = n
|
|
_, e := decimal.FromFloat(n)
|
|
p10 = math.Pow10(int(-e))
|
|
}
|
|
v += 0.5 * math.Copysign(n, v)
|
|
v -= math.Mod(v, n)
|
|
v, _ = math.Modf(v * p10)
|
|
values[i] = v / p10
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func transformSgn(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
sign := float64(0)
|
|
if v < 0 {
|
|
sign = -1
|
|
} else if v > 0 {
|
|
sign = 1
|
|
}
|
|
values[i] = sign
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
|
|
func transformScalar(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Verify whether the arg is a string.
|
|
// Then try converting the string to number.
|
|
if se, ok := tfa.fe.Args[0].(*metricsql.StringExpr); ok {
|
|
n, err := strconv.ParseFloat(se.S, 64)
|
|
if err != nil {
|
|
n = nan
|
|
}
|
|
return evalNumber(tfa.ec, n), nil
|
|
}
|
|
|
|
// The arg isn't a string. Extract scalar from it.
|
|
arg := args[0]
|
|
if len(arg) != 1 {
|
|
return evalNumber(tfa.ec, nan), nil
|
|
}
|
|
arg[0].MetricName.Reset()
|
|
return arg, nil
|
|
}
|
|
|
|
func newTransformFuncSortByLabel(isDesc bool) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) < 2 {
|
|
return nil, fmt.Errorf("expecting at least 2 args; got %d args", len(args))
|
|
}
|
|
var labels []string
|
|
for i, arg := range args[1:] {
|
|
label, err := getString(arg, 1)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot parse label #%d for sorting: %w", i+1, err)
|
|
}
|
|
labels = append(labels, label)
|
|
}
|
|
rvs := args[0]
|
|
sort.SliceStable(rvs, func(i, j int) bool {
|
|
for _, label := range labels {
|
|
a := rvs[i].MetricName.GetTagValue(label)
|
|
b := rvs[j].MetricName.GetTagValue(label)
|
|
if string(a) == string(b) {
|
|
continue
|
|
}
|
|
if isDesc {
|
|
return string(b) < string(a)
|
|
}
|
|
return string(a) < string(b)
|
|
}
|
|
return false
|
|
})
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
func newTransformFuncSort(isDesc bool) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
sort.Slice(rvs, func(i, j int) bool {
|
|
a := rvs[i].Values
|
|
b := rvs[j].Values
|
|
n := len(a) - 1
|
|
for n >= 0 {
|
|
if !math.IsNaN(a[n]) {
|
|
if math.IsNaN(b[n]) {
|
|
return false
|
|
}
|
|
if a[n] != b[n] {
|
|
break
|
|
}
|
|
} else if !math.IsNaN(b[n]) {
|
|
return true
|
|
}
|
|
n--
|
|
}
|
|
if n < 0 {
|
|
return false
|
|
}
|
|
if isDesc {
|
|
return b[n] < a[n]
|
|
}
|
|
return a[n] < b[n]
|
|
})
|
|
return rvs, nil
|
|
}
|
|
}
|
|
|
|
// transformSqrt returns the square root of v calculated by math.Sqrt.
func transformSqrt(v float64) float64 {
	root := math.Sqrt(v)
	return root
}
|
|
|
|
// transformSin returns the sine of v (in radians) calculated by math.Sin.
func transformSin(v float64) float64 {
	sine := math.Sin(v)
	return sine
}
|
|
|
|
// transformSinh returns the hyperbolic sine of v calculated by math.Sinh.
func transformSinh(v float64) float64 {
	result := math.Sinh(v)
	return result
}
|
|
|
|
// transformCos returns the cosine of v (in radians) calculated by math.Cos.
func transformCos(v float64) float64 {
	cosine := math.Cos(v)
	return cosine
}
|
|
|
|
// transformCosh returns the hyperbolic cosine of v calculated by math.Cosh.
func transformCosh(v float64) float64 {
	result := math.Cosh(v)
	return result
}
|
|
|
|
// transformTan returns the tangent of v (in radians) calculated by math.Tan.
func transformTan(v float64) float64 {
	tangent := math.Tan(v)
	return tangent
}
|
|
|
|
// transformTanh returns the hyperbolic tangent of v calculated by math.Tanh.
func transformTanh(v float64) float64 {
	result := math.Tanh(v)
	return result
}
|
|
|
|
// transformAsin returns the arcsine of v (in radians) calculated by math.Asin.
func transformAsin(v float64) float64 {
	angle := math.Asin(v)
	return angle
}
|
|
|
|
// transformAsinh returns the inverse hyperbolic sine of v calculated by math.Asinh.
func transformAsinh(v float64) float64 {
	result := math.Asinh(v)
	return result
}
|
|
|
|
// transformAtan returns the arctangent of v (in radians) calculated by math.Atan.
func transformAtan(v float64) float64 {
	angle := math.Atan(v)
	return angle
}
|
|
|
|
// transformAtanh returns the inverse hyperbolic tangent of v calculated by math.Atanh.
func transformAtanh(v float64) float64 {
	result := math.Atanh(v)
	return result
}
|
|
|
|
// transformAcos returns the arccosine of v (in radians) calculated by math.Acos.
func transformAcos(v float64) float64 {
	angle := math.Acos(v)
	return angle
}
|
|
|
|
// transformAcosh returns the inverse hyperbolic cosine of v calculated by math.Acosh.
func transformAcosh(v float64) float64 {
	result := math.Acosh(v)
	return result
}
|
|
|
|
// transformDeg converts v from radians to degrees.
func transformDeg(v float64) float64 {
	scaled := v * 180
	return scaled / math.Pi
}
|
|
|
|
// transformRad converts v from degrees to radians.
func transformRad(v float64) float64 {
	scaled := v * math.Pi
	return scaled / 180
}
|
|
|
|
func newTransformRand(newRandFunc func(r *rand.Rand) func() float64) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if len(args) > 1 {
|
|
return nil, fmt.Errorf(`unexpected number of args; got %d; want 0 or 1`, len(args))
|
|
}
|
|
var seed int64
|
|
if len(args) == 1 {
|
|
tmp, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(tmp) > 0 {
|
|
seed = int64(tmp[0])
|
|
}
|
|
} else {
|
|
seed = time.Now().UnixNano()
|
|
}
|
|
source := rand.NewSource(seed)
|
|
r := rand.New(source)
|
|
randFunc := newRandFunc(r)
|
|
tss := evalNumber(tfa.ec, 0)
|
|
values := tss[0].Values
|
|
for i := range values {
|
|
values[i] = randFunc()
|
|
}
|
|
return tss, nil
|
|
}
|
|
}
|
|
|
|
// newRandFloat64 returns a generator, which yields r.Float64() on every call.
func newRandFloat64(r *rand.Rand) func() float64 {
	return func() float64 {
		return r.Float64()
	}
}
|
|
|
|
// newRandNormFloat64 returns a generator, which yields r.NormFloat64() on every call.
func newRandNormFloat64(r *rand.Rand) func() float64 {
	return func() float64 {
		return r.NormFloat64()
	}
}
|
|
|
|
// newRandExpFloat64 returns a generator, which yields r.ExpFloat64() on every call.
func newRandExpFloat64(r *rand.Rand) func() float64 {
	return func() float64 {
		return r.ExpFloat64()
	}
}
|
|
|
|
func transformPi(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
if err := expectTransformArgsNum(tfa.args, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
return evalNumber(tfa.ec, math.Pi), nil
|
|
}
|
|
|
|
func transformNow(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
if err := expectTransformArgsNum(tfa.args, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
now := float64(time.Now().UnixNano()) / 1e9
|
|
return evalNumber(tfa.ec, now), nil
|
|
}
|
|
|
|
// bitmapAnd returns the bitwise AND of a and b.
func bitmapAnd(a, b uint64) uint64 {
	result := a
	result &= b
	return result
}
|
|
|
|
// bitmapOr returns the bitwise OR of a and b.
func bitmapOr(a, b uint64) uint64 {
	result := a
	result |= b
	return result
}
|
|
|
|
// bitmapXor returns the bitwise XOR of a and b.
func bitmapXor(a, b uint64) uint64 {
	result := a
	result ^= b
	return result
}
|
|
|
|
func newTransformBitmap(bitmapFunc func(a, b uint64) uint64) func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
ns, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tf := func(values []float64) {
|
|
for i, v := range values {
|
|
values[i] = float64(bitmapFunc(uint64(v), uint64(ns[i])))
|
|
}
|
|
}
|
|
return doTransformValues(args[0], tf, tfa.fe)
|
|
}
|
|
}
|
|
|
|
func transformTimezoneOffset(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
tzString, err := getString(args[0], 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot get timezone name: %w", err)
|
|
}
|
|
loc, err := time.LoadLocation(tzString)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot load timezone %q: %w", tzString, err)
|
|
}
|
|
|
|
tss := evalNumber(tfa.ec, nan)
|
|
ts := tss[0]
|
|
for i, timestamp := range ts.Timestamps {
|
|
_, offset := time.Unix(timestamp/1000, 0).In(loc).Zone()
|
|
ts.Values[i] = float64(offset)
|
|
}
|
|
return tss, nil
|
|
}
|
|
|
|
func transformTime(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
if err := expectTransformArgsNum(tfa.args, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
return evalTime(tfa.ec), nil
|
|
}
|
|
|
|
func transformVector(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
args := tfa.args
|
|
if err := expectTransformArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
rvs := args[0]
|
|
return rvs, nil
|
|
}
|
|
|
|
// transformYear returns the year component of t as reported by t.Year().
func transformYear(t time.Time) int {
	year := t.Year()
	return year
}
|
|
|
|
func newTransformFuncZeroArgs(f func(tfa *transformFuncArg) float64) transformFunc {
|
|
return func(tfa *transformFuncArg) ([]*timeseries, error) {
|
|
if err := expectTransformArgsNum(tfa.args, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
v := f(tfa)
|
|
return evalNumber(tfa.ec, v), nil
|
|
}
|
|
}
|
|
|
|
func transformStep(tfa *transformFuncArg) float64 {
|
|
return float64(tfa.ec.Step) / 1e3
|
|
}
|
|
|
|
func transformStart(tfa *transformFuncArg) float64 {
|
|
return float64(tfa.ec.Start) / 1e3
|
|
}
|
|
|
|
func transformEnd(tfa *transformFuncArg) float64 {
|
|
return float64(tfa.ec.End) / 1e3
|
|
}
|
|
|
|
// copyTimeseries returns a copy of tss.
|
|
func copyTimeseries(tss []*timeseries) []*timeseries {
|
|
rvs := make([]*timeseries, len(tss))
|
|
for i, src := range tss {
|
|
var dst timeseries
|
|
dst.CopyFromShallowTimestamps(src)
|
|
rvs[i] = &dst
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
// copyTimeseriesMetricNames returns a copy of tss with real copy of MetricNames,
|
|
// but with shallow copy of Timestamps and Values if makeCopy is set.
|
|
//
|
|
// Otherwise tss is returned.
|
|
func copyTimeseriesMetricNames(tss []*timeseries, makeCopy bool) []*timeseries {
|
|
if !makeCopy {
|
|
return tss
|
|
}
|
|
rvs := make([]*timeseries, len(tss))
|
|
for i, src := range tss {
|
|
var dst timeseries
|
|
dst.CopyFromMetricNames(src)
|
|
rvs[i] = &dst
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
// copyTimeseriesShallow returns a copy of arg with shallow copies of MetricNames,
|
|
// Timestamps and Values.
|
|
func copyTimeseriesShallow(arg []*timeseries) []*timeseries {
|
|
rvs := make([]*timeseries, len(arg))
|
|
for i, src := range arg {
|
|
var dst timeseries
|
|
dst.CopyShallow(src)
|
|
rvs[i] = &dst
|
|
}
|
|
return rvs
|
|
}
|
|
|
|
func getDstValue(mn *storage.MetricName, dstLabel string) *[]byte {
|
|
if dstLabel == "__name__" {
|
|
return &mn.MetricGroup
|
|
}
|
|
tags := mn.Tags
|
|
for i := range tags {
|
|
tag := &tags[i]
|
|
if string(tag.Key) == dstLabel {
|
|
return &tag.Value
|
|
}
|
|
}
|
|
if len(tags) < cap(tags) {
|
|
tags = tags[:len(tags)+1]
|
|
} else {
|
|
tags = append(tags, storage.Tag{})
|
|
}
|
|
mn.Tags = tags
|
|
tag := &tags[len(tags)-1]
|
|
tag.Key = append(tag.Key[:0], dstLabel...)
|
|
return &tag.Value
|
|
}
|
|
|
|
// isLeapYear reports whether y is a leap year: it must be divisible by 4,
// while years divisible by 100 must also be divisible by 400.
func isLeapYear(y uint32) bool {
	switch {
	case y%4 != 0:
		return false
	case y%100 != 0:
		return true
	default:
		return y%400 == 0
	}
}
|
|
|
|
// daysInMonth contains the number of days per month indexed by time.Month.
//
// Index 0 is unused, since time.January equals 1.
// February is listed with 28 days.
// NOTE(review): presumably callers add a day for leap years via isLeapYear - confirm.
var daysInMonth = [...]int{
	time.January:   31,
	time.February:  28,
	time.March:     31,
	time.April:     30,
	time.May:       31,
	time.June:      30,
	time.July:      31,
	time.August:    31,
	time.September: 30,
	time.October:   31,
	time.November:  30,
	time.December:  31,
}
|
|
|
|
func expectTransformArgsNum(args [][]*timeseries, expectedNum int) error {
|
|
if len(args) == expectedNum {
|
|
return nil
|
|
}
|
|
return fmt.Errorf(`unexpected number of args; got %d; want %d`, len(args), expectedNum)
|
|
}
|
|
|
|
func removeCounterResetsMaybeNaNs(values []float64) {
|
|
values = skipLeadingNaNs(values)
|
|
if len(values) == 0 {
|
|
return
|
|
}
|
|
var correction float64
|
|
prevValue := values[0]
|
|
for i, v := range values {
|
|
if math.IsNaN(v) {
|
|
continue
|
|
}
|
|
d := v - prevValue
|
|
if d < 0 {
|
|
if (-d * 8) < prevValue {
|
|
// This is likely a partial counter reset.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2787
|
|
correction += prevValue - v
|
|
} else {
|
|
correction += prevValue
|
|
}
|
|
}
|
|
prevValue = v
|
|
values[i] = v + correction
|
|
}
|
|
}
|