mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
app/vmselect: add the `-search.denyPartialResponse` flag for disabling partial responses when some of the vmstorage nodes are unavailable
Also accept the `deny_partial_response` query arg in the Prometheus API handlers. If it is set to true, an error is returned when some of the vmstorage nodes are unavailable.
This commit is contained in:
parent
60cff62586
commit
c3c60bee45
2 changed files with 48 additions and 12 deletions
|
@ -21,9 +21,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution")
|
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum time for search query execution")
|
||||||
maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
|
maxQueryLen = flag.Int("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
|
||||||
selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481")
|
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability")
|
||||||
|
selectNodes = flagutil.NewArray("selectNode", "Addresses of vmselect nodes; usage: -selectNode=vmselect-host1:8481 -selectNode=vmselect-host2:8481")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default step used if not set.
|
// Default step used if not set.
|
||||||
|
@ -71,10 +72,13 @@ func FederateHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) err
|
||||||
MaxTimestamp: end,
|
MaxTimestamp: end,
|
||||||
TagFilterss: tagFilterss,
|
TagFilterss: tagFilterss,
|
||||||
}
|
}
|
||||||
rss, _, err := netstorage.ProcessSearchQuery(at, sq, deadline)
|
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
||||||
}
|
}
|
||||||
|
if isPartial && getDenyPartialResponse(r) {
|
||||||
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
resultsCh := make(chan *quicktemplate.ByteBuffer)
|
resultsCh := make(chan *quicktemplate.ByteBuffer)
|
||||||
doneCh := make(chan error)
|
doneCh := make(chan error)
|
||||||
|
@ -171,7 +175,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, matches []string, star
|
||||||
}
|
}
|
||||||
if isPartial {
|
if isPartial {
|
||||||
rss.Cancel()
|
rss.Cancel()
|
||||||
return fmt.Errorf("some of the storage nodes are unavailable at the moment")
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
||||||
}
|
}
|
||||||
|
|
||||||
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
|
resultsCh := make(chan *quicktemplate.ByteBuffer, runtime.GOMAXPROCS(-1))
|
||||||
|
@ -280,10 +284,13 @@ var httpClient = &http.Client{
|
||||||
func LabelValuesHandler(at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error {
|
func LabelValuesHandler(at *auth.Token, labelName string, w http.ResponseWriter, r *http.Request) error {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
deadline := getDeadline(r)
|
deadline := getDeadline(r)
|
||||||
labelValues, _, err := netstorage.GetLabelValues(at, labelName, deadline)
|
labelValues, isPartial, err := netstorage.GetLabelValues(at, labelName, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf(`cannot obtain label values for %q: %s`, labelName, err)
|
return fmt.Errorf(`cannot obtain label values for %q: %s`, labelName, err)
|
||||||
}
|
}
|
||||||
|
if isPartial && getDenyPartialResponse(r) {
|
||||||
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
WriteLabelValuesResponse(w, labelValues)
|
WriteLabelValuesResponse(w, labelValues)
|
||||||
|
@ -297,10 +304,13 @@ var labelValuesDuration = metrics.NewSummary(`vm_request_duration_seconds{path="
|
||||||
func LabelsCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
func LabelsCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
deadline := getDeadline(r)
|
deadline := getDeadline(r)
|
||||||
labelEntries, _, err := netstorage.GetLabelEntries(at, deadline)
|
labelEntries, isPartial, err := netstorage.GetLabelEntries(at, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf(`cannot obtain label entries: %s`, err)
|
return fmt.Errorf(`cannot obtain label entries: %s`, err)
|
||||||
}
|
}
|
||||||
|
if isPartial && getDenyPartialResponse(r) {
|
||||||
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
WriteLabelsCountResponse(w, labelEntries)
|
WriteLabelsCountResponse(w, labelEntries)
|
||||||
|
@ -316,10 +326,13 @@ var labelsCountDuration = metrics.NewSummary(`vm_request_duration_seconds{path="
|
||||||
func LabelsHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
func LabelsHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
deadline := getDeadline(r)
|
deadline := getDeadline(r)
|
||||||
labels, _, err := netstorage.GetLabels(at, deadline)
|
labels, isPartial, err := netstorage.GetLabels(at, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot obtain labels: %s", err)
|
return fmt.Errorf("cannot obtain labels: %s", err)
|
||||||
}
|
}
|
||||||
|
if isPartial && getDenyPartialResponse(r) {
|
||||||
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
WriteLabelsResponse(w, labels)
|
WriteLabelsResponse(w, labels)
|
||||||
|
@ -333,10 +346,13 @@ var labelsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/
|
||||||
func SeriesCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
func SeriesCountHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
deadline := getDeadline(r)
|
deadline := getDeadline(r)
|
||||||
n, _, err := netstorage.GetSeriesCount(at, deadline)
|
n, isPartial, err := netstorage.GetSeriesCount(at, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot obtain series count: %s", err)
|
return fmt.Errorf("cannot obtain series count: %s", err)
|
||||||
}
|
}
|
||||||
|
if isPartial && getDenyPartialResponse(r) {
|
||||||
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
WriteSeriesCountResponse(w, n)
|
WriteSeriesCountResponse(w, n)
|
||||||
|
@ -384,10 +400,13 @@ func SeriesHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
|
||||||
MaxTimestamp: end,
|
MaxTimestamp: end,
|
||||||
TagFilterss: tagFilterss,
|
TagFilterss: tagFilterss,
|
||||||
}
|
}
|
||||||
rss, _, err := netstorage.ProcessSearchQuery(at, sq, deadline)
|
rss, isPartial, err := netstorage.ProcessSearchQuery(at, sq, deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
return fmt.Errorf("cannot fetch data for %q: %s", sq, err)
|
||||||
}
|
}
|
||||||
|
if isPartial && getDenyPartialResponse(r) {
|
||||||
|
return fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
||||||
|
}
|
||||||
|
|
||||||
resultsCh := make(chan *quicktemplate.ByteBuffer)
|
resultsCh := make(chan *quicktemplate.ByteBuffer)
|
||||||
doneCh := make(chan error)
|
doneCh := make(chan error)
|
||||||
|
@ -479,6 +498,8 @@ func QueryHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) error
|
||||||
End: start,
|
End: start,
|
||||||
Step: step,
|
Step: step,
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
|
|
||||||
|
DenyPartialResponse: getDenyPartialResponse(r),
|
||||||
}
|
}
|
||||||
result, err := promql.Exec(&ec, query)
|
result, err := promql.Exec(&ec, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -538,6 +559,8 @@ func QueryRangeHandler(at *auth.Token, w http.ResponseWriter, r *http.Request) e
|
||||||
Step: step,
|
Step: step,
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
MayCache: mayCache,
|
MayCache: mayCache,
|
||||||
|
|
||||||
|
DenyPartialResponse: getDenyPartialResponse(r),
|
||||||
}
|
}
|
||||||
result, err := promql.Exec(&ec, query)
|
result, err := promql.Exec(&ec, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -681,3 +704,10 @@ func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error)
|
||||||
}
|
}
|
||||||
return tagFilterss, nil
|
return tagFilterss, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getDenyPartialResponse reports whether a partial response must be rejected for r.
//
// It returns true when the -search.denyPartialResponse command-line flag is set.
// Otherwise it returns whatever getBool yields for the request's
// "deny_partial_response" arg, so a single request can opt in on its own.
func getDenyPartialResponse(r *http.Request) bool {
|
||||||
|
if *denyPartialResponse {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return getBool(r, "deny_partial_response")
|
||||||
|
}
|
||||||
|
|
|
@ -72,6 +72,8 @@ type EvalConfig struct {
|
||||||
|
|
||||||
MayCache bool
|
MayCache bool
|
||||||
|
|
||||||
|
DenyPartialResponse bool
|
||||||
|
|
||||||
timestamps []int64
|
timestamps []int64
|
||||||
timestampsOnce sync.Once
|
timestampsOnce sync.Once
|
||||||
}
|
}
|
||||||
|
@ -85,6 +87,7 @@ func newEvalConfig(src *EvalConfig) *EvalConfig {
|
||||||
ec.Step = src.Step
|
ec.Step = src.Step
|
||||||
ec.Deadline = src.Deadline
|
ec.Deadline = src.Deadline
|
||||||
ec.MayCache = src.MayCache
|
ec.MayCache = src.MayCache
|
||||||
|
ec.DenyPartialResponse = src.DenyPartialResponse
|
||||||
|
|
||||||
// do not copy src.timestamps - they must be generated again.
|
// do not copy src.timestamps - they must be generated again.
|
||||||
return &ec
|
return &ec
|
||||||
|
@ -504,10 +507,13 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
|
||||||
TagFilterss: [][]storage.TagFilter{me.TagFilters},
|
TagFilterss: [][]storage.TagFilter{me.TagFilters},
|
||||||
}
|
}
|
||||||
|
|
||||||
rss, denyCache, err := netstorage.ProcessSearchQuery(ec.AuthToken, sq, ec.Deadline)
|
rss, isPartial, err := netstorage.ProcessSearchQuery(ec.AuthToken, sq, ec.Deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if isPartial && ec.DenyPartialResponse {
|
||||||
|
return nil, fmt.Errorf("cannot return full response, since some of vmstorage nodes are unavailable")
|
||||||
|
}
|
||||||
rssLen := rss.Len()
|
rssLen := rss.Len()
|
||||||
if rssLen == 0 {
|
if rssLen == 0 {
|
||||||
rss.Cancel()
|
rss.Cancel()
|
||||||
|
@ -565,7 +571,7 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, me
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tss = mergeTimeseries(tssCached, tss, start, ec)
|
tss = mergeTimeseries(tssCached, tss, start, ec)
|
||||||
if !denyCache {
|
if !isPartial {
|
||||||
rollupResultCacheV.Put(name, ec, me, window, tss)
|
rollupResultCacheV.Put(name, ec, me, window, tss)
|
||||||
}
|
}
|
||||||
return tss, nil
|
return tss, nil
|
||||||
|
|
Loading…
Reference in a new issue