lib/storage: prevent from infinite loop if {__graphite__="..."} filter matches a metric name with *, [ or { chars

The idea has been borrowed from https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1137
This commit is contained in:
Aliaksandr Valialkin 2021-03-18 14:52:49 +02:00
parent e061a4fa19
commit 6cee5338b2
2 changed files with 25 additions and 18 deletions

View file

@ -9,6 +9,8 @@
* `process_resident_memory_peak_bytes` - peak RSS usage for the process.
* `process_virtual_memory_peak_bytes` - peak virtual memory usage for the process.
* BUGFIX: prevent from infinite loop on `{__graphite__="..."}` filters when a metric name contains `*`, `{` or `[` chars.
# [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0)

View file

@ -1,6 +1,7 @@
package storage
import (
"bytes"
"fmt"
"io"
"io/ioutil"
@ -15,6 +16,7 @@ import (
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -1046,14 +1048,16 @@ func (s *Storage) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []
}
// SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr.
//
// If more than maxPaths paths is found, then only the first maxPaths paths is returned.
func (s *Storage) SearchGraphitePaths(tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
queryStr := string(query)
n := strings.IndexAny(queryStr, "*[{")
return s.searchGraphitePaths(tr, nil, query, maxPaths, deadline)
}
func (s *Storage) searchGraphitePaths(tr TimeRange, qHead, qTail []byte, maxPaths int, deadline uint64) ([]string, error) {
n := strings.IndexAny(bytesutil.ToUnsafeString(qTail), "*[{")
if n < 0 {
// Verify that the query matches a metric name.
suffixes, err := s.SearchTagValueSuffixes(tr, nil, query, '.', 1, deadline)
// Verify that qHead matches a metric name.
qHead = append(qHead, qTail...)
suffixes, err := s.SearchTagValueSuffixes(tr, nil, qHead, '.', 1, deadline)
if err != nil {
return nil, err
}
@ -1065,9 +1069,10 @@ func (s *Storage) SearchGraphitePaths(tr TimeRange, query []byte, maxPaths int,
// The query matches a metric name with additional suffix.
return nil, nil
}
return []string{queryStr}, nil
return []string{string(qHead)}, nil
}
suffixes, err := s.SearchTagValueSuffixes(tr, nil, query[:n], '.', maxPaths, deadline)
qHead = append(qHead, qTail[:n]...)
suffixes, err := s.SearchTagValueSuffixes(tr, nil, qHead, '.', maxPaths, deadline)
if err != nil {
return nil, err
}
@ -1077,34 +1082,34 @@ func (s *Storage) SearchGraphitePaths(tr TimeRange, query []byte, maxPaths int,
if len(suffixes) >= maxPaths {
return nil, fmt.Errorf("more than maxPaths=%d suffixes found", maxPaths)
}
qPrefixStr := queryStr[:n]
qTail := ""
qNode := queryStr[n:]
qNode := qTail[n:]
qTail = nil
mustMatchLeafs := true
if m := strings.IndexByte(qNode, '.'); m >= 0 {
if m := bytes.IndexByte(qNode, '.'); m >= 0 {
qTail = qNode[m+1:]
qNode = qNode[:m+1]
mustMatchLeafs = false
}
re, err := getRegexpForGraphiteQuery(qNode)
re, err := getRegexpForGraphiteQuery(string(qNode))
if err != nil {
return nil, err
}
qHeadLen := len(qHead)
var paths []string
for _, suffix := range suffixes {
if len(paths) > maxPaths {
paths = paths[:maxPaths]
break
return nil, fmt.Errorf("more than maxPath=%d paths found", maxPaths)
}
if !re.MatchString(suffix) {
continue
}
if mustMatchLeafs {
paths = append(paths, qPrefixStr+suffix)
qHead = append(qHead[:qHeadLen], suffix...)
paths = append(paths, string(qHead))
continue
}
q := qPrefixStr + suffix + qTail
ps, err := s.SearchGraphitePaths(tr, []byte(q), maxPaths, deadline)
qHead = append(qHead[:qHeadLen], suffix...)
ps, err := s.searchGraphitePaths(tr, qHead, qTail, maxPaths, deadline)
if err != nil {
return nil, err
}