app/vmselect/promql: properly calculate incremental aggregations grouped by __name__

Previously the following query could fail when it matched multiple distinct metric names:

    sum(count_over_time{__name__!=''}) by (__name__)
commit 0e52357f35
parent f2e8d54fb0
Author: Aliaksandr Valialkin
Date:   2019-07-25 21:48:12 +03:00
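
Roughly why the old order failed: count_over_time does not keep the metric
group, but the metric group was reset only after the incremental aggregation
had already grouped series by their original names, so several groups could
later collapse into identically-labeled result series. A toy Go sketch (not
VictoriaMetrics code) of why the ordering matters:

    package main

    import "fmt"

    // series is a toy stand-in for a timeseries: a metric name plus one value.
    type series struct {
        name  string
        value float64
    }

    // sumByName mimics an incremental sum(...) by (__name__): it groups
    // series by their current name and sums the values per group.
    func sumByName(ss []series) map[string]float64 {
        m := make(map[string]float64)
        for _, s := range ss {
            m[s.name] += s.value
        }
        return m
    }

    func main() {
        ss := []series{{"foo", 1}, {"bar", 2}}

        // Old order: aggregate first, reset the metric group afterwards.
        // Two groups survive; once both names are reset they collide.
        fmt.Println(len(sumByName(ss))) // 2

        // New order: reset the metric group first (what doRollupForTimeseries
        // does when removeMetricGroup is set), then aggregate.
        for i := range ss {
            ss[i].name = "" // analogous to MetricName.ResetMetricGroup()
        }
        fmt.Println(len(sumByName(ss))) // 1
    }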

@@ -472,31 +472,19 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re *
 	preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, sharedTimestamps)
 	tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
 	var tssLock sync.Mutex
+	removeMetricGroup := !rollupFuncsKeepMetricGroup[name]
 	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) {
 		values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
 		preFunc(values, timestamps)
 		for _, rc := range rcs {
 			var ts timeseries
-			ts.MetricName.CopyFrom(&tsSQ.MetricName)
-			if len(rc.TagValue) > 0 {
-				ts.MetricName.AddTag("rollup", rc.TagValue)
-			}
-			ts.Values = rc.Do(ts.Values[:0], values, timestamps)
-			ts.Timestamps = sharedTimestamps
-			ts.denyReuse = true
+			doRollupForTimeseries(rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps, removeMetricGroup)
 			tssLock.Lock()
 			tss = append(tss, &ts)
 			tssLock.Unlock()
 		}
 		return values, timestamps
 	})
-	if !rollupFuncsKeepMetricGroup[name] {
-		tss = copyTimeseriesMetricNames(tss)
-		for _, ts := range tss {
-			ts.MetricName.ResetMetricGroup()
-		}
-	}
 	return tss, nil
 }
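
The deleted tail block above copied the metric names (copyTimeseriesMetricNames)
before mutating them, a defensive copy-before-mutate step. The new code instead
resets the metric group inside doRollupForTimeseries (last hunk), right after
MetricName.CopyFrom has produced a private copy, so no extra copy pass is
needed. A minimal sketch of that copy-then-mutate idea, using toy types rather
than the real storage.MetricName API:

    package main

    import "fmt"

    // metricName is a toy stand-in for storage.MetricName.
    type metricName struct {
        group string            // the metric name, i.e. __name__
        tags  map[string]string // the remaining labels
    }

    // copyFrom makes dst an independent copy of src, so later mutations
    // of dst cannot corrupt shared or cached data reachable via src.
    func (dst *metricName) copyFrom(src *metricName) {
        dst.group = src.group
        dst.tags = make(map[string]string, len(src.tags))
        for k, v := range src.tags {
            dst.tags[k] = v
        }
    }

    // resetMetricGroup drops __name__, as rollup functions that do not
    // keep the metric group require.
    func (mn *metricName) resetMetricGroup() { mn.group = "" }

    func main() {
        src := &metricName{group: "foo", tags: map[string]string{"job": "a"}}
        var dst metricName
        dst.copyFrom(src)      // take a private copy first...
        dst.resetMetricGroup() // ...then mutate it safely
        fmt.Println(src.group, dst.group) // "foo" ""
    }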
@@ -626,22 +614,16 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
 	defer rml.Put(uint64(rollupMemorySize))
 
 	// Evaluate rollup
+	removeMetricGroup := !rollupFuncsKeepMetricGroup[name]
 	var tss []*timeseries
 	if iafc != nil {
-		tss, err = evalRollupWithIncrementalAggregate(iafc, rss, rcs, preFunc, sharedTimestamps)
+		tss, err = evalRollupWithIncrementalAggregate(iafc, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup)
 	} else {
-		tss, err = evalRollupNoIncrementalAggregate(rss, rcs, preFunc, sharedTimestamps)
+		tss, err = evalRollupNoIncrementalAggregate(rss, rcs, preFunc, sharedTimestamps, removeMetricGroup)
 	}
 	if err != nil {
 		return nil, err
 	}
-	if !rollupFuncsKeepMetricGroup[name] {
-		tss = copyTimeseriesMetricNames(tss)
-		for _, ts := range tss {
-			ts.MetricName.ResetMetricGroup()
-		}
-	}
 	tss = mergeTimeseries(tssCached, tss, start, ec)
 	if !isPartial {
 		rollupResultCacheV.Put(name, ec, me, iafc, window, tss)
@@ -662,21 +644,19 @@ func getRollupMemoryLimiter() *memoryLimiter {
 }
 
 func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig,
-	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
+	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
 	err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
 		preFunc(rs.Values, rs.Timestamps)
 		ts := getTimeseries()
 		defer putTimeseries(ts)
 		for _, rc := range rcs {
 			ts.Reset()
-			ts.MetricName.CopyFrom(&rs.MetricName)
-			if len(rc.TagValue) > 0 {
-				ts.MetricName.AddTag("rollup", rc.TagValue)
-			}
-			ts.Values = rc.Do(ts.Values[:0], rs.Values, rs.Timestamps)
-			ts.Timestamps = sharedTimestamps
+			doRollupForTimeseries(rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup)
 			iafc.updateTimeseries(ts, workerID)
+
+			// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
 			ts.Timestamps = nil
+			ts.denyReuse = false
 		}
 	})
 	if err != nil {
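
The two lines added after iafc.updateTimeseries are pool hygiene: ts comes from
getTimeseries() and is returned via putTimeseries(ts), so the alias to
sharedTimestamps is dropped and denyReuse is cleared before the object is
recycled. A minimal sketch of the same pattern with sync.Pool (toy types; the
real pool helpers are internal to this package):

    package main

    import (
        "fmt"
        "sync"
    )

    // tsBuf is a toy pooled object whose slice may alias shared memory,
    // mirroring how the pooled ts above aliases sharedTimestamps.
    type tsBuf struct {
        timestamps []int64
    }

    var tsPool = sync.Pool{New: func() interface{} { return new(tsBuf) }}

    func process(shared []int64) {
        ts := tsPool.Get().(*tsBuf)
        ts.timestamps = shared // alias, not a copy
        fmt.Println("processing", len(ts.timestamps), "timestamps")

        // Drop the alias before returning the object to the pool; otherwise
        // the pool would pin the shared slice and the next user could
        // clobber it through the stale reference.
        ts.timestamps = nil
        tsPool.Put(ts)
    }

    func main() {
        shared := []int64{1000, 2000, 3000}
        process(shared)
        process(shared)
    }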
@@ -687,21 +667,14 @@ func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *n
 }
 
 func evalRollupNoIncrementalAggregate(rss *netstorage.Results, rcs []*rollupConfig,
-	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
+	preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) {
 	tss := make([]*timeseries, 0, rss.Len()*len(rcs))
 	var tssLock sync.Mutex
 	err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) {
 		preFunc(rs.Values, rs.Timestamps)
 		for _, rc := range rcs {
 			var ts timeseries
-			ts.MetricName.CopyFrom(&rs.MetricName)
-			if len(rc.TagValue) > 0 {
-				ts.MetricName.AddTag("rollup", rc.TagValue)
-			}
-			ts.Values = rc.Do(ts.Values[:0], rs.Values, rs.Timestamps)
-			ts.Timestamps = sharedTimestamps
-			ts.denyReuse = true
+			doRollupForTimeseries(rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup)
 			tssLock.Lock()
 			tss = append(tss, &ts)
 			tssLock.Unlock()
@@ -713,6 +686,20 @@ func evalRollupNoIncrementalAggregate(rss *netstorage.Results, rcs []*rollupConf
 	return tss, nil
 }
 
+func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64,
+	sharedTimestamps []int64, removeMetricGroup bool) {
+	tsDst.MetricName.CopyFrom(mnSrc)
+	if len(rc.TagValue) > 0 {
+		tsDst.MetricName.AddTag("rollup", rc.TagValue)
+	}
+	if removeMetricGroup {
+		tsDst.MetricName.ResetMetricGroup()
+	}
+	tsDst.Values = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc)
+	tsDst.Timestamps = sharedTimestamps
+	tsDst.denyReuse = true
+}
+
 func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, sharedTimestamps []int64) (func(values []float64, timestamps []int64), []*rollupConfig) {
 	preFunc := func(values []float64, timestamps []int64) {}
 	if rollupFuncsRemoveCounterResets[name] {
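
Note also that doRollupForTimeseries sets denyReuse = true right after pointing
tsDst.Timestamps at the shared timestamps slice. The sketch below is a guess at
the flag's contract (the real timeseries.Reset lives elsewhere in this package):
a reset must abandon buffers that alias shared memory instead of truncating
them for in-place reuse.

    package main

    import "fmt"

    // toySeries models a result series whose timestamps may alias a
    // shared slice, as tsDst.Timestamps aliases sharedTimestamps above.
    type toySeries struct {
        timestamps []int64
        denyReuse  bool
    }

    // reset clears the series for reuse. When denyReuse is set, the
    // timestamps slice may be shared with other series, so it is dropped
    // entirely rather than truncated and recycled in place.
    func (ts *toySeries) reset() {
        if ts.denyReuse {
            *ts = toySeries{}
            return
        }
        ts.timestamps = ts.timestamps[:0] // keep capacity for reuse
    }

    func main() {
        shared := []int64{1000, 2000}
        ts := toySeries{timestamps: shared, denyReuse: true}
        ts.reset()
        fmt.Println(len(shared), ts.timestamps == nil) // 2 true
    }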