app/vmagent: properly perform graceful shutdown, which was broken in the commit 1d1ba889fe

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065
This commit is contained in:
Aliaksandr Valialkin 2021-02-19 00:31:07 +02:00
parent 41d3ff4f2b
commit 72eef964d9
3 changed files with 14 additions and 4 deletions

View file

@ -227,6 +227,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
} }
rwctx.idx = 0 rwctx.idx = 0
rwctx.pss = nil rwctx.pss = nil
rwctx.fq.UnblockAllReaders()
rwctx.c.MustStop() rwctx.c.MustStop()
rwctx.c = nil rwctx.c = nil
rwctx.fq.MustClose() rwctx.fq.MustClose()

View file

@ -4,6 +4,7 @@
* FEATURE: vmagent: add `scrape_align_interval` config option, which can be used for aligning scrapes to the beginning of the configured interval. See [these docs](https://victoriametrics.github.io/vmagent.html#troubleshooting) for details. * FEATURE: vmagent: add `scrape_align_interval` config option, which can be used for aligning scrapes to the beginning of the configured interval. See [these docs](https://victoriametrics.github.io/vmagent.html#troubleshooting) for details.
* BUGFIX: vmagent: properly perform graceful shutdown on `SIGINT` and `SIGTERM` signals. The graceful shutdown has been broken in `v1.54.0`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065
* BUGFIX: reduce the probability of `duplicate time series` errors when querying Kubernetes metrics. * BUGFIX: reduce the probability of `duplicate time series` errors when querying Kubernetes metrics.

View file

@ -52,16 +52,24 @@ func MustOpenFastQueue(path, name string, maxInmemoryBlocks, maxPendingBytes int
return fq return fq
} }
// MustClose unblocks all the readers. // UnblockAllReaders unblocks all the readers.
// func (fq *FastQueue) UnblockAllReaders() {
// It is expected no new writers during and after the call.
func (fq *FastQueue) MustClose() {
fq.mu.Lock() fq.mu.Lock()
defer fq.mu.Unlock() defer fq.mu.Unlock()
// Unblock blocked readers // Unblock blocked readers
fq.mustStop = true fq.mustStop = true
fq.cond.Broadcast() fq.cond.Broadcast()
}
// MustClose unblocks all the readers.
//
// It is expected no new writers during and after the call.
func (fq *FastQueue) MustClose() {
fq.UnblockAllReaders()
fq.mu.Lock()
defer fq.mu.Unlock()
// flush blocks from fq.ch to fq.pq, so they can be persisted // flush blocks from fq.ch to fq.pq, so they can be persisted
fq.flushInmemoryBlocksToFileLocked() fq.flushInmemoryBlocksToFileLocked()