2019-05-22 21:16:55 +00:00
package prometheus
import (
"flag"
"fmt"
"math"
"net/http"
"runtime"
"strconv"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
2019-05-22 21:23:23 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/quicktemplate"
)
var (
2019-06-29 22:27:03 +00:00
maxQueryDuration = flag . Duration ( "search.maxQueryDuration" , time . Second * 30 , "The maximum time for search query execution" )
maxQueryLen = flag . Int ( "search.maxQueryLen" , 16 * 1024 , "The maximum search query length in bytes" )
denyPartialResponse = flag . Bool ( "search.denyPartialResponse" , false , "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability" )
selectNodes = flagutil . NewArray ( "selectNode" , "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481" )
2019-05-22 21:16:55 +00:00
)
// Default step used if not set.
const defaultStep = 5 * 60 * 1000
// Latency for data processing pipeline, i.e. the time between data is ignested
// into the system and the time it becomes visible to search.
const latencyOffset = 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
2019-05-22 21:23:23 +00:00
func FederateHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
ct := currentTime ( )
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %s" , err )
}
matches := r . Form [ "match[]" ]
2019-06-20 11:05:07 +00:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-06-06 19:17:13 +00:00
maxLookback , err := getDuration ( r , "max_lookback" , defaultStep )
if err != nil {
return err
}
start , err := getTime ( r , "start" , ct - maxLookback )
if err != nil {
return err
}
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
deadline := getDeadline ( r )
if start >= end {
start = end - defaultStep
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
2019-05-22 21:23:23 +00:00
AccountID : at . AccountID ,
ProjectID : at . ProjectID ,
2019-05-22 21:16:55 +00:00
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2019-06-29 22:27:03 +00:00
rss , isPartial , err := netstorage . ProcessSearchQuery ( at , sq , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %s" , sq , err )
}
2019-06-29 22:27:03 +00:00
if isPartial && getDenyPartialResponse ( r ) {
return fmt . Errorf ( "cannot return full response, since some of vmstorage nodes are unavailable" )
}
2019-05-22 21:16:55 +00:00
resultsCh := make ( chan * quicktemplate . ByteBuffer )
doneCh := make ( chan error )
go func ( ) {
2019-07-12 12:51:02 +00:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
2019-05-22 21:16:55 +00:00
bb := quicktemplate . AcquireByteBuffer ( )
WriteFederate ( bb , rs )
resultsCh <- bb
} )
close ( resultsCh )
doneCh <- err
} ( )
w . Header ( ) . Set ( "Content-Type" , "text/plain" )
for bb := range resultsCh {
w . Write ( bb . B )
quicktemplate . ReleaseByteBuffer ( bb )
}
err = <- doneCh
if err != nil {
return fmt . Errorf ( "error during data fetching: %s" , err )
}
federateDuration . UpdateDuration ( startTime )
return nil
}
var federateDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/federate"} ` )
// ExportHandler exports data in raw format from /api/v1/export.
2019-05-22 21:23:23 +00:00
func ExportHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
ct := currentTime ( )
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %s" , err )
}
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
// Maintain backwards compatibility
match := r . FormValue ( "match" )
2019-06-20 11:05:07 +00:00
if len ( match ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-05-22 21:16:55 +00:00
matches = [ ] string { match }
}
2019-06-06 19:17:13 +00:00
start , err := getTime ( r , "start" , 0 )
if err != nil {
return err
}
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
format := r . FormValue ( "format" )
deadline := getDeadline ( r )
if start >= end {
start = end - defaultStep
}
2019-05-22 21:23:23 +00:00
if err := exportHandler ( at , w , matches , start , end , format , deadline ) ; err != nil {
2019-05-22 21:16:55 +00:00
return err
}
exportDuration . UpdateDuration ( startTime )
return nil
}
var exportDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/export"} ` )
2019-05-22 21:23:23 +00:00
func exportHandler ( at * auth . Token , w http . ResponseWriter , matches [ ] string , start , end int64 , format string , deadline netstorage . Deadline ) error {
2019-05-22 21:16:55 +00:00
writeResponseFunc := WriteExportStdResponse
writeLineFunc := WriteExportJSONLine
contentType := "application/json"
if format == "prometheus" {
contentType = "text/plain"
writeLineFunc = WriteExportPrometheusLine
} else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = WriteExportPromAPILine
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
2019-05-22 21:23:23 +00:00
AccountID : at . AccountID ,
ProjectID : at . ProjectID ,
2019-05-22 21:16:55 +00:00
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2019-05-22 21:23:23 +00:00
rss , isPartial , err := netstorage . ProcessSearchQuery ( at , sq , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %s" , sq , err )
}
2019-05-22 21:23:23 +00:00
if isPartial {
rss . Cancel ( )
2019-06-29 22:27:03 +00:00
return fmt . Errorf ( "cannot return full response, since some of vmstorage nodes are unavailable" )
2019-05-22 21:23:23 +00:00
}
2019-05-22 21:16:55 +00:00
resultsCh := make ( chan * quicktemplate . ByteBuffer , runtime . GOMAXPROCS ( - 1 ) )
doneCh := make ( chan error )
go func ( ) {
2019-07-12 12:51:02 +00:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
2019-05-22 21:16:55 +00:00
bb := quicktemplate . AcquireByteBuffer ( )
writeLineFunc ( bb , rs )
resultsCh <- bb
} )
close ( resultsCh )
doneCh <- err
} ( )
w . Header ( ) . Set ( "Content-Type" , contentType )
writeResponseFunc ( w , resultsCh )
2019-05-24 11:54:31 +00:00
// Consume all the data from resultsCh in the event writeResponseFunc
// fails to consume all the data.
for bb := range resultsCh {
quicktemplate . ReleaseByteBuffer ( bb )
}
2019-05-22 21:16:55 +00:00
err = <- doneCh
if err != nil {
return fmt . Errorf ( "error during data fetching: %s" , err )
}
return nil
}
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
2019-05-22 21:23:23 +00:00
func DeleteHandler ( at * auth . Token , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %s" , err )
}
if r . FormValue ( "start" ) != "" || r . FormValue ( "end" ) != "" {
return fmt . Errorf ( "start and end aren't supported. Remove these args from the query in order to delete all the matching metrics" )
}
matches := r . Form [ "match[]" ]
2019-06-20 11:05:07 +00:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-05-22 21:23:23 +00:00
deadline := getDeadline ( r )
2019-05-22 21:16:55 +00:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
2019-05-22 21:23:23 +00:00
AccountID : at . AccountID ,
ProjectID : at . ProjectID ,
2019-05-22 21:16:55 +00:00
TagFilterss : tagFilterss ,
}
2019-05-22 21:23:23 +00:00
deletedCount , err := netstorage . DeleteSeries ( at , sq , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot delete time series matching %q: %s" , matches , err )
}
if deletedCount > 0 {
2019-05-22 21:23:23 +00:00
// Reset rollup result cache on all the vmselect nodes,
// since the cache may contain deleted data.
// TODO: reset only cache for (account, project)
resetRollupResultCaches ( )
2019-05-22 21:16:55 +00:00
}
deleteDuration . UpdateDuration ( startTime )
return nil
}
var deleteDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/admin/tsdb/delete_series"} ` )
2019-05-22 21:23:23 +00:00
func resetRollupResultCaches ( ) {
2019-06-18 07:26:44 +00:00
if len ( * selectNodes ) == 0 {
2019-05-22 21:23:23 +00:00
logger . Panicf ( "BUG: missing -selectNode flag" )
}
2019-06-18 07:26:44 +00:00
for _ , selectNode := range * selectNodes {
2019-05-22 21:23:23 +00:00
callURL := fmt . Sprintf ( "http://%s/internal/resetRollupResultCache" , selectNode )
resp , err := httpClient . Get ( callURL )
if err != nil {
logger . Errorf ( "error when accessing %q: %s" , callURL , err )
resetRollupResultCacheErrors . Inc ( )
continue
}
if resp . StatusCode != http . StatusOK {
_ = resp . Body . Close ( )
logger . Errorf ( "unexpected status code at %q; got %d; want %d" , callURL , resp . StatusCode , http . StatusOK )
resetRollupResultCacheErrors . Inc ( )
continue
}
_ = resp . Body . Close ( )
}
resetRollupResultCacheCalls . Inc ( )
}
var (
resetRollupResultCacheErrors = metrics . NewCounter ( "vm_reset_rollup_result_cache_errors_total" )
resetRollupResultCacheCalls = metrics . NewCounter ( "vm_reset_rollup_result_cache_calls_total" )
)
var httpClient = & http . Client {
Timeout : time . Second * 5 ,
}
2019-05-22 21:16:55 +00:00
// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
2019-05-22 21:23:23 +00:00
func LabelValuesHandler ( at * auth . Token , labelName string , w http . ResponseWriter , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
deadline := getDeadline ( r )
2019-06-29 22:27:03 +00:00
labelValues , isPartial , err := netstorage . GetLabelValues ( at , labelName , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( ` cannot obtain label values for %q: %s ` , labelName , err )
}
2019-06-29 22:27:03 +00:00
if isPartial && getDenyPartialResponse ( r ) {
return fmt . Errorf ( "cannot return full response, since some of vmstorage nodes are unavailable" )
}
2019-05-22 21:16:55 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteLabelValuesResponse ( w , labelValues )
labelValuesDuration . UpdateDuration ( startTime )
return nil
}
var labelValuesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/label/ { }/values"} ` )
2019-06-10 15:55:20 +00:00
// LabelsCountHandler processes /api/v1/labels/count request.
func LabelsCountHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
deadline := getDeadline ( r )
2019-06-29 22:27:03 +00:00
labelEntries , isPartial , err := netstorage . GetLabelEntries ( at , deadline )
2019-06-10 15:55:20 +00:00
if err != nil {
return fmt . Errorf ( ` cannot obtain label entries: %s ` , err )
}
2019-06-29 22:27:03 +00:00
if isPartial && getDenyPartialResponse ( r ) {
return fmt . Errorf ( "cannot return full response, since some of vmstorage nodes are unavailable" )
}
2019-06-10 15:55:20 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteLabelsCountResponse ( w , labelEntries )
labelsCountDuration . UpdateDuration ( startTime )
return nil
}
var labelsCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels/count"} ` )
2019-05-22 21:16:55 +00:00
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
2019-05-22 21:23:23 +00:00
func LabelsHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
deadline := getDeadline ( r )
2019-06-29 22:27:03 +00:00
labels , isPartial , err := netstorage . GetLabels ( at , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot obtain labels: %s" , err )
}
2019-06-29 22:27:03 +00:00
if isPartial && getDenyPartialResponse ( r ) {
return fmt . Errorf ( "cannot return full response, since some of vmstorage nodes are unavailable" )
}
2019-05-22 21:16:55 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteLabelsResponse ( w , labels )
labelsDuration . UpdateDuration ( startTime )
return nil
}
var labelsDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels"} ` )
// SeriesCountHandler processes /api/v1/series/count request.
2019-05-22 21:23:23 +00:00
func SeriesCountHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
deadline := getDeadline ( r )
2019-06-29 22:27:03 +00:00
n , isPartial , err := netstorage . GetSeriesCount ( at , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot obtain series count: %s" , err )
}
2019-06-29 22:27:03 +00:00
if isPartial && getDenyPartialResponse ( r ) {
return fmt . Errorf ( "cannot return full response, since some of vmstorage nodes are unavailable" )
}
2019-05-22 21:23:23 +00:00
2019-05-22 21:16:55 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteSeriesCountResponse ( w , n )
seriesCountDuration . UpdateDuration ( startTime )
return nil
}
var seriesCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series/count"} ` )
// SeriesHandler processes /api/v1/series request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
2019-05-22 21:23:23 +00:00
func SeriesHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
ct := currentTime ( )
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse form values: %s" , err )
}
matches := r . Form [ "match[]" ]
2019-06-20 11:05:07 +00:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-07-11 14:10:12 +00:00
// Set start to minTimeMsecs by default as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/91
start , err := getTime ( r , "start" , minTimeMsecs )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
deadline := getDeadline ( r )
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
if start >= end {
start = end - defaultStep
}
sq := & storage . SearchQuery {
2019-05-22 21:23:23 +00:00
AccountID : at . AccountID ,
ProjectID : at . ProjectID ,
2019-05-22 21:16:55 +00:00
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2019-06-29 22:27:03 +00:00
rss , isPartial , err := netstorage . ProcessSearchQuery ( at , sq , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %s" , sq , err )
}
2019-06-29 22:27:03 +00:00
if isPartial && getDenyPartialResponse ( r ) {
return fmt . Errorf ( "cannot return full response, since some of vmstorage nodes are unavailable" )
}
2019-05-22 21:16:55 +00:00
resultsCh := make ( chan * quicktemplate . ByteBuffer )
doneCh := make ( chan error )
go func ( ) {
2019-07-12 12:51:02 +00:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
2019-05-22 21:16:55 +00:00
bb := quicktemplate . AcquireByteBuffer ( )
writemetricNameObject ( bb , & rs . MetricName )
resultsCh <- bb
} )
close ( resultsCh )
doneCh <- err
} ( )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteSeriesResponse ( w , resultsCh )
// Consume all the data from resultsCh in the event WriteSeriesResponse
2019-05-24 11:54:31 +00:00
// fails to consume all the data.
2019-05-22 21:16:55 +00:00
for bb := range resultsCh {
quicktemplate . ReleaseByteBuffer ( bb )
}
err = <- doneCh
if err != nil {
return fmt . Errorf ( "error during data fetching: %s" , err )
}
seriesDuration . UpdateDuration ( startTime )
return nil
}
var seriesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series"} ` )
// QueryHandler processes /api/v1/query request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
2019-05-22 21:23:23 +00:00
func QueryHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
ct := currentTime ( )
query := r . FormValue ( "query" )
2019-06-20 11:05:07 +00:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2019-06-06 19:17:13 +00:00
start , err := getTime ( r , "time" , ct )
if err != nil {
return err
}
step , err := getDuration ( r , "step" , latencyOffset )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
deadline := getDeadline ( r )
if len ( query ) > * maxQueryLen {
return fmt . Errorf ( ` too long query; got %d bytes; mustn't exceed %d bytes ` , len ( query ) , * maxQueryLen )
}
if ct - start < latencyOffset {
start -= latencyOffset
}
if childQuery , windowStr , offsetStr := promql . IsMetricSelectorWithRollup ( query ) ; childQuery != "" {
var window int64
if len ( windowStr ) > 0 {
var err error
window , err = promql . DurationValue ( windowStr , step )
if err != nil {
return err
}
}
var offset int64
if len ( offsetStr ) > 0 {
var err error
offset , err = promql . DurationValue ( offsetStr , step )
if err != nil {
return err
}
}
start -= offset
end := start
start = end - window
2019-05-22 21:23:23 +00:00
if err := exportHandler ( at , w , [ ] string { childQuery } , start , end , "promapi" , deadline ) ; err != nil {
2019-05-22 21:16:55 +00:00
return err
}
queryDuration . UpdateDuration ( startTime )
return nil
}
ec := promql . EvalConfig {
2019-05-22 21:23:23 +00:00
AuthToken : at ,
Start : start ,
End : start ,
Step : step ,
Deadline : deadline ,
2019-06-29 22:27:03 +00:00
DenyPartialResponse : getDenyPartialResponse ( r ) ,
2019-05-22 21:16:55 +00:00
}
2019-07-01 14:14:49 +00:00
result , err := promql . Exec ( & ec , query , true )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot execute %q: %s" , query , err )
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteQueryResponse ( w , result )
queryDuration . UpdateDuration ( startTime )
return nil
}
var queryDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query"} ` )
// QueryRangeHandler processes /api/v1/query_range request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
2019-05-22 21:23:23 +00:00
func QueryRangeHandler ( at * auth . Token , w http . ResponseWriter , r * http . Request ) error {
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
ct := currentTime ( )
query := r . FormValue ( "query" )
2019-06-20 11:05:07 +00:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2019-06-06 19:17:13 +00:00
start , err := getTime ( r , "start" , ct - defaultStep )
if err != nil {
return err
}
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
step , err := getDuration ( r , "step" , defaultStep )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
deadline := getDeadline ( r )
mayCache := ! getBool ( r , "nocache" )
// Validate input args.
if len ( query ) > * maxQueryLen {
return fmt . Errorf ( ` too long query; got %d bytes; mustn't exceed %d bytes ` , len ( query ) , * maxQueryLen )
}
if start > end {
start = end
}
if err := promql . ValidateMaxPointsPerTimeseries ( start , end , step ) ; err != nil {
return err
}
start , end = promql . AdjustStartEnd ( start , end , step )
ec := promql . EvalConfig {
2019-05-22 21:23:23 +00:00
AuthToken : at ,
Start : start ,
End : end ,
Step : step ,
Deadline : deadline ,
MayCache : mayCache ,
2019-06-29 22:27:03 +00:00
DenyPartialResponse : getDenyPartialResponse ( r ) ,
2019-05-22 21:16:55 +00:00
}
2019-07-01 14:14:49 +00:00
result , err := promql . Exec ( & ec , query , false )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot execute %q: %s" , query , err )
}
if ct - end < latencyOffset {
2019-07-04 06:14:15 +00:00
result = adjustLastPoints ( result )
2019-05-22 21:16:55 +00:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteQueryRangeResponse ( w , result )
queryRangeDuration . UpdateDuration ( startTime )
return nil
}
var queryRangeDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query_range"} ` )
// adjustLastPoints substitutes the last point values with the previous
// point values, since the last points may contain garbage.
2019-07-04 06:14:15 +00:00
func adjustLastPoints ( tss [ ] netstorage . Result ) [ ] netstorage . Result {
2019-05-22 21:16:55 +00:00
if len ( tss ) == 0 {
2019-07-04 06:14:15 +00:00
return nil
2019-05-22 21:16:55 +00:00
}
// Search for the last non-NaN value across all the timeseries.
lastNonNaNIdx := - 1
for i := range tss {
2019-07-04 06:14:15 +00:00
values := tss [ i ] . Values
j := len ( values ) - 1
for j >= 0 && math . IsNaN ( values [ j ] ) {
2019-05-22 21:16:55 +00:00
j --
}
if j > lastNonNaNIdx {
lastNonNaNIdx = j
}
}
if lastNonNaNIdx == - 1 {
// All timeseries contain only NaNs.
2019-07-04 06:14:15 +00:00
return nil
2019-05-22 21:16:55 +00:00
}
2019-07-04 06:14:15 +00:00
// Substitute the last two values starting from lastNonNaNIdx
2019-05-22 21:16:55 +00:00
// with the previous values for each timeseries.
for i := range tss {
2019-07-04 06:14:15 +00:00
values := tss [ i ] . Values
for j := 0 ; j < 2 ; j ++ {
2019-05-22 21:16:55 +00:00
idx := lastNonNaNIdx + j
2019-07-04 06:14:15 +00:00
if idx <= 0 || idx >= len ( values ) || math . IsNaN ( values [ idx - 1 ] ) {
2019-05-22 21:16:55 +00:00
continue
}
2019-07-04 06:14:15 +00:00
values [ idx ] = values [ idx - 1 ]
2019-05-22 21:16:55 +00:00
}
}
2019-07-04 06:14:15 +00:00
return tss
2019-05-22 21:16:55 +00:00
}
2019-06-06 19:17:13 +00:00
func getTime ( r * http . Request , argKey string , defaultValue int64 ) ( int64 , error ) {
2019-05-22 21:16:55 +00:00
argValue := r . FormValue ( argKey )
if len ( argValue ) == 0 {
2019-06-06 19:17:13 +00:00
return defaultValue , nil
2019-05-22 21:16:55 +00:00
}
secs , err := strconv . ParseFloat ( argValue , 64 )
if err != nil {
// Try parsing string format
t , err := time . Parse ( time . RFC3339 , argValue )
if err != nil {
2019-07-07 16:44:26 +00:00
// Handle Prometheus'-provided minTime and maxTime.
// See https://github.com/prometheus/client_golang/issues/614
switch argValue {
case prometheusMinTimeFormatted :
return minTimeMsecs , nil
case prometheusMaxTimeFormatted :
return maxTimeMsecs , nil
}
2019-06-06 19:17:13 +00:00
return 0 , fmt . Errorf ( "cannot parse %q=%q: %s" , argKey , argValue , err )
2019-05-22 21:16:55 +00:00
}
secs = float64 ( t . UnixNano ( ) ) / 1e9
}
msecs := int64 ( secs * 1e3 )
2019-07-11 14:07:20 +00:00
if msecs < minTimeMsecs {
msecs = 0
}
if msecs > maxTimeMsecs {
msecs = maxTimeMsecs
2019-05-22 21:16:55 +00:00
}
2019-06-06 19:17:13 +00:00
return msecs , nil
2019-05-22 21:16:55 +00:00
}
2019-07-07 16:44:26 +00:00
var (
// These constants were obtained from https://github.com/prometheus/prometheus/blob/91d7175eaac18b00e370965f3a8186cc40bf9f55/web/api/v1/api.go#L442
// See https://github.com/prometheus/client_golang/issues/614 for details.
prometheusMinTimeFormatted = time . Unix ( math . MinInt64 / 1000 + 62135596801 , 0 ) . UTC ( ) . Format ( time . RFC3339Nano )
prometheusMaxTimeFormatted = time . Unix ( math . MaxInt64 / 1000 - 62135596801 , 999999999 ) . UTC ( ) . Format ( time . RFC3339Nano )
)
2019-05-22 21:16:55 +00:00
const (
// These values prevent from overflow when storing msec-precision time in int64.
2019-07-11 14:07:20 +00:00
minTimeMsecs = 0 // use 0 instead of `int64(-1<<63) / 1e6` because the storage engine doesn't actually support negative time
2019-05-22 21:16:55 +00:00
maxTimeMsecs = int64 ( 1 << 63 - 1 ) / 1e6
)
2019-06-06 19:17:13 +00:00
func getDuration ( r * http . Request , argKey string , defaultValue int64 ) ( int64 , error ) {
2019-05-22 21:16:55 +00:00
argValue := r . FormValue ( argKey )
if len ( argValue ) == 0 {
2019-06-06 19:17:13 +00:00
return defaultValue , nil
2019-05-22 21:16:55 +00:00
}
secs , err := strconv . ParseFloat ( argValue , 64 )
if err != nil {
// Try parsing string format
d , err := time . ParseDuration ( argValue )
if err != nil {
2019-06-06 19:17:13 +00:00
return 0 , fmt . Errorf ( "cannot parse %q=%q: %s" , argKey , argValue , err )
2019-05-22 21:16:55 +00:00
}
secs = d . Seconds ( )
}
msecs := int64 ( secs * 1e3 )
if msecs <= 0 || msecs > maxDurationMsecs {
2019-06-27 16:36:15 +00:00
return 0 , fmt . Errorf ( "%q=%dms is out of allowed range [%d ... %d]" , argKey , msecs , 0 , int64 ( maxDurationMsecs ) )
2019-05-22 21:16:55 +00:00
}
2019-06-06 19:17:13 +00:00
return msecs , nil
2019-05-22 21:16:55 +00:00
}
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
func getDeadline ( r * http . Request ) netstorage . Deadline {
2019-06-06 19:17:13 +00:00
d , err := getDuration ( r , "timeout" , 0 )
if err != nil {
d = 0
}
2019-05-22 21:16:55 +00:00
dMax := int64 ( maxQueryDuration . Seconds ( ) * 1e3 )
if d <= 0 || d > dMax {
d = dMax
}
timeout := time . Duration ( d ) * time . Millisecond
return netstorage . NewDeadline ( timeout )
}
func getBool ( r * http . Request , argKey string ) bool {
argValue := r . FormValue ( argKey )
switch strings . ToLower ( argValue ) {
case "" , "0" , "f" , "false" , "no" :
return false
default :
return true
}
}
func currentTime ( ) int64 {
return int64 ( time . Now ( ) . UTC ( ) . Unix ( ) ) * 1e3
}
func getTagFilterssFromMatches ( matches [ ] string ) ( [ ] [ ] storage . TagFilter , error ) {
tagFilterss := make ( [ ] [ ] storage . TagFilter , 0 , len ( matches ) )
for _ , match := range matches {
tagFilters , err := promql . ParseMetricSelector ( match )
if err != nil {
return nil , fmt . Errorf ( "cannot parse %q: %s" , match , err )
}
tagFilterss = append ( tagFilterss , tagFilters )
}
return tagFilterss , nil
}
2019-06-29 22:27:03 +00:00
func getDenyPartialResponse ( r * http . Request ) bool {
if * denyPartialResponse {
return true
}
return getBool ( r , "deny_partial_response" )
}