From c3c60bee458910dc0be4e734c53b727117eca64f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 30 Jun 2019 01:27:03 +0300 Subject: [PATCH] app/vmselect: add `-search.denyPartialResponse` flag for disabling partial responses if some of vmstorage nodes are unavailable Also accept `deny_partial_response` query arg in Prometheus API handlers. If it is set to true, then return error if some of vmstorage nodes are unavailable. --- app/vmselect/prometheus/prometheus.go | 50 +++++++++++++++++++++------ app/vmselect/promql/eval.go | 10 ++++-- 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index fd00ff193..44d2ace75 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -21,9 +21,10 @@ import ( ) var ( - maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution") - maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes") - selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481") + maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution") + maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes") + denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability") + selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481") ) // Default step used if not set. @@ -71,10 +72,13 @@ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) err MaxTimestamp: end, TagFilterss: tagFilterss, } - rss, _, err := netstorage.ProcessSearchQuery(at, sq, deadline) + rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %s", sq, err) } + if isPartial && getDenyPartialResponse(r) { + return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable") + } resultsCh := make(chan *quicktemplate.ByteBuffer) doneCh := make(chan error) @@ -171,7 +175,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, star } if isPartial { rss.Cancel() - return fmt.Errorf("some of the storage nodes are unavailable at the moment") + return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable") } resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1)) @@ -280,10 +284,13 @@ var httpClient = &http.Client{ func LabelValuesHandler(at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error { startTime := time.Now() deadline := getDeadline(r) - labelValues, _, err := netstorage.GetLabelValues(at, labelName, deadline) + labelValues, isPartial, err := netstorage.GetLabelValues(at, labelName, deadline) if err != nil { return fmt.Errorf(`cannot obtain label values for %q: %s`, labelName, err) } + if isPartial && getDenyPartialResponse(r) { + return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable") + } w.Header().Set("Content-Type", "application/json") WriteLabelValuesResponse(w, labelValues) @@ -297,10 +304,13 @@ var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path=" func LabelsCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { startTime := time.Now() deadline := getDeadline(r) - labelEntries, _, err := netstorage.GetLabelEntries(at, deadline) + labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline) if err != nil { return fmt.Errorf(`cannot obtain label entries: %s`, err) } + if isPartial && getDenyPartialResponse(r) { + return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable") + } w.Header().Set("Content-Type", "application/json") WriteLabelsCountResponse(w, labelEntries) @@ -316,10 +326,13 @@ var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path=" func LabelsHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { startTime := time.Now() deadline := getDeadline(r) - labels, _, err := netstorage.GetLabels(at, deadline) + labels, isPartial, err := netstorage.GetLabels(at, deadline) if err != nil { return fmt.Errorf("cannot obtain labels: %s", err) } + if isPartial && getDenyPartialResponse(r) { + return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable") + } w.Header().Set("Content-Type", "application/json") WriteLabelsResponse(w, labels) @@ -333,10 +346,13 @@ var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/ func SeriesCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error { startTime := time.Now() deadline := getDeadline(r) - n, _, err := netstorage.GetSeriesCount(at, deadline) + n, isPartial, err := netstorage.GetSeriesCount(at, deadline) if err != nil { return fmt.Errorf("cannot obtain series count: %s", err) } + if isPartial && getDenyPartialResponse(r) { + return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable") + } w.Header().Set("Content-Type", "application/json") WriteSeriesCountResponse(w, n) @@ -384,10 +400,13 @@ func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error MaxTimestamp: end, TagFilterss: tagFilterss, } - rss, _, err := netstorage.ProcessSearchQuery(at, sq, deadline) + rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %s", sq, err) } + if isPartial && getDenyPartialResponse(r) { + return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable") + } resultsCh := make(chan *quicktemplate.ByteBuffer) doneCh := make(chan error) @@ -479,6 +498,8 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error End: start, Step: step, Deadline: deadline, + + DenyPartialResponse: getDenyPartialResponse(r), } result, err := promql.Exec(&ec, query) if err != nil { @@ -538,6 +559,8 @@ func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) e Step: step, Deadline: deadline, MayCache: mayCache, + + DenyPartialResponse: getDenyPartialResponse(r), } result, err := promql.Exec(&ec, query) if err != nil { @@ -681,3 +704,10 @@ func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) } return tagFilterss, nil } + +func getDenyPartialResponse(r *http.Request) bool { + if *denyPartialResponse { + return true + } + return getBool(r, "deny_partial_response") +} diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index bd093dfa7..421ca00c6 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -72,6 +72,8 @@ type EvalConfig struct { MayCache bool + DenyPartialResponse bool + timestamps []int64 timestampsOnce sync.Once } @@ -85,6 +87,7 @@ func newEvalConfig(src *EvalConfig) *EvalConfig { ec.Step = src.Step ec.Deadline = src.Deadline ec.MayCache = src.MayCache + ec.DenyPartialResponse = src.DenyPartialResponse // do not copy src.timestamps - they must be generated again. return &ec @@ -504,10 +507,13 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me TagFilterss: [][]storage.TagFilter{me.TagFilters}, } - rss, denyCache, err := netstorage.ProcessSearchQuery(ec.AuthToken, sq, ec.Deadline) + rss, isPartial, err := netstorage.ProcessSearchQuery(ec.AuthToken, sq, ec.Deadline) if err != nil { return nil, err } + if isPartial && ec.DenyPartialResponse { + return nil, fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable") + } rssLen := rss.Len() if rssLen == 0 { rss.Cancel() @@ -565,7 +571,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me } } tss = mergeTimeseries(tssCached, tss, start, ec) - if !denyCache { + if !isPartial { rollupResultCacheV.Put(name, ec, me, window, tss) } return tss, nil