From 1d1ba889fe61b2ce55216e616428839261e8d07c Mon Sep 17 00:00:00 2001 From: Nikolay Date: Wed, 17 Feb 2021 22:23:38 +0300 Subject: [PATCH] adds pushback for fastqueue, (#1075) during shutdown currently sending block was lost, now its pushed back to fast queue and will be flushed on disk, it may lead to data duplication. https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1065 --- .../prometheus/with_request_extra_filter.json | 8 ++++++++ app/vmagent/remotewrite/client.go | 18 ++++++++++-------- app/vmagent/remotewrite/remotewrite.go | 4 ++-- 3 files changed, 20 insertions(+), 10 deletions(-) create mode 100644 app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json diff --git a/app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json b/app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json new file mode 100644 index 000000000..5bfbece19 --- /dev/null +++ b/app/victoria-metrics/testdata/prometheus/with_request_extra_filter.json @@ -0,0 +1,8 @@ +{ + "name": "basic_select_with_extra_labels", + "data": ["[{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.tenant.limits\"},{\"name\":\"baz\",\"value\":\"qux\"},{\"name\":\"tenant\",\"value\":\"dev\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]},{\"labels\":[{\"name\":\"__name__\",\"value\":\"prometheus.up\"},{\"name\":\"baz\",\"value\":\"qux\"}],\"samples\":[{\"value\":100000,\"timestamp\":\"{TIME_MS}\"}]}]"], + "query": ["/api/v1/export?match={__name__!=''}&extra_label=tenant=dev"], + "result_metrics": [ + {"metric":{"__name__":"prometheus.tenant.limits","baz":"qux","tenant": "dev"},"values":[100000], "timestamps": ["{TIME_MS}"]} + ] +} diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 2b0332f05..d13c16e43 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -178,7 +178,9 @@ func (c *client) runWorker() { return } go func() { - c.sendBlock(block) + if delivered := c.sendBlockOk(block); !delivered { + return + } ch <- struct{}{} }() select { @@ -190,17 +192,17 @@ func (c *client) runWorker() { graceDuration := 5 * time.Second select { case <-ch: + logger.Infof("stop ok") // The block has been sent successfully. case <-time.After(graceDuration): - logger.Errorf("couldn't sent block with size %d bytes to %q in %.3f seconds during shutdown; dropping it", - len(block), c.sanitizedURL, graceDuration.Seconds()) + c.fq.MustWriteBlock(block) } return } } } -func (c *client) sendBlock(block []byte) { +func (c *client) sendBlockOk(block []byte) bool { c.rl.register(len(block), c.stopCh) retryDuration := time.Second retriesCount := 0 @@ -236,7 +238,7 @@ again: select { case <-c.stopCh: timerpool.Put(t) - return + return false case <-t.C: timerpool.Put(t) } @@ -247,7 +249,7 @@ again: if statusCode/100 == 2 { _ = resp.Body.Close() c.requestsOKCount.Inc() - return + return true } metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() if statusCode == 409 { @@ -258,7 +260,7 @@ again: logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+ "response body=%q", len(block), c.sanitizedURL, statusCode, body) c.packetsDropped.Inc() - return + return true } // Unexpected status code returned @@ -279,7 +281,7 @@ again: select { case <-c.stopCh: timerpool.Put(t) - return + return false case <-t.C: timerpool.Put(t) } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index ee3631519..337772560 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -227,10 +227,10 @@ func (rwctx *remoteWriteCtx) MustStop() { } rwctx.idx = 0 rwctx.pss = nil - rwctx.fq.MustClose() - rwctx.fq = nil rwctx.c.MustStop() rwctx.c = nil + rwctx.fq.MustClose() + rwctx.fq = nil rwctx.relabelMetricsDropped = nil }