app/vmselect: abort streaming connections for vmselect (#5650)

* app/vmselect: abort streaming connections for vmselect
due to streaming nature of export APIs, curl and simmilr tools cannot
detect errors that happened after http.Header with status 200 was
written to it.

This PR tracks if body write was already started and closes connection.

It allows client to detect not expected chunk sequence and return error
to the caller.

Mostly it affects vmselect at cluster version

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5645

* wip

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5645
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5650

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2024-01-21 01:12:51 +01:00 committed by Aliaksandr Valialkin
parent 7734cbbed0
commit 92603bbdf0
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -209,8 +209,10 @@ var gzipHandlerWrapper = func() func(http.Handler) http.HandlerFunc {
return hw
}()
var metricsHandlerDuration = metrics.NewHistogram(`vm_http_request_duration_seconds{path="/metrics"}`)
var connTimeoutClosedConns = metrics.NewCounter(`vm_http_conn_timeout_closed_conns_total`)
var (
metricsHandlerDuration = metrics.NewHistogram(`vm_http_request_duration_seconds{path="/metrics"}`)
connTimeoutClosedConns = metrics.NewCounter(`vm_http_conn_timeout_closed_conns_total`)
)
var hostname = func() string {
h, err := os.Hostname()
@ -341,6 +343,10 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques
if !CheckBasicAuth(w, r) {
return
}
w = &responseWriterWithAbort{
ResponseWriter: w,
}
if rh(w, r) {
return
}
@ -444,6 +450,69 @@ func GetQuotedRemoteAddr(r *http.Request) string {
return strconv.Quote(remoteAddr)
}
type responseWriterWithAbort struct {
http.ResponseWriter
sentHeaders bool
aborted bool
}
func (rwa *responseWriterWithAbort) Write(data []byte) (int, error) {
if rwa.aborted {
return 0, fmt.Errorf("response connection is aborted")
}
if !rwa.sentHeaders {
rwa.sentHeaders = true
}
return rwa.ResponseWriter.Write(data)
}
func (rwa *responseWriterWithAbort) WriteHeader(statusCode int) {
if rwa.aborted {
logger.WarnfSkipframes(1, "cannot write response headers with statusCode=%d, since the response connection has been aborted", statusCode)
return
}
if rwa.sentHeaders {
logger.WarnfSkipframes(1, "cannot write response headers with statusCode=%d, since they were already sent", statusCode)
return
}
rwa.ResponseWriter.WriteHeader(statusCode)
rwa.sentHeaders = true
}
// abort aborts the client connection associated with rwa.
//
// The last http chunk in the response stream is intentionally written incorrectly,
// so the client, which reads the response, could notice this error.
func (rwa *responseWriterWithAbort) abort() {
if !rwa.sentHeaders {
logger.Panicf("BUG: abort can be called only after http response headers are sent")
}
if rwa.aborted {
logger.WarnfSkipframes(2, "cannot abort the connection, since it has been already aborted")
return
}
hj, ok := rwa.ResponseWriter.(http.Hijacker)
if !ok {
logger.Panicf("BUG: ResponseWriter must implement http.Hijacker interface")
}
conn, bw, err := hj.Hijack()
if err != nil {
logger.WarnfSkipframes(2, "cannot hijack response connection: %s", err)
return
}
// Just write an error message into the client connection as is without http chunked encoding.
// This is needed in order to notify the client about the aborted connection.
_, _ = bw.WriteString("\nthe connection has been aborted; see the last line in the response and/or in the server log for the reason\n")
_ = bw.Flush()
// Forcibly close the client connection in order to break http keep-alive at client side.
_ = conn.Close()
rwa.aborted = true
}
// Errorf writes formatted error message to w and to logger.
func Errorf(w http.ResponseWriter, r *http.Request, format string, args ...interface{}) {
errStr := fmt.Sprintf(format, args...)
@ -461,6 +530,13 @@ func Errorf(w http.ResponseWriter, r *http.Request, format string, args ...inter
break
}
}
if rwa, ok := w.(*responseWriterWithAbort); ok && rwa.sentHeaders {
// HTTP status code has been already sent to client, so it cannot be sent again.
// Just write errStr to the response and abort the client connection, so the client could notice the error.
fmt.Fprintf(w, "\n%s\n", errStr)
rwa.abort()
return
}
http.Error(w, errStr, statusCode)
}