app/vmselect: add ability to set an additional label filters via extra_label query arg

This commit is contained in:
Aliaksandr Valialkin 2021-02-01 17:42:35 +02:00
parent 6811445b64
commit 7d23f3ff3a
3 changed files with 209 additions and 77 deletions

View file

@ -56,10 +56,6 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
matches := r.Form["match[]"]
if len(matches) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
lookbackDelta, err := getMaxLookback(r)
if err != nil {
return err
@ -79,7 +75,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
if start >= end {
start = end - defaultStep
}
tagFilterss, err := getTagFilterssFromMatches(matches)
tagFilterss, err := getTagFilterssFromRequest(r)
if err != nil {
return err
}
@ -129,15 +125,6 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
return fmt.Errorf("missing `format` arg; see https://victoriametrics.github.io/#how-to-export-csv-data")
}
fieldNames := strings.Split(format, ",")
matches := r.Form["match[]"]
if len(matches) == 0 {
// Maintain backwards compatibility
match := r.FormValue("match")
if len(match) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
matches = []string{match}
}
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return err
@ -147,7 +134,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
return err
}
deadline := searchutils.GetDeadlineForExport(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
tagFilterss, err := getTagFilterssFromRequest(r)
if err != nil {
return err
}
@ -206,15 +193,6 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
matches := r.Form["match[]"]
if len(matches) == 0 {
// Maintain backwards compatibility
match := r.FormValue("match")
if len(match) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
matches = []string{match}
}
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
return err
@ -224,7 +202,7 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
return err
}
deadline := searchutils.GetDeadlineForExport(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
tagFilterss, err := getTagFilterssFromRequest(r)
if err != nil {
return err
}
@ -288,14 +266,9 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse request form values: %w", err)
}
matches := r.Form["match[]"]
if len(matches) == 0 {
// Maintain backwards compatibility
match := r.FormValue("match")
if len(match) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
matches = []string{match}
matches, err := getMatchesFromRequest(r)
if err != nil {
return err
}
start, err := searchutils.GetTime(r, "start", 0)
if err != nil {
@ -312,7 +285,11 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if start >= end {
end = start + defaultStep
}
if err := exportHandler(at, w, r, matches, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil {
etf, err := getEnforcedTagFiltersFromRequest(r)
if err != nil {
return err
}
if err := exportHandler(at, w, r, matches, etf, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil {
return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %w", matches, start, end, err)
}
exportDuration.UpdateDuration(startTime)
@ -321,7 +298,7 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64,
func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, etf []storage.TagFilter, start, end int64,
format string, maxRowsPerLine int, reduceMemUsage bool, deadline searchutils.Deadline) error {
writeResponseFunc := WriteExportStdResponse
writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
@ -379,6 +356,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
if err != nil {
return err
}
tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf)
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss)
w.Header().Set("Content-Type", contentType)
bw := bufferedwriter.Get(w)
@ -473,19 +451,15 @@ func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error {
if r.FormValue("start") != "" || r.FormValue("end") != "" {
return fmt.Errorf("start and end aren't supported. Remove these args from the query in order to delete all the matching metrics")
}
matches := r.Form["match[]"]
if len(matches) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
deadline := searchutils.GetDeadlineForQuery(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
tagFilterss, err := getTagFilterssFromRequest(r)
if err != nil {
return err
}
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, 0, tagFilterss)
deletedCount, err := netstorage.DeleteSeries(at, sq, deadline)
if err != nil {
return fmt.Errorf("cannot delete time series matching %q: %w", matches, err)
return fmt.Errorf("cannot delete time series: %w", err)
}
if deletedCount > 0 {
// Reset rollup result cache on all the vmselect nodes,
@ -545,10 +519,14 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
etf, err := getEnforcedTagFiltersFromRequest(r)
if err != nil {
return err
}
var labelValues []string
var isPartial bool
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
if len(r.Form["match[]"]) == 0 {
if len(r.Form["match[]"]) == 0 && len(etf) == 0 {
if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
var err error
labelValues, isPartial, err = netstorage.GetLabelValues(at, denyPartialResponse, labelName, deadline)
@ -592,7 +570,7 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w
if err != nil {
return err
}
labelValues, isPartial, err = labelValuesWithMatches(at, denyPartialResponse, labelName, matches, start, end, deadline)
labelValues, isPartial, err = labelValuesWithMatches(at, denyPartialResponse, labelName, matches, etf, start, end, deadline)
if err != nil {
return fmt.Errorf("cannot obtain label values for %q, match[]=%q, start=%d, end=%d: %w", labelName, matches, start, end, err)
}
@ -609,10 +587,8 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w
return nil
}
func labelValuesWithMatches(at *auth.Token, denyPartialResponse bool, labelName string, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) {
if len(matches) == 0 {
logger.Panicf("BUG: matches must be non-empty")
}
func labelValuesWithMatches(at *auth.Token, denyPartialResponse bool, labelName string, matches []string, etf []storage.TagFilter,
start, end int64, deadline searchutils.Deadline) ([]string, bool, error) {
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return nil, false, err
@ -633,6 +609,10 @@ func labelValuesWithMatches(at *auth.Token, denyPartialResponse bool, labelName
if start >= end {
end = start + defaultStep
}
tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf)
if len(tagFilterss) == 0 {
logger.Panicf("BUG: tagFilterss must be non-empty")
}
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss)
m := make(map[string]struct{})
isPartial := false
@ -764,10 +744,14 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
etf, err := getEnforcedTagFiltersFromRequest(r)
if err != nil {
return err
}
var labels []string
var isPartial bool
denyPartialResponse := searchutils.GetDenyPartialResponse(r)
if len(r.Form["match[]"]) == 0 {
if len(r.Form["match[]"]) == 0 && len(etf) == 0 {
if len(r.Form["start"]) == 0 && len(r.Form["end"]) == 0 {
var err error
labels, isPartial, err = netstorage.GetLabels(at, denyPartialResponse, deadline)
@ -809,7 +793,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if err != nil {
return err
}
labels, isPartial, err = labelsWithMatches(at, denyPartialResponse, matches, start, end, deadline)
labels, isPartial, err = labelsWithMatches(at, denyPartialResponse, matches, etf, start, end, deadline)
if err != nil {
return fmt.Errorf("cannot obtain labels for match[]=%q, start=%d, end=%d: %w", matches, start, end, err)
}
@ -826,10 +810,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
return nil
}
func labelsWithMatches(at *auth.Token, denyPartialResponse bool, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) {
if len(matches) == 0 {
logger.Panicf("BUG: matches must be non-empty")
}
func labelsWithMatches(at *auth.Token, denyPartialResponse bool, matches []string, etf []storage.TagFilter, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) {
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return nil, false, err
@ -837,6 +818,10 @@ func labelsWithMatches(at *auth.Token, denyPartialResponse bool, matches []strin
if start >= end {
end = start + defaultStep
}
tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf)
if len(tagFilterss) == 0 {
logger.Panicf("BUG: tagFilterss must be non-empty")
}
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss)
m := make(map[string]struct{})
isPartial := false
@ -915,10 +900,6 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
matches := r.Form["match[]"]
if len(matches) == 0 {
return fmt.Errorf("missing `match[]` arg")
}
end, err := searchutils.GetTime(r, "end", ct)
if err != nil {
return err
@ -934,7 +915,7 @@ func SeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
}
deadline := searchutils.GetDeadlineForQuery(r, startTime)
tagFilterss, err := getTagFilterssFromMatches(matches)
tagFilterss, err := getTagFilterssFromRequest(r)
if err != nil {
return err
}
@ -1036,6 +1017,10 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
if len(query) > maxQueryLen.N {
return fmt.Errorf("too long query; got %d bytes; mustn't exceed `-search.maxQueryLen=%d` bytes", len(query), maxQueryLen.N)
}
etf, err := getEnforcedTagFiltersFromRequest(r)
if err != nil {
return err
}
if childQuery, windowStr, offsetStr := promql.IsMetricSelectorWithRollup(query); childQuery != "" {
window, err := parsePositiveDuration(windowStr, step)
if err != nil {
@ -1048,7 +1033,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
start -= offset
end := start
start = end - window
if err := exportHandler(at, w, r, []string{childQuery}, start, end, "promapi", 0, false, deadline); err != nil {
if err := exportHandler(at, w, r, []string{childQuery}, etf, start, end, "promapi", 0, false, deadline); err != nil {
return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %w", childQuery, start, end, err)
}
queryDuration.UpdateDuration(startTime)
@ -1073,7 +1058,7 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
start -= offset
end := start
start = end - window
if err := queryRangeHandler(startTime, at, w, childQuery, start, end, step, r, ct); err != nil {
if err := queryRangeHandler(startTime, at, w, childQuery, start, end, step, r, ct, etf); err != nil {
return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", childQuery, start, end, step, err)
}
queryDuration.UpdateDuration(startTime)
@ -1091,13 +1076,14 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
queryOffset = 0
}
ec := promql.EvalConfig{
AuthToken: at,
Start: start,
End: start,
Step: step,
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
LookbackDelta: lookbackDelta,
AuthToken: at,
Start: start,
End: start,
Step: step,
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
LookbackDelta: lookbackDelta,
EnforcedTagFilters: etf,
DenyPartialResponse: searchutils.GetDenyPartialResponse(r),
}
@ -1162,14 +1148,18 @@ func QueryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
if err != nil {
return err
}
if err := queryRangeHandler(startTime, at, w, query, start, end, step, r, ct); err != nil {
etf, err := getEnforcedTagFiltersFromRequest(r)
if err != nil {
return err
}
if err := queryRangeHandler(startTime, at, w, query, start, end, step, r, ct, etf); err != nil {
return fmt.Errorf("error when executing query=%q on the time range (start=%d, end=%d, step=%d): %w", query, start, end, step, err)
}
queryRangeDuration.UpdateDuration(startTime)
return nil
}
func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64) error {
func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, query string, start, end, step int64, r *http.Request, ct int64, etf []storage.TagFilter) error {
deadline := searchutils.GetDeadlineForQuery(r, startTime)
mayCache := !searchutils.GetBool(r, "nocache")
lookbackDelta, err := getMaxLookback(r)
@ -1192,14 +1182,15 @@ func queryRangeHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
}
ec := promql.EvalConfig{
AuthToken: at,
Start: start,
End: end,
Step: step,
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
LookbackDelta: lookbackDelta,
AuthToken: at,
Start: start,
End: end,
Step: step,
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
LookbackDelta: lookbackDelta,
EnforcedTagFilters: etf,
DenyPartialResponse: searchutils.GetDenyPartialResponse(r),
}
@ -1309,6 +1300,38 @@ func getMaxLookback(r *http.Request) (int64, error) {
return searchutils.GetDuration(r, "max_lookback", d)
}
func getEnforcedTagFiltersFromRequest(r *http.Request) ([]storage.TagFilter, error) {
// fast path.
extraLabels := r.Form["extra_label"]
if len(extraLabels) == 0 {
return nil, nil
}
tagFilters := make([]storage.TagFilter, 0, len(extraLabels))
for _, match := range extraLabels {
tmp := strings.SplitN(match, "=", 2)
if len(tmp) != 2 {
return nil, fmt.Errorf("`extra_label` query arg must have the format `name=value`; got %q", match)
}
tagFilters = append(tagFilters, storage.TagFilter{
Key: []byte(tmp[0]),
Value: []byte(tmp[1]),
})
}
return tagFilters, nil
}
func addEnforcedFiltersToTagFilterss(dstTfss [][]storage.TagFilter, enforcedFilters []storage.TagFilter) [][]storage.TagFilter {
if len(dstTfss) == 0 {
return [][]storage.TagFilter{
enforcedFilters,
}
}
for i := range dstTfss {
dstTfss[i] = append(dstTfss[i], enforcedFilters...)
}
return dstTfss
}
func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error) {
tagFilterss := make([][]storage.TagFilter, 0, len(matches))
for _, match := range matches {
@ -1321,6 +1344,35 @@ func getTagFilterssFromMatches(matches []string) ([][]storage.TagFilter, error)
return tagFilterss, nil
}
func getTagFilterssFromRequest(r *http.Request) ([][]storage.TagFilter, error) {
matches, err := getMatchesFromRequest(r)
if err != nil {
return nil, err
}
tagFilterss, err := getTagFilterssFromMatches(matches)
if err != nil {
return nil, err
}
etf, err := getEnforcedTagFiltersFromRequest(r)
if err != nil {
return nil, err
}
tagFilterss = addEnforcedFiltersToTagFilterss(tagFilterss, etf)
return tagFilterss, nil
}
func getMatchesFromRequest(r *http.Request) ([]string, error) {
matches := r.Form["match[]"]
if len(matches) > 0 {
return matches, nil
}
match := r.Form.Get("match")
if len(match) == 0 {
return nil, fmt.Errorf("missing `match[]` query arg")
}
return []string{match}, nil
}
func getLatencyOffsetMilliseconds() int64 {
d := latencyOffset.Milliseconds()
if d <= 1000 {

View file

@ -2,10 +2,12 @@ package prometheus
import (
"math"
"net/http"
"reflect"
"testing"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
func TestRemoveEmptyValuesAndTimeseries(t *testing.T) {
@ -195,3 +197,75 @@ func TestAdjustLastPoints(t *testing.T) {
},
})
}
// helper for tests
func tfFromKV(k, v string) storage.TagFilter {
return storage.TagFilter{
Key: []byte(k),
Value: []byte(v),
}
}
func Test_addEnforcedFiltersToTagFilterss(t *testing.T) {
f := func(t *testing.T, dstTfss [][]storage.TagFilter, enforcedFilters []storage.TagFilter, want [][]storage.TagFilter) {
t.Helper()
got := addEnforcedFiltersToTagFilterss(dstTfss, enforcedFilters)
if !reflect.DeepEqual(got, want) {
t.Fatalf("unxpected result for addEnforcedFiltersToTagFilterss, \ngot: %v,\n want: %v", want, got)
}
}
f(t, [][]storage.TagFilter{{tfFromKV("label", "value")}},
nil,
[][]storage.TagFilter{{tfFromKV("label", "value")}})
f(t, nil,
[]storage.TagFilter{tfFromKV("ext-label", "ext-value")},
[][]storage.TagFilter{{tfFromKV("ext-label", "ext-value")}})
f(t, [][]storage.TagFilter{
{tfFromKV("l1", "v1")},
{tfFromKV("l2", "v2")},
},
[]storage.TagFilter{tfFromKV("ext-l1", "v2")},
[][]storage.TagFilter{
{tfFromKV("l1", "v1"), tfFromKV("ext-l1", "v2")},
{tfFromKV("l2", "v2"), tfFromKV("ext-l1", "v2")},
})
}
func Test_getEnforcedTagFiltersFromRequest(t *testing.T) {
httpReqWithForm := func(tfs []string) *http.Request {
return &http.Request{
Form: map[string][]string{
"extra_label": tfs,
},
}
}
f := func(t *testing.T, r *http.Request, want []storage.TagFilter, wantErr bool) {
t.Helper()
got, err := getEnforcedTagFiltersFromRequest(r)
if (err != nil) != wantErr {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("unxpected result for getEnforcedTagFiltersFromRequest, \ngot: %v,\n want: %v", want, got)
}
}
f(t, httpReqWithForm([]string{"label=value"}),
[]storage.TagFilter{
tfFromKV("label", "value"),
},
false)
f(t, httpReqWithForm([]string{"job=vmagent", "dc=gce"}),
[]storage.TagFilter{tfFromKV("job", "vmagent"), tfFromKV("dc", "gce")},
false,
)
f(t, httpReqWithForm([]string{"bad_filter"}),
nil,
true,
)
f(t, &http.Request{},
nil, false)
}

View file

@ -100,6 +100,9 @@ type EvalConfig struct {
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
// EnforcedTagFilters used for apply additional label filters to query.
EnforcedTagFilters []storage.TagFilter
DenyPartialResponse bool
// IsPartialResponse is set during query execution and can be used by Exec caller after query execution.
@ -119,6 +122,7 @@ func newEvalConfig(src *EvalConfig) *EvalConfig {
ec.Deadline = src.Deadline
ec.MayCache = src.MayCache
ec.LookbackDelta = src.LookbackDelta
ec.EnforcedTagFilters = src.EnforcedTagFilters
ec.DenyPartialResponse = src.DenyPartialResponse
ec.IsPartialResponse = src.IsPartialResponse
@ -665,6 +669,8 @@ func evalRollupFuncWithMetricExpr(ec *EvalConfig, name string, rf rollupFunc,
// Fetch the remaining part of the result.
tfs := toTagFilters(me.LabelFilters)
// append external filters.
tfs = append(tfs, ec.EnforcedTagFilters...)
minTimestamp := start - maxSilenceInterval
if window > ec.Step {
minTimestamp -= window