app/vmselect/promql: return empty values from group() if all the time series have no values at the given timestamp

This aligns `group()` behaviour to Prometheus
This commit is contained in:
Aliaksandr Valialkin 2020-07-28 13:40:09 +03:00
parent 48d0ec1363
commit 0da202023b
2 changed files with 27 additions and 35 deletions

View file

@ -27,7 +27,7 @@ var aggrFuncs = map[string]aggrFunc{
"bottomk": newAggrFuncTopK(true),
"topk": newAggrFuncTopK(false),
"quantile": aggrFuncQuantile,
"group": aggrFuncGroup,
"group": newAggrFunc(aggrFuncGroup),
// PromQL extension funcs
"median": aggrFuncMedian,
@ -140,25 +140,20 @@ func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) {
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, limit, true)
}
func aggrFuncGroup(afa *aggrFuncArg) ([]*timeseries, error) {
args := afa.args
if err := expectTransformArgsNum(args, 1); err != nil {
return nil, err
}
afe := func(tss []*timeseries) []*timeseries {
// See https://github.com/prometheus/prometheus/commit/72425d4e3d14d209cc3f3f6e10e3240411303399
values := tss[0].Values
for j := range values {
values[j] = 1
func aggrFuncGroup(tss []*timeseries) []*timeseries {
// See https://github.com/prometheus/prometheus/commit/72425d4e3d14d209cc3f3f6e10e3240411303399
dst := tss[0]
for i := range dst.Values {
v := nan
for _, ts := range tss {
if math.IsNaN(ts.Values[i]) {
continue
}
v = 1
}
return tss[:1]
dst.Values[i] = v
}
limit := afa.ae.Limit
if limit > 1 {
// Only a single time series per group must be returned
limit = 1
}
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, limit, false)
return tss[:1]
}
func aggrFuncSum(tss []*timeseries) []*timeseries {

View file

@ -56,9 +56,9 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{
keepOriginal: true,
},
"group": {
updateAggrFunc: updateAggrGroup,
mergeAggrFunc: mergeAggrAny,
finalizeAggrFunc: finalizeAggrCommon,
updateAggrFunc: updateAggrCount,
mergeAggrFunc: mergeAggrCount,
finalizeAggrFunc: finalizeAggrGroup,
},
}
@ -391,6 +391,17 @@ func finalizeAggrCount(iac *incrementalAggrContext) {
}
}
func finalizeAggrGroup(iac *incrementalAggrContext) {
dstValues := iac.ts.Values
for i, v := range dstValues {
if v == 0 {
dstValues[i] = nan
} else {
dstValues[i] = 1
}
}
}
func updateAggrSum2(iac *incrementalAggrContext, values []float64) {
dstValues := iac.ts.Values
dstCounts := iac.values
@ -505,17 +516,3 @@ func mergeAggrAny(dst, src *incrementalAggrContext) {
dstCounts[0] = srcCounts[0]
dst.ts.Values = append(dst.ts.Values[:0], srcValues...)
}
func updateAggrGroup(iac *incrementalAggrContext, values []float64) {
dstCounts := iac.values
if dstCounts[0] > 0 {
return
}
for i := range values {
dstCounts[i] = 1
}
for i := range values {
values[i] = 1
}
iac.ts.Values = append(iac.ts.Values[:0], values...)
}