2019-05-22 21:16:55 +00:00
package prometheus
import (
"flag"
"fmt"
"math"
"net/http"
"runtime"
2019-08-04 20:09:18 +00:00
"sort"
2019-05-22 21:16:55 +00:00
"strconv"
"strings"
2019-08-04 20:09:18 +00:00
"sync"
2019-05-22 21:16:55 +00:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
2019-08-04 20:09:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-12-25 19:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/metricsql"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/quicktemplate"
)
var (
2019-11-25 14:32:56 +00:00
latencyOffset = flag . Duration ( "search.latencyOffset" , time . Second * 30 , "The time when data points become visible in query results after the colection. " +
2019-10-28 10:30:50 +00:00
"Too small value can result in incomplete last points for query results" )
2020-01-22 13:37:13 +00:00
maxExportDuration = flag . Duration ( "search.maxExportDuration" , time . Hour * 24 * 30 , "The maximum duration for `/api/v1/export` call" )
2019-12-20 09:34:52 +00:00
maxQueryDuration = flag . Duration ( "search.maxQueryDuration" , time . Second * 30 , "The maximum duration for search query execution" )
maxQueryLen = flag . Int ( "search.maxQueryLen" , 16 * 1024 , "The maximum search query length in bytes" )
maxLookback = flag . Duration ( "search.maxLookback" , 0 , "Synonim to `-search.lookback-delta` from Prometheus. " +
2019-11-11 22:16:42 +00:00
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via `max_lookback` arg" )
2019-05-22 21:16:55 +00:00
)
// Default step used if not set.
const defaultStep = 5 * 60 * 1000
// FederateHandler implements /federate . See https://prometheus.io/docs/prometheus/latest/federation/
func FederateHandler ( w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
ct := currentTime ( )
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %s" , err )
}
matches := r . Form [ "match[]" ]
2019-06-20 11:05:07 +00:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-10-15 16:12:27 +00:00
lookbackDelta , err := getMaxLookback ( r )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2019-10-15 16:12:27 +00:00
if lookbackDelta <= 0 {
lookbackDelta = defaultStep
}
start , err := getTime ( r , "start" , ct - lookbackDelta )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForQuery ( r )
2019-05-22 21:16:55 +00:00
if start >= end {
start = end - defaultStep
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2019-08-04 19:15:33 +00:00
rss , err := netstorage . ProcessSearchQuery ( sq , true , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %s" , sq , err )
}
resultsCh := make ( chan * quicktemplate . ByteBuffer )
doneCh := make ( chan error )
go func ( ) {
2019-07-12 12:51:02 +00:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
2019-05-22 21:16:55 +00:00
bb := quicktemplate . AcquireByteBuffer ( )
WriteFederate ( bb , rs )
resultsCh <- bb
} )
close ( resultsCh )
doneCh <- err
} ( )
w . Header ( ) . Set ( "Content-Type" , "text/plain" )
for bb := range resultsCh {
w . Write ( bb . B )
quicktemplate . ReleaseByteBuffer ( bb )
}
err = <- doneCh
if err != nil {
return fmt . Errorf ( "error during data fetching: %s" , err )
}
federateDuration . UpdateDuration ( startTime )
return nil
}
var federateDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/federate"} ` )
// ExportHandler exports data in raw format from /api/v1/export.
func ExportHandler ( w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
ct := currentTime ( )
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %s" , err )
}
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
// Maintain backwards compatibility
match := r . FormValue ( "match" )
2019-06-20 11:05:07 +00:00
if len ( match ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-05-22 21:16:55 +00:00
matches = [ ] string { match }
}
2019-06-06 19:17:13 +00:00
start , err := getTime ( r , "start" , 0 )
if err != nil {
return err
}
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
format := r . FormValue ( "format" )
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForExport ( r )
2019-05-22 21:16:55 +00:00
if start >= end {
2019-11-22 14:10:33 +00:00
end = start + defaultStep
2019-05-22 21:16:55 +00:00
}
if err := exportHandler ( w , matches , start , end , format , deadline ) ; err != nil {
2020-01-22 15:32:11 +00:00
return fmt . Errorf ( "error when exporting data for queries=%q on the time range (start=%d, end=%d): %s" , matches , start , end , err )
2019-05-22 21:16:55 +00:00
}
exportDuration . UpdateDuration ( startTime )
return nil
}
var exportDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/export"} ` )
func exportHandler ( w http . ResponseWriter , matches [ ] string , start , end int64 , format string , deadline netstorage . Deadline ) error {
writeResponseFunc := WriteExportStdResponse
writeLineFunc := WriteExportJSONLine
2019-11-26 15:44:24 +00:00
contentType := "application/stream+json"
2019-05-22 21:16:55 +00:00
if format == "prometheus" {
contentType = "text/plain"
writeLineFunc = WriteExportPrometheusLine
} else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse
writeLineFunc = WriteExportPromAPILine
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2019-08-04 19:15:33 +00:00
rss , err := netstorage . ProcessSearchQuery ( sq , true , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %s" , sq , err )
}
resultsCh := make ( chan * quicktemplate . ByteBuffer , runtime . GOMAXPROCS ( - 1 ) )
doneCh := make ( chan error )
go func ( ) {
2019-07-12 12:51:02 +00:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
2019-05-22 21:16:55 +00:00
bb := quicktemplate . AcquireByteBuffer ( )
writeLineFunc ( bb , rs )
resultsCh <- bb
} )
close ( resultsCh )
doneCh <- err
} ( )
w . Header ( ) . Set ( "Content-Type" , contentType )
writeResponseFunc ( w , resultsCh )
2019-05-24 11:54:31 +00:00
// Consume all the data from resultsCh in the event writeResponseFunc
// fails to consume all the data.
for bb := range resultsCh {
quicktemplate . ReleaseByteBuffer ( bb )
}
2019-05-22 21:16:55 +00:00
err = <- doneCh
if err != nil {
return fmt . Errorf ( "error during data fetching: %s" , err )
}
return nil
}
// DeleteHandler processes /api/v1/admin/tsdb/delete_series prometheus API request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
func DeleteHandler ( r * http . Request ) error {
startTime := time . Now ( )
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse request form values: %s" , err )
}
if r . FormValue ( "start" ) != "" || r . FormValue ( "end" ) != "" {
return fmt . Errorf ( "start and end aren't supported. Remove these args from the query in order to delete all the matching metrics" )
}
matches := r . Form [ "match[]" ]
2019-06-20 11:05:07 +00:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-05-22 21:16:55 +00:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
sq := & storage . SearchQuery {
TagFilterss : tagFilterss ,
}
deletedCount , err := netstorage . DeleteSeries ( sq )
if err != nil {
return fmt . Errorf ( "cannot delete time series matching %q: %s" , matches , err )
}
if deletedCount > 0 {
promql . ResetRollupResultCache ( )
}
deleteDuration . UpdateDuration ( startTime )
return nil
}
var deleteDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/admin/tsdb/delete_series"} ` )
// LabelValuesHandler processes /api/v1/label/<labelName>/values request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#querying-label-values
func LabelValuesHandler ( labelName string , w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForQuery ( r )
2019-08-04 20:09:18 +00:00
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse form values: %s" , err )
}
var labelValues [ ] string
if len ( r . Form [ "match[]" ] ) == 0 && len ( r . Form [ "start" ] ) == 0 && len ( r . Form [ "end" ] ) == 0 {
var err error
labelValues , err = netstorage . GetLabelValues ( labelName , deadline )
if err != nil {
return fmt . Errorf ( ` cannot obtain label values for %q: %s ` , labelName , err )
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/label/foo/values?match[]=foobar{baz="abc"}&start=...&end=...
// is equivalent to `label_values(foobar{baz="abc"}, foo)` call on the selected
// time range in Grafana templating.
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
matches = [ ] string { fmt . Sprintf ( "{%s!=''}" , labelName ) }
}
ct := currentTime ( )
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
start , err := getTime ( r , "start" , end - defaultStep )
if err != nil {
return err
}
labelValues , err = labelValuesWithMatches ( labelName , matches , start , end , deadline )
if err != nil {
return fmt . Errorf ( "cannot obtain label values for %q, match[]=%q, start=%d, end=%d: %s" , labelName , matches , start , end , err )
}
2019-05-22 21:16:55 +00:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteLabelValuesResponse ( w , labelValues )
labelValuesDuration . UpdateDuration ( startTime )
return nil
}
2019-08-04 20:09:18 +00:00
func labelValuesWithMatches ( labelName string , matches [ ] string , start , end int64 , deadline netstorage . Deadline ) ( [ ] string , error ) {
if len ( matches ) == 0 {
logger . Panicf ( "BUG: matches must be non-empty" )
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
2019-12-14 22:07:09 +00:00
for i , tfs := range tagFilterss {
// Add `labelName!=''` tag filter in order to filter out series without the labelName.
tagFilterss [ i ] = append ( tfs , storage . TagFilter {
Key : [ ] byte ( labelName ) ,
IsNegative : true ,
} )
}
2019-08-04 20:09:18 +00:00
if start >= end {
2019-11-22 14:10:33 +00:00
end = start + defaultStep
2019-08-04 20:09:18 +00:00
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
rss , err := netstorage . ProcessSearchQuery ( sq , false , deadline )
if err != nil {
return nil , fmt . Errorf ( "cannot fetch data for %q: %s" , sq , err )
}
m := make ( map [ string ] struct { } )
var mLock sync . Mutex
err = rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
labelValue := rs . MetricName . GetTagValue ( labelName )
if len ( labelValue ) == 0 {
return
}
mLock . Lock ( )
m [ string ( labelValue ) ] = struct { } { }
mLock . Unlock ( )
} )
if err != nil {
return nil , fmt . Errorf ( "error when data fetching: %s" , err )
}
labelValues := make ( [ ] string , 0 , len ( m ) )
for labelValue := range m {
labelValues = append ( labelValues , labelValue )
}
sort . Strings ( labelValues )
return labelValues , nil
}
2019-05-22 21:16:55 +00:00
var labelValuesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/label/ { }/values"} ` )
2019-06-10 15:55:20 +00:00
// LabelsCountHandler processes /api/v1/labels/count request.
func LabelsCountHandler ( w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForQuery ( r )
2019-06-10 15:55:20 +00:00
labelEntries , err := netstorage . GetLabelEntries ( deadline )
if err != nil {
return fmt . Errorf ( ` cannot obtain label entries: %s ` , err )
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteLabelsCountResponse ( w , labelEntries )
labelsCountDuration . UpdateDuration ( startTime )
return nil
}
var labelsCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels/count"} ` )
2019-05-22 21:16:55 +00:00
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
func LabelsHandler ( w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForQuery ( r )
2019-12-14 22:07:09 +00:00
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse form values: %s" , err )
}
var labels [ ] string
if len ( r . Form [ "match[]" ] ) == 0 && len ( r . Form [ "start" ] ) == 0 && len ( r . Form [ "end" ] ) == 0 {
var err error
labels , err = netstorage . GetLabels ( deadline )
if err != nil {
return fmt . Errorf ( "cannot obtain labels: %s" , err )
}
} else {
// Extended functionality that allows filtering by label filters and time range
// i.e. /api/v1/labels?match[]=foobar{baz="abc"}&start=...&end=...
matches := r . Form [ "match[]" ]
if len ( matches ) == 0 {
matches = [ ] string { "{__name__!=''}" }
}
ct := currentTime ( )
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
start , err := getTime ( r , "start" , end - defaultStep )
if err != nil {
return err
}
labels , err = labelsWithMatches ( matches , start , end , deadline )
if err != nil {
return fmt . Errorf ( "cannot obtain labels for match[]=%q, start=%d, end=%d: %s" , matches , start , end , err )
}
2019-05-22 21:16:55 +00:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteLabelsResponse ( w , labels )
labelsDuration . UpdateDuration ( startTime )
return nil
}
2019-12-14 22:07:09 +00:00
func labelsWithMatches ( matches [ ] string , start , end int64 , deadline netstorage . Deadline ) ( [ ] string , error ) {
if len ( matches ) == 0 {
logger . Panicf ( "BUG: matches must be non-empty" )
}
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return nil , err
}
if start >= end {
end = start + defaultStep
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
rss , err := netstorage . ProcessSearchQuery ( sq , false , deadline )
if err != nil {
return nil , fmt . Errorf ( "cannot fetch data for %q: %s" , sq , err )
}
m := make ( map [ string ] struct { } )
var mLock sync . Mutex
err = rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
mLock . Lock ( )
tags := rs . MetricName . Tags
for i := range tags {
t := & tags [ i ]
m [ string ( t . Key ) ] = struct { } { }
}
m [ "__name__" ] = struct { } { }
mLock . Unlock ( )
} )
if err != nil {
return nil , fmt . Errorf ( "error when data fetching: %s" , err )
}
labels := make ( [ ] string , 0 , len ( m ) )
for label := range m {
labels = append ( labels , label )
}
sort . Strings ( labels )
return labels , nil
}
2019-05-22 21:16:55 +00:00
var labelsDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/labels"} ` )
// SeriesCountHandler processes /api/v1/series/count request.
func SeriesCountHandler ( w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForQuery ( r )
2019-05-22 21:16:55 +00:00
n , err := netstorage . GetSeriesCount ( deadline )
if err != nil {
return fmt . Errorf ( "cannot obtain series count: %s" , err )
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteSeriesCountResponse ( w , n )
seriesCountDuration . UpdateDuration ( startTime )
return nil
}
var seriesCountDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series/count"} ` )
// SeriesHandler processes /api/v1/series request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func SeriesHandler ( w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
ct := currentTime ( )
if err := r . ParseForm ( ) ; err != nil {
return fmt . Errorf ( "cannot parse form values: %s" , err )
}
matches := r . Form [ "match[]" ]
2019-06-20 11:05:07 +00:00
if len ( matches ) == 0 {
return fmt . Errorf ( "missing `match[]` arg" )
}
2019-08-04 16:42:36 +00:00
end , err := getTime ( r , "end" , ct )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2019-08-04 16:42:36 +00:00
// Do not set start to minTimeMsecs by default as Prometheus does,
// since this leads to fetching and scanning all the data from the storage,
// which can take a lot of time for big storages.
// It is better setting start as end-defaultStep by default.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/91
start , err := getTime ( r , "start" , end - defaultStep )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForQuery ( r )
2019-05-22 21:16:55 +00:00
tagFilterss , err := getTagFilterssFromMatches ( matches )
if err != nil {
return err
}
if start >= end {
2019-11-22 14:10:33 +00:00
end = start + defaultStep
2019-05-22 21:16:55 +00:00
}
sq := & storage . SearchQuery {
MinTimestamp : start ,
MaxTimestamp : end ,
TagFilterss : tagFilterss ,
}
2019-08-04 19:15:33 +00:00
rss , err := netstorage . ProcessSearchQuery ( sq , false , deadline )
2019-05-22 21:16:55 +00:00
if err != nil {
return fmt . Errorf ( "cannot fetch data for %q: %s" , sq , err )
}
resultsCh := make ( chan * quicktemplate . ByteBuffer )
doneCh := make ( chan error )
go func ( ) {
2019-07-12 12:51:02 +00:00
err := rss . RunParallel ( func ( rs * netstorage . Result , workerID uint ) {
2019-05-22 21:16:55 +00:00
bb := quicktemplate . AcquireByteBuffer ( )
writemetricNameObject ( bb , & rs . MetricName )
resultsCh <- bb
} )
close ( resultsCh )
doneCh <- err
} ( )
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteSeriesResponse ( w , resultsCh )
// Consume all the data from resultsCh in the event WriteSeriesResponse
2019-05-24 11:54:31 +00:00
// fails to consume all the data.
2019-05-22 21:16:55 +00:00
for bb := range resultsCh {
quicktemplate . ReleaseByteBuffer ( bb )
}
err = <- doneCh
if err != nil {
return fmt . Errorf ( "error during data fetching: %s" , err )
}
seriesDuration . UpdateDuration ( startTime )
return nil
}
var seriesDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/series"} ` )
// QueryHandler processes /api/v1/query request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
func QueryHandler ( w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
ct := currentTime ( )
query := r . FormValue ( "query" )
2019-06-20 11:05:07 +00:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2019-06-06 19:17:13 +00:00
start , err := getTime ( r , "time" , ct )
if err != nil {
return err
}
2019-10-28 10:30:50 +00:00
queryOffset := getLatencyOffsetMilliseconds ( )
step , err := getDuration ( r , "step" , queryOffset )
2019-06-06 19:17:13 +00:00
if err != nil {
return err
}
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForQuery ( r )
2019-10-15 16:12:27 +00:00
lookbackDelta , err := getMaxLookback ( r )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
if len ( query ) > * maxQueryLen {
2020-01-16 11:03:28 +00:00
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , * maxQueryLen )
2019-05-22 21:16:55 +00:00
}
2019-11-26 20:55:23 +00:00
if ! getBool ( r , "nocache" ) && ct - start < queryOffset {
// Adjust start time only if `nocache` arg isn't set.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/241
2019-11-18 22:41:13 +00:00
start = ct - queryOffset
2019-05-22 21:16:55 +00:00
}
if childQuery , windowStr , offsetStr := promql . IsMetricSelectorWithRollup ( query ) ; childQuery != "" {
2019-12-10 22:40:36 +00:00
window , err := parsePositiveDuration ( windowStr , step )
if err != nil {
return fmt . Errorf ( "cannot parse window: %s" , err )
2019-05-22 21:16:55 +00:00
}
2019-12-10 22:40:36 +00:00
offset , err := parseDuration ( offsetStr , step )
if err != nil {
return fmt . Errorf ( "cannot parse offset: %s" , err )
2019-05-22 21:16:55 +00:00
}
start -= offset
end := start
start = end - window
if err := exportHandler ( w , [ ] string { childQuery } , start , end , "promapi" , deadline ) ; err != nil {
2020-01-22 15:32:11 +00:00
return fmt . Errorf ( "error when exporting data for query=%q on the time range (start=%d, end=%d): %s" , childQuery , start , end , err )
2019-05-22 21:16:55 +00:00
}
queryDuration . UpdateDuration ( startTime )
return nil
}
2019-12-10 22:40:36 +00:00
if childQuery , windowStr , stepStr , offsetStr := promql . IsRollup ( query ) ; childQuery != "" {
newStep , err := parsePositiveDuration ( stepStr , step )
if err != nil {
return fmt . Errorf ( "cannot parse step: %s" , err )
}
if newStep > 0 {
step = newStep
}
window , err := parsePositiveDuration ( windowStr , step )
if err != nil {
return fmt . Errorf ( "cannot parse window: %s" , err )
}
offset , err := parseDuration ( offsetStr , step )
if err != nil {
return fmt . Errorf ( "cannot parse offset: %s" , err )
}
start -= offset
end := start
start = end - window
if err := queryRangeHandler ( w , childQuery , start , end , step , r , ct ) ; err != nil {
2020-01-22 15:32:11 +00:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %s" , childQuery , start , end , step , err )
2019-12-10 22:40:36 +00:00
}
queryDuration . UpdateDuration ( startTime )
return nil
}
2019-05-22 21:16:55 +00:00
ec := promql . EvalConfig {
2019-10-15 16:12:27 +00:00
Start : start ,
End : start ,
Step : step ,
Deadline : deadline ,
LookbackDelta : lookbackDelta ,
2019-05-22 21:16:55 +00:00
}
2019-07-01 14:14:49 +00:00
result , err := promql . Exec ( & ec , query , true )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-01-22 15:32:11 +00:00
return fmt . Errorf ( "error when executing query=%q for (time=%d, step=%d): %s" , query , start , step , err )
2019-05-22 21:16:55 +00:00
}
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteQueryResponse ( w , result )
queryDuration . UpdateDuration ( startTime )
return nil
}
var queryDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query"} ` )
2019-12-10 22:40:36 +00:00
func parseDuration ( s string , step int64 ) ( int64 , error ) {
if len ( s ) == 0 {
return 0 , nil
}
2019-12-25 19:35:47 +00:00
return metricsql . DurationValue ( s , step )
2019-12-10 22:40:36 +00:00
}
func parsePositiveDuration ( s string , step int64 ) ( int64 , error ) {
if len ( s ) == 0 {
return 0 , nil
}
2019-12-25 19:35:47 +00:00
return metricsql . PositiveDurationValue ( s , step )
2019-12-10 22:40:36 +00:00
}
2019-05-22 21:16:55 +00:00
// QueryRangeHandler processes /api/v1/query_range request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
func QueryRangeHandler ( w http . ResponseWriter , r * http . Request ) error {
startTime := time . Now ( )
ct := currentTime ( )
query := r . FormValue ( "query" )
2019-06-20 11:05:07 +00:00
if len ( query ) == 0 {
return fmt . Errorf ( "missing `query` arg" )
}
2019-06-06 19:17:13 +00:00
start , err := getTime ( r , "start" , ct - defaultStep )
if err != nil {
return err
}
end , err := getTime ( r , "end" , ct )
if err != nil {
return err
}
step , err := getDuration ( r , "step" , defaultStep )
if err != nil {
return err
}
2019-12-10 22:40:36 +00:00
if err := queryRangeHandler ( w , query , start , end , step , r , ct ) ; err != nil {
2020-01-22 15:32:11 +00:00
return fmt . Errorf ( "error when executing query=%q on the time range (start=%d, end=%d, step=%d): %s" , query , start , end , step , err )
2019-12-10 22:40:36 +00:00
}
queryRangeDuration . UpdateDuration ( startTime )
return nil
}
func queryRangeHandler ( w http . ResponseWriter , query string , start , end , step int64 , r * http . Request , ct int64 ) error {
2019-12-20 09:34:52 +00:00
deadline := getDeadlineForQuery ( r )
2019-05-22 21:16:55 +00:00
mayCache := ! getBool ( r , "nocache" )
2019-10-15 16:12:27 +00:00
lookbackDelta , err := getMaxLookback ( r )
if err != nil {
return err
}
2019-05-22 21:16:55 +00:00
// Validate input args.
if len ( query ) > * maxQueryLen {
2020-01-16 11:03:28 +00:00
return fmt . Errorf ( "too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes" , len ( query ) , * maxQueryLen )
2019-05-22 21:16:55 +00:00
}
if start > end {
2019-11-22 14:10:33 +00:00
end = start + defaultStep
2019-05-22 21:16:55 +00:00
}
if err := promql . ValidateMaxPointsPerTimeseries ( start , end , step ) ; err != nil {
return err
}
2019-09-04 10:09:20 +00:00
if mayCache {
start , end = promql . AdjustStartEnd ( start , end , step )
}
2019-05-22 21:16:55 +00:00
ec := promql . EvalConfig {
2019-10-15 16:12:27 +00:00
Start : start ,
End : end ,
Step : step ,
Deadline : deadline ,
MayCache : mayCache ,
LookbackDelta : lookbackDelta ,
2019-05-22 21:16:55 +00:00
}
2019-07-01 14:14:49 +00:00
result , err := promql . Exec ( & ec , query , false )
2019-05-22 21:16:55 +00:00
if err != nil {
2020-01-22 15:32:11 +00:00
return fmt . Errorf ( "cannot execute query: %s" , err )
2019-05-22 21:16:55 +00:00
}
2019-10-28 10:30:50 +00:00
queryOffset := getLatencyOffsetMilliseconds ( )
if ct - end < queryOffset {
2019-07-04 06:14:15 +00:00
result = adjustLastPoints ( result )
2019-05-22 21:16:55 +00:00
}
2019-08-20 19:52:49 +00:00
// Remove NaN values as Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/153
removeNaNValuesInplace ( result )
2019-05-22 21:16:55 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/json" )
WriteQueryRangeResponse ( w , result )
return nil
}
2019-08-20 19:52:49 +00:00
func removeNaNValuesInplace ( tss [ ] netstorage . Result ) {
for i := range tss {
ts := & tss [ i ]
hasNaNs := false
for _ , v := range ts . Values {
if math . IsNaN ( v ) {
hasNaNs = true
break
}
}
if ! hasNaNs {
// Fast path: nothing to remove.
continue
}
// Slow path: remove NaNs.
srcTimestamps := ts . Timestamps
dstValues := ts . Values [ : 0 ]
dstTimestamps := ts . Timestamps [ : 0 ]
for j , v := range ts . Values {
if math . IsNaN ( v ) {
continue
}
dstValues = append ( dstValues , v )
dstTimestamps = append ( dstTimestamps , srcTimestamps [ j ] )
}
ts . Values = dstValues
ts . Timestamps = dstTimestamps
}
}
2019-05-22 21:16:55 +00:00
var queryRangeDuration = metrics . NewSummary ( ` vm_request_duration_seconds { path="/api/v1/query_range"} ` )
// adjustLastPoints substitutes the last point values with the previous
// point values, since the last points may contain garbage.
2019-07-04 06:14:15 +00:00
func adjustLastPoints ( tss [ ] netstorage . Result ) [ ] netstorage . Result {
2019-05-22 21:16:55 +00:00
if len ( tss ) == 0 {
2019-07-04 06:14:15 +00:00
return nil
2019-05-22 21:16:55 +00:00
}
// Search for the last non-NaN value across all the timeseries.
lastNonNaNIdx := - 1
for i := range tss {
2019-07-04 06:14:15 +00:00
values := tss [ i ] . Values
j := len ( values ) - 1
for j >= 0 && math . IsNaN ( values [ j ] ) {
2019-05-22 21:16:55 +00:00
j --
}
if j > lastNonNaNIdx {
lastNonNaNIdx = j
}
}
if lastNonNaNIdx == - 1 {
// All timeseries contain only NaNs.
2019-07-04 06:14:15 +00:00
return nil
2019-05-22 21:16:55 +00:00
}
2019-07-04 06:14:15 +00:00
// Substitute the last two values starting from lastNonNaNIdx
2019-05-22 21:16:55 +00:00
// with the previous values for each timeseries.
for i := range tss {
2019-07-04 06:14:15 +00:00
values := tss [ i ] . Values
for j := 0 ; j < 2 ; j ++ {
2019-05-22 21:16:55 +00:00
idx := lastNonNaNIdx + j
2019-07-04 06:14:15 +00:00
if idx <= 0 || idx >= len ( values ) || math . IsNaN ( values [ idx - 1 ] ) {
2019-05-22 21:16:55 +00:00
continue
}
2019-07-04 06:14:15 +00:00
values [ idx ] = values [ idx - 1 ]
2019-05-22 21:16:55 +00:00
}
}
2019-07-04 06:14:15 +00:00
return tss
2019-05-22 21:16:55 +00:00
}
2019-06-06 19:17:13 +00:00
func getTime ( r * http . Request , argKey string , defaultValue int64 ) ( int64 , error ) {
2019-05-22 21:16:55 +00:00
argValue := r . FormValue ( argKey )
if len ( argValue ) == 0 {
2019-06-06 19:17:13 +00:00
return defaultValue , nil
2019-05-22 21:16:55 +00:00
}
secs , err := strconv . ParseFloat ( argValue , 64 )
if err != nil {
// Try parsing string format
t , err := time . Parse ( time . RFC3339 , argValue )
if err != nil {
2019-07-07 16:44:26 +00:00
// Handle Prometheus'-provided minTime and maxTime.
// See https://github.com/prometheus/client_golang/issues/614
switch argValue {
case prometheusMinTimeFormatted :
return minTimeMsecs , nil
case prometheusMaxTimeFormatted :
return maxTimeMsecs , nil
}
2019-06-06 19:17:13 +00:00
return 0 , fmt . Errorf ( "cannot parse %q=%q: %s" , argKey , argValue , err )
2019-05-22 21:16:55 +00:00
}
secs = float64 ( t . UnixNano ( ) ) / 1e9
}
msecs := int64 ( secs * 1e3 )
2019-07-11 14:07:20 +00:00
if msecs < minTimeMsecs {
msecs = 0
}
if msecs > maxTimeMsecs {
msecs = maxTimeMsecs
2019-05-22 21:16:55 +00:00
}
2019-06-06 19:17:13 +00:00
return msecs , nil
2019-05-22 21:16:55 +00:00
}
2019-07-07 16:44:26 +00:00
var (
// These constants were obtained from https://github.com/prometheus/prometheus/blob/91d7175eaac18b00e370965f3a8186cc40bf9f55/web/api/v1/api.go#L442
// See https://github.com/prometheus/client_golang/issues/614 for details.
prometheusMinTimeFormatted = time . Unix ( math . MinInt64 / 1000 + 62135596801 , 0 ) . UTC ( ) . Format ( time . RFC3339Nano )
prometheusMaxTimeFormatted = time . Unix ( math . MaxInt64 / 1000 - 62135596801 , 999999999 ) . UTC ( ) . Format ( time . RFC3339Nano )
)
2019-05-22 21:16:55 +00:00
const (
// These values prevent from overflow when storing msec-precision time in int64.
2019-07-11 14:07:20 +00:00
minTimeMsecs = 0 // use 0 instead of `int64(-1<<63) / 1e6` because the storage engine doesn't actually support negative time
2019-05-22 21:16:55 +00:00
maxTimeMsecs = int64 ( 1 << 63 - 1 ) / 1e6
)
2019-06-06 19:17:13 +00:00
func getDuration ( r * http . Request , argKey string , defaultValue int64 ) ( int64 , error ) {
2019-05-22 21:16:55 +00:00
argValue := r . FormValue ( argKey )
if len ( argValue ) == 0 {
2019-06-06 19:17:13 +00:00
return defaultValue , nil
2019-05-22 21:16:55 +00:00
}
secs , err := strconv . ParseFloat ( argValue , 64 )
if err != nil {
// Try parsing string format
d , err := time . ParseDuration ( argValue )
if err != nil {
2019-06-06 19:17:13 +00:00
return 0 , fmt . Errorf ( "cannot parse %q=%q: %s" , argKey , argValue , err )
2019-05-22 21:16:55 +00:00
}
secs = d . Seconds ( )
}
msecs := int64 ( secs * 1e3 )
if msecs <= 0 || msecs > maxDurationMsecs {
2019-06-27 16:36:15 +00:00
return 0 , fmt . Errorf ( "%q=%dms is out of allowed range [%d ... %d]" , argKey , msecs , 0 , int64 ( maxDurationMsecs ) )
2019-05-22 21:16:55 +00:00
}
2019-06-06 19:17:13 +00:00
return msecs , nil
2019-05-22 21:16:55 +00:00
}
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
2019-10-15 16:12:27 +00:00
func getMaxLookback ( r * http . Request ) ( int64 , error ) {
d := int64 ( * maxLookback / time . Millisecond )
return getDuration ( r , "max_lookback" , d )
}
2019-12-20 09:34:52 +00:00
func getDeadlineForQuery ( r * http . Request ) netstorage . Deadline {
dMax := int64 ( maxQueryDuration . Seconds ( ) * 1e3 )
2020-01-22 13:50:34 +00:00
return getDeadlineWithMaxDuration ( r , dMax , "-search.maxQueryDuration" )
2019-12-20 09:34:52 +00:00
}
func getDeadlineForExport ( r * http . Request ) netstorage . Deadline {
dMax := int64 ( maxExportDuration . Seconds ( ) * 1e3 )
2020-01-22 13:50:34 +00:00
return getDeadlineWithMaxDuration ( r , dMax , "-search.maxExportDuration" )
2019-12-20 09:34:52 +00:00
}
2020-01-22 13:50:34 +00:00
func getDeadlineWithMaxDuration ( r * http . Request , dMax int64 , flagHint string ) netstorage . Deadline {
2019-06-06 19:17:13 +00:00
d , err := getDuration ( r , "timeout" , 0 )
if err != nil {
d = 0
}
2019-05-22 21:16:55 +00:00
if d <= 0 || d > dMax {
d = dMax
}
timeout := time . Duration ( d ) * time . Millisecond
2020-01-22 13:50:34 +00:00
return netstorage . NewDeadline ( timeout , flagHint )
2019-05-22 21:16:55 +00:00
}
func getBool ( r * http . Request , argKey string ) bool {
argValue := r . FormValue ( argKey )
switch strings . ToLower ( argValue ) {
case "" , "0" , "f" , "false" , "no" :
return false
default :
return true
}
}
func currentTime ( ) int64 {
return int64 ( time . Now ( ) . UTC ( ) . Unix ( ) ) * 1e3
}
func getTagFilterssFromMatches ( matches [ ] string ) ( [ ] [ ] storage . TagFilter , error ) {
tagFilterss := make ( [ ] [ ] storage . TagFilter , 0 , len ( matches ) )
for _ , match := range matches {
tagFilters , err := promql . ParseMetricSelector ( match )
if err != nil {
return nil , fmt . Errorf ( "cannot parse %q: %s" , match , err )
}
tagFilterss = append ( tagFilterss , tagFilters )
}
return tagFilterss , nil
}
2019-10-28 10:30:50 +00:00
func getLatencyOffsetMilliseconds ( ) int64 {
d := int64 ( * latencyOffset / time . Millisecond )
if d <= 1000 {
d = 1000
}
return d
}