// Patch summary (free text ahead of the first diff header).
//
// Purpose: in binary operations, stop failing with "duplicate timeseries" when the
// duplicated series on one side do not overlap in time; merge them into one instead.
//
// app/vmselect/promql/binary_op.go
//   * Hunk under adjustBinaryOpTags: when a group holds more than one series
//     (the len(tss) == 1 fast path did not return), mergeNonOverlappingTimeseries(tss)
//     is now tried first; if it reports true the check returns nil, otherwise the
//     existing "duplicate timeseries on the %s side ..." error is still returned.
//   * New helper mergeNonOverlappingTimeseries(tss []*timeseries) bool:
//       - calls logger.Panicf when fewer than two series are passed;
//       - copies tss[0] into a temporary via CopyFromShallowTimestamps, then for every
//         point of every other series fills the temporary's value if it is NaN, and
//         returns false as soon as both the accumulated and the incoming value are
//         non-NaN at the same index (i.e. the series overlap);
//       - on success copies the merged temporary back into tss[0] and returns true.
//     NOTE(review): dstValues[i] is indexed with indices taken from ts.Values; this
//       assumes every series in tss has the same number of points - confirm.
//     NOTE(review): only tss[0] receives the merged values; the remaining entries stay
//       in tss - verify the caller reads tss[0] only.
//     NOTE(review): presumably CopyFromShallowTimestamps deep-copies Values, so tss[0]
//       is left untouched when an overlap is detected - confirm against its definition.
//
// app/vmselect/promql/exec_test.go
//   * TestExecSuccess: new subtest joining time()/10 with two same-"foo" series split
//     by `time() < 1400` and `time() >= 1400` via on(foo) group_left(); expected values
//     are 1100, 1320, 1540, 1760, 1980, 2200.
//     NOTE(review): the subtest name says "vector * on(foo) ..." while the query uses
//       `+` - consider renaming for accuracy.
//   * TestExecError: two new cases (group_left and group_right) where both duplicate
//     series are the constant 1 and differ only in label "a", so they overlap and the
//     query must still fail.
//
// NOTE(review): the line breaks of this patch appear to have been collapsed; the diff
// text below is reproduced unchanged.
diff --git a/app/vmselect/promql/binary_op.go b/app/vmselect/promql/binary_op.go index a4884c748..fcc2f4250 100644 --- a/app/vmselect/promql/binary_op.go +++ b/app/vmselect/promql/binary_op.go @@ -296,9 +296,13 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser if len(tss) == 1 { return nil } + if mergeNonOverlappingTimeseries(tss) { + return nil + } return fmt.Errorf(`duplicate timeseries on the %s side of %s %s: %s and %s`, side, be.Op, be.GroupModifier.AppendString(nil), stringMetricTags(&tss[0].MetricName), stringMetricTags(&tss[1].MetricName)) } + var rvsLeft, rvsRight []*timeseries mLeft, mRight := createTimeseriesMapByTagSet(be, left, right) joinOp := strings.ToLower(be.JoinModifier.Op) @@ -498,3 +502,26 @@ func isScalar(arg []*timeseries) bool { } return len(mn.Tags) == 0 } + +func mergeNonOverlappingTimeseries(tss []*timeseries) bool { + if len(tss) < 2 { + logger.Panicf("BUG: expecting at least two timeseries. Got %d", len(tss)) + } + + // Check whether time series in tss overlap. + var dst timeseries + dst.CopyFromShallowTimestamps(tss[0]) + dstValues := dst.Values + for _, ts := range tss[1:] { + for i, value := range ts.Values { + if math.IsNaN(dstValues[i]) { + dstValues[i] = value + } else if !math.IsNaN(value) { + // Time series overlap. 
+ return false + } + } + } + tss[0].CopyFromShallowTimestamps(&dst) + return true +} diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 99816421d..e53eba23f 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -1739,6 +1739,24 @@ func TestExecSuccess(t *testing.T) { resultExpected := []netstorage.Result{r} f(q, resultExpected) }) + t.Run(`vector * on(foo) group_left() duplicate_timeseries`, func(t *testing.T) { + t.Parallel() + q := `label_set(time()/10, "foo", "bar") + on(foo) group_left() ( + label_set(time() < 1400, "foo", "bar", "op", "le"), + label_set(time() >= 1400, "foo", "bar", "op", "ge"), + )` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1100, 1320, 1540, 1760, 1980, 2200}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }} + resultExpected := []netstorage.Result{r1} + f(q, resultExpected) + }) t.Run(`vector * on() group_left scalar`, func(t *testing.T) { t.Parallel() q := `sort_desc((label_set(time(), "foo", "bar") or label_set(10, "foo", "qwert")) * on() group_left 2)` @@ -3594,10 +3612,12 @@ func TestExecError(t *testing.T) { f(`1 + group_left() (label_set(1, "foo", bar"), label_set(2, "foo", "baz"))`) f(`1 + on() group_left() (label_set(1, "foo", bar"), label_set(2, "foo", "baz"))`) f(`1 + on(a) group_left(b) (label_set(1, "foo", bar"), label_set(2, "foo", "baz"))`) + f(`label_set(1, "foo", "bar") + on(foo) group_left() (label_set(1, "foo", "bar", "a", "b"), label_set(1, "foo", "bar", "a", "c"))`) f(`(label_set(1, "foo", bar"), label_set(2, "foo", "baz")) + group_right 1`) f(`(label_set(1, "foo", bar"), label_set(2, "foo", "baz")) + on() group_right 1`) f(`(label_set(1, "foo", bar"), label_set(2, "foo", "baz")) + on(a) group_right(b,c) 1`) f(`(label_set(1, "foo", bar"), label_set(2, "foo", "baz")) + on() 1`) + f(`(label_set(1, "foo", "bar", "a", "b"), label_set(1, 
"foo", "bar", "a", "c")) + on(foo) group_right() label_set(1, "foo", "bar")`) f(`1 + on() (label_set(1, "foo", bar"), label_set(2, "foo", "baz"))`) // With expressions