diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 0bad941379..14e8b3f3a7 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -56,10 +56,6 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } - matches := r.Form["match[]"] - if len(matches) == 0 { - return fmt.Errorf("missing `match[]` arg") - } lookbackDelta, err := getMaxLookback(r) if err != nil { return err @@ -79,7 +75,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, if start >= end { start = end - defaultStep } - tagFilterss, err := getTagFilterssFromMatches(matches) + tagFilterss, err := getTagFilterssFromRequest(r) if err != nil { return err } @@ -129,15 +125,6 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter return fmt.Errorf("missing `format` arg; see https://victoriametrics.github.io/#how-to-export-csv-data") } fieldNames := strings.Split(format, ",") - matches := r.Form["match[]"] - if len(matches) == 0 { - // Maintain backwards compatibility - match := r.FormValue("match") - if len(match) == 0 { - return fmt.Errorf("missing `match[]` arg") - } - matches = []string{match} - } start, err := searchutils.GetTime(r, "start", 0) if err != nil { return err @@ -147,7 +134,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter return err } deadline := searchutils.GetDeadlineForExport(r, startTime) - tagFilterss, err := getTagFilterssFromMatches(matches) + tagFilterss, err := getTagFilterssFromRequest(r) if err != nil { return err } @@ -206,15 +193,6 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } - matches := r.Form["match[]"] - if len(matches) == 0 { - // Maintain backwards compatibility - match := r.FormValue("match") - if len(match) == 0 { - return fmt.Errorf("missing `match[]` arg") - } - matches = []string{match} - } start, err := searchutils.GetTime(r, "start", 0) if err != nil { return err @@ -224,7 +202,7 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri return err } deadline := searchutils.GetDeadlineForExport(r, startTime) - tagFilterss, err := getTagFilterssFromMatches(matches) + tagFilterss, err := getTagFilterssFromRequest(r) if err != nil { return err } @@ -288,14 +266,9 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } - matches := r.Form["match[]"] - if len(matches) == 0 { - // Maintain backwards compatibility - match := r.FormValue("match") - if len(match) == 0 { - return fmt.Errorf("missing `match[]` arg") - } - matches = []string{match} + matches, err := getMatchesFromRequest(r) + if err != nil { + return err } start, err := searchutils.GetTime(r, "start", 0) if err != nil { @@ -312,7 +285,11 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if start >= end { end = start + defaultStep } - if err := exportHandler(at, w, r, matches, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil { + etf, err := getEnforcedTagFiltersFromRequest(r) + if err != nil { + return err + } + if err := exportHandler(at, w, r, matches, etf, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil { return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %w", matches, start, end, err) } exportDuration.UpdateDuration(startTime) @@ -321,7 +298,7 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) -func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, +func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, etf []storage.TagFilter, start, end int64, format string, maxRowsPerLine int, reduceMemUsage bool, deadline searchutils.Deadline) error { writeResponseFunc := WriteExportStdResponse writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { @@ -379,6 +356,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match if err != nil { return err } + tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf) sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) w.Header().Set("Content-Type", contentType) bw := bufferedwriter.Get(w) @@ -473,19 +451,15 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error { if r.FormValue("start") != "" || r.FormValue("end") != "" { return fmt.Errorf("start and end aren't supported. Remove these args from the query in order to delete all the matching metrics") } - matches := r.Form["match[]"] - if len(matches) == 0 { - return fmt.Errorf("missing `match[]` arg") - } deadline := searchutils.GetDeadlineForQuery(r, startTime) - tagFilterss, err := getTagFilterssFromMatches(matches) + tagFilterss, err := getTagFilterssFromRequest(r) if err != nil { return err } sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, 0, tagFilterss) deletedCount, err := netstorage.DeleteSeries(at, sq, deadline) if err != nil { - return fmt.Errorf("cannot delete time series matching %q: %w", matches, err) + return fmt.Errorf("cannot delete time series: %w", err) } if deletedCount > 0 { // Reset rollup result cache on all the vmselect nodes, @@ -545,10 +519,14 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } + etf, err := getEnforcedTagFiltersFromRequest(r) + if err != nil { + return err + } var labelValues []string var isPartial bool denyPartialResponse := searchutils.GetDenyPartialResponse(r) - if len(r.Form["match[]"]) == 0 { + if len(r.Form["match[]"]) == 0 && len(etf) == 0 { if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { var err error labelValues, isPartial, err = netstorage.GetLabelValues(at, denyPartialResponse, labelName, deadline) @@ -592,7 +570,7 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w if err != nil { return err } - labelValues, isPartial, err = labelValuesWithMatches(at, denyPartialResponse, labelName, matches, start, end, deadline) + labelValues, isPartial, err = labelValuesWithMatches(at, denyPartialResponse, labelName, matches, etf, start, end, deadline) if err != nil { return fmt.Errorf("cannot obtain label values for %q, match[]=%q, start=%d, end=%d: %w", labelName, matches, start, end, err) } @@ -609,10 +587,8 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w return nil } -func labelValuesWithMatches(at *auth.Token, denyPartialResponse bool, labelName string, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) { - if len(matches) == 0 { - logger.Panicf("BUG: matches must be non-empty") - } +func labelValuesWithMatches(at *auth.Token, denyPartialResponse bool, labelName string, matches []string, etf []storage.TagFilter, + start, end int64, deadline searchutils.Deadline) ([]string, bool, error) { tagFilterss, err := getTagFilterssFromMatches(matches) if err != nil { return nil, false, err @@ -633,6 +609,10 @@ func labelValuesWithMatches(at *auth.Token, denyPartialResponse bool, labelName if start >= end { end = start + defaultStep } + tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf) + if len(tagFilterss) == 0 { + logger.Panicf("BUG: tagFilterss must be non-empty") + } sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) m := make(map[string]struct{}) isPartial := false @@ -764,10 +744,14 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } + etf, err := getEnforcedTagFiltersFromRequest(r) + if err != nil { + return err + } var labels []string var isPartial bool denyPartialResponse := searchutils.GetDenyPartialResponse(r) - if len(r.Form["match[]"]) == 0 { + if len(r.Form["match[]"]) == 0 && len(etf) == 0 { if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 { var err error labels, isPartial, err = netstorage.GetLabels(at, denyPartialResponse, deadline) @@ -809,7 +793,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if err != nil { return err } - labels, isPartial, err = labelsWithMatches(at, denyPartialResponse, matches, start, end, deadline) + labels, isPartial, err = labelsWithMatches(at, denyPartialResponse, matches, etf, start, end, deadline) if err != nil { return fmt.Errorf("cannot obtain labels for match[]=%q, start=%d, end=%d: %w", matches, start, end, err) } @@ -826,10 +810,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r return nil } -func labelsWithMatches(at *auth.Token, denyPartialResponse bool, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) { - if len(matches) == 0 { - logger.Panicf("BUG: matches must be non-empty") - } +func labelsWithMatches(at *auth.Token, denyPartialResponse bool, matches []string, etf []storage.TagFilter, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) { tagFilterss, err := getTagFilterssFromMatches(matches) if err != nil { return nil, false, err @@ -837,6 +818,10 @@ func labelsWithMatches(at *auth.Token, denyPartialResponse bool, matches []strin if start >= end { end = start + defaultStep } + tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf) + if len(tagFilterss) == 0 { + logger.Panicf("BUG: tagFilterss must be non-empty") + } sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss) m := make(map[string]struct{}) isPartial := false @@ -915,10 +900,6 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse form values: %w", err) } - matches := r.Form["match[]"] - if len(matches) == 0 { - return fmt.Errorf("missing `match[]` arg") - } end, err := searchutils.GetTime(r, "end", ct) if err != nil { return err @@ -934,7 +915,7 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r } deadline := searchutils.GetDeadlineForQuery(r, startTime) - tagFilterss, err := getTagFilterssFromMatches(matches) + tagFilterss, err := getTagFilterssFromRequest(r) if err != nil { return err } @@ -1036,6 +1017,10 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r if len(query) > maxQueryLen.N { return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N) } + etf, err := getEnforcedTagFiltersFromRequest(r) + if err != nil { + return err + } if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" { window, err := parsePositiveDuration(windowStr, step) if err != nil { @@ -1048,7 +1033,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r start -= offset end := start start = end - window - if err := exportHandler(at, w, r, []string{childQuery}, start, end, "promapi", 0, false, deadline); err != nil { + if err := exportHandler(at, w, r, []string{childQuery}, etf, start, end, "promapi", 0, false, deadline); err != nil { return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %w", childQuery, start, end, err) } queryDuration.UpdateDuration(startTime) @@ -1073,7 +1058,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r start -= offset end := start start = end - window - if err := queryRangeHandler(startTime, at, w, childQuery, start, end, step, r, ct); err != nil { + if err := queryRangeHandler(startTime, at, w, childQuery, start, end, step, r, ct, etf); err != nil { return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", childQuery, start, end, step, err) } queryDuration.UpdateDuration(startTime) @@ -1091,13 +1076,14 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r queryOffset = 0 } ec := promql.EvalConfig{ - AuthToken: at, - Start: start, - End: start, - Step: step, - QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r), - Deadline: deadline, - LookbackDelta: lookbackDelta, + AuthToken: at, + Start: start, + End: start, + Step: step, + QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r), + Deadline: deadline, + LookbackDelta: lookbackDelta, + EnforcedTagFilters: etf, DenyPartialResponse: searchutils.GetDenyPartialResponse(r), } @@ -1162,14 +1148,18 @@ func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite if err != nil { return err } - if err := queryRangeHandler(startTime, at, w, query, start, end, step, r, ct); err != nil { + etf, err := getEnforcedTagFiltersFromRequest(r) + if err != nil { + return err + } + if err := queryRangeHandler(startTime, at, w, query, start, end, step, r, ct, etf); err != nil { return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", query, start, end, step, err) } queryRangeDuration.UpdateDuration(startTime) return nil } -func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error { +func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64, etf []storage.TagFilter) error { deadline := searchutils.GetDeadlineForQuery(r, startTime) mayCache := !searchutils.GetBool(r, "nocache") lookbackDelta, err := getMaxLookback(r) @@ -1192,14 +1182,15 @@ func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite } ec := promql.EvalConfig{ - AuthToken: at, - Start: start, - End: end, - Step: step, - QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r), - Deadline: deadline, - MayCache: mayCache, - LookbackDelta: lookbackDelta, + AuthToken: at, + Start: start, + End: end, + Step: step, + QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r), + Deadline: deadline, + MayCache: mayCache, + LookbackDelta: lookbackDelta, + EnforcedTagFilters: etf, DenyPartialResponse: searchutils.GetDenyPartialResponse(r), } @@ -1309,6 +1300,38 @@ func getMaxLookback(r *http.Request) (int64, error) { return searchutils.GetDuration(r, "max_lookback", d) } +func getEnforcedTagFiltersFromRequest(r *http.Request) ([]storage.TagFilter, error) { + // fast path. + extraLabels := r.Form["extra_label"] + if len(extraLabels) == 0 { + return nil, nil + } + tagFilters := make([]storage.TagFilter, 0, len(extraLabels)) + for _, match := range extraLabels { + tmp := strings.SplitN(match, "=", 2) + if len(tmp) != 2 { + return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", match) + } + tagFilters = append(tagFilters, storage.TagFilter{ + Key: []byte(tmp[0]), + Value: []byte(tmp[1]), + }) + } + return tagFilters, nil +} + +func addEnforcedFiltersToTagFilterss(dstTfss [][]storage.TagFilter, enforcedFilters []storage.TagFilter) [][]storage.TagFilter { + if len(dstTfss) == 0 { + return [][]storage.TagFilter{ + enforcedFilters, + } + } + for i := range dstTfss { + dstTfss[i] = append(dstTfss[i], enforcedFilters...) + } + return dstTfss +} + func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) { tagFilterss := make([][]storage.TagFilter, 0, len(matches)) for _, match := range matches { @@ -1321,6 +1344,35 @@ func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) return tagFilterss, nil } +func getTagFilterssFromRequest(r *http.Request) ([][]storage.TagFilter, error) { + matches, err := getMatchesFromRequest(r) + if err != nil { + return nil, err + } + tagFilterss, err := getTagFilterssFromMatches(matches) + if err != nil { + return nil, err + } + etf, err := getEnforcedTagFiltersFromRequest(r) + if err != nil { + return nil, err + } + tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf) + return tagFilterss, nil +} + +func getMatchesFromRequest(r *http.Request) ([]string, error) { + matches := r.Form["match[]"] + if len(matches) > 0 { + return matches, nil + } + match := r.Form.Get("match") + if len(match) == 0 { + return nil, fmt.Errorf("missing `match[]` query arg") + } + return []string{match}, nil +} + func getLatencyOffsetMilliseconds() int64 { d := latencyOffset.Milliseconds() if d <= 1000 { diff --git a/app/vmselect/prometheus/prometheus_test.go b/app/vmselect/prometheus/prometheus_test.go index 70d2b06f2e..d376a928b6 100644 --- a/app/vmselect/prometheus/prometheus_test.go +++ b/app/vmselect/prometheus/prometheus_test.go @@ -2,10 +2,12 @@ package prometheus import ( "math" + "net/http" "reflect" "testing" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) func TestRemoveEmptyValuesAndTimeseries(t *testing.T) { @@ -195,3 +197,75 @@ func TestAdjustLastPoints(t *testing.T) { }, }) } + +// helper for tests +func tfFromKV(k, v string) storage.TagFilter { + return storage.TagFilter{ + Key: []byte(k), + Value: []byte(v), + } +} + +func Test_addEnforcedFiltersToTagFilterss(t *testing.T) { + f := func(t *testing.T, dstTfss [][]storage.TagFilter, enforcedFilters []storage.TagFilter, want [][]storage.TagFilter) { + t.Helper() + got := addEnforcedFiltersToTagFilterss(dstTfss, enforcedFilters) + if !reflect.DeepEqual(got, want) { + t.Fatalf("unxpected result for addEnforcedFiltersToTagFilterss, \ngot: %v,\n want: %v", want, got) + } + } + f(t, [][]storage.TagFilter{{tfFromKV("label", "value")}}, + nil, + [][]storage.TagFilter{{tfFromKV("label", "value")}}) + + f(t, nil, + []storage.TagFilter{tfFromKV("ext-label", "ext-value")}, + [][]storage.TagFilter{{tfFromKV("ext-label", "ext-value")}}) + + f(t, [][]storage.TagFilter{ + {tfFromKV("l1", "v1")}, + {tfFromKV("l2", "v2")}, + }, + []storage.TagFilter{tfFromKV("ext-l1", "v2")}, + [][]storage.TagFilter{ + {tfFromKV("l1", "v1"), tfFromKV("ext-l1", "v2")}, + {tfFromKV("l2", "v2"), tfFromKV("ext-l1", "v2")}, + }) +} + +func Test_getEnforcedTagFiltersFromRequest(t *testing.T) { + httpReqWithForm := func(tfs []string) *http.Request { + return &http.Request{ + Form: map[string][]string{ + "extra_label": tfs, + }, + } + } + f := func(t *testing.T, r *http.Request, want []storage.TagFilter, wantErr bool) { + t.Helper() + got, err := getEnforcedTagFiltersFromRequest(r) + if (err != nil) != wantErr { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("unxpected result for getEnforcedTagFiltersFromRequest, \ngot: %v,\n want: %v", want, got) + } + } + + f(t, httpReqWithForm([]string{"label=value"}), + []storage.TagFilter{ + tfFromKV("label", "value"), + }, + false) + + f(t, httpReqWithForm([]string{"job=vmagent", "dc=gce"}), + []storage.TagFilter{tfFromKV("job", "vmagent"), tfFromKV("dc", "gce")}, + false, + ) + f(t, httpReqWithForm([]string{"bad_filter"}), + nil, + true, + ) + f(t, &http.Request{}, + nil, false) +} diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 42d3d54b6d..e2768124a2 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -100,6 +100,9 @@ type EvalConfig struct { // LookbackDelta is analog to `-query.lookback-delta` from Prometheus. LookbackDelta int64 + // EnforcedTagFilters used for apply additional label filters to query. + EnforcedTagFilters []storage.TagFilter + DenyPartialResponse bool // IsPartialResponse is set during query execution and can be used by Exec caller after query execution. @@ -119,6 +122,7 @@ func newEvalConfig(src *EvalConfig) *EvalConfig { ec.Deadline = src.Deadline ec.MayCache = src.MayCache ec.LookbackDelta = src.LookbackDelta + ec.EnforcedTagFilters = src.EnforcedTagFilters ec.DenyPartialResponse = src.DenyPartialResponse ec.IsPartialResponse = src.IsPartialResponse @@ -665,6 +669,8 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc, // Fetch the remaining part of the result. tfs := toTagFilters(me.LabelFilters) + // append external filters. + tfs = append(tfs, ec.EnforcedTagFilters...) minTimestamp := start - maxSilenceInterval if window > ec.Step { minTimestamp -= window