app/vmselect/promql: fix duplicate time series error on joins against time series filtered by values

This should prevent `duplicate time series` errors when executing the following query:

kube_pod_container_resource_requests{resource="cpu"} * on (namespace,pod) group_left() (kube_pod_status_phase{phase=~"Pending|Running"}==1)

where `kube_pod_status_phase{phase=~"Pending|Running"}==1` filters out duplicate time series
This commit is contained in:
Aliaksandr Valialkin 2022-04-20 22:18:39 +03:00
parent 0ef7a05fc0
commit 25b841c6ed
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 59 additions and 17 deletions

View file

@ -82,14 +82,35 @@ func newBinaryOpArithFunc(af func(left, right float64) float64) binaryOpFunc {
func newBinaryOpFunc(bf func(left, right float64, isBool bool) float64) binaryOpFunc {
return func(bfa *binaryOpFuncArg) ([]*timeseries, error) {
isBool := bfa.be.Bool
left, right, dst, err := adjustBinaryOpTags(bfa.be, bfa.left, bfa.right)
left := bfa.left
right := bfa.right
switch bfa.be.Op {
case "ifnot":
left = removeEmptySeries(left)
// Do not remove empty series on the right side,
// so the left-side series could be matched against them.
case "default":
// Do not remove empty series on the left side,
// so they could be replaced with the corresponding series on the right side.
right = removeEmptySeries(right)
if len(right) == 0 {
return left, nil
}
default:
left = removeEmptySeries(left)
right = removeEmptySeries(right)
}
if len(left) == 0 || len(right) == 0 {
return nil, nil
}
left, right, dst, err := adjustBinaryOpTags(bfa.be, left, right)
if err != nil {
return nil, err
}
if len(left) != len(right) || len(left) != len(dst) {
logger.Panicf("BUG: len(left) must match len(right) and len(dst); got %d vs %d vs %d", len(left), len(right), len(dst))
}
isBool := bfa.be.Bool
for i, tsLeft := range left {
leftValues := tsLeft.Values
rightValues := right[i].Values
@ -206,7 +227,11 @@ func ensureSingleTimeseries(side string, be *metricsql.BinaryOpExpr, tss []*time
func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft, rvsRight, tssLeft, tssRight []*timeseries) ([]*timeseries, []*timeseries, error) {
joinTags := be.JoinModifier.Args
var m map[string]*timeseries
type tsPair struct {
left *timeseries
right *timeseries
}
m := make(map[string]*tsPair)
for _, tsLeft := range tssLeft {
resetMetricGroupIfRequired(be, tsLeft)
if len(tssRight) == 1 {
@ -219,12 +244,8 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
// Hard case - right part contains multiple matching time series.
// Verify it doesn't result in duplicate MetricName values after adding missing tags.
if m == nil {
m = make(map[string]*timeseries, len(tssRight))
} else {
for k := range m {
delete(m, k)
}
for k := range m {
delete(m, k)
}
bb := bbPool.Get()
for _, tsRight := range tssRight {
@ -232,20 +253,29 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
tsCopy.CopyFromShallowTimestamps(tsLeft)
tsCopy.MetricName.SetTags(joinTags, &tsRight.MetricName)
bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName)
if tsExisting := m[string(bb.B)]; tsExisting != nil {
// Try merging tsExisting with tsRight if they don't overlap.
if mergeNonOverlappingTimeseries(tsExisting, tsRight) {
continue
pair, ok := m[string(bb.B)]
if !ok {
m[string(bb.B)] = &tsPair{
left: &tsCopy,
right: tsRight,
}
continue
}
// Try merging pair.right with tsRight if they don't overlap.
var tmp timeseries
tmp.CopyFromShallowTimestamps(pair.right)
if !mergeNonOverlappingTimeseries(&tmp, tsRight) {
return nil, nil, fmt.Errorf("duplicate time series on the %s side of `%s %s %s`: %s and %s",
singleTimeseriesSide, be.Op, be.GroupModifier.AppendString(nil), be.JoinModifier.AppendString(nil),
stringMetricTags(&tsExisting.MetricName), stringMetricTags(&tsRight.MetricName))
stringMetricTags(&tmp.MetricName), stringMetricTags(&tsRight.MetricName))
}
m[string(bb.B)] = tsRight
rvsLeft = append(rvsLeft, &tsCopy)
rvsRight = append(rvsRight, tsRight)
pair.right = &tmp
}
bbPool.Put(bb)
for _, pair := range m {
rvsLeft = append(rvsLeft, pair.left)
rvsRight = append(rvsRight, pair.right)
}
}
return rvsLeft, rvsRight, nil
}

View file

@ -2685,6 +2685,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`scalar default NaN`, func(t *testing.T) {
t.Parallel()
q := `time() > 1400 default (time() < -100)`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{nan, nan, nan, 1600, 1800, 2000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`vector default scalar`, func(t *testing.T) {
t.Parallel()
q := `sort_desc(union(

View file

@ -25,6 +25,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): return non-zero exit code on error. This allows handling `vmctl` errors in shell scripts. Previously `vmctl` was returning 0 exit code on error. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2322).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly show `scrape_timeout` and `scrape_interval` options at `http://vmagent:8429/config` page. Previously these options weren't displayed even if they were set in `-promscrape.config`.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly handle joins on time series filtered by values. For example, `kube_pod_container_resource_requests{resource="cpu"} * on (namespace,pod) group_left() (kube_pod_status_phase{phase=~"Pending|Running"}==1)`. Previously this query could result in a `duplicate time series on the right side` error even if the `==1` filter left only a single time series per `(namespace,pod)` label set. Now such queries are executed properly.
## [v1.76.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.76.1)