app/vmselect/netstorage: improve the speed of queries over a big number of time series on multi-CPU systems

Reduce inter-CPU communication when processing a query over a big number of time series.
This should improve performance for queries over a big number of time series
on systems with many CPU cores.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896

Based on b596ac3745
Thanks to @zqyzyq for the idea.
Author: Aliaksandr Valialkin
Date:   2022-07-25 09:12:42 +03:00
Parent: 1aa5112771
Commit: 92630c1ab4
2 changed files with 47 additions and 66 deletions
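
The diff below replaces channel-based scheduling (`scheduleTimeseriesWork` feeding per-worker `workChs`) with pre-built per-worker chunks: the work items are shuffled, split into at most `gomaxprocs` contiguous slices, and each worker stores its error directly in the work item instead of sending it over a channel. The following is a minimal, self-contained sketch of that pattern; the identifiers (`workItem`, `process`, `runParallel`) are illustrative and are not the actual names from the changed code.

```go
package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
)

// workItem stands in for timeseriesWork: the worker records its error in the
// item itself, so no result channel is needed.
type workItem struct {
	id  int
	err error
}

// process stands in for the per-series callback invoked by the worker.
func process(w *workItem, workerID uint) error {
	return nil
}

func runParallel(items []*workItem) error {
	// Shuffle the items so that expensive ones are spread roughly evenly
	// across the per-worker chunks.
	rand.Shuffle(len(items), func(i, j int) { items[i], items[j] = items[j], items[i] })

	// Split the items into at most GOMAXPROCS contiguous chunks and start
	// one goroutine per chunk. Workers never touch a shared queue, which
	// avoids cross-CPU synchronization on every item.
	itemsPerWorker := 1
	if gomaxprocs := runtime.GOMAXPROCS(0); len(items) > gomaxprocs {
		itemsPerWorker = 1 + len(items)/gomaxprocs
	}
	var wg sync.WaitGroup
	var workerID uint
	for start := 0; start < len(items); start += itemsPerWorker {
		end := start + itemsPerWorker
		if end > len(items) {
			end = len(items)
		}
		wg.Add(1)
		go func(chunk []*workItem, id uint) {
			defer wg.Done()
			for _, w := range chunk {
				w.err = process(w, id)
			}
		}(items[start:end], workerID)
		workerID++
	}
	wg.Wait()

	// Collect only the first error, mirroring RunParallel.
	for _, w := range items {
		if w.err != nil {
			return w.err
		}
	}
	return nil
}

func main() {
	items := make([]*workItem, 1000)
	for i := range items {
		items[i] = &workItem{id: i}
	}
	fmt.Println(runParallel(items))
}
```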

app/vmselect/netstorage/netstorage.go

@@ -5,6 +5,7 @@ import (
     "errors"
     "flag"
     "fmt"
+    "math/rand"
     "regexp"
     "sort"
     "sync"
@@ -80,7 +81,7 @@ type timeseriesWork struct {
     rss           *Results
     pts           *packedTimeseries
     f             func(rs *Result, workerID uint) error
-    doneCh        chan error
+    err           error
     rowsProcessed int
 }
 
@@ -90,18 +91,14 @@ func (tsw *timeseriesWork) reset() {
     tsw.rss = nil
     tsw.pts = nil
     tsw.f = nil
-    if n := len(tsw.doneCh); n > 0 {
-        logger.Panicf("BUG: tsw.doneCh must be empty during reset; it contains %d items instead", n)
-    }
+    tsw.err = nil
     tsw.rowsProcessed = 0
 }
 
 func getTimeseriesWork() *timeseriesWork {
     v := tswPool.Get()
     if v == nil {
-        v = &timeseriesWork{
-            doneCh: make(chan error, 1),
-        }
+        v = &timeseriesWork{}
     }
     return v.(*timeseriesWork)
 }
@@ -113,28 +110,6 @@ func putTimeseriesWork(tsw *timeseriesWork) {
 
 var tswPool sync.Pool
 
-func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
-    if len(workChs) == 1 {
-        // Fast path for a single worker
-        workChs[0] <- tsw
-        return
-    }
-    attempts := 0
-    for {
-        idx := fastrand.Uint32n(uint32(len(workChs)))
-        select {
-        case workChs[idx] <- tsw:
-            return
-        default:
-            attempts++
-            if attempts >= len(workChs) {
-                workChs[idx] <- tsw
-                return
-            }
-        }
-    }
-}
-
 func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
     if atomic.LoadUint32(tsw.mustStop) != 0 {
         return nil
@@ -158,15 +133,15 @@ func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
     return nil
 }
 
-func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
+func timeseriesWorker(tsws []*timeseriesWork, workerID uint) {
     v := resultPool.Get()
     if v == nil {
         v = &result{}
     }
     r := v.(*result)
-    for tsw := range ch {
+    for _, tsw := range tsws {
         err := tsw.do(&r.rs, workerID)
-        tsw.doneCh <- err
+        tsw.err = err
     }
     currentTime := fasttime.UnixTimestamp()
     if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
@@ -195,31 +170,7 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
     qt = qt.NewChild("parallel process of fetched data")
     defer rss.mustClose()
 
-    // Spin up local workers.
-    //
-    // Do not use a global workChs with a global pool of workers, since it may lead to a deadlock in the following case:
-    // - RunParallel is called with f, which blocks without forward progress.
-    // - All the workers in the global pool became blocked in f.
-    // - workChs is filled up, so it cannot accept new work items from other RunParallel calls.
-    workers := len(rss.packedTimeseries)
-    if workers > gomaxprocs {
-        workers = gomaxprocs
-    }
-    if workers < 1 {
-        workers = 1
-    }
-    workChs := make([]chan *timeseriesWork, workers)
-    var workChsWG sync.WaitGroup
-    for i := 0; i < workers; i++ {
-        workChs[i] = make(chan *timeseriesWork, 16)
-        workChsWG.Add(1)
-        go func(workerID int) {
-            defer workChsWG.Done()
-            timeseriesWorker(workChs[workerID], uint(workerID))
-        }(i)
-    }
-
-    // Feed workers with work.
+    // Prepare work for workers.
     tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
     var mustStop uint32
     for i := range rss.packedTimeseries {
@@ -228,17 +179,49 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
         tsw.pts = &rss.packedTimeseries[i]
         tsw.f = f
         tsw.mustStop = &mustStop
-        scheduleTimeseriesWork(workChs, tsw)
         tsws[i] = tsw
     }
 
-    seriesProcessedTotal := len(rss.packedTimeseries)
-    rss.packedTimeseries = rss.packedTimeseries[:0]
+    // Shuffle tsws for providing the equal amount of work among workers.
+    r := rand.New(rand.NewSource(int64(fasttime.UnixTimestamp())))
+    r.Shuffle(len(tsws), func(i, j int) {
+        tsws[i], tsws[j] = tsws[j], tsws[i]
+    })
+
+    // Spin up up to gomaxprocs local workers and split work equally among them.
+    // This guarantees linear scalability with the increase of gomaxprocs
+    // (e.g. the number of available CPU cores).
+    workers := len(rss.packedTimeseries)
+    itemsPerWorker := 1
+    if workers > gomaxprocs {
+        itemsPerWorker = 1 + workers/gomaxprocs
+        workers = gomaxprocs
+    }
+    var start int
+    var i uint
+    var wg sync.WaitGroup
+    for start < len(tsws) {
+        end := start + itemsPerWorker
+        if end > len(tsws) {
+            end = len(tsws)
+        }
+        chunk := tsws[start:end]
+        wg.Add(1)
+        go func(tswsChunk []*timeseriesWork, workerID uint) {
+            defer wg.Done()
+            timeseriesWorker(tswsChunk, workerID)
+        }(chunk, i)
+        start = end
+        i++
+    }
 
     // Wait until work is complete.
+    wg.Wait()
+
+    // Collect results.
     var firstErr error
     rowsProcessedTotal := 0
     for _, tsw := range tsws {
-        if err := <-tsw.doneCh; err != nil && firstErr == nil {
+        if err := tsw.err; err != nil && firstErr == nil {
             // Return just the first error, since other errors are likely duplicate the first error.
             firstErr = err
         }
@@ -247,14 +230,11 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
         putTimeseriesWork(tsw)
     }
 
+    seriesProcessedTotal := len(rss.packedTimeseries)
+    rss.packedTimeseries = rss.packedTimeseries[:0]
+
     rowsReadPerQuery.Update(float64(rowsProcessedTotal))
     seriesReadPerQuery.Update(float64(seriesProcessedTotal))
 
-    // Shut down local workers
-    for _, workCh := range workChs {
-        close(workCh)
-    }
-    workChsWG.Wait()
-
     qt.Donef("series=%d, samples=%d", seriesProcessedTotal, rowsProcessedTotal)
     return firstErr
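
The win from this scheme comes from eliminating a channel send/receive (a cross-goroutine, and typically cross-CPU, synchronization point) per time series. Below is a hypothetical micro-benchmark, not part of this commit, that contrasts per-item channel dispatch with per-worker chunk dispatch; all identifiers are illustrative. Save it as e.g. `dispatch_test.go` and run `go test -bench=Dispatch`:

```go
package dispatch

import (
	"runtime"
	"sync"
	"sync/atomic"
	"testing"
)

const numItems = 100000

// sink prevents the compiler from optimizing the per-item work away.
var sink uint64

// BenchmarkChannelDispatch pushes every item through a shared buffered
// channel, similar to the removed scheduleTimeseriesWork/workChs scheme.
func BenchmarkChannelDispatch(b *testing.B) {
	workers := runtime.GOMAXPROCS(0)
	for n := 0; n < b.N; n++ {
		ch := make(chan int, 16)
		var wg sync.WaitGroup
		for w := 0; w < workers; w++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				var local uint64
				for i := range ch {
					local += uint64(i)
				}
				atomic.AddUint64(&sink, local)
			}()
		}
		for i := 0; i < numItems; i++ {
			ch <- i
		}
		close(ch)
		wg.Wait()
	}
}

// BenchmarkChunkDispatch hands each worker one contiguous slice up front,
// similar to the new timeseriesWorker(tsws []*timeseriesWork, ...) scheme.
func BenchmarkChunkDispatch(b *testing.B) {
	workers := runtime.GOMAXPROCS(0)
	chunk := (numItems + workers - 1) / workers
	for n := 0; n < b.N; n++ {
		var wg sync.WaitGroup
		for start := 0; start < numItems; start += chunk {
			end := start + chunk
			if end > numItems {
				end = numItems
			}
			wg.Add(1)
			go func(start, end int) {
				defer wg.Done()
				var local uint64
				for i := start; i < end; i++ {
					local += uint64(i)
				}
				atomic.AddUint64(&sink, local)
			}(start, end)
		}
		wg.Wait()
	}
}
```

The per-item work here is intentionally tiny, so the measured gap mostly reflects dispatch overhead; with heavier per-series work the relative difference shrinks, but the chunked scheme still scales more predictably with the number of CPU cores.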

docs/CHANGELOG.md

@@ -19,6 +19,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): execute left and right sides of certain operations in parallel. For example, `q1 or q2`, `aggr_func(q1) <op> q2`, `q1 <op> aggr_func(q1)`. This may improve query performance if VictoriaMetrics has enough free resources for parallel processing of both sides of the operation. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886).
 * FEATURE: [vmauth](https://docs.victoriametrics.com/vmagent.html): allow duplicate username records with different passwords at configuration file. It should allow password rotation without username change.
 * FEATURE: add ability to push internal metrics (e.g. metrics exposed at `/metrics` page) to the configured remote storage from all the VictoriaMetrics components. See [these docs](https://docs.victoriametrics.com/#push-metrics).
+* FEATURE: improve performance for heavy queries over big number of time series on systems with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2896). Thanks to @zqyzyq for [the idea](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/b596ac3745314fcc170a14e3ded062971cf7ced2).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): set `up` metric to `0` for partial scrapes in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously the `up` metric was set to `1` when at least a single metric has been scraped before the error. This aligns the behaviour of `vmselect` with Prometheus.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): restart all the scrape jobs during [config reload](https://docs.victoriametrics.com/vmagent.html#configuration-update) after `global` section is changed inside `-promscrape.config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2884).