From 0e52357f351c60ddfa348f9c51768c768bba32f9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 25 Jul 2019 21:48:12 +0300 Subject: [PATCH] app/vmselect/promql: properly calculate incremental aggregations grouped by `__name__` Previously the following query may fail on multiple distinct metric names match: sum(count_over_time{__name__!=''}) by (__name__) --- app/vmselect/promql/eval.go | 65 +++++++++++++++---------------------- 1 file changed, 26 insertions(+), 39 deletions(-) diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 21704b2ee..ead47a162 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -472,31 +472,19 @@ func evalRollupFuncWithSubquery(ec *EvalConfig, name string, rf rollupFunc, re * preFunc, rcs := getRollupConfigs(name, rf, ec.Start, ec.End, ec.Step, window, sharedTimestamps) tss := make([]*timeseries, 0, len(tssSQ)*len(rcs)) var tssLock sync.Mutex + removeMetricGroup := !rollupFuncsKeepMetricGroup[name] doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64) ([]float64, []int64) { values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps) preFunc(values, timestamps) for _, rc := range rcs { var ts timeseries - ts.MetricName.CopyFrom(&tsSQ.MetricName) - if len(rc.TagValue) > 0 { - ts.MetricName.AddTag("rollup", rc.TagValue) - } - ts.Values = rc.Do(ts.Values[:0], values, timestamps) - ts.Timestamps = sharedTimestamps - ts.denyReuse = true - + doRollupForTimeseries(rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps, removeMetricGroup) tssLock.Lock() tss = append(tss, &ts) tssLock.Unlock() } return values, timestamps }) - if !rollupFuncsKeepMetricGroup[name] { - tss = copyTimeseriesMetricNames(tss) - for _, ts := range tss { - ts.MetricName.ResetMetricGroup() - } - } return tss, nil } @@ -626,22 +614,16 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me defer rml.Put(uint64(rollupMemorySize)) // Evaluate rollup + removeMetricGroup := !rollupFuncsKeepMetricGroup[name] var tss []*timeseries if iafc != nil { - tss, err = evalRollupWithIncrementalAggregate(iafc, rss, rcs, preFunc, sharedTimestamps) + tss, err = evalRollupWithIncrementalAggregate(iafc, rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) } else { - tss, err = evalRollupNoIncrementalAggregate(rss, rcs, preFunc, sharedTimestamps) + tss, err = evalRollupNoIncrementalAggregate(rss, rcs, preFunc, sharedTimestamps, removeMetricGroup) } if err != nil { return nil, err } - - if !rollupFuncsKeepMetricGroup[name] { - tss = copyTimeseriesMetricNames(tss) - for _, ts := range tss { - ts.MetricName.ResetMetricGroup() - } - } tss = mergeTimeseries(tssCached, tss, start, ec) if !isPartial { rollupResultCacheV.Put(name, ec, me, iafc, window, tss) @@ -662,21 +644,19 @@ func getRollupMemoryLimiter() *memoryLimiter { } func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *netstorage.Results, rcs []*rollupConfig, - preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { + preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { preFunc(rs.Values, rs.Timestamps) ts := getTimeseries() defer putTimeseries(ts) for _, rc := range rcs { ts.Reset() - ts.MetricName.CopyFrom(&rs.MetricName) - if len(rc.TagValue) > 0 { - ts.MetricName.AddTag("rollup", rc.TagValue) - } - ts.Values = rc.Do(ts.Values[:0], rs.Values, rs.Timestamps) - ts.Timestamps = sharedTimestamps + doRollupForTimeseries(rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup) iafc.updateTimeseries(ts, workerID) + + // ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used. ts.Timestamps = nil + ts.denyReuse = false } }) if err != nil { @@ -687,21 +667,14 @@ func evalRollupWithIncrementalAggregate(iafc *incrementalAggrFuncContext, rss *n } func evalRollupNoIncrementalAggregate(rss *netstorage.Results, rcs []*rollupConfig, - preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { + preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64, removeMetricGroup bool) ([]*timeseries, error) { tss := make([]*timeseries, 0, rss.Len()*len(rcs)) var tssLock sync.Mutex err := rss.RunParallel(func(rs *netstorage.Result, workerID uint) { preFunc(rs.Values, rs.Timestamps) for _, rc := range rcs { var ts timeseries - ts.MetricName.CopyFrom(&rs.MetricName) - if len(rc.TagValue) > 0 { - ts.MetricName.AddTag("rollup", rc.TagValue) - } - ts.Values = rc.Do(ts.Values[:0], rs.Values, rs.Timestamps) - ts.Timestamps = sharedTimestamps - ts.denyReuse = true - + doRollupForTimeseries(rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps, removeMetricGroup) tssLock.Lock() tss = append(tss, &ts) tssLock.Unlock() @@ -713,6 +686,20 @@ func evalRollupNoIncrementalAggregate(rss *netstorage.Results, rcs []*rollupConf return tss, nil } +func doRollupForTimeseries(rc *rollupConfig, tsDst *timeseries, mnSrc *storage.MetricName, valuesSrc []float64, timestampsSrc []int64, + sharedTimestamps []int64, removeMetricGroup bool) { + tsDst.MetricName.CopyFrom(mnSrc) + if len(rc.TagValue) > 0 { + tsDst.MetricName.AddTag("rollup", rc.TagValue) + } + if removeMetricGroup { + tsDst.MetricName.ResetMetricGroup() + } + tsDst.Values = rc.Do(tsDst.Values[:0], valuesSrc, timestampsSrc) + tsDst.Timestamps = sharedTimestamps + tsDst.denyReuse = true +} + func getRollupConfigs(name string, rf rollupFunc, start, end, step, window int64, sharedTimestamps []int64) (func(values []float64, timestamps []int64), []*rollupConfig) { preFunc := func(values []float64, timestamps []int64) {} if rollupFuncsRemoveCounterResets[name] {