mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect: handle timestamp(metric offset X)
the same way as Prometheus does
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415
This commit is contained in:
parent
426a0567c4
commit
6f7f64f757
5 changed files with 32 additions and 29 deletions
|
@ -338,7 +338,7 @@ func TestExecSuccess(t *testing.T) {
|
|||
q := `timestamp(123)`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
|
||||
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
|
@ -349,7 +349,7 @@ func TestExecSuccess(t *testing.T) {
|
|||
q := `timestamp(time())`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
|
||||
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
|
@ -360,7 +360,7 @@ func TestExecSuccess(t *testing.T) {
|
|||
q := `timestamp(456/time()+123)`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
|
||||
Values: []float64{900, 1100, 1300, 1500, 1700, 1900},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
|
@ -371,7 +371,7 @@ func TestExecSuccess(t *testing.T) {
|
|||
q := `timestamp(time()>=1600)`
|
||||
r := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{nan, nan, nan, 1600, 1800, 2000},
|
||||
Values: []float64{nan, nan, nan, nan, 1700, 1900},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
|
|
|
@ -72,6 +72,11 @@ var rollupFuncs = map[string]newRollupFunc{
|
|||
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
|
||||
"hoeffding_bound_upper": newRollupHoeffdingBoundUpper,
|
||||
"hoeffding_bound_lower": newRollupHoeffdingBoundLower,
|
||||
|
||||
// `timestamp` function must return timestamp for the last datapoint on the current window
|
||||
// in order to properly handle offset and timestamps unaligned to the current step.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details.
|
||||
"timestamp": newRollupFuncOneArg(rollupTimestamp),
|
||||
}
|
||||
|
||||
// rollupAggrFuncs are functions that can be passed to `aggr_over_time()`
|
||||
|
@ -1509,6 +1514,19 @@ func rollupLow(rfa *rollupFuncArg) float64 {
|
|||
return min
|
||||
}
|
||||
|
||||
func rollupTimestamp(rfa *rollupFuncArg) float64 {
|
||||
// There is no need in handling NaNs here, since they must be cleaned up
|
||||
// before calling rollup funcs.
|
||||
timestamps := rfa.timestamps
|
||||
if len(timestamps) == 0 {
|
||||
// Do not take into account rfa.prevTimestamp, since it may lead
|
||||
// to inconsistent results comparing to Prometheus on broken time series
|
||||
// with irregular data points.
|
||||
return nan
|
||||
}
|
||||
return float64(timestamps[len(timestamps)-1]) / 1e3
|
||||
}
|
||||
|
||||
func rollupFirst(rfa *rollupFuncArg) float64 {
|
||||
// There is no need in handling NaNs here, since they must be cleaned up
|
||||
// before calling rollup funcs.
|
||||
|
|
|
@ -53,7 +53,7 @@ var transformFuncs = map[string]transformFunc{
|
|||
"sort_desc": newTransformFuncSort(true),
|
||||
"sqrt": newTransformFuncOneArg(transformSqrt),
|
||||
"time": transformTime,
|
||||
"timestamp": transformTimestamp,
|
||||
// "timestamp" has been moved to rollup funcs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415
|
||||
"vector": transformVector,
|
||||
"year": newTransformFuncDateTime(transformYear),
|
||||
|
||||
|
@ -1516,25 +1516,6 @@ func transformTime(tfa *transformFuncArg) ([]*timeseries, error) {
|
|||
return evalTime(tfa.ec), nil
|
||||
}
|
||||
|
||||
func transformTimestamp(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||
args := tfa.args
|
||||
if err := expectTransformArgsNum(args, 1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rvs := args[0]
|
||||
for _, ts := range rvs {
|
||||
ts.MetricName.ResetMetricGroup()
|
||||
values := ts.Values
|
||||
for i, t := range ts.Timestamps {
|
||||
v := values[i]
|
||||
if !math.IsNaN(v) {
|
||||
values[i] = float64(t) / 1e3
|
||||
}
|
||||
}
|
||||
}
|
||||
return rvs, nil
|
||||
}
|
||||
|
||||
func transformVector(tfa *transformFuncArg) ([]*timeseries, error) {
|
||||
args := tfa.args
|
||||
if err := expectTransformArgsNum(args, 1); err != nil {
|
||||
|
|
|
@ -57,6 +57,10 @@ var rollupFuncs = map[string]bool{
|
|||
"aggr_over_time": true,
|
||||
"hoeffding_bound_upper": true,
|
||||
"hoeffding_bound_lower": true,
|
||||
|
||||
// `timestamp` func has been moved here because it must work properly with offsets and samples unaligned to the current step.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415 for details.
|
||||
"timestamp": true,
|
||||
}
|
||||
|
||||
// IsRollupFunc returns whether funcName is known rollup function.
|
||||
|
|
|
@ -32,7 +32,7 @@ var transformFuncs = map[string]bool{
|
|||
"sort_desc": true,
|
||||
"sqrt": true,
|
||||
"time": true,
|
||||
"timestamp": true,
|
||||
// "timestamp" has been moved to rollup funcs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/415
|
||||
"vector": true,
|
||||
"year": true,
|
||||
|
||||
|
|
Loading…
Reference in a new issue