FIX bottomk doesn't return any data when there are no time range overlap between timeseries (#5509)

* FIX sort order in bottomk

* Add lessWithNaNsReversed for bottomk

* Add ut for TopK

* Move lt from loop

* FIX lint

* FIX lint

* FIX lint

* Mod log format

---------

Co-authored-by: xiaozongyang <xiaozngyang@kanyun.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Zongyang 2024-01-16 08:19:36 +08:00 committed by Aliaksandr Valialkin
parent 08ba65a728
commit d371848c2f
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 290 additions and 14 deletions

View file

@ -648,15 +648,16 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
if err != nil { if err != nil {
return nil, err return nil, err
} }
lt := lessWithNaNs
if isReverse {
lt = lessWithNaNsReversed
}
afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries { afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries {
for n := range tss[0].Values { for n := range tss[0].Values {
sort.Slice(tss, func(i, j int) bool { sort.Slice(tss, func(i, j int) bool {
a := tss[i].Values[n] a := tss[i].Values[n]
b := tss[j].Values[n] b := tss[j].Values[n]
if isReverse { return lt(a, b)
a, b = b, a
}
return lessWithNaNs(a, b)
}) })
fillNaNsAtIdx(n, ks[n], tss) fillNaNsAtIdx(n, ks[n], tss)
} }
@ -709,13 +710,12 @@ func getRangeTopKTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr,
value: value, value: value,
} }
} }
lt := lessWithNaNs
if isReverse {
lt = lessWithNaNsReversed
}
sort.Slice(maxs, func(i, j int) bool { sort.Slice(maxs, func(i, j int) bool {
a := maxs[i].value return lt(maxs[i].value, maxs[j].value)
b := maxs[j].value
if isReverse {
a, b = b, a
}
return lessWithNaNs(a, b)
}) })
for i := range maxs { for i := range maxs {
tss[i] = maxs[i].ts tss[i] = maxs[i].ts
@ -1206,6 +1206,13 @@ func lessWithNaNs(a, b float64) bool {
return a < b return a < b
} }
func lessWithNaNsReversed(a, b float64) bool {
if math.IsNaN(a) {
return true
}
return a > b
}
func floatToIntBounded(f float64) int { func floatToIntBounded(f float64) int {
if f > math.MaxInt { if f > math.MaxInt {
return math.MaxInt return math.MaxInt

View file

@ -1,8 +1,12 @@
package promql package promql
import ( import (
"log"
"math" "math"
"reflect"
"testing" "testing"
"github.com/VictoriaMetrics/metricsql"
) )
func TestModeNoNaNs(t *testing.T) { func TestModeNoNaNs(t *testing.T) {
@ -34,3 +38,268 @@ func TestModeNoNaNs(t *testing.T) {
f(1, []float64{2, 3, 3, 4, 4}, 3) f(1, []float64{2, 3, 3, 4, 4}, 3)
f(1, []float64{4, 3, 2, 3, 4}, 3) f(1, []float64{4, 3, 2, 3, 4}, 3)
} }
func TestLessWithNaNs(t *testing.T) {
f := func(a, b float64, expectedResult bool) {
t.Helper()
result := lessWithNaNs(a, b)
if result != expectedResult {
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
}
}
f(nan, nan, false)
f(nan, 1, true)
f(1, nan, false)
f(1, 2, true)
f(2, 1, false)
f(1, 1, false)
}
func TestLessWithNaNsReversed(t *testing.T) {
f := func(a, b float64, expectedResult bool) {
t.Helper()
result := lessWithNaNsReversed(a, b)
if result != expectedResult {
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
}
}
f(nan, nan, true)
f(nan, 1, true)
f(1, nan, false)
f(1, 2, false)
f(2, 1, true)
f(1, 1, false)
}
func TestTopK(t *testing.T) {
f := func(all [][]*timeseries, expected []*timeseries, k int, reversed bool) {
t.Helper()
topKFunc := newAggrFuncTopK(reversed)
actual, err := topKFunc(&aggrFuncArg{
args: all,
ae: &metricsql.AggrFuncExpr{
Limit: 1,
Modifier: metricsql.ModifierExpr{},
},
ec: nil,
})
if err != nil {
log.Fatalf("failed to call topK, err=%v", err)
}
for i := range actual {
if !eq(expected[i], actual[i]) {
t.Fatalf("unexpected result: i:%v got:\n%v; want:\t%v", i, actual[i], expected[i])
}
}
}
f(newTestSeries(), []*timeseries{
{
Timestamps: []int64{1, 2, 3, 4, 5},
Values: []float64{nan, nan, 3, 2, 1},
},
{
Timestamps: []int64{1, 2, 3, 4, 5},
Values: []float64{1, 2, 3, 4, 5},
},
{
Timestamps: []int64{1, 2, 3, 4, 5},
Values: []float64{2, 3, nan, nan, nan},
},
}, 2, true)
f(newTestSeries(), []*timeseries{
{
Timestamps: []int64{1, 2, 3, 4, 5},
Values: []float64{3, 4, 5, 6, 7},
},
{
Timestamps: []int64{1, 2, 3, 4, 5},
Values: []float64{nan, nan, 4, 5, 6},
},
{
Timestamps: []int64{1, 2, 3, 4, 5},
Values: []float64{5, 4, nan, nan, nan},
},
}, 2, false)
f(newTestSeriesWithNaNsWithoutOverlap(), []*timeseries{
{
Values: []float64{nan, nan, nan, 2, 1},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{nan, nan, 5, 6, 7},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{2, 3, 4, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{1, 2, nan, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
}, 2, true)
f(newTestSeriesWithNaNsWithoutOverlap(), []*timeseries{
{
Values: []float64{nan, nan, 5, 6, 7},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{nan, nan, 6, 2, 1},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{2, 3, nan, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{1, 2, nan, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
}, 2, false)
f(newTestSeriesWithNaNsWithOverlap(), []*timeseries{
{
Values: []float64{nan, nan, nan, 2, 1},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{nan, nan, nan, 6, 7},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{1, 2, 3, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{2, 3, 4, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
}, 2, true)
f(newTestSeriesWithNaNsWithOverlap(), []*timeseries{
{
Values: []float64{nan, nan, 5, 6, 7},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{nan, nan, 6, 2, 1},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{2, 3, nan, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{1, 2, nan, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
}, 2, false)
}
func newTestSeries() [][]*timeseries {
return [][]*timeseries{
{
{
Values: []float64{2, 2, 2, 2, 2},
Timestamps: []int64{1, 2, 3, 4, 5},
},
},
{
{
Values: []float64{1, 2, 3, 4, 5},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{2, 3, 4, 5, 6},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{5, 4, 3, 2, 1},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{3, 4, 5, 6, 7},
Timestamps: []int64{1, 2, 3, 4, 5},
},
},
}
}
func newTestSeriesWithNaNsWithoutOverlap() [][]*timeseries {
return [][]*timeseries{
{
{
Values: []float64{2, 2, 2, 2, 2},
Timestamps: []int64{1, 2, 3, 4, 5},
},
},
{
{
Values: []float64{1, 2, nan, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{2, 3, 4, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{nan, nan, 6, 2, 1},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{nan, nan, 5, 6, 7},
Timestamps: []int64{1, 2, 3, 4, 5},
},
},
}
}
func newTestSeriesWithNaNsWithOverlap() [][]*timeseries {
return [][]*timeseries{
{
{
Values: []float64{2, 2, 2, 2, 2},
Timestamps: []int64{1, 2, 3, 4, 5},
},
},
{
{
Values: []float64{1, 2, 3, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{2, 3, 4, nan, nan},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{nan, nan, 6, 2, 1},
Timestamps: []int64{1, 2, 3, 4, 5},
},
{
Values: []float64{nan, nan, 5, 6, 7},
Timestamps: []int64{1, 2, 3, 4, 5},
},
},
}
}
func eq(a, b *timeseries) bool {
if !reflect.DeepEqual(a.Timestamps, b.Timestamps) {
return false
}
for i := range a.Values {
if !eqWithNan(a.Values[i], b.Values[i]) {
return false
}
}
return true
}
func eqWithNan(a, b float64) bool {
if math.IsNaN(a) && math.IsNaN(b) {
return true
}
if math.IsNaN(a) || math.IsNaN(b) {
return false
}
return a == b
}

View file

@ -324,6 +324,6 @@ var bwPool sync.Pool
type streamTracker struct { type streamTracker struct {
fd uintptr fd uintptr
offset uint64 offset uint64 // nolint
length uint64 length uint64 // nolint
} }

View file

@ -1,6 +1,6 @@
package filestream package filestream
func (st *streamTracker) adviseDontNeed(n int, fdatasync bool) error { func (st *streamTracker) adviseDontNeed(n int, fdatasync bool) error { // nolint
return nil return nil
} }

View file

@ -4,7 +4,7 @@ import (
"os" "os"
) )
func fadviseSequentialRead(f *os.File, prefetch bool) error { func fadviseSequentialRead(f *os.File, prefetch bool) error { // nolint
// TODO: implement this properly // TODO: implement this properly
return nil return nil
} }