mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
00294e5d0d
commit
ef2df6889e
5 changed files with 384 additions and 373 deletions
185
lib/logstorage/pattern.go
Normal file
185
lib/logstorage/pattern.go
Normal file
|
@ -0,0 +1,185 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"html"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
// pattern represents text pattern in the form 'some_text<some_field>other_text...'
|
||||||
|
type pattern struct {
|
||||||
|
// steps contains steps for extracting fields from string
|
||||||
|
steps []patternStep
|
||||||
|
|
||||||
|
// matches contains matches for every step in steps
|
||||||
|
matches []string
|
||||||
|
|
||||||
|
// fields contains matches for non-empty fields
|
||||||
|
fields []patternField
|
||||||
|
}
|
||||||
|
|
||||||
|
type patternField struct {
|
||||||
|
name string
|
||||||
|
value *string
|
||||||
|
}
|
||||||
|
|
||||||
|
type patternStep struct {
|
||||||
|
prefix string
|
||||||
|
field string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPattern(steps []patternStep) *pattern {
|
||||||
|
if len(steps) == 0 {
|
||||||
|
logger.Panicf("BUG: steps cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
matches := make([]string, len(steps))
|
||||||
|
|
||||||
|
var fields []patternField
|
||||||
|
for i, step := range steps {
|
||||||
|
if step.field != "" {
|
||||||
|
fields = append(fields, patternField{
|
||||||
|
name: step.field,
|
||||||
|
value: &matches[i],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(fields) == 0 {
|
||||||
|
logger.Panicf("BUG: fields cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
ef := &pattern{
|
||||||
|
steps: steps,
|
||||||
|
matches: matches,
|
||||||
|
fields: fields,
|
||||||
|
}
|
||||||
|
return ef
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ef *pattern) apply(s string) {
|
||||||
|
clear(ef.matches)
|
||||||
|
|
||||||
|
steps := ef.steps
|
||||||
|
|
||||||
|
if prefix := steps[0].prefix; prefix != "" {
|
||||||
|
n := strings.Index(s, prefix)
|
||||||
|
if n < 0 {
|
||||||
|
// Mismatch
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s = s[n+len(prefix):]
|
||||||
|
}
|
||||||
|
|
||||||
|
matches := ef.matches
|
||||||
|
for i := range steps {
|
||||||
|
nextPrefix := ""
|
||||||
|
if i+1 < len(steps) {
|
||||||
|
nextPrefix = steps[i+1].prefix
|
||||||
|
}
|
||||||
|
|
||||||
|
us, nOffset := tryUnquoteString(s)
|
||||||
|
if nOffset >= 0 {
|
||||||
|
// Matched quoted string
|
||||||
|
matches[i] = us
|
||||||
|
s = s[nOffset:]
|
||||||
|
if !strings.HasPrefix(s, nextPrefix) {
|
||||||
|
// Mismatch
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s = s[len(nextPrefix):]
|
||||||
|
} else {
|
||||||
|
// Match unquoted string until the nextPrefix
|
||||||
|
if nextPrefix == "" {
|
||||||
|
matches[i] = s
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n := strings.Index(s, nextPrefix)
|
||||||
|
if n < 0 {
|
||||||
|
// Mismatch
|
||||||
|
return
|
||||||
|
}
|
||||||
|
matches[i] = s[:n]
|
||||||
|
s = s[n+len(nextPrefix):]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func tryUnquoteString(s string) (string, int) {
|
||||||
|
if len(s) == 0 {
|
||||||
|
return s, -1
|
||||||
|
}
|
||||||
|
if s[0] != '"' && s[0] != '`' {
|
||||||
|
return s, -1
|
||||||
|
}
|
||||||
|
qp, err := strconv.QuotedPrefix(s)
|
||||||
|
if err != nil {
|
||||||
|
return s, -1
|
||||||
|
}
|
||||||
|
us, err := strconv.Unquote(qp)
|
||||||
|
if err != nil {
|
||||||
|
return s, -1
|
||||||
|
}
|
||||||
|
return us, len(qp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parsePatternSteps(s string) ([]patternStep, error) {
|
||||||
|
var steps []patternStep
|
||||||
|
|
||||||
|
hasNamedField := false
|
||||||
|
|
||||||
|
n := strings.IndexByte(s, '<')
|
||||||
|
if n < 0 {
|
||||||
|
return nil, fmt.Errorf("missing <...> fields")
|
||||||
|
}
|
||||||
|
prefix := s[:n]
|
||||||
|
s = s[n+1:]
|
||||||
|
for {
|
||||||
|
n := strings.IndexByte(s, '>')
|
||||||
|
if n < 0 {
|
||||||
|
return nil, fmt.Errorf("missing '>' for <%s", s)
|
||||||
|
}
|
||||||
|
field := s[:n]
|
||||||
|
s = s[n+1:]
|
||||||
|
|
||||||
|
if field == "_" || field == "*" {
|
||||||
|
field = ""
|
||||||
|
}
|
||||||
|
steps = append(steps, patternStep{
|
||||||
|
prefix: prefix,
|
||||||
|
field: field,
|
||||||
|
})
|
||||||
|
if !hasNamedField && field != "" {
|
||||||
|
hasNamedField = true
|
||||||
|
}
|
||||||
|
if len(s) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
n = strings.IndexByte(s, '<')
|
||||||
|
if n < 0 {
|
||||||
|
steps = append(steps, patternStep{
|
||||||
|
prefix: s,
|
||||||
|
})
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if n == 0 {
|
||||||
|
return nil, fmt.Errorf("missing delimiter after <%s>", field)
|
||||||
|
}
|
||||||
|
prefix = s[:n]
|
||||||
|
s = s[n+1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasNamedField {
|
||||||
|
return nil, fmt.Errorf("missing named fields like <name>")
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range steps {
|
||||||
|
step := &steps[i]
|
||||||
|
step.prefix = html.UnescapeString(step.prefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
return steps, nil
|
||||||
|
}
|
179
lib/logstorage/pattern_test.go
Normal file
179
lib/logstorage/pattern_test.go
Normal file
|
@ -0,0 +1,179 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPatternApply(t *testing.T) {
|
||||||
|
f := func(pattern, s string, resultsExpected []string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
steps, err := parsePatternSteps(pattern)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %s", err)
|
||||||
|
}
|
||||||
|
ef := newPattern(steps)
|
||||||
|
ef.apply(s)
|
||||||
|
|
||||||
|
if len(ef.fields) != len(resultsExpected) {
|
||||||
|
t.Fatalf("unexpected number of results; got %d; want %d", len(ef.fields), len(resultsExpected))
|
||||||
|
}
|
||||||
|
for i, f := range ef.fields {
|
||||||
|
if v := *f.value; v != resultsExpected[i] {
|
||||||
|
t.Fatalf("unexpected value for field %q; got %q; want %q", f.name, v, resultsExpected[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f("<foo>", "", []string{""})
|
||||||
|
f("<foo>", "abc", []string{"abc"})
|
||||||
|
f("<foo>bar", "", []string{""})
|
||||||
|
f("<foo>bar", "bar", []string{""})
|
||||||
|
f("<foo>bar", "bazbar", []string{"baz"})
|
||||||
|
f("<foo>bar", "a bazbar xdsf", []string{"a baz"})
|
||||||
|
f("<foo>bar<>", "a bazbar xdsf", []string{"a baz"})
|
||||||
|
f("<foo>bar<>x", "a bazbar xdsf", []string{"a baz"})
|
||||||
|
f("foo<bar>", "", []string{""})
|
||||||
|
f("foo<bar>", "foo", []string{""})
|
||||||
|
f("foo<bar>", "a foo xdf sdf", []string{" xdf sdf"})
|
||||||
|
f("foo<bar>", "a foo foobar", []string{" foobar"})
|
||||||
|
f("foo<bar>baz", "a foo foobar", []string{""})
|
||||||
|
f("foo<bar>baz", "a foobaz bar", []string{""})
|
||||||
|
f("foo<bar>baz", "a foo foobar baz", []string{" foobar "})
|
||||||
|
f("foo<bar>baz", "a foo foobar bazabc", []string{" foobar "})
|
||||||
|
|
||||||
|
f("ip=<ip> <> path=<path> ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "/foo/bar"})
|
||||||
|
|
||||||
|
// escaped pattern
|
||||||
|
f("ip=<<ip>>", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4"})
|
||||||
|
f("ip=<<ip>>", "foo ip=<foo&bar> bar", []string{"foo&bar"})
|
||||||
|
|
||||||
|
// quoted fields
|
||||||
|
f(`"msg":<msg>,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar` + "\n\t"})
|
||||||
|
f(`foo=<bar>`, "foo=`bar baz,abc` def", []string{"bar baz,abc"})
|
||||||
|
f(`foo=<bar> `, "foo=`bar baz,abc` def", []string{"bar baz,abc"})
|
||||||
|
f(`<foo>`, `"foo,\"bar"`, []string{`foo,"bar`})
|
||||||
|
f(`<foo>,"bar`, `"foo,\"bar"`, []string{`foo,"bar`})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParsePatternStepsSuccess(t *testing.T) {
|
||||||
|
f := func(s string, stepsExpected []patternStep) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
steps, err := parsePatternSteps(s)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error when parsing %q: %s", s, err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(steps, stepsExpected) {
|
||||||
|
t.Fatalf("unexpected steps for [%s]; got %v; want %v", s, steps, stepsExpected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f("<foo>", []patternStep{
|
||||||
|
{
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f("<foo>bar", []patternStep{
|
||||||
|
{
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "bar",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f("<>bar<foo>", []patternStep{
|
||||||
|
{},
|
||||||
|
{
|
||||||
|
prefix: "bar",
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f("bar<foo>", []patternStep{
|
||||||
|
{
|
||||||
|
prefix: "bar",
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f("bar<foo>abc", []patternStep{
|
||||||
|
{
|
||||||
|
prefix: "bar",
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "abc",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f("bar<foo>abc<_>", []patternStep{
|
||||||
|
{
|
||||||
|
prefix: "bar",
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "abc",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f("<foo>bar<baz>", []patternStep{
|
||||||
|
{
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "bar",
|
||||||
|
field: "baz",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f("bar<foo>baz", []patternStep{
|
||||||
|
{
|
||||||
|
prefix: "bar",
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: "baz",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
f("<<foo>&gt;", []patternStep{
|
||||||
|
{
|
||||||
|
prefix: "<",
|
||||||
|
field: "foo",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: ">",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParsePatternStepsFailure(t *testing.T) {
|
||||||
|
f := func(s string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
_, err := parsePatternSteps(s)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expecting non-nil error when parsing %q", s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// empty string
|
||||||
|
f("")
|
||||||
|
|
||||||
|
// zero fields
|
||||||
|
f("foobar")
|
||||||
|
|
||||||
|
// Zero named fields
|
||||||
|
f("<>")
|
||||||
|
f("foo<>")
|
||||||
|
f("<>foo")
|
||||||
|
f("foo<_>bar<*>baz<>xxx")
|
||||||
|
|
||||||
|
// missing delimiter between fields
|
||||||
|
f("<foo><bar>")
|
||||||
|
f("<><bar>")
|
||||||
|
f("<foo><>")
|
||||||
|
f("bb<foo><><bar>aa")
|
||||||
|
f("aa<foo><bar>")
|
||||||
|
f("aa<foo><bar>bb")
|
||||||
|
|
||||||
|
// missing >
|
||||||
|
f("<foo")
|
||||||
|
f("foo<bar")
|
||||||
|
}
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkExtractFormatApply(b *testing.B) {
|
func BenchmarkPatternApply(b *testing.B) {
|
||||||
a := []string{
|
a := []string{
|
||||||
`{"level":"error","ts":1716113701.63973,"caller":"gcm/export.go:498","msg":"Failed to export self-observability metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).startSelfObservability\n\tcloud/kubernetes/metrics/common/gcm/export.go:498","foo":"bar"}`,
|
`{"level":"error","ts":1716113701.63973,"caller":"gcm/export.go:498","msg":"Failed to export self-observability metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).startSelfObservability\n\tcloud/kubernetes/metrics/common/gcm/export.go:498","foo":"bar"}`,
|
||||||
`{"level":"error","ts":1716113370.2321634,"caller":"gcm/export.go:434","msg":"Failed to export metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).exportBuffer\n\tcloud/kubernetes/metrics/common/gcm/export.go:434\ngoogle3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).flush\n\tcloud/kubernetes/metrics/common/gcm/export.go:383\ngoogle3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).Flush\n\tcloud/kubernetes/metrics/common/gcm/export.go:365\ngoogle3/cloud/kubernetes/metrics/components/collector/adapter/adapter.(*adapter).Finalize\n\tcloud/kubernetes/metrics/components/collector/adapter/consume.go:131\ngoogle3/cloud/kubernetes/metrics/components/collector/prometheus/prometheus.(*parser).ParseText\n\tcloud/kubernetes/metrics/components/collector/prometheus/parse.go:158\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.runScrapeLoop\n\tcloud/kubernetes/metrics/components/collector/collector.go:103\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Run\n\tcloud/kubernetes/metrics/components/collector/collector.go:81\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Start.func1\n\tcloud/kubernetes/metrics/components/collector/multi_target_collector.go:45","foo":"bar"}`,
|
`{"level":"error","ts":1716113370.2321634,"caller":"gcm/export.go:434","msg":"Failed to export metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).exportBuffer\n\tcloud/kubernetes/metrics/common/gcm/export.go:434\ngoogle3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).flush\n\tcloud/kubernetes/metrics/common/gcm/export.go:383\ngoogle3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).Flush\n\tcloud/kubernetes/metrics/common/gcm/export.go:365\ngoogle3/cloud/kubernetes/metrics/components/collector/adapter/adapter.(*adapter).Finalize\n\tcloud/kubernetes/metrics/components/collector/adapter/consume.go:131\ngoogle3/cloud/kubernetes/metrics/components/collector/prometheus/prometheus.(*parser).ParseText\n\tcloud/kubernetes/metrics/components/collector/prometheus/parse.go:158\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.runScrapeLoop\n\tcloud/kubernetes/metrics/components/collector/collector.go:103\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Run\n\tcloud/kubernetes/metrics/components/collector/collector.go:81\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Start.func1\n\tcloud/kubernetes/metrics/components/collector/multi_target_collector.go:45","foo":"bar"}`,
|
||||||
|
@ -14,45 +14,45 @@ func BenchmarkExtractFormatApply(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
b.Run("single-small-field-at-start", func(b *testing.B) {
|
b.Run("single-small-field-at-start", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"level":"<level>"`, a)
|
benchmarkPatternApply(b, `"level":"<level>"`, a)
|
||||||
})
|
})
|
||||||
b.Run("single-small-field-at-start-unquote", func(b *testing.B) {
|
b.Run("single-small-field-at-start-unquote", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"level":<level>`, a)
|
benchmarkPatternApply(b, `"level":<level>`, a)
|
||||||
})
|
})
|
||||||
b.Run("single-small-field-at-end", func(b *testing.B) {
|
b.Run("single-small-field-at-end", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"foo":"<foo>"`, a)
|
benchmarkPatternApply(b, `"foo":"<foo>"`, a)
|
||||||
})
|
})
|
||||||
b.Run("single-small-field-at-end-unquote", func(b *testing.B) {
|
b.Run("single-small-field-at-end-unquote", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"foo":<foo>`, a)
|
benchmarkPatternApply(b, `"foo":<foo>`, a)
|
||||||
})
|
})
|
||||||
b.Run("single-medium-field", func(b *testing.B) {
|
b.Run("single-medium-field", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"msg":"<message>"`, a)
|
benchmarkPatternApply(b, `"msg":"<message>"`, a)
|
||||||
})
|
})
|
||||||
b.Run("single-medium-field-unquote", func(b *testing.B) {
|
b.Run("single-medium-field-unquote", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"msg":<message>`, a)
|
benchmarkPatternApply(b, `"msg":<message>`, a)
|
||||||
})
|
})
|
||||||
b.Run("single-large-field", func(b *testing.B) {
|
b.Run("single-large-field", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"stacktrace":"<stacktrace>"`, a)
|
benchmarkPatternApply(b, `"stacktrace":"<stacktrace>"`, a)
|
||||||
})
|
})
|
||||||
b.Run("single-large-field-unquote", func(b *testing.B) {
|
b.Run("single-large-field-unquote", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"stacktrace":<stacktrace>`, a)
|
benchmarkPatternApply(b, `"stacktrace":<stacktrace>`, a)
|
||||||
})
|
})
|
||||||
b.Run("two-fields", func(b *testing.B) {
|
b.Run("two-fields", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"level":"<level>",<_>"msg":"<msg>"`, a)
|
benchmarkPatternApply(b, `"level":"<level>",<_>"msg":"<msg>"`, a)
|
||||||
})
|
})
|
||||||
b.Run("two-fields-unquote", func(b *testing.B) {
|
b.Run("two-fields-unquote", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"level":<level>,<_>"msg":<msg>`, a)
|
benchmarkPatternApply(b, `"level":<level>,<_>"msg":<msg>`, a)
|
||||||
})
|
})
|
||||||
b.Run("many-fields", func(b *testing.B) {
|
b.Run("many-fields", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"level":"<level>","ts":"<ts>","caller":"<caller>","msg":"<msg>","error":"<error>"`, a)
|
benchmarkPatternApply(b, `"level":"<level>","ts":"<ts>","caller":"<caller>","msg":"<msg>","error":"<error>"`, a)
|
||||||
})
|
})
|
||||||
b.Run("many-fields-unquote", func(b *testing.B) {
|
b.Run("many-fields-unquote", func(b *testing.B) {
|
||||||
benchmarkExtractFormatApply(b, `"level":<level>,"ts":<ts>,"caller":<caller>,"msg":<msg>,"error":<error>`, a)
|
benchmarkPatternApply(b, `"level":<level>,"ts":<ts>,"caller":<caller>,"msg":<msg>,"error":<error>`, a)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchmarkExtractFormatApply(b *testing.B, pattern string, a []string) {
|
func benchmarkPatternApply(b *testing.B, pattern string, a []string) {
|
||||||
steps, err := parseExtractFormatSteps(pattern)
|
steps, err := parsePatternSteps(pattern)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("unexpected error: %s", err)
|
b.Fatalf("unexpected error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,7 @@ func benchmarkExtractFormatApply(b *testing.B, pattern string, a []string) {
|
||||||
b.SetBytes(int64(n))
|
b.SetBytes(int64(n))
|
||||||
b.RunParallel(func(pb *testing.PB) {
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
sink := 0
|
sink := 0
|
||||||
ef := newExtractFormat(steps)
|
ef := newPattern(steps)
|
||||||
for pb.Next() {
|
for pb.Next() {
|
||||||
for _, s := range a {
|
for _, s := range a {
|
||||||
ef.apply(s)
|
ef.apply(s)
|
|
@ -2,12 +2,7 @@ package logstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"html"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// pipeExtract processes '| extract from <field> <pattern>' pipe.
|
// pipeExtract processes '| extract from <field> <pattern>' pipe.
|
||||||
|
@ -15,7 +10,7 @@ import (
|
||||||
// See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe
|
||||||
type pipeExtract struct {
|
type pipeExtract struct {
|
||||||
fromField string
|
fromField string
|
||||||
steps []extractFormatStep
|
steps []patternStep
|
||||||
|
|
||||||
pattern string
|
pattern string
|
||||||
}
|
}
|
||||||
|
@ -63,7 +58,7 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
|
||||||
func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
|
func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
|
||||||
shards := make([]pipeExtractProcessorShard, workersCount)
|
shards := make([]pipeExtractProcessorShard, workersCount)
|
||||||
for i := range shards {
|
for i := range shards {
|
||||||
ef := newExtractFormat(pe.steps)
|
ef := newPattern(pe.steps)
|
||||||
rcs := make([]resultColumn, len(ef.fields))
|
rcs := make([]resultColumn, len(ef.fields))
|
||||||
for j := range rcs {
|
for j := range rcs {
|
||||||
rcs[j].name = ef.fields[j].name
|
rcs[j].name = ef.fields[j].name
|
||||||
|
@ -100,7 +95,7 @@ type pipeExtractProcessorShard struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type pipeExtractProcessorShardNopad struct {
|
type pipeExtractProcessorShardNopad struct {
|
||||||
ef *extractFormat
|
ef *pattern
|
||||||
|
|
||||||
rcs []resultColumn
|
rcs []resultColumn
|
||||||
}
|
}
|
||||||
|
@ -169,7 +164,7 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot read 'pattern': %w", err)
|
return nil, fmt.Errorf("cannot read 'pattern': %w", err)
|
||||||
}
|
}
|
||||||
steps, err := parseExtractFormatSteps(pattern)
|
steps, err := parsePatternSteps(pattern)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", pattern, err)
|
return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", pattern, err)
|
||||||
}
|
}
|
||||||
|
@ -181,177 +176,3 @@ func parsePipeExtract(lex *lexer) (*pipeExtract, error) {
|
||||||
}
|
}
|
||||||
return pe, nil
|
return pe, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type extractFormat struct {
|
|
||||||
// steps contains steps for extracting fields from string
|
|
||||||
steps []extractFormatStep
|
|
||||||
|
|
||||||
// matches contains matches for every step in steps
|
|
||||||
matches []string
|
|
||||||
|
|
||||||
// fields contains matches for non-empty fields
|
|
||||||
fields []extractField
|
|
||||||
}
|
|
||||||
|
|
||||||
type extractField struct {
|
|
||||||
name string
|
|
||||||
value *string
|
|
||||||
}
|
|
||||||
|
|
||||||
type extractFormatStep struct {
|
|
||||||
prefix string
|
|
||||||
field string
|
|
||||||
}
|
|
||||||
|
|
||||||
func newExtractFormat(steps []extractFormatStep) *extractFormat {
|
|
||||||
if len(steps) == 0 {
|
|
||||||
logger.Panicf("BUG: steps cannot be empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
matches := make([]string, len(steps))
|
|
||||||
|
|
||||||
var fields []extractField
|
|
||||||
for i, step := range steps {
|
|
||||||
if step.field != "" {
|
|
||||||
fields = append(fields, extractField{
|
|
||||||
name: step.field,
|
|
||||||
value: &matches[i],
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(fields) == 0 {
|
|
||||||
logger.Panicf("BUG: fields cannot be empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
ef := &extractFormat{
|
|
||||||
steps: steps,
|
|
||||||
matches: matches,
|
|
||||||
fields: fields,
|
|
||||||
}
|
|
||||||
return ef
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ef *extractFormat) apply(s string) {
|
|
||||||
clear(ef.matches)
|
|
||||||
|
|
||||||
steps := ef.steps
|
|
||||||
|
|
||||||
if prefix := steps[0].prefix; prefix != "" {
|
|
||||||
n := strings.Index(s, prefix)
|
|
||||||
if n < 0 {
|
|
||||||
// Mismatch
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s = s[n+len(prefix):]
|
|
||||||
}
|
|
||||||
|
|
||||||
matches := ef.matches
|
|
||||||
for i := range steps {
|
|
||||||
nextPrefix := ""
|
|
||||||
if i+1 < len(steps) {
|
|
||||||
nextPrefix = steps[i+1].prefix
|
|
||||||
}
|
|
||||||
|
|
||||||
us, nOffset := tryUnquoteString(s)
|
|
||||||
if nOffset >= 0 {
|
|
||||||
// Matched quoted string
|
|
||||||
matches[i] = us
|
|
||||||
s = s[nOffset:]
|
|
||||||
if !strings.HasPrefix(s, nextPrefix) {
|
|
||||||
// Mismatch
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s = s[len(nextPrefix):]
|
|
||||||
} else {
|
|
||||||
// Match unquoted string until the nextPrefix
|
|
||||||
if nextPrefix == "" {
|
|
||||||
matches[i] = s
|
|
||||||
return
|
|
||||||
}
|
|
||||||
n := strings.Index(s, nextPrefix)
|
|
||||||
if n < 0 {
|
|
||||||
// Mismatch
|
|
||||||
return
|
|
||||||
}
|
|
||||||
matches[i] = s[:n]
|
|
||||||
s = s[n+len(nextPrefix):]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func tryUnquoteString(s string) (string, int) {
|
|
||||||
if len(s) == 0 {
|
|
||||||
return s, -1
|
|
||||||
}
|
|
||||||
if s[0] != '"' && s[0] != '`' {
|
|
||||||
return s, -1
|
|
||||||
}
|
|
||||||
qp, err := strconv.QuotedPrefix(s)
|
|
||||||
if err != nil {
|
|
||||||
return s, -1
|
|
||||||
}
|
|
||||||
us, err := strconv.Unquote(qp)
|
|
||||||
if err != nil {
|
|
||||||
return s, -1
|
|
||||||
}
|
|
||||||
return us, len(qp)
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseExtractFormatSteps(s string) ([]extractFormatStep, error) {
|
|
||||||
var steps []extractFormatStep
|
|
||||||
|
|
||||||
hasNamedField := false
|
|
||||||
|
|
||||||
n := strings.IndexByte(s, '<')
|
|
||||||
if n < 0 {
|
|
||||||
return nil, fmt.Errorf("missing <...> fields")
|
|
||||||
}
|
|
||||||
prefix := s[:n]
|
|
||||||
s = s[n+1:]
|
|
||||||
for {
|
|
||||||
n := strings.IndexByte(s, '>')
|
|
||||||
if n < 0 {
|
|
||||||
return nil, fmt.Errorf("missing '>' for <%s", s)
|
|
||||||
}
|
|
||||||
field := s[:n]
|
|
||||||
s = s[n+1:]
|
|
||||||
|
|
||||||
if field == "_" || field == "*" {
|
|
||||||
field = ""
|
|
||||||
}
|
|
||||||
steps = append(steps, extractFormatStep{
|
|
||||||
prefix: prefix,
|
|
||||||
field: field,
|
|
||||||
})
|
|
||||||
if !hasNamedField && field != "" {
|
|
||||||
hasNamedField = true
|
|
||||||
}
|
|
||||||
if len(s) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
n = strings.IndexByte(s, '<')
|
|
||||||
if n < 0 {
|
|
||||||
steps = append(steps, extractFormatStep{
|
|
||||||
prefix: s,
|
|
||||||
})
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if n == 0 {
|
|
||||||
return nil, fmt.Errorf("missing delimiter after <%s>", field)
|
|
||||||
}
|
|
||||||
prefix = s[:n]
|
|
||||||
s = s[n+1:]
|
|
||||||
}
|
|
||||||
|
|
||||||
if !hasNamedField {
|
|
||||||
return nil, fmt.Errorf("missing named fields like <name>")
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range steps {
|
|
||||||
step := &steps[i]
|
|
||||||
step.prefix = html.UnescapeString(step.prefix)
|
|
||||||
}
|
|
||||||
|
|
||||||
return steps, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,183 +1,9 @@
|
||||||
package logstorage
|
package logstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestExtractFormatApply(t *testing.T) {
|
|
||||||
f := func(pattern, s string, resultsExpected []string) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
steps, err := parseExtractFormatSteps(pattern)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error: %s", err)
|
|
||||||
}
|
|
||||||
ef := newExtractFormat(steps)
|
|
||||||
ef.apply(s)
|
|
||||||
|
|
||||||
if len(ef.fields) != len(resultsExpected) {
|
|
||||||
t.Fatalf("unexpected number of results; got %d; want %d", len(ef.fields), len(resultsExpected))
|
|
||||||
}
|
|
||||||
for i, f := range ef.fields {
|
|
||||||
if v := *f.value; v != resultsExpected[i] {
|
|
||||||
t.Fatalf("unexpected value for field %q; got %q; want %q", f.name, v, resultsExpected[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
f("<foo>", "", []string{""})
|
|
||||||
f("<foo>", "abc", []string{"abc"})
|
|
||||||
f("<foo>bar", "", []string{""})
|
|
||||||
f("<foo>bar", "bar", []string{""})
|
|
||||||
f("<foo>bar", "bazbar", []string{"baz"})
|
|
||||||
f("<foo>bar", "a bazbar xdsf", []string{"a baz"})
|
|
||||||
f("<foo>bar<>", "a bazbar xdsf", []string{"a baz"})
|
|
||||||
f("<foo>bar<>x", "a bazbar xdsf", []string{"a baz"})
|
|
||||||
f("foo<bar>", "", []string{""})
|
|
||||||
f("foo<bar>", "foo", []string{""})
|
|
||||||
f("foo<bar>", "a foo xdf sdf", []string{" xdf sdf"})
|
|
||||||
f("foo<bar>", "a foo foobar", []string{" foobar"})
|
|
||||||
f("foo<bar>baz", "a foo foobar", []string{""})
|
|
||||||
f("foo<bar>baz", "a foobaz bar", []string{""})
|
|
||||||
f("foo<bar>baz", "a foo foobar baz", []string{" foobar "})
|
|
||||||
f("foo<bar>baz", "a foo foobar bazabc", []string{" foobar "})
|
|
||||||
|
|
||||||
f("ip=<ip> <> path=<path> ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "/foo/bar"})
|
|
||||||
|
|
||||||
// escaped pattern
|
|
||||||
f("ip=<<ip>>", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4"})
|
|
||||||
f("ip=<<ip>>", "foo ip=<foo&bar> bar", []string{"foo&bar"})
|
|
||||||
|
|
||||||
// quoted fields
|
|
||||||
f(`"msg":<msg>,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar` + "\n\t"})
|
|
||||||
f(`foo=<bar>`, "foo=`bar baz,abc` def", []string{"bar baz,abc"})
|
|
||||||
f(`foo=<bar> `, "foo=`bar baz,abc` def", []string{"bar baz,abc"})
|
|
||||||
f(`<foo>`, `"foo,\"bar"`, []string{`foo,"bar`})
|
|
||||||
f(`<foo>,"bar`, `"foo,\"bar"`, []string{`foo,"bar`})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestParseExtractFormatStepsSuccess(t *testing.T) {
|
|
||||||
f := func(s string, stepsExpected []extractFormatStep) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
steps, err := parseExtractFormatSteps(s)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error when parsing %q: %s", s, err)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(steps, stepsExpected) {
|
|
||||||
t.Fatalf("unexpected steps for [%s]; got %v; want %v", s, steps, stepsExpected)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
f("<foo>", []extractFormatStep{
|
|
||||||
{
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
f("<foo>bar", []extractFormatStep{
|
|
||||||
{
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
prefix: "bar",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
f("<>bar<foo>", []extractFormatStep{
|
|
||||||
{},
|
|
||||||
{
|
|
||||||
prefix: "bar",
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
f("bar<foo>", []extractFormatStep{
|
|
||||||
{
|
|
||||||
prefix: "bar",
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
f("bar<foo>abc", []extractFormatStep{
|
|
||||||
{
|
|
||||||
prefix: "bar",
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
prefix: "abc",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
f("bar<foo>abc<_>", []extractFormatStep{
|
|
||||||
{
|
|
||||||
prefix: "bar",
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
prefix: "abc",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
f("<foo>bar<baz>", []extractFormatStep{
|
|
||||||
{
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
prefix: "bar",
|
|
||||||
field: "baz",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
f("bar<foo>baz", []extractFormatStep{
|
|
||||||
{
|
|
||||||
prefix: "bar",
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
prefix: "baz",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
f("<<foo>&gt;", []extractFormatStep{
|
|
||||||
{
|
|
||||||
prefix: "<",
|
|
||||||
field: "foo",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
prefix: ">",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestParseExtractFormatStepFailure(t *testing.T) {
|
|
||||||
f := func(s string) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
_, err := parseExtractFormatSteps(s)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("expecting non-nil error when parsing %q", s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// empty string
|
|
||||||
f("")
|
|
||||||
|
|
||||||
// zero fields
|
|
||||||
f("foobar")
|
|
||||||
|
|
||||||
// Zero named fields
|
|
||||||
f("<>")
|
|
||||||
f("foo<>")
|
|
||||||
f("<>foo")
|
|
||||||
f("foo<_>bar<*>baz<>xxx")
|
|
||||||
|
|
||||||
// missing delimiter between fields
|
|
||||||
f("<foo><bar>")
|
|
||||||
f("<><bar>")
|
|
||||||
f("<foo><>")
|
|
||||||
f("bb<foo><><bar>aa")
|
|
||||||
f("aa<foo><bar>")
|
|
||||||
f("aa<foo><bar>bb")
|
|
||||||
|
|
||||||
// missing >
|
|
||||||
f("<foo")
|
|
||||||
f("foo<bar")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPipeExtractUpdateNeededFields(t *testing.T) {
|
func TestPipeExtractUpdateNeededFields(t *testing.T) {
|
||||||
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
|
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
Loading…
Reference in a new issue