mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
58e6cdba8b
commit
b605f1eaa4
2 changed files with 317 additions and 0 deletions
153
lib/logstorage/pipe_extract.go
Normal file
153
lib/logstorage/pipe_extract.go
Normal file
|
@ -0,0 +1,153 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"html"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type extractFormat struct {
|
||||
steps []*extractFormatStep
|
||||
|
||||
results []string
|
||||
}
|
||||
|
||||
type extractFormatStep struct {
|
||||
prefix string
|
||||
field string
|
||||
}
|
||||
|
||||
func (ef *extractFormat) apply(s string) {
|
||||
clear(ef.results)
|
||||
|
||||
steps := ef.steps
|
||||
|
||||
if prefix := steps[0].prefix; prefix != "" {
|
||||
n := strings.Index(s, prefix)
|
||||
if n < 0 {
|
||||
// Mismatch
|
||||
return
|
||||
}
|
||||
s = s[n+len(prefix):]
|
||||
}
|
||||
|
||||
results := ef.results
|
||||
for i, step := range steps[1:] {
|
||||
prefix := step.prefix
|
||||
|
||||
if steps[i].field != "" {
|
||||
us, nOffset, ok := tryUnquoteString(s)
|
||||
if ok {
|
||||
results[i] = us
|
||||
s = s[nOffset:]
|
||||
if !strings.HasPrefix(s, prefix) {
|
||||
// Mismatch
|
||||
return
|
||||
}
|
||||
s = s[len(prefix):]
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
n := strings.Index(s, prefix)
|
||||
if n < 0 {
|
||||
// Mismatch
|
||||
return
|
||||
}
|
||||
results[i] = s[:n]
|
||||
s = s[n+len(prefix):]
|
||||
}
|
||||
|
||||
if steps[len(steps)-1].field != "" {
|
||||
us, _, ok := tryUnquoteString(s)
|
||||
if ok {
|
||||
s = us
|
||||
}
|
||||
}
|
||||
results[len(steps)-1] = s
|
||||
}
|
||||
|
||||
func tryUnquoteString(s string) (string, int, bool) {
|
||||
if len(s) == 0 {
|
||||
return s, 0, false
|
||||
}
|
||||
if s[0] != '"' && s[0] != '`' {
|
||||
return s, 0, false
|
||||
}
|
||||
qp, err := strconv.QuotedPrefix(s)
|
||||
if err != nil {
|
||||
return s, 0, false
|
||||
}
|
||||
us, err := strconv.Unquote(qp)
|
||||
if err != nil {
|
||||
return s, 0, false
|
||||
}
|
||||
return us, len(qp), true
|
||||
}
|
||||
|
||||
func parseExtractFormat(s string) (*extractFormat, error) {
|
||||
steps, err := parseExtractFormatSteps(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ef := &extractFormat{
|
||||
steps: steps,
|
||||
|
||||
results: make([]string, len(steps)),
|
||||
}
|
||||
return ef, nil
|
||||
}
|
||||
|
||||
func (efs *extractFormatStep) String() string {
|
||||
return fmt.Sprintf("[prefix=%q, field=%q]", efs.prefix, efs.field)
|
||||
}
|
||||
|
||||
func parseExtractFormatSteps(s string) ([]*extractFormatStep, error) {
|
||||
var steps []*extractFormatStep
|
||||
|
||||
n := strings.IndexByte(s, '<')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("missing <...> fields")
|
||||
}
|
||||
prefix := s[:n]
|
||||
s = s[n+1:]
|
||||
for {
|
||||
n := strings.IndexByte(s, '>')
|
||||
if n < 0 {
|
||||
return nil, fmt.Errorf("missing '>' for <%s", s)
|
||||
}
|
||||
field := s[:n]
|
||||
s = s[n+1:]
|
||||
|
||||
if field == "_" || field == "*" {
|
||||
field = ""
|
||||
}
|
||||
steps = append(steps, &extractFormatStep{
|
||||
prefix: prefix,
|
||||
field: field,
|
||||
})
|
||||
if len(s) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
n = strings.IndexByte(s, '<')
|
||||
if n < 0 {
|
||||
steps = append(steps, &extractFormatStep{
|
||||
prefix: s,
|
||||
})
|
||||
break
|
||||
}
|
||||
if n == 0 {
|
||||
return nil, fmt.Errorf("missing delimiter after <%s>", field)
|
||||
}
|
||||
prefix = s[:n]
|
||||
s = s[n+1:]
|
||||
}
|
||||
|
||||
for _, step := range steps {
|
||||
step.prefix = html.UnescapeString(step.prefix)
|
||||
}
|
||||
|
||||
return steps, nil
|
||||
}
|
164
lib/logstorage/pipe_extract_test.go
Normal file
164
lib/logstorage/pipe_extract_test.go
Normal file
|
@ -0,0 +1,164 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestExtractFormatApply(t *testing.T) {
|
||||
f := func(format, s string, resultsExpected []string) {
|
||||
t.Helper()
|
||||
|
||||
ef, err := parseExtractFormat(format)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
ef.apply(s)
|
||||
|
||||
if !reflect.DeepEqual(ef.results, resultsExpected) {
|
||||
t.Fatalf("unexpected results; got %q; want %q", ef.results, resultsExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("<foo>", "", []string{""})
|
||||
f("<foo>", "abc", []string{"abc"})
|
||||
f("<foo>bar", "", []string{"", ""})
|
||||
f("<foo>bar", "bar", []string{"", ""})
|
||||
f("<foo>bar", "bazbar", []string{"baz", ""})
|
||||
f("<foo>bar", "a bazbar xdsf", []string{"a baz", " xdsf"})
|
||||
f("foo<bar>", "", []string{""})
|
||||
f("foo<bar>", "foo", []string{""})
|
||||
f("foo<bar>", "a foo xdf sdf", []string{" xdf sdf"})
|
||||
f("foo<bar>", "a foo foobar", []string{" foobar"})
|
||||
f("foo<bar>baz", "a foo foobar", []string{"", ""})
|
||||
f("foo<bar>baz", "a foo foobar baz", []string{" foobar ", ""})
|
||||
f("foo<bar>baz", "a foo foobar bazabc", []string{" foobar ", "abc"})
|
||||
f("ip=<ip> <> path=<path> ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "method=GET host='abc'", "/foo/bar", "some tail here"})
|
||||
f("ip=<<ip>>", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4", " bar"})
|
||||
|
||||
// quoted fields
|
||||
f(`"msg":<msg>,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar`+"\n\t", `"baz":"x"}`})
|
||||
f(`foo=<bar>`, "foo=`bar baz,abc` def", []string{"bar baz,abc"})
|
||||
f(`foo=<bar> `, "foo=`bar baz,abc` def", []string{"bar baz,abc", "def"})
|
||||
}
|
||||
|
||||
func TestParseExtractFormatStepsSuccess(t *testing.T) {
|
||||
f := func(s string, stepsExpected []*extractFormatStep) {
|
||||
t.Helper()
|
||||
|
||||
steps, err := parseExtractFormatSteps(s)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error when parsing %q: %s", s, err)
|
||||
}
|
||||
if !reflect.DeepEqual(steps, stepsExpected) {
|
||||
t.Fatalf("unexpected steps for [%s]; got %v; want %v", s, steps, stepsExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("<foo>", []*extractFormatStep{
|
||||
{
|
||||
field: "foo",
|
||||
},
|
||||
})
|
||||
f("<_>", []*extractFormatStep{
|
||||
{},
|
||||
})
|
||||
f("<*>", []*extractFormatStep{
|
||||
{},
|
||||
})
|
||||
f("<>", []*extractFormatStep{
|
||||
{},
|
||||
})
|
||||
f("<foo>bar", []*extractFormatStep{
|
||||
{
|
||||
field: "foo",
|
||||
},
|
||||
{
|
||||
prefix: "bar",
|
||||
},
|
||||
})
|
||||
f("<>bar<foo>", []*extractFormatStep{
|
||||
{},
|
||||
{
|
||||
prefix: "bar",
|
||||
field: "foo",
|
||||
},
|
||||
})
|
||||
f("bar<foo>", []*extractFormatStep{
|
||||
{
|
||||
prefix: "bar",
|
||||
field: "foo",
|
||||
},
|
||||
})
|
||||
f("bar<foo>abc", []*extractFormatStep{
|
||||
{
|
||||
prefix: "bar",
|
||||
field: "foo",
|
||||
},
|
||||
{
|
||||
prefix: "abc",
|
||||
},
|
||||
})
|
||||
f("bar<foo>abc<_>", []*extractFormatStep{
|
||||
{
|
||||
prefix: "bar",
|
||||
field: "foo",
|
||||
},
|
||||
{
|
||||
prefix: "abc",
|
||||
},
|
||||
})
|
||||
f("<foo>bar<baz>", []*extractFormatStep{
|
||||
{
|
||||
field: "foo",
|
||||
},
|
||||
{
|
||||
prefix: "bar",
|
||||
field: "baz",
|
||||
},
|
||||
})
|
||||
f("bar<foo>baz", []*extractFormatStep{
|
||||
{
|
||||
prefix: "bar",
|
||||
field: "foo",
|
||||
},
|
||||
{
|
||||
prefix: "baz",
|
||||
},
|
||||
})
|
||||
f("<<foo>&gt;", []*extractFormatStep{
|
||||
{
|
||||
prefix: "<",
|
||||
field: "foo",
|
||||
},
|
||||
{
|
||||
prefix: ">",
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func TestParseExtractFormatStepFailure(t *testing.T) {
|
||||
f := func(s string) {
|
||||
t.Helper()
|
||||
|
||||
_, err := parseExtractFormatSteps(s)
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error when parsing %q", s)
|
||||
}
|
||||
}
|
||||
|
||||
// empty string
|
||||
f("")
|
||||
|
||||
// zero fields
|
||||
f("foobar")
|
||||
|
||||
// missing delimiter between fields
|
||||
f("<foo><bar>")
|
||||
f("aa<foo><bar>")
|
||||
f("aa<foo><bar>bb")
|
||||
|
||||
// missing >
|
||||
f("<foo")
|
||||
f("foo<bar")
|
||||
}
|
Loading…
Reference in a new issue