diff --git a/lib/logstorage/pattern.go b/lib/logstorage/pattern.go new file mode 100644 index 000000000..27b597eee --- /dev/null +++ b/lib/logstorage/pattern.go @@ -0,0 +1,185 @@ +package logstorage + +import ( + "fmt" + "html" + "strconv" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +// pattern represents text pattern in the form 'some_textother_text...' +type pattern struct { + // steps contains steps for extracting fields from string + steps []patternStep + + // matches contains matches for every step in steps + matches []string + + // fields contains matches for non-empty fields + fields []patternField +} + +type patternField struct { + name string + value *string +} + +type patternStep struct { + prefix string + field string +} + +func newPattern(steps []patternStep) *pattern { + if len(steps) == 0 { + logger.Panicf("BUG: steps cannot be empty") + } + + matches := make([]string, len(steps)) + + var fields []patternField + for i, step := range steps { + if step.field != "" { + fields = append(fields, patternField{ + name: step.field, + value: &matches[i], + }) + } + } + if len(fields) == 0 { + logger.Panicf("BUG: fields cannot be empty") + } + + ef := &pattern{ + steps: steps, + matches: matches, + fields: fields, + } + return ef +} + +func (ef *pattern) apply(s string) { + clear(ef.matches) + + steps := ef.steps + + if prefix := steps[0].prefix; prefix != "" { + n := strings.Index(s, prefix) + if n < 0 { + // Mismatch + return + } + s = s[n+len(prefix):] + } + + matches := ef.matches + for i := range steps { + nextPrefix := "" + if i+1 < len(steps) { + nextPrefix = steps[i+1].prefix + } + + us, nOffset := tryUnquoteString(s) + if nOffset >= 0 { + // Matched quoted string + matches[i] = us + s = s[nOffset:] + if !strings.HasPrefix(s, nextPrefix) { + // Mismatch + return + } + s = s[len(nextPrefix):] + } else { + // Match unquoted string until the nextPrefix + if nextPrefix == "" { + matches[i] = s + return + } + n := strings.Index(s, nextPrefix) + if n < 0 { + // Mismatch + return + } + matches[i] = s[:n] + s = s[n+len(nextPrefix):] + } + } +} + +func tryUnquoteString(s string) (string, int) { + if len(s) == 0 { + return s, -1 + } + if s[0] != '"' && s[0] != '`' { + return s, -1 + } + qp, err := strconv.QuotedPrefix(s) + if err != nil { + return s, -1 + } + us, err := strconv.Unquote(qp) + if err != nil { + return s, -1 + } + return us, len(qp) +} + +func parsePatternSteps(s string) ([]patternStep, error) { + var steps []patternStep + + hasNamedField := false + + n := strings.IndexByte(s, '<') + if n < 0 { + return nil, fmt.Errorf("missing <...> fields") + } + prefix := s[:n] + s = s[n+1:] + for { + n := strings.IndexByte(s, '>') + if n < 0 { + return nil, fmt.Errorf("missing '>' for <%s", s) + } + field := s[:n] + s = s[n+1:] + + if field == "_" || field == "*" { + field = "" + } + steps = append(steps, patternStep{ + prefix: prefix, + field: field, + }) + if !hasNamedField && field != "" { + hasNamedField = true + } + if len(s) == 0 { + break + } + + n = strings.IndexByte(s, '<') + if n < 0 { + steps = append(steps, patternStep{ + prefix: s, + }) + break + } + if n == 0 { + return nil, fmt.Errorf("missing delimiter after <%s>", field) + } + prefix = s[:n] + s = s[n+1:] + } + + if !hasNamedField { + return nil, fmt.Errorf("missing named fields like ") + } + + for i := range steps { + step := &steps[i] + step.prefix = html.UnescapeString(step.prefix) + } + + return steps, nil +} diff --git a/lib/logstorage/pattern_test.go b/lib/logstorage/pattern_test.go new file mode 100644 index 000000000..2bfa2d1d6 --- /dev/null +++ b/lib/logstorage/pattern_test.go @@ -0,0 +1,179 @@ +package logstorage + +import ( + "reflect" + "testing" +) + +func TestPatternApply(t *testing.T) { + f := func(pattern, s string, resultsExpected []string) { + t.Helper() + + steps, err := parsePatternSteps(pattern) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + ef := newPattern(steps) + ef.apply(s) + + if len(ef.fields) != len(resultsExpected) { + t.Fatalf("unexpected number of results; got %d; want %d", len(ef.fields), len(resultsExpected)) + } + for i, f := range ef.fields { + if v := *f.value; v != resultsExpected[i] { + t.Fatalf("unexpected value for field %q; got %q; want %q", f.name, v, resultsExpected[i]) + } + } + } + + f("", "", []string{""}) + f("", "abc", []string{"abc"}) + f("bar", "", []string{""}) + f("bar", "bar", []string{""}) + f("bar", "bazbar", []string{"baz"}) + f("bar", "a bazbar xdsf", []string{"a baz"}) + f("bar<>", "a bazbar xdsf", []string{"a baz"}) + f("bar<>x", "a bazbar xdsf", []string{"a baz"}) + f("foo", "", []string{""}) + f("foo", "foo", []string{""}) + f("foo", "a foo xdf sdf", []string{" xdf sdf"}) + f("foo", "a foo foobar", []string{" foobar"}) + f("foobaz", "a foo foobar", []string{""}) + f("foobaz", "a foobaz bar", []string{""}) + f("foobaz", "a foo foobar baz", []string{" foobar "}) + f("foobaz", "a foo foobar bazabc", []string{" foobar "}) + + f("ip= <> path= ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "/foo/bar"}) + + // escaped pattern + f("ip=<>", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4"}) + f("ip=<>", "foo ip= bar", []string{"foo&bar"}) + + // quoted fields + f(`"msg":,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar` + "\n\t"}) + f(`foo=`, "foo=`bar baz,abc` def", []string{"bar baz,abc"}) + f(`foo= `, "foo=`bar baz,abc` def", []string{"bar baz,abc"}) + f(``, `"foo,\"bar"`, []string{`foo,"bar`}) + f(`,"bar`, `"foo,\"bar"`, []string{`foo,"bar`}) +} + +func TestParsePatternStepsSuccess(t *testing.T) { + f := func(s string, stepsExpected []patternStep) { + t.Helper() + + steps, err := parsePatternSteps(s) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", s, err) + } + if !reflect.DeepEqual(steps, stepsExpected) { + t.Fatalf("unexpected steps for [%s]; got %v; want %v", s, steps, stepsExpected) + } + } + + f("", []patternStep{ + { + field: "foo", + }, + }) + f("bar", []patternStep{ + { + field: "foo", + }, + { + prefix: "bar", + }, + }) + f("<>bar", []patternStep{ + {}, + { + prefix: "bar", + field: "foo", + }, + }) + f("bar", []patternStep{ + { + prefix: "bar", + field: "foo", + }, + }) + f("barabc", []patternStep{ + { + prefix: "bar", + field: "foo", + }, + { + prefix: "abc", + }, + }) + f("barabc<_>", []patternStep{ + { + prefix: "bar", + field: "foo", + }, + { + prefix: "abc", + }, + }) + f("bar", []patternStep{ + { + field: "foo", + }, + { + prefix: "bar", + field: "baz", + }, + }) + f("barbaz", []patternStep{ + { + prefix: "bar", + field: "foo", + }, + { + prefix: "baz", + }, + }) + f("<&gt;", []patternStep{ + { + prefix: "<", + field: "foo", + }, + { + prefix: ">", + }, + }) +} + +func TestParsePatternStepsFailure(t *testing.T) { + f := func(s string) { + t.Helper() + + _, err := parsePatternSteps(s) + if err == nil { + t.Fatalf("expecting non-nil error when parsing %q", s) + } + } + + // empty string + f("") + + // zero fields + f("foobar") + + // Zero named fields + f("<>") + f("foo<>") + f("<>foo") + f("foo<_>bar<*>baz<>xxx") + + // missing delimiter between fields + f("") + f("<>") + f("<>") + f("bb<>aa") + f("aa") + f("aabb") + + // missing > + f(""`, a) + benchmarkPatternApply(b, `"level":""`, a) }) b.Run("single-small-field-at-start-unquote", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"level":`, a) + benchmarkPatternApply(b, `"level":`, a) }) b.Run("single-small-field-at-end", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"foo":""`, a) + benchmarkPatternApply(b, `"foo":""`, a) }) b.Run("single-small-field-at-end-unquote", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"foo":`, a) + benchmarkPatternApply(b, `"foo":`, a) }) b.Run("single-medium-field", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"msg":""`, a) + benchmarkPatternApply(b, `"msg":""`, a) }) b.Run("single-medium-field-unquote", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"msg":`, a) + benchmarkPatternApply(b, `"msg":`, a) }) b.Run("single-large-field", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"stacktrace":""`, a) + benchmarkPatternApply(b, `"stacktrace":""`, a) }) b.Run("single-large-field-unquote", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"stacktrace":`, a) + benchmarkPatternApply(b, `"stacktrace":`, a) }) b.Run("two-fields", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"level":"",<_>"msg":""`, a) + benchmarkPatternApply(b, `"level":"",<_>"msg":""`, a) }) b.Run("two-fields-unquote", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"level":,<_>"msg":`, a) + benchmarkPatternApply(b, `"level":,<_>"msg":`, a) }) b.Run("many-fields", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"level":"","ts":"","caller":"","msg":"","error":""`, a) + benchmarkPatternApply(b, `"level":"","ts":"","caller":"","msg":"","error":""`, a) }) b.Run("many-fields-unquote", func(b *testing.B) { - benchmarkExtractFormatApply(b, `"level":,"ts":,"caller":,"msg":,"error":`, a) + benchmarkPatternApply(b, `"level":,"ts":,"caller":,"msg":,"error":`, a) }) } -func benchmarkExtractFormatApply(b *testing.B, pattern string, a []string) { - steps, err := parseExtractFormatSteps(pattern) +func benchmarkPatternApply(b *testing.B, pattern string, a []string) { + steps, err := parsePatternSteps(pattern) if err != nil { b.Fatalf("unexpected error: %s", err) } @@ -66,7 +66,7 @@ func benchmarkExtractFormatApply(b *testing.B, pattern string, a []string) { b.SetBytes(int64(n)) b.RunParallel(func(pb *testing.PB) { sink := 0 - ef := newExtractFormat(steps) + ef := newPattern(steps) for pb.Next() { for _, s := range a { ef.apply(s) diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 854c51138..eab780856 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -2,12 +2,7 @@ package logstorage import ( "fmt" - "html" - "strconv" - "strings" "unsafe" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) // pipeExtract processes '| extract from ' pipe. @@ -15,7 +10,7 @@ import ( // See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe type pipeExtract struct { fromField string - steps []extractFormatStep + steps []patternStep pattern string } @@ -63,7 +58,7 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { shards := make([]pipeExtractProcessorShard, workersCount) for i := range shards { - ef := newExtractFormat(pe.steps) + ef := newPattern(pe.steps) rcs := make([]resultColumn, len(ef.fields)) for j := range rcs { rcs[j].name = ef.fields[j].name @@ -100,7 +95,7 @@ type pipeExtractProcessorShard struct { } type pipeExtractProcessorShardNopad struct { - ef *extractFormat + ef *pattern rcs []resultColumn } @@ -169,7 +164,7 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) { if err != nil { return nil, fmt.Errorf("cannot read 'pattern': %w", err) } - steps, err := parseExtractFormatSteps(pattern) + steps, err := parsePatternSteps(pattern) if err != nil { return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", pattern, err) } @@ -181,177 +176,3 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) { } return pe, nil } - -type extractFormat struct { - // steps contains steps for extracting fields from string - steps []extractFormatStep - - // matches contains matches for every step in steps - matches []string - - // fields contains matches for non-empty fields - fields []extractField -} - -type extractField struct { - name string - value *string -} - -type extractFormatStep struct { - prefix string - field string -} - -func newExtractFormat(steps []extractFormatStep) *extractFormat { - if len(steps) == 0 { - logger.Panicf("BUG: steps cannot be empty") - } - - matches := make([]string, len(steps)) - - var fields []extractField - for i, step := range steps { - if step.field != "" { - fields = append(fields, extractField{ - name: step.field, - value: &matches[i], - }) - } - } - if len(fields) == 0 { - logger.Panicf("BUG: fields cannot be empty") - } - - ef := &extractFormat{ - steps: steps, - matches: matches, - fields: fields, - } - return ef -} - -func (ef *extractFormat) apply(s string) { - clear(ef.matches) - - steps := ef.steps - - if prefix := steps[0].prefix; prefix != "" { - n := strings.Index(s, prefix) - if n < 0 { - // Mismatch - return - } - s = s[n+len(prefix):] - } - - matches := ef.matches - for i := range steps { - nextPrefix := "" - if i+1 < len(steps) { - nextPrefix = steps[i+1].prefix - } - - us, nOffset := tryUnquoteString(s) - if nOffset >= 0 { - // Matched quoted string - matches[i] = us - s = s[nOffset:] - if !strings.HasPrefix(s, nextPrefix) { - // Mismatch - return - } - s = s[len(nextPrefix):] - } else { - // Match unquoted string until the nextPrefix - if nextPrefix == "" { - matches[i] = s - return - } - n := strings.Index(s, nextPrefix) - if n < 0 { - // Mismatch - return - } - matches[i] = s[:n] - s = s[n+len(nextPrefix):] - } - } -} - -func tryUnquoteString(s string) (string, int) { - if len(s) == 0 { - return s, -1 - } - if s[0] != '"' && s[0] != '`' { - return s, -1 - } - qp, err := strconv.QuotedPrefix(s) - if err != nil { - return s, -1 - } - us, err := strconv.Unquote(qp) - if err != nil { - return s, -1 - } - return us, len(qp) -} - -func parseExtractFormatSteps(s string) ([]extractFormatStep, error) { - var steps []extractFormatStep - - hasNamedField := false - - n := strings.IndexByte(s, '<') - if n < 0 { - return nil, fmt.Errorf("missing <...> fields") - } - prefix := s[:n] - s = s[n+1:] - for { - n := strings.IndexByte(s, '>') - if n < 0 { - return nil, fmt.Errorf("missing '>' for <%s", s) - } - field := s[:n] - s = s[n+1:] - - if field == "_" || field == "*" { - field = "" - } - steps = append(steps, extractFormatStep{ - prefix: prefix, - field: field, - }) - if !hasNamedField && field != "" { - hasNamedField = true - } - if len(s) == 0 { - break - } - - n = strings.IndexByte(s, '<') - if n < 0 { - steps = append(steps, extractFormatStep{ - prefix: s, - }) - break - } - if n == 0 { - return nil, fmt.Errorf("missing delimiter after <%s>", field) - } - prefix = s[:n] - s = s[n+1:] - } - - if !hasNamedField { - return nil, fmt.Errorf("missing named fields like ") - } - - for i := range steps { - step := &steps[i] - step.prefix = html.UnescapeString(step.prefix) - } - - return steps, nil -} diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go index b8c21ff0a..940b610a9 100644 --- a/lib/logstorage/pipe_extract_test.go +++ b/lib/logstorage/pipe_extract_test.go @@ -1,183 +1,9 @@ package logstorage import ( - "reflect" "testing" ) -func TestExtractFormatApply(t *testing.T) { - f := func(pattern, s string, resultsExpected []string) { - t.Helper() - - steps, err := parseExtractFormatSteps(pattern) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - ef := newExtractFormat(steps) - ef.apply(s) - - if len(ef.fields) != len(resultsExpected) { - t.Fatalf("unexpected number of results; got %d; want %d", len(ef.fields), len(resultsExpected)) - } - for i, f := range ef.fields { - if v := *f.value; v != resultsExpected[i] { - t.Fatalf("unexpected value for field %q; got %q; want %q", f.name, v, resultsExpected[i]) - } - } - } - - f("", "", []string{""}) - f("", "abc", []string{"abc"}) - f("bar", "", []string{""}) - f("bar", "bar", []string{""}) - f("bar", "bazbar", []string{"baz"}) - f("bar", "a bazbar xdsf", []string{"a baz"}) - f("bar<>", "a bazbar xdsf", []string{"a baz"}) - f("bar<>x", "a bazbar xdsf", []string{"a baz"}) - f("foo", "", []string{""}) - f("foo", "foo", []string{""}) - f("foo", "a foo xdf sdf", []string{" xdf sdf"}) - f("foo", "a foo foobar", []string{" foobar"}) - f("foobaz", "a foo foobar", []string{""}) - f("foobaz", "a foobaz bar", []string{""}) - f("foobaz", "a foo foobar baz", []string{" foobar "}) - f("foobaz", "a foo foobar bazabc", []string{" foobar "}) - - f("ip= <> path= ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "/foo/bar"}) - - // escaped pattern - f("ip=<>", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4"}) - f("ip=<>", "foo ip= bar", []string{"foo&bar"}) - - // quoted fields - f(`"msg":,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar` + "\n\t"}) - f(`foo=`, "foo=`bar baz,abc` def", []string{"bar baz,abc"}) - f(`foo= `, "foo=`bar baz,abc` def", []string{"bar baz,abc"}) - f(``, `"foo,\"bar"`, []string{`foo,"bar`}) - f(`,"bar`, `"foo,\"bar"`, []string{`foo,"bar`}) -} - -func TestParseExtractFormatStepsSuccess(t *testing.T) { - f := func(s string, stepsExpected []extractFormatStep) { - t.Helper() - - steps, err := parseExtractFormatSteps(s) - if err != nil { - t.Fatalf("unexpected error when parsing %q: %s", s, err) - } - if !reflect.DeepEqual(steps, stepsExpected) { - t.Fatalf("unexpected steps for [%s]; got %v; want %v", s, steps, stepsExpected) - } - } - - f("", []extractFormatStep{ - { - field: "foo", - }, - }) - f("bar", []extractFormatStep{ - { - field: "foo", - }, - { - prefix: "bar", - }, - }) - f("<>bar", []extractFormatStep{ - {}, - { - prefix: "bar", - field: "foo", - }, - }) - f("bar", []extractFormatStep{ - { - prefix: "bar", - field: "foo", - }, - }) - f("barabc", []extractFormatStep{ - { - prefix: "bar", - field: "foo", - }, - { - prefix: "abc", - }, - }) - f("barabc<_>", []extractFormatStep{ - { - prefix: "bar", - field: "foo", - }, - { - prefix: "abc", - }, - }) - f("bar", []extractFormatStep{ - { - field: "foo", - }, - { - prefix: "bar", - field: "baz", - }, - }) - f("barbaz", []extractFormatStep{ - { - prefix: "bar", - field: "foo", - }, - { - prefix: "baz", - }, - }) - f("<&gt;", []extractFormatStep{ - { - prefix: "<", - field: "foo", - }, - { - prefix: ">", - }, - }) -} - -func TestParseExtractFormatStepFailure(t *testing.T) { - f := func(s string) { - t.Helper() - - _, err := parseExtractFormatSteps(s) - if err == nil { - t.Fatalf("expecting non-nil error when parsing %q", s) - } - } - - // empty string - f("") - - // zero fields - f("foobar") - - // Zero named fields - f("<>") - f("foo<>") - f("<>foo") - f("foo<_>bar<*>baz<>xxx") - - // missing delimiter between fields - f("") - f("<>") - f("<>") - f("bb<>aa") - f("aa") - f("aabb") - - // missing > - f("