mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect/promql: add lag(q[d])
function, which returns the lag between the current timestamp and the timstamp for the last data point in q
This commit is contained in:
parent
d18ea0c95b
commit
2f58f37f07
3 changed files with 45 additions and 2 deletions
|
@ -3710,6 +3710,17 @@ func TestExecSuccess(t *testing.T) {
|
|||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`lag()`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `lag(time()[60s:17s])`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{14, 10, 6, 2, 15, 11},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`()`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `()`
|
||||
|
@ -4175,6 +4186,8 @@ func TestExecError(t *testing.T) {
|
|||
f(`alias()`)
|
||||
f(`alias(1)`)
|
||||
f(`alias(1, "foo", "bar")`)
|
||||
f(`lifetime()`)
|
||||
f(`lag()`)
|
||||
|
||||
// Invalid argument type
|
||||
f(`median_over_time({}, 2)`)
|
||||
|
|
|
@ -48,6 +48,7 @@ var rollupFuncs = map[string]newRollupFunc{
|
|||
"integrate": newRollupFuncOneArg(rollupIntegrate),
|
||||
"ideriv": newRollupFuncOneArg(rollupIderiv),
|
||||
"lifetime": newRollupFuncOneArg(rollupLifetime),
|
||||
"lag": newRollupFuncOneArg(rollupLag),
|
||||
"scrape_interval": newRollupFuncOneArg(rollupScrapeInterval),
|
||||
"rollup": newRollupFuncOneArg(rollupFake),
|
||||
"rollup_rate": newRollupFuncOneArg(rollupFake), // + rollupFuncsRemoveCounterResets
|
||||
|
@ -113,6 +114,7 @@ type rollupFuncArg struct {
|
|||
values []float64
|
||||
timestamps []int64
|
||||
|
||||
currTimestamp int64
|
||||
idx int
|
||||
step int64
|
||||
}
|
||||
|
@ -122,6 +124,7 @@ func (rfa *rollupFuncArg) reset() {
|
|||
rfa.prevTimestamp = 0
|
||||
rfa.values = nil
|
||||
rfa.timestamps = nil
|
||||
rfa.currTimestamp = 0
|
||||
rfa.idx = 0
|
||||
rfa.step = 0
|
||||
}
|
||||
|
@ -226,6 +229,7 @@ func (rc *rollupConfig) Do(dstValues []float64, values []float64, timestamps []i
|
|||
|
||||
rfa.values = values[i:j]
|
||||
rfa.timestamps = timestamps[i:j]
|
||||
rfa.currTimestamp = tEnd
|
||||
value := rc.Func(rfa)
|
||||
rfa.idx++
|
||||
dstValues = append(dstValues, value)
|
||||
|
@ -799,6 +803,18 @@ func rollupLifetime(rfa *rollupFuncArg) float64 {
|
|||
return float64(timestamps[len(timestamps)-1]-rfa.prevTimestamp) * 1e-3
|
||||
}
|
||||
|
||||
func rollupLag(rfa *rollupFuncArg) float64 {
|
||||
// Calculate the duration between the current timestamp and the last data point.
|
||||
timestamps := rfa.timestamps
|
||||
if len(timestamps) == 0 {
|
||||
if math.IsNaN(rfa.prevValue) {
|
||||
return nan
|
||||
}
|
||||
return float64(rfa.currTimestamp-rfa.prevTimestamp) * 1e-3
|
||||
}
|
||||
return float64(rfa.currTimestamp-timestamps[len(timestamps)-1]) * 1e-3
|
||||
}
|
||||
|
||||
func rollupScrapeInterval(rfa *rollupFuncArg) float64 {
|
||||
// Calculate the average interval between data points.
|
||||
timestamps := rfa.timestamps
|
||||
|
|
|
@ -632,6 +632,20 @@ func TestRollupFuncsNoWindow(t *testing.T) {
|
|||
timestampsExpected := []int64{10, 50, 90, 130}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
t.Run("lag", func(t *testing.T) {
|
||||
rc := rollupConfig{
|
||||
Func: rollupLag,
|
||||
Start: 0,
|
||||
End: 160,
|
||||
Step: 40,
|
||||
Window: 0,
|
||||
}
|
||||
rc.Timestamps = getTimestamps(rc.Start, rc.End, rc.Step)
|
||||
values := rc.Do(nil, testValues, testTimestamps)
|
||||
valuesExpected := []float64{nan, 0.004, 0, 0, 0.03}
|
||||
timestampsExpected := []int64{0, 40, 80, 120, 160}
|
||||
testRowsEqual(t, values, rc.Timestamps, valuesExpected, timestampsExpected)
|
||||
})
|
||||
t.Run("lifetime_1", func(t *testing.T) {
|
||||
rc := rollupConfig{
|
||||
Func: rollupLifetime,
|
||||
|
|
Loading…
Reference in a new issue