mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
app/vmselect/promql: follow-up for 7205c79c5a
- Allocate and initialize seriesByWorkerID slice in a single go instead of initializing every item in the list separately. This should reduce CPU usage a bit. - Properly set anti-false sharing padding at timeseriesWithPadding structure - Document the change at docs/CHANGELOG.md Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966
This commit is contained in:
parent
71044221f6
commit
cee0fb8071
2 changed files with 46 additions and 36 deletions
|
@ -912,13 +912,10 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
|
|
||||||
for i := 0; i < netstorage.MaxWorkers(); i++ {
|
|
||||||
seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
|
|
||||||
}
|
|
||||||
|
|
||||||
var samplesScannedTotal uint64
|
var samplesScannedTotal uint64
|
||||||
keepMetricNames := getKeepMetricNames(expr)
|
keepMetricNames := getKeepMetricNames(expr)
|
||||||
|
tsw := getTimeseriesByWorkerID()
|
||||||
|
seriesByWorkerID := tsw.byWorkerID
|
||||||
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
|
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
|
||||||
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
|
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
|
||||||
preFunc(values, timestamps)
|
preFunc(values, timestamps)
|
||||||
|
@ -939,8 +936,8 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
|
||||||
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
|
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
|
||||||
for i := range seriesByWorkerID {
|
for i := range seriesByWorkerID {
|
||||||
tss = append(tss, seriesByWorkerID[i].tss...)
|
tss = append(tss, seriesByWorkerID[i].tss...)
|
||||||
putTimeseriesPadded(seriesByWorkerID[i])
|
|
||||||
}
|
}
|
||||||
|
putTimeseriesByWorkerID(tsw)
|
||||||
|
|
||||||
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
||||||
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
|
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
|
||||||
|
@ -1208,40 +1205,15 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
|
||||||
return tss, nil
|
return tss, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var tspPool sync.Pool
|
|
||||||
|
|
||||||
func getTimeseriesPadded() *timeseriesWithPadding {
|
|
||||||
v := tspPool.Get()
|
|
||||||
if v == nil {
|
|
||||||
return &timeseriesWithPadding{}
|
|
||||||
}
|
|
||||||
return v.(*timeseriesWithPadding)
|
|
||||||
}
|
|
||||||
|
|
||||||
func putTimeseriesPadded(tsp *timeseriesWithPadding) {
|
|
||||||
tsp.tss = tsp.tss[:0]
|
|
||||||
tspPool.Put(tsp)
|
|
||||||
}
|
|
||||||
|
|
||||||
type timeseriesWithPadding struct {
|
|
||||||
tss []*timeseries
|
|
||||||
|
|
||||||
// The padding prevents false sharing on widespread platforms with
|
|
||||||
// 128 mod (cache line size) = 0 .
|
|
||||||
_ [128 - unsafe.Sizeof(timeseries{})%128]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
|
func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, keepMetricNames bool, rss *netstorage.Results, rcs []*rollupConfig,
|
||||||
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
|
||||||
qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
|
qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
|
||||||
defer qt.Done()
|
defer qt.Done()
|
||||||
|
|
||||||
seriesByWorkerID := make([]*timeseriesWithPadding, 0, netstorage.MaxWorkers())
|
|
||||||
for i := 0; i < netstorage.MaxWorkers(); i++ {
|
|
||||||
seriesByWorkerID = append(seriesByWorkerID, getTimeseriesPadded())
|
|
||||||
}
|
|
||||||
|
|
||||||
var samplesScannedTotal uint64
|
var samplesScannedTotal uint64
|
||||||
|
tsw := getTimeseriesByWorkerID()
|
||||||
|
seriesByWorkerID := tsw.byWorkerID
|
||||||
|
seriesLen := rss.Len()
|
||||||
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
|
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
|
||||||
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
|
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
|
||||||
preFunc(rs.Values, rs.Timestamps)
|
preFunc(rs.Values, rs.Timestamps)
|
||||||
|
@ -1262,11 +1234,11 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tss := make([]*timeseries, 0, rss.Len()*len(rcs))
|
tss := make([]*timeseries, 0, seriesLen*len(rcs))
|
||||||
for i := range seriesByWorkerID {
|
for i := range seriesByWorkerID {
|
||||||
tss = append(tss, seriesByWorkerID[i].tss...)
|
tss = append(tss, seriesByWorkerID[i].tss...)
|
||||||
putTimeseriesPadded(seriesByWorkerID[i])
|
|
||||||
}
|
}
|
||||||
|
putTimeseriesByWorkerID(tsw)
|
||||||
|
|
||||||
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
|
||||||
qt.Printf("samplesScanned=%d", samplesScannedTotal)
|
qt.Printf("samplesScanned=%d", samplesScannedTotal)
|
||||||
|
@ -1289,6 +1261,42 @@ func doRollupForTimeseries(funcName string, keepMetricNames bool, rc *rollupConf
|
||||||
return samplesScanned
|
return samplesScanned
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type timeseriesWithPadding struct {
|
||||||
|
tss []*timeseries
|
||||||
|
|
||||||
|
// The padding prevents false sharing on widespread platforms with
|
||||||
|
// 128 mod (cache line size) = 0 .
|
||||||
|
_ [128 - unsafe.Sizeof([]*timeseries{})%128]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type timeseriesByWorkerID struct {
|
||||||
|
byWorkerID []timeseriesWithPadding
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tsw *timeseriesByWorkerID) reset() {
|
||||||
|
byWorkerID := tsw.byWorkerID
|
||||||
|
for i := range byWorkerID {
|
||||||
|
tsw.byWorkerID[i].tss = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTimeseriesByWorkerID() *timeseriesByWorkerID {
|
||||||
|
v := timeseriesByWorkerIDPool.Get()
|
||||||
|
if v == nil {
|
||||||
|
return &timeseriesByWorkerID{
|
||||||
|
byWorkerID: make([]timeseriesWithPadding, netstorage.MaxWorkers()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return v.(*timeseriesByWorkerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func putTimeseriesByWorkerID(tsw *timeseriesByWorkerID) {
|
||||||
|
tsw.reset()
|
||||||
|
timeseriesByWorkerIDPool.Put(tsw)
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeseriesByWorkerIDPool sync.Pool
|
||||||
|
|
||||||
var bbPool bytesutil.ByteBufferPool
|
var bbPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
|
func evalNumber(ec *EvalConfig, n float64) []*timeseries {
|
||||||
|
|
|
@ -19,6 +19,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
|
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
|
||||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
|
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
|
||||||
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
|
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
|
||||||
|
* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
|
||||||
|
|
||||||
|
|
||||||
## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
|
## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue