diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index 98ff3687e..6ab3b2136 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -649,25 +649,17 @@ func newAggrFuncTopK(isReverse bool) aggrFunc { return nil, err } afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { - var tssNoNaNs []*timeseries for n := range tss[0].Values { - // Drop series with NaNs at Values[n] before sorting. - // This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506 - tssNoNaNs = tssNoNaNs[:0] - for _, ts := range tss { - if !math.IsNaN(ts.Values[n]) { - tssNoNaNs = append(tssNoNaNs, ts) - } + lessFunc := lessWithNaNs + if isReverse { + lessFunc = greaterWithNaNs } - sort.Slice(tssNoNaNs, func(i, j int) bool { - a := tssNoNaNs[i].Values[n] - b := tssNoNaNs[j].Values[n] - if isReverse { - a, b = b, a - } - return a < b + sort.Slice(tss, func(i, j int) bool { + a := tss[i].Values[n] + b := tss[j].Values[n] + return lessFunc(a, b) }) - fillNaNsAtIdx(n, ks[n], tssNoNaNs) + fillNaNsAtIdx(n, ks[n], tss) } tss = removeEmptySeries(tss) reverseSeries(tss) @@ -718,29 +710,17 @@ func getRangeTopKTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr, value: value, } } - // Drop maxs with NaNs before sorting. - // This is needed for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5506 - maxsNoNaNs := make([]tsWithValue, 0, len(maxs)) - for _, tsv := range maxs { - if !math.IsNaN(tsv.value) { - maxsNoNaNs = append(maxsNoNaNs, tsv) - } + lessFunc := lessWithNaNs + if isReverse { + lessFunc = greaterWithNaNs } - sort.Slice(maxsNoNaNs, func(i, j int) bool { - a := maxsNoNaNs[i].value - b := maxsNoNaNs[j].value - if isReverse { - a, b = b, a - } - return a < b + sort.Slice(maxs, func(i, j int) bool { + a := maxs[i].value + b := maxs[j].value + return lessFunc(a, b) }) - for _, tsv := range maxs { - if math.IsNaN(tsv.value) { - maxsNoNaNs = append(maxsNoNaNs, tsv) - } - } - for i := range maxsNoNaNs { - tss[i] = maxsNoNaNs[i].ts + for i := range maxs { + tss[i] = maxs[i].ts } remainingSumTS := getRemainingSumTimeseries(tss, modifier, ks, remainingSumTagName) @@ -1222,6 +1202,28 @@ func newAggrQuantileFunc(phis []float64) func(tss []*timeseries, modifier *metri } } +func lessWithNaNs(a, b float64) bool { + // consider NaNs are smaller than non-NaNs + if math.IsNaN(a) { + return !math.IsNaN(b) + } + if math.IsNaN(b) { + return false + } + return a < b +} + +func greaterWithNaNs(a, b float64) bool { + // consider NaNs are bigger than non-NaNs + if math.IsNaN(a) { + return !math.IsNaN(b) + } + if math.IsNaN(b) { + return false + } + return a > b +} + func floatToIntBounded(f float64) int { if f > math.MaxInt { return math.MaxInt diff --git a/app/vmselect/promql/aggr_test.go b/app/vmselect/promql/aggr_test.go index a8c58883f..eec76be61 100644 --- a/app/vmselect/promql/aggr_test.go +++ b/app/vmselect/promql/aggr_test.go @@ -2,9 +2,57 @@ package promql import ( "math" + "sort" "testing" ) +func TestSortWithNaNs(t *testing.T) { + f := func(a []float64, ascExpected, descExpected []float64) { + t.Helper() + + equalSlices := func(a, b []float64) bool { + for i := range a { + x := a[i] + y := b[i] + if math.IsNaN(x) { + return math.IsNaN(y) + } + if math.IsNaN(y) { + return false + } + if x != y { + return false + } + } + return true + } + + aCopy := append([]float64{}, a...) + sort.Slice(aCopy, func(i, j int) bool { + return lessWithNaNs(aCopy[i], aCopy[j]) + }) + if !equalSlices(aCopy, ascExpected) { + t.Fatalf("unexpected slice after asc sorting; got\n%v\nwant\n%v", aCopy, ascExpected) + } + + aCopy = append(aCopy[:0], a...) + sort.Slice(aCopy, func(i, j int) bool { + return greaterWithNaNs(aCopy[i], aCopy[j]) + }) + if !equalSlices(aCopy, descExpected) { + t.Fatalf("unexpected slice after desc sorting; got\n%v\nwant\n%v", aCopy, descExpected) + } + } + + f(nil, nil, nil) + f([]float64{1}, []float64{1}, []float64{1}) + f([]float64{1, nan, 3, 2}, []float64{nan, 1, 2, 3}, []float64{nan, 3, 2, 1}) + f([]float64{nan}, []float64{nan}, []float64{nan}) + f([]float64{nan, nan, nan}, []float64{nan, nan, nan}, []float64{nan, nan, nan}) + f([]float64{nan, 1, nan}, []float64{nan, nan, 1}, []float64{nan, nan, 1}) + f([]float64{nan, 1, 0, 2, nan}, []float64{nan, nan, 0, 1, 2}, []float64{nan, nan, 2, 1, 0}) +} + func TestModeNoNaNs(t *testing.T) { f := func(prevValue float64, a []float64, expectedResult float64) { t.Helper()