This commit is contained in:
Aliaksandr Valialkin 2024-05-17 13:04:51 +02:00
parent 3da4b970d7
commit 0b4c103edb
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 264 additions and 106 deletions

View file

@ -11,7 +11,6 @@ import (
"unicode/utf8"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil"
)
type lexer struct {
@ -1076,100 +1075,14 @@ func stripTimezoneSuffix(s string) string {
}
func parseFilterStream(lex *lexer) (*filterStream, error) {
if !lex.isKeyword("{") {
return nil, fmt.Errorf("unexpected token %q instead of '{' in _stream filter", lex.token)
}
if !lex.mustNextToken() {
return nil, fmt.Errorf("incomplete _stream filter after '{'")
}
var filters []*andStreamFilter
for {
f, err := parseAndStreamFilter(lex)
sf, err := parseStreamFilter(lex)
if err != nil {
return nil, err
}
filters = append(filters, f)
switch {
case lex.isKeyword("}"):
lex.nextToken()
fs := &filterStream{
f: &StreamFilter{
orFilters: filters,
},
f: sf,
}
return fs, nil
case lex.isKeyword("or"):
if !lex.mustNextToken() {
return nil, fmt.Errorf("incomplete _stream filter after 'or'")
}
if lex.isKeyword("}") {
return nil, fmt.Errorf("unexpected '}' after 'or' in _stream filter")
}
default:
return nil, fmt.Errorf("unexpected token in _stream filter: %q; want '}' or 'or'", lex.token)
}
}
}
func parseAndStreamFilter(lex *lexer) (*andStreamFilter, error) {
var filters []*streamTagFilter
for {
if lex.isKeyword("}") {
asf := &andStreamFilter{
tagFilters: filters,
}
return asf, nil
}
f, err := parseStreamTagFilter(lex)
if err != nil {
return nil, err
}
filters = append(filters, f)
switch {
case lex.isKeyword("or", "}"):
asf := &andStreamFilter{
tagFilters: filters,
}
return asf, nil
case lex.isKeyword(","):
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing stream filter after ','")
}
default:
return nil, fmt.Errorf("unexpected token %q in _stream filter; want 'or', 'and', '}' or ','", lex.token)
}
}
}
func parseStreamTagFilter(lex *lexer) (*streamTagFilter, error) {
tagName := lex.token
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing operation in _stream filter for %q field", tagName)
}
if !lex.isKeyword("=", "!=", "=~", "!~") {
return nil, fmt.Errorf("unsupported operation %q in _steam filter for %q field; supported operations: =, !=, =~, !~", lex.token, tagName)
}
op := lex.token
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing _stream filter value for %q field", tagName)
}
value := lex.token
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing token after %q%s%q filter", tagName, op, value)
}
stf := &streamTagFilter{
tagName: tagName,
op: op,
value: value,
}
if op == "=~" || op == "!~" {
re, err := regexutil.NewPromRegex(value)
if err != nil {
return nil, fmt.Errorf("invalid regexp %q for stream filter: %w", value, err)
}
stf.regexp = re
}
return stf, nil
}
func parseTime(lex *lexer) (int64, string, error) {

View file

@ -1,12 +1,14 @@
package logstorage
import (
"fmt"
"strconv"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil"
)
@ -97,6 +99,101 @@ func (tf *streamTagFilter) String() string {
return quoteTokenIfNeeded(tf.tagName) + tf.op + strconv.Quote(tf.value)
}
func parseStreamFilter(lex *lexer) (*StreamFilter, error) {
if !lex.isKeyword("{") {
return nil, fmt.Errorf("unexpected token %q instead of '{' in _stream filter", lex.token)
}
if !lex.mustNextToken() {
return nil, fmt.Errorf("incomplete _stream filter after '{'")
}
var filters []*andStreamFilter
for {
f, err := parseAndStreamFilter(lex)
if err != nil {
return nil, err
}
filters = append(filters, f)
switch {
case lex.isKeyword("}"):
lex.nextToken()
sf := &StreamFilter{
orFilters: filters,
}
return sf, nil
case lex.isKeyword("or"):
if !lex.mustNextToken() {
return nil, fmt.Errorf("incomplete _stream filter after 'or'")
}
if lex.isKeyword("}") {
return nil, fmt.Errorf("unexpected '}' after 'or' in _stream filter")
}
default:
return nil, fmt.Errorf("unexpected token in _stream filter: %q; want '}' or 'or'", lex.token)
}
}
}
func parseAndStreamFilter(lex *lexer) (*andStreamFilter, error) {
var filters []*streamTagFilter
for {
if lex.isKeyword("}") {
asf := &andStreamFilter{
tagFilters: filters,
}
return asf, nil
}
f, err := parseStreamTagFilter(lex)
if err != nil {
return nil, err
}
filters = append(filters, f)
switch {
case lex.isKeyword("or", "}"):
asf := &andStreamFilter{
tagFilters: filters,
}
return asf, nil
case lex.isKeyword(","):
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing stream filter after ','")
}
default:
return nil, fmt.Errorf("unexpected token %q in _stream filter; want 'or', 'and', '}' or ','", lex.token)
}
}
}
func parseStreamTagFilter(lex *lexer) (*streamTagFilter, error) {
tagName := lex.token
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing operation in _stream filter for %q field", tagName)
}
if !lex.isKeyword("=", "!=", "=~", "!~") {
return nil, fmt.Errorf("unsupported operation %q in _steam filter for %q field; supported operations: =, !=, =~, !~", lex.token, tagName)
}
op := lex.token
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing _stream filter value for %q field", tagName)
}
value := lex.token
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing token after %q%s%q filter", tagName, op, value)
}
stf := &streamTagFilter{
tagName: tagName,
op: op,
value: value,
}
if op == "=~" || op == "!~" {
re, err := regexutil.NewPromRegex(value)
if err != nil {
return nil, fmt.Errorf("invalid regexp %q for stream filter: %w", value, err)
}
stf.regexp = re
}
return stf, nil
}
func getStreamName() *streamName {
v := streamNamePool.Get()
if v == nil {
@ -170,20 +267,27 @@ func (sn *streamName) parse(s string) bool {
}
func (sn *streamName) match(tf *streamTagFilter) bool {
for _, t := range sn.tags {
if t.Name != tf.tagName {
continue
}
v := sn.getTagValueByTagName(tf.tagName)
switch tf.op {
case "=":
return t.Value == tf.value
return v == tf.value
case "!=":
return t.Value != tf.value
return v != tf.value
case "=~":
return tf.regexp.MatchString(t.Value)
return tf.regexp.MatchString(v)
case "!~":
return !tf.regexp.MatchString(t.Value)
}
}
return !tf.regexp.MatchString(v)
default:
logger.Panicf("BUG: unexpected tagFilter operation: %q", tf.op)
return false
}
}
func (sn *streamName) getTagValueByTagName(name string) string {
for _, t := range sn.tags {
if t.Name == name {
return t.Value
}
}
return ""
}

View file

@ -5,6 +5,147 @@ import (
"testing"
)
func TestStreamFilterMatchStreamName(t *testing.T) {
f := func(filter, streamName string, resultExpected bool) {
t.Helper()
sf := mustNewTestStreamFilter(filter)
result := sf.matchStreamName(streamName)
if result != resultExpected {
t.Fatalf("unexpected result for matching %s against %s; got %v; want %v", streamName, sf, result, resultExpected)
}
}
// Empty filter matches anything
f(`{}`, `{}`, true)
f(`{}`, `{foo="bar"}`, true)
f(`{}`, `{foo="bar",a="b",c="d"}`, true)
// empty '=' filter
f(`{foo=""}`, `{}`, true)
f(`{foo=""}`, `{foo="bar"}`, false)
f(`{foo=""}`, `{a="b",c="d"}`, true)
// non-empty '=' filter
f(`{foo="bar"}`, `{}`, false)
f(`{foo="bar"}`, `{foo="bar"}`, true)
f(`{foo="bar"}`, `{foo="barbaz"}`, false)
f(`{foo="bar"}`, `{foo="bazbar"}`, false)
f(`{foo="bar"}`, `{a="b",foo="bar"}`, true)
f(`{foo="bar"}`, `{foo="bar",a="b"}`, true)
f(`{foo="bar"}`, `{a="b",foo="bar",c="d"}`, true)
f(`{foo="bar"}`, `{foo="baz"}`, false)
f(`{foo="bar"}`, `{foo="baz",a="b"}`, false)
f(`{foo="bar"}`, `{a="b",foo="baz"}`, false)
f(`{foo="bar"}`, `{a="b",foo="baz",b="c"}`, false)
f(`{foo="bar"}`, `{zoo="bar"}`, false)
f(`{foo="bar"}`, `{a="b",zoo="bar"}`, false)
// empty '!=' filter
f(`{foo!=""}`, `{}`, false)
f(`{foo!=""}`, `{foo="bar"}`, true)
f(`{foo!=""}`, `{a="b",c="d"}`, false)
// non-empty '!=' filter
f(`{foo!="bar"}`, `{}`, true)
f(`{foo!="bar"}`, `{foo="bar"}`, false)
f(`{foo!="bar"}`, `{foo="barbaz"}`, true)
f(`{foo!="bar"}`, `{foo="bazbar"}`, true)
f(`{foo!="bar"}`, `{a="b",foo="bar"}`, false)
f(`{foo!="bar"}`, `{foo="bar",a="b"}`, false)
f(`{foo!="bar"}`, `{a="b",foo="bar",c="d"}`, false)
f(`{foo!="bar"}`, `{foo="baz"}`, true)
f(`{foo!="bar"}`, `{foo="baz",a="b"}`, true)
f(`{foo!="bar"}`, `{a="b",foo="baz"}`, true)
f(`{foo!="bar"}`, `{a="b",foo="baz",b="c"}`, true)
f(`{foo!="bar"}`, `{zoo="bar"}`, true)
f(`{foo!="bar"}`, `{a="b",zoo="bar"}`, true)
// empty '=~' filter
f(`{foo=~""}`, `{}`, true)
f(`{foo=~""}`, `{foo="bar"}`, false)
f(`{foo=~""}`, `{a="b",c="d"}`, true)
f(`{foo=~".*"}`, `{}`, true)
f(`{foo=~".*"}`, `{foo="bar"}`, true)
f(`{foo=~".*"}`, `{a="b",c="d"}`, true)
// non-empty '=~` filter
f(`{foo=~".+"}`, `{}`, false)
f(`{foo=~".+"}`, `{foo="bar"}`, true)
f(`{foo=~".+"}`, `{a="b",c="d"}`, false)
f(`{foo=~"bar"}`, `{foo="bar"}`, true)
f(`{foo=~"bar"}`, `{foo="barbaz"}`, false)
f(`{foo=~"bar"}`, `{foo="bazbar"}`, false)
f(`{foo=~"bar"}`, `{a="b",foo="bar"}`, true)
f(`{foo=~"bar"}`, `{foo="bar",a="b"}`, true)
f(`{foo=~"bar"}`, `{a="b",foo="bar",b="c"}`, true)
f(`{foo=~"bar"}`, `{foo="baz"}`, false)
f(`{foo=~"bar"}`, `{foo="baz",a="b"}`, false)
f(`{foo=~"bar"}`, `{zoo="bar"}`, false)
f(`{foo=~"bar"}`, `{a="b",zoo="bar"}`, false)
f(`{foo=~".*a.+"}`, `{foo="bar"}`, true)
f(`{foo=~".*a.+"}`, `{foo="barboz"}`, true)
f(`{foo=~".*a.+"}`, `{foo="bazbor"}`, true)
f(`{foo=~".*a.+"}`, `{a="b",foo="bar"}`, true)
f(`{foo=~".*a.+"}`, `{foo="bar",a="b"}`, true)
f(`{foo=~".*a.+"}`, `{a="b",foo="bar",b="c"}`, true)
f(`{foo=~".*a.+"}`, `{foo="boz"}`, false)
f(`{foo=~".*a.+"}`, `{foo="boz",a="b"}`, false)
f(`{foo=~".*a.+"}`, `{zoo="bar"}`, false)
f(`{foo=~".*a.+"}`, `{a="b",zoo="bar"}`, false)
// empty '!~' filter
f(`{foo!~""}`, `{}`, false)
f(`{foo!~""}`, `{foo="bar"}`, true)
f(`{foo!~""}`, `{a="b",c="d"}`, false)
f(`{foo!~".*"}`, `{}`, false)
f(`{foo!~".*"}`, `{foo="bar"}`, false)
f(`{foo!~".*"}`, `{a="b",c="d"}`, false)
f(`{foo!~"bar"}`, `{foo="bar"}`, false)
f(`{foo!~"bar"}`, `{foo="barbaz"}`, true)
f(`{foo!~"bar"}`, `{foo="bazbar"}`, true)
f(`{foo!~"bar"}`, `{a="b",foo="bar"}`, false)
f(`{foo!~"bar"}`, `{foo="bar",a="b"}`, false)
f(`{foo!~"bar"}`, `{a="b",foo="bar",b="c"}`, false)
f(`{foo!~"bar"}`, `{foo="baz"}`, true)
f(`{foo!~"bar"}`, `{foo="baz",a="b"}`, true)
f(`{foo!~"bar"}`, `{zoo="bar"}`, true)
f(`{foo!~"bar"}`, `{a="b",zoo="bar"}`, true)
f(`{foo!~".*a.+"}`, `{foo="bar"}`, false)
f(`{foo!~".*a.+"}`, `{foo="barboz"}`, false)
f(`{foo!~".*a.+"}`, `{foo="bazbor"}`, false)
f(`{foo!~".*a.+"}`, `{a="b",foo="bar"}`, false)
f(`{foo!~".*a.+"}`, `{foo="bar",a="b"}`, false)
f(`{foo!~".*a.+"}`, `{a="b",foo="bar",b="c"}`, false)
f(`{foo!~".*a.+"}`, `{foo="boz"}`, true)
f(`{foo!~".*a.+"}`, `{foo="boz",a="b"}`, true)
f(`{foo!~".*a.+"}`, `{zoo="bar"}`, true)
f(`{foo!~".*a.+"}`, `{a="b",zoo="bar"}`, true)
// multiple 'and' filters
f(`{a="b",b="c"}`, `{a="b"}`, false)
f(`{a="b",b="c"}`, `{b="c",a="b"}`, true)
f(`{a="b",b="c"}`, `{x="y",b="c",a="b",d="e"}`, true)
f(`{a=~"foo.+",a!~".+bar"}`, `{a="foobar"}`, false)
f(`{a=~"foo.+",a!~".+bar"}`, `{a="foozar"}`, true)
// multple `or` filters
f(`{a="b" or b="c"}`, `{x="y"}`, false)
f(`{a="b" or b="c"}`, `{x="y",b="c"}`, true)
f(`{a="b" or b="c"}`, `{a="b",x="y",b="c"}`, true)
f(`{a="b",b="c" or a=~"foo.+"}`, `{}`, false)
f(`{a="b",b="c" or a=~"foo.+"}`, `{x="y",a="foobar"}`, true)
f(`{a="b",b="c" or a=~"foo.+"}`, `{x="y",a="b"}`, false)
f(`{a="b",b="c" or a=~"foo.+"}`, `{x="y",b="c",a="b"}`, true)
f(`{a="b" or c=""}`, `{}`, true)
f(`{a="b" or c=""}`, `{c="x"}`, false)
f(`{a="b" or c=""}`, `{a="b"}`, true)
}
func TestNewTestStreamFilterSuccess(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()