From cee0fb80718191b3d93e1f969a85f6ada0bb3b86 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 24 Mar 2023 23:34:34 -0700
Subject: [PATCH] app/vmselect/promql: follow-up for 7205c79c5a617faff1c77b3b316f9fd53633b529

- Allocate and initialize seriesByWorkerID slice in a single go instead of
  initializing every item in the list separately. This should reduce CPU
  usage a bit.
- Properly set anti-false sharing padding at timeseriesWithPadding structure
- Document the change at docs/CHANGELOG.md

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
---
 app/vmselect/promql/eval.go | 80 ++++++++++++++++++++-----------------
 docs/CHANGELOG.md           |  2 +
 2 files changed, 46 insertions(+), 36 deletions(-)

diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go
index 0ddd19cc67..030dd7512c 100644
--- a/app/vmselect/promql/eval.go
+++ b/app/vmselect/promql/eval.go
@@ -912,13 +912,10 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 		return nil, err
 	}
 
-	seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
-	for i := 0; i < netstorage.MaxWorkers(); i++ {
-		seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
-	}
-
 	var samplesScannedTotal uint64
 	keepMetricNames := getKeepMetricNames(expr)
+	tsw := getTimeseriesByWorkerID()
+	seriesByWorkerID := tsw.byWorkerID
 	doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
 		values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
 		preFunc(values, timestamps)
@@ -939,8 +936,8 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
 	tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
 	for i := range seriesByWorkerID {
 		tss = append(tss, seriesByWorkerID[i].tss...)
-		putTimeseriesPadded(seriesByWorkerID[i])
 	}
+	putTimeseriesByWorkerID(tsw)
 	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
 	qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
 
@@ -1208,40 +1205,15 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
 	return tss, nil
 }
 
-var tspPool sync.Pool
-
-func getTimeseriesPadded() *timeseriesWithPadding {
-	v := tspPool.Get()
-	if v == nil {
-		return &timeseriesWithPadding{}
-	}
-	return v.(*timeseriesWithPadding)
-}
-
-func putTimeseriesPadded(tsp *timeseriesWithPadding) {
-	tsp.tss = tsp.tss[:0]
-	tspPool.Put(tsp)
-}
-
-type timeseriesWithPadding struct {
-	tss []*timeseries
-
-	// The padding prevents false sharing on widespread platforms with
-	// 128 mod (cache line size) = 0 .
-	_ [128 - unsafe.Sizeof(timeseries{})%128]byte
-}
-
 func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results,
 	rcs []*rollupConfig, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
 	qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
 	defer qt.Done()
 
-	seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
-	for i := 0; i < netstorage.MaxWorkers(); i++ {
-		seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
-	}
-
 	var samplesScannedTotal uint64
+	tsw := getTimeseriesByWorkerID()
+	seriesByWorkerID := tsw.byWorkerID
+	seriesLen := rss.Len()
 	err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
 		rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
 		preFunc(rs.Values, rs.Timestamps)
@@ -1262,11 +1234,11 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
 	if err != nil {
 		return nil, err
 	}
-	tss := make([]*timeseries, 0, rss.Len()*len(rcs))
+	tss := make([]*timeseries, 0, seriesLen*len(rcs))
 	for i := range seriesByWorkerID {
 		tss = append(tss, seriesByWorkerID[i].tss...)
-		putTimeseriesPadded(seriesByWorkerID[i])
 	}
+	putTimeseriesByWorkerID(tsw)
 	rowsScannedPerQuery.Update(float64(samplesScannedTotal))
 	qt.Printf("samplesScanned=%d", samplesScannedTotal)
 
@@ -1289,6 +1261,42 @@ func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConf
 	return samplesScanned
 }
 
+type timeseriesWithPadding struct {
+	tss []*timeseries
+
+	// The padding prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof([]*timeseries{})%128]byte
+}
+
+type timeseriesByWorkerID struct {
+	byWorkerID []timeseriesWithPadding
+}
+
+func (tsw *timeseriesByWorkerID) reset() {
+	byWorkerID := tsw.byWorkerID
+	for i := range byWorkerID {
+		tsw.byWorkerID[i].tss = nil
+	}
+}
+
+func getTimeseriesByWorkerID() *timeseriesByWorkerID {
+	v := timeseriesByWorkerIDPool.Get()
+	if v == nil {
+		return &timeseriesByWorkerID{
+			byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()),
+		}
+	}
+	return v.(*timeseriesByWorkerID)
+}
+
+func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) {
+	tsw.reset()
+	timeseriesByWorkerIDPool.Put(tsw)
+}
+
+var timeseriesByWorkerIDPool sync.Pool
+
 var bbPool bytesutil.ByteBufferPool
 
 func evalNumber(ec *EvalConfig, n float64) []*timeseries {
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index b2523cdcce..bfe6779cfe 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -19,6 +19,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
 * BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
 * BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
+* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
+
 ## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
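
For reviewers who want to see the pattern in isolation: below is a minimal, self-contained Go sketch of the same idea the patch applies to `seriesByWorkerID` — a `sync.Pool`-backed object that owns one cache-line-padded slot per worker, allocated with a single `make()` call and reset before being returned to the pool. All identifiers here (`paddedSlot`, `slotsByWorker`, `getSlotsByWorker`, `putSlotsByWorker`) are illustrative only and do not appear in the patch, and the worker count is taken from `runtime.GOMAXPROCS(0)` instead of `netstorage.MaxWorkers()`.

```go
// Standalone sketch (not VictoriaMetrics code) of pooling one padded
// per-worker slot set, allocated in a single make() call.
package main

import (
	"fmt"
	"runtime"
	"sync"
	"unsafe"
)

// paddedSlot gives each worker its own accumulator, padded so the struct
// size is a multiple of 128 bytes. On platforms where
// 128 mod (cache line size) == 0, adjacent slots never share a cache line,
// which avoids false sharing when workers write concurrently.
type paddedSlot struct {
	items []int

	_ [128 - unsafe.Sizeof([]int{})%128]byte
}

// slotsByWorker owns one padded slot per worker; the whole slice is
// allocated at once instead of one pool hit per worker.
type slotsByWorker struct {
	byWorkerID []paddedSlot
}

func (s *slotsByWorker) reset() {
	for i := range s.byWorkerID {
		s.byWorkerID[i].items = nil
	}
}

var slotsPool sync.Pool

// getSlotsByWorker returns a pooled slot set, allocating a new one only if
// the pool is empty or the pooled object is too small for `workers`.
func getSlotsByWorker(workers int) *slotsByWorker {
	if v := slotsPool.Get(); v != nil {
		s := v.(*slotsByWorker)
		if len(s.byWorkerID) >= workers {
			return s
		}
	}
	return &slotsByWorker{
		byWorkerID: make([]paddedSlot, workers),
	}
}

// putSlotsByWorker drops references to the accumulated data and returns the
// object to the pool so the allocation can be reused later.
func putSlotsByWorker(s *slotsByWorker) {
	s.reset()
	slotsPool.Put(s)
}

func main() {
	workers := runtime.GOMAXPROCS(0)
	s := getSlotsByWorker(workers)

	var wg sync.WaitGroup
	for workerID := 0; workerID < workers; workerID++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// Each worker writes only to its own padded slot, so no locking
			// is needed and slot updates do not contend on shared cache lines.
			for i := 0; i < 1000; i++ {
				s.byWorkerID[workerID].items = append(s.byWorkerID[workerID].items, i)
			}
		}(workerID)
	}
	wg.Wait()

	// Merge the per-worker results, then hand the slots back to the pool.
	total := 0
	for i := range s.byWorkerID {
		total += len(s.byWorkerID[i].items)
	}
	putSlotsByWorker(s)

	fmt.Printf("workers=%d, collected items=%d, slot size=%d bytes\n",
		workers, total, unsafe.Sizeof(paddedSlot{}))
}
```

The padding expression mirrors the one in the patch: `128 - unsafe.Sizeof([]int{})%128` rounds each slot up to 128 bytes so neighbouring workers' slice headers land on separate cache lines, while allocating the slots as a single `[]paddedSlot` replaces one pool Get/Put per worker with a single Get/Put per query.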