app/vmselect/promql: properly handle args in count_values_over_time() function

Prevsiously they were swapped - the first arg should be the label name and the second arg should be label filters

This is a follow-up for e389b7b959e8144fdff5075bf7a5a39b2b0c6dd3

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5847
This commit is contained in:
Aliaksandr Valialkin 2024-02-25 01:47:09 +02:00
parent 4bdb3d9fd9
commit 35f592a02c
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
8 changed files with 121 additions and 65 deletions

View file

@ -314,7 +314,7 @@ func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr)
} }
rf, err := nrf(args) rf, err := nrf(args)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("cannot evaluate args for %q: %w", fe.AppendString(nil), err)
} }
rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil) rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil)
if err != nil { if err != nil {
@ -395,7 +395,7 @@ func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFunc
} }
rf, err := nrf(args) rf, err := nrf(args)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("cannot evaluate args for aggregate func %q: %w", ae.AppendString(nil), err)
} }
iafc := newIncrementalAggrFuncContext(ae, callbacks) iafc := newIncrementalAggrFuncContext(ae, callbacks)
return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc) return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc)

View file

@ -5824,7 +5824,10 @@ func TestExecSuccess(t *testing.T) {
}) })
t.Run(`count_values_over_time`, func(t *testing.T) { t.Run(`count_values_over_time`, func(t *testing.T) {
t.Parallel() t.Parallel()
q := `sort_by_label(count_values_over_time(round(label_set(rand(0), "x", "y"), 0.4)[200s:5s], "foo"), "foo")` q := `sort_by_label(
count_values_over_time("foo", round(label_set(rand(0), "x", "y"), 0.4)[200s:5s]),
"foo",
)`
r1 := netstorage.Result{ r1 := netstorage.Result{
MetricName: metricNameExpected, MetricName: metricNameExpected,
Values: []float64{4, 8, 7, 6, 10, 9}, Values: []float64{4, 8, 7, 6, 10, 9},

View file

@ -1449,11 +1449,11 @@ func newRollupCountValues(args []interface{}) (rollupFunc, error) {
if err := expectRollupArgsNum(args, 2); err != nil { if err := expectRollupArgsNum(args, 2); err != nil {
return nil, err return nil, err
} }
tssLabelNum, ok := args[1].([]*timeseries) tssLabelNum, ok := args[0].([]*timeseries)
if !ok { if !ok {
return nil, fmt.Errorf(`unexpected type for labelName arg; got %T; want %T`, args[1], tssLabelNum) return nil, fmt.Errorf(`unexpected type for labelName arg; got %T; want %T`, args[0], tssLabelNum)
} }
labelName, err := getString(tssLabelNum, 1) labelName, err := getString(tssLabelNum, 0)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot get labelName: %w", err) return nil, fmt.Errorf("cannot get labelName: %w", err)
} }

2
go.mod
View file

@ -9,7 +9,7 @@ require (
github.com/VictoriaMetrics/easyproto v0.1.4 github.com/VictoriaMetrics/easyproto v0.1.4
github.com/VictoriaMetrics/fastcache v1.12.2 github.com/VictoriaMetrics/fastcache v1.12.2
github.com/VictoriaMetrics/metrics v1.33.0 github.com/VictoriaMetrics/metrics v1.33.0
github.com/VictoriaMetrics/metricsql v0.74.0 github.com/VictoriaMetrics/metricsql v0.75.0
github.com/aws/aws-sdk-go-v2 v1.25.2 github.com/aws/aws-sdk-go-v2 v1.25.2
github.com/aws/aws-sdk-go-v2/config v1.27.4 github.com/aws/aws-sdk-go-v2/config v1.27.4
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.6 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.6

4
go.sum
View file

@ -70,8 +70,8 @@ github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkT
github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys= github.com/VictoriaMetrics/metrics v1.24.0/go.mod h1:eFT25kvsTidQFHb6U0oa0rTrDRdz4xTYjpL8+UPohys=
github.com/VictoriaMetrics/metrics v1.33.0 h1:EnkDEaGiL2u95t+W76GfecC/LMYpy+tFrexYzBWQIAc= github.com/VictoriaMetrics/metrics v1.33.0 h1:EnkDEaGiL2u95t+W76GfecC/LMYpy+tFrexYzBWQIAc=
github.com/VictoriaMetrics/metrics v1.33.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= github.com/VictoriaMetrics/metrics v1.33.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/VictoriaMetrics/metricsql v0.74.0 h1:bVO7USXBBYEuEHQ3PZg/6216j0DvblZM+Q8sTRECkv0= github.com/VictoriaMetrics/metricsql v0.75.0 h1:ZRt2tE+GwLqa7fhY7A3wxIZe5Bdb6KiCm9MRHilv4eo=
github.com/VictoriaMetrics/metricsql v0.74.0/go.mod h1:k4UaP/+CjuZslIjd+kCigNG9TQmUqh5v0TP/nMEy90I= github.com/VictoriaMetrics/metricsql v0.75.0/go.mod h1:k4UaP/+CjuZslIjd+kCigNG9TQmUqh5v0TP/nMEy90I=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

View file

@ -60,18 +60,20 @@ func optimizeInplace(e Expr) {
optimizeInplace(t.Expr) optimizeInplace(t.Expr)
optimizeInplace(t.At) optimizeInplace(t.At)
case *FuncExpr: case *FuncExpr:
for _, arg := range t.Args { optimizeArgsInplace(t.Args)
optimizeInplace(arg)
}
case *AggrFuncExpr: case *AggrFuncExpr:
for _, arg := range t.Args { optimizeArgsInplace(t.Args)
optimizeInplace(arg)
}
case *BinaryOpExpr: case *BinaryOpExpr:
optimizeInplace(t.Left) optimizeInplace(t.Left)
optimizeInplace(t.Right) optimizeInplace(t.Right)
lfs := getCommonLabelFilters(t) lfs := getCommonLabelFilters(t)
pushdownBinaryOpFiltersInplace(t, lfs) pushdownBinaryOpFiltersInplace(lfs, t)
}
}
func optimizeArgsInplace(args []Expr) {
for _, arg := range args {
optimizeInplace(arg)
} }
} }
@ -82,19 +84,24 @@ func getCommonLabelFilters(e Expr) []LabelFilter {
case *RollupExpr: case *RollupExpr:
return getCommonLabelFilters(t.Expr) return getCommonLabelFilters(t.Expr)
case *FuncExpr: case *FuncExpr:
args := t.Args
switch strings.ToLower(t.Name) { switch strings.ToLower(t.Name) {
case "label_set": case "label_set":
return getCommonLabelFiltersForLabelSet(t.Args) return getCommonLabelFiltersForLabelSet(args)
case "label_replace", "label_join", "label_map", "label_match", "label_mismatch", "label_transform": case "label_replace", "label_join", "label_map", "label_match", "label_mismatch", "label_transform":
return getCommonLabelFiltersForLabelReplace(t.Args) return getCommonLabelFiltersForLabelReplace(args)
case "label_copy", "label_move": case "label_copy", "label_move":
return getCommonLabelFiltersForLabelCopy(t.Args) return getCommonLabelFiltersForLabelCopy(args)
case "label_del", "label_uppercase", "label_lowercase", "labels_equal": case "label_del", "label_uppercase", "label_lowercase", "labels_equal":
return getCommonLabelFiltersForLabelDel(t.Args) return getCommonLabelFiltersForLabelDel(args)
case "label_keep": case "label_keep":
return getCommonLabelFiltersForLabelKeep(t.Args) return getCommonLabelFiltersForLabelKeep(args)
case "count_values_over_time":
return getCommonLabelFiltersForCountValuesOverTime(args)
case "range_normalize", "union", "":
return intersectLabelFiltersForAllArgs(args)
default: default:
arg := getFuncArgForOptimization(t.Name, t.Args) arg := getFuncArgForOptimization(t.Name, args)
if arg == nil { if arg == nil {
return nil return nil
} }
@ -102,12 +109,16 @@ func getCommonLabelFilters(e Expr) []LabelFilter {
} }
case *AggrFuncExpr: case *AggrFuncExpr:
args := t.Args args := t.Args
if len(args) > 0 && canAcceptMultipleArgsForAggrFunc(t.Name) { if strings.ToLower(t.Name) == "count_values" {
lfs := getCommonLabelFilters(args[0]) if len(args) != 2 {
for _, arg := range args[1:] { return nil
lfsNext := getCommonLabelFilters(arg)
lfs = intersectLabelFilters(lfs, lfsNext)
} }
lfs := getCommonLabelFilters(args[1])
lfs = dropLabelFiltersForLabelName(lfs, args[0])
return trimFiltersByAggrModifier(lfs, t)
}
if canAcceptMultipleArgsForAggrFunc(t.Name) {
lfs := intersectLabelFiltersForAllArgs(args)
return trimFiltersByAggrModifier(lfs, t) return trimFiltersByAggrModifier(lfs, t)
} }
arg := getFuncArgForOptimization(t.Name, args) arg := getFuncArgForOptimization(t.Name, args)
@ -174,6 +185,26 @@ func getCommonLabelFilters(e Expr) []LabelFilter {
} }
} }
func intersectLabelFiltersForAllArgs(args []Expr) []LabelFilter {
if len(args) == 0 {
return nil
}
lfs := getCommonLabelFilters(args[0])
for _, arg := range args[1:] {
lfsNext := getCommonLabelFilters(arg)
lfs = intersectLabelFilters(lfs, lfsNext)
}
return lfs
}
func getCommonLabelFiltersForCountValuesOverTime(args []Expr) []LabelFilter {
if len(args) != 2 {
return nil
}
lfs := getCommonLabelFilters(args[1])
return dropLabelFiltersForLabelName(lfs, args[0])
}
func getCommonLabelFiltersForLabelKeep(args []Expr) []LabelFilter { func getCommonLabelFiltersForLabelKeep(args []Expr) []LabelFilter {
if len(args) == 0 { if len(args) == 0 {
return nil return nil
@ -318,11 +349,11 @@ func PushdownBinaryOpFilters(e Expr, commonFilters []LabelFilter) Expr {
return e return e
} }
eCopy := Clone(e) eCopy := Clone(e)
pushdownBinaryOpFiltersInplace(eCopy, commonFilters) pushdownBinaryOpFiltersInplace(commonFilters, eCopy)
return eCopy return eCopy
} }
func pushdownBinaryOpFiltersInplace(e Expr, lfs []LabelFilter) { func pushdownBinaryOpFiltersInplace(lfs []LabelFilter, e Expr) {
if len(lfs) == 0 { if len(lfs) == 0 {
return return
} }
@ -334,62 +365,84 @@ func pushdownBinaryOpFiltersInplace(e Expr, lfs []LabelFilter) {
t.LabelFilterss[i] = lfsLocal t.LabelFilterss[i] = lfsLocal
} }
case *RollupExpr: case *RollupExpr:
pushdownBinaryOpFiltersInplace(t.Expr, lfs) pushdownBinaryOpFiltersInplace(lfs, t.Expr)
case *FuncExpr: case *FuncExpr:
args := t.Args
switch strings.ToLower(t.Name) { switch strings.ToLower(t.Name) {
case "label_set": case "label_set":
pushdownLabelFiltersForLabelSet(t.Args, lfs) pushdownLabelFiltersForLabelSet(lfs, args)
case "label_replace", "label_join", "label_map", "label_match", "label_mismatch", "label_transform": case "label_replace", "label_join", "label_map", "label_match", "label_mismatch", "label_transform":
pushdownLabelFiltersForLabelReplace(t.Args, lfs) pushdownLabelFiltersForLabelReplace(lfs, args)
case "label_copy", "label_move": case "label_copy", "label_move":
pushdownLabelFiltersForLabelCopy(t.Args, lfs) pushdownLabelFiltersForLabelCopy(lfs, args)
case "label_del", "label_uppercase", "label_lowercase", "labels_equal": case "label_del", "label_uppercase", "label_lowercase", "labels_equal":
pushdownLabelFiltersForLabelDel(t.Args, lfs) pushdownLabelFiltersForLabelDel(lfs, args)
case "label_keep": case "label_keep":
pushdownLabelFiltersForLabelKeep(t.Args, lfs) pushdownLabelFiltersForLabelKeep(lfs, args)
case "count_values_over_time":
pushdownLabelFiltersForCountValuesOverTime(lfs, args)
case "range_normalize", "union", "":
pushdownLabelFiltersForAllArgs(lfs, args)
default: default:
arg := getFuncArgForOptimization(t.Name, t.Args) arg := getFuncArgForOptimization(t.Name, args)
if arg != nil { if arg != nil {
pushdownBinaryOpFiltersInplace(arg, lfs) pushdownBinaryOpFiltersInplace(lfs, arg)
} }
} }
case *AggrFuncExpr: case *AggrFuncExpr:
lfs = trimFiltersByAggrModifier(lfs, t) lfs = trimFiltersByAggrModifier(lfs, t)
args := t.Args args := t.Args
if len(args) > 0 && canAcceptMultipleArgsForAggrFunc(t.Name) { if strings.ToLower(t.Name) == "count_values" {
for _, arg := range args { if len(args) == 2 {
pushdownBinaryOpFiltersInplace(arg, lfs) lfs = dropLabelFiltersForLabelName(lfs, args[0])
pushdownBinaryOpFiltersInplace(lfs, args[1])
} }
} else if canAcceptMultipleArgsForAggrFunc(t.Name) {
pushdownLabelFiltersForAllArgs(lfs, args)
} else { } else {
arg := getFuncArgForOptimization(t.Name, args) arg := getFuncArgForOptimization(t.Name, args)
if arg != nil { if arg != nil {
pushdownBinaryOpFiltersInplace(arg, lfs) pushdownBinaryOpFiltersInplace(lfs, arg)
} }
} }
case *BinaryOpExpr: case *BinaryOpExpr:
lfs = TrimFiltersByGroupModifier(lfs, t) lfs = TrimFiltersByGroupModifier(lfs, t)
pushdownBinaryOpFiltersInplace(t.Left, lfs) pushdownBinaryOpFiltersInplace(lfs, t.Left)
pushdownBinaryOpFiltersInplace(t.Right, lfs) pushdownBinaryOpFiltersInplace(lfs, t.Right)
} }
} }
func pushdownLabelFiltersForLabelKeep(args []Expr, lfs []LabelFilter) { func pushdownLabelFiltersForAllArgs(lfs []LabelFilter, args []Expr) {
for _, arg := range args {
pushdownBinaryOpFiltersInplace(lfs, arg)
}
}
func pushdownLabelFiltersForCountValuesOverTime(lfs []LabelFilter, args []Expr) {
if len(args) != 2 {
return
}
lfs = dropLabelFiltersForLabelName(lfs, args[0])
pushdownBinaryOpFiltersInplace(lfs, args[1])
}
func pushdownLabelFiltersForLabelKeep(lfs []LabelFilter, args []Expr) {
if len(args) == 0 { if len(args) == 0 {
return return
} }
lfs = keepLabelFiltersForLabelNames(lfs, args[1:]) lfs = keepLabelFiltersForLabelNames(lfs, args[1:])
pushdownBinaryOpFiltersInplace(args[0], lfs) pushdownBinaryOpFiltersInplace(lfs, args[0])
} }
func pushdownLabelFiltersForLabelDel(args []Expr, lfs []LabelFilter) { func pushdownLabelFiltersForLabelDel(lfs []LabelFilter, args []Expr) {
if len(args) == 0 { if len(args) == 0 {
return return
} }
lfs = dropLabelFiltersForLabelNames(lfs, args[1:]) lfs = dropLabelFiltersForLabelNames(lfs, args[1:])
pushdownBinaryOpFiltersInplace(args[0], lfs) pushdownBinaryOpFiltersInplace(lfs, args[0])
} }
func pushdownLabelFiltersForLabelCopy(args []Expr, lfs []LabelFilter) { func pushdownLabelFiltersForLabelCopy(lfs []LabelFilter, args []Expr) {
if len(args) == 0 { if len(args) == 0 {
return return
} }
@ -403,18 +456,18 @@ func pushdownLabelFiltersForLabelCopy(args []Expr, lfs []LabelFilter) {
labelNames = append(labelNames, args[i+1]) labelNames = append(labelNames, args[i+1])
} }
lfs = dropLabelFiltersForLabelNames(lfs, labelNames) lfs = dropLabelFiltersForLabelNames(lfs, labelNames)
pushdownBinaryOpFiltersInplace(arg, lfs) pushdownBinaryOpFiltersInplace(lfs, arg)
} }
func pushdownLabelFiltersForLabelReplace(args []Expr, lfs []LabelFilter) { func pushdownLabelFiltersForLabelReplace(lfs []LabelFilter, args []Expr) {
if len(args) < 2 { if len(args) < 2 {
return return
} }
lfs = dropLabelFiltersForLabelName(lfs, args[1]) lfs = dropLabelFiltersForLabelName(lfs, args[1])
pushdownBinaryOpFiltersInplace(args[0], lfs) pushdownBinaryOpFiltersInplace(lfs, args[0])
} }
func pushdownLabelFiltersForLabelSet(args []Expr, lfs []LabelFilter) { func pushdownLabelFiltersForLabelSet(lfs []LabelFilter, args []Expr) {
if len(args) == 0 { if len(args) == 0 {
return return
} }
@ -425,7 +478,7 @@ func pushdownLabelFiltersForLabelSet(args []Expr, lfs []LabelFilter) {
labelNames = append(labelNames, args[i]) labelNames = append(labelNames, args[i])
} }
lfs = dropLabelFiltersForLabelNames(lfs, labelNames) lfs = dropLabelFiltersForLabelNames(lfs, labelNames)
pushdownBinaryOpFiltersInplace(arg, lfs) pushdownBinaryOpFiltersInplace(lfs, arg)
} }
func intersectLabelFilters(lfsA, lfsB []LabelFilter) []LabelFilter { func intersectLabelFilters(lfsA, lfsB []LabelFilter) []LabelFilter {
@ -591,13 +644,13 @@ func getAggrArgIdxForOptimization(funcName string, args []Expr) int {
"limitk", "outliers_mad", "outliersk", "quantile", "limitk", "outliers_mad", "outliersk", "quantile",
"topk", "topk_avg", "topk_max", "topk_median", "topk_last", "topk_min": "topk", "topk_avg", "topk_max", "topk_median", "topk_last", "topk_min":
return 1 return 1
case "count_values":
return -1
case "quantiles": case "quantiles":
return len(args) - 1 return len(args) - 1
case "count_values":
panic(fmt.Errorf("BUG: count_values must be already handled"))
default: default:
if len(args) > 1 && canAcceptMultipleArgsForAggrFunc(funcName) { if canAcceptMultipleArgsForAggrFunc(funcName) {
panic(fmt.Errorf("BUG: %d > 1 args passed to aggregate function %q; this case must be already handled", len(args), funcName)) panic(fmt.Errorf("BUG: %s must be already handled", funcName))
} }
return 0 return 0
} }
@ -616,6 +669,8 @@ func canAcceptMultipleArgsForAggrFunc(funcName string) bool {
func getRollupArgIdxForOptimization(funcName string, args []Expr) int { func getRollupArgIdxForOptimization(funcName string, args []Expr) int {
// This must be kept in sync with GetRollupArgIdx() // This must be kept in sync with GetRollupArgIdx()
switch strings.ToLower(funcName) { switch strings.ToLower(funcName) {
case "count_values_over_time":
panic(fmt.Errorf("BUG: count_values_over_time must be already handled"))
case "absent_over_time": case "absent_over_time":
return -1 return -1
case "quantile_over_time", "aggr_over_time", case "quantile_over_time", "aggr_over_time",
@ -632,11 +687,11 @@ func getTransformArgIdxForOptimization(funcName string, args []Expr) int {
switch strings.ToLower(funcName) { switch strings.ToLower(funcName) {
case "label_copy", "label_del", "label_join", "label_keep", "label_lowercase", "label_map", case "label_copy", "label_del", "label_join", "label_keep", "label_lowercase", "label_map",
"label_match", "label_mismatch", "label_move", "label_replace", "label_set", "label_transform", "label_match", "label_mismatch", "label_move", "label_replace", "label_set", "label_transform",
"label_uppercase", "labels_equal": "label_uppercase", "labels_equal", "range_normalize", "", "union":
panic(fmt.Errorf("BUG: unexpected funcName passed to getTransformArgIdxForOptimization: %s", funcName)) panic(fmt.Errorf("BUG: %s must be already handled", funcName))
case "drop_common_labels", "range_normalize": case "drop_common_labels":
return -1 return -1
case "", "absent", "scalar", "union", "vector": case "absent", "scalar":
return -1 return -1
case "end", "now", "pi", "ru", "start", "step", "time": case "end", "now", "pi", "ru", "start", "step", "time":
return -1 return -1
@ -647,8 +702,6 @@ func getTransformArgIdxForOptimization(funcName string, args []Expr) int {
return 1 return 1
case "histogram_quantiles": case "histogram_quantiles":
return len(args) - 1 return len(args) - 1
case "label_graphite_group":
return 0
default: default:
return 0 return 0
} }

View file

@ -104,7 +104,7 @@ func GetRollupArgIdx(fe *FuncExpr) int {
return -1 return -1
} }
switch funcName { switch funcName {
case "quantile_over_time", "aggr_over_time", case "quantile_over_time", "aggr_over_time", "count_values_over_time",
"hoeffding_bound_lower", "hoeffding_bound_upper": "hoeffding_bound_lower", "hoeffding_bound_upper":
return 1 return 1
case "quantiles_over_time": case "quantiles_over_time":

2
vendor/modules.txt vendored
View file

@ -102,7 +102,7 @@ github.com/VictoriaMetrics/fastcache
# github.com/VictoriaMetrics/metrics v1.33.0 # github.com/VictoriaMetrics/metrics v1.33.0
## explicit; go 1.17 ## explicit; go 1.17
github.com/VictoriaMetrics/metrics github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.74.0 # github.com/VictoriaMetrics/metricsql v0.75.0
## explicit; go 1.13 ## explicit; go 1.13
github.com/VictoriaMetrics/metricsql github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop github.com/VictoriaMetrics/metricsql/binaryop