Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-11-21 14:44:00 +00:00
app/vmselect: take into account the time spent in wait queue before query execution as time spent on the query
Commit: 5bb4fe1ba4
Parent: 0755cb3b50
3 changed files with 58 additions and 43 deletions
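The gist of the change: a query's deadline is now computed from the moment the request arrives (startTime) rather than from the moment the query starts executing, so time spent waiting in vmselect's queue counts against the query timeout. Below is a minimal standalone sketch of that idea; the names are illustrative, not the repository code (the real implementation is netstorage.Deadline in the hunks that follow, and it uses fasttime.UnixTimestamp() as a coarse cached clock).

// Sketch: anchor the query deadline at request arrival, so queue wait time
// counts against the timeout.
package main

import (
    "fmt"
    "time"
)

type deadline struct {
    deadlineUnix uint64        // unix seconds after which the query must be aborted
    timeout      time.Duration // kept only for readable error messages
}

// newDeadline mirrors the reworked NewDeadline (flagHint omitted for brevity):
// the deadline is startTime+timeout, where startTime is when the request arrived.
func newDeadline(startTime time.Time, timeout time.Duration) deadline {
    return deadline{
        deadlineUnix: uint64(startTime.Add(timeout).Unix()),
        timeout:      timeout,
    }
}

// exceeded mirrors Deadline.Exceeded(); the repository code compares against
// fasttime.UnixTimestamp(), a cached once-per-second clock, instead of time.Now().
func (d deadline) exceeded() bool {
    return uint64(time.Now().Unix()) > d.deadlineUnix
}

func main() {
    startTime := time.Now() // captured by the HTTP handler on arrival
    d := newDeadline(startTime, 2*time.Second)

    time.Sleep(3 * time.Second) // simulate time spent in the wait queue

    if d.exceeded() {
        // Previously the timer effectively started here, at execution time,
        // so queue wait time was not counted.
        fmt.Printf("timeout exceeded before starting the query processing: %.3f seconds\n", d.timeout.Seconds())
        return
    }
    fmt.Println("query starts executing")
}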
@@ -93,7 +93,7 @@ func timeseriesWorker(workerID uint) {
     var rsLastResetTime uint64
     for tsw := range timeseriesWorkCh {
         rss := tsw.rss
-        if time.Until(rss.deadline.Deadline) < 0 {
+        if rss.deadline.Exceeded() {
             tsw.doneCh <- fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
             continue
         }
@@ -438,6 +438,9 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (i
 
 // GetLabels returns labels until the given deadline.
 func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) {
+    if deadline.Exceeded() {
+        return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
+    }
     // Send the query to all the storage nodes in parallel.
     type nodeResult struct {
         labels []string

@@ -507,6 +510,9 @@ func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) {
 // GetLabelValues returns label values for the given labelName
 // until the given deadline.
 func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]string, bool, error) {
+    if deadline.Exceeded() {
+        return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
+    }
     if labelName == "__name__" {
         labelName = ""
     }

@@ -572,6 +578,9 @@ func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]stri
 
 // GetLabelEntries returns all the label entries for at until the given deadline.
 func GetLabelEntries(at *auth.Token, deadline Deadline) ([]storage.TagEntry, bool, error) {
+    if deadline.Exceeded() {
+        return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
+    }
     // Send the query to all the storage nodes in parallel.
     type nodeResult struct {
         labelEntries []storage.TagEntry

@@ -677,6 +686,9 @@ func deduplicateStrings(a []string) []string {
 
 // GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
 func GetTSDBStatusForDate(at *auth.Token, deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) {
+    if deadline.Exceeded() {
+        return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
+    }
     // Send the query to all the storage nodes in parallel.
     type nodeResult struct {
         status *storage.TSDBStatus

@@ -778,6 +790,9 @@ func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry {
 
 // GetSeriesCount returns the number of unique series for the given at.
 func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) {
+    if deadline.Exceeded() {
+        return 0, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
+    }
     // Send the query to all the storage nodes in parallel.
     type nodeResult struct {
         n uint64

@@ -857,6 +872,9 @@ func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
 
 // ProcessSearchQuery performs sq on storage nodes until the given deadline.
 func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) {
+    if deadline.Exceeded() {
+        return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
+    }
     requestData := sq.Marshal(nil)
 
     // Send the query to all the storage nodes in parallel.
@@ -1138,7 +1156,8 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC
     if err != nil {
         return fmt.Errorf("cannot obtain connection from a pool: %w", err)
     }
-    if err := bc.SetDeadline(deadline.Deadline); err != nil {
+    d := time.Unix(int64(deadline.deadline), 0)
+    if err := bc.SetDeadline(d); err != nil {
         _ = bc.Close()
         logger.Panicf("FATAL: cannot set connection deadline: %s", err)
     }
@@ -1626,7 +1645,7 @@ const maxConcurrentQueriesPerStorageNode = 100
 
 // Deadline contains deadline with the corresponding timeout for pretty error messages.
 type Deadline struct {
-    Deadline time.Time
+    deadline uint64
 
     timeout  time.Duration
     flagHint string

@@ -1636,14 +1655,19 @@ type Deadline struct {
 //
 // flagHint must contain a hit for command-line flag, which could be used
 // in order to increase timeout.
-func NewDeadline(timeout time.Duration, flagHint string) Deadline {
+func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline {
     return Deadline{
-        Deadline: time.Now().Add(timeout),
+        deadline: uint64(startTime.Add(timeout).Unix()),
         timeout:  timeout,
         flagHint: flagHint,
     }
 }
 
+// Exceeded returns true if deadline is exceeded.
+func (d *Deadline) Exceeded() bool {
+    return fasttime.UnixTimestamp() > d.deadline
+}
+
 // String returns human-readable string representation for d.
 func (d *Deadline) String() string {
     return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint)
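Deadline.Deadline (an exported time.Time) becomes an unexported unix-seconds field, so code that still needs an absolute time.Time, such as the connection deadline in execOnConn above, converts it back explicitly. A small round-trip sketch with hypothetical variable names:

package main

import (
    "fmt"
    "time"
)

func main() {
    startTime := time.Now()
    timeout := 30 * time.Second

    // Store the deadline the way the reworked Deadline does: unix seconds.
    deadlineUnix := uint64(startTime.Add(timeout).Unix())

    // Rebuild a time.Time where an absolute deadline is required,
    // e.g. for bc.SetDeadline in execOnConn.
    d := time.Unix(int64(deadlineUnix), 0)

    fmt.Println("deadline (unix seconds):", deadlineUnix)
    fmt.Println("deadline (time.Time):  ", d)
    // Storing whole seconds drops sub-second precision, which is acceptable
    // for query timeouts measured in seconds.
}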
@@ -48,7 +48,7 @@ const defaultStep = 5 * 60 * 1000
 
 // FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
 func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    ct := currentTime()
+    ct := startTime.UnixNano() / 1e6
     if err := r.ParseForm(); err != nil {
         return fmt.Errorf("cannot parse request form values: %w", err)
     }

@@ -71,7 +71,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
     if err != nil {
         return err
     }
-    deadline := getDeadlineForQuery(r)
+    deadline := getDeadlineForQuery(r, startTime)
     if start >= end {
         start = end - defaultStep
     }

@@ -124,7 +124,7 @@ var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/fe
 
 // ExportHandler exports data in raw format from /api/v1/export.
 func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    ct := currentTime()
+    ct := startTime.UnixNano() / 1e6
     if err := r.ParseForm(); err != nil {
         return fmt.Errorf("cannot parse request form values: %w", err)
     }

@@ -147,7 +147,7 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
     }
     format := r.FormValue("format")
     maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
-    deadline := getDeadlineForExport(r)
+    deadline := getDeadlineForExport(r, startTime)
     if start >= end {
         end = start + defaultStep
     }

@@ -261,7 +261,7 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error {
     if len(matches) == 0 {
         return fmt.Errorf("missing `match[]` arg")
     }
-    deadline := getDeadlineForQuery(r)
+    deadline := getDeadlineForQuery(r, startTime)
     tagFilterss, err := getTagFilterssFromMatches(matches)
     if err != nil {
         return err
@@ -323,8 +323,7 @@ var httpClient = &http.Client{
 //
 // See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
 func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error {
-    deadline := getDeadlineForQuery(r)
-
+    deadline := getDeadlineForQuery(r, startTime)
     if err := r.ParseForm(); err != nil {
         return fmt.Errorf("cannot parse form values: %w", err)
     }

@@ -345,7 +344,7 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w
     if len(matches) == 0 {
         matches = []string{fmt.Sprintf("{%s!=''}", labelName)}
     }
-    ct := currentTime()
+    ct := startTime.UnixNano() / 1e6
     end, err := getTime(r, "end", ct)
     if err != nil {
         return err

@@ -432,7 +431,7 @@ var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="
 
 // LabelsCountHandler processes /api/v1/labels/count request.
 func LabelsCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    deadline := getDeadlineForQuery(r)
+    deadline := getDeadlineForQuery(r, startTime)
     labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline)
     if err != nil {
         return fmt.Errorf(`cannot obtain label entries: %w`, err)

@@ -454,7 +453,7 @@ const secsPerDay = 3600 * 24
 //
 // See https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
 func TSDBStatusHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    deadline := getDeadlineForQuery(r)
+    deadline := getDeadlineForQuery(r, startTime)
     if err := r.ParseForm(); err != nil {
         return fmt.Errorf("cannot parse form values: %w", err)
     }

@@ -501,8 +500,7 @@ var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/
 //
 // See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
 func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    deadline := getDeadlineForQuery(r)
-
+    deadline := getDeadlineForQuery(r, startTime)
     if err := r.ParseForm(); err != nil {
         return fmt.Errorf("cannot parse form values: %w", err)
     }

@@ -521,7 +519,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
     if len(matches) == 0 {
         matches = []string{"{__name__!=''}"}
     }
-    ct := currentTime()
+    ct := startTime.UnixNano() / 1e6
     end, err := getTime(r, "end", ct)
     if err != nil {
         return err
@@ -596,7 +594,7 @@ var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/
 
 // SeriesCountHandler processes /api/v1/series/count request.
 func SeriesCountHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    deadline := getDeadlineForQuery(r)
+    deadline := getDeadlineForQuery(r, startTime)
     n, isPartial, err := netstorage.GetSeriesCount(at, deadline)
     if err != nil {
         return fmt.Errorf("cannot obtain series count: %w", err)

@@ -617,8 +615,7 @@ var seriesCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="
 //
 // See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
 func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    ct := currentTime()
-
+    ct := startTime.UnixNano() / 1e6
     if err := r.ParseForm(); err != nil {
         return fmt.Errorf("cannot parse form values: %w", err)
     }

@@ -639,7 +636,7 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
     if err != nil {
         return err
     }
-    deadline := getDeadlineForQuery(r)
+    deadline := getDeadlineForQuery(r, startTime)
 
     tagFilterss, err := getTagFilterssFromMatches(matches)
     if err != nil {

@@ -697,8 +694,7 @@ var seriesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/
 //
 // See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
 func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    ct := currentTime()
-
+    ct := startTime.UnixNano() / 1e6
     query := r.FormValue("query")
     if len(query) == 0 {
         return fmt.Errorf("missing `query` arg")

@@ -718,7 +714,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
     if step <= 0 {
         step = defaultStep
     }
-    deadline := getDeadlineForQuery(r)
+    deadline := getDeadlineForQuery(r, startTime)
 
     if len(query) > *maxQueryLen {
         return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), *maxQueryLen)

@@ -766,7 +762,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
         start -= offset
         end := start
         start = end - window
-        if err := queryRangeHandler(at, w, childQuery, start, end, step, r, ct); err != nil {
+        if err := queryRangeHandler(startTime, at, w, childQuery, start, end, step, r, ct); err != nil {
             return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", childQuery, start, end, step, err)
         }
     queryDuration.UpdateDuration(startTime)
@@ -814,8 +810,7 @@ func parsePositiveDuration(s string, step int64) (int64, error) {
 //
 // See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
 func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
-    ct := currentTime()
-
+    ct := startTime.UnixNano() / 1e6
     query := r.FormValue("query")
     if len(query) == 0 {
         return fmt.Errorf("missing `query` arg")

@@ -832,15 +827,15 @@ func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
     if err != nil {
         return err
     }
-    if err := queryRangeHandler(at, w, query, start, end, step, r, ct); err != nil {
+    if err := queryRangeHandler(startTime, at, w, query, start, end, step, r, ct); err != nil {
         return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", query, start, end, step, err)
     }
     queryRangeDuration.UpdateDuration(startTime)
     return nil
 }
 
-func queryRangeHandler(at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error {
-    deadline := getDeadlineForQuery(r)
+func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error {
+    deadline := getDeadlineForQuery(r, startTime)
     mayCache := !getBool(r, "nocache")
     lookbackDelta, err := getMaxLookback(r)
     if err != nil {
@@ -1047,17 +1042,17 @@ func getMaxLookback(r *http.Request) (int64, error) {
     return getDuration(r, "max_lookback", d)
 }
 
-func getDeadlineForQuery(r *http.Request) netstorage.Deadline {
+func getDeadlineForQuery(r *http.Request, startTime time.Time) netstorage.Deadline {
     dMax := maxQueryDuration.Milliseconds()
-    return getDeadlineWithMaxDuration(r, dMax, "-search.maxQueryDuration")
+    return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration")
 }
 
-func getDeadlineForExport(r *http.Request) netstorage.Deadline {
+func getDeadlineForExport(r *http.Request, startTime time.Time) netstorage.Deadline {
     dMax := maxExportDuration.Milliseconds()
-    return getDeadlineWithMaxDuration(r, dMax, "-search.maxExportDuration")
+    return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration")
 }
 
-func getDeadlineWithMaxDuration(r *http.Request, dMax int64, flagHint string) netstorage.Deadline {
+func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) netstorage.Deadline {
     d, err := getDuration(r, "timeout", 0)
     if err != nil {
         d = 0

@@ -1066,7 +1061,7 @@ func getDeadlineWithMaxDuration(r *http.Request, dMax int64, flagHint string) ne
         d = dMax
     }
     timeout := time.Duration(d) * time.Millisecond
-    return netstorage.NewDeadline(timeout, flagHint)
+    return netstorage.NewDeadline(startTime, timeout, flagHint)
 }
 
 func getBool(r *http.Request, argKey string) bool {
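On the vmselect side every handler now threads its startTime into the deadline helpers, so the cap from -search.maxQueryDuration (or -search.maxExportDuration) is applied relative to request arrival. Below is a rough, self-contained sketch of that wiring; helper names follow the hunks above, but the timeout parsing, the cap value and the port are simplified assumptions, not the repository code.

package main

import (
    "fmt"
    "net/http"
    "strconv"
    "time"
)

// getDeadlineWithMaxDuration sketches the reworked helper: an optional
// `timeout` query arg (plain milliseconds here, for simplicity) capped by
// dMax, anchored at startTime, i.e. at request arrival.
func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64) time.Time {
    d, err := strconv.ParseInt(r.FormValue("timeout"), 10, 64)
    if err != nil {
        d = 0
    }
    if d <= 0 || d > dMax {
        d = dMax
    }
    timeout := time.Duration(d) * time.Millisecond
    return startTime.Add(timeout)
}

func main() {
    http.HandleFunc("/api/v1/query", func(w http.ResponseWriter, r *http.Request) {
        startTime := time.Now() // captured before any parsing or queueing below this point
        deadline := getDeadlineWithMaxDuration(r, startTime, 30_000) // stands in for -search.maxQueryDuration
        fmt.Fprintf(w, "query must finish by %s\n", deadline.Format(time.RFC3339))
    })
    _ = http.ListenAndServe(":8481", nil)
}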
@@ -1079,10 +1074,6 @@ func getBool(r *http.Request, argKey string) bool {
     }
 }
 
-func currentTime() int64 {
-    return int64(fasttime.UnixTimestamp() * 1000)
-}
-
 func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) {
     tagFilterss := make([][]storage.TagFilter, 0, len(matches))
     for _, match := range matches {
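Since every handler now receives startTime, the currentTime() helper becomes dead code and is removed; the current timestamp ct is derived once from the request start instead. A tiny self-contained illustration of that expression:

package main

import (
    "fmt"
    "time"
)

func main() {
    // startTime is the request arrival time that the handlers receive as a parameter.
    startTime := time.Now()

    // ct is that instant in unix milliseconds; this replaces the removed
    // currentTime() helper, which returned a fasttime-based "now" instead.
    ct := startTime.UnixNano() / 1e6
    fmt.Println("request start, unix ms:", ct)
}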
@@ -31,7 +31,7 @@ func TestExecSuccess(t *testing.T) {
         Start:    start,
         End:      end,
         Step:     step,
-        Deadline: netstorage.NewDeadline(time.Minute, ""),
+        Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""),
     }
     for i := 0; i < 5; i++ {
         result, err := Exec(ec, q, false)

@@ -5637,7 +5637,7 @@ func TestExecError(t *testing.T) {
         Start:    1000,
         End:      2000,
         Step:     100,
-        Deadline: netstorage.NewDeadline(time.Minute, ""),
+        Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""),
     }
     for i := 0; i < 4; i++ {
         rv, err := Exec(ec, q, false)