mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/netstorage: unpack time series data in mostly local big chunks
This should improve performance on multi-CPU systems for queries selecting time series with big number of raw samples
This commit is contained in:
parent
d05cac6c98
commit
a1911e1330
2 changed files with 91 additions and 58 deletions
|
@ -122,7 +122,7 @@ var tswPool sync.Pool
|
||||||
|
|
||||||
func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
|
func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
|
||||||
if len(workChs) == 1 {
|
if len(workChs) == 1 {
|
||||||
// Fast path for a single CPU core
|
// Fast path for a single worker
|
||||||
workChs[0] <- tsw
|
workChs[0] <- tsw
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -142,6 +142,29 @@ func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
|
||||||
|
if atomic.LoadUint32(tsw.mustStop) != 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rss := tsw.rss
|
||||||
|
if rss.deadline.Exceeded() {
|
||||||
|
atomic.StoreUint32(tsw.mustStop, 1)
|
||||||
|
return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
||||||
|
}
|
||||||
|
if err := tsw.pts.Unpack(r, rss.tbf, rss.tr, rss.fetchData); err != nil {
|
||||||
|
atomic.StoreUint32(tsw.mustStop, 1)
|
||||||
|
return fmt.Errorf("error during time series unpacking: %w", err)
|
||||||
|
}
|
||||||
|
if len(r.Timestamps) > 0 || !rss.fetchData {
|
||||||
|
if err := tsw.f(r, workerID); err != nil {
|
||||||
|
atomic.StoreUint32(tsw.mustStop, 1)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tsw.rowsProcessed = len(r.Values)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
|
func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
|
||||||
v := resultPool.Get()
|
v := resultPool.Get()
|
||||||
if v == nil {
|
if v == nil {
|
||||||
|
@ -149,38 +172,15 @@ func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
|
||||||
}
|
}
|
||||||
r := v.(*result)
|
r := v.(*result)
|
||||||
for tsw := range ch {
|
for tsw := range ch {
|
||||||
if atomic.LoadUint32(tsw.mustStop) != 0 {
|
err := tsw.do(&r.rs, workerID)
|
||||||
tsw.doneCh <- nil
|
tsw.doneCh <- err
|
||||||
continue
|
}
|
||||||
}
|
currentTime := fasttime.UnixTimestamp()
|
||||||
rss := tsw.rss
|
if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
|
||||||
if rss.deadline.Exceeded() {
|
// Reset r.rs in order to preseve memory usage after processing big time series with millions of rows.
|
||||||
atomic.StoreUint32(tsw.mustStop, 1)
|
r.rs = Result{}
|
||||||
tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
r.lastResetTime = currentTime
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := tsw.pts.Unpack(&r.rs, rss.tbf, rss.tr, rss.fetchData); err != nil {
|
|
||||||
atomic.StoreUint32(tsw.mustStop, 1)
|
|
||||||
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(r.rs.Timestamps) > 0 || !rss.fetchData {
|
|
||||||
if err := tsw.f(&r.rs, workerID); err != nil {
|
|
||||||
atomic.StoreUint32(tsw.mustStop, 1)
|
|
||||||
tsw.doneCh <- err
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tsw.rowsProcessed = len(r.rs.Values)
|
|
||||||
tsw.doneCh <- nil
|
|
||||||
currentTime := fasttime.UnixTimestamp()
|
|
||||||
if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
|
|
||||||
// Reset r.rs in order to preseve memory usage after processing big time series with millions of rows.
|
|
||||||
r.rs = Result{}
|
|
||||||
r.lastResetTime = currentTime
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
r.rs.reset()
|
|
||||||
resultPool.Put(r)
|
resultPool.Put(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -335,32 +335,22 @@ func putUnpackWork(upw *unpackWork) {
|
||||||
|
|
||||||
var unpackWorkPool sync.Pool
|
var unpackWorkPool sync.Pool
|
||||||
|
|
||||||
var unpackWorkChs []chan *unpackWork
|
func scheduleUnpackWork(workChs []chan *unpackWork, uw *unpackWork) {
|
||||||
|
if len(workChs) == 1 {
|
||||||
func init() {
|
// Fast path for a single worker
|
||||||
unpackWorkChs = make([]chan *unpackWork, gomaxprocs)
|
workChs[0] <- uw
|
||||||
for i := range unpackWorkChs {
|
|
||||||
unpackWorkChs[i] = make(chan *unpackWork, 128)
|
|
||||||
go unpackWorker(unpackWorkChs[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func scheduleUnpackWork(uw *unpackWork) {
|
|
||||||
if len(unpackWorkChs) == 1 {
|
|
||||||
// Fast path for a single CPU core
|
|
||||||
unpackWorkChs[0] <- uw
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
attempts := 0
|
attempts := 0
|
||||||
for {
|
for {
|
||||||
idx := fastrand.Uint32n(uint32(len(unpackWorkChs)))
|
idx := fastrand.Uint32n(uint32(len(workChs)))
|
||||||
select {
|
select {
|
||||||
case unpackWorkChs[idx] <- uw:
|
case workChs[idx] <- uw:
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
attempts++
|
attempts++
|
||||||
if attempts >= len(unpackWorkChs) {
|
if attempts >= len(workChs) {
|
||||||
unpackWorkChs[idx] <- uw
|
workChs[idx] <- uw
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -368,16 +358,26 @@ func scheduleUnpackWork(uw *unpackWork) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func unpackWorker(ch <-chan *unpackWork) {
|
func unpackWorker(ch <-chan *unpackWork) {
|
||||||
var tmpBlock storage.Block
|
v := tmpBlockPool.Get()
|
||||||
for upw := range ch {
|
if v == nil {
|
||||||
upw.unpack(&tmpBlock)
|
v = &storage.Block{}
|
||||||
}
|
}
|
||||||
|
tmpBlock := v.(*storage.Block)
|
||||||
|
for upw := range ch {
|
||||||
|
upw.unpack(tmpBlock)
|
||||||
|
}
|
||||||
|
tmpBlockPool.Put(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var tmpBlockPool sync.Pool
|
||||||
|
|
||||||
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
|
// unpackBatchSize is the maximum number of blocks that may be unpacked at once by a single goroutine.
|
||||||
//
|
//
|
||||||
// This batch is needed in order to reduce contention for upackWorkCh in multi-CPU system.
|
// It is better to load a single goroutine for up to one second on a system with many CPU cores
|
||||||
var unpackBatchSize = 32 * cgroup.AvailableCPUs()
|
// in order to reduce inter-CPU memory ping-pong.
|
||||||
|
// A single goroutine can unpack up to 40 millions of rows per second, while a single block contains up to 8K rows.
|
||||||
|
// So the batch size should be 40M / 8K = 5K.
|
||||||
|
var unpackBatchSize = 5000
|
||||||
|
|
||||||
// Unpack unpacks pts to dst.
|
// Unpack unpacks pts to dst.
|
||||||
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error {
|
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error {
|
||||||
|
@ -390,14 +390,39 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Feed workers with work
|
// Spin up local workers.
|
||||||
|
// Do not use global workers pool, since it increases inter-CPU memory ping-poing,
|
||||||
|
// which reduces the scalability on systems with many CPU cores.
|
||||||
brsLen := len(pts.brs)
|
brsLen := len(pts.brs)
|
||||||
|
workers := brsLen / unpackBatchSize
|
||||||
|
if workers > gomaxprocs {
|
||||||
|
workers = gomaxprocs
|
||||||
|
}
|
||||||
|
if workers < 1 {
|
||||||
|
workers = 1
|
||||||
|
}
|
||||||
|
workChs := make([]chan *unpackWork, workers)
|
||||||
|
var workChsWG sync.WaitGroup
|
||||||
|
for i := 0; i < workers; i++ {
|
||||||
|
// Use unbuffered channel on purpose, since there are high chances
|
||||||
|
// that only a single unpackWork is needed to unpack.
|
||||||
|
// The unbuffered channel should reduce inter-CPU ping-pong in this case,
|
||||||
|
// which should improve the performance in a system with many CPU cores.
|
||||||
|
workChs[i] = make(chan *unpackWork)
|
||||||
|
workChsWG.Add(1)
|
||||||
|
go func(workerID int) {
|
||||||
|
defer workChsWG.Done()
|
||||||
|
unpackWorker(workChs[workerID])
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Feed workers with work
|
||||||
upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
|
upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
|
||||||
upw := getUnpackWork()
|
upw := getUnpackWork()
|
||||||
upw.tbf = tbf
|
upw.tbf = tbf
|
||||||
for _, br := range pts.brs {
|
for _, br := range pts.brs {
|
||||||
if len(upw.ws) >= unpackBatchSize {
|
if len(upw.ws) >= unpackBatchSize {
|
||||||
scheduleUnpackWork(upw)
|
scheduleUnpackWork(workChs, upw)
|
||||||
upws = append(upws, upw)
|
upws = append(upws, upw)
|
||||||
upw = getUnpackWork()
|
upw = getUnpackWork()
|
||||||
upw.tbf = tbf
|
upw.tbf = tbf
|
||||||
|
@ -407,7 +432,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
||||||
tr: tr,
|
tr: tr,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
scheduleUnpackWork(upw)
|
scheduleUnpackWork(workChs, upw)
|
||||||
upws = append(upws, upw)
|
upws = append(upws, upw)
|
||||||
pts.brs = pts.brs[:0]
|
pts.brs = pts.brs[:0]
|
||||||
|
|
||||||
|
@ -438,6 +463,13 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
|
||||||
}
|
}
|
||||||
putUnpackWork(upw)
|
putUnpackWork(upw)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Shut down local workers
|
||||||
|
for _, workCh := range workChs {
|
||||||
|
close(workCh)
|
||||||
|
}
|
||||||
|
workChsWG.Wait()
|
||||||
|
|
||||||
if firstErr != nil {
|
if firstErr != nil {
|
||||||
return firstErr
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ sort: 15
|
||||||
|
|
||||||
* FEATURE: add `-search.maxSamplesPerSeries` command-line flag for limiting the number of raw samples a single query can process per each time series. This option can protect from out of memory errors when a query processes tens of millions of raw samples per series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1067).
|
* FEATURE: add `-search.maxSamplesPerSeries` command-line flag for limiting the number of raw samples a single query can process per each time series. This option can protect from out of memory errors when a query processes tens of millions of raw samples per series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1067).
|
||||||
* FEATURE: add `-search.maxSamplesPerQuery` command-line flag for limiting the number of raw samples a single query can process across all the time series. This option can protect from heavy queries, which select too big number of raw samples. Thanks to @jiangxinlingdu for [the initial pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1478).
|
* FEATURE: add `-search.maxSamplesPerQuery` command-line flag for limiting the number of raw samples a single query can process across all the time series. This option can protect from heavy queries, which select too big number of raw samples. Thanks to @jiangxinlingdu for [the initial pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1478).
|
||||||
|
* FEATURE: improve performance for heavy queries on systems with big number of CPU cores.
|
||||||
|
|
||||||
* BUGFIX: vmselect: return dummy response at `/rules` page in the same way as for `/api/v1/rules` page. The `/rules` page is requested by Grafana 8. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1493) for details.
|
* BUGFIX: vmselect: return dummy response at `/rules` page in the same way as for `/api/v1/rules` page. The `/rules` page is requested by Grafana 8. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1493) for details.
|
||||||
* BUGFIX: vmbackup: automatically set default `us-east-1` S3 region if it is missing. This should simplify using S3-compatible services such as MinIO for backups. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1449).
|
* BUGFIX: vmbackup: automatically set default `us-east-1` S3 region if it is missing. This should simplify using S3-compatible services such as MinIO for backups. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1449).
|
||||||
|
|
Loading…
Reference in a new issue