diff --git a/app/vmalert/datasource/vm.go b/app/vmalert/datasource/vm.go index e9d579c69..fc538266f 100644 --- a/app/vmalert/datasource/vm.go +++ b/app/vmalert/datasource/vm.go @@ -2,6 +2,7 @@ package datasource import ( "context" + "errors" "fmt" "io" "net/http" @@ -162,6 +163,11 @@ func (s *VMStorage) do(ctx context.Context, req *http.Request) (*http.Response, logger.Infof("DEBUG datasource request: executing %s request with params %q", req.Method, req.URL.RawQuery) } resp, err := s.c.Do(req.WithContext(ctx)) + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + // something in the middle between client and datasource might be closing + // the connection. So we do a one more attempt in hope request will succeed. + resp, err = s.c.Do(req.WithContext(ctx)) + } if err != nil { return nil, fmt.Errorf("error getting response from %s: %w", req.URL.Redacted(), err) } diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go index e9d728763..d6413ab6e 100644 --- a/app/vmalert/datasource/vm_test.go +++ b/app/vmalert/datasource/vm_test.go @@ -38,7 +38,7 @@ func TestVMInstantQuery(t *testing.T) { mux.HandleFunc("/render", func(w http.ResponseWriter, request *http.Request) { c++ switch c { - case 8: + case 7: w.Write([]byte(`[{"target":"constantLine(10)","tags":{"name":"constantLine(10)"},"datapoints":[[10,1611758343],[10,1611758373],[10,1611758403]]}]`)) } }) @@ -62,21 +62,18 @@ func TestVMInstantQuery(t *testing.T) { } switch c { case 0: - conn, _, _ := w.(http.Hijacker).Hijack() - _ = conn.Close() - case 1: w.WriteHeader(500) - case 2: + case 1: w.Write([]byte("[]")) - case 3: + case 2: w.Write([]byte(`{"status":"error", "errorType":"type:", "error":"some error msg"}`)) - case 4: + case 3: w.Write([]byte(`{"status":"unknown"}`)) - case 5: + case 4: w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`)) - case 6: + case 5: w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows","foo":"bar"},"value":[1583786142,"13763"]},{"metric":{"__name__":"vm_requests","foo":"baz"},"value":[1583786140,"2000"]}]}}`)) - case 7: + case 6: w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "1"]}}`)) } }) @@ -95,17 +92,20 @@ func TestVMInstantQuery(t *testing.T) { ts := time.Now() expErr := func(err string) { - if _, _, err := pq.Query(ctx, query, ts); err == nil { + _, _, gotErr := pq.Query(ctx, query, ts) + if gotErr == nil { t.Fatalf("expected %q got nil", err) } + if !strings.Contains(gotErr.Error(), err) { + t.Fatalf("expected err %q; got %q", err, gotErr) + } } - expErr("connection error") // 0 - expErr("invalid response status error") // 1 - expErr("response body error") // 2 - expErr("error status") // 3 - expErr("unknown status") // 4 - expErr("non-vector resultType error") // 5 + expErr("500") // 0 + expErr("error parsing prometheus metrics") // 1 + expErr("response error") // 2 + expErr("unknown status") // 3 + expErr("unexpected end of JSON input") // 4 m, _, err := pq.Query(ctx, query, ts) // 6 - vector if err != nil { @@ -165,6 +165,75 @@ func TestVMInstantQuery(t *testing.T) { }, } metricsEqual(t, m, exp) + +} + +func TestVMInstantQueryWithRetry(t *testing.T) { + mux := http.NewServeMux() + mux.HandleFunc("/", func(_ http.ResponseWriter, _ *http.Request) { + t.Errorf("should not be called") + }) + c := -1 + mux.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) { + c++ + if r.URL.Query().Get("query") != query { + t.Errorf("expected %s in query param, got %s", query, r.URL.Query().Get("query")) + } + switch c { + case 0: + w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "1"]}}`)) + case 1: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 2: + w.Write([]byte(`{"status":"success","data":{"resultType":"scalar","result":[1583786142, "2"]}}`)) + case 3: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + case 4: + conn, _, _ := w.(http.Hijacker).Hijack() + _ = conn.Close() + } + }) + + srv := httptest.NewServer(mux) + defer srv.Close() + + s := NewVMStorage(srv.URL, nil, time.Minute, 0, false, srv.Client()) + pq := s.BuildWithParams(QuerierParams{DataSourceType: string(datasourcePrometheus)}) + + expErr := func(err string) { + _, _, gotErr := pq.Query(ctx, query, time.Now()) + if gotErr == nil { + t.Fatalf("expected %q got nil", err) + } + if !strings.Contains(gotErr.Error(), err) { + t.Fatalf("expected err %q; got %q", err, gotErr) + } + } + + expValue := func(v float64) { + m, _, err := pq.Query(ctx, query, time.Now()) + if err != nil { + t.Fatalf("unexpected %s", err) + } + if len(m) != 1 { + t.Fatalf("expected 1 metrics got %d in %+v", len(m), m) + } + expected := []Metric{ + { + Timestamps: []int64{1583786142}, + Values: []float64{v}, + }, + } + if !reflect.DeepEqual(m, expected) { + t.Fatalf("unexpected metric %+v want %+v", m, expected) + } + } + + expValue(1) // 0 + expValue(2) // 1 - fail, 2 - retry + expErr("EOF") // 3, 4 - retries } func metricsEqual(t *testing.T, gotM, expectedM []Metric) { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 156af5d32..dcd03a345 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -26,6 +26,7 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: reduce the probability of sudden increase in the number of small parts on systems with small number of CPU cores. * BUGFIX: [vmctl](https://docs.victoriametrics.com/vmctl.html): fix performance issue when migrating data from VictoriaMetrics according to [these docs](https://docs.victoriametrics.com/vmctl.html#migrating-data-from-victoriametrics). Add the ability to speed up the data migration via `--vm-native-disable-retries` command-line flag. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4092). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): fix a panic when the duration in the query contains uppercase `M` suffix. Such a suffix isn't allowed to use in durations, since it clashes with `a million` suffix, e.g. it isn't clear whether `rate(metric[5M])` means rate over 5 minutes, 5 months or 5 million seconds. See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3589) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4120) issues. +* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): retry failed read request on the closed connection one more time. This improves rules execution reliability when connection between vmalert and datasource closes unexpectedly. * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): prevent from possible panic when the number of vmstorage nodes increases when [automatic vmstorage discovery](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#automatic-vmstorage-discovery) is enabled. ## [v1.90.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.90.0)