mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: merge non-overlapping duplicate time series in group_left
and group_right
joins
This commit is contained in:
parent
15613e5338
commit
a8db528930
2 changed files with 47 additions and 0 deletions
|
@ -296,9 +296,13 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
|
|||
if len(tss) == 1 {
|
||||
return nil
|
||||
}
|
||||
if mergeNonOverlappingTimeseries(tss) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(`duplicate timeseries on the %s side of %s %s: %s and %s`, side, be.Op, be.GroupModifier.AppendString(nil),
|
||||
stringMetricTags(&tss[0].MetricName), stringMetricTags(&tss[1].MetricName))
|
||||
}
|
||||
|
||||
var rvsLeft, rvsRight []*timeseries
|
||||
mLeft, mRight := createTimeseriesMapByTagSet(be, left, right)
|
||||
joinOp := strings.ToLower(be.JoinModifier.Op)
|
||||
|
@ -498,3 +502,26 @@ func isScalar(arg []*timeseries) bool {
|
|||
}
|
||||
return len(mn.Tags) == 0
|
||||
}
|
||||
|
||||
func mergeNonOverlappingTimeseries(tss []*timeseries) bool {
|
||||
if len(tss) < 2 {
|
||||
logger.Panicf("BUG: expecting at least two timeseries. Got %d", len(tss))
|
||||
}
|
||||
|
||||
// Check whether time series in tss overlap.
|
||||
var dst timeseries
|
||||
dst.CopyFromShallowTimestamps(tss[0])
|
||||
dstValues := dst.Values
|
||||
for _, ts := range tss[1:] {
|
||||
for i, value := range ts.Values {
|
||||
if math.IsNaN(dstValues[i]) {
|
||||
dstValues[i] = value
|
||||
} else if !math.IsNaN(value) {
|
||||
// Time series overlap.
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
tss[0].CopyFromShallowTimestamps(&dst)
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -1739,6 +1739,24 @@ func TestExecSuccess(t *testing.T) {
|
|||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on(foo) group_left() duplicate_timeseries`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `label_set(time()/10, "foo", "bar") + on(foo) group_left() (
|
||||
label_set(time() < 1400, "foo", "bar", "op", "le"),
|
||||
label_set(time() >= 1400, "foo", "bar", "op", "ge"),
|
||||
)`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
}}
|
||||
resultExpected := []netstorage.Result{r1}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on() group_left scalar`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort_desc((label_set(time(), "foo", "bar") or label_set(10, "foo", "qwert")) * on() group_left 2)`
|
||||
|
@ -3594,10 +3612,12 @@ func TestExecError(t *testing.T) {
|
|||
f(`1 + group_left() (label_set(1, "foo", bar"), label_set(2, "foo", "baz"))`)
|
||||
f(`1 + on() group_left() (label_set(1, "foo", bar"), label_set(2, "foo", "baz"))`)
|
||||
f(`1 + on(a) group_left(b) (label_set(1, "foo", bar"), label_set(2, "foo", "baz"))`)
|
||||
f(`label_set(1, "foo", "bar") + on(foo) group_left() (label_set(1, "foo", "bar", "a", "b"), label_set(1, "foo", "bar", "a", "c"))`)
|
||||
f(`(label_set(1, "foo", bar"), label_set(2, "foo", "baz")) + group_right 1`)
|
||||
f(`(label_set(1, "foo", bar"), label_set(2, "foo", "baz")) + on() group_right 1`)
|
||||
f(`(label_set(1, "foo", bar"), label_set(2, "foo", "baz")) + on(a) group_right(b,c) 1`)
|
||||
f(`(label_set(1, "foo", bar"), label_set(2, "foo", "baz")) + on() 1`)
|
||||
f(`(label_set(1, "foo", "bar", "a", "b"), label_set(1, "foo", "bar", "a", "c")) + on(foo) group_right() label_set(1, "foo", "bar")`)
|
||||
f(`1 + on() (label_set(1, "foo", bar"), label_set(2, "foo", "baz"))`)
|
||||
|
||||
// With expressions
|
||||
|
|
Loading…
Reference in a new issue