From ed10141ff8b17424217821b1e3103945c2788feb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 16 Jul 2021 00:34:33 +0300 Subject: [PATCH] app/vmselect/netstorage: use more scalable algorithm for ditributing the work among among multiple channels on systems with big number of CPU cores --- app/vmselect/netstorage/netstorage.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index a263a361f..578a80e4a 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -19,6 +19,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fastrand" ) var ( @@ -119,7 +120,6 @@ func putTimeseriesWork(tsw *timeseriesWork) { var tswPool sync.Pool var timeseriesWorkChs []chan *timeseriesWork -var timeseriesWorkIdx uint32 func init() { timeseriesWorkChs = make([]chan *timeseriesWork, gomaxprocs) @@ -130,9 +130,14 @@ func init() { } func scheduleTimeseriesWork(tsw *timeseriesWork) { + if len(timeseriesWorkChs) == 1 { + // Fast path for a single CPU core + timeseriesWorkChs[0] <- tsw + return + } attempts := 0 for { - idx := atomic.AddUint32(×eriesWorkIdx, 1) % uint32(len(timeseriesWorkChs)) + idx := fastrand.Uint32n(uint32(len(timeseriesWorkChs))) select { case timeseriesWorkChs[idx] <- tsw: return @@ -308,9 +313,14 @@ func init() { } func scheduleUnpackWork(uw *unpackWork) { + if len(unpackWorkChs) == 1 { + // Fast path for a single CPU core + unpackWorkChs[0] <- uw + return + } attempts := 0 for { - idx := atomic.AddUint32(&unpackWorkIdx, 1) % uint32(len(unpackWorkChs)) + idx := fastrand.Uint32n(uint32(len(unpackWorkChs))) select { case unpackWorkChs[idx] <- uw: return