changes metricsFind api (#1137)

it should be able mitigate crash if label value contains *,[ or { symbols
This commit is contained in:
Nikolay 2021-03-18 17:12:02 +03:00 committed by GitHub
parent 726f6ad804
commit 529d7be26b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -76,7 +76,7 @@ func MetricsFindHandler(startTime time.Time, w http.ResponseWriter, r *http.Requ
MinTimestamp: from, MinTimestamp: from,
MaxTimestamp: until, MaxTimestamp: until,
} }
paths, err := metricsFind(tr, label, query, delimiter[0], false, deadline) paths, err := metricsFind(tr, label, "", query, delimiter[0], false, deadline)
if err != nil { if err != nil {
return err return err
} }
@ -155,7 +155,7 @@ func MetricsExpandHandler(startTime time.Time, w http.ResponseWriter, r *http.Re
} }
m := make(map[string][]string, len(queries)) m := make(map[string][]string, len(queries))
for _, query := range queries { for _, query := range queries {
paths, err := metricsFind(tr, label, query, delimiter[0], true, deadline) paths, err := metricsFind(tr, label, "", query, delimiter[0], true, deadline)
if err != nil { if err != nil {
return err return err
} }
@ -222,44 +222,50 @@ func MetricsIndexHandler(startTime time.Time, w http.ResponseWriter, r *http.Req
return nil return nil
} }
// metricsFind searches for label values that match the given query. // metricsFind searches for label values that match the given head and tail.
func metricsFind(tr storage.TimeRange, label, query string, delimiter byte, isExpand bool, deadline searchutils.Deadline) ([]string, error) { func metricsFind(tr storage.TimeRange, label, head, tail string, delimiter byte, isExpand bool, deadline searchutils.Deadline) ([]string, error) {
n := strings.IndexAny(query, "*{[") n := strings.IndexAny(tail, "*{[")
if n < 0 || n == len(query)-1 && strings.HasSuffix(query, "*") { // fast path.
expandTail := n >= 0 if n < 0 {
if expandTail { res, err := netstorage.GetTagValueSuffixes(tr, label, head+tail, delimiter, deadline)
query = query[:len(query)-1] if err != nil {
return nil, err
} }
suffixes, err := netstorage.GetTagValueSuffixes(tr, label, query, delimiter, deadline) if len(res) == 0 {
return nil, nil
}
return []string{head + tail}, nil
}
if strings.HasSuffix(head, "*") {
head = head[:len(head)-1]
suffixes, err := netstorage.GetTagValueSuffixes(tr, label, head, delimiter, deadline)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(suffixes) == 0 { if len(suffixes) == 0 {
return nil, nil return nil, nil
} }
if !expandTail && len(query) > 0 && query[len(query)-1] == delimiter {
return []string{query}, nil
}
results := make([]string, 0, len(suffixes)) results := make([]string, 0, len(suffixes))
for _, suffix := range suffixes { for _, suffix := range suffixes {
if expandTail || len(suffix) == 0 || len(suffix) == 1 && suffix[0] == delimiter { results = append(results, head+suffix)
results = append(results, query+suffix)
}
} }
return results, nil return results, nil
} }
subquery := query[:n] + "*"
paths, err := metricsFind(tr, label, subquery, delimiter, isExpand, deadline) head += tail[:n]
subquery := head + "*"
// execute subquery with the given head.
paths, err := metricsFind(tr, label, subquery, "*", delimiter, isExpand, deadline)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tail := "" tailNew := ""
suffix := query[n:] suffix := tail[n:]
if m := strings.IndexByte(suffix, delimiter); m >= 0 { if m := strings.IndexByte(suffix, delimiter); m >= 0 {
tail = suffix[m+1:] tailNew = suffix[m+1:]
suffix = suffix[:m+1] suffix = suffix[:m+1]
} }
qPrefix := query[:n] + suffix qPrefix := head + suffix
rePrefix, err := getRegexpForQuery(qPrefix, delimiter) rePrefix, err := getRegexpForQuery(qPrefix, delimiter)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot convert query %q to regexp: %w", qPrefix, err) return nil, fmt.Errorf("cannot convert query %q to regexp: %w", qPrefix, err)
@ -269,12 +275,11 @@ func metricsFind(tr storage.TimeRange, label, query string, delimiter byte, isEx
if !rePrefix.MatchString(path) { if !rePrefix.MatchString(path) {
continue continue
} }
if tail == "" { if tailNew == "" {
results = append(results, path) results = append(results, path)
continue continue
} }
subquery := path + tail fullPaths, err := metricsFind(tr, label, path, tailNew, delimiter, isExpand, deadline)
fullPaths, err := metricsFind(tr, label, subquery, delimiter, isExpand, deadline)
if err != nil { if err != nil {
return nil, err return nil, err
} }