duplicate timeseries fix for prometheus_buckets function (#1119)

* try a fix for prometheus_buckets

* merge buckets on possible end-of-bucket collisions
Nikolay 2021-03-09 13:26:23 +03:00 committed by Aliaksandr Valialkin
parent 75d49ee58a
commit 28e450cd7c
2 changed files with 224 additions and 6 deletions
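
Context for the diffs below: prometheus_buckets() converts VictoriaMetrics `vmrange` buckets into cumulative Prometheus-style `le` buckets. When input ranges overlap, e.g. `0...0.25` and `0.2...0.25`, two buckets share the end boundary 0.25, and vmrangeBucketsToLE previously emitted two timeseries carrying the same le="0.25" label. The fix tracks already-emitted boundaries in a map and merges colliding buckets instead of appending duplicates. A standalone sketch of the idea (hypothetical names, not the actual implementation):

package main

import "fmt"

type bucket struct {
	startStr, endStr string
}

func main() {
	// "0...0.25" and "0.2...0.25" share the end boundary "0.25".
	buckets := []bucket{
		{"0", "0.2"},
		{"0.2", "0.25"},
		{"0", "0.25"}, // colliding end boundary
	}
	uniq := make(map[string]bool)
	var les []string
	for _, b := range buckets {
		if uniq[b.endStr] {
			// Before this commit the series was appended anyway,
			// yielding a duplicate le="0.25" timeseries.
			fmt.Printf("le=%q already emitted; merge instead of append\n", b.endStr)
			continue
		}
		uniq[b.endStr] = true
		les = append(les, b.endStr)
	}
	fmt.Println("emitted le boundaries:", les) // [0.2 0.25]
}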

app/vmselect/promql/exec_test.go

@@ -3663,6 +3663,210 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r1, r2, r3, r4}
f(q, resultExpected)
})
t.Run(`prometheus_buckets(overlapped ranges)`, func(t *testing.T) {
t.Parallel()
q := `sort(prometheus_buckets((
alias(label_set(90, "foo", "bar", "vmrange", "0...0"), "xxx"),
alias(label_set(time()/20, "foo", "bar", "vmrange", "0...0.2"), "xxx"),
alias(label_set(time()/20, "foo", "bar", "vmrange", "0.2...0.25"), "xxx"),
alias(label_set(time()/20, "foo", "bar", "vmrange", "0...0.26"), "xxx"),
alias(label_set(time()/100, "foo", "bar", "vmrange", "0.2...40"), "xxx"),
alias(label_set(time()/10, "foo", "bar", "vmrange", "40...Inf"), "xxx"),
)))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{90, 90, 90, 90, 90, 90},
Timestamps: timestampsExpected,
}
r1.MetricName.MetricGroup = []byte("xxx")
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("0"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{140, 150, 160, 170, 180, 190},
Timestamps: timestampsExpected,
}
r2.MetricName.MetricGroup = []byte("xxx")
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("0.2"),
},
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{190, 210, 230, 250, 270, 290},
Timestamps: timestampsExpected,
}
r3.MetricName.MetricGroup = []byte("xxx")
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("0.25"),
},
}
r4 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{240, 270, 300, 330, 360, 390},
Timestamps: timestampsExpected,
}
r4.MetricName.MetricGroup = []byte("xxx")
r4.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("0.26"),
},
}
r5 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{250, 282, 314, 346, 378, 410},
Timestamps: timestampsExpected,
}
r5.MetricName.MetricGroup = []byte("xxx")
r5.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("40"),
},
}
r6 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{350, 402, 454, 506, 558, 610},
Timestamps: timestampsExpected,
}
r6.MetricName.MetricGroup = []byte("xxx")
r6.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("Inf"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3, r4, r5, r6}
f(q, resultExpected)
})
t.Run(`prometheus_buckets(overlapped ranges at the end)`, func(t *testing.T) {
t.Parallel()
q := `sort(prometheus_buckets((
alias(label_set(90, "foo", "bar", "vmrange", "0...0"), "xxx"),
alias(label_set(time()/20, "foo", "bar", "vmrange", "0...0.2"), "xxx"),
alias(label_set(time()/20, "foo", "bar", "vmrange", "0.2...0.25"), "xxx"),
alias(label_set(time()/20, "foo", "bar", "vmrange", "0...0.25"), "xxx"),
alias(label_set(time()/100, "foo", "bar", "vmrange", "0.2...40"), "xxx"),
alias(label_set(time()/10, "foo", "bar", "vmrange", "40...Inf"), "xxx"),
)))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{90, 90, 90, 90, 90, 90},
Timestamps: timestampsExpected,
}
r1.MetricName.MetricGroup = []byte("xxx")
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("0"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{140, 150, 160, 170, 180, 190},
Timestamps: timestampsExpected,
}
r2.MetricName.MetricGroup = []byte("xxx")
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("0.2"),
},
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{190, 210, 230, 250, 270, 290},
Timestamps: timestampsExpected,
}
r3.MetricName.MetricGroup = []byte("xxx")
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("0.25"),
},
}
r4 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{200, 222, 244, 266, 288, 310},
Timestamps: timestampsExpected,
}
r4.MetricName.MetricGroup = []byte("xxx")
r4.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("40"),
},
}
r5 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{300, 342, 384, 426, 468, 510},
Timestamps: timestampsExpected,
}
r5.MetricName.MetricGroup = []byte("xxx")
r5.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
},
{
Key: []byte("le"),
Value: []byte("Inf"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3, r4, r5}
f(q, resultExpected)
})
t.Run(`median_over_time()`, func(t *testing.T) {
t.Parallel()
q := `median_over_time({})`
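
For readers checking the expected Values in the two tests above, here is a hedged sanity check of the first data point, assuming the shared test fixture evaluates time() from 1000 to 2000 in steps of 200 (an assumption about timestampsExpected, which this diff does not show):

package main

import "fmt"

func main() {
	t := 1000.0
	// First test (`overlapped ranges`): buckets are accumulated in
	// ascending order of their end boundary.
	le0 := 90.0           // vmrange "0...0"
	le02 := le0 + t/20    // + "0...0.2"    -> 140
	le025 := le02 + t/20  // + "0.2...0.25" -> 190
	le026 := le025 + t/20 // + "0...0.26"   -> 240
	le40 := le026 + t/100 // + "0.2...40"   -> 250
	leInf := le40 + t/10  // + "40...Inf"   -> 350
	fmt.Println(le0, le02, le025, le026, le40, leInf)
	// Second test (`overlapped ranges at the end`): "0...0.25" collides
	// with the end of "0.2...0.25" and is merged into the existing
	// le="0.25" bucket instead of being appended as a duplicate, so
	// le="0.25" stays at 190, le="40" is 190+10=200 and le="Inf" is 300.
	fmt.Println(le025+t/100, le025+t/100+t/10) // 200 300
}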

app/vmselect/promql/transform.go

@@ -518,6 +518,7 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
 	sort.Slice(xss, func(i, j int) bool { return xss[i].end < xss[j].end })
 	xssNew := make([]x, 0, len(xss)+2)
 	var xsPrev x
+	uniqTs := make(map[string]*timeseries, len(xss))
 	for _, xs := range xss {
 		ts := xs.ts
 		if isZeroTS(ts) {
@@ -526,14 +527,27 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
 			continue
 		}
 		if xs.start != xsPrev.end {
-			xssNew = append(xssNew, x{
-				endStr: xs.startStr,
-				end:    xs.start,
-				ts:     copyTS(ts, xs.startStr),
-			})
+			// Check for duplicates at the start of the bucket.
+			// In case of a duplicate the corresponding `le` series already exists,
+			// so there is no need to add a new one with zero values.
+			if _, ok := uniqTs[xs.startStr]; !ok {
+				uniqTs[xs.startStr] = xs.ts
+				xssNew = append(xssNew, x{
+					endStr: xs.startStr,
+					end:    xs.start,
+					ts:     copyTS(ts, xs.startStr),
+				})
+			}
 		}
 		ts.MetricName.AddTag("le", xs.endStr)
-		xssNew = append(xssNew, xs)
+		if prevTs, ok := uniqTs[xs.endStr]; !ok {
+			xssNew = append(xssNew, xs)
+			uniqTs[xs.endStr] = xs.ts
+		} else {
+			// The end of the current bucket is not unique,
+			// so merge it with the existing bucket.
+			mergeNonOverlappingTimeseries(prevTs, xs.ts)
+		}
 		xsPrev = xs
 	}
 	if !math.IsInf(xsPrev.end, 1) {
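
mergeNonOverlappingTimeseries is an existing helper elsewhere in this package. Judging only from its name and from how it is used here (an assumption, since the diff does not show it), its contract is roughly: fill NaN gaps in the destination series from the source, and refuse to merge when both series define a value at the same timestamp. A minimal sketch of that contract:

package main

import (
	"fmt"
	"math"
)

// mergeNonOverlapping is a hypothetical stand-in for the package helper:
// it copies values from src into dst only where dst has gaps (NaN), and
// reports failure if both series define a value at the same position.
func mergeNonOverlapping(dst, src []float64) bool {
	// First pass: verify the series do not overlap.
	for i, v := range src {
		if !math.IsNaN(v) && !math.IsNaN(dst[i]) {
			return false // overlapping non-NaN values; cannot merge
		}
	}
	// Second pass: fill dst gaps from src.
	for i, v := range src {
		if !math.IsNaN(v) {
			dst[i] = v
		}
	}
	return true
}

func main() {
	nan := math.NaN()
	a := []float64{1, nan, 3}
	b := []float64{nan, 2, nan}
	fmt.Println(mergeNonOverlapping(a, b), a) // true [1 2 3]
}

Under that reading, the `overlapped ranges at the end` test is consistent: the duplicate `0...0.25` bucket overlaps the existing le="0.25" series at every timestamp, so the merge is refused and the duplicate simply does not contribute, which is why le="0.25" stays at 190 instead of doubling.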