mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
vmalert: improve retry logic for remote write (#4134)
vmalert should not retry on 4xx status codes according to https://prometheus.io/docs/concepts/remote_write_spec/
This commit is contained in:
parent
5c2ed85eb9
commit
a99918085d
1 changed files with 31 additions and 9 deletions
|
@ -189,6 +189,10 @@ var (
|
||||||
// it to remote write endpoint. Flush performs limited amount of retries
|
// it to remote write endpoint. Flush performs limited amount of retries
|
||||||
// if request fails.
|
// if request fails.
|
||||||
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||||
|
const (
|
||||||
|
retryCount = 5
|
||||||
|
retryBackoff = time.Second
|
||||||
|
)
|
||||||
if len(wr.Timeseries) < 1 {
|
if len(wr.Timeseries) < 1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -201,20 +205,25 @@ func (c *Client) flush(ctx context.Context, wr *prompbmarshal.WriteRequest) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
const attempts = 5
|
|
||||||
b := snappy.Encode(nil, data)
|
b := snappy.Encode(nil, data)
|
||||||
for i := 0; i < attempts; i++ {
|
|
||||||
|
attempts := 0
|
||||||
|
for ; attempts < retryCount; attempts++ {
|
||||||
err := c.send(ctx, b)
|
err := c.send(ctx, b)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
sentRows.Add(len(wr.Timeseries))
|
sentRows.Add(len(wr.Timeseries))
|
||||||
sentBytes.Add(len(b))
|
sentBytes.Add(len(b))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
logger.Warnf("attempt %d to send request failed: %s", attempts+1, err)
|
||||||
|
|
||||||
logger.Warnf("attempt %d to send request failed: %s", i+1, err)
|
if _, ok := err.(*retriableError); ok {
|
||||||
// sleeping to avoid remote db hammering
|
// sleeping to avoid remote db hammering
|
||||||
time.Sleep(time.Second)
|
time.Sleep(retryBackoff)
|
||||||
continue
|
continue
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
droppedRows.Add(len(wr.Timeseries))
|
droppedRows.Add(len(wr.Timeseries))
|
||||||
|
@ -249,10 +258,23 @@ func (c *Client) send(ctx context.Context, data []byte) error {
|
||||||
req.URL.Redacted(), err, len(data), r.Size())
|
req.URL.Redacted(), err, len(data), r.Size())
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
body, _ := io.ReadAll(resp.Body)
|
||||||
body, _ := io.ReadAll(resp.Body)
|
switch resp.StatusCode / 100 {
|
||||||
|
case 2:
|
||||||
|
return nil
|
||||||
|
case 5:
|
||||||
|
return &retriableError{fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||||
|
resp.StatusCode, req.URL.Redacted(), body)}
|
||||||
|
default:
|
||||||
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
return fmt.Errorf("unexpected response code %d for %s. Response body %q",
|
||||||
resp.StatusCode, req.URL.Redacted(), body)
|
resp.StatusCode, req.URL.Redacted(), body)
|
||||||
}
|
}
|
||||||
return nil
|
}
|
||||||
|
|
||||||
|
type retriableError struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *retriableError) Error() string {
|
||||||
|
return e.Error()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue