mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect/promql: binary operation fixes according to Prometheus behaviour
The follosing issues were fixed: - VictoriaMetrics could leave superflouos labels when using `on` or `ignoring` modifiers - VictoriaMetrics could return `duplicate timeseries` error when using `group_left` or `group_right` with non-empty label list
This commit is contained in:
parent
62b424bc4c
commit
828e5f6d26
2 changed files with 173 additions and 72 deletions
|
@ -292,24 +292,14 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
|
|||
}
|
||||
|
||||
// Slow path: `vector op vector` or `a op {on|ignoring} {group_left|group_right} b`
|
||||
ensureOneX := func(side string, tss []*timeseries) error {
|
||||
if len(tss) == 0 {
|
||||
logger.Panicf("BUG: tss must contain at least one value")
|
||||
}
|
||||
if len(tss) == 1 {
|
||||
return nil
|
||||
}
|
||||
if mergeNonOverlappingTimeseries(tss) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf(`duplicate timeseries on the %s side of %s %s: %s and %s`, side, be.Op, be.GroupModifier.AppendString(nil),
|
||||
stringMetricTags(&tss[0].MetricName), stringMetricTags(&tss[1].MetricName))
|
||||
}
|
||||
|
||||
var rvsLeft, rvsRight []*timeseries
|
||||
mLeft, mRight := createTimeseriesMapByTagSet(be, left, right)
|
||||
joinOp := strings.ToLower(be.JoinModifier.Op)
|
||||
joinTags := be.JoinModifier.Args
|
||||
groupOp := strings.ToLower(be.GroupModifier.Op)
|
||||
if len(groupOp) == 0 {
|
||||
groupOp = "ignoring"
|
||||
}
|
||||
groupTags := be.GroupModifier.Args
|
||||
for k, tssLeft := range mLeft {
|
||||
tssRight := mRight[k]
|
||||
if len(tssRight) == 0 {
|
||||
|
@ -317,39 +307,38 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
|
|||
}
|
||||
switch joinOp {
|
||||
case "group_left":
|
||||
if err := ensureOneX("right", tssRight); err != nil {
|
||||
var err error
|
||||
rvsLeft, rvsRight, err = groupJoin("right", be, rvsLeft, rvsRight, tssLeft, tssRight)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
src := tssRight[0]
|
||||
for _, ts := range tssLeft {
|
||||
resetMetricGroupIfRequired(be, ts)
|
||||
ts.MetricName.AddMissingTags(joinTags, &src.MetricName)
|
||||
rvsLeft = append(rvsLeft, ts)
|
||||
rvsRight = append(rvsRight, src)
|
||||
}
|
||||
case "group_right":
|
||||
if err := ensureOneX("left", tssLeft); err != nil {
|
||||
var err error
|
||||
rvsRight, rvsLeft, err = groupJoin("left", be, rvsRight, rvsLeft, tssRight, tssLeft)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
src := tssLeft[0]
|
||||
for _, ts := range tssRight {
|
||||
resetMetricGroupIfRequired(be, ts)
|
||||
ts.MetricName.AddMissingTags(joinTags, &src.MetricName)
|
||||
rvsLeft = append(rvsLeft, src)
|
||||
rvsRight = append(rvsRight, ts)
|
||||
}
|
||||
case "":
|
||||
if err := ensureOneX("left", tssLeft); err != nil {
|
||||
if err := ensureSingleTimeseries("left", be, tssLeft); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
if err := ensureOneX("right", tssRight); err != nil {
|
||||
if err := ensureSingleTimeseries("right", be, tssRight); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
resetMetricGroupIfRequired(be, tssLeft[0])
|
||||
rvsLeft = append(rvsLeft, tssLeft[0])
|
||||
tsLeft := tssLeft[0]
|
||||
resetMetricGroupIfRequired(be, tsLeft)
|
||||
switch groupOp {
|
||||
case "on":
|
||||
tsLeft.MetricName.RemoveTagsOn(groupTags)
|
||||
case "ignoring":
|
||||
tsLeft.MetricName.RemoveTagsIgnoring(groupTags)
|
||||
default:
|
||||
logger.Panicf("BUG: unexpected binary op modifier %q", groupOp)
|
||||
}
|
||||
rvsLeft = append(rvsLeft, tsLeft)
|
||||
rvsRight = append(rvsRight, tssRight[0])
|
||||
default:
|
||||
return nil, nil, nil, fmt.Errorf(`unexpected join modifier %q`, joinOp)
|
||||
logger.Panicf("BUG: unexpected join modifier %q", joinOp)
|
||||
}
|
||||
}
|
||||
dst := rvsLeft
|
||||
|
@ -359,6 +348,90 @@ func adjustBinaryOpTags(be *binaryOpExpr, left, right []*timeseries) ([]*timeser
|
|||
return rvsLeft, rvsRight, dst, nil
|
||||
}
|
||||
|
||||
func ensureSingleTimeseries(side string, be *binaryOpExpr, tss []*timeseries) error {
|
||||
if len(tss) == 0 {
|
||||
logger.Panicf("BUG: tss must contain at least one value")
|
||||
}
|
||||
for len(tss) > 1 {
|
||||
if !mergeNonOverlappingTimeseries(tss[0], tss[len(tss)-1]) {
|
||||
return fmt.Errorf(`duplicate time series on the %s side of %s %s: %s and %s`, side, be.Op, be.GroupModifier.AppendString(nil),
|
||||
stringMetricTags(&tss[0].MetricName), stringMetricTags(&tss[len(tss)-1].MetricName))
|
||||
}
|
||||
tss = tss[:len(tss)-1]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func groupJoin(singleTimeseriesSide string, be *binaryOpExpr, rvsLeft, rvsRight, tssLeft, tssRight []*timeseries) ([]*timeseries, []*timeseries, error) {
|
||||
joinTags := be.JoinModifier.Args
|
||||
var m map[string]*timeseries
|
||||
for _, tsLeft := range tssLeft {
|
||||
resetMetricGroupIfRequired(be, tsLeft)
|
||||
if len(tssRight) == 1 {
|
||||
// Easy case - right part contains only a single matching time series.
|
||||
tsLeft.MetricName.AddMissingTags(joinTags, &tssRight[0].MetricName)
|
||||
rvsLeft = append(rvsLeft, tsLeft)
|
||||
rvsRight = append(rvsRight, tssRight[0])
|
||||
continue
|
||||
}
|
||||
|
||||
// Hard case - right part contains multiple matching time series.
|
||||
// Verify it doesn't result in duplicate MetricName values after adding missing tags.
|
||||
if m == nil {
|
||||
m = make(map[string]*timeseries, len(tssRight))
|
||||
} else {
|
||||
for k := range m {
|
||||
delete(m, k)
|
||||
}
|
||||
}
|
||||
bb := bbPool.Get()
|
||||
for _, tsRight := range tssRight {
|
||||
var tsCopy timeseries
|
||||
tsCopy.CopyFromShallowTimestamps(tsLeft)
|
||||
tsCopy.MetricName.AddMissingTags(joinTags, &tsRight.MetricName)
|
||||
bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName)
|
||||
if tsExisting := m[string(bb.B)]; tsExisting != nil {
|
||||
// Try merging tsExisting with tsRight if they don't overlap.
|
||||
if mergeNonOverlappingTimeseries(tsExisting, tsRight) {
|
||||
continue
|
||||
}
|
||||
return nil, nil, fmt.Errorf("duplicate time series on the %s side of `%s %s %s`: %s and %s",
|
||||
singleTimeseriesSide, be.Op, be.GroupModifier.AppendString(nil), be.JoinModifier.AppendString(nil),
|
||||
stringMetricTags(&tsExisting.MetricName), stringMetricTags(&tsRight.MetricName))
|
||||
}
|
||||
m[string(bb.B)] = tsRight
|
||||
rvsLeft = append(rvsLeft, &tsCopy)
|
||||
rvsRight = append(rvsRight, tsRight)
|
||||
}
|
||||
bbPool.Put(bb)
|
||||
}
|
||||
return rvsLeft, rvsRight, nil
|
||||
}
|
||||
|
||||
func mergeNonOverlappingTimeseries(dst, src *timeseries) bool {
|
||||
// Verify whether the time series can be merged.
|
||||
srcValues := src.Values
|
||||
dstValues := dst.Values
|
||||
_ = dstValues[len(srcValues)-1]
|
||||
for i, v := range srcValues {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
if !math.IsNaN(dstValues[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Time series can be merged. Merge them.
|
||||
for i, v := range srcValues {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
dstValues[i] = v
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func resetMetricGroupIfRequired(be *binaryOpExpr, ts *timeseries) {
|
||||
if isBinaryOpCmp(be.Op) && !be.Bool {
|
||||
// Do not reset MetricGroup for non-boolean `compare` binary ops like Prometheus does.
|
||||
|
@ -535,26 +608,3 @@ func isScalar(arg []*timeseries) bool {
|
|||
}
|
||||
return len(mn.Tags) == 0
|
||||
}
|
||||
|
||||
func mergeNonOverlappingTimeseries(tss []*timeseries) bool {
|
||||
if len(tss) < 2 {
|
||||
logger.Panicf("BUG: expecting at least two timeseries. Got %d", len(tss))
|
||||
}
|
||||
|
||||
// Check whether time series in tss overlap.
|
||||
var dst timeseries
|
||||
dst.CopyFromShallowTimestamps(tss[0])
|
||||
dstValues := dst.Values
|
||||
for _, ts := range tss[1:] {
|
||||
for i, value := range ts.Values {
|
||||
if math.IsNaN(dstValues[i]) {
|
||||
dstValues[i] = value
|
||||
} else if !math.IsNaN(value) {
|
||||
// Time series overlap.
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
tss[0].CopyFromShallowTimestamps(&dst)
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -1836,10 +1836,6 @@ func TestExecSuccess(t *testing.T) {
|
|||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("aa"),
|
||||
Value: []byte("bb"),
|
||||
},
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
|
@ -1861,12 +1857,75 @@ func TestExecSuccess(t *testing.T) {
|
|||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on(foo) group_left(additional_tag) duplicate_timeseries_differ_by_additional_tag`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `sort(label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) group_left(op) (
|
||||
label_set(time() < 1400, "foo", "bar", "op", "le"),
|
||||
label_set(time() >= 1400, "foo", "bar", "op", "ge"),
|
||||
))`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1100, 1320, nan, nan, nan, nan},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("op"),
|
||||
Value: []byte("le"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xx"),
|
||||
Value: []byte("yy"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r}
|
||||
r2 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{nan, nan, 1540, 1760, 1980, 2200},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r2.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
{
|
||||
Key: []byte("op"),
|
||||
Value: []byte("ge"),
|
||||
},
|
||||
{
|
||||
Key: []byte("xx"),
|
||||
Value: []byte("yy"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1, r2}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on(foo) duplicate_nonoverlapping_timeseries`, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
q := `label_set(time()/10, "foo", "bar", "xx", "yy", "__name__", "qwert") + on(foo) (
|
||||
label_set(time() < 1400, "foo", "bar", "op", "le"),
|
||||
label_set(time() >= 1400, "foo", "bar", "op", "ge"),
|
||||
)`
|
||||
r1 := netstorage.Result{
|
||||
MetricName: metricNameExpected,
|
||||
Values: []float64{1100, 1320, 1540, 1760, 1980, 2200},
|
||||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r1.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("foo"),
|
||||
Value: []byte("bar"),
|
||||
},
|
||||
}
|
||||
resultExpected := []netstorage.Result{r1}
|
||||
f(q, resultExpected)
|
||||
})
|
||||
t.Run(`vector * on(foo) group_left() duplicate_nonoverlapping_timeseries`, func(t *testing.T) {
|
||||
|
@ -2053,10 +2112,6 @@ func TestExecSuccess(t *testing.T) {
|
|||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("t1"),
|
||||
Value: []byte("v123"),
|
||||
},
|
||||
{
|
||||
Key: []byte("t2"),
|
||||
Value: []byte("v3"),
|
||||
|
@ -2162,10 +2217,6 @@ func TestExecSuccess(t *testing.T) {
|
|||
Timestamps: timestampsExpected,
|
||||
}
|
||||
r.MetricName.Tags = []storage.Tag{
|
||||
{
|
||||
Key: []byte("t1"),
|
||||
Value: []byte("v123"),
|
||||
},
|
||||
{
|
||||
Key: []byte("t2"),
|
||||
Value: []byte("v3"),
|
||||
|
|
Loading…
Reference in a new issue