mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
adds pushback for fastqueue, (#1075)
during shutdown currently sending block was lost, now its pushed back to fast queue and will be flushed on disk, it may lead to data duplication. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065
This commit is contained in:
parent
4b110fa21c
commit
1d1ba889fe
3 changed files with 20 additions and 10 deletions
8
app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json
vendored
Normal file
8
app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json
vendored
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
{
|
||||||
|
"name": "basic_select_with_extra_labels",
|
||||||
|
"data": ["[{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.tenant.limits\"},{\"name\":\"baz\",\"value\":\"qux\"},{\"name\":\"tenant\",\"value\":\"dev\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]},{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.up\"},{\"name\":\"baz\",\"value\":\"qux\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]}]"],
|
||||||
|
"query": ["/api/v1/export?match={__name__!=''}&extra_label=tenant=dev"],
|
||||||
|
"result_metrics": [
|
||||||
|
{"metric":{"__name__":"prometheus.tenant.limits","baz":"qux","tenant": "dev"},"values":[100000], "timestamps": ["{TIME_MS}"]}
|
||||||
|
]
|
||||||
|
}
|
|
@ -178,7 +178,9 @@ func (c *client) runWorker() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
c.sendBlock(block)
|
if delivered := c.sendBlockOk(block); !delivered {
|
||||||
|
return
|
||||||
|
}
|
||||||
ch <- struct{}{}
|
ch <- struct{}{}
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
|
@ -190,17 +192,17 @@ func (c *client) runWorker() {
|
||||||
graceDuration := 5 * time.Second
|
graceDuration := 5 * time.Second
|
||||||
select {
|
select {
|
||||||
case <-ch:
|
case <-ch:
|
||||||
|
logger.Infof("stop ok")
|
||||||
// The block has been sent successfully.
|
// The block has been sent successfully.
|
||||||
case <-time.After(graceDuration):
|
case <-time.After(graceDuration):
|
||||||
logger.Errorf("couldn't sent block with size %d bytes to %q in %.3f seconds during shutdown; dropping it",
|
c.fq.MustWriteBlock(block)
|
||||||
len(block), c.sanitizedURL, graceDuration.Seconds())
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *client) sendBlock(block []byte) {
|
func (c *client) sendBlockOk(block []byte) bool {
|
||||||
c.rl.register(len(block), c.stopCh)
|
c.rl.register(len(block), c.stopCh)
|
||||||
retryDuration := time.Second
|
retryDuration := time.Second
|
||||||
retriesCount := 0
|
retriesCount := 0
|
||||||
|
@ -236,7 +238,7 @@ again:
|
||||||
select {
|
select {
|
||||||
case <-c.stopCh:
|
case <-c.stopCh:
|
||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
return
|
return false
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
}
|
}
|
||||||
|
@ -247,7 +249,7 @@ again:
|
||||||
if statusCode/100 == 2 {
|
if statusCode/100 == 2 {
|
||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
c.requestsOKCount.Inc()
|
c.requestsOKCount.Inc()
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
|
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
|
||||||
if statusCode == 409 {
|
if statusCode == 409 {
|
||||||
|
@ -258,7 +260,7 @@ again:
|
||||||
logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+
|
logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+
|
||||||
"response body=%q", len(block), c.sanitizedURL, statusCode, body)
|
"response body=%q", len(block), c.sanitizedURL, statusCode, body)
|
||||||
c.packetsDropped.Inc()
|
c.packetsDropped.Inc()
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unexpected status code returned
|
// Unexpected status code returned
|
||||||
|
@ -279,7 +281,7 @@ again:
|
||||||
select {
|
select {
|
||||||
case <-c.stopCh:
|
case <-c.stopCh:
|
||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
return
|
return false
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
timerpool.Put(t)
|
timerpool.Put(t)
|
||||||
}
|
}
|
||||||
|
|
|
@ -227,10 +227,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||||
}
|
}
|
||||||
rwctx.idx = 0
|
rwctx.idx = 0
|
||||||
rwctx.pss = nil
|
rwctx.pss = nil
|
||||||
rwctx.fq.MustClose()
|
|
||||||
rwctx.fq = nil
|
|
||||||
rwctx.c.MustStop()
|
rwctx.c.MustStop()
|
||||||
rwctx.c = nil
|
rwctx.c = nil
|
||||||
|
rwctx.fq.MustClose()
|
||||||
|
rwctx.fq = nil
|
||||||
|
|
||||||
rwctx.relabelMetricsDropped = nil
|
rwctx.relabelMetricsDropped = nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue