mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect/netstorage: increase concurrency when processing small number of time series with big number of data points per each time series
Previously VictoriaMetrics was processing up to 32 time series in a single goroutine. This could be slow if each time series contains big number of data points (10M+ or more), since only a single CPU core could be loaded with work, while other CPU cores were idle. Fix this by launching GOMAXPROCS workers for time series processing. This should help with https://github.com/VictoriaMetrics/VictoriaMetrics/issues/572
This commit is contained in:
parent
72c90bfd8b
commit
7209d58fbd
1 changed files with 115 additions and 127 deletions
|
@ -7,13 +7,11 @@ import (
|
|||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
@ -72,6 +70,47 @@ func (rss *Results) mustClose() {
|
|||
rss.sr = nil
|
||||
}
|
||||
|
||||
var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs)
|
||||
|
||||
type timeseriesWork struct {
|
||||
rss *Results
|
||||
pts *packedTimeseries
|
||||
f func(rs *Result, workerID uint)
|
||||
doneCh chan error
|
||||
|
||||
rowsProcessed int
|
||||
}
|
||||
|
||||
func init() {
|
||||
for i := 0; i < gomaxprocs; i++ {
|
||||
go timeseriesWorker(uint(i))
|
||||
}
|
||||
}
|
||||
|
||||
func timeseriesWorker(workerID uint) {
|
||||
var rs Result
|
||||
for tsw := range timeseriesWorkCh {
|
||||
rss := tsw.rss
|
||||
if time.Until(rss.deadline.Deadline) < 0 {
|
||||
tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
||||
continue
|
||||
}
|
||||
if err := tsw.pts.Unpack(&rs, rss.tr, rss.fetchData); err != nil {
|
||||
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %s", err)
|
||||
continue
|
||||
}
|
||||
if len(rs.Timestamps) > 0 || !rss.fetchData {
|
||||
tsw.f(&rs, workerID)
|
||||
}
|
||||
tsw.rowsProcessed = len(rs.Values)
|
||||
tsw.doneCh <- nil
|
||||
if cap(rs.Values) > 1024*1024 && 4*len(rs.Values) < cap(rs.Values) {
|
||||
// Reset rs in order to preseve memory usage after processing big time series with millions of rows.
|
||||
rs = Result{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RunParallel runs in parallel f for all the results from rss.
|
||||
//
|
||||
// f shouldn't hold references to rs after returning.
|
||||
|
@ -81,72 +120,36 @@ func (rss *Results) mustClose() {
|
|||
func (rss *Results) RunParallel(f func(rs *Result, workerID uint)) error {
|
||||
defer rss.mustClose()
|
||||
|
||||
workersCount := 1 + len(rss.packedTimeseries)/32
|
||||
if workersCount > gomaxprocs {
|
||||
workersCount = gomaxprocs
|
||||
}
|
||||
if workersCount == 0 {
|
||||
logger.Panicf("BUG: workersCount cannot be zero")
|
||||
}
|
||||
workCh := make(chan *packedTimeseries, workersCount)
|
||||
doneCh := make(chan error)
|
||||
|
||||
// Start workers.
|
||||
rowsProcessedTotal := uint64(0)
|
||||
for i := 0; i < workersCount; i++ {
|
||||
go func(workerID uint) {
|
||||
rs := getResult()
|
||||
defer putResult(rs)
|
||||
maxWorkersCount := gomaxprocs / workersCount
|
||||
|
||||
var err error
|
||||
rowsProcessed := 0
|
||||
for pts := range workCh {
|
||||
if time.Until(rss.deadline.Deadline) < 0 {
|
||||
err = fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
|
||||
break
|
||||
}
|
||||
if err = pts.Unpack(rs, rss.tr, rss.fetchData, maxWorkersCount); err != nil {
|
||||
break
|
||||
}
|
||||
if len(rs.Timestamps) == 0 && rss.fetchData {
|
||||
// Skip empty blocks.
|
||||
continue
|
||||
}
|
||||
rowsProcessed += len(rs.Values)
|
||||
f(rs, workerID)
|
||||
}
|
||||
atomic.AddUint64(&rowsProcessedTotal, uint64(rowsProcessed))
|
||||
// Drain the remaining work
|
||||
for range workCh {
|
||||
}
|
||||
doneCh <- err
|
||||
}(uint(i))
|
||||
}
|
||||
|
||||
// Feed workers with work.
|
||||
tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
|
||||
for i := range rss.packedTimeseries {
|
||||
workCh <- &rss.packedTimeseries[i]
|
||||
tsw := ×eriesWork{
|
||||
rss: rss,
|
||||
pts: &rss.packedTimeseries[i],
|
||||
f: f,
|
||||
doneCh: make(chan error, 1),
|
||||
}
|
||||
timeseriesWorkCh <- tsw
|
||||
tsws[i] = tsw
|
||||
}
|
||||
seriesProcessedTotal := len(rss.packedTimeseries)
|
||||
rss.packedTimeseries = rss.packedTimeseries[:0]
|
||||
close(workCh)
|
||||
|
||||
// Wait until workers finish.
|
||||
var errors []error
|
||||
for i := 0; i < workersCount; i++ {
|
||||
if err := <-doneCh; err != nil {
|
||||
errors = append(errors, err)
|
||||
// Wait until work is complete.
|
||||
var firstErr error
|
||||
rowsProcessedTotal := 0
|
||||
for _, tsw := range tsws {
|
||||
if err := <-tsw.doneCh; err != nil && firstErr == nil {
|
||||
// Return just the first error, since other errors
|
||||
// are likely duplicate the first error.
|
||||
firstErr = err
|
||||
}
|
||||
rowsProcessedTotal += tsw.rowsProcessed
|
||||
}
|
||||
|
||||
perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
|
||||
perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
|
||||
if len(errors) > 0 {
|
||||
// Return just the first error, since other errors
|
||||
// is likely duplicate the first error.
|
||||
return errors[0]
|
||||
}
|
||||
return nil
|
||||
return firstErr
|
||||
}
|
||||
|
||||
var perQueryRowsProcessed = metrics.NewHistogram(`vm_per_query_rows_processed_count`)
|
||||
|
@ -159,70 +162,74 @@ type packedTimeseries struct {
|
|||
brs []storage.BlockRef
|
||||
}
|
||||
|
||||
var unpackWorkCh = make(chan *unpackWork, gomaxprocs)
|
||||
|
||||
type unpackWork struct {
|
||||
br storage.BlockRef
|
||||
tr storage.TimeRange
|
||||
fetchData bool
|
||||
doneCh chan error
|
||||
sb *sortBlock
|
||||
}
|
||||
|
||||
func init() {
|
||||
for i := 0; i < gomaxprocs; i++ {
|
||||
go unpackWorker()
|
||||
}
|
||||
}
|
||||
|
||||
func unpackWorker() {
|
||||
for upw := range unpackWorkCh {
|
||||
sb := getSortBlock()
|
||||
if err := sb.unpackFrom(upw.br, upw.tr, upw.fetchData); err != nil {
|
||||
putSortBlock(sb)
|
||||
upw.doneCh <- fmt.Errorf("cannot unpack block: %s", err)
|
||||
continue
|
||||
}
|
||||
upw.sb = sb
|
||||
upw.doneCh <- nil
|
||||
}
|
||||
}
|
||||
|
||||
// Unpack unpacks pts to dst.
|
||||
func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool, maxWorkersCount int) error {
|
||||
func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool) error {
|
||||
dst.reset()
|
||||
|
||||
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
|
||||
return fmt.Errorf("cannot unmarshal metricName %q: %s", pts.metricName, err)
|
||||
}
|
||||
|
||||
workersCount := 1 + len(pts.brs)/32
|
||||
if workersCount > maxWorkersCount {
|
||||
workersCount = maxWorkersCount
|
||||
}
|
||||
if workersCount == 0 {
|
||||
logger.Panicf("BUG: workersCount cannot be zero")
|
||||
}
|
||||
|
||||
sbs := make([]*sortBlock, 0, len(pts.brs))
|
||||
var sbsLock sync.Mutex
|
||||
|
||||
workCh := make(chan storage.BlockRef, workersCount)
|
||||
doneCh := make(chan error)
|
||||
|
||||
// Start workers
|
||||
for i := 0; i < workersCount; i++ {
|
||||
go func() {
|
||||
var err error
|
||||
for br := range workCh {
|
||||
sb := getSortBlock()
|
||||
if err = sb.unpackFrom(br, tr, fetchData); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
sbsLock.Lock()
|
||||
sbs = append(sbs, sb)
|
||||
sbsLock.Unlock()
|
||||
}
|
||||
|
||||
// Drain the remaining work
|
||||
for range workCh {
|
||||
}
|
||||
doneCh <- err
|
||||
}()
|
||||
}
|
||||
|
||||
// Feed workers with work
|
||||
for _, br := range pts.brs {
|
||||
workCh <- br
|
||||
upws := make([]*unpackWork, len(pts.brs))
|
||||
for i, br := range pts.brs {
|
||||
upw := &unpackWork{
|
||||
br: br,
|
||||
tr: tr,
|
||||
fetchData: fetchData,
|
||||
doneCh: make(chan error, 1),
|
||||
}
|
||||
unpackWorkCh <- upw
|
||||
upws[i] = upw
|
||||
}
|
||||
pts.brs = pts.brs[:0]
|
||||
close(workCh)
|
||||
|
||||
// Wait until workers finish
|
||||
var errors []error
|
||||
for i := 0; i < workersCount; i++ {
|
||||
if err := <-doneCh; err != nil {
|
||||
errors = append(errors, err)
|
||||
// Wait until work is complete
|
||||
sbs := make([]*sortBlock, 0, len(pts.brs))
|
||||
var firstErr error
|
||||
for _, upw := range upws {
|
||||
if err := <-upw.doneCh; err != nil && firstErr == nil {
|
||||
// Return the first error only, since other errors are likely the same.
|
||||
firstErr = err
|
||||
}
|
||||
if firstErr == nil {
|
||||
sbs = append(sbs, upw.sb)
|
||||
} else {
|
||||
putSortBlock(upw.sb)
|
||||
}
|
||||
}
|
||||
if len(errors) > 0 {
|
||||
// Return the first error only, since other errors are likely the same.
|
||||
return errors[0]
|
||||
if firstErr != nil {
|
||||
return firstErr
|
||||
}
|
||||
|
||||
// Merge blocks
|
||||
mergeSortBlocks(dst, sbs)
|
||||
return nil
|
||||
}
|
||||
|
@ -537,25 +544,6 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli
|
|||
return &rss, nil
|
||||
}
|
||||
|
||||
func getResult() *Result {
|
||||
v := rsPool.Get()
|
||||
if v == nil {
|
||||
return &Result{}
|
||||
}
|
||||
return v.(*Result)
|
||||
}
|
||||
|
||||
func putResult(rs *Result) {
|
||||
if len(rs.Values) > 8192 {
|
||||
// Do not pool big results, since they may occupy too much memory.
|
||||
return
|
||||
}
|
||||
rs.reset()
|
||||
rsPool.Put(rs)
|
||||
}
|
||||
|
||||
var rsPool sync.Pool
|
||||
|
||||
func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) {
|
||||
tfss := make([]*storage.TagFilters, 0, len(tagFilterss))
|
||||
for _, tagFilters := range tagFilterss {
|
||||
|
|
Loading…
Reference in a new issue