VictoriaMetrics/app/vmselect/promql/rollup.go
Aliaksandr Valialkin af4a306d7b app/vmselect/promql: properly handle Prometheus staleness marks in removeCounterResets functions
Prometheus stalenss marks shouldn't be changed in removeCounterResets. Otherwise they will be converted to an ordinary NaN values,
which couldn't be removed in dropStaleNaNs() function later. This may result in incorrect calculations for rollup functions.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526
2021-08-14 12:45:31 +03:00

1964 lines
54 KiB
Go

package promql
import (
"flag"
"fmt"
"math"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
"github.com/valyala/histogram"
)
var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
"See also '-search.maxStalenessInterval'")
var rollupFuncs = map[string]newRollupFunc{
// Standard rollup funcs from PromQL.
// See funcs accepting range-vector on https://prometheus.io/docs/prometheus/latest/querying/functions/ .
"changes": newRollupFuncOneArg(rollupChanges),
"delta": newRollupFuncOneArg(rollupDelta),
"deriv": newRollupFuncOneArg(rollupDerivSlow),
"deriv_fast": newRollupFuncOneArg(rollupDerivFast),
"holt_winters": newRollupHoltWinters,
"idelta": newRollupFuncOneArg(rollupIdelta),
"increase": newRollupFuncOneArg(rollupDelta), // + rollupFuncsRemoveCounterResets
"irate": newRollupFuncOneArg(rollupIderiv), // + rollupFuncsRemoveCounterResets
"predict_linear": newRollupPredictLinear,
"rate": newRollupFuncOneArg(rollupDerivFast), // + rollupFuncsRemoveCounterResets
"resets": newRollupFuncOneArg(rollupResets),
"avg_over_time": newRollupFuncOneArg(rollupAvg),
"min_over_time": newRollupFuncOneArg(rollupMin),
"max_over_time": newRollupFuncOneArg(rollupMax),
"sum_over_time": newRollupFuncOneArg(rollupSum),
"count_over_time": newRollupFuncOneArg(rollupCount),
"quantile_over_time": newRollupQuantile,
"stddev_over_time": newRollupFuncOneArg(rollupStddev),
"stdvar_over_time": newRollupFuncOneArg(rollupStdvar),
"absent_over_time": newRollupFuncOneArg(rollupAbsent),
"present_over_time": newRollupFuncOneArg(rollupPresent),
// Additional rollup funcs.
"default_rollup": newRollupFuncOneArg(rollupDefault), // default rollup func
"range_over_time": newRollupFuncOneArg(rollupRange),
"sum2_over_time": newRollupFuncOneArg(rollupSum2),
"geomean_over_time": newRollupFuncOneArg(rollupGeomean),
"first_over_time": newRollupFuncOneArg(rollupFirst),
"last_over_time": newRollupFuncOneArg(rollupLast),
"distinct_over_time": newRollupFuncOneArg(rollupDistinct),
"increases_over_time": newRollupFuncOneArg(rollupIncreases),
"decreases_over_time": newRollupFuncOneArg(rollupDecreases),
"increase_pure": newRollupFuncOneArg(rollupIncreasePure), // + rollupFuncsRemoveCounterResets
"integrate": newRollupFuncOneArg(rollupIntegrate),
"ideriv": newRollupFuncOneArg(rollupIderiv),
"lifetime": newRollupFuncOneArg(rollupLifetime),
"lag": newRollupFuncOneArg(rollupLag),
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
"tmin_over_time": newRollupFuncOneArg(rollupTmin),
"tmax_over_time": newRollupFuncOneArg(rollupTmax),
"tfirst_over_time": newRollupFuncOneArg(rollupTfirst),
"tlast_over_time": newRollupFuncOneArg(rollupTlast),
"share_le_over_time": newRollupShareLE,
"share_gt_over_time": newRollupShareGT,
"count_le_over_time": newRollupCountLE,
"count_gt_over_time": newRollupCountGT,
"count_eq_over_time": newRollupCountEQ,
"count_ne_over_time": newRollupCountNE,
"histogram_over_time": newRollupFuncOneArg(rollupHistogram),
"rollup": newRollupFuncOneArg(rollupFake),
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_deriv": newRollupFuncOneArg(rollupFake),
"rollup_delta": newRollupFuncOneArg(rollupFake),
"rollup_increase": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
"rollup_candlestick": newRollupFuncOneArg(rollupFake),
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
"hoeffding_bound_upper": newRollupHoeffdingBoundUpper,
"hoeffding_bound_lower": newRollupHoeffdingBoundLower,
"ascent_over_time": newRollupFuncOneArg(rollupAscentOverTime),
"descent_over_time": newRollupFuncOneArg(rollupDescentOverTime),
"zscore_over_time": newRollupFuncOneArg(rollupZScoreOverTime),
// `timestamp` function must return timestamp for the last datapoint on the current window
// in order to properly handle offset and timestamps unaligned to the current step.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details.
"timestamp": newRollupFuncOneArg(rollupTlast),
// See https://en.wikipedia.org/wiki/Mode_(statistics)
"mode_over_time": newRollupFuncOneArg(rollupModeOverTime),
"rate_over_sum": newRollupFuncOneArg(rollupRateOverSum),
}
// rollupAggrFuncs are functions that can be passed to `aggr_over_time()`
var rollupAggrFuncs = map[string]rollupFunc{
// Standard rollup funcs from PromQL.
"changes": rollupChanges,
"delta": rollupDelta,
"deriv": rollupDerivSlow,
"deriv_fast": rollupDerivFast,
"idelta": rollupIdelta,
"increase": rollupDelta, // + rollupFuncsRemoveCounterResets
"irate": rollupIderiv, // + rollupFuncsRemoveCounterResets
"rate": rollupDerivFast, // + rollupFuncsRemoveCounterResets
"resets": rollupResets,
"avg_over_time": rollupAvg,
"min_over_time": rollupMin,
"max_over_time": rollupMax,
"sum_over_time": rollupSum,
"count_over_time": rollupCount,
"stddev_over_time": rollupStddev,
"stdvar_over_time": rollupStdvar,
"absent_over_time": rollupAbsent,
"present_over_time": rollupPresent,
// Additional rollup funcs.
"range_over_time": rollupRange,
"sum2_over_time": rollupSum2,
"geomean_over_time": rollupGeomean,
"first_over_time": rollupFirst,
"last_over_time": rollupLast,
"distinct_over_time": rollupDistinct,
"increases_over_time": rollupIncreases,
"decreases_over_time": rollupDecreases,
"increase_pure": rollupIncreasePure,
"integrate": rollupIntegrate,
"ideriv": rollupIderiv,
"lifetime": rollupLifetime,
"lag": rollupLag,
"scrape_interval": rollupScrapeInterval,
"tmin_over_time": rollupTmin,
"tmax_over_time": rollupTmax,
"tfirst_over_time": rollupTfirst,
"tlast_over_time": rollupTlast,
"ascent_over_time": rollupAscentOverTime,
"descent_over_time": rollupDescentOverTime,
"zscore_over_time": rollupZScoreOverTime,
"timestamp": rollupTlast,
"mode_over_time": rollupModeOverTime,
"rate_over_sum": rollupRateOverSum,
}
var rollupFuncsCannotAdjustWindow = map[string]bool{
"changes": true,
"delta": true,
"holt_winters": true,
"idelta": true,
"increase": true,
"predict_linear": true,
"resets": true,
"avg_over_time": true,
"sum_over_time": true,
"count_over_time": true,
"quantile_over_time": true,
"stddev_over_time": true,
"stdvar_over_time": true,
"absent_over_time": true,
"present_over_time": true,
"sum2_over_time": true,
"geomean_over_time": true,
"distinct_over_time": true,
"increases_over_time": true,
"decreases_over_time": true,
"increase_pure": true,
"integrate": true,
"ascent_over_time": true,
"descent_over_time": true,
"zscore_over_time": true,
}
var rollupFuncsRemoveCounterResets = map[string]bool{
"increase": true,
"irate": true,
"rate": true,
"rollup_rate": true,
"rollup_increase": true,
"increase_pure": true,
}
var rollupFuncsKeepMetricGroup = map[string]bool{
"holt_winters": true,
"predict_linear": true,
"default_rollup": true,
"avg_over_time": true,
"min_over_time": true,
"max_over_time": true,
"quantile_over_time": true,
"rollup": true,
"geomean_over_time": true,
"hoeffding_bound_lower": true,
"hoeffding_bound_upper": true,
"first_over_time": true,
"last_over_time": true,
"mode_over_time": true,
}
func getRollupAggrFuncNames(expr metricsql.Expr) ([]string, error) {
afe, ok := expr.(*metricsql.AggrFuncExpr)
if ok {
// This is for incremental aggregate function case:
//
// sum(aggr_over_time(...))
//
// See aggr_incremental.go for details.
expr = afe.Args[0]
}
fe, ok := expr.(*metricsql.FuncExpr)
if !ok {
logger.Panicf("BUG: unexpected expression; want metricsql.FuncExpr; got %T; value: %s", expr, expr.AppendString(nil))
}
if fe.Name != "aggr_over_time" {
logger.Panicf("BUG: unexpected function name: %q; want `aggr_over_time`", fe.Name)
}
if len(fe.Args) != 2 {
return nil, fmt.Errorf("unexpected number of args to aggr_over_time(); got %d; want %d", len(fe.Args), 2)
}
arg := fe.Args[0]
var aggrFuncNames []string
if se, ok := arg.(*metricsql.StringExpr); ok {
aggrFuncNames = append(aggrFuncNames, se.S)
} else {
fe, ok := arg.(*metricsql.FuncExpr)
if !ok || fe.Name != "" {
return nil, fmt.Errorf("%s cannot be passed to aggr_over_time(); expecting quoted aggregate function name or a list of quoted aggregate function names",
arg.AppendString(nil))
}
for _, e := range fe.Args {
se, ok := e.(*metricsql.StringExpr)
if !ok {
return nil, fmt.Errorf("%s cannot be passed here; expecting quoted aggregate function name", e.AppendString(nil))
}
aggrFuncNames = append(aggrFuncNames, se.S)
}
}
if len(aggrFuncNames) == 0 {
return nil, fmt.Errorf("aggr_over_time() must contain at least a single aggregate function name")
}
for _, s := range aggrFuncNames {
if rollupAggrFuncs[s] == nil {
return nil, fmt.Errorf("%q cannot be used in `aggr_over_time` function; expecting quoted aggregate function name", s)
}
}
return aggrFuncNames, nil
}
func getRollupArgIdx(funcName string) int {
funcName = strings.ToLower(funcName)
if rollupFuncs[funcName] == nil {
logger.Panicf("BUG: getRollupArgIdx is called for non-rollup func %q", funcName)
}
switch funcName {
case "quantile_over_time", "aggr_over_time",
"hoeffding_bound_lower", "hoeffding_bound_upper":
return 1
default:
return 0
}
}
func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) (
func(values []float64, timestamps []int64), []*rollupConfig, error) {
preFunc := func(values []float64, timestamps []int64) {}
if rollupFuncsRemoveCounterResets[name] {
preFunc = func(values []float64, timestamps []int64) {
removeCounterResets(values)
}
}
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
return &rollupConfig{
TagValue: tagValue,
Func: rf,
Start: start,
End: end,
Step: step,
Window: window,
MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name],
CanDropStalePoints: name == "default_rollup",
LookbackDelta: lookbackDelta,
Timestamps: sharedTimestamps,
}
}
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
dst = append(dst, newRollupConfig(rollupMin, "min"))
dst = append(dst, newRollupConfig(rollupMax, "max"))
dst = append(dst, newRollupConfig(rollupAvg, "avg"))
return dst
}
var rcs []*rollupConfig
switch name {
case "rollup":
rcs = appendRollupConfigs(rcs)
case "rollup_rate", "rollup_deriv":
preFuncPrev := preFunc
preFunc = func(values []float64, timestamps []int64) {
preFuncPrev(values, timestamps)
derivValues(values, timestamps)
}
rcs = appendRollupConfigs(rcs)
case "rollup_increase", "rollup_delta":
preFuncPrev := preFunc
preFunc = func(values []float64, timestamps []int64) {
preFuncPrev(values, timestamps)
deltaValues(values)
}
rcs = appendRollupConfigs(rcs)
case "rollup_candlestick":
rcs = append(rcs, newRollupConfig(rollupOpen, "open"))
rcs = append(rcs, newRollupConfig(rollupClose, "close"))
rcs = append(rcs, newRollupConfig(rollupLow, "low"))
rcs = append(rcs, newRollupConfig(rollupHigh, "high"))
case "aggr_over_time":
aggrFuncNames, err := getRollupAggrFuncNames(expr)
if err != nil {
return nil, nil, fmt.Errorf("invalid args to %s: %w", expr.AppendString(nil), err)
}
for _, aggrFuncName := range aggrFuncNames {
if rollupFuncsRemoveCounterResets[aggrFuncName] {
// There is no need to save the previous preFunc, since it is either empty or the same.
preFunc = func(values []float64, timestamps []int64) {
removeCounterResets(values)
}
}
rf := rollupAggrFuncs[aggrFuncName]
rcs = append(rcs, newRollupConfig(rf, aggrFuncName))
}
default:
rcs = append(rcs, newRollupConfig(rf, ""))
}
return preFunc, rcs, nil
}
func getRollupFunc(funcName string) newRollupFunc {
funcName = strings.ToLower(funcName)
return rollupFuncs[funcName]
}
type rollupFuncArg struct {
// The value preceeding values if it fits staleness interval.
prevValue float64
// The timestamp for prevValue.
prevTimestamp int64
// Values that fit window ending at currTimestamp.
values []float64
// Timestamps for values.
timestamps []int64
// Real value preceeding values without restrictions on staleness interval.
realPrevValue float64
// Real value which goes after values.
realNextValue float64
// Current timestamp for rollup evaluation.
currTimestamp int64
// Index for the currently evaluated point relative to time range for query evaluation.
idx int
// Time window for rollup calculations.
window int64
tsm *timeseriesMap
}
func (rfa *rollupFuncArg) reset() {
rfa.prevValue = 0
rfa.prevTimestamp = 0
rfa.values = nil
rfa.timestamps = nil
rfa.currTimestamp = 0
rfa.idx = 0
rfa.window = 0
rfa.tsm = nil
}
// rollupFunc must return rollup value for the given rfa.
//
// prevValue may be nan, values and timestamps may be empty.
type rollupFunc func(rfa *rollupFuncArg) float64
type rollupConfig struct {
// This tag value must be added to "rollup" tag if non-empty.
TagValue string
Func rollupFunc
Start int64
End int64
Step int64
Window int64
// Whether window may be adjusted to 2 x interval between data points.
// This is needed for functions which have dt in the denominator
// such as rate, deriv, etc.
// Without the adjustment their value would jump in unexpected directions
// when using window smaller than 2 x scrape_interval.
MayAdjustWindow bool
// Whether points after Prometheus stale marks can be dropped during rollup calculations.
// Stale points can be dropped only if `default_rollup()` function is used.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
CanDropStalePoints bool
Timestamps []int64
// LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world.
LookbackDelta int64
}
var (
nan = math.NaN()
inf = math.Inf(1)
)
// The maximum interval without previous rows.
const maxSilenceInterval = 5 * 60 * 1000
type timeseriesMap struct {
origin *timeseries
labelName string
h metrics.Histogram
m map[string]*timeseries
}
func newTimeseriesMap(funcName string, sharedTimestamps []int64, mnSrc *storage.MetricName) *timeseriesMap {
if strings.ToLower(funcName) != "histogram_over_time" {
return nil
}
values := make([]float64, len(sharedTimestamps))
for i := range values {
values[i] = nan
}
var origin timeseries
origin.MetricName.CopyFrom(mnSrc)
origin.MetricName.ResetMetricGroup()
origin.Timestamps = sharedTimestamps
origin.Values = values
return &timeseriesMap{
origin: &origin,
labelName: "vmrange",
m: make(map[string]*timeseries),
}
}
func (tsm *timeseriesMap) AppendTimeseriesTo(dst []*timeseries) []*timeseries {
for _, ts := range tsm.m {
dst = append(dst, ts)
}
return dst
}
func (tsm *timeseriesMap) GetOrCreateTimeseries(labelValue string) *timeseries {
ts := tsm.m[labelValue]
if ts != nil {
return ts
}
ts = &timeseries{}
ts.CopyFromShallowTimestamps(tsm.origin)
ts.MetricName.RemoveTag(tsm.labelName)
ts.MetricName.AddTag(tsm.labelName, labelValue)
tsm.m[labelValue] = ts
return ts
}
// Do calculates rollups for the given timestamps and values, appends
// them to dstValues and returns results.
//
// rc.Timestamps are used as timestamps for dstValues.
//
// timestamps must cover time range [rc.Start - rc.Window - maxSilenceInterval ... rc.End].
//
// Do cannot be called from concurrent goroutines.
func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []int64) []float64 {
return rc.doInternal(dstValues, nil, values, timestamps)
}
// DoTimeseriesMap calculates rollups for the given timestamps and values and puts them to tsm.
func (rc *rollupConfig) DoTimeseriesMap(tsm *timeseriesMap, values []float64, timestamps []int64) {
ts := getTimeseries()
ts.Values = rc.doInternal(ts.Values[:0], tsm, values, timestamps)
putTimeseries(ts)
}
func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, values []float64, timestamps []int64) []float64 {
// Sanity checks.
if rc.Step <= 0 {
logger.Panicf("BUG: Step must be bigger than 0; got %d", rc.Step)
}
if rc.Start > rc.End {
logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", rc.Start, rc.End)
}
if rc.Window < 0 {
logger.Panicf("BUG: Window must be non-negative; got %d", rc.Window)
}
if err := ValidateMaxPointsPerTimeseries(rc.Start, rc.End, rc.Step); err != nil {
logger.Panicf("BUG: %s; this must be validated before the call to rollupConfig.Do", err)
}
// Extend dstValues in order to remove mallocs below.
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
if !rc.CanDropStalePoints {
// Remove Prometheus staleness marks from values, so rollup functions don't hit NaN values.
values, timestamps = dropStaleNaNs(values, timestamps)
}
scrapeInterval := getScrapeInterval(timestamps)
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
maxPrevInterval = rc.LookbackDelta
}
if *minStalenessInterval > 0 {
if msi := minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
maxPrevInterval = msi
}
}
window := rc.Window
if window <= 0 {
window = rc.Step
if rc.CanDropStalePoints && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
// Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval
// according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784
window = rc.LookbackDelta
}
}
if rc.MayAdjustWindow && window < maxPrevInterval {
window = maxPrevInterval
}
rfa := getRollupFuncArg()
rfa.idx = 0
rfa.window = window
rfa.tsm = tsm
i := 0
j := 0
ni := 0
nj := 0
f := rc.Func
for _, tEnd := range rc.Timestamps {
tStart := tEnd - window
ni = seekFirstTimestampIdxAfter(timestamps[i:], tStart, ni)
i += ni
if j < i {
j = i
}
nj = seekFirstTimestampIdxAfter(timestamps[j:], tEnd, nj)
j += nj
rfa.prevValue = nan
rfa.prevTimestamp = tStart - maxPrevInterval
if i < len(timestamps) && i > 0 && timestamps[i-1] > rfa.prevTimestamp {
rfa.prevValue = values[i-1]
rfa.prevTimestamp = timestamps[i-1]
}
rfa.values = values[i:j]
rfa.timestamps = timestamps[i:j]
if i > 0 {
rfa.realPrevValue = values[i-1]
} else {
rfa.realPrevValue = nan
}
if j < len(values) {
rfa.realNextValue = values[j]
} else {
rfa.realNextValue = nan
}
rfa.currTimestamp = tEnd
value := f(rfa)
rfa.idx++
dstValues = append(dstValues, value)
}
putRollupFuncArg(rfa)
return dstValues
}
func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) {
hasStaleSamples := false
for _, v := range values {
if decimal.IsStaleNaN(v) {
hasStaleSamples = true
break
}
}
if !hasStaleSamples {
// Fast path: values have noe Prometheus staleness marks.
return values, timestamps
}
// Slow path: drop Prometheus staleness marks from values.
dstValues := make([]float64, 0, len(values))
dstTimestamps := make([]int64, 0, len(timestamps))
for i, v := range values {
if decimal.IsStaleNaN(v) {
continue
}
dstValues = append(dstValues, v)
dstTimestamps = append(dstTimestamps, timestamps[i])
}
return dstValues, dstTimestamps
}
func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int {
if len(timestamps) == 0 || timestamps[0] > seekTimestamp {
return 0
}
startIdx := nHint - 2
if startIdx < 0 {
startIdx = 0
}
if startIdx >= len(timestamps) {
startIdx = len(timestamps) - 1
}
endIdx := nHint + 2
if endIdx > len(timestamps) {
endIdx = len(timestamps)
}
if startIdx > 0 && timestamps[startIdx] <= seekTimestamp {
timestamps = timestamps[startIdx:]
endIdx -= startIdx
} else {
startIdx = 0
}
if endIdx < len(timestamps) && timestamps[endIdx] > seekTimestamp {
timestamps = timestamps[:endIdx]
}
if len(timestamps) < 16 {
// Fast path: the number of timestamps to search is small, so scan them all.
for i, timestamp := range timestamps {
if timestamp > seekTimestamp {
return startIdx + i
}
}
return startIdx + len(timestamps)
}
// Slow path: too big len(timestamps), so use binary search.
i := binarySearchInt64(timestamps, seekTimestamp+1)
return startIdx + int(i)
}
func binarySearchInt64(a []int64, v int64) uint {
// Copy-pasted sort.Search from https://golang.org/src/sort/search.go?s=2246:2286#L49
i, j := uint(0), uint(len(a))
for i < j {
h := (i + j) >> 1
if h < uint(len(a)) && a[h] < v {
i = h + 1
} else {
j = h
}
}
return i
}
func getScrapeInterval(timestamps []int64) int64 {
if len(timestamps) < 2 {
return int64(maxSilenceInterval)
}
// Estimate scrape interval as 0.6 quantile for the first 20 intervals.
h := histogram.GetFast()
tsPrev := timestamps[0]
timestamps = timestamps[1:]
if len(timestamps) > 20 {
timestamps = timestamps[:20]
}
for _, ts := range timestamps {
h.Update(float64(ts - tsPrev))
tsPrev = ts
}
scrapeInterval := int64(h.Quantile(0.6))
histogram.PutFast(h)
if scrapeInterval <= 0 {
return int64(maxSilenceInterval)
}
return scrapeInterval
}
func getMaxPrevInterval(scrapeInterval int64) int64 {
// Increase scrapeInterval more for smaller scrape intervals in order to hide possible gaps
// when high jitter is present.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/139 .
if scrapeInterval <= 2*1000 {
return scrapeInterval + 4*scrapeInterval
}
if scrapeInterval <= 4*1000 {
return scrapeInterval + 2*scrapeInterval
}
if scrapeInterval <= 8*1000 {
return scrapeInterval + scrapeInterval
}
if scrapeInterval <= 16*1000 {
return scrapeInterval + scrapeInterval/2
}
if scrapeInterval <= 32*1000 {
return scrapeInterval + scrapeInterval/4
}
return scrapeInterval + scrapeInterval/8
}
func removeCounterResets(values []float64) {
if len(values) == 0 {
return
}
var correction float64
prevValue := values[0]
for i, v := range values {
d := v - prevValue
if d < 0 {
if (-d * 8) < prevValue {
// This is likely jitter from `Prometheus HA pairs`.
// Just substitute v with prevValue.
v = prevValue
} else {
correction += prevValue
}
}
prevValue = v
if !decimal.IsStaleNaN(v) {
values[i] = v + correction
}
}
}
func deltaValues(values []float64) {
// There is no need in handling NaNs here, since they are impossible
// on values from vmstorage.
if len(values) == 0 {
return
}
prevDelta := float64(0)
prevValue := values[0]
for i, v := range values[1:] {
prevDelta = v - prevValue
values[i] = prevDelta
prevValue = v
}
values[len(values)-1] = prevDelta
}
func derivValues(values []float64, timestamps []int64) {
// There is no need in handling NaNs here, since they are impossible
// on values from vmstorage.
if len(values) == 0 {
return
}
prevDeriv := float64(0)
prevValue := values[0]
prevTs := timestamps[0]
for i, v := range values[1:] {
ts := timestamps[i+1]
if ts == prevTs {
// Use the previous value for duplicate timestamps.
values[i] = prevDeriv
continue
}
dt := float64(ts-prevTs) / 1e3
prevDeriv = (v - prevValue) / dt
values[i] = prevDeriv
prevValue = v
prevTs = ts
}
values[len(values)-1] = prevDeriv
}
type newRollupFunc func(args []interface{}) (rollupFunc, error)
func newRollupFuncOneArg(rf rollupFunc) newRollupFunc {
return func(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 1); err != nil {
return nil, err
}
return rf, nil
}
}
func newRollupFuncTwoArgs(rf rollupFunc) newRollupFunc {
return func(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
return rf, nil
}
}
func newRollupHoltWinters(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 3); err != nil {
return nil, err
}
sfs, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
tfs, err := getScalar(args[2], 2)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
sf := sfs[rfa.idx]
if sf <= 0 || sf >= 1 {
return nan
}
tf := tfs[rfa.idx]
if tf <= 0 || tf >= 1 {
return nan
}
// See https://en.wikipedia.org/wiki/Exponential_smoothing#Double_exponential_smoothing .
// TODO: determine whether this shit really works.
s0 := rfa.prevValue
if math.IsNaN(s0) {
s0 = values[0]
values = values[1:]
if len(values) == 0 {
return s0
}
}
b0 := values[0] - s0
for _, v := range values {
s1 := sf*v + (1-sf)*(s0+b0)
b1 := tf*(s1-s0) + (1-tf)*b0
s0 = s1
b0 = b1
}
return s0
}
return rf, nil
}
func newRollupPredictLinear(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
secs, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
v, k := linearRegression(rfa)
if math.IsNaN(v) {
return nan
}
sec := secs[rfa.idx]
return v + k*sec
}
return rf, nil
}
func linearRegression(rfa *rollupFuncArg) (float64, float64) {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) == 0 {
return rfa.prevValue, 0
}
// See https://en.wikipedia.org/wiki/Simple_linear_regression#Numerical_example
tFirst := rfa.prevTimestamp
vSum := rfa.prevValue
tSum := float64(0)
tvSum := float64(0)
ttSum := float64(0)
n := 1.0
if math.IsNaN(rfa.prevValue) {
tFirst = timestamps[0]
vSum = 0
n = 0
}
for i, v := range values {
dt := float64(timestamps[i]-tFirst) / 1e3
vSum += v
tSum += dt
tvSum += dt * v
ttSum += dt * dt
}
n += float64(len(values))
if n == 1 {
return vSum, 0
}
k := (n*tvSum - tSum*vSum) / (n*ttSum - tSum*tSum)
v := (vSum - k*tSum) / n
// Adjust v to the last timestamp on the given time range.
v += k * (float64(timestamps[len(timestamps)-1]-tFirst) / 1e3)
return v, k
}
func newRollupShareLE(args []interface{}) (rollupFunc, error) {
return newRollupShareFilter(args, countFilterLE)
}
func countFilterLE(values []float64, le float64) int {
n := 0
for _, v := range values {
if v <= le {
n++
}
}
return n
}
func newRollupShareGT(args []interface{}) (rollupFunc, error) {
return newRollupShareFilter(args, countFilterGT)
}
func countFilterGT(values []float64, gt float64) int {
n := 0
for _, v := range values {
if v > gt {
n++
}
}
return n
}
func countFilterEQ(values []float64, eq float64) int {
n := 0
for _, v := range values {
if v == eq {
n++
}
}
return n
}
func countFilterNE(values []float64, ne float64) int {
n := 0
for _, v := range values {
if v != ne {
n++
}
}
return n
}
func newRollupShareFilter(args []interface{}, countFilter func(values []float64, limit float64) int) (rollupFunc, error) {
rf, err := newRollupCountFilter(args, countFilter)
if err != nil {
return nil, err
}
return func(rfa *rollupFuncArg) float64 {
n := rf(rfa)
return n / float64(len(rfa.values))
}, nil
}
func newRollupCountLE(args []interface{}) (rollupFunc, error) {
return newRollupCountFilter(args, countFilterLE)
}
func newRollupCountGT(args []interface{}) (rollupFunc, error) {
return newRollupCountFilter(args, countFilterGT)
}
func newRollupCountEQ(args []interface{}) (rollupFunc, error) {
return newRollupCountFilter(args, countFilterEQ)
}
func newRollupCountNE(args []interface{}) (rollupFunc, error) {
return newRollupCountFilter(args, countFilterNE)
}
func newRollupCountFilter(args []interface{}, countFilter func(values []float64, limit float64) int) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
limits, err := getScalar(args[1], 1)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan
}
limit := limits[rfa.idx]
return float64(countFilter(values, limit))
}
return rf, nil
}
func newRollupHoeffdingBoundLower(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
bound, avg := rollupHoeffdingBoundInternal(rfa, phis)
return avg - bound
}
return rf, nil
}
func newRollupHoeffdingBoundUpper(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
bound, avg := rollupHoeffdingBoundInternal(rfa, phis)
return avg + bound
}
return rf, nil
}
func rollupHoeffdingBoundInternal(rfa *rollupFuncArg, phis []float64) (float64, float64) {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return nan, nan
}
if len(values) == 1 {
return 0, values[0]
}
vMax := rollupMax(rfa)
vMin := rollupMin(rfa)
vAvg := rollupAvg(rfa)
vRange := vMax - vMin
if vRange <= 0 {
return 0, vAvg
}
phi := phis[rfa.idx]
if phi >= 1 {
return inf, vAvg
}
if phi <= 0 {
return 0, vAvg
}
// See https://en.wikipedia.org/wiki/Hoeffding%27s_inequality
// and https://www.youtube.com/watch?v=6UwcqiNsZ8U&feature=youtu.be&t=1237
bound := vRange * math.Sqrt(math.Log(1/(1-phi))/(2*float64(len(values))))
return bound, vAvg
}
func newRollupQuantile(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err
}
phis, err := getScalar(args[0], 0)
if err != nil {
return nil, err
}
rf := func(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
if len(values) == 1 {
// Fast path - only a single value.
return values[0]
}
hf := histogram.GetFast()
for _, v := range values {
hf.Update(v)
}
phi := phis[rfa.idx]
qv := hf.Quantile(phi)
histogram.PutFast(hf)
return qv
}
return rf, nil
}
func rollupHistogram(rfa *rollupFuncArg) float64 {
values := rfa.values
tsm := rfa.tsm
tsm.h.Reset()
for _, v := range values {
tsm.h.Update(v)
}
idx := rfa.idx
tsm.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
ts := tsm.GetOrCreateTimeseries(vmrange)
ts.Values[idx] = float64(count)
})
return nan
}
func rollupAvg(rfa *rollupFuncArg) float64 {
// Do not use `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation,
// since it is slower and has no significant benefits in precision.
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
var sum float64
for _, v := range values {
sum += v
}
return sum / float64(len(values))
}
func rollupMin(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
minValue := values[0]
for _, v := range values {
if v < minValue {
minValue = v
}
}
return minValue
}
func rollupMax(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
maxValue := values[0]
for _, v := range values {
if v > maxValue {
maxValue = v
}
}
return maxValue
}
func rollupTmin(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) == 0 {
return nan
}
minValue := values[0]
minTimestamp := timestamps[0]
for i, v := range values {
// Get the last timestamp for the minimum value as most users expect.
if v <= minValue {
minValue = v
minTimestamp = timestamps[i]
}
}
return float64(minTimestamp) / 1e3
}
func rollupTmax(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) == 0 {
return nan
}
maxValue := values[0]
maxTimestamp := timestamps[0]
for i, v := range values {
// Get the last timestamp for the maximum value as most users expect.
if v >= maxValue {
maxValue = v
maxTimestamp = timestamps[i]
}
}
return float64(maxTimestamp) / 1e3
}
func rollupTfirst(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
// Do not take into account rfa.prevTimestamp, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return float64(timestamps[0]) / 1e3
}
func rollupTlast(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
// Do not take into account rfa.prevTimestamp, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return float64(timestamps[len(timestamps)-1]) / 1e3
}
func rollupSum(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
var sum float64
for _, v := range values {
sum += v
}
return sum
}
func rollupRateOverSum(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
// Assume that the value didn't change since rfa.prevValue.
return 0
}
dt := rfa.window
if !math.IsNaN(rfa.prevValue) {
dt = timestamps[len(timestamps)-1] - rfa.prevTimestamp
}
sum := float64(0)
for _, v := range rfa.values {
sum += v
}
return sum / (float64(dt) / 1e3)
}
func rollupRange(rfa *rollupFuncArg) float64 {
max := rollupMax(rfa)
min := rollupMin(rfa)
return max - min
}
func rollupSum2(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue * rfa.prevValue
}
var sum2 float64
for _, v := range values {
sum2 += v * v
}
return sum2
}
func rollupGeomean(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
return rfa.prevValue
}
p := 1.0
for _, v := range values {
p *= v
}
return math.Pow(p, 1/float64(len(values)))
}
func rollupAbsent(rfa *rollupFuncArg) float64 {
if len(rfa.values) == 0 {
return 1
}
return nan
}
func rollupPresent(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
if len(rfa.values) > 0 {
return 1
}
return nan
}
func rollupCount(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
return float64(len(values))
}
func rollupStddev(rfa *rollupFuncArg) float64 {
stdvar := rollupStdvar(rfa)
return math.Sqrt(stdvar)
}
func rollupStdvar(rfa *rollupFuncArg) float64 {
// See `Rapid calculation methods` at https://en.wikipedia.org/wiki/Standard_deviation
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
if len(values) == 1 {
// Fast path.
return values[0]
}
var avg float64
var count float64
var q float64
for _, v := range values {
count++
avgNew := avg + (v-avg)/count
q += (v - avg) * (v - avgNew)
avg = avgNew
}
return q / count
}
func rollupIncreasePure(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
// restore to the real value because of potential staleness reset
prevValue := rfa.realPrevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
// Assume the counter starts from 0.
prevValue = 0
}
if len(values) == 0 {
// Assume the counter didsn't change since prevValue.
return 0
}
return values[len(values)-1] - prevValue
}
func rollupDelta(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
if !math.IsNaN(rfa.realPrevValue) {
// Assume that the value didn't change during the current gap.
// This should fix high delta() and increase() values at the end of gaps.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/894
return values[len(values)-1] - rfa.realPrevValue
}
// Assume that the previous non-existing value was 0 only in the following cases:
//
// - If the delta with the next value equals to 0.
// This is the case for slow-changing counter - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/962
// - If the first value doesn't exceed too much the delta with the next value.
//
// This should prevent from improper increase() results for os-level counters
// such as cpu time or bytes sent over the network interface.
// These counters may start long ago before the first value appears in the db.
//
// This also should prevent from improper increase() results when a part of label values are changed
// without counter reset.
var d float64
if len(values) > 1 {
d = values[1] - values[0]
} else if !math.IsNaN(rfa.realNextValue) {
d = rfa.realNextValue - values[0]
}
if d == 0 {
d = 10
}
if math.Abs(values[0]) < 10*(math.Abs(d)+1) {
prevValue = 0
} else {
prevValue = values[0]
values = values[1:]
}
}
if len(values) == 0 {
// Assume that the value didn't change on the given interval.
return 0
}
return values[len(values)-1] - prevValue
}
func rollupIdelta(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
// Assume that the value didn't change on the given interval.
return 0
}
lastValue := values[len(values)-1]
values = values[:len(values)-1]
if len(values) == 0 {
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
// Assume that the previous non-existing value was 0.
return lastValue
}
return lastValue - prevValue
}
return lastValue - values[len(values)-1]
}
func rollupDerivSlow(rfa *rollupFuncArg) float64 {
// Use linear regression like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/73
_, k := linearRegression(rfa)
return k
}
func rollupDerivFast(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
prevValue := rfa.prevValue
prevTimestamp := rfa.prevTimestamp
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
if len(values) == 1 {
// It is impossible to determine the duration during which the value changed
// from 0 to the current value.
// The following attempts didn't work well:
// - using scrape interval as the duration. It fails on Prometheus restarts when it
// skips scraping for the counter. This results in too high rate() value for the first point
// after Prometheus restarts.
// - using window or step as the duration. It results in too small rate() values for the first
// points of time series.
//
// So just return nan
return nan
}
prevValue = values[0]
prevTimestamp = timestamps[0]
} else if len(values) == 0 {
// Assume that the value didn't change on the given interval.
return 0
}
vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1]
dv := vEnd - prevValue
dt := float64(tEnd-prevTimestamp) / 1e3
return dv / dt
}
func rollupIderiv(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
if len(values) < 2 {
if len(values) == 0 {
return nan
}
if math.IsNaN(rfa.prevValue) {
// It is impossible to determine the duration during which the value changed
// from 0 to the current value.
// The following attempts didn't work well:
// - using scrape interval as the duration. It fails on Prometheus restarts when it
// skips scraping for the counter. This results in too high rate() value for the first point
// after Prometheus restarts.
// - using window or step as the duration. It results in too small rate() values for the first
// points of time series.
//
// So just return nan
return nan
}
return (values[0] - rfa.prevValue) / (float64(timestamps[0]-rfa.prevTimestamp) / 1e3)
}
vEnd := values[len(values)-1]
tEnd := timestamps[len(timestamps)-1]
values = values[:len(values)-1]
timestamps = timestamps[:len(timestamps)-1]
// Skip data points with duplicate timestamps.
for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= tEnd {
timestamps = timestamps[:len(timestamps)-1]
}
var tStart int64
var vStart float64
if len(timestamps) == 0 {
if math.IsNaN(rfa.prevValue) {
return 0
}
tStart = rfa.prevTimestamp
vStart = rfa.prevValue
} else {
tStart = timestamps[len(timestamps)-1]
vStart = values[len(timestamps)-1]
}
dv := vEnd - vStart
dt := tEnd - tStart
return dv / (float64(dt) / 1e3)
}
func rollupLifetime(rfa *rollupFuncArg) float64 {
// Calculate the duration between the first and the last data points.
timestamps := rfa.timestamps
if math.IsNaN(rfa.prevValue) {
if len(timestamps) < 2 {
return nan
}
return float64(timestamps[len(timestamps)-1]-timestamps[0]) / 1e3
}
if len(timestamps) == 0 {
return nan
}
return float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) / 1e3
}
func rollupLag(rfa *rollupFuncArg) float64 {
// Calculate the duration between the current timestamp and the last data point.
timestamps := rfa.timestamps
if len(timestamps) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return float64(rfa.currTimestamp-rfa.prevTimestamp) / 1e3
}
return float64(rfa.currTimestamp-timestamps[len(timestamps)-1]) / 1e3
}
func rollupScrapeInterval(rfa *rollupFuncArg) float64 {
// Calculate the average interval between data points.
timestamps := rfa.timestamps
if math.IsNaN(rfa.prevValue) {
if len(timestamps) < 2 {
return nan
}
return (float64(timestamps[len(timestamps)-1]-timestamps[0]) / 1e3) / float64(len(timestamps)-1)
}
if len(timestamps) == 0 {
return nan
}
return (float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) / 1e3) / float64(len(timestamps))
}
func rollupChanges(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
n := 0
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
values = values[1:]
n++
}
for _, v := range values {
if v != prevValue {
n++
prevValue = v
}
}
return float64(n)
}
func rollupIncreases(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
prevValue = values[0]
values = values[1:]
}
if len(values) == 0 {
return 0
}
n := 0
for _, v := range values {
if v > prevValue {
n++
}
prevValue = v
}
return float64(n)
}
// `decreases_over_time` logic is the same as `resets` logic.
var rollupDecreases = rollupResets
func rollupResets(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
prevValue = values[0]
values = values[1:]
}
if len(values) == 0 {
return 0
}
n := 0
for _, v := range values {
if v < prevValue {
n++
}
prevValue = v
}
return float64(n)
}
// getCandlestickValues returns a subset of rfa.values suitable for rollup_candlestick
//
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309 for details.
func getCandlestickValues(rfa *rollupFuncArg) []float64 {
currTimestamp := rfa.currTimestamp
timestamps := rfa.timestamps
for len(timestamps) > 0 && timestamps[len(timestamps)-1] >= currTimestamp {
timestamps = timestamps[:len(timestamps)-1]
}
if len(timestamps) == 0 {
return nil
}
return rfa.values[:len(timestamps)]
}
func getFirstValueForCandlestick(rfa *rollupFuncArg) float64 {
if rfa.prevTimestamp+rfa.window >= rfa.currTimestamp {
return rfa.prevValue
}
return nan
}
func rollupOpen(rfa *rollupFuncArg) float64 {
v := getFirstValueForCandlestick(rfa)
if !math.IsNaN(v) {
return v
}
values := getCandlestickValues(rfa)
if len(values) == 0 {
return nan
}
return values[0]
}
func rollupClose(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
if len(values) == 0 {
return getFirstValueForCandlestick(rfa)
}
return values[len(values)-1]
}
func rollupHigh(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
max := getFirstValueForCandlestick(rfa)
if math.IsNaN(max) {
if len(values) == 0 {
return nan
}
max = values[0]
values = values[1:]
}
for _, v := range values {
if v > max {
max = v
}
}
return max
}
func rollupLow(rfa *rollupFuncArg) float64 {
values := getCandlestickValues(rfa)
min := getFirstValueForCandlestick(rfa)
if math.IsNaN(min) {
if len(values) == 0 {
return nan
}
min = values[0]
values = values[1:]
}
for _, v := range values {
if v < min {
min = v
}
}
return min
}
func rollupModeOverTime(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
// Copy rfa.values to a.A, since modeNoNaNs modifies a.A contents.
a := float64sPool.Get().(*float64s)
a.A = append(a.A[:0], rfa.values...)
result := modeNoNaNs(rfa.prevValue, a.A)
float64sPool.Put(a)
return result
}
var float64sPool = &sync.Pool{
New: func() interface{} {
return &float64s{}
},
}
type float64s struct {
A []float64
}
func rollupAscentOverTime(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
values = values[1:]
}
s := float64(0)
for _, v := range values {
d := v - prevValue
if d > 0 {
s += d
}
prevValue = v
}
return s
}
func rollupDescentOverTime(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
prevValue := rfa.prevValue
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
values = values[1:]
}
s := float64(0)
for _, v := range values {
d := prevValue - v
if d > 0 {
s += d
}
prevValue = v
}
return s
}
func rollupZScoreOverTime(rfa *rollupFuncArg) float64 {
// See https://about.gitlab.com/blog/2019/07/23/anomaly-detection-using-prometheus/#using-z-score-for-anomaly-detection
scrapeInterval := rollupScrapeInterval(rfa)
lag := rollupLag(rfa)
if math.IsNaN(scrapeInterval) || math.IsNaN(lag) || lag > scrapeInterval {
return nan
}
d := rollupLast(rfa) - rollupAvg(rfa)
if d == 0 {
return 0
}
return d / rollupStddev(rfa)
}
func rollupFirst(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return values[0]
}
func rollupDefault(rfa *rollupFuncArg) float64 {
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
// Intentionally do not skip the possible last Prometheus staleness mark.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
return values[len(values)-1]
}
func rollupLast(rfa *rollupFuncArg) float64 {
values := rfa.values
if len(values) == 0 {
// Do not take into account rfa.prevValue, since it may lead
// to inconsistent results comparing to Prometheus on broken time series
// with irregular data points.
return nan
}
return values[len(values)-1]
}
func rollupDistinct(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
if len(values) == 0 {
if math.IsNaN(rfa.prevValue) {
return nan
}
return 0
}
m := make(map[float64]struct{})
for _, v := range values {
m[v] = struct{}{}
}
return float64(len(m))
}
func rollupIntegrate(rfa *rollupFuncArg) float64 {
// There is no need in handling NaNs here, since they must be cleaned up
// before calling rollup funcs.
values := rfa.values
timestamps := rfa.timestamps
prevValue := rfa.prevValue
prevTimestamp := rfa.currTimestamp - rfa.window
if math.IsNaN(prevValue) {
if len(values) == 0 {
return nan
}
prevValue = values[0]
prevTimestamp = timestamps[0]
values = values[1:]
timestamps = timestamps[1:]
}
var sum float64
for i, v := range values {
timestamp := timestamps[i]
dt := float64(timestamp-prevTimestamp) / 1e3
sum += prevValue * dt
prevTimestamp = timestamp
prevValue = v
}
dt := float64(rfa.currTimestamp-prevTimestamp) / 1e3
sum += prevValue * dt
return sum
}
func rollupFake(rfa *rollupFuncArg) float64 {
logger.Panicf("BUG: rollupFake shouldn't be called")
return 0
}
func getScalar(arg interface{}, argNum int) ([]float64, error) {
ts, ok := arg.([]*timeseries)
if !ok {
return nil, fmt.Errorf(`unexpected type for arg #%d; got %T; want %T`, argNum+1, arg, ts)
}
if len(ts) != 1 {
return nil, fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(ts))
}
return ts[0].Values, nil
}
func getString(tss []*timeseries, argNum int) (string, error) {
if len(tss) != 1 {
return "", fmt.Errorf(`arg #%d must contain a single timeseries; got %d timeseries`, argNum+1, len(tss))
}
ts := tss[0]
for _, v := range ts.Values {
if !math.IsNaN(v) {
return "", fmt.Errorf(`arg #%d contains non-string timeseries`, argNum+1)
}
}
return string(ts.MetricName.MetricGroup), nil
}
func expectRollupArgsNum(args []interface{}, expectedNum int) error {
if len(args) == expectedNum {
return nil
}
return fmt.Errorf(`unexpected number of args; got %d; want %d`, len(args), expectedNum)
}
func getRollupFuncArg() *rollupFuncArg {
v := rfaPool.Get()
if v == nil {
return &rollupFuncArg{}
}
return v.(*rollupFuncArg)
}
func putRollupFuncArg(rfa *rollupFuncArg) {
rfa.reset()
rfaPool.Put(rfa)
}
var rfaPool sync.Pool