mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
app/{vmalert,vmctl}: consistently use http.NewRequestWithContext() instead of http.NewRequest() + req.WithContext()
This commit is contained in:
parent
3c74aa6b3d
commit
e22836c636
4 changed files with 22 additions and 22 deletions
|
@ -137,11 +137,11 @@ func NewVMStorage(baseURL string, authCfg *promauth.Config, lookBack time.Durati
|
||||||
|
|
||||||
// Query executes the given query and returns parsed response
|
// Query executes the given query and returns parsed response
|
||||||
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
|
func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Result, *http.Request, error) {
|
||||||
req, err := s.newQueryRequest(query, ts)
|
req, err := s.newQueryRequest(ctx, query, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Result{}, nil, err
|
return Result{}, nil, err
|
||||||
}
|
}
|
||||||
resp, err := s.do(ctx, req)
|
resp, err := s.do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
// Return unexpected error to the caller.
|
// Return unexpected error to the caller.
|
||||||
|
@ -149,11 +149,11 @@ func (s *VMStorage) Query(ctx context.Context, query string, ts time.Time) (Resu
|
||||||
}
|
}
|
||||||
// Something in the middle between client and datasource might be closing
|
// Something in the middle between client and datasource might be closing
|
||||||
// the connection. So we do a one more attempt in hope request will succeed.
|
// the connection. So we do a one more attempt in hope request will succeed.
|
||||||
req, err = s.newQueryRequest(query, ts)
|
req, err = s.newQueryRequest(ctx, query, ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
||||||
}
|
}
|
||||||
resp, err = s.do(ctx, req)
|
resp, err = s.do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
return Result{}, nil, fmt.Errorf("second attempt: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -182,11 +182,11 @@ func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end tim
|
||||||
if end.IsZero() {
|
if end.IsZero() {
|
||||||
return res, fmt.Errorf("end param is missing")
|
return res, fmt.Errorf("end param is missing")
|
||||||
}
|
}
|
||||||
req, err := s.newQueryRangeRequest(query, start, end)
|
req, err := s.newQueryRangeRequest(ctx, query, start, end)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
resp, err := s.do(ctx, req)
|
resp, err := s.do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
// Return unexpected error to the caller.
|
// Return unexpected error to the caller.
|
||||||
|
@ -194,11 +194,11 @@ func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end tim
|
||||||
}
|
}
|
||||||
// Something in the middle between client and datasource might be closing
|
// Something in the middle between client and datasource might be closing
|
||||||
// the connection. So we do a one more attempt in hope request will succeed.
|
// the connection. So we do a one more attempt in hope request will succeed.
|
||||||
req, err = s.newQueryRangeRequest(query, start, end)
|
req, err = s.newQueryRangeRequest(ctx, query, start, end)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("second attempt: %w", err)
|
return res, fmt.Errorf("second attempt: %w", err)
|
||||||
}
|
}
|
||||||
resp, err = s.do(ctx, req)
|
resp, err = s.do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("second attempt: %w", err)
|
return res, fmt.Errorf("second attempt: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -210,7 +210,7 @@ func (s *VMStorage) QueryRange(ctx context.Context, query string, start, end tim
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, error) {
|
func (s *VMStorage) do(req *http.Request) (*http.Response, error) {
|
||||||
ru := req.URL.Redacted()
|
ru := req.URL.Redacted()
|
||||||
if *showDatasourceURL {
|
if *showDatasourceURL {
|
||||||
ru = req.URL.String()
|
ru = req.URL.String()
|
||||||
|
@ -218,7 +218,7 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response,
|
||||||
if s.debug {
|
if s.debug {
|
||||||
logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, ru)
|
logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, ru)
|
||||||
}
|
}
|
||||||
resp, err := s.c.Do(req.WithContext(ctx))
|
resp, err := s.c.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error getting response from %s: %w", ru, err)
|
return nil, fmt.Errorf("error getting response from %s: %w", ru, err)
|
||||||
}
|
}
|
||||||
|
@ -230,8 +230,8 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response,
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) (*http.Request, error) {
|
func (s *VMStorage) newQueryRangeRequest(ctx context.Context, query string, start, end time.Time) (*http.Request, error) {
|
||||||
req, err := s.newRequest()
|
req, err := s.newRequest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", s.datasourceURL, err)
|
return nil, fmt.Errorf("cannot create query_range request to datasource %q: %w", s.datasourceURL, err)
|
||||||
}
|
}
|
||||||
|
@ -239,8 +239,8 @@ func (s *VMStorage) newQueryRangeRequest(query string, start, end time.Time) (*h
|
||||||
return req, nil
|
return req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VMStorage) newQueryRequest(query string, ts time.Time) (*http.Request, error) {
|
func (s *VMStorage) newQueryRequest(ctx context.Context, query string, ts time.Time) (*http.Request, error) {
|
||||||
req, err := s.newRequest()
|
req, err := s.newRequest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot create query request to datasource %q: %w", s.datasourceURL, err)
|
return nil, fmt.Errorf("cannot create query request to datasource %q: %w", s.datasourceURL, err)
|
||||||
}
|
}
|
||||||
|
@ -255,8 +255,8 @@ func (s *VMStorage) newQueryRequest(query string, ts time.Time) (*http.Request,
|
||||||
return req, nil
|
return req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *VMStorage) newRequest() (*http.Request, error) {
|
func (s *VMStorage) newRequest(ctx context.Context) (*http.Request, error) {
|
||||||
req, err := http.NewRequest(http.MethodPost, s.datasourceURL, nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.datasourceURL, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err)
|
logger.Panicf("BUG: unexpected error from http.NewRequest(%q): %s", s.datasourceURL, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -637,7 +637,7 @@ func TestRequestParams(t *testing.T) {
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
req, err := tc.vm.newRequest()
|
req, err := tc.vm.newRequest(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -735,7 +735,7 @@ func TestHeaders(t *testing.T) {
|
||||||
for _, tt := range testCases {
|
for _, tt := range testCases {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
vm := tt.vmFn()
|
vm := tt.vmFn()
|
||||||
req, err := vm.newQueryRequest("foo", time.Now())
|
req, err := vm.newQueryRequest(ctx, "foo", time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -279,7 +279,7 @@ L:
|
||||||
|
|
||||||
func (c *Client) send(ctx context.Context, data []byte) error {
|
func (c *Client) send(ctx context.Context, data []byte) error {
|
||||||
r := bytes.NewReader(data)
|
r := bytes.NewReader(data)
|
||||||
req, err := http.NewRequest(http.MethodPost, c.addr, r)
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.addr, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,7 @@ func (c *Client) send(ctx context.Context, data []byte) error {
|
||||||
if !*disablePathAppend {
|
if !*disablePathAppend {
|
||||||
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
req.URL.Path = path.Join(req.URL.Path, "/api/v1/write")
|
||||||
}
|
}
|
||||||
resp, err := c.c.Do(req.WithContext(ctx))
|
resp, err := c.c.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
||||||
req.URL.Redacted(), err, len(data), r.Size())
|
req.URL.Redacted(), err, len(data), r.Size())
|
||||||
|
|
|
@ -182,7 +182,7 @@ func (c *Client) fetch(ctx context.Context, data []byte, streamCb StreamCallback
|
||||||
if c.disablePathAppend {
|
if c.disablePathAppend {
|
||||||
u = c.addr
|
u = c.addr
|
||||||
}
|
}
|
||||||
req, err := http.NewRequest(http.MethodPost, u, r)
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
return fmt.Errorf("failed to create new HTTP request: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -195,7 +195,7 @@ func (c *Client) fetch(ctx context.Context, data []byte, streamCb StreamCallback
|
||||||
}
|
}
|
||||||
req.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
|
req.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
|
||||||
|
|
||||||
resp, err := c.do(req.WithContext(ctx))
|
resp, err := c.do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
return fmt.Errorf("error while sending request to %s: %w; Data len %d(%d)",
|
||||||
req.URL.Redacted(), err, len(data), r.Size())
|
req.URL.Redacted(), err, len(data), r.Size())
|
||||||
|
|
Loading…
Reference in a new issue