From 828e5f6d26cff2b47239e89c8f9e0daece0a3d31 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 13 Sep 2019 17:40:14 +0300 Subject: [PATCH] app/vmselect/promql: binary operation fixes according to Prometheus behaviour The following issues were fixed: - VictoriaMetrics could leave superfluous labels when using `on` or `ignoring` modifiers - VictoriaMetrics could return `duplicate timeseries` error when using `group_left` or `group_right` with non-empty label list --- app/vmselect/promql/binary_op.go | 168 ++++++++++++++++++++----------- app/vmselect/promql/exec_test.go | 77 +++++++++++--- 2 files changed, 173 insertions(+), 72 deletions(-) diff --git a/app/vmselect/promql/binary_op.go b/app/vmselect/promql/binary_op.go index e3b2478e8..9de1b6b2a 100644 --- a/app/vmselect/promql/binary_op.go +++ b/app/vmselect/promql/binary_op.go @@ -292,24 +292,14 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser } // Slow path: `vector op vector` or `a op {on|ignoring} {group_left|group_right} b` - ensureOneX := func(side string, tss []*timeseries) error { - if len(tss) == 0 { - logger.Panicf("BUG: tss must contain at least one value") - } - if len(tss) == 1 { - return nil - } - if mergeNonOverlappingTimeseries(tss) { - return nil - } - return fmt.Errorf(`duplicate timeseries on the %s side of %s %s: %s and %s`, side, be.Op, be.GroupModifier.AppendString(nil), - stringMetricTags(&tss[0].MetricName), stringMetricTags(&tss[1].MetricName)) - } - var rvsLeft, rvsRight []*timeseries mLeft, mRight := createTimeseriesMapByTagSet(be, left, right) joinOp := strings.ToLower(be.JoinModifier.Op) - joinTags := be.JoinModifier.Args + groupOp := strings.ToLower(be.GroupModifier.Op) + if len(groupOp) == 0 { + groupOp = "ignoring" + } + groupTags := be.GroupModifier.Args for k, tssLeft := range mLeft { tssRight := mRight[k] if len(tssRight) == 0 { @@ -317,39 +307,38 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser } 
switch joinOp { case "group_left": - if err := ensureOneX("right", tssRight); err != nil { + var err error + rvsLeft, rvsRight, err = groupJoin("right", be, rvsLeft, rvsRight, tssLeft, tssRight) + if err != nil { return nil, nil, nil, err } - src := tssRight[0] - for _, ts := range tssLeft { - resetMetricGroupIfRequired(be, ts) - ts.MetricName.AddMissingTags(joinTags, &src.MetricName) - rvsLeft = append(rvsLeft, ts) - rvsRight = append(rvsRight, src) - } case "group_right": - if err := ensureOneX("left", tssLeft); err != nil { + var err error + rvsRight, rvsLeft, err = groupJoin("left", be, rvsRight, rvsLeft, tssRight, tssLeft) + if err != nil { return nil, nil, nil, err } - src := tssLeft[0] - for _, ts := range tssRight { - resetMetricGroupIfRequired(be, ts) - ts.MetricName.AddMissingTags(joinTags, &src.MetricName) - rvsLeft = append(rvsLeft, src) - rvsRight = append(rvsRight, ts) - } case "": - if err := ensureOneX("left", tssLeft); err != nil { + if err := ensureSingleTimeseries("left", be, tssLeft); err != nil { return nil, nil, nil, err } - if err := ensureOneX("right", tssRight); err != nil { + if err := ensureSingleTimeseries("right", be, tssRight); err != nil { return nil, nil, nil, err } - resetMetricGroupIfRequired(be, tssLeft[0]) - rvsLeft = append(rvsLeft, tssLeft[0]) + tsLeft := tssLeft[0] + resetMetricGroupIfRequired(be, tsLeft) + switch groupOp { + case "on": + tsLeft.MetricName.RemoveTagsOn(groupTags) + case "ignoring": + tsLeft.MetricName.RemoveTagsIgnoring(groupTags) + default: + logger.Panicf("BUG: unexpected binary op modifier %q", groupOp) + } + rvsLeft = append(rvsLeft, tsLeft) rvsRight = append(rvsRight, tssRight[0]) default: - return nil, nil, nil, fmt.Errorf(`unexpected join modifier %q`, joinOp) + logger.Panicf("BUG: unexpected join modifier %q", joinOp) } } dst := rvsLeft @@ -359,6 +348,90 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser return rvsLeft, rvsRight, dst, nil } +func 
ensureSingleTimeseries(side string, be *binaryOpExpr, tss []*timeseries) error { + if len(tss) == 0 { + logger.Panicf("BUG: tss must contain at least one value") + } + for len(tss) > 1 { + if !mergeNonOverlappingTimeseries(tss[0], tss[len(tss)-1]) { + return fmt.Errorf(`duplicate time series on the %s side of %s %s: %s and %s`, side, be.Op, be.GroupModifier.AppendString(nil), + stringMetricTags(&tss[0].MetricName), stringMetricTags(&tss[len(tss)-1].MetricName)) + } + tss = tss[:len(tss)-1] + } + return nil +} + +func groupJoin(singleTimeseriesSide string, be *binaryOpExpr, rvsLeft, rvsRight, tssLeft, tssRight []*timeseries) ([]*timeseries, []*timeseries, error) { + joinTags := be.JoinModifier.Args + var m map[string]*timeseries + for _, tsLeft := range tssLeft { + resetMetricGroupIfRequired(be, tsLeft) + if len(tssRight) == 1 { + // Easy case - right part contains only a single matching time series. + tsLeft.MetricName.AddMissingTags(joinTags, &tssRight[0].MetricName) + rvsLeft = append(rvsLeft, tsLeft) + rvsRight = append(rvsRight, tssRight[0]) + continue + } + + // Hard case - right part contains multiple matching time series. + // Verify it doesn't result in duplicate MetricName values after adding missing tags. + if m == nil { + m = make(map[string]*timeseries, len(tssRight)) + } else { + for k := range m { + delete(m, k) + } + } + bb := bbPool.Get() + for _, tsRight := range tssRight { + var tsCopy timeseries + tsCopy.CopyFromShallowTimestamps(tsLeft) + tsCopy.MetricName.AddMissingTags(joinTags, &tsRight.MetricName) + bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName) + if tsExisting := m[string(bb.B)]; tsExisting != nil { + // Try merging tsExisting with tsRight if they don't overlap. 
+ if mergeNonOverlappingTimeseries(tsExisting, tsRight) { + continue + } + return nil, nil, fmt.Errorf("duplicate time series on the %s side of `%s %s %s`: %s and %s", + singleTimeseriesSide, be.Op, be.GroupModifier.AppendString(nil), be.JoinModifier.AppendString(nil), + stringMetricTags(&tsExisting.MetricName), stringMetricTags(&tsRight.MetricName)) + } + m[string(bb.B)] = tsRight + rvsLeft = append(rvsLeft, &tsCopy) + rvsRight = append(rvsRight, tsRight) + } + bbPool.Put(bb) + } + return rvsLeft, rvsRight, nil +} + +func mergeNonOverlappingTimeseries(dst, src *timeseries) bool { + // Verify whether the time series can be merged. + srcValues := src.Values + dstValues := dst.Values + _ = dstValues[len(srcValues)-1] + for i, v := range srcValues { + if math.IsNaN(v) { + continue + } + if !math.IsNaN(dstValues[i]) { + return false + } + } + + // Time series can be merged. Merge them. + for i, v := range srcValues { + if math.IsNaN(v) { + continue + } + dstValues[i] = v + } + return true +} + func resetMetricGroupIfRequired(be *binaryOpExpr, ts *timeseries) { if isBinaryOpCmp(be.Op) && !be.Bool { // Do not reset MetricGroup for non-boolean `compare` binary ops like Prometheus does. @@ -535,26 +608,3 @@ func isScalar(arg []*timeseries) bool { } return len(mn.Tags) == 0 } - -func mergeNonOverlappingTimeseries(tss []*timeseries) bool { - if len(tss) < 2 { - logger.Panicf("BUG: expecting at least two timeseries. Got %d", len(tss)) - } - - // Check whether time series in tss overlap. - var dst timeseries - dst.CopyFromShallowTimestamps(tss[0]) - dstValues := dst.Values - for _, ts := range tss[1:] { - for i, value := range ts.Values { - if math.IsNaN(dstValues[i]) { - dstValues[i] = value - } else if !math.IsNaN(value) { - // Time series overlap. 
- return false - } - } - } - tss[0].CopyFromShallowTimestamps(&dst) - return true -} diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 92f3a8f6d..768b4abdb 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -1836,10 +1836,6 @@ func TestExecSuccess(t *testing.T) { Timestamps: timestampsExpected, } r.MetricName.Tags = []storage.Tag{ - { - Key: []byte("aa"), - Value: []byte("bb"), - }, { Key: []byte("foo"), Value: []byte("bar"), @@ -1861,12 +1857,75 @@ func TestExecSuccess(t *testing.T) { Key: []byte("foo"), Value: []byte("bar"), }, + } + resultExpected := []netstorage.Result{r} + f(q, resultExpected) + }) + t.Run(`vector * on(foo) group_left(additional_tag) duplicate_timeseries_differ_by_additional_tag`, func(t *testing.T) { + t.Parallel() + q := `sort(label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_left(op) ( + label_set(time() < 1400, "foo", "bar", "op", "le"), + label_set(time() >= 1400, "foo", "bar", "op", "ge"), + ))` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1100, 1320, nan, nan, nan, nan}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("op"), + Value: []byte("le"), + }, { Key: []byte("xx"), Value: []byte("yy"), }, } - resultExpected := []netstorage.Result{r} + r2 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{nan, nan, 1540, 1760, 1980, 2200}, + Timestamps: timestampsExpected, + } + r2.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("op"), + Value: []byte("ge"), + }, + { + Key: []byte("xx"), + Value: []byte("yy"), + }, + } + resultExpected := []netstorage.Result{r1, r2} + f(q, resultExpected) + }) + t.Run(`vector * on(foo) duplicate_nonoverlapping_timeseries`, func(t *testing.T) { + t.Parallel() + q := `label_set(time()/10, 
"foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) ( + label_set(time() < 1400, "foo", "bar", "op", "le"), + label_set(time() >= 1400, "foo", "bar", "op", "ge"), + )` + r1 := netstorage.Result{ + MetricName: metricNameExpected, + Values: []float64{1100, 1320, 1540, 1760, 1980, 2200}, + Timestamps: timestampsExpected, + } + r1.MetricName.Tags = []storage.Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + } + resultExpected := []netstorage.Result{r1} f(q, resultExpected) }) t.Run(`vector * on(foo) group_left() duplicate_nonoverlapping_timeseries`, func(t *testing.T) { @@ -2053,10 +2112,6 @@ func TestExecSuccess(t *testing.T) { Timestamps: timestampsExpected, } r.MetricName.Tags = []storage.Tag{ - { - Key: []byte("t1"), - Value: []byte("v123"), - }, { Key: []byte("t2"), Value: []byte("v3"), @@ -2162,10 +2217,6 @@ func TestExecSuccess(t *testing.T) { Timestamps: timestampsExpected, } r.MetricName.Tags = []storage.Tag{ - { - Key: []byte("t1"), - Value: []byte("v123"), - }, { Key: []byte("t2"), Value: []byte("v3"),