mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmselect: add ability to set Graphite-compatible filter via {__graphite__="foo.*.bar"}
syntax
This commit is contained in:
parent
4068f8d590
commit
157c02622b
13 changed files with 322 additions and 57 deletions
|
@ -588,6 +588,10 @@ VictoriaMetrics supports the following Graphite APIs:
|
|||
* Metrics API - see [these docs](#graphite-metrics-api-usage).
|
||||
* Tags API - see [these docs](#graphite-tags-api-usage).
|
||||
|
||||
VictoriaMetrics supports `__graphite__` pseudo-label for filtering time series with Graphite-compatible filters in [MetricsQL](https://victoriametrics.github.io/MetricsQL.html).
|
||||
For example, `{__graphite__="foo.*.bar"}` is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster
|
||||
and it is easier to use when migrating from Graphite to VictoriaMetrics.
|
||||
|
||||
|
||||
### Graphite Metrics API usage
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
//
|
||||
// See https://graphite.readthedocs.io/en/stable/tags.html#removing-series-from-the-tagdb
|
||||
func TagsDelSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
|
||||
deadline := searchutils.GetDeadlineForQuery(r, startTime)
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return fmt.Errorf("cannot parse form values: %w", err)
|
||||
}
|
||||
|
@ -30,7 +31,7 @@ func TagsDelSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Re
|
|||
totalDeleted := 0
|
||||
var row graphiteparser.Row
|
||||
var tagsPool []graphiteparser.Tag
|
||||
ct := time.Now().UnixNano() / 1e6
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
for _, path := range paths {
|
||||
var err error
|
||||
tagsPool, err = row.UnmarshalMetricAndTags(path, tagsPool[:0])
|
||||
|
@ -49,7 +50,7 @@ func TagsDelSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.Re
|
|||
})
|
||||
}
|
||||
sq := storage.NewSearchQuery(0, ct, [][]storage.TagFilter{tfs})
|
||||
n, err := netstorage.DeleteSeries(sq)
|
||||
n, err := netstorage.DeleteSeries(sq, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete series for %q: %w", sq, err)
|
||||
}
|
||||
|
@ -89,7 +90,7 @@ func registerMetrics(startTime time.Time, w http.ResponseWriter, r *http.Request
|
|||
var b []byte
|
||||
var tagsPool []graphiteparser.Tag
|
||||
mrs := make([]storage.MetricRow, len(paths))
|
||||
ct := time.Now().UnixNano() / 1e6
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
canonicalPaths := make([]string, len(paths))
|
||||
for i, path := range paths {
|
||||
var err error
|
||||
|
@ -186,7 +187,7 @@ func TagsAutoCompleteValuesHandler(startTime time.Time, w http.ResponseWriter, r
|
|||
}
|
||||
} else {
|
||||
// Slow path: use netstorage.SearchMetricNames for applying `expr` filters.
|
||||
sq, err := getSearchQueryForExprs(exprs)
|
||||
sq, err := getSearchQueryForExprs(startTime, exprs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -268,7 +269,7 @@ func TagsAutoCompleteTagsHandler(startTime time.Time, w http.ResponseWriter, r *
|
|||
}
|
||||
} else {
|
||||
// Slow path: use netstorage.SearchMetricNames for applying `expr` filters.
|
||||
sq, err := getSearchQueryForExprs(exprs)
|
||||
sq, err := getSearchQueryForExprs(startTime, exprs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -331,7 +332,7 @@ func TagsFindSeriesHandler(startTime time.Time, w http.ResponseWriter, r *http.R
|
|||
if len(exprs) == 0 {
|
||||
return fmt.Errorf("expecting at least one `expr` query arg")
|
||||
}
|
||||
sq, err := getSearchQueryForExprs(exprs)
|
||||
sq, err := getSearchQueryForExprs(startTime, exprs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -456,12 +457,12 @@ func getInt(r *http.Request, argName string) (int, error) {
|
|||
return n, nil
|
||||
}
|
||||
|
||||
func getSearchQueryForExprs(exprs []string) (*storage.SearchQuery, error) {
|
||||
func getSearchQueryForExprs(startTime time.Time, exprs []string) (*storage.SearchQuery, error) {
|
||||
tfs, err := exprsToTagFilters(exprs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ct := time.Now().UnixNano() / 1e6
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
sq := storage.NewSearchQuery(0, ct, [][]storage.TagFilter{tfs})
|
||||
return sq, nil
|
||||
}
|
||||
|
|
|
@ -446,8 +446,12 @@ func (sbh *sortBlocksHeap) Pop() interface{} {
|
|||
}
|
||||
|
||||
// DeleteSeries deletes time series matching the given tagFilterss.
|
||||
func DeleteSeries(sq *storage.SearchQuery) (int, error) {
|
||||
tfss, err := setupTfss(sq.TagFilterss)
|
||||
func DeleteSeries(sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, deadline)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -613,6 +617,11 @@ func GetTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix string, de
|
|||
return nil, fmt.Errorf("error during search for suffixes for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s: %w",
|
||||
tagKey, tagValuePrefix, delimiter, tr.String(), err)
|
||||
}
|
||||
if len(suffixes) >= *maxTagValueSuffixesPerSearch {
|
||||
return nil, fmt.Errorf("more than -search.maxTagValueSuffixesPerSearch=%d tag value suffixes found for tagKey=%q, tagValuePrefix=%q, delimiter=%c on time range %s; "+
|
||||
"either narrow down the query or increase -search.maxTagValueSuffixesPerSearch command-line flag value",
|
||||
*maxTagValueSuffixesPerSearch, tagKey, tagValuePrefix, delimiter, tr.String())
|
||||
}
|
||||
return suffixes, nil
|
||||
}
|
||||
|
||||
|
@ -695,10 +704,6 @@ func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func
|
|||
if deadline.Exceeded() {
|
||||
return fmt.Errorf("timeout exceeded before starting data export: %s", deadline.String())
|
||||
}
|
||||
tfss, err := setupTfss(sq.TagFilterss)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
|
@ -706,6 +711,10 @@ func ExportBlocks(sq *storage.SearchQuery, deadline searchutils.Deadline, f func
|
|||
if err := vmstorage.CheckTimeRange(tr); err != nil {
|
||||
return err
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, deadline)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
vmstorage.WG.Add(1)
|
||||
defer vmstorage.WG.Done()
|
||||
|
@ -801,10 +810,6 @@ func SearchMetricNames(sq *storage.SearchQuery, deadline searchutils.Deadline) (
|
|||
}
|
||||
|
||||
// Setup search.
|
||||
tfss, err := setupTfss(sq.TagFilterss)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
|
@ -812,6 +817,10 @@ func SearchMetricNames(sq *storage.SearchQuery, deadline searchutils.Deadline) (
|
|||
if err := vmstorage.CheckTimeRange(tr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mns, err := vmstorage.SearchMetricNames(tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
|
||||
if err != nil {
|
||||
|
@ -829,10 +838,6 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
|
|||
}
|
||||
|
||||
// Setup search.
|
||||
tfss, err := setupTfss(sq.TagFilterss)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
|
@ -840,6 +845,10 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
|
|||
if err := vmstorage.CheckTimeRange(tr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tfss, err := setupTfss(tr, sq.TagFilterss, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vmstorage.WG.Add(1)
|
||||
defer vmstorage.WG.Done()
|
||||
|
@ -917,12 +926,25 @@ type blockRef struct {
|
|||
addr tmpBlockAddr
|
||||
}
|
||||
|
||||
func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) {
|
||||
func setupTfss(tr storage.TimeRange, tagFilterss [][]storage.TagFilter, deadline searchutils.Deadline) ([]*storage.TagFilters, error) {
|
||||
tfss := make([]*storage.TagFilters, 0, len(tagFilterss))
|
||||
for _, tagFilters := range tagFilterss {
|
||||
tfs := storage.NewTagFilters()
|
||||
for i := range tagFilters {
|
||||
tf := &tagFilters[i]
|
||||
if string(tf.Key) == "__graphite__" {
|
||||
query := tf.Value
|
||||
paths, err := vmstorage.SearchGraphitePaths(tr, query, *maxMetricsPerSearch, deadline.Deadline())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when searching for Graphite paths for query %q: %w", query, err)
|
||||
}
|
||||
if len(paths) >= *maxMetricsPerSearch {
|
||||
return nil, fmt.Errorf("more than -search.maxUniqueTimeseries=%d time series match Graphite query %q; "+
|
||||
"either narrow down the query or increase -search.maxUniqueTimeseries command-line flag value", *maxMetricsPerSearch, query)
|
||||
}
|
||||
tfs.AddGraphiteQuery(query, paths, tf.IsNegative)
|
||||
continue
|
||||
}
|
||||
if err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse tag filter %s: %w", tf, err)
|
||||
}
|
||||
|
|
|
@ -438,6 +438,7 @@ var exportBlockPool = &sync.Pool{
|
|||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/querying/api/#delete-series
|
||||
func DeleteHandler(startTime time.Time, r *http.Request) error {
|
||||
deadline := searchutils.GetDeadlineForQuery(r, startTime)
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return fmt.Errorf("cannot parse request form values: %w", err)
|
||||
}
|
||||
|
@ -448,8 +449,9 @@ func DeleteHandler(startTime time.Time, r *http.Request) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sq := storage.NewSearchQuery(0, 0, tagFilterss)
|
||||
deletedCount, err := netstorage.DeleteSeries(sq)
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
sq := storage.NewSearchQuery(0, ct, tagFilterss)
|
||||
deletedCount, err := netstorage.DeleteSeries(sq, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete time series: %w", err)
|
||||
}
|
||||
|
|
|
@ -186,6 +186,14 @@ func SearchTagValueSuffixes(tr storage.TimeRange, tagKey, tagValuePrefix []byte,
|
|||
return suffixes, err
|
||||
}
|
||||
|
||||
// SearchGraphitePaths returns all the metric names matching the given Graphite query.
|
||||
func SearchGraphitePaths(tr storage.TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
WG.Add(1)
|
||||
paths, err := Storage.SearchGraphitePaths(tr, query, maxPaths, deadline)
|
||||
WG.Done()
|
||||
return paths, err
|
||||
}
|
||||
|
||||
// SearchTagEntries searches for tag entries.
|
||||
func SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]storage.TagEntry, error) {
|
||||
WG.Add(1)
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
* FEATURE: added [vmctl tool](https://victoriametrics.github.io/vmctl.html) to VictoriaMetrics release process. Now it is packaged in `vmutils-*.tar.gz` archive on [the releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases). Source code for `vmctl` tool has been moved from [github.com/VictoriaMetrics/vmctl](https://github.com/VictoriaMetrics/vmctl) to [github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/vmctl).
|
||||
* FEATURE: added `-loggerTimezone` command-line flag for adjusting time zone for timestamps in log messages. By default UTC is used.
|
||||
* FEATURE: added `-search.maxStepForPointsAdjustment` command-line flag, which can be used for disabling adjustment for points returned by `/api/v1/query_range` handler if such points have timestamps closer than `-search.latencyOffset` to the current time. Such points may contain incomplete data, so they are substituted by the previous values for `step` query args smaller than one minute by default.
|
||||
* FEATURE: vmselect: added ability to use Graphite-compatible filters in MetricsQL via `{__graphite__="foo.*.bar"}` syntax. This expression is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster and it is easier to use when migrating from Graphite to VictoriaMetrics.
|
||||
* FEATURE: vmselect: added ability to set additional label filters, which must be applied during queries. Such label filters can be set via optional `extra_label` query arg, which is accepted by [querying API](https://victoriametrics.github.io/#prometheus-querying-api-usage) handlers. For example, the request to `/api/v1/query_range?extra_label=tenant_id=123&query=<query>` adds `{tenant_id="123"}` label filter to the given `<query>`. It is expected that the `extra_label` query arg is automatically set by auth proxy sitting
|
||||
in front of VictoriaMetrics. [Contact us](mailto:sales@victoriametrics.com) if you need assistance with such a proxy. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1021 .
|
||||
* FEATURE: vmalert: added `-datasource.queryStep` command-line flag for passing optional `step` query arg to `/api/v1/query` endpoint. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1025
|
||||
|
@ -18,6 +19,7 @@ in front of VictoriaMetrics. [Contact us](mailto:sales@victoriametrics.com) if y
|
|||
- `vm_promscrape_discovery_retries_total`
|
||||
- `vm_promscrape_scrape_retries_total`
|
||||
- `vm_promscrape_service_discovery_duration_seconds`
|
||||
|
||||
* BUGFIX: vmagent: reduce HTTP reconnection rate for scrape targets. Previously vmagent could errorneusly close HTTP keep-alive connections more frequently than needed.
|
||||
* BUGFIX: vmagent: retry scrape and service discovery requests when the remote server closes HTTP keep-alive connection. Previously `disable_keepalive: true` option could be used under `scrape_configs` section when working with such servers.
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ Feel free [filing a feature request](https://github.com/VictoriaMetrics/Victoria
|
|||
This functionality can be tried at [an editable Grafana dashboard](http://play-grafana.victoriametrics.com:3000/d/4ome8yJmz/node-exporter-on-victoriametrics-demo).
|
||||
|
||||
- [`WITH` templates](https://play.victoriametrics.com/promql/expand-with-exprs). This feature simplifies writing and managing complex queries. Go to [`WITH` templates playground](https://play.victoriametrics.com/promql/expand-with-exprs) and try it.
|
||||
- Graphite-compatible filters can be passed via `{__graphite__="foo.*.bar"}` syntax. This is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but usually works faster and is easier to use when migrating from Graphite to VictoriaMetrics.
|
||||
- Range duration in functions such as [rate](https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()) may be omitted. VictoriaMetrics automatically selects range duration depending on the current step used for building the graph. For instance, the following query is valid in VictoriaMetrics: `rate(node_network_receive_bytes_total)`.
|
||||
- All the aggregate functions support optional `limit N` suffix in order to limit the number of output series. For example, `sum(x) by (y) limit 10` limits
|
||||
the number of output time series after the aggregation to 10. All the other time series are dropped.
|
||||
|
|
|
@ -588,6 +588,10 @@ VictoriaMetrics supports the following Graphite APIs:
|
|||
* Metrics API - see [these docs](#graphite-metrics-api-usage).
|
||||
* Tags API - see [these docs](#graphite-tags-api-usage).
|
||||
|
||||
VictoriaMetrics supports `__graphite__` pseudo-label for filtering time series with Graphite-compatible filters in [MetricsQL](https://victoriametrics.github.io/MetricsQL.html).
|
||||
For example, `{__graphite__="foo.*.bar"}` is equivalent to `{__name__=~"foo[.][^.]*[.]bar"}`, but it works faster
|
||||
and it is easier to use when migrating from Graphite to VictoriaMetrics.
|
||||
|
||||
|
||||
### Graphite Metrics API usage
|
||||
|
||||
|
|
|
@ -1101,6 +1101,8 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
|
|||
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
|
||||
//
|
||||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||
//
|
||||
// If it returns maxTagValueSuffixes suffixes, then it is likely more than maxTagValueSuffixes suffixes is found.
|
||||
func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
// TODO: cache results?
|
||||
|
||||
|
@ -1111,13 +1113,15 @@ func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix [
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ok := db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(deadline)
|
||||
err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
extDB.putIndexSearch(is)
|
||||
})
|
||||
if ok && err != nil {
|
||||
return nil, err
|
||||
if len(tvss) < maxTagValueSuffixes {
|
||||
ok := db.doExtDB(func(extDB *indexDB) {
|
||||
is := extDB.getIndexSearch(deadline)
|
||||
err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
extDB.putIndexSearch(is)
|
||||
})
|
||||
if ok && err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
suffixes := make([]string, 0, len(tvss))
|
||||
|
@ -1125,6 +1129,9 @@ func (db *indexDB) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix [
|
|||
// Do not skip empty suffixes, since they may represent leaf tag values.
|
||||
suffixes = append(suffixes, suffix)
|
||||
}
|
||||
if len(suffixes) > maxTagValueSuffixes {
|
||||
suffixes = suffixes[:maxTagValueSuffixes]
|
||||
}
|
||||
// Do not sort suffixes, since they must be sorted by vmselect.
|
||||
return suffixes, nil
|
||||
}
|
||||
|
@ -1156,6 +1163,9 @@ func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct
|
|||
errGlobal = err
|
||||
return
|
||||
}
|
||||
if len(tvss) > maxTagValueSuffixes {
|
||||
return
|
||||
}
|
||||
for k := range tvssLocal {
|
||||
tvss[k] = struct{}{}
|
||||
}
|
||||
|
@ -1174,7 +1184,7 @@ func (is *indexSearch) searchTagValueSuffixesAll(tvss map[string]struct{}, tagKe
|
|||
kb.B = marshalTagValue(kb.B, tagValuePrefix)
|
||||
kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar from the end of kb.B
|
||||
prefix := append([]byte(nil), kb.B...)
|
||||
return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, len(tagValuePrefix), delimiter, maxTagValueSuffixes)
|
||||
}
|
||||
|
||||
func (is *indexSearch) searchTagValueSuffixesForDate(tvss map[string]struct{}, date uint64, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
|
||||
|
@ -1186,10 +1196,10 @@ func (is *indexSearch) searchTagValueSuffixesForDate(tvss map[string]struct{}, d
|
|||
kb.B = marshalTagValue(kb.B, tagValuePrefix)
|
||||
kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar from the end of kb.B
|
||||
prefix := append([]byte(nil), kb.B...)
|
||||
return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, tagValuePrefix, delimiter, maxTagValueSuffixes)
|
||||
return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, len(tagValuePrefix), delimiter, maxTagValueSuffixes)
|
||||
}
|
||||
|
||||
func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, nsPrefix byte, prefix, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int) error {
|
||||
func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, nsPrefix byte, prefix []byte, tagValuePrefixLen int, delimiter byte, maxTagValueSuffixes int) error {
|
||||
kb := &is.kb
|
||||
ts := &is.ts
|
||||
mp := &is.mp
|
||||
|
@ -1215,10 +1225,7 @@ func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{},
|
|||
continue
|
||||
}
|
||||
tagValue := mp.Tag.Value
|
||||
if !bytes.HasPrefix(tagValue, tagValuePrefix) {
|
||||
continue
|
||||
}
|
||||
suffix := tagValue[len(tagValuePrefix):]
|
||||
suffix := tagValue[tagValuePrefixLen:]
|
||||
n := bytes.IndexByte(suffix, delimiter)
|
||||
if n < 0 {
|
||||
// Found leaf tag value that doesn't have delimiters after the given tagValuePrefix.
|
||||
|
@ -2118,7 +2125,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
|
|||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
|
||||
|
||||
for i, tf := range tfs {
|
||||
if len(tf.key) == 0 {
|
||||
if len(tf.key) == 0 || string(tf.key) == "__graphite__" {
|
||||
// Match against mn.MetricGroup.
|
||||
b := marshalTagValue(kb.B, nil)
|
||||
b = marshalTagValue(b, mn.MetricGroup)
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"path/filepath"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -979,10 +980,121 @@ func (s *Storage) SearchTagValues(tagKey []byte, maxTagValues int, deadline uint
|
|||
// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
|
||||
//
|
||||
// This allows implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find or similar APIs.
|
||||
//
|
||||
// If more than maxTagValueSuffixes suffixes is found, then only the first maxTagValueSuffixes suffixes is returned.
|
||||
func (s *Storage) SearchTagValueSuffixes(tr TimeRange, tagKey, tagValuePrefix []byte, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
|
||||
return s.idb().SearchTagValueSuffixes(tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes, deadline)
|
||||
}
|
||||
|
||||
// SearchGraphitePaths returns all the matching paths for the given graphite query on the given tr.
|
||||
//
|
||||
// If more than maxPaths paths is found, then only the first maxPaths paths is returned.
|
||||
func (s *Storage) SearchGraphitePaths(tr TimeRange, query []byte, maxPaths int, deadline uint64) ([]string, error) {
|
||||
queryStr := string(query)
|
||||
n := strings.IndexAny(queryStr, "*[{")
|
||||
if n < 0 {
|
||||
// Verify that the query matches a metric name.
|
||||
suffixes, err := s.SearchTagValueSuffixes(tr, nil, query, '.', 1, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(suffixes) == 0 {
|
||||
// The query doesn't match anything.
|
||||
return nil, nil
|
||||
}
|
||||
if len(suffixes[0]) > 0 {
|
||||
// The query matches a metric name with additional suffix.
|
||||
return nil, nil
|
||||
}
|
||||
return []string{queryStr}, nil
|
||||
}
|
||||
suffixes, err := s.SearchTagValueSuffixes(tr, nil, query[:n], '.', maxPaths, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(suffixes) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if len(suffixes) >= maxPaths {
|
||||
return nil, fmt.Errorf("more than maxPaths=%d suffixes found", maxPaths)
|
||||
}
|
||||
qPrefixStr := queryStr[:n]
|
||||
qNode := queryStr[n:]
|
||||
qTail := ""
|
||||
mustMatchLeafs := true
|
||||
if m := strings.IndexByte(qNode, '.'); m >= 0 {
|
||||
qNode = qNode[:m+1]
|
||||
qTail = qNode[m+1:]
|
||||
mustMatchLeafs = false
|
||||
}
|
||||
re, err := getRegexpForGraphiteNodeQuery(qNode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var paths []string
|
||||
for _, suffix := range suffixes {
|
||||
if len(paths) > maxPaths {
|
||||
paths = paths[:maxPaths]
|
||||
break
|
||||
}
|
||||
if !re.MatchString(suffix) {
|
||||
continue
|
||||
}
|
||||
if mustMatchLeafs {
|
||||
paths = append(paths, qPrefixStr+suffix)
|
||||
continue
|
||||
}
|
||||
q := qPrefixStr + suffix + qTail
|
||||
ps, err := s.SearchGraphitePaths(tr, []byte(q), maxPaths, deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
paths = append(paths, ps...)
|
||||
}
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
func getRegexpForGraphiteNodeQuery(q string) (*regexp.Regexp, error) {
|
||||
parts := getRegexpPartsForGraphiteNodeQuery(q)
|
||||
reStr := "^" + strings.Join(parts, "") + "$"
|
||||
return regexp.Compile(reStr)
|
||||
}
|
||||
|
||||
func getRegexpPartsForGraphiteNodeQuery(q string) []string {
|
||||
var parts []string
|
||||
for {
|
||||
n := strings.IndexAny(q, "*{[")
|
||||
if n < 0 {
|
||||
return append(parts, regexp.QuoteMeta(q))
|
||||
}
|
||||
parts = append(parts, regexp.QuoteMeta(q[:n]))
|
||||
q = q[n:]
|
||||
switch q[0] {
|
||||
case '*':
|
||||
parts = append(parts, "[^.]*")
|
||||
q = q[1:]
|
||||
case '{':
|
||||
n := strings.IndexByte(q, '}')
|
||||
if n < 0 {
|
||||
return append(parts, regexp.QuoteMeta(q))
|
||||
}
|
||||
var tmp []string
|
||||
for _, x := range strings.Split(q[1:n], ",") {
|
||||
tmp = append(tmp, strings.Join(getRegexpPartsForGraphiteNodeQuery(x), ""))
|
||||
}
|
||||
parts = append(parts, "(?:"+strings.Join(tmp, "|")+")")
|
||||
q = q[n+1:]
|
||||
case '[':
|
||||
n := strings.IndexByte(q, ']')
|
||||
if n < 0 {
|
||||
return append(parts, regexp.QuoteMeta(q))
|
||||
}
|
||||
parts = append(parts, q[:n+1])
|
||||
q = q[n+1:]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SearchTagEntries returns a list of (tagName -> tagValues)
|
||||
func (s *Storage) SearchTagEntries(maxTagKeys, maxTagValues int, deadline uint64) ([]TagEntry, error) {
|
||||
idb := s.idb()
|
||||
|
|
|
@ -14,6 +14,28 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
|
||||
)
|
||||
|
||||
func TestGetRegexpForGraphiteNodeQuery(t *testing.T) {
|
||||
f := func(q, expectedRegexp string) {
|
||||
t.Helper()
|
||||
re, err := getRegexpForGraphiteNodeQuery(q)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error for query=%q: %s", q, err)
|
||||
}
|
||||
reStr := re.String()
|
||||
if reStr != expectedRegexp {
|
||||
t.Fatalf("unexpected regexp for query %q; got %q want %q", q, reStr, expectedRegexp)
|
||||
}
|
||||
}
|
||||
f(``, `^$`)
|
||||
f(`*`, `^[^.]*$`)
|
||||
f(`foo.`, `^foo\.$`)
|
||||
f(`foo.bar`, `^foo\.bar$`)
|
||||
f(`{foo,b*ar,b[a-z]}`, `^(?:foo|b[^.]*ar|b[a-z])$`)
|
||||
f(`[-a-zx.]`, `^[-a-zx.]$`)
|
||||
f(`**`, `^[^.]*[^.]*$`)
|
||||
f(`a*[de]{x,y}z`, `^a[^.]*[de](?:x|y)z$`)
|
||||
}
|
||||
|
||||
func TestDateMetricIDCacheSerial(t *testing.T) {
|
||||
c := newDateMetricIDCache()
|
||||
if err := testDateMetricIDCache(c, false); err != nil {
|
||||
|
|
|
@ -30,6 +30,12 @@ func NewTagFilters() *TagFilters {
|
|||
}
|
||||
}
|
||||
|
||||
// AddGraphiteQuery adds the given Graphite query that matches the given paths to tfs.
|
||||
func (tfs *TagFilters) AddGraphiteQuery(query []byte, paths []string, isNegative bool) {
|
||||
tf := tfs.addTagFilter()
|
||||
tf.InitFromGraphiteQuery(tfs.commonPrefix, query, paths, isNegative)
|
||||
}
|
||||
|
||||
// Add adds the given tag filter to tfs.
|
||||
//
|
||||
// MetricGroup must be encoded with nil key.
|
||||
|
@ -52,7 +58,7 @@ func (tfs *TagFilters) Add(key, value []byte, isNegative, isRegexp bool) error {
|
|||
}
|
||||
|
||||
// Substitute negative tag filter matching anything with negative tag filter matching non-empty value
|
||||
// in order to out all the time series with the given key.
|
||||
// in order to filter out all the time series with the given key.
|
||||
value = []byte(".+")
|
||||
}
|
||||
|
||||
|
@ -162,6 +168,8 @@ type tagFilter struct {
|
|||
prefix []byte
|
||||
|
||||
// or values obtained from regexp suffix if it equals to "foo|bar|..."
|
||||
//
|
||||
// This array is also populated with matching Graphite metrics if key="__graphite__"
|
||||
orSuffixes []string
|
||||
|
||||
// Matches regexp suffix.
|
||||
|
@ -228,6 +236,49 @@ func (tf *tagFilter) Marshal(dst []byte) []byte {
|
|||
return dst
|
||||
}
|
||||
|
||||
// InitFromGraphiteQuery initializes tf from the given graphite query expanded to the given paths.
|
||||
func (tf *tagFilter) InitFromGraphiteQuery(commonPrefix, query []byte, paths []string, isNegative bool) {
|
||||
if len(paths) == 0 {
|
||||
// explicitly add empty path in order match zero metric names.
|
||||
paths = []string{""}
|
||||
}
|
||||
prefix, orSuffixes := getCommonPrefix(paths)
|
||||
if len(orSuffixes) == 0 {
|
||||
orSuffixes = append(orSuffixes, "")
|
||||
}
|
||||
tf.key = append(tf.key[:0], "__graphite__"...)
|
||||
tf.value = append(tf.value[:0], query...)
|
||||
tf.isNegative = isNegative
|
||||
tf.isRegexp = true // this is needed for tagFilter.matchSuffix
|
||||
tf.prefix = append(tf.prefix[:0], commonPrefix...)
|
||||
tf.prefix = marshalTagValue(tf.prefix, nil)
|
||||
tf.prefix = marshalTagValueNoTrailingTagSeparator(tf.prefix, []byte(prefix))
|
||||
tf.orSuffixes = append(tf.orSuffixes[:0], orSuffixes...)
|
||||
tf.reSuffixMatch, tf.matchCost = newMatchFuncForOrSuffixes(orSuffixes)
|
||||
}
|
||||
|
||||
func getCommonPrefix(ss []string) (string, []string) {
|
||||
if len(ss) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
prefix := ss[0]
|
||||
for _, s := range ss[1:] {
|
||||
i := 0
|
||||
for i < len(s) && i < len(prefix) && s[i] == prefix[i] {
|
||||
i++
|
||||
}
|
||||
prefix = prefix[:i]
|
||||
if len(prefix) == 0 {
|
||||
return "", ss
|
||||
}
|
||||
}
|
||||
result := make([]string, len(ss))
|
||||
for i, s := range ss {
|
||||
result[i] = s[len(prefix):]
|
||||
}
|
||||
return prefix, result
|
||||
}
|
||||
|
||||
// Init initializes the tag filter for the given commonPrefix, key and value.
|
||||
//
|
||||
// If isNegaitve is true, then the tag filter matches all the values
|
||||
|
@ -242,6 +293,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
|
|||
tf.value = append(tf.value[:0], value...)
|
||||
tf.isNegative = isNegative
|
||||
tf.isRegexp = isRegexp
|
||||
tf.matchCost = 0
|
||||
|
||||
tf.prefix = tf.prefix[:0]
|
||||
|
||||
|
@ -345,22 +397,7 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
|
|||
var reCost uint64
|
||||
var literalSuffix string
|
||||
if len(orValues) > 0 {
|
||||
if len(orValues) == 1 {
|
||||
v := orValues[0]
|
||||
reMatch = func(b []byte) bool {
|
||||
return string(b) == v
|
||||
}
|
||||
} else {
|
||||
reMatch = func(b []byte) bool {
|
||||
for _, v := range orValues {
|
||||
if string(b) == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
reCost = uint64(len(orValues)) * literalMatchCost
|
||||
reMatch, reCost = newMatchFuncForOrSuffixes(orValues)
|
||||
} else {
|
||||
reMatch, literalSuffix, reCost = getOptimizedReMatchFunc(re.Match, sExpr)
|
||||
}
|
||||
|
@ -388,6 +425,26 @@ func getRegexpFromCache(expr []byte) (regexpCacheValue, error) {
|
|||
return rcv, nil
|
||||
}
|
||||
|
||||
func newMatchFuncForOrSuffixes(orValues []string) (reMatch func(b []byte) bool, reCost uint64) {
|
||||
if len(orValues) == 1 {
|
||||
v := orValues[0]
|
||||
reMatch = func(b []byte) bool {
|
||||
return string(b) == v
|
||||
}
|
||||
} else {
|
||||
reMatch = func(b []byte) bool {
|
||||
for _, v := range orValues {
|
||||
if string(b) == v {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
reCost = uint64(len(orValues)) * literalMatchCost
|
||||
return reMatch, reCost
|
||||
}
|
||||
|
||||
// getOptimizedReMatchFunc tries returning optimized function for matching the given expr.
|
||||
// '.*'
|
||||
// '.+'
|
||||
|
|
|
@ -2,9 +2,32 @@ package storage
|
|||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGetCommonPrefix(t *testing.T) {
|
||||
f := func(a []string, expectedPrefix string) {
|
||||
t.Helper()
|
||||
prefix, result := getCommonPrefix(a)
|
||||
if prefix != expectedPrefix {
|
||||
t.Fatalf("unexpected prefix; got %q; want %q", prefix, expectedPrefix)
|
||||
}
|
||||
for i, s := range a {
|
||||
if !strings.HasPrefix(s, prefix) {
|
||||
t.Fatalf("s=%q has no prefix %q", s, prefix)
|
||||
}
|
||||
if s[len(prefix):] != result[i] {
|
||||
t.Fatalf("unexpected result[%d]; got %q; want %q", i, s[len(prefix):], result[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
f(nil, "")
|
||||
f([]string{"foo"}, "foo")
|
||||
f([]string{"foo", "bar"}, "")
|
||||
f([]string{"foo1", "foo2", "foo34"}, "foo")
|
||||
}
|
||||
|
||||
func TestExtractRegexpPrefix(t *testing.T) {
|
||||
f := func(s string, expectedPrefix, expectedSuffix string) {
|
||||
t.Helper()
|
||||
|
|
Loading…
Reference in a new issue