From 3921d8afaec9051ce6fdbaa0d2bec31c7a97bb45 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@gmail.com>
Date: Mon, 26 Jul 2021 15:38:51 +0300
Subject: [PATCH] app/vmselect: prevent a possible deadlock when the f
 callback blocks inside RunParallel

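The deadlock can happen when RunParallel is called with an f callback that
blocks until other work items make progress. With a single global pool of
gomaxprocs workers, every worker may end up blocked inside f while the global
work channels are full, so no worker is left to pick up the items that would
unblock them. In practice this could be hit when multiple `target` query args
are passed to the Graphite Render API (see docs/CHANGELOG.md below).

For context, a minimal self-contained Go sketch of this failure mode
(hypothetical names and sizes, not the actual netstorage code):

    package main

    import "sync"

    const poolSize = 2 // stand-in for gomaxprocs

    // A single global work channel served by a fixed pool of workers, similar
    // in spirit to the global timeseriesWorkChs removed by this patch.
    var workCh = make(chan func(), poolSize)

    func init() {
        for i := 0; i < poolSize; i++ {
            go func() {
                for job := range workCh {
                    job() // if job never returns, this worker is lost for everyone
                }
            }()
        }
    }

    // runParallelGlobal schedules work onto the shared global pool and waits
    // for it to complete.
    func runParallelGlobal(items int, f func()) {
        var wg sync.WaitGroup
        wg.Add(items)
        for i := 0; i < items; i++ {
            workCh <- func() {
                defer wg.Done()
                f()
            }
        }
        wg.Wait()
    }

    func main() {
        // f blocks until more callbacks run concurrently than the pool allows.
        var barrier sync.WaitGroup
        barrier.Add(poolSize + 1)
        // poolSize callbacks block inside barrier.Wait(), the buffered channel
        // fills up, and the producer blocks on send: the Go runtime reports
        // "all goroutines are asleep - deadlock!".
        runParallelGlobal(poolSize+1+cap(workCh), func() {
            barrier.Done()
            barrier.Wait()
        })
    }
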
---
 app/vmselect/netstorage/netstorage.go | 90 ++++++++++++++++++---------
 docs/CHANGELOG.md                     |  1 +
 2 files changed, 62 insertions(+), 29 deletions(-)

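The diff below replaces the global timeseriesWorkChs pool with per-call
workers: each RunParallel invocation spins up its own bounded set of worker
goroutines, feeds them, then closes the channels and waits for the workers to
exit, so a blocked f can only stall its own call. A minimal standalone sketch
of this pattern (hypothetical names; simplified to a single shared channel,
whereas the patch keeps one channel per worker and schedules each work item
into a randomly chosen one):

    package main

    import (
        "fmt"
        "runtime"
        "sync"
    )

    // runParallelLocal processes items with a worker pool that lives only for
    // the duration of this call.
    func runParallelLocal(items []int, f func(item int, workerID uint)) {
        workers := runtime.GOMAXPROCS(0)
        if workers > len(items) {
            workers = len(items)
        }
        if workers < 1 {
            workers = 1
        }
        workCh := make(chan int, 16)
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func(workerID uint) {
                defer wg.Done()
                for item := range workCh {
                    f(item, workerID)
                }
            }(uint(i))
        }
        for _, item := range items {
            workCh <- item
        }
        // Shut down the local workers and wait for them to drain the channel.
        close(workCh)
        wg.Wait()
    }

    func main() {
        runParallelLocal([]int{1, 2, 3, 4, 5}, func(item int, workerID uint) {
            fmt.Printf("worker %d processed item %d\n", workerID, item)
        })
    }
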
diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index 630a67c0f7..5e1cfa52fa 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -122,32 +122,22 @@ func putTimeseriesWork(tsw *timeseriesWork) {
 
 var tswPool sync.Pool
 
-var timeseriesWorkChs []chan *timeseriesWork
-
-func init() {
-	timeseriesWorkChs = make([]chan *timeseriesWork, gomaxprocs)
-	for i := range timeseriesWorkChs {
-		timeseriesWorkChs[i] = make(chan *timeseriesWork, 16)
-		go timeseriesWorker(timeseriesWorkChs[i], uint(i))
-	}
-}
-
-func scheduleTimeseriesWork(tsw *timeseriesWork) {
-	if len(timeseriesWorkChs) == 1 {
+func scheduleTimeseriesWork(workChs []chan *timeseriesWork, tsw *timeseriesWork) {
+	if len(workChs) == 1 {
 		// Fast path for a single CPU core
-		timeseriesWorkChs[0] <- tsw
+		workChs[0] <- tsw
 		return
 	}
 	attempts := 0
 	for {
-		idx := fastrand.Uint32n(uint32(len(timeseriesWorkChs)))
+		idx := fastrand.Uint32n(uint32(len(workChs)))
 		select {
-		case timeseriesWorkChs[idx] <- tsw:
+		case workChs[idx] <- tsw:
 			return
 		default:
 			attempts++
-			if attempts >= len(timeseriesWorkChs) {
-				timeseriesWorkChs[idx] <- tsw
+			if attempts >= len(workChs) {
+				workChs[idx] <- tsw
 				return
 			}
 		}
@@ -155,8 +145,11 @@ func scheduleTimeseriesWork(tsw *timeseriesWork) {
 }
 
 func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
-	var rs Result
-	var rsLastResetTime uint64
+	v := resultPool.Get()
+	if v == nil {
+		v = &result{}
+	}
+	r := v.(*result)
 	for tsw := range ch {
 		if atomic.LoadUint32(tsw.mustStop) != 0 {
 			tsw.doneCh <- nil
@@ -168,29 +161,38 @@ func timeseriesWorker(ch <-chan *timeseriesWork, workerID uint) {
 			tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
 			continue
 		}
-		if err := tsw.pts.Unpack(rss.tbf, &rs, rss.tr, rss.fetchData, rss.at); err != nil {
+		if err := tsw.pts.Unpack(rss.tbf, &r.rs, rss.tr, rss.fetchData, rss.at); err != nil {
 			atomic.StoreUint32(tsw.mustStop, 1)
 			tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
 			continue
 		}
-		if len(rs.Timestamps) > 0 || !rss.fetchData {
-			if err := tsw.f(&rs, workerID); err != nil {
+		if len(r.rs.Timestamps) > 0 || !rss.fetchData {
+			if err := tsw.f(&r.rs, workerID); err != nil {
 				atomic.StoreUint32(tsw.mustStop, 1)
 				tsw.doneCh <- err
 				continue
 			}
 		}
-		tsw.rowsProcessed = len(rs.Values)
+		tsw.rowsProcessed = len(r.rs.Values)
 		tsw.doneCh <- nil
 		currentTime := fasttime.UnixTimestamp()
-		if cap(rs.Values) > 1024*1024 && 4*len(rs.Values) < cap(rs.Values) && currentTime-rsLastResetTime > 10 {
-			// Reset rs in order to preseve memory usage after processing big time series with millions of rows.
-			rs = Result{}
-			rsLastResetTime = currentTime
+		if cap(r.rs.Values) > 1024*1024 && 4*len(r.rs.Values) < cap(r.rs.Values) && currentTime-r.lastResetTime > 10 {
+			// Reset r.rs in order to preserve memory usage after processing big time series with millions of rows.
+			r.rs = Result{}
+			r.lastResetTime = currentTime
 		}
 	}
+	r.rs.reset()
+	resultPool.Put(r)
 }
 
+type result struct {
+	rs            Result
+	lastResetTime uint64
+}
+
+var resultPool sync.Pool
+
 // RunParallel runs f in parallel for all the results from rss.
 //
 // f shouldn't hold references to rs after returning.
@@ -204,6 +206,30 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
 		rss.tbf = nil
 	}()
 
+	// Spin up local workers.
+	//
+	// Do not use a global workChs with a global pool of workers, since it may lead to a deadlock in the following case:
+	// - RunParallel is called with f, which blocks without forward progress.
+	// - All the workers in the global pool become blocked in f.
+	// - workChs is filled up, so it cannot accept new work items from other RunParallel calls.
+	workers := len(rss.packedTimeseries)
+	if workers > gomaxprocs {
+		workers = gomaxprocs
+	}
+	if workers < 1 {
+		workers = 1
+	}
+	workChs := make([]chan *timeseriesWork, workers)
+	var workChsWG sync.WaitGroup
+	for i := 0; i < workers; i++ {
+		workChs[i] = make(chan *timeseriesWork, 16)
+		workChsWG.Add(1)
+		go func(workerID int) {
+			defer workChsWG.Done()
+			timeseriesWorker(workChs[workerID], uint(workerID))
+		}(i)
+	}
+
 	// Feed workers with work.
 	tsws := make([]*timeseriesWork, len(rss.packedTimeseries))
 	var mustStop uint32
@@ -213,7 +239,7 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
 		tsw.pts = &rss.packedTimeseries[i]
 		tsw.f = f
 		tsw.mustStop = &mustStop
-		scheduleTimeseriesWork(tsw)
+		scheduleTimeseriesWork(workChs, tsw)
 		tsws[i] = tsw
 	}
 	seriesProcessedTotal := len(rss.packedTimeseries)
@@ -233,6 +259,13 @@ func (rss *Results) RunParallel(f func(rs *Result, workerID uint) error) error {
 
 	perQueryRowsProcessed.Update(float64(rowsProcessedTotal))
 	perQuerySeriesProcessed.Update(float64(seriesProcessedTotal))
+
+	// Shut down local workers.
+	for _, workCh := range workChs {
+		close(workCh)
+	}
+	workChsWG.Wait()
+
 	return firstErr
 }
 
@@ -310,7 +343,6 @@ func putUnpackWork(upw *unpackWork) {
 var unpackWorkPool sync.Pool
 
 var unpackWorkChs []chan *unpackWork
-var unpackWorkIdx uint32
 
 func init() {
 	unpackWorkChs = make([]chan *unpackWork, gomaxprocs)
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 6dcb559439..41e860007c 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -8,6 +8,7 @@ sort: 15
 
 * FEATURE: add `-search.maxSamplesPerSeries` command-line flag for limiting the number of raw samples a single query could process per each time series. This option can prevent from out of memory errors when a query processes tens of millions of raw samples per series. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1067).
 
+* BUGFIX: vmselect: prevent a possible deadlock when multiple `target` query args are passed to [Graphite Render API](https://docs.victoriametrics.com/#graphite-render-api-usage).
 * BUGFIX: return series with `a op b` labels and `N` values for `(a op b) default N` if `(a op b)` returns series with all NaN values. Previously such series were removed.