lib/promscrape: fix cancelling in-flight scrape requests during configuration reload (#3791)

* lib/promscrape: fix cancelling in-flight scrape requests during configuration reload when using `streamParse` mode (see #3747)

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>

* Update docs/CHANGELOG.md

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
commit f13a255918
parent 513707a8c7
Authored by Zakhar Bessarab on 2023-02-09 23:13:06 +04:00; committed by GitHub
3 changed files with 15 additions and 9 deletions

docs/CHANGELOG.md

@@ -28,6 +28,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): prevent disabling state updates tracking per rule via setting values < 1. The minimum number of update states to track is now set to 1.
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): properly update `debug` and `update_entries_limit` rule's params on config's hot-reload.
 * BUGFIX: properly initialize the `vm_concurrent_insert_current` metric before exposing it. Previously this metric could be left uninitialized in some cases, e.g. its value was zero. This could lead to false alerts for the query `avg_over_time(vm_concurrent_insert_current[1m]) >= vm_concurrent_insert_capacity`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3761).
+* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): immediately cancel in-flight scrape requests during configuration reload when using [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously `vmagent` could wait for long time until all the in-flight requests are completed before reloading the configuration. This could significantly slow down configuration reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3747).
 
 ## [v1.87.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.0)
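The changelog entry above describes the user-visible effect. The mechanism behind it is the standard Go pattern of tying an HTTP request's lifetime to a cancellable context. Here is a minimal, self-contained sketch of that pattern (not code from this commit; the httpbin.org endpoint and the timings are illustrative): a request created with `http.NewRequestWithContext` is aborted as soon as its context is cancelled, so a configuration reload does not have to wait for a slow in-flight scrape to finish on its own.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Simulate a configuration reload that drops this scraper after 100ms.
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()

	// The endpoint stalls for 10 seconds, standing in for a slow scrape
	// target; since the request is bound to ctx, it fails after ~100ms
	// with "context canceled" instead of running to completion.
	req, err := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/delay/10", nil)
	if err != nil {
		panic(err)
	}
	_, err = http.DefaultClient.Do(req)
	fmt.Println(err) // Get "https://httpbin.org/delay/10": context canceled
}
```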

lib/promscrape/client.go

@@ -43,6 +43,7 @@ type client struct {
 	// It may be useful for scraping targets with millions of metrics per target.
 	sc *http.Client
 
+	ctx                     context.Context
 	scrapeURL               string
 	scrapeTimeoutSecondsStr string
 	hostPort                string
@@ -77,7 +78,7 @@ func concatTwoStrings(x, y string) string {
 	return s
 }
 
-func newClient(sw *ScrapeWork) *client {
+func newClient(sw *ScrapeWork, ctx context.Context) *client {
 	var u fasthttp.URI
 	u.Update(sw.ScrapeURL)
 	hostPort := string(u.Host())
@@ -165,6 +166,7 @@ func newClient(sw *ScrapeWork) *client {
 	}
 	return &client{
 		hc:  hc,
+		ctx: ctx,
 		sc:  sc,
 		scrapeURL:               sw.ScrapeURL,
 		scrapeTimeoutSecondsStr: fmt.Sprintf("%.3f", sw.ScrapeTimeout.Seconds()),
@@ -182,7 +184,7 @@ func newClient(sw *ScrapeWork) *client {
 
 func (c *client) GetStreamReader() (*streamReader, error) {
 	deadline := time.Now().Add(c.sc.Timeout)
-	ctx, cancel := context.WithDeadline(context.Background(), deadline)
+	ctx, cancel := context.WithDeadline(c.ctx, deadline)
 	req, err := http.NewRequestWithContext(ctx, "GET", c.scrapeURL, nil)
 	if err != nil {
 		cancel()
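In the `GetStreamReader` hunk above, the per-request deadline context is now derived from the client's `c.ctx` instead of `context.Background()`. The standalone sketch below (standard library only; all names are local to the example) shows the property this relies on: a `context.WithDeadline` child is done as soon as either the deadline expires or the parent is cancelled, whichever comes first, so cancelling the scraper's context aborts a stream read immediately rather than after the scrape timeout.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	parent, cancelParent := context.WithCancel(context.Background())

	// A generous per-request deadline, playing the role of the scrape timeout.
	ctx, cancel := context.WithDeadline(parent, time.Now().Add(time.Hour))
	defer cancel()

	// Cancel the parent, e.g. because the scraper was dropped
	// during a configuration reload.
	cancelParent()

	<-ctx.Done()           // unblocks immediately, not in an hour
	fmt.Println(ctx.Err()) // "context canceled", propagated from the parent
}
```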

lib/promscrape/scraper.go

@@ -2,6 +2,7 @@ package promscrape
 
 import (
 	"bytes"
+	"context"
 	"flag"
 	"fmt"
 	"io"
@@ -341,7 +342,7 @@ func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarsha
 func (sg *scraperGroup) stop() {
 	sg.mLock.Lock()
 	for _, sc := range sg.m {
-		close(sc.stopCh)
+		sc.cancel()
 	}
 	sg.m = nil
 	sg.mLock.Unlock()
@@ -383,7 +384,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
 	var stoppedChs []<-chan struct{}
 	for key, sc := range sg.m {
 		if _, ok := swsMap[key]; !ok {
-			close(sc.stopCh)
+			sc.cancel()
 			stoppedChs = append(stoppedChs, sc.stoppedCh)
 			delete(sg.m, key)
 			deletionsCount++
@@ -406,7 +407,7 @@ func (sg *scraperGroup) update(sws []*ScrapeWork) {
 			sg.wg.Done()
 			close(sc.stoppedCh)
 		}()
-		sc.sw.run(sc.stopCh, sg.globalStopCh)
+		sc.sw.run(sc.ctx.Done(), sg.globalStopCh)
 		tsmGlobal.Unregister(&sc.sw)
 		sg.activeScrapers.Dec()
 		sg.scrapersStopped.Inc()
@@ -425,19 +426,21 @@
 type scraper struct {
 	sw scrapeWork
 
-	// stopCh is unblocked when the given scraper must be stopped.
-	stopCh chan struct{}
+	ctx    context.Context
+	cancel context.CancelFunc
 
 	// stoppedCh is unblocked when the given scraper is stopped.
	stoppedCh chan struct{}
 }
 
 func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper {
+	ctx, cancel := context.WithCancel(context.Background())
 	sc := &scraper{
-		stopCh:    make(chan struct{}),
+		ctx:       ctx,
+		cancel:    cancel,
 		stoppedCh: make(chan struct{}),
 	}
-	c := newClient(sw)
+	c := newClient(sw, ctx)
 	sc.sw.Config = sw
 	sc.sw.ScrapeGroup = group
 	sc.sw.ReadData = c.ReadData
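A detail worth noting in the scraper changes: `close(sc.stopCh)` can become `sc.cancel()` without touching the stop-channel parameter of `run`, because `ctx.Done()` returns a receive-only `<-chan struct{}` that is closed on cancellation, which is exactly the contract the old stop channel provided. Below is a minimal sketch of that drop-in substitution; the `runLoop` worker and the timings are hypothetical, not from the repo.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop has the same shape as a worker that takes a plain stop channel.
func runLoop(stopCh <-chan struct{}) {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("scrape tick")
		case <-stopCh:
			fmt.Println("stopped")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go runLoop(ctx.Done()) // ctx.Done() satisfies <-chan struct{}
	time.Sleep(120 * time.Millisecond)
	cancel() // replaces close(stopCh); any request sharing ctx is aborted too
	time.Sleep(20 * time.Millisecond)
}
```

The payoff over the old channel is the last comment in `main`: one `cancel()` both unblocks the worker loop and aborts any in-flight HTTP request whose context descends from `ctx`, which a bare `close(stopCh)` could not do.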