This commit is contained in:
Aliaksandr Valialkin 2024-05-19 12:56:08 +02:00
parent b605f1eaa4
commit 5e20724e1a
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 309 additions and 96 deletions

View file

@ -5,21 +5,148 @@ import (
"html" "html"
"strconv" "strconv"
"strings" "strings"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
type extractFormat struct { // pipeExtract processes '| extract (field, format)' pipe.
steps []*extractFormatStep //
// See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe
type pipeExtract struct {
field string
steps []extractFormatStep
results []string stepsStr string
}
func (pe *pipeExtract) String() string {
return fmt.Sprintf("extract(%s, %s)", quoteTokenIfNeeded(pe.field), pe.stepsStr)
}
func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.add(pe.field)
for _, step := range pe.steps {
if step.field != "" {
unneededFields.remove(step.field)
}
}
}
func (pe *pipeExtract) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeExtractProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeExtractProcessorShard{
pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{
ef: newExtractFormat(pe.steps),
},
}
}
pep := &pipeExtractProcessor{
pe: pe,
stopCh: stopCh,
ppBase: ppBase,
shards: shards,
}
return pep
}
type pipeExtractProcessor struct {
pe *pipeExtract
stopCh <-chan struct{}
ppBase pipeProcessor
shards []pipeExtractProcessorShard
}
type pipeExtractProcessorShard struct {
pipeExtractProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeExtractProcessorShardNopad{})%128]byte
}
type pipeExtractProcessorShardNopad struct {
ef *extractFormat
}
func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pep.shards[workerID]
c := br.getColumnByName(pep.pe.field)
values := c.getValues(br)
ef := shard.ef
for _, v := range values {
ef.apply(v)
/* for i, result := range ef.results {
rcs[i].addValue(result)
}
*/
}
}
func (pep *pipeExtractProcessor) flush() error {
return nil
}
type extractFormat struct {
// steps contains steps for extracting fields from string
steps []extractFormatStep
// matches contains matches for every step in steps
matches []string
// fields contains matches for non-empty fields
fields []extractField
}
type extractField struct {
name string
value *string
} }
type extractFormatStep struct { type extractFormatStep struct {
prefix string prefix string
field string field string
}
func newExtractFormat(steps []extractFormatStep) *extractFormat {
if len(steps) == 0 {
logger.Panicf("BUG: steps cannot be empty")
}
matches := make([]string, len(steps))
var fields []extractField
for i, step := range steps {
if step.field != "" {
fields = append(fields, extractField{
name: step.field,
value: &matches[i],
})
}
}
if len(fields) == 0 {
logger.Panicf("BUG: fields cannot be empty")
}
ef := &extractFormat{
steps: steps,
matches: matches,
fields: fields,
}
return ef
} }
func (ef *extractFormat) apply(s string) { func (ef *extractFormat) apply(s string) {
clear(ef.results) clear(ef.matches)
steps := ef.steps steps := ef.steps
@ -32,40 +159,38 @@ func (ef *extractFormat) apply(s string) {
s = s[n+len(prefix):] s = s[n+len(prefix):]
} }
results := ef.results matches := ef.matches
for i, step := range steps[1:] { for i := range steps {
prefix := step.prefix nextPrefix := ""
if i + 1 < len(steps) {
if steps[i].field != "" { nextPrefix = steps[i+1].prefix
us, nOffset, ok := tryUnquoteString(s)
if ok {
results[i] = us
s = s[nOffset:]
if !strings.HasPrefix(s, prefix) {
// Mismatch
return
}
s = s[len(prefix):]
continue
}
} }
n := strings.Index(s, prefix) us, nOffset, ok := tryUnquoteString(s)
if n < 0 {
// Mismatch
return
}
results[i] = s[:n]
s = s[n+len(prefix):]
}
if steps[len(steps)-1].field != "" {
us, _, ok := tryUnquoteString(s)
if ok { if ok {
s = us // Matched quoted string
matches[i] = us
s = s[nOffset:]
if !strings.HasPrefix(s, nextPrefix) {
// Mismatch
return
}
s = s[len(nextPrefix):]
} else {
// Match unquoted string until the nextPrefix
if nextPrefix == "" {
matches[i] = s
return
}
n := strings.Index(s, nextPrefix)
if n < 0 {
// Mismatch
return
}
matches[i] = s[:n]
s = s[n+len(nextPrefix):]
} }
} }
results[len(steps)-1] = s
} }
func tryUnquoteString(s string) (string, int, bool) { func tryUnquoteString(s string) (string, int, bool) {
@ -86,25 +211,10 @@ func tryUnquoteString(s string) (string, int, bool) {
return us, len(qp), true return us, len(qp), true
} }
func parseExtractFormat(s string) (*extractFormat, error) { func parseExtractFormatSteps(s string) ([]extractFormatStep, error) {
steps, err := parseExtractFormatSteps(s) var steps []extractFormatStep
if err != nil {
return nil, err
}
ef := &extractFormat{
steps: steps,
results: make([]string, len(steps)), hasNamedField := false
}
return ef, nil
}
func (efs *extractFormatStep) String() string {
return fmt.Sprintf("[prefix=%q, field=%q]", efs.prefix, efs.field)
}
func parseExtractFormatSteps(s string) ([]*extractFormatStep, error) {
var steps []*extractFormatStep
n := strings.IndexByte(s, '<') n := strings.IndexByte(s, '<')
if n < 0 { if n < 0 {
@ -123,17 +233,20 @@ func parseExtractFormatSteps(s string) ([]*extractFormatStep, error) {
if field == "_" || field == "*" { if field == "_" || field == "*" {
field = "" field = ""
} }
steps = append(steps, &extractFormatStep{ steps = append(steps, extractFormatStep{
prefix: prefix, prefix: prefix,
field: field, field: field,
}) })
if !hasNamedField && field != "" {
hasNamedField = true
}
if len(s) == 0 { if len(s) == 0 {
break break
} }
n = strings.IndexByte(s, '<') n = strings.IndexByte(s, '<')
if n < 0 { if n < 0 {
steps = append(steps, &extractFormatStep{ steps = append(steps, extractFormatStep{
prefix: s, prefix: s,
}) })
break break
@ -145,7 +258,12 @@ func parseExtractFormatSteps(s string) ([]*extractFormatStep, error) {
s = s[n+1:] s = s[n+1:]
} }
for _, step := range steps { if !hasNamedField {
return nil, fmt.Errorf("missing named fields like <name>")
}
for i := range steps {
step := &steps[i]
step.prefix = html.UnescapeString(step.prefix) step.prefix = html.UnescapeString(step.prefix)
} }

View file

@ -9,41 +9,56 @@ func TestExtractFormatApply(t *testing.T) {
f := func(format, s string, resultsExpected []string) { f := func(format, s string, resultsExpected []string) {
t.Helper() t.Helper()
ef, err := parseExtractFormat(format) steps, err := parseExtractFormatSteps(format)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
ef := newExtractFormat(steps)
ef.apply(s) ef.apply(s)
if !reflect.DeepEqual(ef.results, resultsExpected) { if len(ef.fields) != len(resultsExpected) {
t.Fatalf("unexpected results; got %q; want %q", ef.results, resultsExpected) t.Fatalf("unexpected number of results; got %d; want %d", len(ef.fields), len(resultsExpected))
}
for i, f := range ef.fields {
if v := *f.value; v != resultsExpected[i] {
t.Fatalf("unexpected value for field %q; got %q; want %q", f.name, v, resultsExpected[i])
}
} }
} }
f("<foo>", "", []string{""}) f("<foo>", "", []string{""})
f("<foo>", "abc", []string{"abc"}) f("<foo>", "abc", []string{"abc"})
f("<foo>bar", "", []string{"", ""}) f("<foo>bar", "", []string{""})
f("<foo>bar", "bar", []string{"", ""}) f("<foo>bar", "bar", []string{""})
f("<foo>bar", "bazbar", []string{"baz", ""}) f("<foo>bar", "bazbar", []string{"baz"})
f("<foo>bar", "a bazbar xdsf", []string{"a baz", " xdsf"}) f("<foo>bar", "a bazbar xdsf", []string{"a baz"})
f("<foo>bar<>", "a bazbar xdsf", []string{"a baz"})
f("<foo>bar<>x", "a bazbar xdsf", []string{"a baz"})
f("foo<bar>", "", []string{""}) f("foo<bar>", "", []string{""})
f("foo<bar>", "foo", []string{""}) f("foo<bar>", "foo", []string{""})
f("foo<bar>", "a foo xdf sdf", []string{" xdf sdf"}) f("foo<bar>", "a foo xdf sdf", []string{" xdf sdf"})
f("foo<bar>", "a foo foobar", []string{" foobar"}) f("foo<bar>", "a foo foobar", []string{" foobar"})
f("foo<bar>baz", "a foo foobar", []string{"", ""}) f("foo<bar>baz", "a foo foobar", []string{""})
f("foo<bar>baz", "a foo foobar baz", []string{" foobar ", ""}) f("foo<bar>baz", "a foobaz bar", []string{""})
f("foo<bar>baz", "a foo foobar bazabc", []string{" foobar ", "abc"}) f("foo<bar>baz", "a foo foobar baz", []string{" foobar "})
f("ip=<ip> <> path=<path> ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "method=GET host='abc'", "/foo/bar", "some tail here"}) f("foo<bar>baz", "a foo foobar bazabc", []string{" foobar "})
f("ip=&lt;<ip>&gt;", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4", " bar"})
f("ip=<ip> <> path=<path> ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "/foo/bar"})
// escaped format
f("ip=&lt;<ip>&gt;", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4"})
f("ip=&lt;<ip>&gt;", "foo ip=<foo&amp;bar> bar", []string{"foo&amp;bar"})
// quoted fields // quoted fields
f(`"msg":<msg>,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar`+"\n\t", `"baz":"x"}`}) f(`"msg":<msg>,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar` + "\n\t"})
f(`foo=<bar>`, "foo=`bar baz,abc` def", []string{"bar baz,abc"}) f(`foo=<bar>`, "foo=`bar baz,abc` def", []string{"bar baz,abc"})
f(`foo=<bar> `, "foo=`bar baz,abc` def", []string{"bar baz,abc", "def"}) f(`foo=<bar> `, "foo=`bar baz,abc` def", []string{"bar baz,abc"})
f(`<foo>`, `"foo,\"bar"`, []string{`foo,"bar`})
f(`<foo>,"bar`, `"foo,\"bar"`, []string{`foo,"bar`})
} }
func TestParseExtractFormatStepsSuccess(t *testing.T) { func TestParseExtractFormatStepsSuccess(t *testing.T) {
f := func(s string, stepsExpected []*extractFormatStep) { f := func(s string, stepsExpected []extractFormatStep) {
t.Helper() t.Helper()
steps, err := parseExtractFormatSteps(s) steps, err := parseExtractFormatSteps(s)
@ -55,21 +70,12 @@ func TestParseExtractFormatStepsSuccess(t *testing.T) {
} }
} }
f("<foo>", []*extractFormatStep{ f("<foo>", []extractFormatStep{
{ {
field: "foo", field: "foo",
}, },
}) })
f("<_>", []*extractFormatStep{ f("<foo>bar", []extractFormatStep{
{},
})
f("<*>", []*extractFormatStep{
{},
})
f("<>", []*extractFormatStep{
{},
})
f("<foo>bar", []*extractFormatStep{
{ {
field: "foo", field: "foo",
}, },
@ -77,59 +83,59 @@ func TestParseExtractFormatStepsSuccess(t *testing.T) {
prefix: "bar", prefix: "bar",
}, },
}) })
f("<>bar<foo>", []*extractFormatStep{ f("<>bar<foo>", []extractFormatStep{
{}, {},
{ {
prefix: "bar", prefix: "bar",
field: "foo", field: "foo",
}, },
}) })
f("bar<foo>", []*extractFormatStep{ f("bar<foo>", []extractFormatStep{
{ {
prefix: "bar", prefix: "bar",
field: "foo", field: "foo",
}, },
}) })
f("bar<foo>abc", []*extractFormatStep{ f("bar<foo>abc", []extractFormatStep{
{ {
prefix: "bar", prefix: "bar",
field: "foo", field: "foo",
}, },
{ {
prefix: "abc", prefix: "abc",
}, },
}) })
f("bar<foo>abc<_>", []*extractFormatStep{ f("bar<foo>abc<_>", []extractFormatStep{
{ {
prefix: "bar", prefix: "bar",
field: "foo", field: "foo",
}, },
{ {
prefix: "abc", prefix: "abc",
}, },
}) })
f("<foo>bar<baz>", []*extractFormatStep{ f("<foo>bar<baz>", []extractFormatStep{
{ {
field: "foo", field: "foo",
}, },
{ {
prefix: "bar", prefix: "bar",
field: "baz", field: "baz",
}, },
}) })
f("bar<foo>baz", []*extractFormatStep{ f("bar<foo>baz", []extractFormatStep{
{ {
prefix: "bar", prefix: "bar",
field: "foo", field: "foo",
}, },
{ {
prefix: "baz", prefix: "baz",
}, },
}) })
f("&lt;<foo>&amp;gt;", []*extractFormatStep{ f("&lt;<foo>&amp;gt;", []extractFormatStep{
{ {
prefix: "<", prefix: "<",
field: "foo", field: "foo",
}, },
{ {
prefix: "&gt;", prefix: "&gt;",
@ -153,8 +159,17 @@ func TestParseExtractFormatStepFailure(t *testing.T) {
// zero fields // zero fields
f("foobar") f("foobar")
// Zero named fields
f("<>")
f("foo<>")
f("<>foo")
f("foo<_>bar<*>baz<>xxx")
// missing delimiter between fields // missing delimiter between fields
f("<foo><bar>") f("<foo><bar>")
f("<><bar>")
f("<foo><>")
f("bb<foo><><bar>aa")
f("aa<foo><bar>") f("aa<foo><bar>")
f("aa<foo><bar>bb") f("aa<foo><bar>bb")

View file

@ -0,0 +1,80 @@
package logstorage
import (
"testing"
)
func BenchmarkExtractFormatApply(b *testing.B) {
a := []string{
`{"level":"error","ts":1716113701.63973,"caller":"gcm/export.go:498","msg":"Failed to export self-observability metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).startSelfObservability\n\tcloud/kubernetes/metrics/common/gcm/export.go:498","foo":"bar"}`,
`{"level":"error","ts":1716113370.2321634,"caller":"gcm/export.go:434","msg":"Failed to export metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).exportBuffer\n\tcloud/kubernetes/metrics/common/gcm/export.go:434\ngoogle3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).flush\n\tcloud/kubernetes/metrics/common/gcm/export.go:383\ngoogle3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).Flush\n\tcloud/kubernetes/metrics/common/gcm/export.go:365\ngoogle3/cloud/kubernetes/metrics/components/collector/adapter/adapter.(*adapter).Finalize\n\tcloud/kubernetes/metrics/components/collector/adapter/consume.go:131\ngoogle3/cloud/kubernetes/metrics/components/collector/prometheus/prometheus.(*parser).ParseText\n\tcloud/kubernetes/metrics/components/collector/prometheus/parse.go:158\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.runScrapeLoop\n\tcloud/kubernetes/metrics/components/collector/collector.go:103\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Run\n\tcloud/kubernetes/metrics/components/collector/collector.go:81\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Start.func1\n\tcloud/kubernetes/metrics/components/collector/multi_target_collector.go:45","foo":"bar"}`,
`{"level":"error","ts":1716113127.7496774,"caller":"collector/collector.go:105","msg":"Failed to process metrics","scrape_target":"http://localhost:8093/metrics","error":"failed to finalize exporting: \"2 errors occurred:\\n\\t* failed to export 1 (out of 1) batches of metrics to Cloud Monitoring\\n\\t* failed to export 1 (out of 1) batches of metrics to Cloud Monitoring\\n\\n\"","stacktrace":"google3/cloud/kubernetes/metrics/components/collector/collector.runScrapeLoop\n\tcloud/kubernetes/metrics/components/collector/collector.go:105\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Run\n\tcloud/kubernetes/metrics/components/collector/collector.go:81\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Start.func1\n\tcloud/kubernetes/metrics/components/collector/multi_target_collector.go:45","foo":"bar"}`,
`{"level":"error","ts":1716113547.6429873,"caller":"gcm/export.go:498","msg":"Failed to export self-observability metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).startSelfObservability\n\tcloud/kubernetes/metrics/common/gcm/export.go:498", "foo":"bar"}`,
`{"level":"error","ts":1716113541.4445803,"caller":"periodicexporter/periodic_exporter.go:180","msg":"Failed to flush metrics to Cloud Monitoring","error":"1 error occurred:\n\t* failed to export 1 (out of 1) batches of metrics to Cloud Monitoring\n\n","stacktrace":"google3/cloud/kubernetes/metrics/common/periodicexporter/periodicexporter.(*Exporter).exportAll\n\tcloud/kubernetes/metrics/common/periodicexporter/periodic_exporter.go:180\ngoogle3/cloud/kubernetes/metrics/common/periodicexporter/periodicexporter.(*Exporter).periodicExporter\n\tcloud/kubernetes/metrics/common/periodicexporter/periodic_exporter.go:157","foo":"bar"}`,
}
b.Run("single-small-field-at-start", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"level":"<level>"`, a)
})
b.Run("single-small-field-at-start-unquote", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"level":<level>`, a)
})
b.Run("single-small-field-at-end", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"foo":"<foo>"`, a)
})
b.Run("single-small-field-at-end-unquote", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"foo":<foo>`, a)
})
b.Run("single-medium-field", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"msg":"<message>"`, a)
})
b.Run("single-medium-field-unquote", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"msg":<message>`, a)
})
b.Run("single-large-field", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"stacktrace":"<stacktrace>"`, a)
})
b.Run("single-large-field-unquote", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"stacktrace":<stacktrace>`, a)
})
b.Run("two-fields", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"level":"<level>",<_>"msg":"<msg>"`, a)
})
b.Run("two-fields-unquote", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"level":<level>,<_>"msg":<msg>`, a)
})
b.Run("many-fields", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"level":"<level>","ts":"<ts>","caller":"<caller>","msg":"<msg>","error":"<error>"`, a)
})
b.Run("many-fields-unquote", func(b *testing.B) {
benchmarkExtractFormatApply(b, `"level":<level>,"ts":<ts>,"caller":<caller>,"msg":<msg>,"error":<error>`, a)
})
}
func benchmarkExtractFormatApply(b *testing.B, format string, a []string) {
steps, err := parseExtractFormatSteps(format)
if err != nil {
b.Fatalf("unexpected error: %s", err)
}
n := 0
for _, s := range a {
n += len(s)
}
b.ReportAllocs()
b.SetBytes(int64(n))
b.RunParallel(func(pb *testing.PB) {
sink := 0
ef := newExtractFormat(steps)
for pb.Next() {
for _, s := range a {
ef.apply(s)
for _, v := range ef.matches {
sink += len(v)
}
}
}
GlobalSink.Add(uint64(sink))
})
}