lib/storage: properly match {tag!="|foo"} filters

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/546
This commit is contained in:
Aliaksandr Valialkin 2020-06-10 18:40:00 +03:00
parent e426434770
commit ae1cc0fc4b
3 changed files with 52 additions and 17 deletions

View file

@ -1808,6 +1808,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
// Search for matching tag name.
tagMatched := false
tagSeen := false
for j := range mn.Tags {
tag := &mn.Tags[j]
if string(tag.Key) != string(tf.key) {
@ -1815,6 +1816,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
}
// Found the matching tag name. Match the value.
tagSeen = true
b := tag.Marshal(kb.B)
kb.B = b[:len(kb.B)]
ok, err := matchTagFilter(b, tf)
@ -1832,15 +1834,27 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
tagMatched = true
break
}
if !tagMatched && !tf.isNegative {
// Matching tag name wasn't found.
// Move failed tf to start.
// This should reduce the amount of useless work for the next mn.
if i > 0 {
tfs[0], tfs[i] = tfs[i], tfs[0]
}
return false, nil
if !tagSeen && tf.isNegative && !tf.isEmptyMatch {
// tf contains negative filter for non-exsisting tag key
// and this filter doesn't match empty string, i.e. {non_existing_tag_key!="foobar"}
// Such filter matches anything.
//
// Note that the filter `{non_existing_tag_key!~"|foobar"}` shouldn't match anything,
// since it is expected that it matches non-empty `non_existing_tag_key`.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/546 for details.
continue
}
if tagMatched {
// tf matches mn. Go to the next tf.
continue
}
// Matching tag name wasn't found.
// Move failed tf to start.
// This should reduce the amount of useless work for the next mn.
if i > 0 {
tfs[0], tfs[i] = tfs[i], tfs[0]
}
return false, nil
}
return true, nil
}

View file

@ -1280,6 +1280,30 @@ func TestMatchTagFilters(t *testing.T) {
t.Fatalf("Shouldn't match")
}
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/546
tfs.Reset()
if err := tfs.Add([]byte("key 3"), []byte("|value 3"), true, true); err != nil {
t.Fatalf("cannot add regexp, negative filter: %s", err)
}
ok, err = matchTagFilters(&mn, toTFPointers(tfs.tfs), &bb)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if ok {
t.Fatalf("Shouldn't match")
}
tfs.Reset()
if err := tfs.Add([]byte("key 3"), []byte("|value 2"), true, true); err != nil {
t.Fatalf("cannot add regexp, negative filter: %s", err)
}
ok, err = matchTagFilters(&mn, toTFPointers(tfs.tfs), &bb)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !ok {
t.Fatalf("Should match")
}
// Positive match by existing tag
tfs.Reset()
if err := tfs.Add([]byte("key 0"), []byte("value 0"), false, false); err != nil {

View file

@ -84,7 +84,7 @@ func (tfs *TagFilters) Finalize() []*TagFilters {
var tfssNew []*TagFilters
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf.matchesEmptyValue {
if !tf.isNegative && tf.isEmptyMatch {
// tf matches empty value, so it must be accompanied with `key!~".+"` tag filter
// in order to match time series without the given label.
tfssNew = append(tfssNew, tfs.cloneWithNegativeFilter(tf))
@ -156,10 +156,8 @@ type tagFilter struct {
// Matches regexp suffix.
reSuffixMatch func(b []byte) bool
// Set to true for filter that matches empty value, i.e. "", "|foo" or ".*"
//
// Such a filter must be applied directly to metricNames.
matchesEmptyValue bool
// Set to true for filters matching empty value.
isEmptyMatch bool
// Contains reverse suffix for Graphite wildcard.
// I.e. for `{__name__=~"foo\\.[^.]*\\.bar\\.baz"}` the value will be `zab.rab.`
@ -238,7 +236,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
tf.orSuffixes = tf.orSuffixes[:0]
tf.reSuffixMatch = nil
tf.matchesEmptyValue = false
tf.isEmptyMatch = false
tf.graphiteReverseSuffix = tf.graphiteReverseSuffix[:0]
tf.prefix = append(tf.prefix, commonPrefix...)
@ -258,6 +256,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
// Add empty orSuffix in order to trigger fast path for orSuffixes
// during the search for matching metricIDs.
tf.orSuffixes = append(tf.orSuffixes[:0], "")
tf.isEmptyMatch = len(prefix) == 0
return nil
}
rcv, err := getRegexpFromCache(expr)
@ -266,9 +265,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
}
tf.orSuffixes = append(tf.orSuffixes[:0], rcv.orValues...)
tf.reSuffixMatch = rcv.reMatch
if len(prefix) == 0 && !tf.isNegative && tf.reSuffixMatch(nil) {
tf.matchesEmptyValue = true
}
tf.isEmptyMatch = len(prefix) == 0 && tf.reSuffixMatch(nil)
if !tf.isNegative && len(key) == 0 && strings.IndexByte(rcv.literalSuffix, '.') >= 0 {
// Reverse suffix is needed only for non-negative regexp filters on __name__ that contains dots.
tf.graphiteReverseSuffix = reverseBytes(tf.graphiteReverseSuffix[:0], []byte(rcv.literalSuffix))