mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect/netstorage: improve the speed of queries over big number of time series on multi-CPU system
Reduce inter-CPU communications when processing the query over big number of time series.
This should improve performance for queries over big number of time series
on systems with many CPU cores.
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896
Based on b596ac3745
Thanks to @zqyzyq for the idea.
This commit is contained in:
parent
970f36de17
commit
3d4c312ba2
2 changed files with 47 additions and 66 deletions
|
@ -6,6 +6,7 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
@ -84,7 +85,7 @@ type timeseriesWork struct {
|
||||||
rss *Results
|
rss *Results
|
||||||
pts *packedTimeseries
|
pts *packedTimeseries
|
||||||
f func(rs *Result, workerID uint) error
|
f func(rs *Result, workerID uint) error
|
||||||
doneCh chan error
|
err error
|
||||||
|
|
||||||
rowsProcessed int
|
rowsProcessed int
|
||||||
}
|
}
|
||||||
|
@ -94,18 +95,14 @@ func (tsw *timeseriesWork) reset() {
|
||||||
tsw.rss = nil
|
tsw.rss = nil
|
||||||
tsw.pts = nil
|
tsw.pts = nil
|
||||||
tsw.f = nil
|
tsw.f = nil
|
||||||
if n := len(tsw.doneCh); n > 0 {
|
tsw.err = nil
|
||||||
logger.Panicf("BUG: tsw.doneCh must be empty during reset; it contains %d items instead", n)
|
|
||||||
}
|
|
||||||
tsw.rowsProcessed = 0
|
tsw.rowsProcessed = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTimeseriesWork() *timeseriesWork {
|
func getTimeseriesWork() *timeseriesWork {
|
||||||
v := tswPool.Get()
|
v := tswPool.Get()
|
||||||
if v == nil {
|
if v == nil {
|
||||||
v = ×eriesWork{
|
v = ×eriesWork{}
|
||||||
doneCh: make(chan error, 1),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return v.(*timeseriesWork)
|
return v.(*timeseriesWork)
|
||||||
}
|
}
|
||||||
|
@ -117,28 +114,6 @@ func putTimeseriesWork(tsw *timeseriesWork) {
|
||||||
|
|
||||||
var tswPool sync.Pool
|
var tswPool sync.Pool
|
||||||
|
|
||||||
func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
|
|
||||||
if len(workChs) == 1 {
|
|
||||||
// Fast path for a single worker
|
|
||||||
workChs[0] <- tsw
|
|
||||||
return
|
|
||||||
}
|
|
||||||
attempts := 0
|
|
||||||
for {
|
|
||||||
idx := fastrand.Uint32n(uint32(len(workChs)))
|
|
||||||
select {
|
|
||||||
case workChs[idx] <- tsw:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
attempts++
|
|
||||||
if attempts >= len(workChs) {
|
|
||||||
workChs[idx] <- tsw
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
|
func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
|
||||||
if atomic.LoadUint32(tsw.mustStop) != 0 {
|
if atomic.LoadUint32(tsw.mustStop) != 0 {
|
||||||
return nil
|
return nil
|
||||||
|
@ -162,15 +137,15 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
|
func timeseriesWorker(tsws []*timeseriesWork, workerID uint) {
|
||||||
v := resultPool.Get()
|
v := resultPool.Get()
|
||||||
if v == nil {
|
if v == nil {
|
||||||
v = &result{}
|
v = &result{}
|
||||||
}
|
}
|
||||||
r := v.(*result)
|
r := v.(*result)
|
||||||
for tsw := range ch {
|
for _, tsw := range tsws {
|
||||||
err := tsw.do(&r.rs, workerID)
|
err := tsw.do(&r.rs, workerID)
|
||||||
tsw.doneCh <- err
|
tsw.err = err
|
||||||
}
|
}
|
||||||
currentTime := fasttime.UnixTimestamp()
|
currentTime := fasttime.UnixTimestamp()
|
||||||
if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
|
if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
|
||||||
|
@ -202,31 +177,7 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
|
||||||
rss.tbf = nil
|
rss.tbf = nil
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Spin up local workers.
|
// Prepare work for workers.
|
||||||
//
|
|
||||||
// Do not use a global workChs with a global pool of workers, since it may lead to a deadlock in the following case:
|
|
||||||
// - RunParallel is called with f, which blocks without forward progress.
|
|
||||||
// - All the workers in the global pool became blocked in f.
|
|
||||||
// - workChs is filled up, so it cannot accept new work items from other RunParallel calls.
|
|
||||||
workers := len(rss.packedTimeseries)
|
|
||||||
if workers > gomaxprocs {
|
|
||||||
workers = gomaxprocs
|
|
||||||
}
|
|
||||||
if workers < 1 {
|
|
||||||
workers = 1
|
|
||||||
}
|
|
||||||
workChs := make([]chan *timeseriesWork, workers)
|
|
||||||
var workChsWG sync.WaitGroup
|
|
||||||
for i := 0; i < workers; i++ {
|
|
||||||
workChs[i] = make(chan *timeseriesWork, 16)
|
|
||||||
workChsWG.Add(1)
|
|
||||||
go func(workerID int) {
|
|
||||||
defer workChsWG.Done()
|
|
||||||
timeseriesWorker(workChs[workerID], uint(workerID))
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Feed workers with work.
|
|
||||||
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
|
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
|
||||||
var mustStop uint32
|
var mustStop uint32
|
||||||
for i := range rss.packedTimeseries {
|
for i := range rss.packedTimeseries {
|
||||||
|
@ -235,17 +186,49 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
|
||||||
tsw.pts = &rss.packedTimeseries[i]
|
tsw.pts = &rss.packedTimeseries[i]
|
||||||
tsw.f = f
|
tsw.f = f
|
||||||
tsw.mustStop = &mustStop
|
tsw.mustStop = &mustStop
|
||||||
scheduleTimeseriesWork(workChs, tsw)
|
|
||||||
tsws[i] = tsw
|
tsws[i] = tsw
|
||||||
}
|
}
|
||||||
seriesProcessedTotal := len(rss.packedTimeseries)
|
// Shuffle tsws for providing the equal amount of work among workers.
|
||||||
rss.packedTimeseries = rss.packedTimeseries[:0]
|
r := rand.New(rand.NewSource(int64(fasttime.UnixTimestamp())))
|
||||||
|
r.Shuffle(len(tsws), func(i, j int) {
|
||||||
|
tsws[i], tsws[j] = tsws[j], tsws[i]
|
||||||
|
})
|
||||||
|
|
||||||
|
// Spin up up to gomaxprocs local workers and split work equally among them.
|
||||||
|
// This guarantees linear scalability with the increase of gomaxprocs
|
||||||
|
// (e.g. the number of available CPU cores).
|
||||||
|
workers := len(rss.packedTimeseries)
|
||||||
|
itemsPerWorker := 1
|
||||||
|
if workers > gomaxprocs {
|
||||||
|
itemsPerWorker = 1 + workers/gomaxprocs
|
||||||
|
workers = gomaxprocs
|
||||||
|
}
|
||||||
|
var start int
|
||||||
|
var i uint
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for start < len(tsws) {
|
||||||
|
end := start + itemsPerWorker
|
||||||
|
if end > len(tsws) {
|
||||||
|
end = len(tsws)
|
||||||
|
}
|
||||||
|
chunk := tsws[start:end]
|
||||||
|
wg.Add(1)
|
||||||
|
go func(tswsChunk []*timeseriesWork, workerID uint) {
|
||||||
|
defer wg.Done()
|
||||||
|
timeseriesWorker(tswsChunk, workerID)
|
||||||
|
}(chunk, i)
|
||||||
|
start = end
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
|
||||||
// Wait until work is complete.
|
// Wait until work is complete.
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Collect results.
|
||||||
var firstErr error
|
var firstErr error
|
||||||
rowsProcessedTotal := 0
|
rowsProcessedTotal := 0
|
||||||
for _, tsw := range tsws {
|
for _, tsw := range tsws {
|
||||||
if err := <-tsw.doneCh; err != nil && firstErr == nil {
|
if err := tsw.err; err != nil && firstErr == nil {
|
||||||
// Return just the first error, since other errors are likely duplicate the first error.
|
// Return just the first error, since other errors are likely duplicate the first error.
|
||||||
firstErr = err
|
firstErr = err
|
||||||
}
|
}
|
||||||
|
@ -254,14 +237,11 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
|
||||||
putTimeseriesWork(tsw)
|
putTimeseriesWork(tsw)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
seriesProcessedTotal := len(rss.packedTimeseries)
|
||||||
|
rss.packedTimeseries = rss.packedTimeseries[:0]
|
||||||
rowsReadPerQuery.Update(float64(rowsProcessedTotal))
|
rowsReadPerQuery.Update(float64(rowsProcessedTotal))
|
||||||
seriesReadPerQuery.Update(float64(seriesProcessedTotal))
|
seriesReadPerQuery.Update(float64(seriesProcessedTotal))
|
||||||
|
|
||||||
// Shut down local workers
|
|
||||||
for _, workCh := range workChs {
|
|
||||||
close(workCh)
|
|
||||||
}
|
|
||||||
workChsWG.Wait()
|
|
||||||
qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
|
qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
|
||||||
|
|
||||||
return firstErr
|
return firstErr
|
||||||
|
|
|
@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): execute left and right sides of certain operations in parallel. For example, `q1 or q2`, `aggr_func(q1) <op> q2`, `q1 <op> aggr_func(q1)`. This may improve query performance if VictoriaMetrics has enough free resources for parallel processing of both sides of the operation. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886).
|
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): execute left and right sides of certain operations in parallel. For example, `q1 or q2`, `aggr_func(q1) <op> q2`, `q1 <op> aggr_func(q1)`. This may improve query performance if VictoriaMetrics has enough free resources for parallel processing of both sides of the operation. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886).
|
||||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmagent.html): allow duplicate username records with different passwords at configuration file. It should allow password rotation without username change.
|
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmagent.html): allow duplicate username records with different passwords at configuration file. It should allow password rotation without username change.
|
||||||
* FEATURE: add ability to push internal metrics (e.g. metrics exposed at `/metrics` page) to the configured remote storage from all the VictoriaMetrics components. See [these docs](https://docs.victoriametrics.com/#push-metrics).
|
* FEATURE: add ability to push internal metrics (e.g. metrics exposed at `/metrics` page) to the configured remote storage from all the VictoriaMetrics components. See [these docs](https://docs.victoriametrics.com/#push-metrics).
|
||||||
|
* FEATURE: improve performance for heavy queries over big number of time series on systems with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896). Thanks to @zqyzyq for [the idea](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/b596ac3745314fcc170a14e3ded062971cf7ced2).
|
||||||
|
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): set `up` metric to `0` for partial scrapes in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously the `up` metric was set to `1` when at least a single metric has been scraped before the error. This aligns the behaviour of `vmselect` with Prometheus.
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): set `up` metric to `0` for partial scrapes in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously the `up` metric was set to `1` when at least a single metric has been scraped before the error. This aligns the behaviour of `vmselect` with Prometheus.
|
||||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart all the scrape jobs during [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update) after `global` section is changed inside `-promscrape.config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2884).
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart all the scrape jobs during [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update) after `global` section is changed inside `-promscrape.config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2884).
|
||||||
|
|
Loading…
Reference in a new issue