mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vmselect/prometheus: add -search.maxLookback
command-line flag for overriding dynamic calculations for max lookback interval
This flag is similar to `-search.lookback-delta` if set. The max lookback interval is determined dynamically from interval between datapoints for each input time series if the flag isn't set. The interval can be overriden on per-query basis by passing `max_lookback=<duration>` query arg to `/api/v1/query` and `/api/v1/query_range`. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/209
This commit is contained in:
parent
ce266d157d
commit
99786c2864
4 changed files with 95 additions and 18 deletions
|
@ -23,8 +23,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution")
|
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution")
|
||||||
maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
|
maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
|
||||||
|
maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to `-search.lookback-delta` from Prometheus. "+
|
||||||
|
"The value is dynamically detected from interval between time series datapoints if not set. It can be overriden on per-query basis via `max_lookback` arg")
|
||||||
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability")
|
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability")
|
||||||
selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481")
|
selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481")
|
||||||
)
|
)
|
||||||
|
@ -47,11 +49,14 @@ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) err
|
||||||
if len(matches) == 0 {
|
if len(matches) == 0 {
|
||||||
return fmt.Errorf("missing `match[]` arg")
|
return fmt.Errorf("missing `match[]` arg")
|
||||||
}
|
}
|
||||||
maxLookback, err := getDuration(r, "max_lookback", defaultStep)
|
lookbackDelta, err := getMaxLookback(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
start, err := getTime(r, "start", ct-maxLookback)
|
if lookbackDelta <= 0 {
|
||||||
|
lookbackDelta = defaultStep
|
||||||
|
}
|
||||||
|
start, err := getTime(r, "start", ct-lookbackDelta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -542,6 +547,10 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
deadline := getDeadline(r)
|
deadline := getDeadline(r)
|
||||||
|
lookbackDelta, err := getMaxLookback(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if len(query) > *maxQueryLen {
|
if len(query) > *maxQueryLen {
|
||||||
return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
|
return fmt.Errorf(`too long query; got %d bytes; mustn't exceed %d bytes`, len(query), *maxQueryLen)
|
||||||
|
@ -577,11 +586,12 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
|
||||||
}
|
}
|
||||||
|
|
||||||
ec := promql.EvalConfig{
|
ec := promql.EvalConfig{
|
||||||
AuthToken: at,
|
AuthToken: at,
|
||||||
Start: start,
|
Start: start,
|
||||||
End: start,
|
End: start,
|
||||||
Step: step,
|
Step: step,
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
|
LookbackDelta: lookbackDelta,
|
||||||
|
|
||||||
DenyPartialResponse: getDenyPartialResponse(r),
|
DenyPartialResponse: getDenyPartialResponse(r),
|
||||||
}
|
}
|
||||||
|
@ -623,6 +633,10 @@ func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) e
|
||||||
}
|
}
|
||||||
deadline := getDeadline(r)
|
deadline := getDeadline(r)
|
||||||
mayCache := !getBool(r, "nocache")
|
mayCache := !getBool(r, "nocache")
|
||||||
|
lookbackDelta, err := getMaxLookback(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Validate input args.
|
// Validate input args.
|
||||||
if len(query) > *maxQueryLen {
|
if len(query) > *maxQueryLen {
|
||||||
|
@ -639,12 +653,13 @@ func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) e
|
||||||
}
|
}
|
||||||
|
|
||||||
ec := promql.EvalConfig{
|
ec := promql.EvalConfig{
|
||||||
AuthToken: at,
|
AuthToken: at,
|
||||||
Start: start,
|
Start: start,
|
||||||
End: end,
|
End: end,
|
||||||
Step: step,
|
Step: step,
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
MayCache: mayCache,
|
MayCache: mayCache,
|
||||||
|
LookbackDelta: lookbackDelta,
|
||||||
|
|
||||||
DenyPartialResponse: getDenyPartialResponse(r),
|
DenyPartialResponse: getDenyPartialResponse(r),
|
||||||
}
|
}
|
||||||
|
@ -806,6 +821,11 @@ func getDuration(r *http.Request, argKey string, defaultValue int64) (int64, err
|
||||||
|
|
||||||
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
|
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
|
||||||
|
|
||||||
|
func getMaxLookback(r *http.Request) (int64, error) {
|
||||||
|
d := int64(*maxLookback / time.Millisecond)
|
||||||
|
return getDuration(r, "max_lookback", d)
|
||||||
|
}
|
||||||
|
|
||||||
func getDeadline(r *http.Request) netstorage.Deadline {
|
func getDeadline(r *http.Request) netstorage.Deadline {
|
||||||
d, err := getDuration(r, "timeout", 0)
|
d, err := getDuration(r, "timeout", 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -72,6 +72,9 @@ type EvalConfig struct {
|
||||||
|
|
||||||
MayCache bool
|
MayCache bool
|
||||||
|
|
||||||
|
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
|
||||||
|
LookbackDelta int64
|
||||||
|
|
||||||
DenyPartialResponse bool
|
DenyPartialResponse bool
|
||||||
|
|
||||||
timestamps []int64
|
timestamps []int64
|
||||||
|
@ -87,6 +90,7 @@ func newEvalConfig(src *EvalConfig) *EvalConfig {
|
||||||
ec.Step = src.Step
|
ec.Step = src.Step
|
||||||
ec.Deadline = src.Deadline
|
ec.Deadline = src.Deadline
|
||||||
ec.MayCache = src.MayCache
|
ec.MayCache = src.MayCache
|
||||||
|
ec.LookbackDelta = src.LookbackDelta
|
||||||
ec.DenyPartialResponse = src.DenyPartialResponse
|
ec.DenyPartialResponse = src.DenyPartialResponse
|
||||||
|
|
||||||
// do not copy src.timestamps - they must be generated again.
|
// do not copy src.timestamps - they must be generated again.
|
||||||
|
@ -471,7 +475,7 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *
|
||||||
}
|
}
|
||||||
|
|
||||||
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step)
|
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step)
|
||||||
preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, sharedTimestamps)
|
preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
|
||||||
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
|
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
|
||||||
var tssLock sync.Mutex
|
var tssLock sync.Mutex
|
||||||
removeMetricGroup := !rollupFuncsKeepMetricGroup[name]
|
removeMetricGroup := !rollupFuncsKeepMetricGroup[name]
|
||||||
|
@ -597,7 +601,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
|
||||||
return tss, nil
|
return tss, nil
|
||||||
}
|
}
|
||||||
sharedTimestamps := getTimestamps(start, ec.End, ec.Step)
|
sharedTimestamps := getTimestamps(start, ec.End, ec.Step)
|
||||||
preFunc, rcs := getRollupConfigs(name, rf, start, ec.End, ec.Step, window, sharedTimestamps)
|
preFunc, rcs := getRollupConfigs(name, rf, start, ec.End, ec.Step, window, ec.LookbackDelta, sharedTimestamps)
|
||||||
|
|
||||||
// Verify timeseries fit available memory after the rollup.
|
// Verify timeseries fit available memory after the rollup.
|
||||||
// Take into account points from tssCached.
|
// Take into account points from tssCached.
|
||||||
|
@ -701,7 +705,8 @@ func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.M
|
||||||
tsDst.denyReuse = true
|
tsDst.denyReuse = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, sharedTimestamps []int64) (func(values []float64, timestamps []int64), []*rollupConfig) {
|
func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, lookbackDelta int64, sharedTimestamps []int64) (
|
||||||
|
func(values []float64, timestamps []int64), []*rollupConfig) {
|
||||||
preFunc := func(values []float64, timestamps []int64) {}
|
preFunc := func(values []float64, timestamps []int64) {}
|
||||||
if rollupFuncsRemoveCounterResets[name] {
|
if rollupFuncsRemoveCounterResets[name] {
|
||||||
preFunc = func(values []float64, timestamps []int64) {
|
preFunc = func(values []float64, timestamps []int64) {
|
||||||
|
@ -717,6 +722,7 @@ func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64
|
||||||
Step: step,
|
Step: step,
|
||||||
Window: window,
|
Window: window,
|
||||||
MayAdjustWindow: rollupFuncsMayAdjustWindow[name],
|
MayAdjustWindow: rollupFuncsMayAdjustWindow[name],
|
||||||
|
LookbackDelta: lookbackDelta,
|
||||||
Timestamps: sharedTimestamps,
|
Timestamps: sharedTimestamps,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,6 +149,9 @@ type rollupConfig struct {
|
||||||
MayAdjustWindow bool
|
MayAdjustWindow bool
|
||||||
|
|
||||||
Timestamps []int64
|
Timestamps []int64
|
||||||
|
|
||||||
|
// LoookbackDelta is the analog to `-query.lookback-delta` from Prometheus world.
|
||||||
|
LookbackDelta int64
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -186,6 +189,9 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
|
||||||
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
|
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
|
||||||
|
|
||||||
maxPrevInterval := getMaxPrevInterval(timestamps)
|
maxPrevInterval := getMaxPrevInterval(timestamps)
|
||||||
|
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
||||||
|
maxPrevInterval = rc.LookbackDelta
|
||||||
|
}
|
||||||
window := rc.Window
|
window := rc.Window
|
||||||
if window <= 0 {
|
if window <= 0 {
|
||||||
window = rc.Step
|
window = rc.Step
|
||||||
|
|
|
@ -488,6 +488,51 @@ func TestRollupWindowPartialPoints(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRollupFuncsLookbackDelta(t *testing.T) {
|
||||||
|
t.Run("1", func(t *testing.T) {
|
||||||
|
rc := rollupConfig{
|
||||||
|
Func: rollupFirst,
|
||||||
|
Start: 80,
|
||||||
|
End: 140,
|
||||||
|
Step: 10,
|
||||||
|
LookbackDelta: 1,
|
||||||
|
}
|
||||||
|
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||||
|
values := rc.Do(nil, testValues, testTimestamps)
|
||||||
|
valuesExpected := []float64{99, 12, 44, nan, 32, 34, nan}
|
||||||
|
timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140}
|
||||||
|
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||||
|
})
|
||||||
|
t.Run("7", func(t *testing.T) {
|
||||||
|
rc := rollupConfig{
|
||||||
|
Func: rollupFirst,
|
||||||
|
Start: 80,
|
||||||
|
End: 140,
|
||||||
|
Step: 10,
|
||||||
|
LookbackDelta: 7,
|
||||||
|
}
|
||||||
|
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||||
|
values := rc.Do(nil, testValues, testTimestamps)
|
||||||
|
valuesExpected := []float64{99, 12, 44, 44, 32, 34, nan}
|
||||||
|
timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140}
|
||||||
|
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||||
|
})
|
||||||
|
t.Run("0", func(t *testing.T) {
|
||||||
|
rc := rollupConfig{
|
||||||
|
Func: rollupFirst,
|
||||||
|
Start: 80,
|
||||||
|
End: 140,
|
||||||
|
Step: 10,
|
||||||
|
LookbackDelta: 0,
|
||||||
|
}
|
||||||
|
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||||
|
values := rc.Do(nil, testValues, testTimestamps)
|
||||||
|
valuesExpected := []float64{34, 12, 12, 44, 44, 34, nan}
|
||||||
|
timestampsExpected := []int64{80, 90, 100, 110, 120, 130, 140}
|
||||||
|
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestRollupFuncsNoWindow(t *testing.T) {
|
func TestRollupFuncsNoWindow(t *testing.T) {
|
||||||
t.Run("first", func(t *testing.T) {
|
t.Run("first", func(t *testing.T) {
|
||||||
rc := rollupConfig{
|
rc := rollupConfig{
|
||||||
|
|
Loading…
Reference in a new issue