adds pushback for fastqueue (#1075)

Previously, the block that was being sent during shutdown was lost.
Now it is pushed back to the fast queue and will be flushed to disk.
This may lead to data duplication.
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065
This commit is contained in:
Nikolay 2021-02-17 22:23:38 +03:00 committed by Aliaksandr Valialkin
parent 57d192a2c2
commit 40973eda1c
3 changed files with 20 additions and 10 deletions

View file

@ -0,0 +1,8 @@
{
"name": "basic_select_with_extra_labels",
"data": ["[{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.tenant.limits\"},{\"name\":\"baz\",\"value\":\"qux\"},{\"name\":\"tenant\",\"value\":\"dev\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]},{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.up\"},{\"name\":\"baz\",\"value\":\"qux\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]}]"],
"query": ["/api/v1/export?match={__name__!=''}&extra_label=tenant=dev"],
"result_metrics": [
{"metric":{"__name__":"prometheus.tenant.limits","baz":"qux","tenant": "dev"},"values":[100000], "timestamps": ["{TIME_MS}"]}
]
}

View file

@ -178,7 +178,9 @@ func (c *client) runWorker() {
return return
} }
go func() { go func() {
c.sendBlock(block) if delivered := c.sendBlockOk(block); !delivered {
return
}
ch <- struct{}{} ch <- struct{}{}
}() }()
select { select {
@ -190,17 +192,17 @@ func (c *client) runWorker() {
graceDuration := 5 * time.Second graceDuration := 5 * time.Second
select { select {
case <-ch: case <-ch:
logger.Infof("stop ok")
// The block has been sent successfully. // The block has been sent successfully.
case <-time.After(graceDuration): case <-time.After(graceDuration):
logger.Errorf("couldn't sent block with size %d bytes to %q in %.3f seconds during shutdown; dropping it", c.fq.MustWriteBlock(block)
len(block), c.sanitizedURL, graceDuration.Seconds())
} }
return return
} }
} }
} }
func (c *client) sendBlock(block []byte) { func (c *client) sendBlockOk(block []byte) bool {
c.rl.register(len(block), c.stopCh) c.rl.register(len(block), c.stopCh)
retryDuration := time.Second retryDuration := time.Second
retriesCount := 0 retriesCount := 0
@ -236,7 +238,7 @@ again:
select { select {
case <-c.stopCh: case <-c.stopCh:
timerpool.Put(t) timerpool.Put(t)
return return false
case <-t.C: case <-t.C:
timerpool.Put(t) timerpool.Put(t)
} }
@ -247,7 +249,7 @@ again:
if statusCode/100 == 2 { if statusCode/100 == 2 {
_ = resp.Body.Close() _ = resp.Body.Close()
c.requestsOKCount.Inc() c.requestsOKCount.Inc()
return return true
} }
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
if statusCode == 409 { if statusCode == 409 {
@ -258,7 +260,7 @@ again:
logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+ logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+
"response body=%q", len(block), c.sanitizedURL, statusCode, body) "response body=%q", len(block), c.sanitizedURL, statusCode, body)
c.packetsDropped.Inc() c.packetsDropped.Inc()
return return true
} }
// Unexpected status code returned // Unexpected status code returned
@ -279,7 +281,7 @@ again:
select { select {
case <-c.stopCh: case <-c.stopCh:
timerpool.Put(t) timerpool.Put(t)
return return false
case <-t.C: case <-t.C:
timerpool.Put(t) timerpool.Put(t)
} }

View file

@ -227,10 +227,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
} }
rwctx.idx = 0 rwctx.idx = 0
rwctx.pss = nil rwctx.pss = nil
rwctx.fq.MustClose()
rwctx.fq = nil
rwctx.c.MustStop() rwctx.c.MustStop()
rwctx.c = nil rwctx.c = nil
rwctx.fq.MustClose()
rwctx.fq = nil
rwctx.relabelMetricsDropped = nil rwctx.relabelMetricsDropped = nil
} }