diff --git a/app/vmselect/promql/aggr.go b/app/vmselect/promql/aggr.go index 4cf01c5ff..e6caeec06 100644 --- a/app/vmselect/promql/aggr.go +++ b/app/vmselect/promql/aggr.go @@ -86,7 +86,7 @@ func removeGroupTags(metricName *storage.MetricName, modifier *metricsql.Modifie } func aggrFuncExt(afe func(tss []*timeseries) []*timeseries, argOrig []*timeseries, modifier *metricsql.ModifierExpr, maxSeries int, keepOriginal bool) ([]*timeseries, error) { - arg := copyTimeseriesMetricNames(argOrig) + arg := copyTimeseriesMetricNames(argOrig, keepOriginal) // Perform grouping. m := make(map[string][]*timeseries) diff --git a/app/vmselect/promql/aggr_incremental.go b/app/vmselect/promql/aggr_incremental.go index c9d058b3e..1321ca73c 100644 --- a/app/vmselect/promql/aggr_incremental.go +++ b/app/vmselect/promql/aggr_incremental.go @@ -52,6 +52,13 @@ var incrementalAggrFuncCallbacksMap = map[string]*incrementalAggrFuncCallbacks{ updateAggrFunc: updateAggrAny, mergeAggrFunc: mergeAggrAny, finalizeAggrFunc: finalizeAggrCommon, + + keepOriginal: true, + }, + "group": { + updateAggrFunc: updateAggrGroup, + mergeAggrFunc: mergeAggrAny, + finalizeAggrFunc: finalizeAggrCommon, }, } @@ -72,7 +79,7 @@ func newIncrementalAggrFuncContext(ae *metricsql.AggrFuncExpr, callbacks *increm } } -func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries, workerID uint) { +func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, workerID uint) { iafc.mLock.Lock() m := iafc.m[workerID] if m == nil { @@ -81,6 +88,13 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries, workerI } iafc.mLock.Unlock() + ts := tsOrig + keepOriginal := iafc.callbacks.keepOriginal + if keepOriginal { + var dst timeseries + dst.CopyFromMetricNames(tsOrig) + ts = &dst + } removeGroupTags(&ts.MetricName, &iafc.ae.Modifier) bb := bbPool.Get() bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName) @@ -95,6 +109,9 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(ts *timeseries, workerI Timestamps: ts.Timestamps, denyReuse: true, } + if keepOriginal { + ts = tsOrig + } tsAggr.MetricName.CopyFrom(&ts.MetricName) iac = &incrementalAggrContext{ ts: tsAggr, @@ -138,6 +155,9 @@ type incrementalAggrFuncCallbacks struct { updateAggrFunc func(iac *incrementalAggrContext, values []float64) mergeAggrFunc func(dst, src *incrementalAggrContext) finalizeAggrFunc func(iac *incrementalAggrContext) + + // Whether to keep the original MetricName for every time series during aggregation + keepOriginal bool } func getIncrementalAggrFuncCallbacks(name string) *incrementalAggrFuncCallbacks { @@ -485,3 +505,17 @@ func mergeAggrAny(dst, src *incrementalAggrContext) { dstCounts[0] = srcCounts[0] dst.ts.Values = append(dst.ts.Values[:0], srcValues...) } + +func updateAggrGroup(iac *incrementalAggrContext, values []float64) { + dstCounts := iac.values + if dstCounts[0] > 0 { + return + } + for i := range values { + dstCounts[i] = 1 + } + for i := range values { + values[i] = 1 + } + iac.ts.Values = append(iac.ts.Values[:0], values...) +} diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index e2843d3c1..32f3fe4f3 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -1601,11 +1601,16 @@ func transformEnd(tfa *transformFuncArg) float64 { return float64(tfa.ec.End) * 1e-3 } -// copyTimeseriesMetricNames returns a copy of arg with real copy of MetricNames, -// but with shallow copy of Timestamps and Values. -func copyTimeseriesMetricNames(arg []*timeseries) []*timeseries { - rvs := make([]*timeseries, len(arg)) - for i, src := range arg { +// copyTimeseriesMetricNames returns a copy of tss with real copy of MetricNames, +// but with shallow copy of Timestamps and Values if makeCopy is set. +// +// Otherwise tss is returned. +func copyTimeseriesMetricNames(tss []*timeseries, makeCopy bool) []*timeseries { + if !makeCopy { + return tss + } + rvs := make([]*timeseries, len(tss)) + for i, src := range tss { var dst timeseries dst.CopyFromMetricNames(src) rvs[i] = &dst