mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: optimize search by label filters matching big number of time series
This commit is contained in:
parent
5c9715a89a
commit
fa0ef143b1
5 changed files with 496 additions and 103 deletions
|
@ -29,6 +29,8 @@ var (
|
||||||
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search")
|
maxTagValuesPerSearch = flag.Int("search.maxTagValues", 100e3, "The maximum number of tag values returned per search")
|
||||||
maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
|
maxTagValueSuffixesPerSearch = flag.Int("search.maxTagValueSuffixesPerSearch", 100e3, "The maximum number of tag value suffixes returned from /metrics/find")
|
||||||
maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series each search can scan")
|
maxMetricsPerSearch = flag.Int("search.maxUniqueTimeseries", 300e3, "The maximum number of unique time series each search can scan")
|
||||||
|
disableCompositeTagFilters = flag.Bool("search.disableCompositeTagFilters", false, "Whether to disable composite tag filters. This option is useful "+
|
||||||
|
"for querying old data, which is created before v1.54.0 release. Note that disabled composite tag filters may reduce query performance")
|
||||||
|
|
||||||
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
|
precisionBits = flag.Int("precisionBits", 64, "The number of precision bits to store per each value. Lower precision bits improves data compression at the cost of precision loss")
|
||||||
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
|
disableRPCCompression = flag.Bool(`rpc.disableCompression`, false, "Disable compression of RPC traffic. This reduces CPU usage at the cost of higher network bandwidth usage")
|
||||||
|
@ -1268,6 +1270,9 @@ func (ctx *vmselectRequestCtx) setupTfss(s *storage.Storage, tr storage.TimeRang
|
||||||
accountID := ctx.sq.AccountID
|
accountID := ctx.sq.AccountID
|
||||||
projectID := ctx.sq.ProjectID
|
projectID := ctx.sq.ProjectID
|
||||||
for _, tagFilters := range ctx.sq.TagFilterss {
|
for _, tagFilters := range ctx.sq.TagFilterss {
|
||||||
|
if !*disableCompositeTagFilters {
|
||||||
|
tagFilters = storage.ConvertToCompositeTagFilters(tagFilters)
|
||||||
|
}
|
||||||
tfs := storage.NewTagFilters(accountID, projectID)
|
tfs := storage.NewTagFilters(accountID, projectID)
|
||||||
for i := range tagFilters {
|
for i := range tagFilters {
|
||||||
tf := &tagFilters[i]
|
tf := &tagFilters[i]
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
# tip
|
# tip
|
||||||
|
|
||||||
|
* FEATURE: optimize searching for time series by label filters where individual filters match big number of time series (more than a million). For example, the query `up{job="foobar"}` should work faster if `{job="foobar"}` matches a million of time series, while `up{job="foobar"}` matches much lower number of time series. The optimization can be disabled by passing `-search.disableCompositeTagFilters` command-line flag to VictoriaMetrics.
|
||||||
* FEATURE: single-node VictoriaMetrics now accepts requests to handlers with `/prometheus` and `/graphite` prefixes such as `/prometheus/api/v1/query`. This improves compatibility with [handlers from VictoriaMetrics cluster](https://victoriametrics.github.io/Cluster-VictoriaMetrics.html#url-format).
|
* FEATURE: single-node VictoriaMetrics now accepts requests to handlers with `/prometheus` and `/graphite` prefixes such as `/prometheus/api/v1/query`. This improves compatibility with [handlers from VictoriaMetrics cluster](https://victoriametrics.github.io/Cluster-VictoriaMetrics.html#url-format).
|
||||||
* FEATURE: expose `process_open_fds` and `process_max_fds` metrics. These metrics can be used for alerting when `process_open_fds` reaches `process_max_fds`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/402 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1037
|
* FEATURE: expose `process_open_fds` and `process_max_fds` metrics. These metrics can be used for alerting when `process_open_fds` reaches `process_max_fds`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/402 and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1037
|
||||||
* FEATURE: vmalert: add `-datasource.appendTypePrefix` command-line option for querying both Prometheus and Graphite datasource in cluster version of VictoriaMetrics. See [these docs](https://victoriametrics.github.io/vmalert.html#graphite) for details.
|
* FEATURE: vmalert: add `-datasource.appendTypePrefix` command-line option for querying both Prometheus and Graphite datasource in cluster version of VictoriaMetrics. See [these docs](https://victoriametrics.github.io/vmalert.html#graphite) for details.
|
||||||
|
|
|
@ -621,51 +621,34 @@ func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error {
|
||||||
// The order of index items is important.
|
// The order of index items is important.
|
||||||
// It guarantees index consistency.
|
// It guarantees index consistency.
|
||||||
|
|
||||||
items := getIndexItems()
|
ii := getIndexItems()
|
||||||
|
defer putIndexItems(ii)
|
||||||
|
|
||||||
// Create MetricName -> TSID index.
|
// Create MetricName -> TSID index.
|
||||||
items.B = append(items.B, nsPrefixMetricNameToTSID)
|
ii.B = append(ii.B, nsPrefixMetricNameToTSID)
|
||||||
items.B = mn.Marshal(items.B)
|
ii.B = mn.Marshal(ii.B)
|
||||||
items.B = append(items.B, kvSeparatorChar)
|
ii.B = append(ii.B, kvSeparatorChar)
|
||||||
items.B = tsid.Marshal(items.B)
|
ii.B = tsid.Marshal(ii.B)
|
||||||
items.Next()
|
ii.Next()
|
||||||
|
|
||||||
// Create MetricID -> MetricName index.
|
// Create MetricID -> MetricName index.
|
||||||
items.B = marshalCommonPrefix(items.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)
|
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)
|
||||||
items.B = encoding.MarshalUint64(items.B, tsid.MetricID)
|
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
|
||||||
items.B = mn.Marshal(items.B)
|
ii.B = mn.Marshal(ii.B)
|
||||||
items.Next()
|
ii.Next()
|
||||||
|
|
||||||
// Create MetricID -> TSID index.
|
// Create MetricID -> TSID index.
|
||||||
items.B = marshalCommonPrefix(items.B, nsPrefixMetricIDToTSID, mn.AccountID, mn.ProjectID)
|
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToTSID, mn.AccountID, mn.ProjectID)
|
||||||
items.B = encoding.MarshalUint64(items.B, tsid.MetricID)
|
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
|
||||||
items.B = tsid.Marshal(items.B)
|
ii.B = tsid.Marshal(ii.B)
|
||||||
items.Next()
|
ii.Next()
|
||||||
|
|
||||||
commonPrefix := kbPool.Get()
|
prefix := kbPool.Get()
|
||||||
commonPrefix.B = marshalCommonPrefix(commonPrefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
|
prefix.B = marshalCommonPrefix(prefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
|
||||||
|
ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)
|
||||||
|
kbPool.Put(prefix)
|
||||||
|
|
||||||
// Create MetricGroup -> MetricID index.
|
return db.tb.AddItems(ii.Items)
|
||||||
items.B = append(items.B, commonPrefix.B...)
|
|
||||||
items.B = marshalTagValue(items.B, nil)
|
|
||||||
items.B = marshalTagValue(items.B, mn.MetricGroup)
|
|
||||||
items.B = encoding.MarshalUint64(items.B, tsid.MetricID)
|
|
||||||
items.Next()
|
|
||||||
addReverseMetricGroupIfNeeded(items, commonPrefix.B, mn, tsid.MetricID)
|
|
||||||
|
|
||||||
// For each tag create tag -> MetricID index.
|
|
||||||
for i := range mn.Tags {
|
|
||||||
tag := &mn.Tags[i]
|
|
||||||
items.B = append(items.B, commonPrefix.B...)
|
|
||||||
items.B = tag.Marshal(items.B)
|
|
||||||
items.B = encoding.MarshalUint64(items.B, tsid.MetricID)
|
|
||||||
items.Next()
|
|
||||||
}
|
|
||||||
|
|
||||||
kbPool.Put(commonPrefix)
|
|
||||||
err := db.tb.AddItems(items.Items)
|
|
||||||
putIndexItems(items)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type indexItems struct {
|
type indexItems struct {
|
||||||
|
@ -722,10 +705,6 @@ func (db *indexDB) SearchTagKeysOnTimeRange(accountID, projectID uint32, tr Time
|
||||||
|
|
||||||
keys := make([]string, 0, len(tks))
|
keys := make([]string, 0, len(tks))
|
||||||
for key := range tks {
|
for key := range tks {
|
||||||
if key == string(graphiteReverseTagKey) {
|
|
||||||
// Do not show artificially created graphiteReverseTagKey to the caller.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Do not skip empty keys, since they are converted to __name__
|
// Do not skip empty keys, since they are converted to __name__
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
}
|
}
|
||||||
|
@ -796,16 +775,20 @@ func (is *indexSearch) searchTagKeysOnDate(tks map[string]struct{}, date uint64,
|
||||||
if mp.IsDeletedTag(dmis) {
|
if mp.IsDeletedTag(dmis) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
key := mp.Tag.Key
|
||||||
|
if isArtificialTagKey(key) {
|
||||||
|
// Skip artificially created tag key.
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Store tag key.
|
// Store tag key.
|
||||||
tks[string(mp.Tag.Key)] = struct{}{}
|
tks[string(key)] = struct{}{}
|
||||||
|
|
||||||
// Search for the next tag key.
|
// Search for the next tag key.
|
||||||
// The last char in kb.B must be tagSeparatorChar.
|
// The last char in kb.B must be tagSeparatorChar.
|
||||||
// Just increment it in order to jump to the next tag key.
|
// Just increment it in order to jump to the next tag key.
|
||||||
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
||||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
|
kb.B = marshalTagValue(kb.B, key)
|
||||||
kb.B[len(kb.B)-1]++
|
kb.B[len(kb.B)-1]++
|
||||||
ts.Seek(kb.B)
|
ts.Seek(kb.B)
|
||||||
}
|
}
|
||||||
|
@ -837,10 +820,6 @@ func (db *indexDB) SearchTagKeys(accountID, projectID uint32, maxTagKeys int, de
|
||||||
|
|
||||||
keys := make([]string, 0, len(tks))
|
keys := make([]string, 0, len(tks))
|
||||||
for key := range tks {
|
for key := range tks {
|
||||||
if key == string(graphiteReverseTagKey) {
|
|
||||||
// Do not show artificially created graphiteReverseTagKey to the caller.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Do not skip empty keys, since they are converted to __name__
|
// Do not skip empty keys, since they are converted to __name__
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
}
|
}
|
||||||
|
@ -875,15 +854,19 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
|
||||||
if mp.IsDeletedTag(dmis) {
|
if mp.IsDeletedTag(dmis) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
key := mp.Tag.Key
|
||||||
|
if isArtificialTagKey(key) {
|
||||||
|
// Skip artificailly created tag keys.
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Store tag key.
|
// Store tag key.
|
||||||
tks[string(mp.Tag.Key)] = struct{}{}
|
tks[string(key)] = struct{}{}
|
||||||
|
|
||||||
// Search for the next tag key.
|
// Search for the next tag key.
|
||||||
// The last char in kb.B must be tagSeparatorChar.
|
// The last char in kb.B must be tagSeparatorChar.
|
||||||
// Just increment it in order to jump to the next tag key.
|
// Just increment it in order to jump to the next tag key.
|
||||||
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
|
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
|
||||||
kb.B = marshalTagValue(kb.B, mp.Tag.Key)
|
kb.B = marshalTagValue(kb.B, key)
|
||||||
kb.B[len(kb.B)-1]++
|
kb.B[len(kb.B)-1]++
|
||||||
ts.Seek(kb.B)
|
ts.Seek(kb.B)
|
||||||
}
|
}
|
||||||
|
@ -1375,6 +1358,10 @@ func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot unmarshal tag key from line %q: %w", item, err)
|
return nil, fmt.Errorf("cannot unmarshal tag key from line %q: %w", item, err)
|
||||||
}
|
}
|
||||||
|
if isArtificialTagKey(tmp) {
|
||||||
|
// Skip artificially created tag keys.
|
||||||
|
continue
|
||||||
|
}
|
||||||
if len(tmp) == 0 {
|
if len(tmp) == 0 {
|
||||||
tmp = append(tmp, "__name__"...)
|
tmp = append(tmp, "__name__"...)
|
||||||
}
|
}
|
||||||
|
@ -2124,16 +2111,58 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
|
||||||
return nil, metricIDs, nil
|
return nil, metricIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fromCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter {
|
||||||
|
tfsNew := make([]*tagFilter, 0, len(tfs))
|
||||||
|
for _, tf := range tfs {
|
||||||
|
if !bytes.HasPrefix(tf.prefix, prefix) {
|
||||||
|
tfsNew = append(tfsNew, tf)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
suffix := tf.prefix[len(prefix):]
|
||||||
|
var tagKey, tail []byte
|
||||||
|
var err error
|
||||||
|
tail, tagKey, err = unmarshalTagValue(tagKey[:0], suffix)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("BUG: cannot unmarshal tag key from suffix=%q: %s", suffix, err)
|
||||||
|
}
|
||||||
|
if len(tagKey) == 0 || tagKey[0] != compositeTagKeyPrefix {
|
||||||
|
tfsNew = append(tfsNew, tf)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tagKey = tagKey[1:]
|
||||||
|
var nameLen uint64
|
||||||
|
tagKey, nameLen, err = encoding.UnmarshalVarUint64(tagKey)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("BUG: cannot unmarshal nameLen from tagKey %q: %s", tagKey, err)
|
||||||
|
}
|
||||||
|
if uint64(len(tagKey)) < nameLen {
|
||||||
|
logger.Panicf("BUG: expecting at %d bytes for name in tagKey=%q; got %d bytes", nameLen, tagKey, len(tagKey))
|
||||||
|
}
|
||||||
|
tagKey = tagKey[nameLen:]
|
||||||
|
tfNew := *tf
|
||||||
|
tfNew.prefix = append(tfNew.prefix[:0], prefix...)
|
||||||
|
tfNew.prefix = marshalTagValue(tfNew.prefix, tagKey)
|
||||||
|
tfNew.prefix = append(tfNew.prefix, tail...)
|
||||||
|
tfsNew = append(tfsNew, &tfNew)
|
||||||
|
}
|
||||||
|
return tfsNew
|
||||||
|
}
|
||||||
|
|
||||||
func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
|
func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
|
||||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
|
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
|
||||||
|
tfs = fromCompositeTagFilters(tfs, kb.B)
|
||||||
for i, tf := range tfs {
|
for i, tf := range tfs {
|
||||||
|
if bytes.Equal(tf.key, graphiteReverseTagKey) {
|
||||||
|
// Skip artificial tag filter for Graphite-like metric names with dots,
|
||||||
|
// since mn doesn't contain the corresponding tag.
|
||||||
|
continue
|
||||||
|
}
|
||||||
if len(tf.key) == 0 || string(tf.key) == "__graphite__" {
|
if len(tf.key) == 0 || string(tf.key) == "__graphite__" {
|
||||||
// Match against mn.MetricGroup.
|
// Match against mn.MetricGroup.
|
||||||
b := marshalTagValue(kb.B, nil)
|
b := marshalTagValue(kb.B, nil)
|
||||||
b = marshalTagValue(b, mn.MetricGroup)
|
b = marshalTagValue(b, mn.MetricGroup)
|
||||||
kb.B = b[:len(kb.B)]
|
kb.B = b[:len(kb.B)]
|
||||||
ok, err := matchTagFilter(b, tf)
|
ok, err := tf.match(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("cannot match MetricGroup %q with tagFilter %s: %w", mn.MetricGroup, tf, err)
|
return false, fmt.Errorf("cannot match MetricGroup %q with tagFilter %s: %w", mn.MetricGroup, tf, err)
|
||||||
}
|
}
|
||||||
|
@ -2147,17 +2176,10 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if bytes.Equal(tf.key, graphiteReverseTagKey) {
|
|
||||||
// Skip artificial tag filter for Graphite-like metric names with dots,
|
|
||||||
// since mn doesn't contain the corresponding tag.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Search for matching tag name.
|
// Search for matching tag name.
|
||||||
tagMatched := false
|
tagMatched := false
|
||||||
tagSeen := false
|
tagSeen := false
|
||||||
for j := range mn.Tags {
|
for _, tag := range mn.Tags {
|
||||||
tag := &mn.Tags[j]
|
|
||||||
if string(tag.Key) != string(tf.key) {
|
if string(tag.Key) != string(tf.key) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -2166,7 +2188,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
|
||||||
tagSeen = true
|
tagSeen = true
|
||||||
b := tag.Marshal(kb.B)
|
b := tag.Marshal(kb.B)
|
||||||
kb.B = b[:len(kb.B)]
|
kb.B = b[:len(kb.B)]
|
||||||
ok, err := matchTagFilter(b, tf)
|
ok, err := tf.match(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("cannot match tag %q with tagFilter %s: %w", tag, tf, err)
|
return false, fmt.Errorf("cannot match tag %q with tagFilter %s: %w", tag, tf, err)
|
||||||
}
|
}
|
||||||
|
@ -2206,20 +2228,6 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func matchTagFilter(b []byte, tf *tagFilter) (bool, error) {
|
|
||||||
if !bytes.HasPrefix(b, tf.prefix) {
|
|
||||||
return tf.isNegative, nil
|
|
||||||
}
|
|
||||||
ok, err := tf.matchSuffix(b[len(tf.prefix):])
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
return tf.isNegative, nil
|
|
||||||
}
|
|
||||||
return !tf.isNegative, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
|
func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
|
||||||
metricIDs := &uint64set.Set{}
|
metricIDs := &uint64set.Set{}
|
||||||
for _, tfs := range tfss {
|
for _, tfs := range tfss {
|
||||||
|
@ -2875,13 +2883,13 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
|
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
|
||||||
items := getIndexItems()
|
ii := getIndexItems()
|
||||||
defer putIndexItems(items)
|
defer putIndexItems(ii)
|
||||||
|
|
||||||
items.B = is.marshalCommonPrefix(items.B, nsPrefixDateToMetricID)
|
ii.B = is.marshalCommonPrefix(ii.B, nsPrefixDateToMetricID)
|
||||||
items.B = encoding.MarshalUint64(items.B, date)
|
ii.B = encoding.MarshalUint64(ii.B, date)
|
||||||
items.B = encoding.MarshalUint64(items.B, metricID)
|
ii.B = encoding.MarshalUint64(ii.B, metricID)
|
||||||
items.Next()
|
ii.Next()
|
||||||
|
|
||||||
// Create per-day inverted index entries for metricID.
|
// Create per-day inverted index entries for metricID.
|
||||||
kb := kbPool.Get()
|
kb := kbPool.Get()
|
||||||
|
@ -2909,27 +2917,44 @@ func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
|
||||||
}
|
}
|
||||||
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
|
||||||
kb.B = encoding.MarshalUint64(kb.B, date)
|
kb.B = encoding.MarshalUint64(kb.B, date)
|
||||||
|
ii.registerTagIndexes(kb.B, mn, metricID)
|
||||||
items.B = append(items.B, kb.B...)
|
if err = is.db.tb.AddItems(ii.Items); err != nil {
|
||||||
items.B = marshalTagValue(items.B, nil)
|
|
||||||
items.B = marshalTagValue(items.B, mn.MetricGroup)
|
|
||||||
items.B = encoding.MarshalUint64(items.B, metricID)
|
|
||||||
items.Next()
|
|
||||||
addReverseMetricGroupIfNeeded(items, kb.B, mn, metricID)
|
|
||||||
for i := range mn.Tags {
|
|
||||||
tag := &mn.Tags[i]
|
|
||||||
items.B = append(items.B, kb.B...)
|
|
||||||
items.B = tag.Marshal(items.B)
|
|
||||||
items.B = encoding.MarshalUint64(items.B, metricID)
|
|
||||||
items.Next()
|
|
||||||
}
|
|
||||||
if err = is.db.tb.AddItems(items.Items); err != nil {
|
|
||||||
return fmt.Errorf("cannot add per-day entires for metricID %d: %w", metricID, err)
|
return fmt.Errorf("cannot add per-day entires for metricID %d: %w", metricID, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func addReverseMetricGroupIfNeeded(items *indexItems, prefix []byte, mn *MetricName, metricID uint64) {
|
func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) {
|
||||||
|
// Add index entry for MetricGroup -> MetricID
|
||||||
|
ii.B = append(ii.B, prefix...)
|
||||||
|
ii.B = marshalTagValue(ii.B, nil)
|
||||||
|
ii.B = marshalTagValue(ii.B, mn.MetricGroup)
|
||||||
|
ii.B = encoding.MarshalUint64(ii.B, metricID)
|
||||||
|
ii.Next()
|
||||||
|
ii.addReverseMetricGroupIfNeeded(prefix, mn, metricID)
|
||||||
|
|
||||||
|
// Add index entries for tags: tag -> MetricID
|
||||||
|
for _, tag := range mn.Tags {
|
||||||
|
ii.B = append(ii.B, prefix...)
|
||||||
|
ii.B = tag.Marshal(ii.B)
|
||||||
|
ii.B = encoding.MarshalUint64(ii.B, metricID)
|
||||||
|
ii.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add index entries for composite tags: MetricGroup+tag -> MetricID
|
||||||
|
compositeKey := kbPool.Get()
|
||||||
|
for _, tag := range mn.Tags {
|
||||||
|
compositeKey.B = marshalCompositeTagKey(compositeKey.B[:0], mn.MetricGroup, tag.Key)
|
||||||
|
ii.B = append(ii.B, prefix...)
|
||||||
|
ii.B = marshalTagValue(ii.B, compositeKey.B)
|
||||||
|
ii.B = marshalTagValue(ii.B, tag.Value)
|
||||||
|
ii.B = encoding.MarshalUint64(ii.B, metricID)
|
||||||
|
ii.Next()
|
||||||
|
}
|
||||||
|
kbPool.Put(compositeKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ii *indexItems) addReverseMetricGroupIfNeeded(prefix []byte, mn *MetricName, metricID uint64) {
|
||||||
if bytes.IndexByte(mn.MetricGroup, '.') < 0 {
|
if bytes.IndexByte(mn.MetricGroup, '.') < 0 {
|
||||||
// The reverse metric group is needed only for Graphite-like metrics with points.
|
// The reverse metric group is needed only for Graphite-like metrics with points.
|
||||||
return
|
return
|
||||||
|
@ -2937,14 +2962,24 @@ func addReverseMetricGroupIfNeeded(items *indexItems, prefix []byte, mn *MetricN
|
||||||
// This is most likely a Graphite metric like 'foo.bar.baz'.
|
// This is most likely a Graphite metric like 'foo.bar.baz'.
|
||||||
// Store reverse metric name 'zab.rab.oof' in order to speed up search for '*.bar.baz'
|
// Store reverse metric name 'zab.rab.oof' in order to speed up search for '*.bar.baz'
|
||||||
// when the Graphite wildcard has a suffix matching small number of time series.
|
// when the Graphite wildcard has a suffix matching small number of time series.
|
||||||
items.B = append(items.B, prefix...)
|
ii.B = append(ii.B, prefix...)
|
||||||
items.B = marshalTagValue(items.B, graphiteReverseTagKey)
|
ii.B = marshalTagValue(ii.B, graphiteReverseTagKey)
|
||||||
revBuf := kbPool.Get()
|
revBuf := kbPool.Get()
|
||||||
revBuf.B = reverseBytes(revBuf.B[:0], mn.MetricGroup)
|
revBuf.B = reverseBytes(revBuf.B[:0], mn.MetricGroup)
|
||||||
items.B = marshalTagValue(items.B, revBuf.B)
|
ii.B = marshalTagValue(ii.B, revBuf.B)
|
||||||
kbPool.Put(revBuf)
|
kbPool.Put(revBuf)
|
||||||
items.B = encoding.MarshalUint64(items.B, metricID)
|
ii.B = encoding.MarshalUint64(ii.B, metricID)
|
||||||
items.Next()
|
ii.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
func isArtificialTagKey(key []byte) bool {
|
||||||
|
if bytes.Equal(key, graphiteReverseTagKey) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if len(key) > 0 && key[0] == compositeTagKeyPrefix {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// The tag key for reverse metric name used for speeding up searching
|
// The tag key for reverse metric name used for speeding up searching
|
||||||
|
@ -2954,6 +2989,20 @@ func addReverseMetricGroupIfNeeded(items *indexItems, prefix []byte, mn *MetricN
|
||||||
// It is expected that the given key isn't be used by users.
|
// It is expected that the given key isn't be used by users.
|
||||||
var graphiteReverseTagKey = []byte("\xff")
|
var graphiteReverseTagKey = []byte("\xff")
|
||||||
|
|
||||||
|
// The prefix for composite tag, which is used for speeding up searching
|
||||||
|
// for composite filters, which contain `{__name__="<metric_name>"}` filter.
|
||||||
|
//
|
||||||
|
// It is expected that the given prefix isn't used by users.
|
||||||
|
const compositeTagKeyPrefix = '\xfe'
|
||||||
|
|
||||||
|
func marshalCompositeTagKey(dst, name, key []byte) []byte {
|
||||||
|
dst = append(dst, compositeTagKeyPrefix)
|
||||||
|
dst = encoding.MarshalVarUint64(dst, uint64(len(name)))
|
||||||
|
dst = append(dst, name...)
|
||||||
|
dst = append(dst, key...)
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
func reverseBytes(dst, src []byte) []byte {
|
func reverseBytes(dst, src []byte) []byte {
|
||||||
for i := len(src) - 1; i >= 0; i-- {
|
for i := len(src) - 1; i >= 0; i-- {
|
||||||
dst = append(dst, src[i])
|
dst = append(dst, src[i])
|
||||||
|
|
|
@ -15,6 +15,51 @@ import (
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ConvertToCompositeTagFilters converts tfs to composite filters.
|
||||||
|
//
|
||||||
|
// This converts `foo{bar="baz",x=~"a.+"}` to `{foo=bar="baz",foo=x=~"a.+"} filter.
|
||||||
|
func ConvertToCompositeTagFilters(tfs []TagFilter) []TagFilter {
|
||||||
|
// Search for metric name filter, which must be used for creating composite filters.
|
||||||
|
var name []byte
|
||||||
|
for _, tf := range tfs {
|
||||||
|
if len(tf.Key) == 0 && !tf.IsNegative && !tf.IsRegexp {
|
||||||
|
name = tf.Value
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(name) == 0 {
|
||||||
|
// There is no metric name filter, so composite filters cannot be created.
|
||||||
|
return tfs
|
||||||
|
}
|
||||||
|
tfsNew := make([]TagFilter, 0, len(tfs))
|
||||||
|
var compositeKey []byte
|
||||||
|
compositeFilters := 0
|
||||||
|
for _, tf := range tfs {
|
||||||
|
if len(tf.Key) == 0 {
|
||||||
|
if tf.IsNegative || tf.IsRegexp || string(tf.Value) != string(name) {
|
||||||
|
tfsNew = append(tfsNew, tf)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if string(tf.Key) == "__graphite__" {
|
||||||
|
tfsNew = append(tfsNew, tf)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
compositeKey = marshalCompositeTagKey(compositeKey[:0], name, tf.Key)
|
||||||
|
tfsNew = append(tfsNew, TagFilter{
|
||||||
|
Key: append([]byte{}, compositeKey...),
|
||||||
|
Value: append([]byte{}, tf.Value...),
|
||||||
|
IsNegative: tf.IsNegative,
|
||||||
|
IsRegexp: tf.IsRegexp,
|
||||||
|
})
|
||||||
|
compositeFilters++
|
||||||
|
}
|
||||||
|
if compositeFilters == 0 {
|
||||||
|
return tfs
|
||||||
|
}
|
||||||
|
return tfsNew
|
||||||
|
}
|
||||||
|
|
||||||
// TagFilters represents filters used for filtering tags.
|
// TagFilters represents filters used for filtering tags.
|
||||||
type TagFilters struct {
|
type TagFilters struct {
|
||||||
accountID uint32
|
accountID uint32
|
||||||
|
@ -359,6 +404,21 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tf *tagFilter) match(b []byte) (bool, error) {
|
||||||
|
prefix := tf.prefix
|
||||||
|
if !bytes.HasPrefix(b, prefix) {
|
||||||
|
return tf.isNegative, nil
|
||||||
|
}
|
||||||
|
ok, err := tf.matchSuffix(b[len(prefix):])
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return tf.isNegative, nil
|
||||||
|
}
|
||||||
|
return !tf.isNegative, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (tf *tagFilter) matchSuffix(b []byte) (bool, error) {
|
func (tf *tagFilter) matchSuffix(b []byte) (bool, error) {
|
||||||
// Remove the trailing tagSeparatorChar.
|
// Remove the trailing tagSeparatorChar.
|
||||||
if len(b) == 0 || b[len(b)-1] != tagSeparatorChar {
|
if len(b) == 0 || b[len(b)-1] != tagSeparatorChar {
|
||||||
|
|
|
@ -6,6 +6,284 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestConvertToCompositeTagFilters(t *testing.T) {
|
||||||
|
f := func(tfs, resultExpected []TagFilter) {
|
||||||
|
t.Helper()
|
||||||
|
result := ConvertToCompositeTagFilters(tfs)
|
||||||
|
if !reflect.DeepEqual(result, resultExpected) {
|
||||||
|
t.Fatalf("unexpected result;\ngot\n%+v\nwant\n%+v", result, resultExpected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Empty filters
|
||||||
|
f(nil, nil)
|
||||||
|
|
||||||
|
// A single non-name filter
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Multiple non-name filters
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("x"),
|
||||||
|
Value: []byte("yy"),
|
||||||
|
IsNegative: true,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("x"),
|
||||||
|
Value: []byte("yy"),
|
||||||
|
IsNegative: true,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// A single name filter
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Two name filters
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("baz"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("baz"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// A name filter with non-name filter.
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: []byte("\xfe\x03barfoo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Two name filters with non-name filter.
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("baz"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("baz"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("\xfe\x03barfoo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// A name filter with negative regexp non-name filter.
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: true,
|
||||||
|
IsRegexp: true,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: []byte("\xfe\x03barfoo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: true,
|
||||||
|
IsRegexp: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// A name filter with graphite filter.
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("__graphite__"),
|
||||||
|
Value: []byte("foo.*.bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("__graphite__"),
|
||||||
|
Value: []byte("foo.*.bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// A name filter with non-name filter and a graphite filter.
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: true,
|
||||||
|
IsRegexp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("__graphite__"),
|
||||||
|
Value: []byte("foo.*.bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: []byte("\xfe\x03barfoo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: true,
|
||||||
|
IsRegexp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("__graphite__"),
|
||||||
|
Value: []byte("foo.*.bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Regexp name filter with non-name filter.
|
||||||
|
f([]TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: true,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
}, []TagFilter{
|
||||||
|
{
|
||||||
|
Key: nil,
|
||||||
|
Value: []byte("bar"),
|
||||||
|
IsNegative: false,
|
||||||
|
IsRegexp: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: []byte("foo"),
|
||||||
|
Value: []byte("abc"),
|
||||||
|
IsNegative: true,
|
||||||
|
IsRegexp: false,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetCommonPrefix(t *testing.T) {
|
func TestGetCommonPrefix(t *testing.T) {
|
||||||
f := func(a []string, expectedPrefix string) {
|
f := func(a []string, expectedPrefix string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
Loading…
Reference in a new issue