mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: drop staleness marks before calling rollupConfig.Do
This allows dropping staleness marks only once and then calculate multiple rollup functions on the result. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526
This commit is contained in:
parent
25997a70f1
commit
113f0a8a07
2 changed files with 48 additions and 45 deletions
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
|
@ -763,6 +764,10 @@ func getRollupMemoryLimiter() *memoryLimiter {
|
|||
func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
|
||||
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
|
||||
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
|
||||
if name != "default_rollup" {
|
||||
// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
|
||||
rs.Values, rs.Timestamps = dropStaleNaNs(rs.Values, rs.Timestamps)
|
||||
}
|
||||
preFunc(rs.Values, rs.Timestamps)
|
||||
ts := getTimeseries()
|
||||
defer putTimeseries(ts)
|
||||
|
@ -796,6 +801,10 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs
|
|||
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
|
||||
var tssLock sync.Mutex
|
||||
err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error {
|
||||
if name != "default_rollup" {
|
||||
// Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values.
|
||||
rs.Values, rs.Timestamps = dropStaleNaNs(rs.Values, rs.Timestamps)
|
||||
}
|
||||
preFunc(rs.Values, rs.Timestamps)
|
||||
for _, rc := range rcs {
|
||||
if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil {
|
||||
|
@ -891,3 +900,28 @@ func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) {
|
|||
dst.IsRegexp = src.IsRegexp
|
||||
dst.IsNegative = src.IsNegative
|
||||
}
|
||||
|
||||
func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) {
|
||||
hasStaleSamples := false
|
||||
for _, v := range values {
|
||||
if decimal.IsStaleNaN(v) {
|
||||
hasStaleSamples = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasStaleSamples {
|
||||
// Fast path: values have no Prometheus staleness marks.
|
||||
return values, timestamps
|
||||
}
|
||||
// Slow path: drop Prometheus staleness marks from values.
|
||||
dstValues := values[:0]
|
||||
dstTimestamps := timestamps[:0]
|
||||
for i, v := range values {
|
||||
if decimal.IsStaleNaN(v) {
|
||||
continue
|
||||
}
|
||||
dstValues = append(dstValues, v)
|
||||
dstTimestamps = append(dstTimestamps, timestamps[i])
|
||||
}
|
||||
return dstValues, dstTimestamps
|
||||
}
|
||||
|
|
|
@ -278,9 +278,9 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en
|
|||
Step: step,
|
||||
Window: window,
|
||||
MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name],
|
||||
CanDropStalePoints: name == "default_rollup",
|
||||
LookbackDelta: lookbackDelta,
|
||||
Timestamps: sharedTimestamps,
|
||||
isDefaultRollup: name == "default_rollup",
|
||||
}
|
||||
}
|
||||
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
|
||||
|
@ -402,15 +402,13 @@ type rollupConfig struct {
|
|||
// when using window smaller than 2 x scrape_interval.
|
||||
MayAdjustWindow bool
|
||||
|
||||
// Whether points after Prometheus stale marks can be dropped during rollup calculations.
|
||||
// Stale points can be dropped only if `default_rollup()` function is used.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
|
||||
CanDropStalePoints bool
|
||||
|
||||
Timestamps []int64
|
||||
|
||||
// LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world.
|
||||
LookbackDelta int64
|
||||
|
||||
// Whether default_rollup is used.
|
||||
isDefaultRollup bool
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -506,10 +504,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
|||
// Extend dstValues in order to remove mallocs below.
|
||||
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
|
||||
|
||||
if !rc.CanDropStalePoints {
|
||||
// Remove Prometheus staleness marks from values, so rollup functions don't hit NaN values.
|
||||
values, timestamps = dropStaleNaNs(values, timestamps)
|
||||
}
|
||||
scrapeInterval := getScrapeInterval(timestamps)
|
||||
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
|
||||
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
||||
|
@ -523,7 +517,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
|||
window := rc.Window
|
||||
if window <= 0 {
|
||||
window = rc.Step
|
||||
if rc.CanDropStalePoints && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
|
||||
if rc.isDefaultRollup && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
|
||||
// Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval
|
||||
// according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784
|
||||
window = rc.LookbackDelta
|
||||
|
@ -580,31 +574,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
|||
return dstValues
|
||||
}
|
||||
|
||||
func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) {
|
||||
hasStaleSamples := false
|
||||
for _, v := range values {
|
||||
if decimal.IsStaleNaN(v) {
|
||||
hasStaleSamples = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasStaleSamples {
|
||||
// Fast path: values have noe Prometheus staleness marks.
|
||||
return values, timestamps
|
||||
}
|
||||
// Slow path: drop Prometheus staleness marks from values.
|
||||
dstValues := make([]float64, 0, len(values))
|
||||
dstTimestamps := make([]int64, 0, len(timestamps))
|
||||
for i, v := range values {
|
||||
if decimal.IsStaleNaN(v) {
|
||||
continue
|
||||
}
|
||||
dstValues = append(dstValues, v)
|
||||
dstTimestamps = append(dstTimestamps, timestamps[i])
|
||||
}
|
||||
return dstValues, dstTimestamps
|
||||
}
|
||||
|
||||
func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int {
|
||||
if len(timestamps) == 0 || timestamps[0] > seekTimestamp {
|
||||
return 0
|
||||
|
|
Loading…
Reference in a new issue