diff --git a/README.md b/README.md index 09a73bff1..43fa0a3a2 100644 --- a/README.md +++ b/README.md @@ -512,7 +512,7 @@ at `http://<victoriametrics-addr>:8428/federate?match[]=<timeseries_selector_for Optional `start` and `end` args may be added to the request in order to scrape the last point for each selected time series on the `[start ... end]` interval. `start` and `end` may contain either unix timestamp in seconds or [RFC3339](https://www.ietf.org/rfc/rfc3339.txt) values. By default, the last point -on the interval `[now - max_lookback ... now]` is scraped for each time series. The default value for `max_lookback` is `5m` (5 minutes), but can be overridden. +on the interval `[now - max_lookback ... now]` is scraped for each time series. The default value for `max_lookback` is `5m` (5 minutes), but it can be overridden. For instance, `/federate?match[]=up&max_lookback=1h` would return last points on the `[now - 1h ... now]` interval. This may be useful for time series federation with scrape intervals exceeding `5m`. diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index c3d06a359..9c39c7064 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -23,6 +23,8 @@ import ( var ( maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution") maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes") + maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to `-search.lookback-delta` from Prometheus. "+ + "The value is dynamically detected from interval between time series datapoints if not set. It can be overriden on per-query basis via `max_lookback` arg") ) // Default step used if not set. @@ -43,11 +45,14 @@ func FederateHandler(w http.ResponseWriter, r *http.Request) error { if len(matches) == 0 { return fmt.Errorf("missing `match[]` arg") } - maxLookback, err := getDuration(r, "max_lookback", defaultStep) + lookbackDelta, err := getMaxLookback(r) if err != nil { return err } - start, err := getTime(r, "start", ct-maxLookback) + if lookbackDelta <= 0 { + lookbackDelta = defaultStep + } + start, err := getTime(r, "start", ct-lookbackDelta) if err != nil { return err } @@ -468,6 +473,10 @@ func QueryHandler(w http.ResponseWriter, r *http.Request) error { return err } deadline := getDeadline(r) + lookbackDelta, err := getMaxLookback(r) + if err != nil { + return err + } if len(query) > *maxQueryLen { return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen) @@ -503,10 +512,11 @@ func QueryHandler(w http.ResponseWriter, r *http.Request) error { } ec := promql.EvalConfig{ - Start: start, - End: start, - Step: step, - Deadline: deadline, + Start: start, + End: start, + Step: step, + Deadline: deadline, + LookbackDelta: lookbackDelta, } result, err := promql.Exec(&ec, query, true) if err != nil { @@ -546,6 +556,10 @@ func QueryRangeHandler(w http.ResponseWriter, r *http.Request) error { } deadline := getDeadline(r) mayCache := !getBool(r, "nocache") + lookbackDelta, err := getMaxLookback(r) + if err != nil { + return err + } // Validate input args. if len(query) > *maxQueryLen { @@ -562,11 +576,12 @@ func QueryRangeHandler(w http.ResponseWriter, r *http.Request) error { } ec := promql.EvalConfig{ - Start: start, - End: end, - Step: step, - Deadline: deadline, - MayCache: mayCache, + Start: start, + End: end, + Step: step, + Deadline: deadline, + MayCache: mayCache, + LookbackDelta: lookbackDelta, } result, err := promql.Exec(&ec, query, false) if err != nil { @@ -726,6 +741,11 @@ func getDuration(r *http.Request, argKey string, defaultValue int64) (int64, err const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000 +func getMaxLookback(r *http.Request) (int64, error) { + d := int64(*maxLookback / time.Millisecond) + return getDuration(r, "max_lookback", d) +} + func getDeadline(r *http.Request) netstorage.Deadline { d, err := getDuration(r, "timeout", 0) if err != nil { diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 3b8df6531..c4094e511 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -70,6 +70,9 @@ type EvalConfig struct { MayCache bool + // LookbackDelta is analog to `-query.lookback-delta` from Prometheus. + LookbackDelta int64 + timestamps []int64 timestampsOnce sync.Once } @@ -82,6 +85,7 @@ func newEvalConfig(src *EvalConfig) *EvalConfig { ec.Step = src.Step ec.Deadline = src.Deadline ec.MayCache = src.MayCache + ec.LookbackDelta = src.LookbackDelta // do not copy src.timestamps - they must be generated again. return &ec @@ -465,7 +469,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re * } sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step) - preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, sharedTimestamps) + preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) var tssLock sync.Mutex removeMetricGroup := !rollupFuncsKeepMetricGroup[name] @@ -586,7 +590,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me return tss, nil } sharedTimestamps := getTimestamps(start, ec.End, ec.Step) - preFunc, rcs := getRollupConfigs(name, rf, start, ec.End, ec.Step, window, sharedTimestamps) + preFunc, rcs := getRollupConfigs(name, rf, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps) // Verify timeseries fit available memory after the rollup. // Take into account points from tssCached. @@ -689,7 +693,8 @@ func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.M tsDst.denyReuse = true } -func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, sharedTimestamps []int64) (func(values []float64, timestamps []int64), []*rollupConfig) { +func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) ( + func(values []float64, timestamps []int64), []*rollupConfig) { preFunc := func(values []float64, timestamps []int64) {} if rollupFuncsRemoveCounterResets[name] { preFunc = func(values []float64, timestamps []int64) { @@ -705,6 +710,7 @@ func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64 Step: step, Window: window, MayAdjustWindow: rollupFuncsMayAdjustWindow[name], + LookbackDelta: lookbackDelta, Timestamps: sharedTimestamps, } } diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 5e0571668..0754ed4a8 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -149,6 +149,9 @@ type rollupConfig struct { MayAdjustWindow bool Timestamps []int64 + + // LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world. + LookbackDelta int64 } var ( @@ -186,6 +189,9 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps)) maxPrevInterval := getMaxPrevInterval(timestamps) + if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta { + maxPrevInterval = rc.LookbackDelta + } window := rc.Window if window <= 0 { window = rc.Step diff --git a/app/vmselect/promql/rollup_test.go b/app/vmselect/promql/rollup_test.go index 8b99a635d..d16cec875 100644 --- a/app/vmselect/promql/rollup_test.go +++ b/app/vmselect/promql/rollup_test.go @@ -488,6 +488,51 @@ func TestRollupWindowPartialPoints(t *testing.T) { }) } +func TestRollupFuncsLookbackDelta(t *testing.T) { + t.Run("1", func(t *testing.T) { + rc := rollupConfig{ + Func: rollupFirst, + Start: 80, + End: 140, + Step: 10, + LookbackDelta: 1, + } + rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) + values := rc.Do(nil, testValues, testTimestamps) + valuesExpected := []float64{99, 12, 44, nan, 32, 34, nan} + timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140} + testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) + }) + t.Run("7", func(t *testing.T) { + rc := rollupConfig{ + Func: rollupFirst, + Start: 80, + End: 140, + Step: 10, + LookbackDelta: 7, + } + rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) + values := rc.Do(nil, testValues, testTimestamps) + valuesExpected := []float64{99, 12, 44, 44, 32, 34, nan} + timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140} + testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) + }) + t.Run("0", func(t *testing.T) { + rc := rollupConfig{ + Func: rollupFirst, + Start: 80, + End: 140, + Step: 10, + LookbackDelta: 0, + } + rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step) + values := rc.Do(nil, testValues, testTimestamps) + valuesExpected := []float64{34, 12, 12, 44, 44, 34, nan} + timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140} + testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected) + }) +} + func TestRollupFuncsNoWindow(t *testing.T) { t.Run("first", func(t *testing.T) { rc := rollupConfig{