mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
46bfdbe6cf
This makes the behaviour for these functions similar to Prometheus when processing broken time series with irregular data points like `gitlab_runner_jobs`. See https://gitlab.com/gitlab-org/gitlab-exporter/issues/50 for details.
1459 lines
39 KiB
Go
1459 lines
39 KiB
Go
package promql
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
|
"github.com/VictoriaMetrics/metrics"
|
|
"github.com/valyala/histogram"
|
|
)
|
|
|
|
var rollupFuncs = map[string]newRollupFunc{
|
|
"default_rollup": newRollupFuncOneArg(rollupDefault), // default rollup func
|
|
|
|
// Standard rollup funcs from PromQL.
|
|
// See funcs accepting range-vector on https://prometheus.io/docs/prometheus/latest/querying/functions/ .
|
|
"changes": newRollupFuncOneArg(rollupChanges),
|
|
"delta": newRollupFuncOneArg(rollupDelta),
|
|
"deriv": newRollupFuncOneArg(rollupDerivSlow),
|
|
"deriv_fast": newRollupFuncOneArg(rollupDerivFast),
|
|
"holt_winters": newRollupHoltWinters,
|
|
"idelta": newRollupFuncOneArg(rollupIdelta),
|
|
"increase": newRollupFuncOneArg(rollupIncrease), // + rollupFuncsRemoveCounterResets
|
|
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
|
|
"predict_linear": newRollupPredictLinear,
|
|
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
|
|
"resets": newRollupFuncOneArg(rollupResets),
|
|
"avg_over_time": newRollupFuncOneArg(rollupAvg),
|
|
"min_over_time": newRollupFuncOneArg(rollupMin),
|
|
"max_over_time": newRollupFuncOneArg(rollupMax),
|
|
"sum_over_time": newRollupFuncOneArg(rollupSum),
|
|
"count_over_time": newRollupFuncOneArg(rollupCount),
|
|
"quantile_over_time": newRollupQuantile,
|
|
"stddev_over_time": newRollupFuncOneArg(rollupStddev),
|
|
"stdvar_over_time": newRollupFuncOneArg(rollupStdvar),
|
|
"absent_over_time": newRollupFuncOneArg(rollupAbsent),
|
|
|
|
// Additional rollup funcs.
|
|
"sum2_over_time": newRollupFuncOneArg(rollupSum2),
|
|
"geomean_over_time": newRollupFuncOneArg(rollupGeomean),
|
|
"first_over_time": newRollupFuncOneArg(rollupFirst),
|
|
"last_over_time": newRollupFuncOneArg(rollupLast),
|
|
"distinct_over_time": newRollupFuncOneArg(rollupDistinct),
|
|
"increases_over_time": newRollupFuncOneArg(rollupIncreases),
|
|
"decreases_over_time": newRollupFuncOneArg(rollupDecreases),
|
|
"integrate": newRollupFuncOneArg(rollupIntegrate),
|
|
"ideriv": newRollupFuncOneArg(rollupIderiv),
|
|
"lifetime": newRollupFuncOneArg(rollupLifetime),
|
|
"lag": newRollupFuncOneArg(rollupLag),
|
|
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
|
|
"tmin_over_time": newRollupFuncOneArg(rollupTmin),
|
|
"tmax_over_time": newRollupFuncOneArg(rollupTmax),
|
|
"share_le_over_time": newRollupShareLE,
|
|
"share_gt_over_time": newRollupShareGT,
|
|
"histogram_over_time": newRollupFuncOneArg(rollupHistogram),
|
|
"rollup": newRollupFuncOneArg(rollupFake),
|
|
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
|
"rollup_deriv": newRollupFuncOneArg(rollupFake),
|
|
"rollup_delta": newRollupFuncOneArg(rollupFake),
|
|
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
|
"rollup_candlestick": newRollupFuncOneArg(rollupFake),
|
|
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
|
|
}
|
|
|
|
// rollupAggrFuncs are functions that can be passed to `aggr_over_time()`
|
|
var rollupAggrFuncs = map[string]rollupFunc{
|
|
// Standard rollup funcs from PromQL.
|
|
"changes": rollupChanges,
|
|
"delta": rollupDelta,
|
|
"deriv": rollupDerivSlow,
|
|
"deriv_fast": rollupDerivFast,
|
|
"idelta": rollupIdelta,
|
|
"increase": rollupIncrease, // + rollupFuncsRemoveCounterResets
|
|
"irate": rollupIderiv, // + rollupFuncsRemoveCounterResets
|
|
"rate": rollupDerivFast, // + rollupFuncsRemoveCounterResets
|
|
"resets": rollupResets,
|
|
"avg_over_time": rollupAvg,
|
|
"min_over_time": rollupMin,
|
|
"max_over_time": rollupMax,
|
|
"sum_over_time": rollupSum,
|
|
"count_over_time": rollupCount,
|
|
"stddev_over_time": rollupStddev,
|
|
"stdvar_over_time": rollupStdvar,
|
|
"absent_over_time": rollupAbsent,
|
|
|
|
// Additional rollup funcs.
|
|
"sum2_over_time": rollupSum2,
|
|
"geomean_over_time": rollupGeomean,
|
|
"first_over_time": rollupFirst,
|
|
"last_over_time": rollupLast,
|
|
"distinct_over_time": rollupDistinct,
|
|
"increases_over_time": rollupIncreases,
|
|
"decreases_over_time": rollupDecreases,
|
|
"integrate": rollupIntegrate,
|
|
"ideriv": rollupIderiv,
|
|
"lifetime": rollupLifetime,
|
|
"lag": rollupLag,
|
|
"scrape_interval": rollupScrapeInterval,
|
|
"tmin_over_time": rollupTmin,
|
|
"tmax_over_time": rollupTmax,
|
|
}
|
|
|
|
// rollupFuncsMayAdjustWindow holds the rollup function names for which
// rollupConfig.MayAdjustWindow is set by getRollupConfigs.
var rollupFuncsMayAdjustWindow = map[string]bool{
	"default_rollup":  true,
	"deriv":           true,
	"deriv_fast":      true,
	"first_over_time": true,
	"irate":           true,
	"lag":             true,
	"last_over_time":  true,
	"lifetime":        true,
	"rate":            true,
	"scrape_interval": true,
}
|
|
|
|
// rollupFuncsRemoveCounterResets holds the rollup function names whose input
// values are passed through removeCounterResets before the rollup is applied.
var rollupFuncsRemoveCounterResets = map[string]bool{
	"increase":        true,
	"irate":           true,
	"rate":            true,
	"rollup_increase": true,
	"rollup_rate":     true,
}
|
|
|
|
// rollupFuncsKeepMetricGroup holds the rollup function names flagged as
// keeping the metric group.
//
// NOTE(review): the map is not read inside this part of the file; presumably
// the metric name is preserved in results of these functions - confirm at the caller.
var rollupFuncsKeepMetricGroup = map[string]bool{
	"avg_over_time":      true,
	"default_rollup":     true,
	"geomean_over_time":  true,
	"max_over_time":      true,
	"min_over_time":      true,
	"quantile_over_time": true,
	"rollup":             true,
}
|
|
|
|
func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
|
|
afe, ok := expr.(*metricsql.AggrFuncExpr)
|
|
if ok {
|
|
// This is for incremental aggregate function case:
|
|
//
|
|
// sum(aggr_over_time(...))
|
|
//
|
|
// See aggr_incremental.go for details.
|
|
expr = afe.Args[0]
|
|
}
|
|
fe, ok := expr.(*metricsql.FuncExpr)
|
|
if !ok {
|
|
logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil))
|
|
}
|
|
if fe.Name != "aggr_over_time" {
|
|
logger.Panicf("BUG: unexpected function name: %q; want `aggr_over_time`", fe.Name)
|
|
}
|
|
if len(fe.Args) != 2 {
|
|
return nil, fmt.Errorf("unexpected number of args to aggr_over_time(); got %d; want %d", len(fe.Args), 2)
|
|
}
|
|
arg := fe.Args[0]
|
|
var aggrFuncNames []string
|
|
if se, ok := arg.(*metricsql.StringExpr); ok {
|
|
aggrFuncNames = append(aggrFuncNames, se.S)
|
|
} else {
|
|
fe, ok := arg.(*metricsql.FuncExpr)
|
|
if !ok || fe.Name != "" {
|
|
return nil, fmt.Errorf("%s cannot be passed to aggr_over_time(); expecting quoted aggregate function name or a list of quoted aggregate function names",
|
|
arg.AppendString(nil))
|
|
}
|
|
for _, e := range fe.Args {
|
|
se, ok := e.(*metricsql.StringExpr)
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s cannot be passed here; expecting quoted aggregate function name", e.AppendString(nil))
|
|
}
|
|
aggrFuncNames = append(aggrFuncNames, se.S)
|
|
}
|
|
}
|
|
if len(aggrFuncNames) == 0 {
|
|
return nil, fmt.Errorf("aggr_over_time() must contain at least a single aggregate function name")
|
|
}
|
|
for _, s := range aggrFuncNames {
|
|
if rollupAggrFuncs[s] == nil {
|
|
return nil, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s)
|
|
}
|
|
}
|
|
return aggrFuncNames, nil
|
|
}
|
|
|
|
func getRollupArgIdx(funcName string) int {
|
|
funcName = strings.ToLower(funcName)
|
|
if rollupFuncs[funcName] == nil {
|
|
logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName)
|
|
}
|
|
switch funcName {
|
|
case "quantile_over_time", "aggr_over_time":
|
|
return 1
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) (
|
|
func(values []float64, timestamps []int64), []*rollupConfig, error) {
|
|
preFunc := func(values []float64, timestamps []int64) {}
|
|
if rollupFuncsRemoveCounterResets[name] {
|
|
preFunc = func(values []float64, timestamps []int64) {
|
|
removeCounterResets(values)
|
|
}
|
|
}
|
|
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
|
|
return &rollupConfig{
|
|
TagValue: tagValue,
|
|
Func: rf,
|
|
Start: start,
|
|
End: end,
|
|
Step: step,
|
|
Window: window,
|
|
MayAdjustWindow: rollupFuncsMayAdjustWindow[name],
|
|
LookbackDelta: lookbackDelta,
|
|
Timestamps: sharedTimestamps,
|
|
}
|
|
}
|
|
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
|
|
dst = append(dst, newRollupConfig(rollupMin, "min"))
|
|
dst = append(dst, newRollupConfig(rollupMax, "max"))
|
|
dst = append(dst, newRollupConfig(rollupAvg, "avg"))
|
|
return dst
|
|
}
|
|
var rcs []*rollupConfig
|
|
switch name {
|
|
case "rollup":
|
|
rcs = appendRollupConfigs(rcs)
|
|
case "rollup_rate", "rollup_deriv":
|
|
preFuncPrev := preFunc
|
|
preFunc = func(values []float64, timestamps []int64) {
|
|
preFuncPrev(values, timestamps)
|
|
derivValues(values, timestamps)
|
|
}
|
|
rcs = appendRollupConfigs(rcs)
|
|
case "rollup_increase", "rollup_delta":
|
|
preFuncPrev := preFunc
|
|
preFunc = func(values []float64, timestamps []int64) {
|
|
preFuncPrev(values, timestamps)
|
|
deltaValues(values)
|
|
}
|
|
rcs = appendRollupConfigs(rcs)
|
|
case "rollup_candlestick":
|
|
rcs = append(rcs, newRollupConfig(rollupFirst, "open"))
|
|
rcs = append(rcs, newRollupConfig(rollupLast, "close"))
|
|
rcs = append(rcs, newRollupConfig(rollupMin, "low"))
|
|
rcs = append(rcs, newRollupConfig(rollupMax, "high"))
|
|
case "aggr_over_time":
|
|
aggrFuncNames, err := getRollupAggrFuncNames(expr)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("invalid args to %s: %s", expr.AppendString(nil), err)
|
|
}
|
|
for _, aggrFuncName := range aggrFuncNames {
|
|
if rollupFuncsRemoveCounterResets[aggrFuncName] {
|
|
// There is no need to save the previous preFunc, since it is either empty or the same.
|
|
preFunc = func(values []float64, timestamps []int64) {
|
|
removeCounterResets(values)
|
|
}
|
|
}
|
|
rf := rollupAggrFuncs[aggrFuncName]
|
|
rcs = append(rcs, newRollupConfig(rf, aggrFuncName))
|
|
}
|
|
default:
|
|
rcs = append(rcs, newRollupConfig(rf, ""))
|
|
}
|
|
return preFunc, rcs, nil
|
|
}
|
|
|
|
func getRollupFunc(funcName string) newRollupFunc {
|
|
funcName = strings.ToLower(funcName)
|
|
return rollupFuncs[funcName]
|
|
}
|
|
|
|
type rollupFuncArg struct {
|
|
prevValue float64
|
|
prevTimestamp int64
|
|
values []float64
|
|
timestamps []int64
|
|
|
|
currTimestamp int64
|
|
idx int
|
|
step int64
|
|
|
|
// Real previous value even if it is located too far from the current window.
|
|
// It matches prevValue if prevValue is not nan.
|
|
realPrevValue float64
|
|
|
|
tsm *timeseriesMap
|
|
}
|
|
|
|
func (rfa *rollupFuncArg) reset() {
|
|
rfa.prevValue = 0
|
|
rfa.prevTimestamp = 0
|
|
rfa.values = nil
|
|
rfa.timestamps = nil
|
|
rfa.currTimestamp = 0
|
|
rfa.idx = 0
|
|
rfa.step = 0
|
|
rfa.realPrevValue = nan
|
|
rfa.tsm = nil
|
|
}
|
|
|
|
// rollupFunc must return rollup value for the given rfa.
|
|
//
|
|
// prevValue may be nan, values and timestamps may be empty.
|
|
type rollupFunc func(rfa *rollupFuncArg) float64
|
|
|
|
type rollupConfig struct {
|
|
// This tag value must be added to "rollup" tag if non-empty.
|
|
TagValue string
|
|
|
|
Func rollupFunc
|
|
Start int64
|
|
End int64
|
|
Step int64
|
|
Window int64
|
|
|
|
// Whether window may be adjusted to 2 x interval between data points.
|
|
// This is needed for functions which have dt in the denominator
|
|
// such as rate, deriv, etc.
|
|
// Without the adjustement their value would jump in unexpected directions
|
|
// when using window smaller than 2 x scrape_interval.
|
|
MayAdjustWindow bool
|
|
|
|
Timestamps []int64
|
|
|
|
// LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world.
|
|
LookbackDelta int64
|
|
}
|
|
|
|
// nan and inf are shared float64 constants-like values:
// a quiet NaN and the positive infinity.
var (
	nan = math.NaN()
	inf = math.Inf(+1)
)
|
|
|
|
// maxSilenceInterval is the maximum interval without previous rows.
//
// The value is 5 minutes, assuming timestamps are in milliseconds like
// elsewhere in this file.
const maxSilenceInterval = 5 * 60 * 1000
|
|
|
|
type timeseriesMap struct {
|
|
origin *timeseries
|
|
labelName string
|
|
h metrics.Histogram
|
|
m map[string]*timeseries
|
|
}
|
|
|
|
func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage.MetricName) *timeseriesMap {
|
|
if funcName != "histogram_over_time" {
|
|
return nil
|
|
}
|
|
|
|
values := make([]float64, len(sharedTimestamps))
|
|
for i := range values {
|
|
values[i] = nan
|
|
}
|
|
var origin timeseries
|
|
origin.MetricName.CopyFrom(mnSrc)
|
|
origin.MetricName.ResetMetricGroup()
|
|
origin.Timestamps = sharedTimestamps
|
|
origin.Values = values
|
|
return ×eriesMap{
|
|
origin: &origin,
|
|
labelName: "vmrange",
|
|
m: make(map[string]*timeseries),
|
|
}
|
|
}
|
|
|
|
func (tsm *timeseriesMap) AppendTimeseriesTo(dst []*timeseries) []*timeseries {
|
|
for _, ts := range tsm.m {
|
|
dst = append(dst, ts)
|
|
}
|
|
return dst
|
|
}
|
|
|
|
func (tsm *timeseriesMap) GetOrCreateTimeseries(labelValue string) *timeseries {
|
|
ts := tsm.m[labelValue]
|
|
if ts != nil {
|
|
return ts
|
|
}
|
|
ts = ×eries{}
|
|
ts.CopyFromShallowTimestamps(tsm.origin)
|
|
ts.MetricName.RemoveTag(tsm.labelName)
|
|
ts.MetricName.AddTag(tsm.labelName, labelValue)
|
|
tsm.m[labelValue] = ts
|
|
return ts
|
|
}
|
|
|
|
// Do calculates rollups for the given timestamps and values, appends
|
|
// them to dstValues and returns results.
|
|
//
|
|
// rc.Timestamps are used as timestamps for dstValues.
|
|
//
|
|
// timestamps must cover time range [rc.Start - rc.Window - maxSilenceInterval ... rc.End + rc.Step].
|
|
//
|
|
// Do cannot be called from concurrent goroutines.
|
|
func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []int64) []float64 {
|
|
return rc.doInternal(dstValues, nil, values, timestamps)
|
|
}
|
|
|
|
// DoTimeseriesMap calculates rollups for the given timestamps and values and puts them to tsm.
|
|
func (rc *rollupConfig) DoTimeseriesMap(tsm *timeseriesMap, values []float64, timestamps []int64) {
|
|
ts := getTimeseries()
|
|
ts.Values = rc.doInternal(ts.Values[:0], tsm, values, timestamps)
|
|
putTimeseries(ts)
|
|
}
|
|
|
|
func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, values []float64, timestamps []int64) []float64 {
|
|
// Sanity checks.
|
|
if rc.Step <= 0 {
|
|
logger.Panicf("BUG: Step must be bigger than 0; got %d", rc.Step)
|
|
}
|
|
if rc.Start > rc.End {
|
|
logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", rc.Start, rc.End)
|
|
}
|
|
if rc.Window < 0 {
|
|
logger.Panicf("BUG: Window must be non-negative; got %d", rc.Window)
|
|
}
|
|
if err := ValidateMaxPointsPerTimeseries(rc.Start, rc.End, rc.Step); err != nil {
|
|
logger.Panicf("BUG: %s; this must be validated before the call to rollupConfig.Do", err)
|
|
}
|
|
|
|
// Extend dstValues in order to remove mallocs below.
|
|
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
|
|
|
|
scrapeInterval := getScrapeInterval(timestamps)
|
|
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
|
|
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
|
maxPrevInterval = rc.LookbackDelta
|
|
}
|
|
window := rc.Window
|
|
if window <= 0 {
|
|
window = rc.Step
|
|
}
|
|
if rc.MayAdjustWindow && window < maxPrevInterval {
|
|
window = maxPrevInterval
|
|
}
|
|
rfa := getRollupFuncArg()
|
|
rfa.idx = 0
|
|
rfa.step = rc.Step
|
|
rfa.realPrevValue = nan
|
|
rfa.tsm = tsm
|
|
|
|
i := 0
|
|
j := 0
|
|
ni := 0
|
|
nj := 0
|
|
for _, tEnd := range rc.Timestamps {
|
|
tStart := tEnd - window
|
|
ni = seekFirstTimestampIdxAfter(timestamps[i:], tStart, ni)
|
|
i += ni
|
|
if j < i {
|
|
j = i
|
|
}
|
|
nj = seekFirstTimestampIdxAfter(timestamps[j:], tEnd, nj)
|
|
j += nj
|
|
|
|
rfa.prevValue = nan
|
|
rfa.prevTimestamp = tStart - maxPrevInterval
|
|
if i < len(timestamps) && i > 0 && timestamps[i-1] > rfa.prevTimestamp {
|
|
rfa.prevValue = values[i-1]
|
|
rfa.prevTimestamp = timestamps[i-1]
|
|
}
|
|
|
|
rfa.values = values[i:j]
|
|
rfa.timestamps = timestamps[i:j]
|
|
rfa.currTimestamp = tEnd
|
|
if i > 0 {
|
|
rfa.realPrevValue = values[i-1]
|
|
}
|
|
value := rc.Func(rfa)
|
|
rfa.idx++
|
|
dstValues = append(dstValues, value)
|
|
}
|
|
putRollupFuncArg(rfa)
|
|
|
|
return dstValues
|
|
}
|
|
|
|
func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int {
|
|
if len(timestamps) == 0 || timestamps[0] > seekTimestamp {
|
|
return 0
|
|
}
|
|
startIdx := nHint - 2
|
|
if startIdx < 0 {
|
|
startIdx = 0
|
|
}
|
|
if startIdx >= len(timestamps) {
|
|
startIdx = len(timestamps) - 1
|
|
}
|
|
endIdx := nHint + 2
|
|
if endIdx > len(timestamps) {
|
|
endIdx = len(timestamps)
|
|
}
|
|
if startIdx > 0 && timestamps[startIdx] <= seekTimestamp {
|
|
timestamps = timestamps[startIdx:]
|
|
endIdx -= startIdx
|
|
} else {
|
|
startIdx = 0
|
|
}
|
|
if endIdx < len(timestamps) && timestamps[endIdx] > seekTimestamp {
|
|
timestamps = timestamps[:endIdx]
|
|
}
|
|
if len(timestamps) < 16 {
|
|
// Fast path: the number of timestamps to search is small, so scan them all.
|
|
for i, timestamp := range timestamps {
|
|
if timestamp > seekTimestamp {
|
|
return startIdx + i
|
|
}
|
|
}
|
|
return startIdx + len(timestamps)
|
|
}
|
|
// Slow path: too big len(timestamps), so use binary search.
|
|
i := binarySearchInt64(timestamps, seekTimestamp+1)
|
|
return startIdx + int(i)
|
|
}
|
|
|
|
// binarySearchInt64 returns the smallest index i such that a[i] >= v.
// len(a) is returned if all the items in a are smaller than v.
//
// a must be sorted in ascending order.
//
// The implementation follows sort.Search from the Go standard library.
func binarySearchInt64(a []int64, v int64) uint {
	lo, hi := uint(0), uint(len(a))
	for lo < hi {
		mid := (lo + hi) >> 1
		if mid >= uint(len(a)) || a[mid] >= v {
			// The answer is located at mid or before it.
			hi = mid
			continue
		}
		lo = mid + 1
	}
	return lo
}
|
|
|
|
func getScrapeInterval(timestamps []int64) int64 {
|
|
if len(timestamps) < 2 {
|
|
return int64(maxSilenceInterval)
|
|
}
|
|
|
|
// Estimate scrape interval as 0.6 quantile for the first 100 intervals.
|
|
h := histogram.GetFast()
|
|
tsPrev := timestamps[0]
|
|
timestamps = timestamps[1:]
|
|
if len(timestamps) > 100 {
|
|
timestamps = timestamps[:100]
|
|
}
|
|
for _, ts := range timestamps {
|
|
h.Update(float64(ts - tsPrev))
|
|
tsPrev = ts
|
|
}
|
|
scrapeInterval := int64(h.Quantile(0.6))
|
|
histogram.PutFast(h)
|
|
if scrapeInterval <= 0 {
|
|
return int64(maxSilenceInterval)
|
|
}
|
|
return scrapeInterval
|
|
}
|
|
|
|
// getMaxPrevInterval returns scrapeInterval increased by a safety margin.
//
// The margin is relatively bigger for smaller scrape intervals in order to hide
// possible gaps when high jitter is present.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/139 .
func getMaxPrevInterval(scrapeInterval int64) int64 {
	switch {
	case scrapeInterval <= 2*1000:
		return scrapeInterval + 4*scrapeInterval
	case scrapeInterval <= 4*1000:
		return scrapeInterval + 2*scrapeInterval
	case scrapeInterval <= 8*1000:
		return scrapeInterval + scrapeInterval
	case scrapeInterval <= 16*1000:
		return scrapeInterval + scrapeInterval/2
	case scrapeInterval <= 32*1000:
		return scrapeInterval + scrapeInterval/4
	default:
		return scrapeInterval + scrapeInterval/8
	}
}
|
|
|
|
// removeCounterResets compensates counter resets in values, so the values
// never decrease. The values are modified in place.
//
// A small drop (less than 1/8 of the previous value) is treated as jitter:
// the dropped value is substituted with the previous value.
// A bigger drop is treated as a counter reset: the previous value is added
// to this and to all the subsequent values.
//
// NaNs aren't handled here, since they are impossible on values from vmstorage.
func removeCounterResets(values []float64) {
	if len(values) == 0 {
		return
	}
	var offset float64
	last := values[0]
	for i := range values {
		cur := values[i]
		if drop := last - cur; drop > 0 {
			if drop*8 < last {
				// This is likely jitter from `Prometheus HA pairs`.
				cur = last
			} else {
				offset += last
			}
		}
		last = cur
		values[i] = cur + offset
	}
}
|
|
|
|
// deltaValues substitutes every value with the difference between
// the next value and the given value. The values are modified in place.
//
// The last item gets the same delta as the item before it
// (zero if values contains only a single item).
//
// NaNs aren't handled here, since they are impossible on values from vmstorage.
func deltaValues(values []float64) {
	if len(values) == 0 {
		return
	}
	lastIdx := len(values) - 1
	var delta float64
	for i := 0; i < lastIdx; i++ {
		delta = values[i+1] - values[i]
		values[i] = delta
	}
	values[lastIdx] = delta
}
|
|
|
|
// derivValues substitutes every value with the per-second derivative between
// the given point and the next point. The values are modified in place.
//
// A point with the timestamp equal to the timestamp of the preceding point is
// skipped: the previously calculated derivative is re-used for it.
// The last item gets the same derivative as the item before it.
//
// NaNs aren't handled here, since they are impossible on values from vmstorage.
func derivValues(values []float64, timestamps []int64) {
	if len(values) == 0 {
		return
	}
	lastIdx := len(values) - 1
	var deriv float64
	baseValue, baseTs := values[0], timestamps[0]
	for i := 0; i < lastIdx; i++ {
		v, ts := values[i+1], timestamps[i+1]
		if ts != baseTs {
			// 1e-3 converts the timestamps delta into seconds.
			dt := float64(ts-baseTs) * 1e-3
			deriv = (v - baseValue) / dt
			baseValue, baseTs = v, ts
		}
		values[i] = deriv
	}
	values[lastIdx] = deriv
}
|
|
|
|
// newRollupFunc builds a rollupFunc from the arguments of a rollup function call.
//
// It returns an error if args are invalid.
type newRollupFunc func(args []interface{}) (rollupFunc, error)
|
|
|
|
func newRollupFuncOneArg(rf rollupFunc) newRollupFunc {
|
|
return func(args []interface{}) (rollupFunc, error) {
|
|
if err := expectRollupArgsNum(args, 1); err != nil {
|
|
return nil, err
|
|
}
|
|
return rf, nil
|
|
}
|
|
}
|
|
|
|
func newRollupFuncTwoArgs(rf rollupFunc) newRollupFunc {
|
|
return func(args []interface{}) (rollupFunc, error) {
|
|
if err := expectRollupArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
return rf, nil
|
|
}
|
|
}
|
|
|
|
func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
|
|
if err := expectRollupArgsNum(args, 3); err != nil {
|
|
return nil, err
|
|
}
|
|
sfs, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tfs, err := getScalar(args[2], 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rf := func(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
return rfa.prevValue
|
|
}
|
|
sf := sfs[rfa.idx]
|
|
if sf <= 0 || sf >= 1 {
|
|
return nan
|
|
}
|
|
tf := tfs[rfa.idx]
|
|
if tf <= 0 || tf >= 1 {
|
|
return nan
|
|
}
|
|
|
|
// See https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing .
|
|
// TODO: determine whether this shit really works.
|
|
s0 := rfa.prevValue
|
|
if math.IsNaN(s0) {
|
|
s0 = values[0]
|
|
values = values[1:]
|
|
if len(values) == 0 {
|
|
return s0
|
|
}
|
|
}
|
|
b0 := values[0] - s0
|
|
for _, v := range values {
|
|
s1 := sf*v + (1-sf)*(s0+b0)
|
|
b1 := tf*(s1-s0) + (1-tf)*b0
|
|
s0 = s1
|
|
b0 = b1
|
|
}
|
|
return s0
|
|
}
|
|
return rf, nil
|
|
}
|
|
|
|
func newRollupPredictLinear(args []interface{}) (rollupFunc, error) {
|
|
if err := expectRollupArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
secs, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rf := func(rfa *rollupFuncArg) float64 {
|
|
v, k := linearRegression(rfa)
|
|
if math.IsNaN(v) {
|
|
return nan
|
|
}
|
|
sec := secs[rfa.idx]
|
|
return v + k*sec
|
|
}
|
|
return rf, nil
|
|
}
|
|
|
|
func linearRegression(rfa *rollupFuncArg) (float64, float64) {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
timestamps := rfa.timestamps
|
|
if len(values) == 0 {
|
|
return rfa.prevValue, 0
|
|
}
|
|
|
|
// See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example
|
|
tFirst := rfa.prevTimestamp
|
|
vSum := rfa.prevValue
|
|
tSum := float64(0)
|
|
tvSum := float64(0)
|
|
ttSum := float64(0)
|
|
n := 1.0
|
|
if math.IsNaN(rfa.prevValue) {
|
|
tFirst = timestamps[0]
|
|
vSum = 0
|
|
n = 0
|
|
}
|
|
for i, v := range values {
|
|
dt := float64(timestamps[i]-tFirst) * 1e-3
|
|
vSum += v
|
|
tSum += dt
|
|
tvSum += dt * v
|
|
ttSum += dt * dt
|
|
}
|
|
n += float64(len(values))
|
|
if n == 1 {
|
|
return vSum, 0
|
|
}
|
|
k := (n*tvSum - tSum*vSum) / (n*ttSum - tSum*tSum)
|
|
v := (vSum - k*tSum) / n
|
|
// Adjust v to the last timestamp on the given time range.
|
|
v += k * (float64(timestamps[len(timestamps)-1]-tFirst) * 1e-3)
|
|
return v, k
|
|
}
|
|
|
|
func newRollupShareLE(args []interface{}) (rollupFunc, error) {
|
|
return newRollupShareFilter(args, countFilterLE)
|
|
}
|
|
|
|
// countFilterLE returns the number of values, which are smaller than or equal to le.
func countFilterLE(values []float64, le float64) int {
	matched := 0
	for i := range values {
		if values[i] <= le {
			matched++
		}
	}
	return matched
}
|
|
|
|
func newRollupShareGT(args []interface{}) (rollupFunc, error) {
|
|
return newRollupShareFilter(args, countFilterGT)
|
|
}
|
|
|
|
// countFilterGT returns the number of values, which are bigger than gt.
func countFilterGT(values []float64, gt float64) int {
	matched := 0
	for i := range values {
		if values[i] > gt {
			matched++
		}
	}
	return matched
}
|
|
|
|
func newRollupShareFilter(args []interface{}, countFilter func(values []float64, limit float64) int) (rollupFunc, error) {
|
|
if err := expectRollupArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
limits, err := getScalar(args[1], 1)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rf := func(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
return nan
|
|
}
|
|
limit := limits[rfa.idx]
|
|
n := countFilter(values, limit)
|
|
return float64(n) / float64(len(values))
|
|
}
|
|
return rf, nil
|
|
}
|
|
|
|
func newRollupQuantile(args []interface{}) (rollupFunc, error) {
|
|
if err := expectRollupArgsNum(args, 2); err != nil {
|
|
return nil, err
|
|
}
|
|
phis, err := getScalar(args[0], 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rf := func(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
return rfa.prevValue
|
|
}
|
|
if len(values) == 1 {
|
|
// Fast path - only a single value.
|
|
return values[0]
|
|
}
|
|
hf := histogram.GetFast()
|
|
for _, v := range values {
|
|
hf.Update(v)
|
|
}
|
|
phi := phis[rfa.idx]
|
|
qv := hf.Quantile(phi)
|
|
histogram.PutFast(hf)
|
|
return qv
|
|
}
|
|
return rf, nil
|
|
}
|
|
|
|
func rollupHistogram(rfa *rollupFuncArg) float64 {
|
|
values := rfa.values
|
|
tsm := rfa.tsm
|
|
tsm.h.Reset()
|
|
for _, v := range values {
|
|
tsm.h.Update(v)
|
|
}
|
|
idx := rfa.idx
|
|
tsm.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
|
|
ts := tsm.GetOrCreateTimeseries(vmrange)
|
|
ts.Values[idx] = float64(count)
|
|
})
|
|
return nan
|
|
}
|
|
|
|
func rollupAvg(rfa *rollupFuncArg) float64 {
|
|
// Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation,
|
|
// since it is slower and has no significant benefits in precision.
|
|
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
return rfa.prevValue
|
|
}
|
|
var sum float64
|
|
for _, v := range values {
|
|
sum += v
|
|
}
|
|
return sum / float64(len(values))
|
|
}
|
|
|
|
func rollupMin(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
// Do not take into account rfa.prevValue, since it may lead
|
|
// to inconsistent results comparing to Prometheus on broken time series
|
|
// with irregular data points.
|
|
return nan
|
|
}
|
|
minValue := values[0]
|
|
for _, v := range values {
|
|
if v < minValue {
|
|
minValue = v
|
|
}
|
|
}
|
|
return minValue
|
|
}
|
|
|
|
func rollupMax(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
// Do not take into account rfa.prevValue, since it may lead
|
|
// to inconsistent results comparing to Prometheus on broken time series
|
|
// with irregular data points.
|
|
return nan
|
|
}
|
|
maxValue := values[0]
|
|
for _, v := range values {
|
|
if v > maxValue {
|
|
maxValue = v
|
|
}
|
|
}
|
|
return maxValue
|
|
}
|
|
|
|
func rollupTmin(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
timestamps := rfa.timestamps
|
|
if len(values) == 0 {
|
|
return nan
|
|
}
|
|
minValue := values[0]
|
|
minTimestamp := timestamps[0]
|
|
for i, v := range values {
|
|
if v < minValue {
|
|
minValue = v
|
|
minTimestamp = timestamps[i]
|
|
}
|
|
}
|
|
return float64(minTimestamp) * 1e-3
|
|
}
|
|
|
|
func rollupTmax(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
timestamps := rfa.timestamps
|
|
if len(values) == 0 {
|
|
return nan
|
|
}
|
|
maxValue := values[0]
|
|
maxTimestamp := timestamps[0]
|
|
for i, v := range values {
|
|
if v > maxValue {
|
|
maxValue = v
|
|
maxTimestamp = timestamps[i]
|
|
}
|
|
}
|
|
return float64(maxTimestamp) * 1e-3
|
|
}
|
|
|
|
func rollupSum(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
return 0
|
|
}
|
|
var sum float64
|
|
for _, v := range values {
|
|
sum += v
|
|
}
|
|
return sum
|
|
}
|
|
|
|
func rollupSum2(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
return rfa.prevValue * rfa.prevValue
|
|
}
|
|
var sum2 float64
|
|
for _, v := range values {
|
|
sum2 += v * v
|
|
}
|
|
return sum2
|
|
}
|
|
|
|
func rollupGeomean(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
return rfa.prevValue
|
|
}
|
|
p := 1.0
|
|
for _, v := range values {
|
|
p *= v
|
|
}
|
|
return math.Pow(p, 1/float64(len(values)))
|
|
}
|
|
|
|
func rollupAbsent(rfa *rollupFuncArg) float64 {
|
|
if len(rfa.values) == 0 {
|
|
return 1
|
|
}
|
|
return nan
|
|
}
|
|
|
|
func rollupCount(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
return 0
|
|
}
|
|
return float64(len(values))
|
|
}
|
|
|
|
func rollupStddev(rfa *rollupFuncArg) float64 {
|
|
stdvar := rollupStdvar(rfa)
|
|
return math.Sqrt(stdvar)
|
|
}
|
|
|
|
func rollupStdvar(rfa *rollupFuncArg) float64 {
|
|
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
|
|
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
return 0
|
|
}
|
|
if len(values) == 1 {
|
|
// Fast path.
|
|
return values[0]
|
|
}
|
|
var avg float64
|
|
var count float64
|
|
var q float64
|
|
for _, v := range values {
|
|
count++
|
|
avgNew := avg + (v-avg)/count
|
|
q += (v - avg) * (v - avgNew)
|
|
avg = avgNew
|
|
}
|
|
return q / count
|
|
}
|
|
|
|
func rollupDelta(rfa *rollupFuncArg) float64 {
|
|
return rollupDeltaInternal(rfa, false)
|
|
}
|
|
|
|
func rollupIncrease(rfa *rollupFuncArg) float64 {
|
|
return rollupDeltaInternal(rfa, true)
|
|
}
|
|
|
|
func rollupDeltaInternal(rfa *rollupFuncArg, canUseRealPrevValue bool) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
prevValue := rfa.prevValue
|
|
if math.IsNaN(prevValue) {
|
|
if len(values) == 0 {
|
|
return nan
|
|
}
|
|
// Assume that the previous non-existing value was 0.
|
|
prevValue = 0
|
|
if canUseRealPrevValue && !math.IsNaN(rfa.prevValue) {
|
|
prevValue = rfa.prevValue
|
|
}
|
|
}
|
|
if len(values) == 0 {
|
|
// Assume that the value didn't change on the given interval.
|
|
return 0
|
|
}
|
|
return values[len(values)-1] - prevValue
|
|
}
|
|
|
|
func rollupIdelta(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
// Assume that the value didn't change on the given interval.
|
|
return 0
|
|
}
|
|
lastValue := values[len(values)-1]
|
|
values = values[:len(values)-1]
|
|
if len(values) == 0 {
|
|
prevValue := rfa.prevValue
|
|
if math.IsNaN(prevValue) {
|
|
// Assume that the previous non-existing value was 0.
|
|
return lastValue
|
|
}
|
|
return lastValue - prevValue
|
|
}
|
|
return lastValue - values[len(values)-1]
|
|
}
|
|
|
|
func rollupDerivSlow(rfa *rollupFuncArg) float64 {
|
|
// Use linear regression like Prometheus does.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/73
|
|
_, k := linearRegression(rfa)
|
|
return k
|
|
}
|
|
|
|
func rollupDerivFast(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
timestamps := rfa.timestamps
|
|
prevValue := rfa.prevValue
|
|
prevTimestamp := rfa.prevTimestamp
|
|
if math.IsNaN(prevValue) {
|
|
if len(values) == 0 {
|
|
return nan
|
|
}
|
|
if len(values) == 1 {
|
|
// It is impossible to determine the duration during which the value changed
|
|
// from 0 to the current value.
|
|
// The following attempts didn't work well:
|
|
// - using scrape interval as the duration. It fails on Prometheus restarts when it
|
|
// skips scraping for the counter. This results in too high rate() value for the first point
|
|
// after Prometheus restarts.
|
|
// - using window or step as the duration. It results in too small rate() values for the first
|
|
// points of time series.
|
|
//
|
|
// So just return nan
|
|
return nan
|
|
}
|
|
prevValue = values[0]
|
|
prevTimestamp = timestamps[0]
|
|
} else if len(values) == 0 {
|
|
// Assume that the value didn't change on the given interval.
|
|
return 0
|
|
}
|
|
vEnd := values[len(values)-1]
|
|
tEnd := timestamps[len(timestamps)-1]
|
|
dv := vEnd - prevValue
|
|
dt := float64(tEnd-prevTimestamp) * 1e-3
|
|
return dv / dt
|
|
}
|
|
|
|
func rollupIderiv(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
timestamps := rfa.timestamps
|
|
if len(values) < 2 {
|
|
if len(values) == 0 {
|
|
return nan
|
|
}
|
|
if math.IsNaN(rfa.prevValue) {
|
|
// It is impossible to determine the duration during which the value changed
|
|
// from 0 to the current value.
|
|
// The following attempts didn't work well:
|
|
// - using scrape interval as the duration. It fails on Prometheus restarts when it
|
|
// skips scraping for the counter. This results in too high rate() value for the first point
|
|
// after Prometheus restarts.
|
|
// - using window or step as the duration. It results in too small rate() values for the first
|
|
// points of time series.
|
|
//
|
|
// So just return nan
|
|
return nan
|
|
}
|
|
return (values[0] - rfa.prevValue) / (float64(timestamps[0]-rfa.prevTimestamp) * 1e-3)
|
|
}
|
|
vEnd := values[len(values)-1]
|
|
tEnd := timestamps[len(timestamps)-1]
|
|
values = values[:len(values)-1]
|
|
timestamps = timestamps[:len(timestamps)-1]
|
|
// Skip data points with duplicate timestamps.
|
|
for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= tEnd {
|
|
timestamps = timestamps[:len(timestamps)-1]
|
|
}
|
|
var tStart int64
|
|
var vStart float64
|
|
if len(timestamps) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return 0
|
|
}
|
|
tStart = rfa.prevTimestamp
|
|
vStart = rfa.prevValue
|
|
} else {
|
|
tStart = timestamps[len(timestamps)-1]
|
|
vStart = values[len(timestamps)-1]
|
|
}
|
|
dv := vEnd - vStart
|
|
dt := tEnd - tStart
|
|
return dv / (float64(dt) * 1e-3)
|
|
}
|
|
|
|
func rollupLifetime(rfa *rollupFuncArg) float64 {
|
|
// Calculate the duration between the first and the last data points.
|
|
timestamps := rfa.timestamps
|
|
if math.IsNaN(rfa.prevValue) {
|
|
if len(timestamps) < 2 {
|
|
return nan
|
|
}
|
|
return float64(timestamps[len(timestamps)-1]-timestamps[0]) * 1e-3
|
|
}
|
|
if len(timestamps) == 0 {
|
|
return nan
|
|
}
|
|
return float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) * 1e-3
|
|
}
|
|
|
|
func rollupLag(rfa *rollupFuncArg) float64 {
|
|
// Calculate the duration between the current timestamp and the last data point.
|
|
timestamps := rfa.timestamps
|
|
if len(timestamps) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
return float64(rfa.currTimestamp-rfa.prevTimestamp) * 1e-3
|
|
}
|
|
return float64(rfa.currTimestamp-timestamps[len(timestamps)-1]) * 1e-3
|
|
}
|
|
|
|
func rollupScrapeInterval(rfa *rollupFuncArg) float64 {
|
|
// Calculate the average interval between data points.
|
|
timestamps := rfa.timestamps
|
|
if math.IsNaN(rfa.prevValue) {
|
|
if len(timestamps) < 2 {
|
|
return nan
|
|
}
|
|
return float64(timestamps[len(timestamps)-1]-timestamps[0]) * 1e-3 / float64(len(timestamps)-1)
|
|
}
|
|
if len(timestamps) == 0 {
|
|
return nan
|
|
}
|
|
return (float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) * 1e-3) / float64(len(timestamps))
|
|
}
|
|
|
|
func rollupChanges(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
prevValue := rfa.prevValue
|
|
n := 0
|
|
if math.IsNaN(prevValue) {
|
|
if len(values) == 0 {
|
|
return nan
|
|
}
|
|
prevValue = values[0]
|
|
values = values[1:]
|
|
n++
|
|
}
|
|
for _, v := range values {
|
|
if v != prevValue {
|
|
n++
|
|
prevValue = v
|
|
}
|
|
}
|
|
return float64(n)
|
|
}
|
|
|
|
func rollupIncreases(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
return 0
|
|
}
|
|
prevValue := rfa.prevValue
|
|
if math.IsNaN(prevValue) {
|
|
prevValue = values[0]
|
|
values = values[1:]
|
|
}
|
|
if len(values) == 0 {
|
|
return 0
|
|
}
|
|
n := 0
|
|
for _, v := range values {
|
|
if v > prevValue {
|
|
n++
|
|
}
|
|
prevValue = v
|
|
}
|
|
return float64(n)
|
|
}
|
|
|
|
// `decreases_over_time` logic is the same as `resets` logic.
|
|
var rollupDecreases = rollupResets
|
|
|
|
func rollupResets(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
return 0
|
|
}
|
|
prevValue := rfa.prevValue
|
|
if math.IsNaN(prevValue) {
|
|
prevValue = values[0]
|
|
values = values[1:]
|
|
}
|
|
if len(values) == 0 {
|
|
return 0
|
|
}
|
|
n := 0
|
|
for _, v := range values {
|
|
if v < prevValue {
|
|
n++
|
|
}
|
|
prevValue = v
|
|
}
|
|
return float64(n)
|
|
}
|
|
|
|
func rollupFirst(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
// Do not take into account rfa.prevValue, since it may lead
|
|
// to inconsistent results comparing to Prometheus on broken time series
|
|
// with irregular data points.
|
|
return nan
|
|
}
|
|
return values[0]
|
|
}
|
|
|
|
var rollupDefault = rollupLast
|
|
|
|
func rollupLast(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
// Do not take into account rfa.prevValue, since it may lead
|
|
// to inconsistent results comparing to Prometheus on broken time series
|
|
// with irregular data points.
|
|
return nan
|
|
}
|
|
return values[len(values)-1]
|
|
}
|
|
|
|
func rollupDistinct(rfa *rollupFuncArg) float64 {
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
if len(values) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
return 0
|
|
}
|
|
m := make(map[float64]struct{})
|
|
for _, v := range values {
|
|
m[v] = struct{}{}
|
|
}
|
|
return float64(len(m))
|
|
}
|
|
|
|
func rollupIntegrate(rfa *rollupFuncArg) float64 {
|
|
prevTimestamp := rfa.prevTimestamp
|
|
|
|
// There is no need in handling NaNs here, since they must be cleaned up
|
|
// before calling rollup funcs.
|
|
values := rfa.values
|
|
timestamps := rfa.timestamps
|
|
if len(values) == 0 {
|
|
if math.IsNaN(rfa.prevValue) {
|
|
return nan
|
|
}
|
|
return 0
|
|
}
|
|
prevValue := rfa.prevValue
|
|
if math.IsNaN(prevValue) {
|
|
prevValue = values[0]
|
|
prevTimestamp = timestamps[0]
|
|
values = values[1:]
|
|
timestamps = timestamps[1:]
|
|
}
|
|
if len(values) == 0 {
|
|
return 0
|
|
}
|
|
|
|
var sum float64
|
|
for i, v := range values {
|
|
timestamp := timestamps[i]
|
|
dt := float64(timestamp-prevTimestamp) * 1e-3
|
|
sum += 0.5 * (v + prevValue) * dt
|
|
prevTimestamp = timestamp
|
|
prevValue = v
|
|
}
|
|
return sum
|
|
}
|
|
|
|
func rollupFake(rfa *rollupFuncArg) float64 {
|
|
logger.Panicf("BUG: rollupFake shouldn't be called")
|
|
return 0
|
|
}
|
|
|
|
func getScalar(arg interface{}, argNum int) ([]float64, error) {
|
|
ts, ok := arg.([]*timeseries)
|
|
if !ok {
|
|
return nil, fmt.Errorf(`unexpected type for arg #%d; got %T; want %T`, argNum+1, arg, ts)
|
|
}
|
|
if len(ts) != 1 {
|
|
return nil, fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(ts))
|
|
}
|
|
return ts[0].Values, nil
|
|
}
|
|
|
|
func getString(tss []*timeseries, argNum int) (string, error) {
|
|
if len(tss) != 1 {
|
|
return "", fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(tss))
|
|
}
|
|
ts := tss[0]
|
|
for _, v := range ts.Values {
|
|
if !math.IsNaN(v) {
|
|
return "", fmt.Errorf(`arg #%d contains non-string timeseries`, argNum+1)
|
|
}
|
|
}
|
|
return string(ts.MetricName.MetricGroup), nil
|
|
}
|
|
|
|
// expectRollupArgsNum returns an error if the number of args differs from expectedNum.
func expectRollupArgsNum(args []interface{}, expectedNum int) error {
	if n := len(args); n != expectedNum {
		return fmt.Errorf(`unexpected number of args; got %d; want %d`, n, expectedNum)
	}
	return nil
}
|
|
|
|
func getRollupFuncArg() *rollupFuncArg {
|
|
v := rfaPool.Get()
|
|
if v == nil {
|
|
return &rollupFuncArg{}
|
|
}
|
|
return v.(*rollupFuncArg)
|
|
}
|
|
|
|
func putRollupFuncArg(rfa *rollupFuncArg) {
|
|
rfa.reset()
|
|
rfaPool.Put(rfa)
|
|
}
|
|
|
|
// rfaPool is a pool of rollupFuncArg objects.
// See getRollupFuncArg and putRollupFuncArg.
var rfaPool sync.Pool
|