app/vmselect/promql: optimize group(rollup(m)) calculations

This commit is contained in:
Aliaksandr Valialkin 2020-07-17 16:47:16 +03:00
parent ea8dc85ba8
commit eb402a17bd
3 changed files with 46 additions and 7 deletions

View file

@ -86,7 +86,7 @@ func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.Modifie
} }
func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) { func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) {
arg := copyTimeseriesMetricNames(argOrig) arg := copyTimeseriesMetricNames(argOrig, keepOriginal)
// Perform grouping. // Perform grouping.
m := make(map[string][]*timeseries) m := make(map[string][]*timeseries)

View file

@ -52,6 +52,13 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{
updateAggrFunc: updateAggrAny, updateAggrFunc: updateAggrAny,
mergeAggrFunc: mergeAggrAny, mergeAggrFunc: mergeAggrAny,
finalizeAggrFunc: finalizeAggrCommon, finalizeAggrFunc: finalizeAggrCommon,
keepOriginal: true,
},
"group": {
updateAggrFunc: updateAggrGroup,
mergeAggrFunc: mergeAggrAny,
finalizeAggrFunc: finalizeAggrCommon,
}, },
} }
@ -72,7 +79,7 @@ func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *increm
} }
} }
func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries, workerID uint) { func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) {
iafc.mLock.Lock() iafc.mLock.Lock()
m := iafc.m[workerID] m := iafc.m[workerID]
if m == nil { if m == nil {
@ -81,6 +88,13 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries, workerI
} }
iafc.mLock.Unlock() iafc.mLock.Unlock()
ts := tsOrig
keepOriginal := iafc.callbacks.keepOriginal
if keepOriginal {
var dst timeseries
dst.CopyFromMetricNames(tsOrig)
ts = &dst
}
removeGroupTags(&ts.MetricName, &iafc.ae.Modifier) removeGroupTags(&ts.MetricName, &iafc.ae.Modifier)
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
@ -95,6 +109,9 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries, workerI
Timestamps: ts.Timestamps, Timestamps: ts.Timestamps,
denyReuse: true, denyReuse: true,
} }
if keepOriginal {
ts = tsOrig
}
tsAggr.MetricName.CopyFrom(&ts.MetricName) tsAggr.MetricName.CopyFrom(&ts.MetricName)
iac = &incrementalAggrContext{ iac = &incrementalAggrContext{
ts: tsAggr, ts: tsAggr,
@ -138,6 +155,9 @@ type incrementalAggrFuncCallbacks struct {
updateAggrFunc func(iac *incrementalAggrContext, values []float64) updateAggrFunc func(iac *incrementalAggrContext, values []float64)
mergeAggrFunc func(dst, src *incrementalAggrContext) mergeAggrFunc func(dst, src *incrementalAggrContext)
finalizeAggrFunc func(iac *incrementalAggrContext) finalizeAggrFunc func(iac *incrementalAggrContext)
// Whether to keep the original MetricName for every time series during aggregation
keepOriginal bool
} }
func getIncrementalAggrFuncCallbacks(name string) *incrementalAggrFuncCallbacks { func getIncrementalAggrFuncCallbacks(name string) *incrementalAggrFuncCallbacks {
@ -485,3 +505,17 @@ func mergeAggrAny(dst, src *incrementalAggrContext) {
dstCounts[0] = srcCounts[0] dstCounts[0] = srcCounts[0]
dst.ts.Values = append(dst.ts.Values[:0], srcValues...) dst.ts.Values = append(dst.ts.Values[:0], srcValues...)
} }
func updateAggrGroup(iac *incrementalAggrContext, values []float64) {
dstCounts := iac.values
if dstCounts[0] > 0 {
return
}
for i := range values {
dstCounts[i] = 1
}
for i := range values {
values[i] = 1
}
iac.ts.Values = append(iac.ts.Values[:0], values...)
}

View file

@ -1601,11 +1601,16 @@ func transformEnd(tfa *transformFuncArg) float64 {
return float64(tfa.ec.End) * 1e-3 return float64(tfa.ec.End) * 1e-3
} }
// copyTimeseriesMetricNames returns a copy of arg with real copy of MetricNames, // copyTimeseriesMetricNames returns a copy of tss with real copy of MetricNames,
// but with shallow copy of Timestamps and Values. // but with shallow copy of Timestamps and Values if makeCopy is set.
func copyTimeseriesMetricNames(arg []*timeseries) []*timeseries { //
rvs := make([]*timeseries, len(arg)) // Otherwise tss is returned.
for i, src := range arg { func copyTimeseriesMetricNames(tss []*timeseries, makeCopy bool) []*timeseries {
if !makeCopy {
return tss
}
rvs := make([]*timeseries, len(tss))
for i, src := range tss {
var dst timeseries var dst timeseries
dst.CopyFromMetricNames(src) dst.CopyFromMetricNames(src)
rvs[i] = &dst rvs[i] = &dst