From bbf663bd0436c75b20818e5001a7d4822936614c Mon Sep 17 00:00:00 2001
From: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Date: Thu, 9 Feb 2023 23:13:06 +0400
Subject: [PATCH] lib/promscrape: fix cancelling in-flight scrape requests
 during configuration reload  (#3791)

* lib/promscrape: fix cancelling in-flight scrape requests during configuration reload when using `streamParse` mode (see #3747)

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* Update docs/CHANGELOG.md

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
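
As a rough illustration of the pattern this patch switches to (a cancellable context owned by each scraper and propagated into the per-request deadline context), here is a minimal standalone sketch. All names (`scrapeLoop`, `newScrapeLoop`, `scrapeOnce`) and the target URL are illustrative and are not the actual `lib/promscrape` identifiers:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// scrapeLoop owns a cancellable context instead of a bare stop channel,
// mirroring the scraper struct change in this patch.
type scrapeLoop struct {
	ctx    context.Context
	cancel context.CancelFunc
	hc     *http.Client
	url    string
}

func newScrapeLoop(url string) *scrapeLoop {
	ctx, cancel := context.WithCancel(context.Background())
	return &scrapeLoop{
		ctx:    ctx,
		cancel: cancel,
		hc:     &http.Client{Timeout: 20 * time.Second},
		url:    url,
	}
}

// scrapeOnce derives the per-request deadline context from the loop's
// context, so cancelling the loop also aborts an in-flight request.
// This is the same idea as GetStreamReader switching from
// context.Background() to c.ctx.
func (sl *scrapeLoop) scrapeOnce() error {
	ctx, cancel := context.WithDeadline(sl.ctx, time.Now().Add(10*time.Second))
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, "GET", sl.url, nil)
	if err != nil {
		return err
	}
	resp, err := sl.hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// ... stream-parse resp.Body here ...
	return nil
}

func main() {
	sl := newScrapeLoop("http://localhost:8428/metrics")
	go func() {
		// Simulated config reload: calling cancel() replaces close(stopCh)
		// and immediately unblocks any in-flight scrape request.
		time.Sleep(100 * time.Millisecond)
		sl.cancel()
	}()
	if err := sl.scrapeOnce(); err != nil {
		fmt.Println("scrape aborted:", err)
	}
}
```

Because the per-request context is derived from the scraper's context, calling `cancel()` during a reload aborts the request right away instead of letting it run until its deadline, which is what previously delayed configuration reloads in stream parsing mode.
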
---
 docs/CHANGELOG.md         |  1 +
 lib/promscrape/client.go  |  6 ++++--
 lib/promscrape/scraper.go | 17 ++++++++++-------
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 978f099200..55713dd8d0 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -28,6 +28,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): prevent disabling state updates tracking per rule via setting values < 1. The minimum number of update states to track is now set to 1.
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly update `debug` and `update_entries_limit` rule's params on config's hot-reload.
 * BUGFIX: properly initialize the `vm_concurrent_insert_current` metric before exposing it. Previously this metric could be left uninitialized in some cases, e.g. its value was zero. This could lead to false alerts for the query `avg_over_time(vm_concurrent_insert_current[1m]) >= vm_concurrent_insert_capacity`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3761).
+* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): immediately cancel in-flight scrape requests during configuration reload when using [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously `vmagent` could wait for a long time until all the in-flight requests were completed before reloading the configuration. This could significantly slow down configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747).
 
 ## [v1.87.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.0)
 
diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go
index 835c08e050..185c693ad0 100644
--- a/lib/promscrape/client.go
+++ b/lib/promscrape/client.go
@@ -43,6 +43,7 @@ type client struct {
 	// It may be useful for scraping targets with millions of metrics per target.
 	sc *http.Client
 
+	ctx                     context.Context
 	scrapeURL               string
 	scrapeTimeoutSecondsStr string
 	hostPort                string
@@ -77,7 +78,7 @@ func concatTwoStrings(x, y string) string {
 	return s
 }
 
-func newClient(sw *ScrapeWork) *client {
+func newClient(sw *ScrapeWork, ctx context.Context) *client {
 	var u fasthttp.URI
 	u.Update(sw.ScrapeURL)
 	hostPort := string(u.Host())
@@ -165,6 +166,7 @@ func newClient(sw *ScrapeWork) *client {
 	}
 	return &client{
 		hc:                      hc,
+		ctx:                     ctx,
 		sc:                      sc,
 		scrapeURL:               sw.ScrapeURL,
 		scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
@@ -182,7 +184,7 @@ func newClient(sw *ScrapeWork) *client {
 
 func (c *client) GetStreamReader() (*streamReader, error) {
 	deadline := time.Now().Add(c.sc.Timeout)
-	ctx, cancel := context.WithDeadline(context.Background(), deadline)
+	ctx, cancel := context.WithDeadline(c.ctx, deadline)
 	req, err := http.NewRequestWithContext(ctx, "GET", c.scrapeURL, nil)
 	if err != nil {
 		cancel()
diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go
index 7f72c21a89..0686e47c25 100644
--- a/lib/promscrape/scraper.go
+++ b/lib/promscrape/scraper.go
@@ -2,6 +2,7 @@ package promscrape
 
 import (
 	"bytes"
+	"context"
 	"flag"
 	"fmt"
 	"io"
@@ -341,7 +342,7 @@ func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarsha
 func (sg *scraperGroup) stop() {
 	sg.mLock.Lock()
 	for _, sc := range sg.m {
-		close(sc.stopCh)
+		sc.cancel()
 	}
 	sg.m = nil
 	sg.mLock.Unlock()
@@ -383,7 +384,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
 	var stoppedChs []<-chan struct{}
 	for key, sc := range sg.m {
 		if _, ok := swsMap[key]; !ok {
-			close(sc.stopCh)
+			sc.cancel()
 			stoppedChs = append(stoppedChs, sc.stoppedCh)
 			delete(sg.m, key)
 			deletionsCount++
@@ -406,7 +407,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
 				sg.wg.Done()
 				close(sc.stoppedCh)
 			}()
-			sc.sw.run(sc.stopCh, sg.globalStopCh)
+			sc.sw.run(sc.ctx.Done(), sg.globalStopCh)
 			tsmGlobal.Unregister(&sc.sw)
 			sg.activeScrapers.Dec()
 			sg.scrapersStopped.Inc()
@@ -425,19 +426,21 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
 type scraper struct {
 	sw scrapeWork
 
-	// stopCh is unblocked when the given scraper must be stopped.
-	stopCh chan struct{}
+	ctx    context.Context
+	cancel context.CancelFunc
 
 	// stoppedCh is unblocked when the given scraper is stopped.
 	stoppedCh chan struct{}
 }
 
 func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper {
+	ctx, cancel := context.WithCancel(context.Background())
 	sc := &scraper{
-		stopCh:    make(chan struct{}),
+		ctx:       ctx,
+		cancel:    cancel,
 		stoppedCh: make(chan struct{}),
 	}
-	c := newClient(sw)
+	c := newClient(sw, ctx)
 	sc.sw.Config = sw
 	sc.sw.ScrapeGroup = group
 	sc.sw.ReadData = c.ReadData