app/vmselect/promql: follow-up for 7205c79c5a

- Allocate and initialize seriesByWorkerID slice in a single go instead
  of initializing every item in the list separately.
  This should reduce CPU usage a bit.
- Properly set anti-false sharing padding at timeseriesWithPadding structure
- Document the change at docs/CHANGELOG.md

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
This commit is contained in:
Aliaksandr Valialkin 2023-03-24 23:34:34 -07:00
parent 36da3faf73
commit a7079022ff
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 46 additions and 36 deletions

View file

@ -895,13 +895,10 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
return nil, err
}
seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
for i := 0; i < netstorage.MaxWorkers(); i++ {
seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
}
var samplesScannedTotal uint64
keepMetricNames := getKeepMetricNames(expr)
tsw := getTimeseriesByWorkerID()
seriesByWorkerID := tsw.byWorkerID
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
preFunc(values, timestamps)
@ -922,8 +919,8 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
for i := range seriesByWorkerID {
tss = append(tss, seriesByWorkerID[i].tss...)
putTimeseriesPadded(seriesByWorkerID[i])
}
putTimeseriesByWorkerID(tsw)
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
@ -1188,40 +1185,15 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
return tss, nil
}
var tspPool sync.Pool
func getTimeseriesPadded() *timeseriesWithPadding {
v := tspPool.Get()
if v == nil {
return &timeseriesWithPadding{}
}
return v.(*timeseriesWithPadding)
}
func putTimeseriesPadded(tsp *timeseriesWithPadding) {
tsp.tss = tsp.tss[:0]
tspPool.Put(tsp)
}
type timeseriesWithPadding struct {
tss []*timeseries
// The padding prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(timeseries{})%128]byte
}
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
defer qt.Done()
seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
for i := 0; i < netstorage.MaxWorkers(); i++ {
seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
}
var samplesScannedTotal uint64
tsw := getTimeseriesByWorkerID()
seriesByWorkerID := tsw.byWorkerID
seriesLen := rss.Len()
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
preFunc(rs.Values, rs.Timestamps)
@ -1242,11 +1214,11 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
if err != nil {
return nil, err
}
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
tss := make([]*timeseries, 0, seriesLen*len(rcs))
for i := range seriesByWorkerID {
tss = append(tss, seriesByWorkerID[i].tss...)
putTimeseriesPadded(seriesByWorkerID[i])
}
putTimeseriesByWorkerID(tsw)
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
qt.Printf("samplesScanned=%d", samplesScannedTotal)
@ -1269,6 +1241,42 @@ func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConf
return samplesScanned
}
type timeseriesWithPadding struct {
tss []*timeseries
// The padding prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof([]*timeseries{})%128]byte
}
type timeseriesByWorkerID struct {
byWorkerID []timeseriesWithPadding
}
func (tsw *timeseriesByWorkerID) reset() {
byWorkerID := tsw.byWorkerID
for i := range byWorkerID {
tsw.byWorkerID[i].tss = nil
}
}
func getTimeseriesByWorkerID() *timeseriesByWorkerID {
v := timeseriesByWorkerIDPool.Get()
if v == nil {
return &timeseriesByWorkerID{
byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()),
}
}
return v.(*timeseriesByWorkerID)
}
func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) {
tsw.reset()
timeseriesByWorkerIDPool.Put(tsw)
}
var timeseriesByWorkerIDPool sync.Pool
var bbPool bytesutil.ByteBufferPool
func evalNumber(ec *EvalConfig, n float64) []*timeseries {

View file

@ -19,6 +19,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)