mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
FIX bottomk doesn't return any data when there are no time range overlap between timeseries (#5509)
* FIX sort order in bottomk * Add lessWithNaNsReversed for bottomk * Add ut for TopK * Move lt from loop * FIX lint * FIX lint * FIX lint * Mod log format --------- Co-authored-by: xiaozongyang <xiaozngyang@kanyun.com> Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
190a6565ae
commit
ce4f26db02
5 changed files with 290 additions and 14 deletions
|
@ -649,15 +649,16 @@ func newAggrFuncTopK(isReverse bool) aggrFunc {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lt := lessWithNaNs
|
||||
if isReverse {
|
||||
lt = lessWithNaNsReversed
|
||||
}
|
||||
afe := func(tss []*timeseries, modififer *metricsql.ModifierExpr) []*timeseries {
|
||||
for n := range tss[0].Values {
|
||||
sort.Slice(tss, func(i, j int) bool {
|
||||
a := tss[i].Values[n]
|
||||
b := tss[j].Values[n]
|
||||
if isReverse {
|
||||
a, b = b, a
|
||||
}
|
||||
return lessWithNaNs(a, b)
|
||||
return lt(a, b)
|
||||
})
|
||||
fillNaNsAtIdx(n, ks[n], tss)
|
||||
}
|
||||
|
@ -710,13 +711,12 @@ func getRangeTopKTimeseries(tss []*timeseries, modifier *metricsql.ModifierExpr,
|
|||
value: value,
|
||||
}
|
||||
}
|
||||
lt := lessWithNaNs
|
||||
if isReverse {
|
||||
lt = lessWithNaNsReversed
|
||||
}
|
||||
sort.Slice(maxs, func(i, j int) bool {
|
||||
a := maxs[i].value
|
||||
b := maxs[j].value
|
||||
if isReverse {
|
||||
a, b = b, a
|
||||
}
|
||||
return lessWithNaNs(a, b)
|
||||
return lt(maxs[i].value, maxs[j].value)
|
||||
})
|
||||
for i := range maxs {
|
||||
tss[i] = maxs[i].ts
|
||||
|
@ -1259,6 +1259,13 @@ func lessWithNaNs(a, b float64) bool {
|
|||
return a < b
|
||||
}
|
||||
|
||||
func lessWithNaNsReversed(a, b float64) bool {
|
||||
if math.IsNaN(a) {
|
||||
return true
|
||||
}
|
||||
return a > b
|
||||
}
|
||||
|
||||
func floatToIntBounded(f float64) int {
|
||||
if f > math.MaxInt {
|
||||
return math.MaxInt
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
package promql
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
)
|
||||
|
||||
func TestModeNoNaNs(t *testing.T) {
|
||||
|
@ -34,3 +38,268 @@ func TestModeNoNaNs(t *testing.T) {
|
|||
f(1, []float64{2, 3, 3, 4, 4}, 3)
|
||||
f(1, []float64{4, 3, 2, 3, 4}, 3)
|
||||
}
|
||||
|
||||
func TestLessWithNaNs(t *testing.T) {
|
||||
f := func(a, b float64, expectedResult bool) {
|
||||
t.Helper()
|
||||
result := lessWithNaNs(a, b)
|
||||
if result != expectedResult {
|
||||
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
|
||||
}
|
||||
}
|
||||
f(nan, nan, false)
|
||||
f(nan, 1, true)
|
||||
f(1, nan, false)
|
||||
f(1, 2, true)
|
||||
f(2, 1, false)
|
||||
f(1, 1, false)
|
||||
}
|
||||
|
||||
func TestLessWithNaNsReversed(t *testing.T) {
|
||||
f := func(a, b float64, expectedResult bool) {
|
||||
t.Helper()
|
||||
result := lessWithNaNsReversed(a, b)
|
||||
if result != expectedResult {
|
||||
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
|
||||
}
|
||||
}
|
||||
f(nan, nan, true)
|
||||
f(nan, 1, true)
|
||||
f(1, nan, false)
|
||||
f(1, 2, false)
|
||||
f(2, 1, true)
|
||||
f(1, 1, false)
|
||||
}
|
||||
|
||||
func TestTopK(t *testing.T) {
|
||||
f := func(all [][]*timeseries, expected []*timeseries, k int, reversed bool) {
|
||||
t.Helper()
|
||||
topKFunc := newAggrFuncTopK(reversed)
|
||||
actual, err := topKFunc(&aggrFuncArg{
|
||||
args: all,
|
||||
ae: &metricsql.AggrFuncExpr{
|
||||
Limit: 1,
|
||||
Modifier: metricsql.ModifierExpr{},
|
||||
},
|
||||
ec: nil,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("failed to call topK, err=%v", err)
|
||||
}
|
||||
for i := range actual {
|
||||
if !eq(expected[i], actual[i]) {
|
||||
t.Fatalf("unexpected result: i:%v got:\n%v; want:\t%v", i, actual[i], expected[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
f(newTestSeries(), []*timeseries{
|
||||
{
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
Values: []float64{nan, nan, 3, 2, 1},
|
||||
},
|
||||
{
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
Values: []float64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
Values: []float64{2, 3, nan, nan, nan},
|
||||
},
|
||||
}, 2, true)
|
||||
f(newTestSeries(), []*timeseries{
|
||||
{
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
Values: []float64{3, 4, 5, 6, 7},
|
||||
},
|
||||
{
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
Values: []float64{nan, nan, 4, 5, 6},
|
||||
},
|
||||
{
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
Values: []float64{5, 4, nan, nan, nan},
|
||||
},
|
||||
}, 2, false)
|
||||
f(newTestSeriesWithNaNsWithoutOverlap(), []*timeseries{
|
||||
{
|
||||
Values: []float64{nan, nan, nan, 2, 1},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{nan, nan, 5, 6, 7},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{2, 3, 4, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{1, 2, nan, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
}, 2, true)
|
||||
f(newTestSeriesWithNaNsWithoutOverlap(), []*timeseries{
|
||||
{
|
||||
Values: []float64{nan, nan, 5, 6, 7},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{nan, nan, 6, 2, 1},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{2, 3, nan, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{1, 2, nan, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
}, 2, false)
|
||||
f(newTestSeriesWithNaNsWithOverlap(), []*timeseries{
|
||||
{
|
||||
Values: []float64{nan, nan, nan, 2, 1},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{nan, nan, nan, 6, 7},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{1, 2, 3, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{2, 3, 4, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
}, 2, true)
|
||||
f(newTestSeriesWithNaNsWithOverlap(), []*timeseries{
|
||||
{
|
||||
Values: []float64{nan, nan, 5, 6, 7},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{nan, nan, 6, 2, 1},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{2, 3, nan, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{1, 2, nan, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
}, 2, false)
|
||||
}
|
||||
|
||||
func newTestSeries() [][]*timeseries {
|
||||
return [][]*timeseries{
|
||||
{
|
||||
{
|
||||
Values: []float64{2, 2, 2, 2, 2},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
},
|
||||
{
|
||||
{
|
||||
Values: []float64{1, 2, 3, 4, 5},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{2, 3, 4, 5, 6},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{5, 4, 3, 2, 1},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{3, 4, 5, 6, 7},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTestSeriesWithNaNsWithoutOverlap() [][]*timeseries {
|
||||
return [][]*timeseries{
|
||||
{
|
||||
{
|
||||
Values: []float64{2, 2, 2, 2, 2},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
},
|
||||
{
|
||||
{
|
||||
Values: []float64{1, 2, nan, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{2, 3, 4, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{nan, nan, 6, 2, 1},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{nan, nan, 5, 6, 7},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTestSeriesWithNaNsWithOverlap() [][]*timeseries {
|
||||
return [][]*timeseries{
|
||||
{
|
||||
{
|
||||
Values: []float64{2, 2, 2, 2, 2},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
},
|
||||
{
|
||||
{
|
||||
Values: []float64{1, 2, 3, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{2, 3, 4, nan, nan},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{nan, nan, 6, 2, 1},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
{
|
||||
Values: []float64{nan, nan, 5, 6, 7},
|
||||
Timestamps: []int64{1, 2, 3, 4, 5},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func eq(a, b *timeseries) bool {
|
||||
if !reflect.DeepEqual(a.Timestamps, b.Timestamps) {
|
||||
return false
|
||||
}
|
||||
for i := range a.Values {
|
||||
if !eqWithNan(a.Values[i], b.Values[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func eqWithNan(a, b float64) bool {
|
||||
if math.IsNaN(a) && math.IsNaN(b) {
|
||||
return true
|
||||
}
|
||||
if math.IsNaN(a) || math.IsNaN(b) {
|
||||
return false
|
||||
}
|
||||
return a == b
|
||||
}
|
||||
|
|
|
@ -334,6 +334,6 @@ var bwPool sync.Pool
|
|||
|
||||
type streamTracker struct {
|
||||
fd uintptr
|
||||
offset uint64
|
||||
length uint64
|
||||
offset uint64 // nolint
|
||||
length uint64 // nolint
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package filestream
|
||||
|
||||
func (st *streamTracker) adviseDontNeed(n int, fdatasync bool) error {
|
||||
func (st *streamTracker) adviseDontNeed(n int, fdatasync bool) error { // nolint
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"os"
|
||||
)
|
||||
|
||||
func fadviseSequentialRead(f *os.File, prefetch bool) error {
|
||||
func fadviseSequentialRead(f *os.File, prefetch bool) error { // nolint
|
||||
// TODO: implement this properly
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue