app/vmselect/promql: add histogram aggregate function, which is useful for building heatmaps from multiple time series

Aliaksandr Valialkin 2019-11-24 00:02:18 +02:00
parent e70f543321
commit 8bb254d960
4 changed files with 182 additions and 29 deletions
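The new histogram() aggregate buckets the values of all matching time series at every timestamp into vmrange buckets and returns one le-labelled series per bucket, which is the shape a Grafana heatmap panel expects. A minimal usage sketch (the metric name is illustrative only, not part of this commit):

    histogram(process_resident_memory_bytes)

Each output series carries an le label such as "1e2", "2e2" or "+Inf", and its value at a given timestamp is the number of input series whose value is at or below that bound.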


@@ -9,6 +9,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
var aggrFuncs = map[string]aggrFunc{
@@ -26,11 +27,12 @@ var aggrFuncs = map[string]aggrFunc{
"quantile": aggrFuncQuantile,
// Extended PromQL funcs
"median": aggrFuncMedian,
"limitk": aggrFuncLimitK,
"distinct": newAggrFunc(aggrFuncDistinct),
"sum2": newAggrFunc(aggrFuncSum2),
"geomean": newAggrFunc(aggrFuncGeomean),
"median": aggrFuncMedian,
"limitk": aggrFuncLimitK,
"distinct": newAggrFunc(aggrFuncDistinct),
"sum2": newAggrFunc(aggrFuncSum2),
"geomean": newAggrFunc(aggrFuncGeomean),
"histogram": newAggrFunc(aggrFuncHistogram),
}
type aggrFunc func(afa *aggrFuncArg) ([]*timeseries, error)
@@ -184,6 +186,37 @@ func aggrFuncGeomean(tss []*timeseries) []*timeseries {
return tss[:1]
}
func aggrFuncHistogram(tss []*timeseries) []*timeseries {
	m := make(map[string]*timeseries)
	for i := range tss[0].Values {
		var h metrics.Histogram
		for _, ts := range tss {
			v := ts.Values[i]
			h.Update(v)
		}
		h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
			ts := m[vmrange]
			if ts == nil {
				ts = &timeseries{}
				ts.CopyFromShallowTimestamps(tss[0])
				ts.MetricName.RemoveTag("vmrange")
				ts.MetricName.AddTag("vmrange", vmrange)
				values := ts.Values
				for k := range values {
					values[k] = 0
				}
				m[vmrange] = ts
			}
			ts.Values[i] = float64(count)
		})
	}
	rvs := make([]*timeseries, 0, len(m))
	for _, ts := range m {
		rvs = append(rvs, ts)
	}
	return vmrangeBucketsToLE(rvs)
}
func aggrFuncMin(tss []*timeseries) []*timeseries {
if len(tss) == 1 {
// Fast path - nothing to min.
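aggrFuncHistogram builds on the metrics.Histogram type from github.com/VictoriaMetrics/metrics: for every timestamp it feeds the value of each input series into a fresh histogram and then reads back the non-empty vmrange buckets via VisitNonZeroBuckets. A standalone sketch of that bucket API (the sample values are arbitrary, and the exact bucket boundaries depend on the metrics package version):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/metrics"
)

func main() {
	// One histogram per timestamp: every input series contributes its value.
	var h metrics.Histogram
	for _, v := range []float64{1, 1.5, 1.9} {
		h.Update(v)
	}
	// Non-empty buckets are reported as a vmrange string plus a count;
	// aggrFuncHistogram turns each distinct vmrange into one output series.
	h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
		fmt.Printf("vmrange=%q count=%d\n", vmrange, count)
	})
}

With the same three values, the histogram(vector) test below expects cumulative le buckets of 0, 1, 3 and 3 for le="9e-1", "1", "2" and "+Inf" once vmrangeBucketsToLE converts the vmrange buckets to le buckets.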


@@ -110,7 +110,7 @@ func timeseriesToResult(tss []*timeseries, maySort bool) ([]netstorage.Result, e
for i, ts := range tss {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
if _, ok := m[string(bb.B)]; ok {
- return nil, fmt.Errorf(`duplicate output timeseries: %s%s`, ts.MetricName.MetricGroup, stringMetricName(&ts.MetricName))
+ return nil, fmt.Errorf(`duplicate output timeseries: %s`, stringMetricName(&ts.MetricName))
}
m[string(bb.B)] = struct{}{}


@@ -2459,12 +2459,12 @@ func TestExecSuccess(t *testing.T) {
t.Run(`prometheus_buckets(missing-vmrange)`, func(t *testing.T) {
t.Parallel()
q := `sort(prometheus_buckets((
- alias(label_set(time()/20, "foo", "bar", "le", "0.2"), "xxx"),
+ alias(label_set(time()/20, "foo", "bar", "le", "0.2"), "xyz"),
alias(label_set(time()/100, "foo", "bar", "vmrange", "foobar"), "xxx"),
alias(label_set(time()/100, "foo", "bar", "vmrange", "30...foobar"), "xxx"),
alias(label_set(time()/100, "foo", "bar", "vmrange", "30...40"), "xxx"),
alias(label_set(time()/80, "foo", "bar", "vmrange", "0...900", "le", "54"), "yyy"),
- alias(label_set(time()/40, "foo", "bar", "vmrange", "900...1000", "le", "2343"), "yyy"),
+ alias(label_set(time()/40, "foo", "bar", "vmrange", "900...+Inf", "le", "2343"), "yyy"),
)))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
@@ -2500,10 +2500,10 @@ func TestExecSuccess(t *testing.T) {
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
- Values: []float64{12.5, 15, 17.5, 20, 22.5, 25},
+ Values: []float64{10, 12, 14, 16, 18, 20},
Timestamps: timestampsExpected,
}
r3.MetricName.MetricGroup = []byte("yyy")
r3.MetricName.MetricGroup = []byte("xxx")
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
@@ -2511,12 +2511,12 @@ func TestExecSuccess(t *testing.T) {
},
{
Key: []byte("le"),
Value: []byte("900"),
Value: []byte("+Inf"),
},
}
r4 := netstorage.Result{
MetricName: metricNameExpected,
- Values: []float64{37.5, 45, 52.5, 60, 67.5, 75},
+ Values: []float64{12.5, 15, 17.5, 20, 22.5, 25},
Timestamps: timestampsExpected,
}
r4.MetricName.MetricGroup = []byte("yyy")
@@ -2527,16 +2527,32 @@ func TestExecSuccess(t *testing.T) {
},
{
Key: []byte("le"),
Value: []byte("1000"),
Value: []byte("900"),
},
}
r5 := netstorage.Result{
MetricName: metricNameExpected,
+ Values: []float64{37.5, 45, 52.5, 60, 67.5, 75},
+ Timestamps: timestampsExpected,
+ }
+ r5.MetricName.MetricGroup = []byte("yyy")
+ r5.MetricName.Tags = []storage.Tag{
+ {
+ Key: []byte("foo"),
+ Value: []byte("bar"),
+ },
+ {
+ Key: []byte("le"),
+ Value: []byte("+Inf"),
+ },
+ }
+ r6 := netstorage.Result{
+ MetricName: metricNameExpected,
Values: []float64{50, 60, 70, 80, 90, 100},
Timestamps: timestampsExpected,
}
r5.MetricName.MetricGroup = []byte("xxx")
r5.MetricName.Tags = []storage.Tag{
r6.MetricName.MetricGroup = []byte("xyz")
r6.MetricName.Tags = []storage.Tag{
{
Key: []byte("foo"),
Value: []byte("bar"),
@@ -2546,7 +2562,7 @@ func TestExecSuccess(t *testing.T) {
Value: []byte("0.2"),
},
}
- resultExpected := []netstorage.Result{r1, r2, r3, r4, r5}
+ resultExpected := []netstorage.Result{r1, r2, r3, r4, r5, r6}
f(q, resultExpected)
})
t.Run(`prometheus_buckets(valid)`, func(t *testing.T) {
@@ -2674,6 +2690,99 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`histogram(scalar)`, func(t *testing.T) {
t.Parallel()
q := `histogram(123)`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0, 0, 0, 0, 0, 0},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("le"),
Value: []byte("1e2"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("le"),
Value: []byte("2e2"),
},
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("le"),
Value: []byte("+Inf"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3}
f(q, resultExpected)
})
t.Run(`histogram(vector)`, func(t *testing.T) {
t.Parallel()
q := `sort(histogram((
label_set(1, "foo", "bar"),
label_set(1.5, "xx", "yy"),
alias(1.9, "foobar"),
)))`
r1 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{0, 0, 0, 0, 0, 0},
Timestamps: timestampsExpected,
}
r1.MetricName.Tags = []storage.Tag{
{
Key: []byte("le"),
Value: []byte("9e-1"),
},
}
r2 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1, 1, 1, 1, 1, 1},
Timestamps: timestampsExpected,
}
r2.MetricName.Tags = []storage.Tag{
{
Key: []byte("le"),
Value: []byte("1"),
},
}
r3 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3, 3, 3, 3, 3, 3},
Timestamps: timestampsExpected,
}
r3.MetricName.Tags = []storage.Tag{
{
Key: []byte("le"),
Value: []byte("2"),
},
}
r4 := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{3, 3, 3, 3, 3, 3},
Timestamps: timestampsExpected,
}
r4.MetricName.Tags = []storage.Tag{
{
Key: []byte("le"),
Value: []byte("+Inf"),
},
}
resultExpected := []netstorage.Result{r1, r2, r3, r4}
f(q, resultExpected)
})
t.Run(`avg(scalar) wiTHout (xx, yy)`, func(t *testing.T) {
t.Parallel()
q := `avg wiTHout (xx, yy) (123)`
@@ -4520,7 +4629,7 @@ func testMetricNamesEqual(t *testing.T, mn, mnExpected *storage.MetricName, pos
t.Fatalf(`unexpected tag key at #%d,%d; got %q; want %q`, pos, i, tag.Key, tagExpected.Key)
}
if string(tag.Value) != string(tagExpected.Value) {
- t.Fatalf(`unexpected tag value at #%d,%d; got %q; want %q`, pos, i, tag.Value, tagExpected.Value)
+ t.Fatalf(`unexpected tag value for key %q at #%d,%d; got %q; want %q`, tag.Key, pos, i, tag.Value, tagExpected.Value)
}
}
}
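The updated expectations above follow from the reworked vmrangeBucketsToLE below: every vmrange bucket "a...b" becomes a series with le="b", a zero-valued le="a" series is inserted whenever the previous bucket does not end at a, and a trailing le="+Inf" series is now appended unless the last bucket already ends at +Inf. Worked through on the "yyy" series from the prometheus_buckets(missing-vmrange) test (a hand calculation, not code from the commit):

    vmrange="0...900"    (time()/80) -> le="900"  = time()/80            = 12.5 ... 25
    vmrange="900...+Inf" (time()/40) -> le="+Inf" = time()/80 + time()/40 = 37.5 ... 75

which matches r4 and r5 above.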


@@ -332,29 +332,40 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
	}
	// Convert `vmrange` label in each group of time series to `le` label.
+	copyTS := func(src *timeseries, leStr string) *timeseries {
+		var ts timeseries
+		ts.CopyFromShallowTimestamps(src)
+		values := ts.Values
+		for i := range values {
+			values[i] = 0
+		}
+		ts.MetricName.RemoveTag("le")
+		ts.MetricName.AddTag("le", leStr)
+		return &ts
+	}
	for _, xss := range m {
		sort.Slice(xss, func(i, j int) bool { return xss[i].end < xss[j].end })
-		xssNew := make([]x, 0, len(xss))
-		endStrPrev := "0"
+		xssNew := make([]x, 0, len(xss)+2)
+		var xsPrev x
		for _, xs := range xss {
			ts := xs.ts
-			if xs.startStr != endStrPrev {
-				var tsDummy timeseries
-				tsDummy.CopyFromShallowTimestamps(ts)
-				values := tsDummy.Values
-				for i := range values {
-					values[i] = 0
-				}
-				tsDummy.MetricName.AddTag("le", xs.startStr)
+			if xs.start != xsPrev.end {
				xssNew = append(xssNew, x{
					endStr: xs.startStr,
					end:    xs.start,
-					ts:     &tsDummy,
+					ts:     copyTS(ts, xs.startStr),
				})
			}
			ts.MetricName.AddTag("le", xs.endStr)
			xssNew = append(xssNew, xs)
-			endStrPrev = xs.endStr
+			xsPrev = xs
		}
+		if !math.IsInf(xsPrev.end, 1) {
+			xssNew = append(xssNew, x{
+				endStr: "+Inf",
+				end:    math.Inf(1),
+				ts:     copyTS(xsPrev.ts, "+Inf"),
+			})
+		}
		xss = xssNew
		for i := range xss[0].ts.Values {