diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 0bccb1441..8bbfcb0c1 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -10,6 +10,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -763,6 +764,10 @@ func getRollupMemoryLimiter() *memoryLimiter { func evalRollupWithIncrementalAggregate(name string, iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { + if name != "default_rollup" { + // Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values. + rs.Values, rs.Timestamps = dropStaleNaNs(rs.Values, rs.Timestamps) + } preFunc(rs.Values, rs.Timestamps) ts := getTimeseries() defer putTimeseries(ts) @@ -796,6 +801,10 @@ func evalRollupNoIncrementalAggregate(name string, rss *netstorage.Results, rcs tss := make([]*timeseries, 0, rss.Len()*len(rcs)) var tssLock sync.Mutex err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) error { + if name != "default_rollup" { + // Remove Prometheus staleness marks, so non-default rollup functions don't hit NaN values. + rs.Values, rs.Timestamps = dropStaleNaNs(rs.Values, rs.Timestamps) + } preFunc(rs.Values, rs.Timestamps) for _, rc := range rcs { if tsm := newTimeseriesMap(name, sharedTimestamps, &rs.MetricName); tsm != nil { @@ -891,3 +900,28 @@ func toTagFilter(dst *storage.TagFilter, src *metricsql.LabelFilter) { dst.IsRegexp = src.IsRegexp dst.IsNegative = src.IsNegative } + +func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) { + hasStaleSamples := false + for _, v := range values { + if decimal.IsStaleNaN(v) { + hasStaleSamples = true + break + } + } + if !hasStaleSamples { + // Fast path: values have no Prometheus staleness marks. + return values, timestamps + } + // Slow path: drop Prometheus staleness marks from values. + dstValues := values[:0] + dstTimestamps := timestamps[:0] + for i, v := range values { + if decimal.IsStaleNaN(v) { + continue + } + dstValues = append(dstValues, v) + dstTimestamps = append(dstTimestamps, timestamps[i]) + } + return dstValues, dstTimestamps +} diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 6c999a93c..c8180e95b 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -271,16 +271,16 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en } newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig { return &rollupConfig{ - TagValue: tagValue, - Func: rf, - Start: start, - End: end, - Step: step, - Window: window, - MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name], - CanDropStalePoints: name == "default_rollup", - LookbackDelta: lookbackDelta, - Timestamps: sharedTimestamps, + TagValue: tagValue, + Func: rf, + Start: start, + End: end, + Step: step, + Window: window, + MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name], + LookbackDelta: lookbackDelta, + Timestamps: sharedTimestamps, + isDefaultRollup: name == "default_rollup", } } appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig { @@ -402,15 +402,13 @@ type rollupConfig struct { // when using window smaller than 2 x scrape_interval. MayAdjustWindow bool - // Whether points after Prometheus stale marks can be dropped during rollup calculations. - // Stale points can be dropped only if `default_rollup()` function is used. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 . - CanDropStalePoints bool - Timestamps []int64 // LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world. LookbackDelta int64 + + // Whether default_rollup is used. + isDefaultRollup bool } var ( @@ -506,10 +504,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu // Extend dstValues in order to remove mallocs below. dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps)) - if !rc.CanDropStalePoints { - // Remove Prometheus staleness marks from values, so rollup functions don't hit NaN values. - values, timestamps = dropStaleNaNs(values, timestamps) - } scrapeInterval := getScrapeInterval(timestamps) maxPrevInterval := getMaxPrevInterval(scrapeInterval) if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta { @@ -523,7 +517,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu window := rc.Window if window <= 0 { window = rc.Step - if rc.CanDropStalePoints && rc.LookbackDelta > 0 && window > rc.LookbackDelta { + if rc.isDefaultRollup && rc.LookbackDelta > 0 && window > rc.LookbackDelta { // Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval // according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784 window = rc.LookbackDelta @@ -580,31 +574,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu return dstValues } -func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) { - hasStaleSamples := false - for _, v := range values { - if decimal.IsStaleNaN(v) { - hasStaleSamples = true - break - } - } - if !hasStaleSamples { - // Fast path: values have noe Prometheus staleness marks. - return values, timestamps - } - // Slow path: drop Prometheus staleness marks from values. - dstValues := make([]float64, 0, len(values)) - dstTimestamps := make([]int64, 0, len(timestamps)) - for i, v := range values { - if decimal.IsStaleNaN(v) { - continue - } - dstValues = append(dstValues, v) - dstTimestamps = append(dstTimestamps, timestamps[i]) - } - return dstValues, dstTimestamps -} - func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int { if len(timestamps) == 0 || timestamps[0] > seekTimestamp { return 0