diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 3c4e00796..64645f782 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -93,7 +93,7 @@ func timeseriesWorker(workerID uint) { var rsLastResetTime uint64 for tsw := range timeseriesWorkCh { rss := tsw.rss - if time.Until(rss.deadline.Deadline) < 0 { + if rss.deadline.Exceeded() { tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String()) continue } @@ -438,6 +438,9 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (i // GetLabels returns labels until the given deadline. func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } // Send the query to all the storage nodes in parallel. type nodeResult struct { labels []string @@ -507,6 +510,9 @@ func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) { // GetLabelValues returns label values for the given labelName // until the given deadline. func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]string, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } if labelName == "__name__" { labelName = "" } @@ -572,6 +578,9 @@ func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]stri // GetLabelEntries returns all the label entries for at until the given deadline. func GetLabelEntries(at *auth.Token, deadline Deadline) ([]storage.TagEntry, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } // Send the query to all the storage nodes in parallel. type nodeResult struct { labelEntries []storage.TagEntry @@ -677,6 +686,9 @@ func deduplicateStrings(a []string) []string { // GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats func GetTSDBStatusForDate(at *auth.Token, deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } // Send the query to all the storage nodes in parallel. type nodeResult struct { status *storage.TSDBStatus @@ -778,6 +790,9 @@ func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry { // GetSeriesCount returns the number of unique series for the given at. func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) { + if deadline.Exceeded() { + return 0, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } // Send the query to all the storage nodes in parallel. type nodeResult struct { n uint64 @@ -857,6 +872,9 @@ func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error { // ProcessSearchQuery performs sq on storage nodes until the given deadline. func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) { + if deadline.Exceeded() { + return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) + } requestData := sq.Marshal(nil) // Send the query to all the storage nodes in parallel. @@ -1138,7 +1156,8 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC if err != nil { return fmt.Errorf("cannot obtain connection from a pool: %w", err) } - if err := bc.SetDeadline(deadline.Deadline); err != nil { + d := time.Unix(int64(deadline.deadline), 0) + if err := bc.SetDeadline(d); err != nil { _ = bc.Close() logger.Panicf("FATAL: cannot set connection deadline: %s", err) } @@ -1626,7 +1645,7 @@ const maxConcurrentQueriesPerStorageNode = 100 // Deadline contains deadline with the corresponding timeout for pretty error messages. type Deadline struct { - Deadline time.Time + deadline uint64 timeout time.Duration flagHint string @@ -1636,14 +1655,19 @@ type Deadline struct { // // flagHint must contain a hit for command-line flag, which could be used // in order to increase timeout. -func NewDeadline(timeout time.Duration, flagHint string) Deadline { +func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline { return Deadline{ - Deadline: time.Now().Add(timeout), + deadline: uint64(startTime.Add(timeout).Unix()), timeout: timeout, flagHint: flagHint, } } +// Exceeded returns true if deadline is exceeded. +func (d *Deadline) Exceeded() bool { + return fasttime.UnixTimestamp() > d.deadline +} + // String returns human-readable string representation for d. func (d *Deadline) String() string { return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index edea93472..92021f7f8 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -48,7 +48,7 @@ const defaultStep = 5 * 60 * 1000 // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() + ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } @@ -71,7 +71,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, if err != nil { return err } - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) if start >= end { start = end - defaultStep } @@ -124,7 +124,7 @@ var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/fe // ExportHandler exports data in raw format from /api/v1/export. func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() + ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } @@ -147,7 +147,7 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r } format := r.FormValue("format") maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line"))) - deadline := getDeadlineForExport(r) + deadline := getDeadlineForExport(r, startTime) if start >= end { end = start + defaultStep } @@ -261,7 +261,7 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error { if len(matches) == 0 { return fmt.Errorf("missing `match[]` arg") } - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) tagFilterss, err := getTagFilterssFromMatches(matches) if err != nil { return err @@ -323,8 +323,7 @@ var httpClient = &http.Client{ // // See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) - + deadline := getDeadlineForQuery(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -345,7 +344,7 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w if len(matches) == 0 { matches = []string{fmt.Sprintf("{%s!=''}", labelName)} } - ct := currentTime() + ct := startTime.UnixNano() / 1e6 end, err := getTime(r, "end", ct) if err != nil { return err @@ -432,7 +431,7 @@ var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path=" // LabelsCountHandler processes /api/v1/labels/count request. func LabelsCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline) if err != nil { return fmt.Errorf(`cannot obtain label entries: %w`, err) @@ -454,7 +453,7 @@ const secsPerDay = 3600 * 24 // // See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -501,8 +500,7 @@ var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/ // // See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) - + deadline := getDeadlineForQuery(r, startTime) if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -521,7 +519,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if len(matches) == 0 { matches = []string{"{__name__!=''}"} } - ct := currentTime() + ct := startTime.UnixNano() / 1e6 end, err := getTime(r, "end", ct) if err != nil { return err @@ -596,7 +594,7 @@ var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ // SeriesCountHandler processes /api/v1/series/count request. func SeriesCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) n, isPartial, err := netstorage.GetSeriesCount(at, deadline) if err != nil { return fmt.Errorf("cannot obtain series count: %w", err) @@ -617,8 +615,7 @@ var seriesCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path=" // // See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() - + ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } @@ -639,7 +636,7 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if err != nil { return err } - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) tagFilterss, err := getTagFilterssFromMatches(matches) if err != nil { @@ -697,8 +694,7 @@ var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ // // See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() - + ct := startTime.UnixNano() / 1e6 query := r.FormValue("query") if len(query) == 0 { return fmt.Errorf("missing `query` arg") @@ -718,7 +714,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if step <= 0 { step = defaultStep } - deadline := getDeadlineForQuery(r) + deadline := getDeadlineForQuery(r, startTime) if len(query) > *maxQueryLen { return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen) @@ -766,7 +762,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r start -= offset end := start start = end - window - if err := queryRangeHandler(at, w, childQuery, start, end, step, r, ct); err != nil { + if err := queryRangeHandler(startTime, at, w, childQuery, start, end, step, r, ct); err != nil { return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", childQuery, start, end, step, err) } queryDuration.UpdateDuration(startTime) @@ -814,8 +810,7 @@ func parsePositiveDuration(s string, step int64) (int64, error) { // // See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { - ct := currentTime() - + ct := startTime.UnixNano() / 1e6 query := r.FormValue("query") if len(query) == 0 { return fmt.Errorf("missing `query` arg") @@ -832,15 +827,15 @@ func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite if err != nil { return err } - if err := queryRangeHandler(at, w, query, start, end, step, r, ct); err != nil { + if err := queryRangeHandler(startTime, at, w, query, start, end, step, r, ct); err != nil { return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", query, start, end, step, err) } queryRangeDuration.UpdateDuration(startTime) return nil } -func queryRangeHandler(at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error { - deadline := getDeadlineForQuery(r) +func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error { + deadline := getDeadlineForQuery(r, startTime) mayCache := !getBool(r, "nocache") lookbackDelta, err := getMaxLookback(r) if err != nil { @@ -1047,17 +1042,17 @@ func getMaxLookback(r *http.Request) (int64, error) { return getDuration(r, "max_lookback", d) } -func getDeadlineForQuery(r *http.Request) netstorage.Deadline { +func getDeadlineForQuery(r *http.Request, startTime time.Time) netstorage.Deadline { dMax := maxQueryDuration.Milliseconds() - return getDeadlineWithMaxDuration(r, dMax, "-search.maxQueryDuration") + return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration") } -func getDeadlineForExport(r *http.Request) netstorage.Deadline { +func getDeadlineForExport(r *http.Request, startTime time.Time) netstorage.Deadline { dMax := maxExportDuration.Milliseconds() - return getDeadlineWithMaxDuration(r, dMax, "-search.maxExportDuration") + return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration") } -func getDeadlineWithMaxDuration(r *http.Request, dMax int64, flagHint string) netstorage.Deadline { +func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) netstorage.Deadline { d, err := getDuration(r, "timeout", 0) if err != nil { d = 0 @@ -1066,7 +1061,7 @@ func getDeadlineWithMaxDuration(r *http.Request, dMax int64, flagHint string) ne d = dMax } timeout := time.Duration(d) * time.Millisecond - return netstorage.NewDeadline(timeout, flagHint) + return netstorage.NewDeadline(startTime, timeout, flagHint) } func getBool(r *http.Request, argKey string) bool { @@ -1079,10 +1074,6 @@ func getBool(r *http.Request, argKey string) bool { } } -func currentTime() int64 { - return int64(fasttime.UnixTimestamp() * 1000) -} - func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) { tagFilterss := make([][]storage.TagFilter, 0, len(matches)) for _, match := range matches { diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index e970a047c..1be1916ec 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -31,7 +31,7 @@ func TestExecSuccess(t *testing.T) { Start: start, End: end, Step: step, - Deadline: netstorage.NewDeadline(time.Minute, ""), + Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""), } for i := 0; i < 5; i++ { result, err := Exec(ec, q, false) @@ -5637,7 +5637,7 @@ func TestExecError(t *testing.T) { Start: 1000, End: 2000, Step: 100, - Deadline: netstorage.NewDeadline(time.Minute, ""), + Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""), } for i := 0; i < 4; i++ { rv, err := Exec(ec, q, false)