mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-19 15:30:17 +00:00
app/vmselect/netstorage: improve scalability of blocks processing on systems with multiple CPU cores
Previously a single syncwg.WaitGroup was used for tracking the lifetime of processBlock callbacks across all the per-vmstorage goroutines. This could be slow on systems with many CPU cores because of inter-CPU synchronization overhead. Use a separate per-vmstorage sync.WaitGroup instead in order to reduce this overhead. This should improve performance for heavy queries over a big number of blocks on multi-CPU systems.
parent 690b505975
commit ec3df0b913

2 changed files with 37 additions and 22 deletions
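Before the diff itself, here is a minimal, self-contained sketch of the idea behind the change (not the VictoriaMetrics code: the worker count, names and the squaring work are made up for illustration). Each worker gets its own sync.WaitGroup, so Add/Done never touch a counter that other CPUs are updating, and the coordinator waits on every per-worker WaitGroup in turn. The real commit additionally pads each WaitGroup to a multiple of the cache-line size, as shown in the ProcessBlocks hunk below.

package main

import (
    "fmt"
    "sync"
)

func main() {
    const workers = 4
    // One WaitGroup per worker: no counter is shared across workers,
    // so Add/Done do not cause inter-CPU cache-line ping-pong.
    wgs := make([]sync.WaitGroup, workers)
    results := make([]int, workers)

    for i := 0; i < workers; i++ {
        i := i
        wgs[i].Add(1)
        go func() {
            defer wgs[i].Done()
            results[i] = i * i // stand-in for per-worker block processing
        }()
    }

    // The coordinator waits on each per-worker WaitGroup in turn.
    for i := range wgs {
        wgs[i].Wait()
    }
    fmt.Println(results) // [0 1 4 9]
}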
app/vmselect/netstorage/netstorage.go

@@ -16,6 +16,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -28,7 +29,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
 	"github.com/VictoriaMetrics/metrics"
 	"github.com/cespare/xxhash/v2"
 	"github.com/valyala/fastrand"
@@ -655,9 +655,9 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadli
 	}
 
 	// Push mrs to storage nodes in parallel.
-	snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.registerMetricNamesRequests.Inc()
-		err := sn.registerMetricNames(qt, mrsPerNode[idx], deadline)
+		err := sn.registerMetricNames(qt, mrsPerNode[workerIdx], deadline)
 		if err != nil {
 			sn.registerMetricNamesErrors.Inc()
 		}
@@ -686,7 +686,7 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		deletedCount int
 		err          error
 	}
-	snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.deleteSeriesRequests.Inc()
 		deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
 		if err != nil {
@@ -727,7 +727,7 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 		labelNames []string
 		err        error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.labelNamesRequests.Inc()
 		labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
 		if err != nil {
@@ -829,7 +829,7 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
 		labelValues []string
 		err         error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.labelValuesRequests.Inc()
 		labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
 		if err != nil {
@@ -911,7 +911,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
 		suffixes []string
 		err      error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.tagValueSuffixesRequests.Inc()
 		suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
 		if err != nil {
@@ -975,7 +975,7 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 		status *storage.TSDBStatus
 		err    error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.tsdbStatusRequests.Inc()
 		status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
 		if err != nil {
@@ -1080,7 +1080,7 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
 		n   uint64
 		err error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.seriesCountRequests.Inc()
 		n, err := sn.getSeriesCount(qt, accountID, projectID, deadline)
 		if err != nil {
@@ -1213,7 +1213,7 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
 		metricNames []string
 		err         error
 	}
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.searchMetricNamesRequests.Inc()
 		metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
 		if err != nil {
@@ -1315,9 +1315,18 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 	requestData := sq.Marshal(nil)
 
 	// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
+	// Use per-worker WaitGroup instead of a shared WaitGroup in order to avoid inter-CPU contention,
+	// which may siginificantly slow down the rate of processBlock calls on multi-CPU systems.
+	type wgWithPadding struct {
+		wg sync.WaitGroup
+		// Prevents false sharing on widespread platforms with
+		// 128 mod (cache line size) = 0 .
+		pad [128 - unsafe.Sizeof(sync.WaitGroup{})%128]byte
+	}
+	wgs := make([]wgWithPadding, len(storageNodes))
 	var stopped uint32
-	var wg syncwg.WaitGroup
-	f := func(mb *storage.MetricBlock) error {
+	f := func(mb *storage.MetricBlock, workerIdx int) error {
+		wg := &wgs[workerIdx].wg
 		wg.Add(1)
 		defer wg.Done()
 		if atomic.LoadUint32(&stopped) != 0 {
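The pad field in wgWithPadding above rounds the struct size up to the next multiple of 128 bytes, so consecutive elements of the wgs slice sit 128 bytes apart and their WaitGroups never land on the same cache line on CPUs whose cache-line size divides 128 (32, 64 or 128 bytes). A small standalone check of that arithmetic (hypothetical names, not part of the commit) could look like this:

package main

import (
    "fmt"
    "sync"
    "unsafe"
)

// padded mirrors the wgWithPadding trick from the hunk above: the array length is a
// compile-time constant, so the compiler rounds the struct size up to 128 bytes.
type padded struct {
    wg  sync.WaitGroup
    pad [128 - unsafe.Sizeof(sync.WaitGroup{})%128]byte
}

func main() {
    fmt.Println(unsafe.Sizeof(sync.WaitGroup{})) // Go-version dependent, e.g. 16
    fmt.Println(unsafe.Sizeof(padded{}))         // 128 on typical 64-bit platforms
    fmt.Println(unsafe.Sizeof(padded{}) % 128)   // 0: consecutive WaitGroups are 128 bytes apart
}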
@@ -1327,9 +1336,9 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 	}
 
 	// Send the query to all the storage nodes in parallel.
-	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{} {
+	snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} {
 		sn.searchRequests.Inc()
-		err := sn.processSearchQuery(qt, requestData, f, deadline)
+		err := sn.processSearchQuery(qt, requestData, f, workerIdx, deadline)
 		if err != nil {
 			sn.searchErrors.Inc()
 			err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
@@ -1344,7 +1353,9 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
 	})
 	// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
 	atomic.StoreUint32(&stopped, 1)
-	wg.Wait()
+	for i := range wgs {
+		wgs[i].wg.Wait()
+	}
 	if err != nil {
 		return isPartial, fmt.Errorf("cannot fetch query results from vmstorage nodes: %w", err)
 	}
@@ -1356,12 +1367,12 @@ type storageNodesRequest struct {
 	resultsCh chan interface{}
 }
 
-func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, idx int, sn *storageNode) interface{}) *storageNodesRequest {
+func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{}) *storageNodesRequest {
 	resultsCh := make(chan interface{}, len(storageNodes))
 	for idx, sn := range storageNodes {
 		qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr())
-		go func(idx int, sn *storageNode) {
-			result := f(qtChild, idx, sn)
+		go func(workerIdx int, sn *storageNode) {
+			result := f(qtChild, workerIdx, sn)
 			resultsCh <- result
 			qtChild.Done()
 		}(idx, sn)
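For context, the startStorageNodesRequest hunk above hands every vmstorage connection a stable worker index and collects one result per node over a buffered channel. A stripped-down sketch of that fan-out/fan-in shape (hypothetical node type and function names, not the real API) might look like this:

package main

import "fmt"

type node struct{ addr string }

// fanOut starts one goroutine per node, identified by a stable worker index,
// and gathers results over a channel buffered to len(nodes) so senders never block.
func fanOut(nodes []*node, f func(workerIdx int, n *node) error) []error {
    resultsCh := make(chan error, len(nodes))
    for idx, n := range nodes {
        go func(workerIdx int, n *node) {
            resultsCh <- f(workerIdx, n)
        }(idx, n)
    }
    errs := make([]error, 0, len(nodes))
    for range nodes {
        errs = append(errs, <-resultsCh)
    }
    return errs
}

func main() {
    nodes := []*node{{"vmstorage-1"}, {"vmstorage-2"}}
    errs := fanOut(nodes, func(workerIdx int, n *node) error {
        fmt.Printf("worker %d -> %s\n", workerIdx, n.addr)
        return nil
    })
    fmt.Println(errs)
}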
@@ -1631,9 +1642,10 @@ func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestD
 	return metricNames, nil
 }
 
-func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock) error, deadline searchutils.Deadline) error {
+func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock, workerIdx int) error,
+	workerIdx int, deadline searchutils.Deadline) error {
 	f := func(bc *handshake.BufferedConn) error {
-		if err := sn.processSearchQueryOnConn(bc, requestData, processBlock); err != nil {
+		if err := sn.processSearchQueryOnConn(bc, requestData, processBlock, workerIdx); err != nil {
 			return err
 		}
 		return nil
@@ -2133,7 +2145,8 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn
 
 const maxMetricNameSize = 64 * 1024
 
-func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, processBlock func(mb *storage.MetricBlock) error) error {
+func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte,
+	processBlock func(mb *storage.MetricBlock, workerIdx int) error, workerIdx int) error {
 	// Send the request to sn.
 	if err := writeBytes(bc, requestData); err != nil {
 		return fmt.Errorf("cannot write requestData: %w", err)
@@ -2173,7 +2186,7 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ
 		blocksRead++
 		sn.metricBlocksRead.Inc()
 		sn.metricRowsRead.Add(mb.Block.RowsCount())
-		if err := processBlock(&mb); err != nil {
+		if err := processBlock(&mb, workerIdx); err != nil {
 			return fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err)
 		}
 	}
docs/CHANGELOG.md

@@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
 
 ## tip
 
+* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance for heavy queries on systems with many CPU cores.
+
 * BUGFIX: prevent from excess CPU usage when the storage enters [read-only mode](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#readonly-mode).
 
 ## [v1.80.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.80.0)