From 0b4c103edbcbcb58a1f2076a4982c2a7bf8155b2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 17 May 2024 13:04:51 +0200 Subject: [PATCH] wip --- lib/logstorage/parser.go | 99 ++----------------- lib/logstorage/stream_filter.go | 130 +++++++++++++++++++++--- lib/logstorage/stream_filter_test.go | 141 +++++++++++++++++++++++++++ 3 files changed, 264 insertions(+), 106 deletions(-) diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index ddf3cb771..5b851b418 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -11,7 +11,6 @@ import ( "unicode/utf8" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil" ) type lexer struct { @@ -1076,100 +1075,14 @@ func stripTimezoneSuffix(s string) string { } func parseFilterStream(lex *lexer) (*filterStream, error) { - if !lex.isKeyword("{") { - return nil, fmt.Errorf("unexpected token %q instead of '{' in _stream filter", lex.token) + sf, err := parseStreamFilter(lex) + if err != nil { + return nil, err } - if !lex.mustNextToken() { - return nil, fmt.Errorf("incomplete _stream filter after '{'") + fs := &filterStream{ + f: sf, } - var filters []*andStreamFilter - for { - f, err := parseAndStreamFilter(lex) - if err != nil { - return nil, err - } - filters = append(filters, f) - switch { - case lex.isKeyword("}"): - lex.nextToken() - fs := &filterStream{ - f: &StreamFilter{ - orFilters: filters, - }, - } - return fs, nil - case lex.isKeyword("or"): - if !lex.mustNextToken() { - return nil, fmt.Errorf("incomplete _stream filter after 'or'") - } - if lex.isKeyword("}") { - return nil, fmt.Errorf("unexpected '}' after 'or' in _stream filter") - } - default: - return nil, fmt.Errorf("unexpected token in _stream filter: %q; want '}' or 'or'", lex.token) - } - } -} - -func parseAndStreamFilter(lex *lexer) (*andStreamFilter, error) { - var filters []*streamTagFilter - for { - if lex.isKeyword("}") { - asf := &andStreamFilter{ - tagFilters: filters, - } - return asf, nil - } - f, err := parseStreamTagFilter(lex) - if err != nil { - return nil, err - } - filters = append(filters, f) - switch { - case lex.isKeyword("or", "}"): - asf := &andStreamFilter{ - tagFilters: filters, - } - return asf, nil - case lex.isKeyword(","): - if !lex.mustNextToken() { - return nil, fmt.Errorf("missing stream filter after ','") - } - default: - return nil, fmt.Errorf("unexpected token %q in _stream filter; want 'or', 'and', '}' or ','", lex.token) - } - } -} - -func parseStreamTagFilter(lex *lexer) (*streamTagFilter, error) { - tagName := lex.token - if !lex.mustNextToken() { - return nil, fmt.Errorf("missing operation in _stream filter for %q field", tagName) - } - if !lex.isKeyword("=", "!=", "=~", "!~") { - return nil, fmt.Errorf("unsupported operation %q in _steam filter for %q field; supported operations: =, !=, =~, !~", lex.token, tagName) - } - op := lex.token - if !lex.mustNextToken() { - return nil, fmt.Errorf("missing _stream filter value for %q field", tagName) - } - value := lex.token - if !lex.mustNextToken() { - return nil, fmt.Errorf("missing token after %q%s%q filter", tagName, op, value) - } - stf := &streamTagFilter{ - tagName: tagName, - op: op, - value: value, - } - if op == "=~" || op == "!~" { - re, err := regexutil.NewPromRegex(value) - if err != nil { - return nil, fmt.Errorf("invalid regexp %q for stream filter: %w", value, err) - } - stf.regexp = re - } - return stf, nil + return fs, nil } func parseTime(lex *lexer) (int64, string, error) { diff --git a/lib/logstorage/stream_filter.go b/lib/logstorage/stream_filter.go index 2ba2109df..fe3f80b0d 100644 --- a/lib/logstorage/stream_filter.go +++ b/lib/logstorage/stream_filter.go @@ -1,12 +1,14 @@ package logstorage import ( + "fmt" "strconv" "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/regexutil" ) @@ -97,6 +99,101 @@ func (tf *streamTagFilter) String() string { return quoteTokenIfNeeded(tf.tagName) + tf.op + strconv.Quote(tf.value) } +func parseStreamFilter(lex *lexer) (*StreamFilter, error) { + if !lex.isKeyword("{") { + return nil, fmt.Errorf("unexpected token %q instead of '{' in _stream filter", lex.token) + } + if !lex.mustNextToken() { + return nil, fmt.Errorf("incomplete _stream filter after '{'") + } + var filters []*andStreamFilter + for { + f, err := parseAndStreamFilter(lex) + if err != nil { + return nil, err + } + filters = append(filters, f) + switch { + case lex.isKeyword("}"): + lex.nextToken() + sf := &StreamFilter{ + orFilters: filters, + } + return sf, nil + case lex.isKeyword("or"): + if !lex.mustNextToken() { + return nil, fmt.Errorf("incomplete _stream filter after 'or'") + } + if lex.isKeyword("}") { + return nil, fmt.Errorf("unexpected '}' after 'or' in _stream filter") + } + default: + return nil, fmt.Errorf("unexpected token in _stream filter: %q; want '}' or 'or'", lex.token) + } + } +} + +func parseAndStreamFilter(lex *lexer) (*andStreamFilter, error) { + var filters []*streamTagFilter + for { + if lex.isKeyword("}") { + asf := &andStreamFilter{ + tagFilters: filters, + } + return asf, nil + } + f, err := parseStreamTagFilter(lex) + if err != nil { + return nil, err + } + filters = append(filters, f) + switch { + case lex.isKeyword("or", "}"): + asf := &andStreamFilter{ + tagFilters: filters, + } + return asf, nil + case lex.isKeyword(","): + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing stream filter after ','") + } + default: + return nil, fmt.Errorf("unexpected token %q in _stream filter; want 'or', 'and', '}' or ','", lex.token) + } + } +} + +func parseStreamTagFilter(lex *lexer) (*streamTagFilter, error) { + tagName := lex.token + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing operation in _stream filter for %q field", tagName) + } + if !lex.isKeyword("=", "!=", "=~", "!~") { + return nil, fmt.Errorf("unsupported operation %q in _steam filter for %q field; supported operations: =, !=, =~, !~", lex.token, tagName) + } + op := lex.token + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing _stream filter value for %q field", tagName) + } + value := lex.token + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing token after %q%s%q filter", tagName, op, value) + } + stf := &streamTagFilter{ + tagName: tagName, + op: op, + value: value, + } + if op == "=~" || op == "!~" { + re, err := regexutil.NewPromRegex(value) + if err != nil { + return nil, fmt.Errorf("invalid regexp %q for stream filter: %w", value, err) + } + stf.regexp = re + } + return stf, nil +} + func getStreamName() *streamName { v := streamNamePool.Get() if v == nil { @@ -170,20 +267,27 @@ func (sn *streamName) parse(s string) bool { } func (sn *streamName) match(tf *streamTagFilter) bool { + v := sn.getTagValueByTagName(tf.tagName) + switch tf.op { + case "=": + return v == tf.value + case "!=": + return v != tf.value + case "=~": + return tf.regexp.MatchString(v) + case "!~": + return !tf.regexp.MatchString(v) + default: + logger.Panicf("BUG: unexpected tagFilter operation: %q", tf.op) + return false + } +} + +func (sn *streamName) getTagValueByTagName(name string) string { for _, t := range sn.tags { - if t.Name != tf.tagName { - continue - } - switch tf.op { - case "=": - return t.Value == tf.value - case "!=": - return t.Value != tf.value - case "=~": - return tf.regexp.MatchString(t.Value) - case "!~": - return !tf.regexp.MatchString(t.Value) + if t.Name == name { + return t.Value } } - return false + return "" } diff --git a/lib/logstorage/stream_filter_test.go b/lib/logstorage/stream_filter_test.go index ed88bd1c6..2213059bb 100644 --- a/lib/logstorage/stream_filter_test.go +++ b/lib/logstorage/stream_filter_test.go @@ -5,6 +5,147 @@ import ( "testing" ) +func TestStreamFilterMatchStreamName(t *testing.T) { + f := func(filter, streamName string, resultExpected bool) { + t.Helper() + sf := mustNewTestStreamFilter(filter) + result := sf.matchStreamName(streamName) + if result != resultExpected { + t.Fatalf("unexpected result for matching %s against %s; got %v; want %v", streamName, sf, result, resultExpected) + } + } + + // Empty filter matches anything + f(`{}`, `{}`, true) + f(`{}`, `{foo="bar"}`, true) + f(`{}`, `{foo="bar",a="b",c="d"}`, true) + + // empty '=' filter + f(`{foo=""}`, `{}`, true) + f(`{foo=""}`, `{foo="bar"}`, false) + f(`{foo=""}`, `{a="b",c="d"}`, true) + + // non-empty '=' filter + f(`{foo="bar"}`, `{}`, false) + f(`{foo="bar"}`, `{foo="bar"}`, true) + f(`{foo="bar"}`, `{foo="barbaz"}`, false) + f(`{foo="bar"}`, `{foo="bazbar"}`, false) + f(`{foo="bar"}`, `{a="b",foo="bar"}`, true) + f(`{foo="bar"}`, `{foo="bar",a="b"}`, true) + f(`{foo="bar"}`, `{a="b",foo="bar",c="d"}`, true) + f(`{foo="bar"}`, `{foo="baz"}`, false) + f(`{foo="bar"}`, `{foo="baz",a="b"}`, false) + f(`{foo="bar"}`, `{a="b",foo="baz"}`, false) + f(`{foo="bar"}`, `{a="b",foo="baz",b="c"}`, false) + f(`{foo="bar"}`, `{zoo="bar"}`, false) + f(`{foo="bar"}`, `{a="b",zoo="bar"}`, false) + + // empty '!=' filter + f(`{foo!=""}`, `{}`, false) + f(`{foo!=""}`, `{foo="bar"}`, true) + f(`{foo!=""}`, `{a="b",c="d"}`, false) + + // non-empty '!=' filter + f(`{foo!="bar"}`, `{}`, true) + f(`{foo!="bar"}`, `{foo="bar"}`, false) + f(`{foo!="bar"}`, `{foo="barbaz"}`, true) + f(`{foo!="bar"}`, `{foo="bazbar"}`, true) + f(`{foo!="bar"}`, `{a="b",foo="bar"}`, false) + f(`{foo!="bar"}`, `{foo="bar",a="b"}`, false) + f(`{foo!="bar"}`, `{a="b",foo="bar",c="d"}`, false) + f(`{foo!="bar"}`, `{foo="baz"}`, true) + f(`{foo!="bar"}`, `{foo="baz",a="b"}`, true) + f(`{foo!="bar"}`, `{a="b",foo="baz"}`, true) + f(`{foo!="bar"}`, `{a="b",foo="baz",b="c"}`, true) + f(`{foo!="bar"}`, `{zoo="bar"}`, true) + f(`{foo!="bar"}`, `{a="b",zoo="bar"}`, true) + + // empty '=~' filter + f(`{foo=~""}`, `{}`, true) + f(`{foo=~""}`, `{foo="bar"}`, false) + f(`{foo=~""}`, `{a="b",c="d"}`, true) + f(`{foo=~".*"}`, `{}`, true) + f(`{foo=~".*"}`, `{foo="bar"}`, true) + f(`{foo=~".*"}`, `{a="b",c="d"}`, true) + + // non-empty '=~` filter + + f(`{foo=~".+"}`, `{}`, false) + f(`{foo=~".+"}`, `{foo="bar"}`, true) + f(`{foo=~".+"}`, `{a="b",c="d"}`, false) + + f(`{foo=~"bar"}`, `{foo="bar"}`, true) + f(`{foo=~"bar"}`, `{foo="barbaz"}`, false) + f(`{foo=~"bar"}`, `{foo="bazbar"}`, false) + f(`{foo=~"bar"}`, `{a="b",foo="bar"}`, true) + f(`{foo=~"bar"}`, `{foo="bar",a="b"}`, true) + f(`{foo=~"bar"}`, `{a="b",foo="bar",b="c"}`, true) + f(`{foo=~"bar"}`, `{foo="baz"}`, false) + f(`{foo=~"bar"}`, `{foo="baz",a="b"}`, false) + f(`{foo=~"bar"}`, `{zoo="bar"}`, false) + f(`{foo=~"bar"}`, `{a="b",zoo="bar"}`, false) + + f(`{foo=~".*a.+"}`, `{foo="bar"}`, true) + f(`{foo=~".*a.+"}`, `{foo="barboz"}`, true) + f(`{foo=~".*a.+"}`, `{foo="bazbor"}`, true) + f(`{foo=~".*a.+"}`, `{a="b",foo="bar"}`, true) + f(`{foo=~".*a.+"}`, `{foo="bar",a="b"}`, true) + f(`{foo=~".*a.+"}`, `{a="b",foo="bar",b="c"}`, true) + f(`{foo=~".*a.+"}`, `{foo="boz"}`, false) + f(`{foo=~".*a.+"}`, `{foo="boz",a="b"}`, false) + f(`{foo=~".*a.+"}`, `{zoo="bar"}`, false) + f(`{foo=~".*a.+"}`, `{a="b",zoo="bar"}`, false) + + // empty '!~' filter + f(`{foo!~""}`, `{}`, false) + f(`{foo!~""}`, `{foo="bar"}`, true) + f(`{foo!~""}`, `{a="b",c="d"}`, false) + f(`{foo!~".*"}`, `{}`, false) + f(`{foo!~".*"}`, `{foo="bar"}`, false) + f(`{foo!~".*"}`, `{a="b",c="d"}`, false) + + f(`{foo!~"bar"}`, `{foo="bar"}`, false) + f(`{foo!~"bar"}`, `{foo="barbaz"}`, true) + f(`{foo!~"bar"}`, `{foo="bazbar"}`, true) + f(`{foo!~"bar"}`, `{a="b",foo="bar"}`, false) + f(`{foo!~"bar"}`, `{foo="bar",a="b"}`, false) + f(`{foo!~"bar"}`, `{a="b",foo="bar",b="c"}`, false) + f(`{foo!~"bar"}`, `{foo="baz"}`, true) + f(`{foo!~"bar"}`, `{foo="baz",a="b"}`, true) + f(`{foo!~"bar"}`, `{zoo="bar"}`, true) + f(`{foo!~"bar"}`, `{a="b",zoo="bar"}`, true) + + f(`{foo!~".*a.+"}`, `{foo="bar"}`, false) + f(`{foo!~".*a.+"}`, `{foo="barboz"}`, false) + f(`{foo!~".*a.+"}`, `{foo="bazbor"}`, false) + f(`{foo!~".*a.+"}`, `{a="b",foo="bar"}`, false) + f(`{foo!~".*a.+"}`, `{foo="bar",a="b"}`, false) + f(`{foo!~".*a.+"}`, `{a="b",foo="bar",b="c"}`, false) + f(`{foo!~".*a.+"}`, `{foo="boz"}`, true) + f(`{foo!~".*a.+"}`, `{foo="boz",a="b"}`, true) + f(`{foo!~".*a.+"}`, `{zoo="bar"}`, true) + f(`{foo!~".*a.+"}`, `{a="b",zoo="bar"}`, true) + + // multiple 'and' filters + f(`{a="b",b="c"}`, `{a="b"}`, false) + f(`{a="b",b="c"}`, `{b="c",a="b"}`, true) + f(`{a="b",b="c"}`, `{x="y",b="c",a="b",d="e"}`, true) + f(`{a=~"foo.+",a!~".+bar"}`, `{a="foobar"}`, false) + f(`{a=~"foo.+",a!~".+bar"}`, `{a="foozar"}`, true) + + // multple `or` filters + f(`{a="b" or b="c"}`, `{x="y"}`, false) + f(`{a="b" or b="c"}`, `{x="y",b="c"}`, true) + f(`{a="b" or b="c"}`, `{a="b",x="y",b="c"}`, true) + f(`{a="b",b="c" or a=~"foo.+"}`, `{}`, false) + f(`{a="b",b="c" or a=~"foo.+"}`, `{x="y",a="foobar"}`, true) + f(`{a="b",b="c" or a=~"foo.+"}`, `{x="y",a="b"}`, false) + f(`{a="b",b="c" or a=~"foo.+"}`, `{x="y",b="c",a="b"}`, true) + f(`{a="b" or c=""}`, `{}`, true) + f(`{a="b" or c=""}`, `{c="x"}`, false) + f(`{a="b" or c=""}`, `{a="b"}`, true) +} + func TestNewTestStreamFilterSuccess(t *testing.T) { f := func(s, resultExpected string) { t.Helper()