From ccd3aa4f158ee32ff3b8d57ca8b0945052ec2791 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 4 Feb 2020 16:13:59 +0200 Subject: [PATCH] app/vmselect: take into account the time the requests wait in the queue if `-search.maxConcurrentRequests` is exceeded This will prevent from excess CPU usage for timed out queries. --- app/vmselect/main.go | 29 +++++++++++++------------- app/vmselect/prometheus/prometheus.go | 30 +++++++++------------------ 2 files changed, 25 insertions(+), 34 deletions(-) diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 255e387ee..701a2f422 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -111,6 +111,7 @@ var ( ) func requestHandler(w http.ResponseWriter, r *http.Request) bool { + startTime := time.Now() // Limit the number of concurrent queries. select { case concurrencyCh <- struct{}{}: @@ -155,23 +156,23 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } switch p.Prefix { case "select": - return selectHandler(w, r, p, at) + return selectHandler(startTime, w, r, p, at) case "delete": - return deleteHandler(w, r, p, at) + return deleteHandler(startTime, w, r, p, at) default: // This is not our link return false } } -func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool { +func selectHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool { if strings.HasPrefix(p.Suffix, "prometheus/api/v1/label/") { s := p.Suffix[len("prometheus/api/v1/label/"):] if strings.HasSuffix(s, "/values") { labelValuesRequests.Inc() labelName := s[:len(s)-len("/values")] httpserver.EnableCORS(w, r) - if err := prometheus.LabelValuesHandler(at, labelName, w, r); err != nil { + if err := prometheus.LabelValuesHandler(startTime, at, labelName, w, r); err != nil { labelValuesErrors.Inc() sendPrometheusError(w, r, err) return true @@ -184,7 +185,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a case "prometheus/api/v1/query": queryRequests.Inc() httpserver.EnableCORS(w, r) - if err := prometheus.QueryHandler(at, w, r); err != nil { + if err := prometheus.QueryHandler(startTime, at, w, r); err != nil { queryErrors.Inc() sendPrometheusError(w, r, err) return true @@ -193,7 +194,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a case "prometheus/api/v1/query_range": queryRangeRequests.Inc() httpserver.EnableCORS(w, r) - if err := prometheus.QueryRangeHandler(at, w, r); err != nil { + if err := prometheus.QueryRangeHandler(startTime, at, w, r); err != nil { queryRangeErrors.Inc() sendPrometheusError(w, r, err) return true @@ -202,7 +203,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a case "prometheus/api/v1/series": seriesRequests.Inc() httpserver.EnableCORS(w, r) - if err := prometheus.SeriesHandler(at, w, r); err != nil { + if err := prometheus.SeriesHandler(startTime, at, w, r); err != nil { seriesErrors.Inc() sendPrometheusError(w, r, err) return true @@ -211,7 +212,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a case "prometheus/api/v1/series/count": seriesCountRequests.Inc() httpserver.EnableCORS(w, r) - if err := prometheus.SeriesCountHandler(at, w, r); err != nil { + if err := prometheus.SeriesCountHandler(startTime, at, w, r); err != nil { seriesCountErrors.Inc() sendPrometheusError(w, r, err) return true @@ -220,7 +221,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a case "prometheus/api/v1/labels": labelsRequests.Inc() httpserver.EnableCORS(w, r) - if err := prometheus.LabelsHandler(at, w, r); err != nil { + if err := prometheus.LabelsHandler(startTime, at, w, r); err != nil { labelsErrors.Inc() sendPrometheusError(w, r, err) return true @@ -229,7 +230,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a case "prometheus/api/v1/labels/count": labelsCountRequests.Inc() httpserver.EnableCORS(w, r) - if err := prometheus.LabelsCountHandler(at, w, r); err != nil { + if err := prometheus.LabelsCountHandler(startTime, at, w, r); err != nil { labelsCountErrors.Inc() sendPrometheusError(w, r, err) return true @@ -237,7 +238,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a return true case "prometheus/api/v1/export": exportRequests.Inc() - if err := prometheus.ExportHandler(at, w, r); err != nil { + if err := prometheus.ExportHandler(startTime, at, w, r); err != nil { exportErrors.Inc() httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) return true @@ -245,7 +246,7 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a return true case "prometheus/federate": federateRequests.Inc() - if err := prometheus.FederateHandler(at, w, r); err != nil { + if err := prometheus.FederateHandler(startTime, at, w, r); err != nil { federateErrors.Inc() httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) return true @@ -274,11 +275,11 @@ func selectHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, a } } -func deleteHandler(w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool { +func deleteHandler(startTime time.Time, w http.ResponseWriter, r *http.Request, p *httpserver.Path, at *auth.Token) bool { switch p.Suffix { case "prometheus/api/v1/admin/tsdb/delete_series": deleteRequests.Inc() - if err := prometheus.DeleteHandler(at, r); err != nil { + if err := prometheus.DeleteHandler(startTime, at, r); err != nil { deleteErrors.Inc() httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) return true diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index e6f3b125a..4f4ce5bf6 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -39,8 +39,7 @@ var ( const defaultStep = 5 * 60 * 1000 // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/ -func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { ct := currentTime() if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %s", err) @@ -116,8 +115,7 @@ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) err var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`) // ExportHandler exports data in raw format from /api/v1/export. -func ExportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { ct := currentTime() if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %s", err) @@ -215,8 +213,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, star // DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request. // // See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series -func DeleteHandler(at *auth.Token, r *http.Request) error { - startTime := time.Now() +func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error { if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %s", err) } @@ -288,8 +285,7 @@ var httpClient = &http.Client{ // LabelValuesHandler processes /api/v1/label//values request. // // See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values -func LabelValuesHandler(at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error { deadline := getDeadlineForQuery(r) if err := r.ParseForm(); err != nil { @@ -392,8 +388,7 @@ func labelValuesWithMatches(at *auth.Token, labelName string, matches []string, var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/label/{}/values"}`) // LabelsCountHandler processes /api/v1/labels/count request. -func LabelsCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func LabelsCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { deadline := getDeadlineForQuery(r) labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline) if err != nil { @@ -414,8 +409,7 @@ var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path=" // LabelsHandler processes /api/v1/labels request. // // See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names -func LabelsHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { deadline := getDeadlineForQuery(r) if err := r.ParseForm(); err != nil { @@ -510,8 +504,7 @@ func labelsWithMatches(at *auth.Token, matches []string, start, end int64, deadl var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/labels"}`) // SeriesCountHandler processes /api/v1/series/count request. -func SeriesCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func SeriesCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { deadline := getDeadlineForQuery(r) n, isPartial, err := netstorage.GetSeriesCount(at, deadline) if err != nil { @@ -532,8 +525,7 @@ var seriesCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path=" // SeriesHandler processes /api/v1/series request. // // See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers -func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { ct := currentTime() if err := r.ParseForm(); err != nil { @@ -613,8 +605,7 @@ var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ // QueryHandler processes /api/v1/query request. // // See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries -func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { ct := currentTime() query := r.FormValue("query") @@ -728,8 +719,7 @@ func parsePositiveDuration(s string, step int64) (int64, error) { // QueryRangeHandler processes /api/v1/query_range request. // // See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries -func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { - startTime := time.Now() +func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { ct := currentTime() query := r.FormValue("query")