diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 53b5293b5..45918d9b8 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -919,6 +919,10 @@ func getCompoundSuffix(lex *lexer, allowColon bool) string { func getCompoundToken(lex *lexer) (string, error) { stopTokens := []string{",", "(", ")", "[", "]", "|", ""} + return getCompoundTokenExt(lex, stopTokens) +} + +func getCompoundTokenExt(lex *lexer, stopTokens []string) (string, error) { if lex.isKeyword(stopTokens...) { return "", fmt.Errorf("compound token cannot start with '%s'", lex.token) } diff --git a/lib/logstorage/stream_filter.go b/lib/logstorage/stream_filter.go index fe3f80b0d..e2efe6d4c 100644 --- a/lib/logstorage/stream_filter.go +++ b/lib/logstorage/stream_filter.go @@ -86,7 +86,8 @@ func (af *andStreamFilter) String() string { type streamTagFilter struct { // tagName is the name for the tag to filter tagName string - // op is operation such as `=`, `!=`, `=~` or `!~` + + // op is operation such as `=`, `!=`, `=~`, `!~` or `:` op string // value is the value @@ -164,20 +165,23 @@ func parseAndStreamFilter(lex *lexer) (*andStreamFilter, error) { } func parseStreamTagFilter(lex *lexer) (*streamTagFilter, error) { - tagName := lex.token - if !lex.mustNextToken() { - return nil, fmt.Errorf("missing operation in _stream filter for %q field", tagName) + // parse tagName + tagName, err := parseStreamTagName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse stream tag name: %w", err) } if !lex.isKeyword("=", "!=", "=~", "!~") { return nil, fmt.Errorf("unsupported operation %q in _steam filter for %q field; supported operations: =, !=, =~, !~", lex.token, tagName) } + + // parse op op := lex.token - if !lex.mustNextToken() { - return nil, fmt.Errorf("missing _stream filter value for %q field", tagName) - } - value := lex.token - if !lex.mustNextToken() { - return nil, fmt.Errorf("missing token after %q%s%q filter", tagName, op, value) + lex.nextToken() + + // parse tag value + value, err := parseStreamTagValue(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse value for tag %q: %w", tagName, err) } stf := &streamTagFilter{ tagName: tagName, @@ -194,6 +198,16 @@ func parseStreamTagFilter(lex *lexer) (*streamTagFilter, error) { return stf, nil } +func parseStreamTagName(lex *lexer) (string, error) { + stopTokens := []string{"=", "!=", "=~", "!~", ",", "{", "}", "'", `"`, "`", ""} + return getCompoundTokenExt(lex, stopTokens) +} + +func parseStreamTagValue(lex *lexer) (string, error) { + stopTokens := []string{",", "{", "}", "'", `"`, "`", ""} + return getCompoundTokenExt(lex, stopTokens) +} + func getStreamName() *streamName { v := streamNamePool.Get() if v == nil { diff --git a/lib/logstorage/stream_filter_test.go b/lib/logstorage/stream_filter_test.go index 2213059bb..fceb51492 100644 --- a/lib/logstorage/stream_filter_test.go +++ b/lib/logstorage/stream_filter_test.go @@ -163,7 +163,7 @@ func TestNewTestStreamFilterSuccess(t *testing.T) { f(`{foo="bar"}`, `{foo="bar"}`) f(`{ "foo" =~ "bar.+" , baz!="a" or x="y"}`, `{foo=~"bar.+",baz!="a" or x="y"}`) f(`{"a b"='c}"d' OR de="aaa"}`, `{"a b"="c}\"d" or de="aaa"}`) - f(`{a="b", c="d" or x="y"}`, `{a="b",c="d" or x="y"}`) + f(`{a-q:w.z="b", c="d" or 'x a'=y-z=q}`, `{"a-q:w.z"="b",c="d" or "x a"="y-z=q"}`) } func TestNewTestStreamFilterFailure(t *testing.T) {