mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/promql: follow-up for 7205c79c5a
- Allocate and initialize seriesByWorkerID slice in a single go instead of initializing every item in the list separately. This should reduce CPU usage a bit. - Properly set anti-false sharing padding at timeseriesWithPadding structure - Document the change at docs/CHANGELOG.md Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
This commit is contained in:
parent
fec87e3ada
commit
7aff6f872f
2 changed files with 46 additions and 36 deletions
|
@ -923,13 +923,10 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
|
|
||||||
for i := 0; i < netstorage.MaxWorkers(); i++ {
|
|
||||||
seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
|
|
||||||
}
|
|
||||||
|
|
||||||
var samplesScannedTotal uint64
|
var samplesScannedTotal uint64
|
||||||
keepMetricNames := getKeepMetricNames(expr)
|
keepMetricNames := getKeepMetricNames(expr)
|
||||||
|
tsw := getTimeseriesByWorkerID()
|
||||||
|
seriesByWorkerID := tsw.byWorkerID
|
||||||
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
|
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
|
||||||
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
|
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
|
||||||
preFunc(values, timestamps)
|
preFunc(values, timestamps)
|
||||||
|
@ -950,8 +947,8 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
|
||||||
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
|
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
|
||||||
for i := range seriesByWorkerID {
|
for i := range seriesByWorkerID {
|
||||||
tss = append(tss, seriesByWorkerID[i].tss...)
|
tss = append(tss, seriesByWorkerID[i].tss...)
|
||||||
putTimeseriesPadded(seriesByWorkerID[i])
|
|
||||||
}
|
}
|
||||||
|
putTimeseriesByWorkerID(tsw)
|
||||||
|
|
||||||
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
||||||
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
|
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
|
||||||
|
@ -1226,40 +1223,15 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
|
||||||
return tss, nil
|
return tss, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var tspPool sync.Pool
|
|
||||||
|
|
||||||
func getTimeseriesPadded() *timeseriesWithPadding {
|
|
||||||
v := tspPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
return ×eriesWithPadding{}
|
|
||||||
}
|
|
||||||
return v.(*timeseriesWithPadding)
|
|
||||||
}
|
|
||||||
|
|
||||||
func putTimeseriesPadded(tsp *timeseriesWithPadding) {
|
|
||||||
tsp.tss = tsp.tss[:0]
|
|
||||||
tspPool.Put(tsp)
|
|
||||||
}
|
|
||||||
|
|
||||||
type timeseriesWithPadding struct {
|
|
||||||
tss []*timeseries
|
|
||||||
|
|
||||||
// The padding prevents false sharing on widespread platforms with
|
|
||||||
// 128 mod (cache line size) = 0 .
|
|
||||||
_ [128 - unsafe.Sizeof(timeseries{})%128]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
|
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
|
||||||
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
||||||
qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
|
qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
|
||||||
defer qt.Done()
|
defer qt.Done()
|
||||||
|
|
||||||
seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
|
|
||||||
for i := 0; i < netstorage.MaxWorkers(); i++ {
|
|
||||||
seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
|
|
||||||
}
|
|
||||||
|
|
||||||
var samplesScannedTotal uint64
|
var samplesScannedTotal uint64
|
||||||
|
tsw := getTimeseriesByWorkerID()
|
||||||
|
seriesByWorkerID := tsw.byWorkerID
|
||||||
|
seriesLen := rss.Len()
|
||||||
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
|
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
|
||||||
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
|
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
|
||||||
preFunc(rs.Values, rs.Timestamps)
|
preFunc(rs.Values, rs.Timestamps)
|
||||||
|
@ -1280,11 +1252,11 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
|
tss := make([]*timeseries, 0, seriesLen*len(rcs))
|
||||||
for i := range seriesByWorkerID {
|
for i := range seriesByWorkerID {
|
||||||
tss = append(tss, seriesByWorkerID[i].tss...)
|
tss = append(tss, seriesByWorkerID[i].tss...)
|
||||||
putTimeseriesPadded(seriesByWorkerID[i])
|
|
||||||
}
|
}
|
||||||
|
putTimeseriesByWorkerID(tsw)
|
||||||
|
|
||||||
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
||||||
qt.Printf("samplesScanned=%d", samplesScannedTotal)
|
qt.Printf("samplesScanned=%d", samplesScannedTotal)
|
||||||
|
@ -1307,6 +1279,42 @@ func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConf
|
||||||
return samplesScanned
|
return samplesScanned
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type timeseriesWithPadding struct {
|
||||||
|
tss []*timeseries
|
||||||
|
|
||||||
|
// The padding prevents false sharing on widespread platforms with
|
||||||
|
// 128 mod (cache line size) = 0 .
|
||||||
|
_ [128 - unsafe.Sizeof([]*timeseries{})%128]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type timeseriesByWorkerID struct {
|
||||||
|
byWorkerID []timeseriesWithPadding
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tsw *timeseriesByWorkerID) reset() {
|
||||||
|
byWorkerID := tsw.byWorkerID
|
||||||
|
for i := range byWorkerID {
|
||||||
|
tsw.byWorkerID[i].tss = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTimeseriesByWorkerID() *timeseriesByWorkerID {
|
||||||
|
v := timeseriesByWorkerIDPool.Get()
|
||||||
|
if v == nil {
|
||||||
|
return ×eriesByWorkerID{
|
||||||
|
byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return v.(*timeseriesByWorkerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) {
|
||||||
|
tsw.reset()
|
||||||
|
timeseriesByWorkerIDPool.Put(tsw)
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeseriesByWorkerIDPool sync.Pool
|
||||||
|
|
||||||
var bbPool bytesutil.ByteBufferPool
|
var bbPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
|
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
|
||||||
|
|
|
@ -37,6 +37,8 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
|
||||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix displaying errors for each query. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3987).
|
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix displaying errors for each query. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3987).
|
||||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
|
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
|
||||||
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
|
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
|
||||||
|
* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
|
||||||
|
|
||||||
|
|
||||||
## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)
|
## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue