app/vmselect: abort streaming connections for vmselect (#5650)

* app/vmselect: abort streaming connections for vmselect
due to streaming nature of export APIs, curl and simmilr tools cannot
detect errors that happened after http.Header with status 200 was
written to it.

This PR tracks if body write was already started and closes connection.

It allows client to detect not expected chunk sequence and return error
to the caller.

Mostly it affects vmselect at cluster version

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5645

* wip

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5645
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5650

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2024-01-21 01:12:51 +01:00 committed by GitHub
parent 74448a7e57
commit 8ab0ce3ded
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 81 additions and 4 deletions

View file

@ -40,13 +40,14 @@ The sandbox cluster installation is running under the constant load generated by
- `-remoteRead.oauth2.endpointParams` for `-remoteRead.url`
- `-remoteWrite.oauth2.endpointParams` for `-remoteWrite.url`
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): add ability to proxy incoming requests to different backends based on the requested host via `src_hosts` option at `url_map`. See [these docs](https://docs.victoriametrics.com/vmauth.html#generic-http-proxy-for-different-backends).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): rename cmd-line flag `vm-native-disable-retries` to `vm-native-disable-per-metric-migration` to better reflect its meaning.
* FEATURE: all VictoriaMetrics components: break HTTP client connection if an error occurs after the server at `-httpListenAddr` already sent response status code. Previously such an error couldn't be detected at client side. Now the client will get an error about invalid chunked response. The error message is simultaneously written to the server log and in the last line of the response. This should help detecting errors when migrating data between VictoriaMetrics instances by [vmctl](https://docs.victoriametrics.com/vmctl.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5645).
* FEATURE: all VictoriaMetrics components: add ability to specify arbitrary HTTP headers to send with every request to `-pushmetrics.url`. See [`push metrics` docs](https://docs.victoriametrics.com/#push-metrics).
* FEATURE: all VictoriaMetrics components: add `-metrics.exposeMetadata` command-line flag, which allows displaying `TYPE` and `HELP` metadata at `/metrics` page exposed at `-httpListenAddr`. This may be needed when the `/metrics` page is scraped by collector, which requires the `TYPE` and `HELP` metadata such as [Google Cloud Managed Prometheus](https://cloud.google.com/stackdriver/docs/managed-prometheus/troubleshooting#missing-metric-type).
* FEATURE: dashboards/cluster: add panels for detailed visualization of traffic usage between vmstorage, vminsert, vmselect components and their clients. New panels are available in the rows dedicated to specific components.
* FEATURE: dashboards/cluster: update "Slow Queries" panel to show percentage of the slow queries to the total number of read queries served by vmselect. The percentage value should make it more clear for users whether there is a service degradation.
* FEATURE: dashboards/single: change dashboard title from `VictoriaMetrics` to `VictoriaMetrics - single-node`. The new title should provide better understanding of this dashboard purpose.
* FEATURE [vmctl](https://docs.victoriametrics.com/vmctl.html): add `-vm-native-src-insecure-skip-verify` and `-vm-native-dst-insecure-skip-verify` command-line flags for native protocol. It can be used for skipping TLS certificate verification when connecting to the source or destination addresses.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): rename cmd-line flag `vm-native-disable-retries` to `vm-native-disable-per-metric-migration` to better reflect its meaning.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `-vm-native-src-insecure-skip-verify` and `-vm-native-dst-insecure-skip-verify` command-line flags for native protocol. It can be used for skipping TLS certificate verification when connecting to the source or destination addresses.
* FEATURE: [Alerting rules for VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/deployment/docker#alerts): add `job` label to `DiskRunsOutOfSpace` alerting rule, so it is easier to understand to which installation the triggered instance belongs.
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): properly return full results when `-search.skipSlowReplicas` command-line flag is passed to `vmselect` and when [vmstorage groups](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#vmstorage-groups-at-vmselect) are in use. Previously partial results could be returned in this case.

View file

@ -214,8 +214,10 @@ var gzipHandlerWrapper = func() func(http.Handler) http.HandlerFunc {
return hw
}()
var metricsHandlerDuration = metrics.NewHistogram(`vm_http_request_duration_seconds{path="/metrics"}`)
var connTimeoutClosedConns = metrics.NewCounter(`vm_http_conn_timeout_closed_conns_total`)
var (
metricsHandlerDuration = metrics.NewHistogram(`vm_http_request_duration_seconds{path="/metrics"}`)
connTimeoutClosedConns = metrics.NewCounter(`vm_http_conn_timeout_closed_conns_total`)
)
var hostname = func() string {
h, err := os.Hostname()
@ -358,6 +360,10 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques
if !CheckBasicAuth(w, r) {
return
}
w = &responseWriterWithAbort{
ResponseWriter: w,
}
if rh(w, r) {
return
}
@ -471,6 +477,69 @@ func GetQuotedRemoteAddr(r *http.Request) string {
return strconv.Quote(remoteAddr)
}
type responseWriterWithAbort struct {
http.ResponseWriter
sentHeaders bool
aborted bool
}
func (rwa *responseWriterWithAbort) Write(data []byte) (int, error) {
if rwa.aborted {
return 0, fmt.Errorf("response connection is aborted")
}
if !rwa.sentHeaders {
rwa.sentHeaders = true
}
return rwa.ResponseWriter.Write(data)
}
func (rwa *responseWriterWithAbort) WriteHeader(statusCode int) {
if rwa.aborted {
logger.WarnfSkipframes(1, "cannot write response headers with statusCode=%d, since the response connection has been aborted", statusCode)
return
}
if rwa.sentHeaders {
logger.WarnfSkipframes(1, "cannot write response headers with statusCode=%d, since they were already sent", statusCode)
return
}
rwa.ResponseWriter.WriteHeader(statusCode)
rwa.sentHeaders = true
}
// abort aborts the client connection associated with rwa.
//
// The last http chunk in the response stream is intentionally written incorrectly,
// so the client, which reads the response, could notice this error.
func (rwa *responseWriterWithAbort) abort() {
if !rwa.sentHeaders {
logger.Panicf("BUG: abort can be called only after http response headers are sent")
}
if rwa.aborted {
logger.WarnfSkipframes(2, "cannot abort the connection, since it has been already aborted")
return
}
hj, ok := rwa.ResponseWriter.(http.Hijacker)
if !ok {
logger.Panicf("BUG: ResponseWriter must implement http.Hijacker interface")
}
conn, bw, err := hj.Hijack()
if err != nil {
logger.WarnfSkipframes(2, "cannot hijack response connection: %s", err)
return
}
// Just write an error message into the client connection as is without http chunked encoding.
// This is needed in order to notify the client about the aborted connection.
_, _ = bw.WriteString("\nthe connection has been aborted; see the last line in the response and/or in the server log for the reason\n")
_ = bw.Flush()
// Forcibly close the client connection in order to break http keep-alive at client side.
_ = conn.Close()
rwa.aborted = true
}
// Errorf writes formatted error message to w and to logger.
func Errorf(w http.ResponseWriter, r *http.Request, format string, args ...interface{}) {
errStr := fmt.Sprintf(format, args...)
@ -488,6 +557,13 @@ func Errorf(w http.ResponseWriter, r *http.Request, format string, args ...inter
break
}
}
if rwa, ok := w.(*responseWriterWithAbort); ok && rwa.sentHeaders {
// HTTP status code has been already sent to client, so it cannot be sent again.
// Just write errStr to the response and abort the client connection, so the client could notice the error.
fmt.Fprintf(w, "\n%s\n", errStr)
rwa.abort()
return
}
http.Error(w, errStr, statusCode)
}