app/vmselect/promql: adjust and and unless binary operator handling to be consistent with Prometheus

This commit is contained in:
Aliaksandr Valialkin 2020-01-31 18:52:30 +02:00
parent 36ea1b503b
commit 45bc6c62f2
2 changed files with 61 additions and 7 deletions

View file

@ -32,7 +32,7 @@ var binaryOpFuncs = map[string]binaryOpFunc{
"or": binaryOpOr, "or": binaryOpOr,
"unless": binaryOpUnless, "unless": binaryOpUnless,
// New op // New ops
"if": newBinaryOpArithFunc(binaryop.If), "if": newBinaryOpArithFunc(binaryop.If),
"ifnot": newBinaryOpArithFunc(binaryop.Ifnot), "ifnot": newBinaryOpArithFunc(binaryop.Ifnot),
"default": newBinaryOpArithFunc(binaryop.Default), "default": newBinaryOpArithFunc(binaryop.Default),
@ -285,10 +285,21 @@ func resetMetricGroupIfRequired(be *metricsql.BinaryOpExpr, ts *timeseries) {
func binaryOpAnd(bfa *binaryOpFuncArg) ([]*timeseries, error) { func binaryOpAnd(bfa *binaryOpFuncArg) ([]*timeseries, error) {
mLeft, mRight := createTimeseriesMapByTagSet(bfa.be, bfa.left, bfa.right) mLeft, mRight := createTimeseriesMapByTagSet(bfa.be, bfa.left, bfa.right)
var rvs []*timeseries var rvs []*timeseries
for k := range mRight { for k, tssRight := range mRight {
if tss := mLeft[k]; tss != nil { tssLeft := mLeft[k]
rvs = append(rvs, tss...) if tssLeft == nil {
continue
} }
for i := range tssLeft[0].Values {
if !isAllNaNs(tssRight, i) {
continue
}
for _, tsLeft := range tssLeft {
tsLeft.Values[i] = nan
}
}
tssLeft = removeNaNs(tssLeft)
rvs = append(rvs, tssLeft...)
} }
return rvs, nil return rvs, nil
} }
@ -310,14 +321,35 @@ func binaryOpOr(bfa *binaryOpFuncArg) ([]*timeseries, error) {
func binaryOpUnless(bfa *binaryOpFuncArg) ([]*timeseries, error) { func binaryOpUnless(bfa *binaryOpFuncArg) ([]*timeseries, error) {
mLeft, mRight := createTimeseriesMapByTagSet(bfa.be, bfa.left, bfa.right) mLeft, mRight := createTimeseriesMapByTagSet(bfa.be, bfa.left, bfa.right)
var rvs []*timeseries var rvs []*timeseries
for k, tss := range mLeft { for k, tssLeft := range mLeft {
if mRight[k] == nil { tssRight := mRight[k]
rvs = append(rvs, tss...) if tssRight == nil {
rvs = append(rvs, tssLeft...)
continue
} }
for i := range tssLeft[0].Values {
if isAllNaNs(tssRight, i) {
continue
}
for _, tsLeft := range tssLeft {
tsLeft.Values[i] = nan
}
}
tssLeft = removeNaNs(tssLeft)
rvs = append(rvs, tssLeft...)
} }
return rvs, nil return rvs, nil
} }
func isAllNaNs(tss []*timeseries, idx int) bool {
for _, ts := range tss {
if !math.IsNaN(ts.Values[idx]) {
return false
}
}
return true
}
func createTimeseriesMapByTagSet(be *metricsql.BinaryOpExpr, left, right []*timeseries) (map[string][]*timeseries, map[string][]*timeseries) { func createTimeseriesMapByTagSet(be *metricsql.BinaryOpExpr, left, right []*timeseries) (map[string][]*timeseries, map[string][]*timeseries) {
groupTags := be.GroupModifier.Args groupTags := be.GroupModifier.Args
groupOp := strings.ToLower(be.GroupModifier.Op) groupOp := strings.ToLower(be.GroupModifier.Op)

View file

@ -1730,12 +1730,34 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r} resultExpected := []netstorage.Result{r}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`time() and time() > 1300`, func(t *testing.T) {
t.Parallel()
q := `time() and time() > 1300`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, 1400, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`time() unless 2`, func(t *testing.T) { t.Run(`time() unless 2`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `time() unless 2` q := `time() unless 2`
resultExpected := []netstorage.Result{} resultExpected := []netstorage.Result{}
f(q, resultExpected) f(q, resultExpected)
}) })
t.Run(`time() unless time() > 1500`, func(t *testing.T) {
t.Parallel()
q := `time() unless time() > 1500`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1200, 1400, nan, nan, nan},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`timseries-with-tags unless 2`, func(t *testing.T) { t.Run(`timseries-with-tags unless 2`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `label_set(time(), "foo", "bar") unless 2` q := `label_set(time(), "foo", "bar") unless 2`