mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
vmalert: retry all errors except 4XX status codes (#4461)
vmalert: retry all errors except 4XX status codes Retry all errors except 4XX status codes while pushing via remote-write to the remote storage. Previously, errors like broken connection could prevent vmalert from retrying the request. Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
parent
66b42a6772
commit
79a5499cb2
3 changed files with 90 additions and 20 deletions
|
@ -185,6 +185,11 @@ var (
|
||||||
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
|
bufferFlushDuration = metrics.NewHistogram(`vmalert_remotewrite_flush_duration_seconds`)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
retryCount = 5
|
||||||
|
retryBackoff = time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// flush is a blocking function that marshals WriteRequest and sends
|
// flush is a blocking function that marshals WriteRequest and sends
|
||||||
// it to remote-write endpoint. Flush performs limited amount of retries
|
// it to remote-write endpoint. Flush performs limited amount of retries
|
||||||
// if request fails.
|
// if request fails.
|
||||||
|
@ -202,12 +207,6 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
b := snappy.Encode(nil, data)
|
b := snappy.Encode(nil, data)
|
||||||
|
|
||||||
const (
|
|
||||||
retryCount = 5
|
|
||||||
retryBackoff = time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
for attempts := 0; attempts < retryCount; attempts++ {
|
for attempts := 0; attempts < retryCount; attempts++ {
|
||||||
err := c.send(ctx, b)
|
err := c.send(ctx, b)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -216,10 +215,10 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, isRetriable := err.(*retriableError)
|
_, isNotRetriable := err.(*nonRetriableError)
|
||||||
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, isRetriable)
|
logger.Warnf("attempt %d to send request failed: %s (retriable: %v)", attempts+1, err, !isNotRetriable)
|
||||||
|
|
||||||
if !isRetriable {
|
if isNotRetriable {
|
||||||
// exit fast if error isn't retriable
|
// exit fast if error isn't retriable
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -276,22 +275,23 @@ func (c *Client) send(ctx context.Context, data []byte) error {
|
||||||
case 2:
|
case 2:
|
||||||
// respond with a HTTP 2xx status code when the write is successful.
|
// respond with a HTTP 2xx status code when the write is successful.
|
||||||
return nil
|
return nil
|
||||||
case 5:
|
case 4:
|
||||||
// respond with HTTP status code 5xx when the write fails and SHOULD be retried.
|
if resp.StatusCode != http.StatusTooManyRequests {
|
||||||
return &retriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
// MUST NOT retry write requests on HTTP 4xx responses other than 429
|
||||||
|
return &nonRetriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||||
resp.StatusCode, req.URL.Redacted(), body)}
|
resp.StatusCode, req.URL.Redacted(), body)}
|
||||||
|
}
|
||||||
|
fallthrough
|
||||||
default:
|
default:
|
||||||
// respond with HTTP status code 4xx when the request is invalid, will never be able to succeed
|
|
||||||
// and should not be retried.
|
|
||||||
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||||
resp.StatusCode, req.URL.Redacted(), body)
|
resp.StatusCode, req.URL.Redacted(), body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type retriableError struct {
|
type nonRetriableError struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *retriableError) Error() string {
|
func (e *nonRetriableError) Error() string {
|
||||||
return e.err.Error()
|
return e.err.Error()
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -18,15 +19,30 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestClient_Push(t *testing.T) {
|
func TestClient_Push(t *testing.T) {
|
||||||
|
oldRetryBackoff := retryBackoff
|
||||||
|
retryBackoff = time.Millisecond * 10
|
||||||
|
defer func() {
|
||||||
|
retryBackoff = oldRetryBackoff
|
||||||
|
}()
|
||||||
|
|
||||||
testSrv := newRWServer()
|
testSrv := newRWServer()
|
||||||
cfg := Config{
|
client, err := NewClient(context.Background(), Config{
|
||||||
Addr: testSrv.URL,
|
Addr: testSrv.URL,
|
||||||
MaxBatchSize: 100,
|
MaxBatchSize: 100,
|
||||||
}
|
})
|
||||||
client, err := NewClient(context.Background(), cfg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to create client: %s", err)
|
t.Fatalf("failed to create client: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
faultySrv := newFaultyRWServer()
|
||||||
|
faultyClient, err := NewClient(context.Background(), Config{
|
||||||
|
Addr: faultySrv.URL,
|
||||||
|
MaxBatchSize: 50,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create faulty client: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
r := rand.New(rand.NewSource(1))
|
r := rand.New(rand.NewSource(1))
|
||||||
const rowsN = 1e4
|
const rowsN = 1e4
|
||||||
var sent int
|
var sent int
|
||||||
|
@ -38,9 +54,16 @@ func TestClient_Push(t *testing.T) {
|
||||||
}},
|
}},
|
||||||
}
|
}
|
||||||
err := client.Push(s)
|
err := client.Push(s)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %s", err)
|
||||||
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
sent++
|
sent++
|
||||||
}
|
}
|
||||||
|
err = faultyClient.Push(s)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if sent == 0 {
|
if sent == 0 {
|
||||||
t.Fatalf("0 series sent")
|
t.Fatalf("0 series sent")
|
||||||
|
@ -48,10 +71,17 @@ func TestClient_Push(t *testing.T) {
|
||||||
if err := client.Close(); err != nil {
|
if err := client.Close(); err != nil {
|
||||||
t.Fatalf("failed to close client: %s", err)
|
t.Fatalf("failed to close client: %s", err)
|
||||||
}
|
}
|
||||||
|
if err := faultyClient.Close(); err != nil {
|
||||||
|
t.Fatalf("failed to close faulty client: %s", err)
|
||||||
|
}
|
||||||
got := testSrv.accepted()
|
got := testSrv.accepted()
|
||||||
if got != sent {
|
if got != sent {
|
||||||
t.Fatalf("expected to have %d series; got %d", sent, got)
|
t.Fatalf("expected to have %d series; got %d", sent, got)
|
||||||
}
|
}
|
||||||
|
got = faultySrv.accepted()
|
||||||
|
if got != sent {
|
||||||
|
t.Fatalf("expected to have %d series for faulty client; got %d", sent, got)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRWServer() *rwServer {
|
func newRWServer() *rwServer {
|
||||||
|
@ -117,3 +147,42 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries)))
|
atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries)))
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// faultyRWServer sometimes respond with 5XX status code
|
||||||
|
// or just closes the connection. Is used for testing retries.
|
||||||
|
type faultyRWServer struct {
|
||||||
|
*rwServer
|
||||||
|
|
||||||
|
reqsMu sync.Mutex
|
||||||
|
reqs int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFaultyRWServer() *faultyRWServer {
|
||||||
|
rw := &faultyRWServer{
|
||||||
|
rwServer: &rwServer{},
|
||||||
|
}
|
||||||
|
rw.Server = httptest.NewServer(http.HandlerFunc(rw.handler))
|
||||||
|
return rw
|
||||||
|
}
|
||||||
|
|
||||||
|
func (frw *faultyRWServer) handler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
frw.reqsMu.Lock()
|
||||||
|
reqs := frw.reqs
|
||||||
|
frw.reqs++
|
||||||
|
if frw.reqs > 5 {
|
||||||
|
frw.reqs = 0
|
||||||
|
}
|
||||||
|
frw.reqsMu.Unlock()
|
||||||
|
|
||||||
|
switch reqs {
|
||||||
|
case 0, 1, 2, 3:
|
||||||
|
frw.rwServer.handler(w, r)
|
||||||
|
case 4:
|
||||||
|
hj, _ := w.(http.Hijacker)
|
||||||
|
conn, _, _ := hj.Hijack()
|
||||||
|
conn.Close()
|
||||||
|
case 5:
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
w.Write([]byte("server overloaded"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
|
||||||
* BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager.html): fix an issue with `vmbackupmanager` not being able to restore data from a backup stored in GCS. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4420) for details.
|
* BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager.html): fix an issue with `vmbackupmanager` not being able to restore data from a backup stored in GCS. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4420) for details.
|
||||||
* BUGFIX: [storage](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html): Properly creates `parts.json` after migration from versions below `v1.90.0. It must fix errors on start-up after unclean shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4336) for details.
|
* BUGFIX: [storage](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html): Properly creates `parts.json` after migration from versions below `v1.90.0. It must fix errors on start-up after unclean shutdown. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4336) for details.
|
||||||
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix a memory leak issue associated with chart updates. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4455).
|
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix a memory leak issue associated with chart updates. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4455).
|
||||||
|
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): retry all errors except 4XX status codes while pushing via remote-write to the remote storage. Previously, errors like broken connection could prevent vmalert from retrying the request.
|
||||||
|
|
||||||
## [v1.91.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.91.2)
|
## [v1.91.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.91.2)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue