From caa2952aa6360e633da956eecb9e534ff6a1a151 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 21 Jul 2020 18:34:59 +0300 Subject: [PATCH] app/vmselect: take into account the time spent in wait queue before query execution as time spent on the query --- app/vmselect/netstorage/netstorage.go | 34 ++++++++++++--- app/vmselect/prometheus/prometheus.go | 61 ++++++++++++--------------- app/vmselect/promql/exec_test.go | 4 +- 3 files changed, 57 insertions(+), 42 deletions(-) diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 54b13952e..344ba86b3 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -93,7 +93,7 @@ func timeseriesWorker(workerID uint) { var rsLastResetTime uint64 for tsw := range timeseriesWorkCh { rss := tsw.rss - if time.Until(rss.deadline.Deadline) < 0 { + if rss.deadline.Exceeded() { tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String()) continue } @@ -396,6 +396,9 @@ func DeleteSeries(sq *storage.SearchQuery) (int, error) { // GetLabels returns labels until the given deadline. func GetLabels(deadline Deadline) ([]string, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } labels, err := vmstorage.SearchTagKeys(*maxTagKeysPerSearch) if err != nil { return nil, fmt.Errorf("error during labels search: %w", err) @@ -417,6 +420,9 @@ func GetLabels(deadline Deadline) ([]string, error) { // GetLabelValues returns label values for the given labelName // until the given deadline. func GetLabelValues(labelName string, deadline Deadline) ([]string, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } if labelName == "__name__" { labelName = "" } @@ -435,6 +441,9 @@ func GetLabelValues(labelName string, deadline Deadline) ([]string, error) { // GetLabelEntries returns all the label entries until the given deadline. func GetLabelEntries(deadline Deadline) ([]storage.TagEntry, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } labelEntries, err := vmstorage.SearchTagEntries(*maxTagKeysPerSearch, *maxTagValuesPerSearch) if err != nil { return nil, fmt.Errorf("error during label entries request: %w", err) @@ -462,6 +471,9 @@ func GetLabelEntries(deadline Deadline) ([]storage.TagEntry, error) { // GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats func GetTSDBStatusForDate(deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } status, err := vmstorage.GetTSDBStatusForDate(date, topN) if err != nil { return nil, fmt.Errorf("error during tsdb status request: %w", err) @@ -471,6 +483,9 @@ func GetTSDBStatusForDate(deadline Deadline, date uint64, topN int) (*storage.TS // GetSeriesCount returns the number of unique series. func GetSeriesCount(deadline Deadline) (uint64, error) { + if deadline.Exceeded() { + return 0, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } n, err := vmstorage.GetSeriesCount() if err != nil { return 0, fmt.Errorf("error during series count request: %w", err) @@ -497,6 +512,10 @@ var ssPool sync.Pool // // Results.RunParallel or Results.Cancel must be called on the returned Results. func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, error) { + if deadline.Exceeded() { + return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } + // Setup search. tfss, err := setupTfss(sq.TagFilterss) if err != nil { @@ -521,7 +540,7 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline Deadli blocksRead := 0 for sr.NextMetricBlock() { blocksRead++ - if time.Until(deadline.Deadline) < 0 { + if deadline.Exceeded() { return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String()) } metricName := sr.MetricBlockRef.MetricName @@ -569,7 +588,7 @@ func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) // Deadline contains deadline with the corresponding timeout for pretty error messages. type Deadline struct { - Deadline time.Time + deadline uint64 timeout time.Duration flagHint string @@ -579,14 +598,19 @@ type Deadline struct { // // flagHint must contain a hit for command-line flag, which could be used // in order to increase timeout. -func NewDeadline(timeout time.Duration, flagHint string) Deadline { +func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline { return Deadline{ - Deadline: time.Now().Add(timeout), + deadline: uint64(startTime.Add(timeout).Unix()), timeout: timeout, flagHint: flagHint, } } +// Exceeded returns true if deadline is exceeded. +func (d *Deadline) Exceeded() bool { + return fasttime.UnixTimestamp() > d.deadline +} + // String returns human-readable string representation for d. func (d *Deadline) String() string { return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 04de32a64..e19b6166a 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -44,7 +44,7 @@ const defaultStep = 5 * 60 * 1000 // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() + ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } @@ -67,7 +67,7 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request if err != nil { return err } - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) if start >= end { start = end - defaultStep } @@ -115,7 +115,7 @@ var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/fe // ExportHandler exports data in raw format from /api/v1/export. func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() + ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } @@ -138,7 +138,7 @@ func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) } format := r.FormValue("format") maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line"))) - deadline := getDeadlineForExport(r) + deadline := getDeadlineForExport(r, startTime) if start >= end { end = start + defaultStep } @@ -270,8 +270,7 @@ var deleteDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ // // See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) - + deadline := getDeadlineForQuery(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -291,7 +290,7 @@ func LabelValuesHandler(startTime time.Time, labelName string, w http.ResponseWr if len(matches) == 0 { matches = []string{fmt.Sprintf("{%s!=''}", labelName)} } - ct := currentTime() + ct := startTime.UnixNano() / 1e6 end, err := getTime(r, "end", ct) if err != nil { return err @@ -373,7 +372,7 @@ var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path=" // LabelsCountHandler processes /api/v1/labels/count request. func LabelsCountHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) labelEntries, err := netstorage.GetLabelEntries(deadline) if err != nil { return fmt.Errorf(`cannot obtain label entries: %w`, err) @@ -392,7 +391,7 @@ const secsPerDay = 3600 * 24 // // See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats func TSDBStatusHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -436,8 +435,7 @@ var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/ // // See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) - + deadline := getDeadlineForQuery(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -455,7 +453,7 @@ func LabelsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) if len(matches) == 0 { matches = []string{"{__name__!=''}"} } - ct := currentTime() + ct := startTime.UnixNano() / 1e6 end, err := getTime(r, "end", ct) if err != nil { return err @@ -525,7 +523,7 @@ var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ // SeriesCountHandler processes /api/v1/series/count request. func SeriesCountHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) n, err := netstorage.GetSeriesCount(deadline) if err != nil { return fmt.Errorf("cannot obtain series count: %w", err) @@ -542,8 +540,7 @@ var seriesCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path=" // // See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers func SeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() - + ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -564,7 +561,7 @@ func SeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) if err != nil { return err } - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) tagFilterss, err := getTagFilterssFromMatches(matches) if err != nil { @@ -617,8 +614,7 @@ var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ // // See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() - + ct := startTime.UnixNano() / 1e6 query := r.FormValue("query") if len(query) == 0 { return fmt.Errorf("missing `query` arg") @@ -638,7 +634,7 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e if step <= 0 { step = defaultStep } - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) if len(query) > *maxQueryLen { return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen) @@ -686,7 +682,7 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e start -= offset end := start start = end - window - if err := queryRangeHandler(w, childQuery, start, end, step, r, ct); err != nil { + if err := queryRangeHandler(startTime, w, childQuery, start, end, step, r, ct); err != nil { return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", childQuery, start, end, step, err) } queryDuration.UpdateDuration(startTime) @@ -731,8 +727,7 @@ func parsePositiveDuration(s string, step int64) (int64, error) { // // See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries func QueryRangeHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() - + ct := startTime.UnixNano() / 1e6 query := r.FormValue("query") if len(query) == 0 { return fmt.Errorf("missing `query` arg") @@ -749,15 +744,15 @@ func QueryRangeHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque if err != nil { return err } - if err := queryRangeHandler(w, query, start, end, step, r, ct); err != nil { + if err := queryRangeHandler(startTime, w, query, start, end, step, r, ct); err != nil { return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", query, start, end, step, err) } queryRangeDuration.UpdateDuration(startTime) return nil } -func queryRangeHandler(w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error { - deadline := getDeadlineForQuery(r) +func queryRangeHandler(startTime time.Time, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error { + deadline := getDeadlineForQuery(r, startTime) mayCache := !getBool(r, "nocache") lookbackDelta, err := getMaxLookback(r) if err != nil { @@ -961,17 +956,17 @@ func getMaxLookback(r *http.Request) (int64, error) { return getDuration(r, "max_lookback", d) } -func getDeadlineForQuery(r *http.Request) netstorage.Deadline { +func getDeadlineForQuery(r *http.Request, startTime time.Time) netstorage.Deadline { dMax := maxQueryDuration.Milliseconds() - return getDeadlineWithMaxDuration(r, dMax, "-search.maxQueryDuration") + return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration") } -func getDeadlineForExport(r *http.Request) netstorage.Deadline { +func getDeadlineForExport(r *http.Request, startTime time.Time) netstorage.Deadline { dMax := maxExportDuration.Milliseconds() - return getDeadlineWithMaxDuration(r, dMax, "-search.maxExportDuration") + return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration") } -func getDeadlineWithMaxDuration(r *http.Request, dMax int64, flagHint string) netstorage.Deadline { +func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) netstorage.Deadline { d, err := getDuration(r, "timeout", 0) if err != nil { d = 0 @@ -980,7 +975,7 @@ func getDeadlineWithMaxDuration(r *http.Request, dMax int64, flagHint string) ne d = dMax } timeout := time.Duration(d) * time.Millisecond - return netstorage.NewDeadline(timeout, flagHint) + return netstorage.NewDeadline(startTime, timeout, flagHint) } func getBool(r *http.Request, argKey string) bool { @@ -993,10 +988,6 @@ func getBool(r *http.Request, argKey string) bool { } } -func currentTime() int64 { - return int64(fasttime.UnixTimestamp() * 1000) -} - func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) { tagFilterss := make([][]storage.TagFilter, 0, len(matches)) for _, match := range matches { diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index 217b35605..dc35c28e4 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -21,7 +21,7 @@ func TestExecSuccess(t *testing.T) { Start: start, End: end, Step: step, - Deadline: netstorage.NewDeadline(time.Minute, ""), + Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""), } for i := 0; i < 5; i++ { result, err := Exec(ec, q, false) @@ -5623,7 +5623,7 @@ func TestExecError(t *testing.T) { Start: 1000, End: 2000, Step: 100, - Deadline: netstorage.NewDeadline(time.Minute, ""), + Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""), } for i := 0; i < 4; i++ { rv, err := Exec(ec, q, false)