mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vmselect: add ability to set Graphite-compatible filter via {__graphite__="foo.*.bar"}
syntax
This commit is contained in:
parent
755b0998ce
commit
4b930b9ffe
10 changed files with 301 additions and 50 deletions
|
@ -31,7 +31,7 @@ func TagsDelSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseWr
|
|||
totalDeleted := 0
|
||||
var row graphiteparser.Row
|
||||
var tagsPool []graphiteparser.Tag
|
||||
ct := time.Now().UnixNano() / 1e6
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
for _, path := range paths {
|
||||
var err error
|
||||
tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0])
|
||||
|
@ -91,7 +91,7 @@ func registerMetrics(startTime time.Time, at *auth.Token, w http.ResponseWriter,
|
|||
var b []byte
|
||||
var tagsPool []graphiteparser.Tag
|
||||
mrs := make([]storage.MetricRow, len(paths))
|
||||
ct := time.Now().UnixNano() / 1e6
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
canonicalPaths := make([]string, len(paths))
|
||||
for i, path := range paths {
|
||||
var err error
|
||||
|
@ -190,7 +190,7 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, at *auth.Token, w http.R
|
|||
}
|
||||
} else {
|
||||
// Slow path: use netstorage.SearchMetricNames for applying `expr` filters.
|
||||
sq, err := getSearchQueryForExprs(at, exprs)
|
||||
sq, err := getSearchQueryForExprs(startTime, at, exprs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -275,7 +275,7 @@ func TagsAutoCompleteTagsHandler(startTime time.Time, at *auth.Token, w http.Res
|
|||
}
|
||||
} else {
|
||||
// Slow path: use netstorage.SearchMetricNames for applying `expr` filters.
|
||||
sq, err := getSearchQueryForExprs(at, exprs)
|
||||
sq, err := getSearchQueryForExprs(startTime, at, exprs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -339,7 +339,7 @@ func TagsFindSeriesHandler(startTime time.Time, at *auth.Token, w http.ResponseW
|
|||
if len(exprs) == 0 {
|
||||
return fmt.Errorf("expecting at least one `expr` query arg")
|
||||
}
|
||||
sq, err := getSearchQueryForExprs(at, exprs)
|
||||
sq, err := getSearchQueryForExprs(startTime, at, exprs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -467,12 +467,12 @@ func getInt(r *http.Request, argName string) (int, error) {
|
|||
return n, nil
|
||||
}
|
||||
|
||||
func getSearchQueryForExprs(at *auth.Token, exprs []string) (*storage.SearchQuery, error) {
|
||||
func getSearchQueryForExprs(startTime time.Time, at *auth.Token, exprs []string) (*storage.SearchQuery, error) {
|
||||
tfs, err := exprsToTagFilters(exprs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ct := time.Now().UnixNano() / 1e6
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, [][]storage.TagFilter{tfs})
|
||||
return sq, nil
|
||||
}
|
||||
|
|
|
@ -445,18 +445,19 @@ var exportBlockPool = &sync.Pool{
|
|||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
|
||||
func DeleteHandler(startTime time.Time, at *auth.Token, r *http.Request) error {
|
||||
deadline := searchutils.GetDeadlineForQuery(r, startTime)
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return fmt.Errorf("cannot parse request form values: %w", err)
|
||||
}
|
||||
if r.FormValue("start") != "" || r.FormValue("end") != "" {
|
||||
return fmt.Errorf("start and end aren't supported. Remove these args from the query in order to delete all the matching metrics")
|
||||
}
|
||||
deadline := searchutils.GetDeadlineForQuery(r, startTime)
|
||||
tagFilterss, err := getTagFilterssFromRequest(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, 0, tagFilterss)
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, 0, ct, tagFilterss)
|
||||
deletedCount, err := netstorage.DeleteSeries(at, sq, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete time series: %w", err)
|
||||
|
|
|
@ -763,7 +763,11 @@ func (s *Server) processVMSelectDeleteMetrics(ctx *vmselectRequestCtx) error {
|
|||
}
|
||||
|
||||
// Setup ctx.tfss
|
||||
if err := ctx.setupTfss(); err != nil {
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: 0,
|
||||
MaxTimestamp: time.Now().UnixNano() / 1e6,
|
||||
}
|
||||
if err := ctx.setupTfss(s.storage, tr); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
|
||||
|
@ -954,6 +958,13 @@ func (s *Server) processVMSelectTagValueSuffixes(ctx *vmselectRequestCtx) error
|
|||
if err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
if len(suffixes) >= *maxTagValueSuffixesPerSearch {
|
||||
err := fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d tag value suffixes found "+
|
||||
"for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+
|
||||
"either narrow down the query or increase -search.maxTagValueSuffixesPerSearch command-line flag value",
|
||||
*maxTagValueSuffixesPerSearch, tagKey, tagValuePrefix, delimiter, tr.String())
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
|
||||
// Send an empty error message to vmselect.
|
||||
if err := ctx.writeString(""); err != nil {
|
||||
|
@ -1128,13 +1139,13 @@ func (s *Server) processVMSelectSearchMetricNames(ctx *vmselectRequestCtx) error
|
|||
}
|
||||
|
||||
// Search metric names.
|
||||
if err := ctx.setupTfss(); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: ctx.sq.MinTimestamp,
|
||||
MaxTimestamp: ctx.sq.MaxTimestamp,
|
||||
}
|
||||
if err := ctx.setupTfss(s.storage, tr); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
mns, err := s.storage.SearchMetricNames(ctx.tfss, tr, *maxMetricsPerSearch, ctx.deadline)
|
||||
if err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
|
@ -1172,13 +1183,13 @@ func (s *Server) processVMSelectSearch(ctx *vmselectRequestCtx) error {
|
|||
}
|
||||
|
||||
// Setup search.
|
||||
if err := ctx.setupTfss(); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: ctx.sq.MinTimestamp,
|
||||
MaxTimestamp: ctx.sq.MaxTimestamp,
|
||||
}
|
||||
if err := ctx.setupTfss(s.storage, tr); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
if err := checkTimeRange(s.storage, tr); err != nil {
|
||||
return ctx.writeErrorMessage(err)
|
||||
}
|
||||
|
@ -1252,12 +1263,27 @@ var (
|
|||
vmselectMetricRowsRead = metrics.NewCounter(`vm_vmselect_metric_rows_read_total`)
|
||||
)
|
||||
|
||||
func (ctx *vmselectRequestCtx) setupTfss() error {
|
||||
func (ctx *vmselectRequestCtx) setupTfss(s *storage.Storage, tr storage.TimeRange) error {
|
||||
tfss := ctx.tfss[:0]
|
||||
accountID := ctx.sq.AccountID
|
||||
projectID := ctx.sq.ProjectID
|
||||
for _, tagFilters := range ctx.sq.TagFilterss {
|
||||
tfs := storage.NewTagFilters(ctx.sq.AccountID, ctx.sq.ProjectID)
|
||||
tfs := storage.NewTagFilters(accountID, projectID)
|
||||
for i := range tagFilters {
|
||||
tf := &tagFilters[i]
|
||||
if string(tf.Key) == "__graphite__" {
|
||||
query := tf.Value
|
||||
paths, err := s.SearchGraphitePaths(accountID, projectID, tr, query, *maxMetricsPerSearch, ctx.deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
|
||||
}
|
||||
if len(paths) >= *maxMetricsPerSearch {
|
||||
return fmt.Errorf("more than -search.maxUniqueTimeseries=%d time series match Graphite query %q; "+
|
||||
"either narrow down the query or increase -search.maxUniqueTimeseries command-line flag value", *maxMetricsPerSearch, query)
|
||||
}
|
||||
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
|
||||
continue
|
||||
}
|
||||
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
|
||||
return fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
* FEATURE: added [vmctl tool](https://victoriametrics.github.io/vmctl.html) to VictoriaMetrics release process. Now it is packaged in `vmutils-*.tar.gz` archive on [the releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). Source code for `vmctl` tool has been moved from [github.com/VictoriaMetrics/vmctl](https://github.com/VictoriaMetrics/vmctl) to [github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmctl).
|
||||
* FEATURE: added `-loggerTimezone` command-line flag for adjusting time zone for timestamps in log messages. By default UTC is used.
|
||||
* FEATURE: added `-search.maxStepForPointsAdjustment` command-line flag, which can be used for disabling adjustment for points returned by `/api/v1/query_range` handler if such points have timestamps closer than `-search.latencyOffset` to the current time. Such points may contain incomplete data, so they are substituted by the previous values for `step` query args smaller than one minute by default.
|
||||
* FEATURE: vmselect: added ability to use Graphite-compatible filters in MetricsQL via `{__graphite__="foo.*.bar"}` syntax. This expression is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster and it is easier to use when migrating from Graphite to VictoriaMetrics.
|
||||
* FEATURE: vmselect: added ability to set additional label filters, which must be applied during queries. Such label filters can be set via optional `extra_label` query arg, which is accepted by [querying API](https://victoriametrics.github.io/#prometheus-querying-api-usage) handlers. For example, the request to `/api/v1/query_range?extra_label=tenant_id=123&query=<query>` adds `{tenant_id="123"}` label filter to the given `<query>`. It is expected that the `extra_label` query arg is automatically set by auth proxy sitting
|
||||
in front of VictoriaMetrics. [Contact us](mailto:sales@victoriametrics.com) if you need assistance with such a proxy. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1021 .
|
||||
* FEATURE: vmalert: added `-datasource.queryStep` command-line flag for passing optional `step` query arg to `/api/v1/query` endpoint. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1025
|
||||
|
@ -18,6 +19,7 @@ in front of VictoriaMetrics. [Contact us](mailto:sales@victoriametrics.com) if y
|
|||
- `vm_promscrape_discovery_retries_total`
|
||||
- `vm_promscrape_scrape_retries_total`
|
||||
- `vm_promscrape_service_discovery_duration_seconds`
|
||||
|
||||
* BUGFIX: vmagent: reduce HTTP reconnection rate for scrape targets. Previously vmagent could errorneusly close HTTP keep-alive connections more frequently than needed.
|
||||
* BUGFIX: vmagent: retry scrape and service discovery requests when the remote server closes HTTP keep-alive connection. Previously `disable_keepalive: true` option could be used under `scrape_configs` section when working with such servers.
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ Feel free [filing a feature request](https://github.com/VictoriaMetrics/Victoria
|
|||
This functionality can be tried at [an editable Grafana dashboard](http://play-grafana.victoriametrics.com:3000/d/4ome8yJmz/node-exporter-on-victoriametrics-demo).
|
||||
|
||||
- [`WITH` templates](https://play.victoriametrics.com/promql/expand-with-exprs). This feature simplifies writing and managing complex queries. Go to [`WITH` templates playground](https://play.victoriametrics.com/promql/expand-with-exprs) and try it.
|
||||
- Graphite-compatible filters can be passed via `{__graphite__="foo.*.bar"}` syntax. This is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but usually works faster and is easier to use when migrating from Graphite to VictoriaMetrics.
|
||||
- Range duration in functions such as [rate](https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()) may be omitted. VictoriaMetrics automatically selects range duration depending on the current step used for building the graph. For instance, the following query is valid in VictoriaMetrics: `rate(node_network_receive_bytes_total)`.
|
||||
- All the aggregate functions support optional `limit N` suffix in order to limit the number of output series. For example, `sum(x) by (y) limit 10` limits
|
||||
the number of output time series after the aggregation to 10. All the other time series are dropped.
|
||||
|
|
|
@ -1123,6 +1123,8 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
|
|||
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
|
||||
//
|
||||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||
//
|
||||
// If it returns maxTagValueSuffixes suffixes, then it is likely more than maxTagValueSuffixes suffixes is found.
|
||||
func (db *indexDB) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRange, tagKey, tagValuePrefix []byte,
|
||||
delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
// TODO: cache results?
|
||||
|
@ -1134,13 +1136,15 @@ func (db *indexDB) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRa
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ok := db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
||||
err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
extDB.putIndexSearch(is)
|
||||
})
|
||||
if ok && err != nil {
|
||||
return nil, err
|
||||
if len(tvss) < maxTagValueSuffixes {
|
||||
ok := db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(accountID, projectID, deadline)
|
||||
err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
extDB.putIndexSearch(is)
|
||||
})
|
||||
if ok && err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
suffixes := make([]string, 0, len(tvss))
|
||||
|
@ -1148,6 +1152,9 @@ func (db *indexDB) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRa
|
|||
// Do not skip empty suffixes, since they may represent leaf tag values.
|
||||
suffixes = append(suffixes, suffix)
|
||||
}
|
||||
if len(suffixes) > maxTagValueSuffixes {
|
||||
suffixes = suffixes[:maxTagValueSuffixes]
|
||||
}
|
||||
// Do not sort suffixes, since they must be sorted by vmselect.
|
||||
return suffixes, nil
|
||||
}
|
||||
|
@ -1179,6 +1186,9 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
|
|||
errGlobal = err
|
||||
return
|
||||
}
|
||||
if len(tvss) > maxTagValueSuffixes {
|
||||
return
|
||||
}
|
||||
for k := range tvssLocal {
|
||||
tvss[k] = struct{}{}
|
||||
}
|
||||
|
@ -1197,7 +1207,7 @@ func (is *indexSearch) searchTagValueSuffixesAll(tvss map[string]struct{}, tagKe
|
|||
kb.B = marshalTagValue(kb.B, tagValuePrefix)
|
||||
kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar from the end of kb.B
|
||||
prefix := append([]byte(nil), kb.B...)
|
||||
return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, len(tagValuePrefix), delimiter, maxTagValueSuffixes)
|
||||
}
|
||||
|
||||
func (is *indexSearch) searchTagValueSuffixesForDate(tvss map[string]struct{}, date uint64, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
|
||||
|
@ -1209,10 +1219,10 @@ func (is *indexSearch) searchTagValueSuffixesForDate(tvss map[string]struct{}, d
|
|||
kb.B = marshalTagValue(kb.B, tagValuePrefix)
|
||||
kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar from the end of kb.B
|
||||
prefix := append([]byte(nil), kb.B...)
|
||||
return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, len(tagValuePrefix), delimiter, maxTagValueSuffixes)
|
||||
}
|
||||
|
||||
func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, nsPrefix byte, prefix, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
|
||||
func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, nsPrefix byte, prefix []byte, tagValuePrefixLen int, delimiter byte, maxTagValueSuffixes int) error {
|
||||
kb := &is.kb
|
||||
ts := &is.ts
|
||||
mp := &is.mp
|
||||
|
@ -1238,10 +1248,7 @@ func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{},
|
|||
continue
|
||||
}
|
||||
tagValue := mp.Tag.Value
|
||||
if !bytes.HasPrefix(tagValue, tagValuePrefix) {
|
||||
continue
|
||||
}
|
||||
suffix := tagValue[len(tagValuePrefix):]
|
||||
suffix := tagValue[tagValuePrefixLen:]
|
||||
n := bytes.IndexByte(suffix, delimiter)
|
||||
if n < 0 {
|
||||
// Found leaf tag value that doesn't have delimiters after the given tagValuePrefix.
|
||||
|
@ -2143,7 +2150,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
|
|||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
|
||||
|
||||
for i, tf := range tfs {
|
||||
if len(tf.key) == 0 {
|
||||
if len(tf.key) == 0 || string(tf.key) == "__graphite__" {
|
||||
// Match against mn.MetricGroup.
|
||||
b := marshalTagValue(kb.B, nil)
|
||||
b = marshalTagValue(b, mn.MetricGroup)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"path/filepath"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -1054,11 +1055,122 @@ func (s *Storage) SearchTagValues(accountID, projectID uint32, tagKey []byte, ma
|
|||
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
|
||||
//
|
||||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||
//
|
||||
// If more than maxTagValueSuffixes suffixes is found, then only the first maxTagValueSuffixes suffixes is returned.
|
||||
func (s *Storage) SearchTagValueSuffixes(accountID, projectID uint32, tr TimeRange, tagKey, tagValuePrefix []byte,
|
||||
delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
return s.idb().SearchTagValueSuffixes(accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
|
||||
}
|
||||
|
||||
// SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr.
|
||||
//
|
||||
// If more than maxPaths paths is found, then only the first maxPaths paths is returned.
|
||||
func (s *Storage) SearchGraphitePaths(accountID, projectID uint32, tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
queryStr := string(query)
|
||||
n := strings.IndexAny(queryStr, "*[{")
|
||||
if n < 0 {
|
||||
// Verify that the query matches a metric name.
|
||||
suffixes, err := s.SearchTagValueSuffixes(accountID, projectID, tr, nil, query, '.', 1, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(suffixes) == 0 {
|
||||
// The query doesn't match anything.
|
||||
return nil, nil
|
||||
}
|
||||
if len(suffixes[0]) > 0 {
|
||||
// The query matches a metric name with additional suffix.
|
||||
return nil, nil
|
||||
}
|
||||
return []string{queryStr}, nil
|
||||
}
|
||||
suffixes, err := s.SearchTagValueSuffixes(accountID, projectID, tr, nil, query[:n], '.', maxPaths, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(suffixes) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if len(suffixes) >= maxPaths {
|
||||
return nil, fmt.Errorf("more than maxPaths=%d suffixes found", maxPaths)
|
||||
}
|
||||
qPrefixStr := queryStr[:n]
|
||||
qNode := queryStr[n:]
|
||||
qTail := ""
|
||||
mustMatchLeafs := true
|
||||
if m := strings.IndexByte(qNode, '.'); m >= 0 {
|
||||
qNode = qNode[:m+1]
|
||||
qTail = qNode[m+1:]
|
||||
mustMatchLeafs = false
|
||||
}
|
||||
re, err := getRegexpForGraphiteNodeQuery(qNode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var paths []string
|
||||
for _, suffix := range suffixes {
|
||||
if len(paths) > maxPaths {
|
||||
paths = paths[:maxPaths]
|
||||
break
|
||||
}
|
||||
if !re.MatchString(suffix) {
|
||||
continue
|
||||
}
|
||||
if mustMatchLeafs {
|
||||
paths = append(paths, qPrefixStr+suffix)
|
||||
continue
|
||||
}
|
||||
q := qPrefixStr + suffix + qTail
|
||||
ps, err := s.SearchGraphitePaths(accountID, projectID, tr, []byte(q), maxPaths, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
paths = append(paths, ps...)
|
||||
}
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
func getRegexpForGraphiteNodeQuery(q string) (*regexp.Regexp, error) {
|
||||
parts := getRegexpPartsForGraphiteNodeQuery(q)
|
||||
reStr := "^" + strings.Join(parts, "") + "$"
|
||||
return regexp.Compile(reStr)
|
||||
}
|
||||
|
||||
func getRegexpPartsForGraphiteNodeQuery(q string) []string {
|
||||
var parts []string
|
||||
for {
|
||||
n := strings.IndexAny(q, "*{[")
|
||||
if n < 0 {
|
||||
return append(parts, regexp.QuoteMeta(q))
|
||||
}
|
||||
parts = append(parts, regexp.QuoteMeta(q[:n]))
|
||||
q = q[n:]
|
||||
switch q[0] {
|
||||
case '*':
|
||||
parts = append(parts, "[^.]*")
|
||||
q = q[1:]
|
||||
case '{':
|
||||
n := strings.IndexByte(q, '}')
|
||||
if n < 0 {
|
||||
return append(parts, regexp.QuoteMeta(q))
|
||||
}
|
||||
var tmp []string
|
||||
for _, x := range strings.Split(q[1:n], ",") {
|
||||
tmp = append(tmp, strings.Join(getRegexpPartsForGraphiteNodeQuery(x), ""))
|
||||
}
|
||||
parts = append(parts, "(?:"+strings.Join(tmp, "|")+")")
|
||||
q = q[n+1:]
|
||||
case '[':
|
||||
n := strings.IndexByte(q, ']')
|
||||
if n < 0 {
|
||||
return append(parts, regexp.QuoteMeta(q))
|
||||
}
|
||||
parts = append(parts, q[:n+1])
|
||||
q = q[n+1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SearchTagEntries returns a list of (tagName -> tagValues) for (accountID, projectID).
|
||||
func (s *Storage) SearchTagEntries(accountID, projectID uint32, maxTagKeys, maxTagValues int, deadline uint64) ([]TagEntry, error) {
|
||||
idb := s.idb()
|
||||
|
|
|
@ -14,6 +14,28 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
||||
func TestGetRegexpForGraphiteNodeQuery(t *testing.T) {
|
||||
f := func(q, expectedRegexp string) {
|
||||
t.Helper()
|
||||
re, err := getRegexpForGraphiteNodeQuery(q)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error for query=%q: %s", q, err)
|
||||
}
|
||||
reStr := re.String()
|
||||
if reStr != expectedRegexp {
|
||||
t.Fatalf("unexpected regexp for query %q; got %q want %q", q, reStr, expectedRegexp)
|
||||
}
|
||||
}
|
||||
f(``, `^$`)
|
||||
f(`*`, `^[^.]*$`)
|
||||
f(`foo.`, `^foo\.$`)
|
||||
f(`foo.bar`, `^foo\.bar$`)
|
||||
f(`{foo,b*ar,b[a-z]}`, `^(?:foo|b[^.]*ar|b[a-z])$`)
|
||||
f(`[-a-zx.]`, `^[-a-zx.]$`)
|
||||
f(`**`, `^[^.]*[^.]*$`)
|
||||
f(`a*[de]{x,y}z`, `^a[^.]*[de](?:x|y)z$`)
|
||||
}
|
||||
|
||||
func TestDateMetricIDCacheSerial(t *testing.T) {
|
||||
c := newDateMetricIDCache()
|
||||
if err := testDateMetricIDCache(c, false); err != nil {
|
||||
|
|
|
@ -36,6 +36,12 @@ func NewTagFilters(accountID, projectID uint32) *TagFilters {
|
|||
}
|
||||
}
|
||||
|
||||
// AddGraphiteQuery adds the given Graphite query that matches the given paths to tfs.
|
||||
func (tfs *TagFilters) AddGraphiteQuery(query []byte, paths []string, isNegative bool) {
|
||||
tf := tfs.addTagFilter()
|
||||
tf.InitFromGraphiteQuery(tfs.commonPrefix, query, paths, isNegative)
|
||||
}
|
||||
|
||||
// Add adds the given tag filter to tfs.
|
||||
//
|
||||
// MetricGroup must be encoded with nil key.
|
||||
|
@ -58,7 +64,7 @@ func (tfs *TagFilters) Add(key, value []byte, isNegative, isRegexp bool) error {
|
|||
}
|
||||
|
||||
// Substitute negative tag filter matching anything with negative tag filter matching non-empty value
|
||||
// in order to out all the time series with the given key.
|
||||
// in order to filter out all the time series with the given key.
|
||||
value = []byte(".+")
|
||||
}
|
||||
|
||||
|
@ -174,6 +180,8 @@ type tagFilter struct {
|
|||
prefix []byte
|
||||
|
||||
// or values obtained from regexp suffix if it equals to "foo|bar|..."
|
||||
//
|
||||
// This array is also populated with matching Graphite metrics if key="__graphite__"
|
||||
orSuffixes []string
|
||||
|
||||
// Matches regexp suffix.
|
||||
|
@ -246,6 +254,49 @@ func (tf *tagFilter) MarshalNoAccountIDProjectID(dst []byte) []byte {
|
|||
return dst
|
||||
}
|
||||
|
||||
// InitFromGraphiteQuery initializes tf from the given graphite query expanded to the given paths.
|
||||
func (tf *tagFilter) InitFromGraphiteQuery(commonPrefix, query []byte, paths []string, isNegative bool) {
|
||||
if len(paths) == 0 {
|
||||
// explicitly add empty path in order match zero metric names.
|
||||
paths = []string{""}
|
||||
}
|
||||
prefix, orSuffixes := getCommonPrefix(paths)
|
||||
if len(orSuffixes) == 0 {
|
||||
orSuffixes = append(orSuffixes, "")
|
||||
}
|
||||
tf.key = append(tf.key[:0], "__graphite__"...)
|
||||
tf.value = append(tf.value[:0], query...)
|
||||
tf.isNegative = isNegative
|
||||
tf.isRegexp = true // this is needed for tagFilter.matchSuffix
|
||||
tf.prefix = append(tf.prefix[:0], commonPrefix...)
|
||||
tf.prefix = marshalTagValue(tf.prefix, nil)
|
||||
tf.prefix = marshalTagValueNoTrailingTagSeparator(tf.prefix, []byte(prefix))
|
||||
tf.orSuffixes = append(tf.orSuffixes[:0], orSuffixes...)
|
||||
tf.reSuffixMatch, tf.matchCost = newMatchFuncForOrSuffixes(orSuffixes)
|
||||
}
|
||||
|
||||
func getCommonPrefix(ss []string) (string, []string) {
|
||||
if len(ss) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
prefix := ss[0]
|
||||
for _, s := range ss[1:] {
|
||||
i := 0
|
||||
for i < len(s) && i < len(prefix) && s[i] == prefix[i] {
|
||||
i++
|
||||
}
|
||||
prefix = prefix[:i]
|
||||
if len(prefix) == 0 {
|
||||
return "", ss
|
||||
}
|
||||
}
|
||||
result := make([]string, len(ss))
|
||||
for i, s := range ss {
|
||||
result[i] = s[len(prefix):]
|
||||
}
|
||||
return prefix, result
|
||||
}
|
||||
|
||||
// Init initializes the tag filter for the given commonPrefix, key and value.
|
||||
//
|
||||
// If isNegaitve is true, then the tag filter matches all the values
|
||||
|
@ -260,6 +311,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
|
|||
tf.value = append(tf.value[:0], value...)
|
||||
tf.isNegative = isNegative
|
||||
tf.isRegexp = isRegexp
|
||||
tf.matchCost = 0
|
||||
|
||||
tf.prefix = tf.prefix[:0]
|
||||
|
||||
|
@ -363,22 +415,7 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
|
|||
var reCost uint64
|
||||
var literalSuffix string
|
||||
if len(orValues) > 0 {
|
||||
if len(orValues) == 1 {
|
||||
v := orValues[0]
|
||||
reMatch = func(b []byte) bool {
|
||||
return string(b) == v
|
||||
}
|
||||
} else {
|
||||
reMatch = func(b []byte) bool {
|
||||
for _, v := range orValues {
|
||||
if string(b) == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
reCost = uint64(len(orValues)) * literalMatchCost
|
||||
reMatch, reCost = newMatchFuncForOrSuffixes(orValues)
|
||||
} else {
|
||||
reMatch, literalSuffix, reCost = getOptimizedReMatchFunc(re.Match, sExpr)
|
||||
}
|
||||
|
@ -406,6 +443,26 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
|
|||
return rcv, nil
|
||||
}
|
||||
|
||||
func newMatchFuncForOrSuffixes(orValues []string) (reMatch func(b []byte) bool, reCost uint64) {
|
||||
if len(orValues) == 1 {
|
||||
v := orValues[0]
|
||||
reMatch = func(b []byte) bool {
|
||||
return string(b) == v
|
||||
}
|
||||
} else {
|
||||
reMatch = func(b []byte) bool {
|
||||
for _, v := range orValues {
|
||||
if string(b) == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
reCost = uint64(len(orValues)) * literalMatchCost
|
||||
return reMatch, reCost
|
||||
}
|
||||
|
||||
// getOptimizedReMatchFunc tries returning optimized function for matching the given expr.
|
||||
// '.*'
|
||||
// '.+'
|
||||
|
|
|
@ -2,9 +2,32 @@ package storage
|
|||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGetCommonPrefix(t *testing.T) {
|
||||
f := func(a []string, expectedPrefix string) {
|
||||
t.Helper()
|
||||
prefix, result := getCommonPrefix(a)
|
||||
if prefix != expectedPrefix {
|
||||
t.Fatalf("unexpected prefix; got %q; want %q", prefix, expectedPrefix)
|
||||
}
|
||||
for i, s := range a {
|
||||
if !strings.HasPrefix(s, prefix) {
|
||||
t.Fatalf("s=%q has no prefix %q", s, prefix)
|
||||
}
|
||||
if s[len(prefix):] != result[i] {
|
||||
t.Fatalf("unexpected result[%d]; got %q; want %q", i, s[len(prefix):], result[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
f(nil, "")
|
||||
f([]string{"foo"}, "foo")
|
||||
f([]string{"foo", "bar"}, "")
|
||||
f([]string{"foo1", "foo2", "foo34"}, "foo")
|
||||
}
|
||||
|
||||
func TestExtractRegexpPrefix(t *testing.T) {
|
||||
f := func(s string, expectedPrefix, expectedSuffix string) {
|
||||
t.Helper()
|
||||
|
|
Loading…
Reference in a new issue