app/vmselect/promql: return empty values from group() if all the time series have no values at the given timestamp

This aligns `group()` behaviour to Prometheus
This commit is contained in:
Aliaksandr Valialkin 2020-07-28 13:40:09 +03:00
parent 312acf7ce9
commit 9dccedc599
2 changed files with 27 additions and 35 deletions
app/vmselect/promql

View file

@ -27,7 +27,7 @@ var aggrFuncs = map[string]aggrFunc{
"bottomk": newAggrFuncTopK(true), "bottomk": newAggrFuncTopK(true),
"topk": newAggrFuncTopK(false), "topk": newAggrFuncTopK(false),
"quantile": aggrFuncQuantile, "quantile": aggrFuncQuantile,
"group": aggrFuncGroup, "group": newAggrFunc(aggrFuncGroup),
// PromQL extension funcs // PromQL extension funcs
"median": aggrFuncMedian, "median": aggrFuncMedian,
@ -140,25 +140,20 @@ func aggrFuncAny(afa *aggrFuncArg) ([]*timeseries, error) {
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, limit, true) return aggrFuncExt(afe, args[0], &afa.ae.Modifier, limit, true)
} }
func aggrFuncGroup(afa *aggrFuncArg) ([]*timeseries, error) { func aggrFuncGroup(tss []*timeseries) []*timeseries {
args := afa.args // See https://github.com/prometheus/prometheus/commit/72425d4e3d14d209cc3f3f6e10e3240411303399
if err := expectTransformArgsNum(args, 1); err != nil { dst := tss[0]
return nil, err for i := range dst.Values {
} v := nan
afe := func(tss []*timeseries) []*timeseries { for _, ts := range tss {
// See https://github.com/prometheus/prometheus/commit/72425d4e3d14d209cc3f3f6e10e3240411303399 if math.IsNaN(ts.Values[i]) {
values := tss[0].Values continue
for j := range values { }
values[j] = 1 v = 1
} }
return tss[:1] dst.Values[i] = v
} }
limit := afa.ae.Limit return tss[:1]
if limit > 1 {
// Only a single time series per group must be returned
limit = 1
}
return aggrFuncExt(afe, args[0], &afa.ae.Modifier, limit, false)
} }
func aggrFuncSum(tss []*timeseries) []*timeseries { func aggrFuncSum(tss []*timeseries) []*timeseries {

View file

@ -56,9 +56,9 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{
keepOriginal: true, keepOriginal: true,
}, },
"group": { "group": {
updateAggrFunc: updateAggrGroup, updateAggrFunc: updateAggrCount,
mergeAggrFunc: mergeAggrAny, mergeAggrFunc: mergeAggrCount,
finalizeAggrFunc: finalizeAggrCommon, finalizeAggrFunc: finalizeAggrGroup,
}, },
} }
@ -391,6 +391,17 @@ func finalizeAggrCount(iac *incrementalAggrContext) {
} }
} }
func finalizeAggrGroup(iac *incrementalAggrContext) {
dstValues := iac.ts.Values
for i, v := range dstValues {
if v == 0 {
dstValues[i] = nan
} else {
dstValues[i] = 1
}
}
}
func updateAggrSum2(iac *incrementalAggrContext, values []float64) { func updateAggrSum2(iac *incrementalAggrContext, values []float64) {
dstValues := iac.ts.Values dstValues := iac.ts.Values
dstCounts := iac.values dstCounts := iac.values
@ -505,17 +516,3 @@ func mergeAggrAny(dst, src *incrementalAggrContext) {
dstCounts[0] = srcCounts[0] dstCounts[0] = srcCounts[0]
dst.ts.Values = append(dst.ts.Values[:0], srcValues...) dst.ts.Values = append(dst.ts.Values[:0], srcValues...)
} }
func updateAggrGroup(iac *incrementalAggrContext, values []float64) {
dstCounts := iac.values
if dstCounts[0] > 0 {
return
}
for i := range values {
dstCounts[i] = 1
}
for i := range values {
values[i] = 1
}
iac.ts.Values = append(iac.ts.Values[:0], values...)
}