app/vmselect/promql: simplify the code after 388d020b7c

Add a test, which verifies the correct sorting of float64 slices with NaNs.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5509
This commit is contained in:
Aliaksandr Valialkin 2024-01-16 15:05:39 +02:00
parent 742b1b5443
commit 606035766a
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 87 additions and 37 deletions

View file

@ -649,25 +649,17 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
return nil, err return nil, err
} }
afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries {
var tssNoNaNs []*timeseries
for n := range tss[0].Values { for n := range tss[0].Values {
// Drop series with NaNs at Values[n] before sorting. lessFunc := lessWithNaNs
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506 if isReverse {
tssNoNaNs = tssNoNaNs[:0] lessFunc = greaterWithNaNs
for _, ts := range tss {
if !math.IsNaN(ts.Values[n]) {
tssNoNaNs = append(tssNoNaNs, ts)
}
} }
sort.Slice(tssNoNaNs, func(i, j int) bool { sort.Slice(tss, func(i, j int) bool {
a := tssNoNaNs[i].Values[n] a := tss[i].Values[n]
b := tssNoNaNs[j].Values[n] b := tss[j].Values[n]
if isReverse { return lessFunc(a, b)
a, b = b, a
}
return a < b
}) })
fillNaNsAtIdx(n, ks[n], tssNoNaNs) fillNaNsAtIdx(n, ks[n], tss)
} }
tss = removeEmptySeries(tss) tss = removeEmptySeries(tss)
reverseSeries(tss) reverseSeries(tss)
@ -718,29 +710,17 @@ func getRangeTopKTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr,
value: value, value: value,
} }
} }
// Drop maxs with NaNs before sorting. lessFunc := lessWithNaNs
// This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506 if isReverse {
maxsNoNaNs := make([]tsWithValue, 0, len(maxs)) lessFunc = greaterWithNaNs
for _, tsv := range maxs {
if !math.IsNaN(tsv.value) {
maxsNoNaNs = append(maxsNoNaNs, tsv)
}
} }
sort.Slice(maxsNoNaNs, func(i, j int) bool { sort.Slice(maxs, func(i, j int) bool {
a := maxsNoNaNs[i].value a := maxs[i].value
b := maxsNoNaNs[j].value b := maxs[j].value
if isReverse { return lessFunc(a, b)
a, b = b, a
}
return a < b
}) })
for _, tsv := range maxs { for i := range maxs {
if math.IsNaN(tsv.value) { tss[i] = maxs[i].ts
maxsNoNaNs = append(maxsNoNaNs, tsv)
}
}
for i := range maxsNoNaNs {
tss[i] = maxsNoNaNs[i].ts
} }
remainingSumTS := getRemainingSumTimeseries(tss, modifier, ks, remainingSumTagName) remainingSumTS := getRemainingSumTimeseries(tss, modifier, ks, remainingSumTagName)
@ -1222,6 +1202,28 @@ func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metri
} }
} }
func lessWithNaNs(a, b float64) bool {
// consider NaNs are smaller than non-NaNs
if math.IsNaN(a) {
return !math.IsNaN(b)
}
if math.IsNaN(b) {
return false
}
return a < b
}
func greaterWithNaNs(a, b float64) bool {
// consider NaNs are bigger than non-NaNs
if math.IsNaN(a) {
return !math.IsNaN(b)
}
if math.IsNaN(b) {
return false
}
return a > b
}
func floatToIntBounded(f float64) int { func floatToIntBounded(f float64) int {
if f > math.MaxInt { if f > math.MaxInt {
return math.MaxInt return math.MaxInt

View file

@ -2,9 +2,57 @@ package promql
import ( import (
"math" "math"
"sort"
"testing" "testing"
) )
func TestSortWithNaNs(t *testing.T) {
f := func(a []float64, ascExpected, descExpected []float64) {
t.Helper()
equalSlices := func(a, b []float64) bool {
for i := range a {
x := a[i]
y := b[i]
if math.IsNaN(x) {
return math.IsNaN(y)
}
if math.IsNaN(y) {
return false
}
if x != y {
return false
}
}
return true
}
aCopy := append([]float64{}, a...)
sort.Slice(aCopy, func(i, j int) bool {
return lessWithNaNs(aCopy[i], aCopy[j])
})
if !equalSlices(aCopy, ascExpected) {
t.Fatalf("unexpected slice after asc sorting; got\n%v\nwant\n%v", aCopy, ascExpected)
}
aCopy = append(aCopy[:0], a...)
sort.Slice(aCopy, func(i, j int) bool {
return greaterWithNaNs(aCopy[i], aCopy[j])
})
if !equalSlices(aCopy, descExpected) {
t.Fatalf("unexpected slice after desc sorting; got\n%v\nwant\n%v", aCopy, descExpected)
}
}
f(nil, nil, nil)
f([]float64{1}, []float64{1}, []float64{1})
f([]float64{1, nan, 3, 2}, []float64{nan, 1, 2, 3}, []float64{nan, 3, 2, 1})
f([]float64{nan}, []float64{nan}, []float64{nan})
f([]float64{nan, nan, nan}, []float64{nan, nan, nan}, []float64{nan, nan, nan})
f([]float64{nan, 1, nan}, []float64{nan, nan, 1}, []float64{nan, nan, 1})
f([]float64{nan, 1, 0, 2, nan}, []float64{nan, nan, 0, 1, 2}, []float64{nan, nan, 2, 1, 0})
}
func TestModeNoNaNs(t *testing.T) { func TestModeNoNaNs(t *testing.T) {
f := func(prevValue float64, a []float64, expectedResult float64) { f := func(prevValue float64, a []float64, expectedResult float64) {
t.Helper() t.Helper()