app/vmselect/promql: consistently replace NaN data points with non-NaN values for range_first and range_last functions

It is expected that the range_first and range_last functions return a non-NaN const value across all the points
if the original series contains at least a single non-NaN value. Previously this rule was violated for NaN data points
in the original series. This could confuse users.

While at it, add tests for series with NaN values across all the range_* and running_* functions, in order to maintain
consistent handling of NaN values across these functions.
This commit is contained in:
Aliaksandr Valialkin 2024-09-23 14:54:39 +02:00
parent 3ed172eeeb
commit 3964889705
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 232 additions and 8 deletions

View file

@ -7251,6 +7251,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_trim_outliers(time() > 1200)`, func(t *testing.T) {
t.Parallel()
q := `range_trim_outliers(0.5, time() > 1200)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, 1600, 1800, nan},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_trim_spikes()`, func(t *testing.T) { t.Run(`range_trim_spikes()`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_trim_spikes(0.2, time())` q := `range_trim_spikes(0.2, time())`
@ -7262,6 +7273,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_trim_spikes(time() > 1200 <= 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_trim_spikes(0.2, time() > 1200 <= 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, 1600, nan, nan},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_trim_zscore()`, func(t *testing.T) { t.Run(`range_trim_zscore()`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_trim_zscore(0.9, time())` q := `range_trim_zscore(0.9, time())`
@ -7273,6 +7295,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_trim_zscore(time() > 1200 <= 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_trim_zscore(0.9, time() > 1200 <= 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, 1600, nan, nan},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_zscore()`, func(t *testing.T) { t.Run(`range_zscore()`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `round(range_zscore(time()), 0.1)` q := `round(range_zscore(time()), 0.1)`
@ -7284,6 +7317,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_zscore(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `round(range_zscore(time() > 1200 < 1800), 0.1)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, -1, 1, nan, nan},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_quantile(0.5)`, func(t *testing.T) { t.Run(`range_quantile(0.5)`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_quantile(0.5, time())` q := `range_quantile(0.5, time())`
@ -7295,6 +7339,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_quantile(0.5, time() > 1200 < 2000)`, func(t *testing.T) {
t.Parallel()
q := `range_quantile(0.5, time() > 1200 < 2000)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1600, 1600, 1600, 1600, 1600, 1600},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_stddev()`, func(t *testing.T) { t.Run(`range_stddev()`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `round(range_stddev(time()),0.01)` q := `round(range_stddev(time()),0.01)`
@ -7306,6 +7361,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_stddev(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `round(range_stddev(time() > 1200 < 1800),0.01)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{100, 100, 100, 100, 100, 100},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_stdvar()`, func(t *testing.T) { t.Run(`range_stdvar()`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `round(range_stdvar(time()),0.01)` q := `round(range_stdvar(time()),0.01)`
@ -7317,6 +7383,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_stdvar(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `round(range_stdvar(time() > 1200 < 1800),0.01)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{10000, 10000, 10000, 10000, 10000, 10000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_median()`, func(t *testing.T) { t.Run(`range_median()`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_median(time())` q := `range_median(time())`
@ -7677,6 +7754,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`running_min(abs(1500-time()) < 400 > 100)`, func(t *testing.T) {
t.Parallel()
q := `running_min(abs(1500-time()) < 400 > 100)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, 300, 300, 300, 300, 300},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`running_max(abs(1300-time()))`, func(t *testing.T) { t.Run(`running_max(abs(1300-time()))`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `running_max(abs(1300-time()))` q := `running_max(abs(1300-time()))`
@ -7688,6 +7776,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`running_max(abs(1300-time()) > 300 < 700)`, func(t *testing.T) {
t.Parallel()
q := `running_max(abs(1300-time()) > 300 < 700)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, nan, 500, 500},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`running_sum(1)`, func(t *testing.T) { t.Run(`running_sum(1)`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `running_sum(1)` q := `running_sum(1)`
@ -7710,6 +7809,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`running_sum(time()/1e3 > 1.2 < 1.8)`, func(t *testing.T) {
t.Parallel()
q := `running_sum(time()/1e3 > 1.2 < 1.8)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, 1.4, 3, 3, 3},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`running_avg(time())`, func(t *testing.T) { t.Run(`running_avg(time())`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `running_avg(time())` q := `running_avg(time())`
@ -7721,6 +7831,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`running_avg(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `running_avg(time() > 1200 < 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, 1400, 1500, 1500, 1500},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`smooth_exponential(time(), 1)`, func(t *testing.T) { t.Run(`smooth_exponential(time(), 1)`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `smooth_exponential(time(), 1)` q := `smooth_exponential(time(), 1)`
@ -7801,6 +7922,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_min(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_min(time() > 1200 < 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1400, 1400, 1400, 1400, 1400, 1400},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_normalize(time(),alias(-time(),"negative"))`, func(t *testing.T) { t.Run(`range_normalize(time(),alias(-time(),"negative"))`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_normalize(time(),alias(-time(), "negative"))` q := `range_normalize(time(),alias(-time(), "negative"))`
@ -7818,6 +7950,23 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2} resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_normalize(time() > 1200 < 1800,alias(-(time() > 1200 < 2000),"negative"))`, func(t *testing.T) {
t.Parallel()
q := `range_normalize(time() > 1200 < 1800,alias(-(time() > 1200 < 2000), "negative"))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, 0, 1, nan, nan},
Timestamps: timestampsExpected,
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, 1, 0.5, 0, nan},
Timestamps: timestampsExpected,
}
r2.MetricName.MetricGroup = []byte("negative")
resultExpected := []netstorage.Result{r1, r2}
f(q, resultExpected)
})
t.Run(`range_first(time())`, func(t *testing.T) { t.Run(`range_first(time())`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_first(time())` q := `range_first(time())`
@ -7829,6 +7978,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_first(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_first(time() > 1200 < 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1400, 1400, 1400, 1400, 1400, 1400},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_mad(time())`, func(t *testing.T) { t.Run(`range_mad(time())`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_mad(time())` q := `range_mad(time())`
@ -7840,6 +8000,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_mad(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_mad(time() > 1200 < 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{100, 100, 100, 100, 100, 100},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_max(time())`, func(t *testing.T) { t.Run(`range_max(time())`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_max(time())` q := `range_max(time())`
@ -7851,6 +8022,39 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_max(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_max(time() > 1200 < 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1600, 1600, 1600, 1600, 1600, 1600},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_sum(time())`, func(t *testing.T) {
t.Parallel()
q := `range_sum(time())`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{9000, 9000, 9000, 9000, 9000, 9000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_sum(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_sum(time() > 1200 < 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3000, 3000, 3000, 3000, 3000, 3000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_last(time())`, func(t *testing.T) { t.Run(`range_last(time())`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_last(time())` q := `range_last(time())`
@ -7862,6 +8066,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_last(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_last(time() > 1200 < 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1600, 1600, 1600, 1600, 1600, 1600},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_linear_regression(time())`, func(t *testing.T) { t.Run(`range_linear_regression(time())`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `range_linear_regression(time())` q := `range_linear_regression(time())`
@ -7884,6 +8099,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`range_linear_regression(time() > 1200 < 1800)`, func(t *testing.T) {
t.Parallel()
q := `range_linear_regression(time() > 1200 < 1800)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range_linear_regression(100/time())`, func(t *testing.T) { t.Run(`range_linear_regression(100/time())`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sort_desc(round(( q := `sort_desc(round((

View file

@ -1544,10 +1544,8 @@ func transformRangeFirst(tfa *transformFuncArg) ([]*timeseries, error) {
continue continue
} }
vFirst := values[0] vFirst := values[0]
for i, v := range values { values = ts.Values
if math.IsNaN(v) { for i := range values {
continue
}
values[i] = vFirst values[i] = vFirst
} }
} }
@ -1571,10 +1569,8 @@ func setLastValues(tss []*timeseries) {
continue continue
} }
vLast := values[len(values)-1] vLast := values[len(values)-1]
for i, v := range values { values = ts.Values
if math.IsNaN(v) { for i := range values {
continue
}
values[i] = vLast values[i] = vLast
} }
} }

View file

@ -40,6 +40,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert): do not send notifications without labels to Alertmanager. Such notifications are rejected by Alertmanager anyway. Before, vmalert could send alert notifications even if no label-value pairs left after applying `alert_relabel_configs` from [notifier config](https://docs.victoriametrics.com/vmalert/#notifier-configuration-file). * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert): do not send notifications without labels to Alertmanager. Such notifications are rejected by Alertmanager anyway. Before, vmalert could send alert notifications even if no label-value pairs left after applying `alert_relabel_configs` from [notifier config](https://docs.victoriametrics.com/vmalert/#notifier-configuration-file).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): properly update value of variable `$activeAt` in rules annotation during replay mode. Before, `$activeAt` could have provided incorrect values during replay. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert/): properly update value of variable `$activeAt` in rules annotation during replay mode. Before, `$activeAt` could have provided incorrect values during replay.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/metricsql/): properly handle `c1 AND c2` and `c1 OR c1` queries for constants `c1` and `c2`. Previously such queries could return unexpected results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6637). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/metricsql/): properly handle `c1 AND c2` and `c1 OR c1` queries for constants `c1` and `c2`. Previously such queries could return unexpected results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6637).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/metricsql/): consistently return the first non-`NaN` value from [`range_first`](https://docs.victoriametrics.com/metricsql/#range_first) function across all the returned data points. Previously `NaN` data points weren't replaced with the first non-`NaN` value.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/metricsql/): consistently return the last non-`NaN` value from [`range_last`](https://docs.victoriametrics.com/metricsql/#range_last) function across all the returned data points. Previously `NaN` data points weren't replaced with the last non-`NaN` value.
* BUGFIX: all VictoriaMetrics components: increase default value of `-loggerMaxArgLen` cmd-line flag from 1000 to 5000. This should improve visibility on errors produced by very long queries. * BUGFIX: all VictoriaMetrics components: increase default value of `-loggerMaxArgLen` cmd-line flag from 1000 to 5000. This should improve visibility on errors produced by very long queries.
## [v1.103.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.103.0) ## [v1.103.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.103.0)