lib/storage: properly match {tag!="|foo"} filters

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/546
This commit is contained in:
Aliaksandr Valialkin 2020-06-10 18:40:00 +03:00
parent d71b6e6584
commit c40f29f783
3 changed files with 52 additions and 17 deletions

View file

@ -1835,6 +1835,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
// Search for matching tag name. // Search for matching tag name.
tagMatched := false tagMatched := false
tagSeen := false
for j := range mn.Tags { for j := range mn.Tags {
tag := &mn.Tags[j] tag := &mn.Tags[j]
if string(tag.Key) != string(tf.key) { if string(tag.Key) != string(tf.key) {
@ -1842,6 +1843,7 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
} }
// Found the matching tag name. Match the value. // Found the matching tag name. Match the value.
tagSeen = true
b := tag.Marshal(kb.B) b := tag.Marshal(kb.B)
kb.B = b[:len(kb.B)] kb.B = b[:len(kb.B)]
ok, err := matchTagFilter(b, tf) ok, err := matchTagFilter(b, tf)
@ -1859,15 +1861,27 @@ func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer)
tagMatched = true tagMatched = true
break break
} }
if !tagMatched && !tf.isNegative { if !tagSeen && tf.isNegative && !tf.isEmptyMatch {
// Matching tag name wasn't found. // tf contains negative filter for non-exsisting tag key
// Move failed tf to start. // and this filter doesn't match empty string, i.e. {non_existing_tag_key!="foobar"}
// This should reduce the amount of useless work for the next mn. // Such filter matches anything.
if i > 0 { //
tfs[0], tfs[i] = tfs[i], tfs[0] // Note that the filter `{non_existing_tag_key!~"|foobar"}` shouldn't match anything,
} // since it is expected that it matches non-empty `non_existing_tag_key`.
return false, nil // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/546 for details.
continue
} }
if tagMatched {
// tf matches mn. Go to the next tf.
continue
}
// Matching tag name wasn't found.
// Move failed tf to start.
// This should reduce the amount of useless work for the next mn.
if i > 0 {
tfs[0], tfs[i] = tfs[i], tfs[0]
}
return false, nil
} }
return true, nil return true, nil
} }

View file

@ -1349,6 +1349,30 @@ func TestMatchTagFilters(t *testing.T) {
t.Fatalf("Shouldn't match") t.Fatalf("Shouldn't match")
} }
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/546
tfs.Reset(mn.AccountID, mn.ProjectID)
if err := tfs.Add([]byte("key 3"), []byte("|value 3"), true, true); err != nil {
t.Fatalf("cannot add regexp, negative filter: %s", err)
}
ok, err = matchTagFilters(&mn, toTFPointers(tfs.tfs), &bb)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if ok {
t.Fatalf("Shouldn't match")
}
tfs.Reset(mn.AccountID, mn.ProjectID)
if err := tfs.Add([]byte("key 3"), []byte("|value 2"), true, true); err != nil {
t.Fatalf("cannot add regexp, negative filter: %s", err)
}
ok, err = matchTagFilters(&mn, toTFPointers(tfs.tfs), &bb)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !ok {
t.Fatalf("Should match")
}
// Positive match by existing tag // Positive match by existing tag
tfs.Reset(mn.AccountID, mn.ProjectID) tfs.Reset(mn.AccountID, mn.ProjectID)
if err := tfs.Add([]byte("key 0"), []byte("value 0"), false, false); err != nil { if err := tfs.Add([]byte("key 0"), []byte("value 0"), false, false); err != nil {

View file

@ -90,7 +90,7 @@ func (tfs *TagFilters) Finalize() []*TagFilters {
var tfssNew []*TagFilters var tfssNew []*TagFilters
for i := range tfs.tfs { for i := range tfs.tfs {
tf := &tfs.tfs[i] tf := &tfs.tfs[i]
if tf.matchesEmptyValue { if !tf.isNegative && tf.isEmptyMatch {
// tf matches empty value, so it must be accompanied with `key!~".+"` tag filter // tf matches empty value, so it must be accompanied with `key!~".+"` tag filter
// in order to match time series without the given label. // in order to match time series without the given label.
tfssNew = append(tfssNew, tfs.cloneWithNegativeFilter(tf)) tfssNew = append(tfssNew, tfs.cloneWithNegativeFilter(tf))
@ -168,10 +168,8 @@ type tagFilter struct {
// Matches regexp suffix. // Matches regexp suffix.
reSuffixMatch func(b []byte) bool reSuffixMatch func(b []byte) bool
// Set to true for filter that matches empty value, i.e. "", "|foo" or ".*" // Set to true for filters matching empty value.
// isEmptyMatch bool
// Such a filter must be applied directly to metricNames.
matchesEmptyValue bool
// Contains reverse suffix for Graphite wildcard. // Contains reverse suffix for Graphite wildcard.
// I.e. for `{__name__=~"foo\\.[^.]*\\.bar\\.baz"}` the value will be `zab.rab.` // I.e. for `{__name__=~"foo\\.[^.]*\\.bar\\.baz"}` the value will be `zab.rab.`
@ -256,7 +254,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
tf.orSuffixes = tf.orSuffixes[:0] tf.orSuffixes = tf.orSuffixes[:0]
tf.reSuffixMatch = nil tf.reSuffixMatch = nil
tf.matchesEmptyValue = false tf.isEmptyMatch = false
tf.graphiteReverseSuffix = tf.graphiteReverseSuffix[:0] tf.graphiteReverseSuffix = tf.graphiteReverseSuffix[:0]
tf.prefix = append(tf.prefix, commonPrefix...) tf.prefix = append(tf.prefix, commonPrefix...)
@ -276,6 +274,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
// Add empty orSuffix in order to trigger fast path for orSuffixes // Add empty orSuffix in order to trigger fast path for orSuffixes
// during the search for matching metricIDs. // during the search for matching metricIDs.
tf.orSuffixes = append(tf.orSuffixes[:0], "") tf.orSuffixes = append(tf.orSuffixes[:0], "")
tf.isEmptyMatch = len(prefix) == 0
return nil return nil
} }
rcv, err := getRegexpFromCache(expr) rcv, err := getRegexpFromCache(expr)
@ -284,9 +283,7 @@ func (tf *tagFilter) Init(commonPrefix, key, value []byte, isNegative, isRegexp
} }
tf.orSuffixes = append(tf.orSuffixes[:0], rcv.orValues...) tf.orSuffixes = append(tf.orSuffixes[:0], rcv.orValues...)
tf.reSuffixMatch = rcv.reMatch tf.reSuffixMatch = rcv.reMatch
if len(prefix) == 0 && !tf.isNegative && tf.reSuffixMatch(nil) { tf.isEmptyMatch = len(prefix) == 0 && tf.reSuffixMatch(nil)
tf.matchesEmptyValue = true
}
if !tf.isNegative && len(key) == 0 && strings.IndexByte(rcv.literalSuffix, '.') >= 0 { if !tf.isNegative && len(key) == 0 && strings.IndexByte(rcv.literalSuffix, '.') >= 0 {
// Reverse suffix is needed only for non-negative regexp filters on __name__ that contains dots. // Reverse suffix is needed only for non-negative regexp filters on __name__ that contains dots.
tf.graphiteReverseSuffix = reverseBytes(tf.graphiteReverseSuffix[:0], []byte(rcv.literalSuffix)) tf.graphiteReverseSuffix = reverseBytes(tf.graphiteReverseSuffix[:0], []byte(rcv.literalSuffix))