diff --git a/app/vmalert/remotewrite/remotewrite.go b/app/vmalert/remotewrite/remotewrite.go index 8157e4198..6597508a2 100644 --- a/app/vmalert/remotewrite/remotewrite.go +++ b/app/vmalert/remotewrite/remotewrite.go @@ -185,6 +185,11 @@ var ( bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`) ) +var ( + retryCount = 5 + retryBackoff = time.Second +) + // flush is a blocking function that marshals WriteRequest and sends // it to remote-write endpoint. Flush performs limited amount of retries // if request fails. @@ -202,12 +207,6 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) { } b := snappy.Encode(nil, data) - - const ( - retryCount = 5 - retryBackoff = time.Second - ) - for attempts := 0; attempts < retryCount; attempts++ { err := c.send(ctx, b) if err == nil { @@ -216,10 +215,10 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) { return } - _, isRetriable := err.(*retriableError) - logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, isRetriable) + _, isNotRetriable := err.(*nonRetriableError) + logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable) - if !isRetriable { + if isNotRetriable { // exit fast if error isn't retriable break } @@ -276,22 +275,23 @@ func (c *Client) send(ctx context.Context, data []byte) error { case 2: // respond with a HTTP 2xx status code when the write is successful. return nil - case 5: - // respond with HTTP status code 5xx when the write fails and SHOULD be retried. - return &retriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q", - resp.StatusCode, req.URL.Redacted(), body)} + case 4: + if resp.StatusCode != http.StatusTooManyRequests { + // MUST NOT retry write requests on HTTP 4xx responses other than 429 + return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. 
Response body %q", + resp.StatusCode, req.URL.Redacted(), body)} + } + fallthrough default: - // respond with HTTP status code 4xx when the request is invalid, will never be able to succeed - // and should not be retried. return fmt.Errorf("unexpected response code %d for %s. Response body %q", resp.StatusCode, req.URL.Redacted(), body) } } -type retriableError struct { +type nonRetriableError struct { err error } -func (e *retriableError) Error() string { +func (e *nonRetriableError) Error() string { return e.err.Error() } diff --git a/app/vmalert/remotewrite/remotewrite_test.go b/app/vmalert/remotewrite/remotewrite_test.go index 4ca9812a5..920e0378d 100644 --- a/app/vmalert/remotewrite/remotewrite_test.go +++ b/app/vmalert/remotewrite/remotewrite_test.go @@ -7,6 +7,7 @@ import ( "math/rand" "net/http" "net/http/httptest" + "sync" "sync/atomic" "testing" "time" @@ -18,15 +19,30 @@ import ( ) func TestClient_Push(t *testing.T) { + oldRetryBackoff := retryBackoff + retryBackoff = time.Millisecond * 10 + defer func() { + retryBackoff = oldRetryBackoff + }() + testSrv := newRWServer() - cfg := Config{ + client, err := NewClient(context.Background(), Config{ Addr: testSrv.URL, MaxBatchSize: 100, - } - client, err := NewClient(context.Background(), cfg) + }) if err != nil { t.Fatalf("failed to create client: %s", err) } + + faultySrv := newFaultyRWServer() + faultyClient, err := NewClient(context.Background(), Config{ + Addr: faultySrv.URL, + MaxBatchSize: 50, + }) + if err != nil { + t.Fatalf("failed to create faulty client: %s", err) + } + r := rand.New(rand.NewSource(1)) const rowsN = 1e4 var sent int @@ -38,9 +54,16 @@ func TestClient_Push(t *testing.T) { }}, } err := client.Push(s) + if err != nil { + t.Fatalf("unexpected err: %s", err) + } if err == nil { sent++ } + err = faultyClient.Push(s) + if err != nil { + t.Fatalf("unexpected err: %s", err) + } } if sent == 0 { t.Fatalf("0 series sent") @@ -48,10 +71,17 @@ func TestClient_Push(t *testing.T) { if err := 
client.Close(); err != nil { t.Fatalf("failed to close client: %s", err) } + if err := faultyClient.Close(); err != nil { + t.Fatalf("failed to close faulty client: %s", err) + } got := testSrv.accepted() if got != sent { t.Fatalf("expected to have %d series; got %d", sent, got) } + got = faultySrv.accepted() + if got != sent { + t.Fatalf("expected to have %d series for faulty client; got %d", sent, got) + } } func newRWServer() *rwServer { @@ -117,3 +147,42 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) { atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries))) w.WriteHeader(http.StatusNoContent) } + +// faultyRWServer sometimes responds with a 5XX status code +// or just closes the connection. It is used for testing retries. +type faultyRWServer struct { + *rwServer + + reqsMu sync.Mutex + reqs int +} + +func newFaultyRWServer() *faultyRWServer { + rw := &faultyRWServer{ + rwServer: &rwServer{}, + } + rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler)) + return rw +} + +func (frw *faultyRWServer) handler(w http.ResponseWriter, r *http.Request) { + frw.reqsMu.Lock() + reqs := frw.reqs + frw.reqs++ + if frw.reqs > 5 { + frw.reqs = 0 + } + frw.reqsMu.Unlock() + + switch reqs { + case 0, 1, 2, 3: + frw.rwServer.handler(w, r) + case 4: + hj, _ := w.(http.Hijacker) + conn, _, _ := hj.Hijack() + conn.Close() + case 5: + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("server overloaded")) + } +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index aa99769bb..ddf86a581 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -43,6 +43,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager.html): fix an issue with `vmbackupmanager` not being able to restore data from a backup stored in GCS. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4420) for details.
* BUGFIX: [storage](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html): Properly creates `parts.json` after migration from versions below `v1.90.0. It must fix errors on start-up after unclean shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4336) for details. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix a memory leak issue associated with chart updates. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4455). +* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): retry all errors except 4XX status codes (other than 429) while pushing via remote-write to the remote storage. Previously, errors like a broken connection could prevent vmalert from retrying the request. ## [v1.91.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.91.2)