From e196c61e36e3e6c2f4a8b813222a9462665a6ffe Mon Sep 17 00:00:00 2001 From: Nikolay Date: Sun, 21 Jan 2024 01:12:51 +0100 Subject: [PATCH] app/vmselect: abort streaming connections for vmselect (#5650) * app/vmselect: abort streaming connections for vmselect due to streaming nature of export APIs, curl and simmilr tools cannot detect errors that happened after http.Header with status 200 was written to it. This PR tracks if body write was already started and closes connection. It allows client to detect not expected chunk sequence and return error to the caller. Mostly it affects vmselect at cluster version https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5645 * wip Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5645 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5650 --------- Co-authored-by: Aliaksandr Valialkin --- docs/CHANGELOG.md | 5 ++- lib/httpserver/httpserver.go | 80 +++++++++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2f9a4e214..56475f745 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -40,13 +40,14 @@ The sandbox cluster installation is running under the constant load generated by - `-remoteRead.oauth2.endpointParams` for `-remoteRead.url` - `-remoteWrite.oauth2.endpointParams` for `-remoteWrite.url` * FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to proxy incoming requests to different backends based on the requested host via `src_hosts` option at `url_map`. See [these docs](https://docs.victoriametrics.com/vmauth.html#generic-http-proxy-for-different-backends). -* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): rename cmd-line flag `vm-native-disable-retries` to `vm-native-disable-per-metric-migration` to better reflect its meaning. +* FEATURE: all VictoriaMetrics components: break HTTP client connection if an error occurs after the server at `-httpListenAddr` already sent response status code. Previously such an error couldn't be detected at client side. Now the client will get an error about invalid chunked response. The error message is simultaneously written to the server log and in the last line of the response. This should help detecting errors when migrating data between VictoriaMetrics instances by [vmctl](https://docs.victoriametrics.com/vmctl.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5645). * FEATURE: all VictoriaMetrics components: add ability to specify arbitrary HTTP headers to send with every request to `-pushmetrics.url`. See [`push metrics` docs](https://docs.victoriametrics.com/#push-metrics). * FEATURE: all VictoriaMetrics components: add `-metrics.exposeMetadata` command-line flag, which allows displaying `TYPE` and `HELP` metadata at `/metrics` page exposed at `-httpListenAddr`. This may be needed when the `/metrics` page is scraped by collector, which requires the `TYPE` and `HELP` metadata such as [Google Cloud Managed Prometheus](https://cloud.google.com/stackdriver/docs/managed-prometheus/troubleshooting#missing-metric-type). * FEATURE: dashboards/cluster: add panels for detailed visualization of traffic usage between vmstorage, vminsert, vmselect components and their clients. New panels are available in the rows dedicated to specific components. * FEATURE: dashboards/cluster: update "Slow Queries" panel to show percentage of the slow queries to the total number of read queries served by vmselect. The percentage value should make it more clear for users whether there is a service degradation. * FEATURE: dashboards/single: change dashboard title from `VictoriaMetrics` to `VictoriaMetrics - single-node`. The new title should provide better understanding of this dashboard purpose. -* FEATURE [vmctl](https://docs.victoriametrics.com/vmctl.html): add `-vm-native-src-insecure-skip-verify` and `-vm-native-dst-insecure-skip-verify` command-line flags for native protocol. It can be used for skipping TLS certificate verification when connecting to the source or destination addresses. +* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): rename cmd-line flag `vm-native-disable-retries` to `vm-native-disable-per-metric-migration` to better reflect its meaning. +* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `-vm-native-src-insecure-skip-verify` and `-vm-native-dst-insecure-skip-verify` command-line flags for native protocol. It can be used for skipping TLS certificate verification when connecting to the source or destination addresses. * FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): add `job` label to `DiskRunsOutOfSpace` alerting rule, so it is easier to understand to which installation the triggered instance belongs. * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): properly return full results when `-search.skipSlowReplicas` command-line flag is passed to `vmselect` and when [vmstorage groups](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#vmstorage-groups-at-vmselect) are in use. Previously partial results could be returned in this case. diff --git a/lib/httpserver/httpserver.go b/lib/httpserver/httpserver.go index 27ba674bc..b847843c1 100644 --- a/lib/httpserver/httpserver.go +++ b/lib/httpserver/httpserver.go @@ -214,8 +214,10 @@ var gzipHandlerWrapper = func() func(http.Handler) http.HandlerFunc { return hw }() -var metricsHandlerDuration = metrics.NewHistogram(`vm_http_request_duration_seconds{path="/metrics"}`) -var connTimeoutClosedConns = metrics.NewCounter(`vm_http_conn_timeout_closed_conns_total`) +var ( + metricsHandlerDuration = metrics.NewHistogram(`vm_http_request_duration_seconds{path="/metrics"}`) + connTimeoutClosedConns = metrics.NewCounter(`vm_http_conn_timeout_closed_conns_total`) +) var hostname = func() string { h, err := os.Hostname() @@ -358,6 +360,10 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques if !CheckBasicAuth(w, r) { return } + + w = &responseWriterWithAbort{ + ResponseWriter: w, + } if rh(w, r) { return } @@ -471,6 +477,69 @@ func GetQuotedRemoteAddr(r *http.Request) string { return strconv.Quote(remoteAddr) } +type responseWriterWithAbort struct { + http.ResponseWriter + + sentHeaders bool + aborted bool +} + +func (rwa *responseWriterWithAbort) Write(data []byte) (int, error) { + if rwa.aborted { + return 0, fmt.Errorf("response connection is aborted") + } + if !rwa.sentHeaders { + rwa.sentHeaders = true + } + return rwa.ResponseWriter.Write(data) +} + +func (rwa *responseWriterWithAbort) WriteHeader(statusCode int) { + if rwa.aborted { + logger.WarnfSkipframes(1, "cannot write response headers with statusCode=%d, since the response connection has been aborted", statusCode) + return + } + if rwa.sentHeaders { + logger.WarnfSkipframes(1, "cannot write response headers with statusCode=%d, since they were already sent", statusCode) + return + } + rwa.ResponseWriter.WriteHeader(statusCode) + rwa.sentHeaders = true +} + +// abort aborts the client connection associated with rwa. +// +// The last http chunk in the response stream is intentionally written incorrectly, +// so the client, which reads the response, could notice this error. +func (rwa *responseWriterWithAbort) abort() { + if !rwa.sentHeaders { + logger.Panicf("BUG: abort can be called only after http response headers are sent") + } + if rwa.aborted { + logger.WarnfSkipframes(2, "cannot abort the connection, since it has been already aborted") + return + } + hj, ok := rwa.ResponseWriter.(http.Hijacker) + if !ok { + logger.Panicf("BUG: ResponseWriter must implement http.Hijacker interface") + } + conn, bw, err := hj.Hijack() + if err != nil { + logger.WarnfSkipframes(2, "cannot hijack response connection: %s", err) + return + } + + // Just write an error message into the client connection as is without http chunked encoding. + // This is needed in order to notify the client about the aborted connection. + _, _ = bw.WriteString("\nthe connection has been aborted; see the last line in the response and/or in the server log for the reason\n") + _ = bw.Flush() + + // Forcibly close the client connection in order to break http keep-alive at client side. + _ = conn.Close() + + rwa.aborted = true +} + // Errorf writes formatted error message to w and to logger. func Errorf(w http.ResponseWriter, r *http.Request, format string, args ...interface{}) { errStr := fmt.Sprintf(format, args...) @@ -488,6 +557,13 @@ func Errorf(w http.ResponseWriter, r *http.Request, format string, args ...inter break } } + if rwa, ok := w.(*responseWriterWithAbort); ok && rwa.sentHeaders { + // HTTP status code has been already sent to client, so it cannot be sent again. + // Just write errStr to the response and abort the client connection, so the client could notice the error. + fmt.Fprintf(w, "\n%s\n", errStr) + rwa.abort() + return + } http.Error(w, errStr, statusCode) }