diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index a0f3df56a..1df8c8d41 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -5,21 +5,148 @@ import ( "html" "strconv" "strings" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) -type extractFormat struct { - steps []*extractFormatStep +// pipeExtract processes '| extract (field, format)' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe +type pipeExtract struct { + field string + steps []extractFormatStep - results []string + stepsStr string +} + +func (pe *pipeExtract) String() string { + return fmt.Sprintf("extract(%s, %s)", quoteTokenIfNeeded(pe.field), pe.stepsStr) +} + +func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) { + neededFields.add(pe.field) + + for _, step := range pe.steps { + if step.field != "" { + unneededFields.remove(step.field) + } + } +} + +func (pe *pipeExtract) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + shards := make([]pipeExtractProcessorShard, workersCount) + for i := range shards { + shards[i] = pipeExtractProcessorShard{ + pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{ + ef: newExtractFormat(pe.steps), + }, + } + } + + pep := &pipeExtractProcessor{ + pe: pe, + stopCh: stopCh, + ppBase: ppBase, + + shards: shards, + } + return pep +} + +type pipeExtractProcessor struct { + pe *pipeExtract + stopCh <-chan struct{} + ppBase pipeProcessor + + shards []pipeExtractProcessorShard +} + +type pipeExtractProcessorShard struct { + pipeExtractProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeExtractProcessorShardNopad{})%128]byte +} + +type pipeExtractProcessorShardNopad struct { + ef *extractFormat +} + +func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &pep.shards[workerID] + c := br.getColumnByName(pep.pe.field) + values := c.getValues(br) + + ef := shard.ef + for _, v := range values { + ef.apply(v) + /* for i, result := range ef.results { + rcs[i].addValue(result) + } + */ + } +} + +func (pep *pipeExtractProcessor) flush() error { + return nil +} + +type extractFormat struct { + // steps contains steps for extracting fields from string + steps []extractFormatStep + + // matches contains matches for every step in steps + matches []string + + // fields contains matches for non-empty fields + fields []extractField +} + +type extractField struct { + name string + value *string } type extractFormatStep struct { prefix string - field string + field string +} + +func newExtractFormat(steps []extractFormatStep) *extractFormat { + if len(steps) == 0 { + logger.Panicf("BUG: steps cannot be empty") + } + + matches := make([]string, len(steps)) + + var fields []extractField + for i, step := range steps { + if step.field != "" { + fields = append(fields, extractField{ + name: step.field, + value: &matches[i], + }) + } + } + if len(fields) == 0 { + logger.Panicf("BUG: fields cannot be empty") + } + + ef := &extractFormat{ + steps: steps, + matches: matches, + fields: fields, + } + return ef } func (ef *extractFormat) apply(s string) { - clear(ef.results) + clear(ef.matches) steps := ef.steps @@ -32,40 +159,38 @@ func (ef *extractFormat) apply(s string) { s = s[n+len(prefix):] } - results := ef.results - for i, step := range steps[1:] { - prefix := step.prefix - - if steps[i].field != "" { - us, nOffset, ok := tryUnquoteString(s) - if ok { - results[i] = us - s = s[nOffset:] - if !strings.HasPrefix(s, prefix) { - // Mismatch - return - } - s = s[len(prefix):] - continue - } + matches := ef.matches + for i := range steps { + nextPrefix := "" + if i + 1 < len(steps) { + nextPrefix = steps[i+1].prefix } - n := strings.Index(s, prefix) - if n < 0 { - // Mismatch - return - } - results[i] = s[:n] - s = s[n+len(prefix):] - } - - if steps[len(steps)-1].field != "" { - us, _, ok := tryUnquoteString(s) + us, nOffset, ok := tryUnquoteString(s) if ok { - s = us + // Matched quoted string + matches[i] = us + s = s[nOffset:] + if !strings.HasPrefix(s, nextPrefix) { + // Mismatch + return + } + s = s[len(nextPrefix):] + } else { + // Match unquoted string until the nextPrefix + if nextPrefix == "" { + matches[i] = s + return + } + n := strings.Index(s, nextPrefix) + if n < 0 { + // Mismatch + return + } + matches[i] = s[:n] + s = s[n+len(nextPrefix):] } } - results[len(steps)-1] = s } func tryUnquoteString(s string) (string, int, bool) { @@ -86,25 +211,10 @@ func tryUnquoteString(s string) (string, int, bool) { return us, len(qp), true } -func parseExtractFormat(s string) (*extractFormat, error) { - steps, err := parseExtractFormatSteps(s) - if err != nil { - return nil, err - } - ef := &extractFormat{ - steps: steps, +func parseExtractFormatSteps(s string) ([]extractFormatStep, error) { + var steps []extractFormatStep - results: make([]string, len(steps)), - } - return ef, nil -} - -func (efs *extractFormatStep) String() string { - return fmt.Sprintf("[prefix=%q, field=%q]", efs.prefix, efs.field) -} - -func parseExtractFormatSteps(s string) ([]*extractFormatStep, error) { - var steps []*extractFormatStep + hasNamedField := false n := strings.IndexByte(s, '<') if n < 0 { @@ -123,17 +233,20 @@ func parseExtractFormatSteps(s string) ([]*extractFormatStep, error) { if field == "_" || field == "*" { field = "" } - steps = append(steps, &extractFormatStep{ + steps = append(steps, extractFormatStep{ prefix: prefix, - field: field, + field: field, }) + if !hasNamedField && field != "" { + hasNamedField = true + } if len(s) == 0 { break } n = strings.IndexByte(s, '<') if n < 0 { - steps = append(steps, &extractFormatStep{ + steps = append(steps, extractFormatStep{ prefix: s, }) break @@ -145,7 +258,12 @@ func parseExtractFormatSteps(s string) ([]*extractFormatStep, error) { s = s[n+1:] } - for _, step := range steps { + if !hasNamedField { + return nil, fmt.Errorf("missing named fields like ") + } + + for i := range steps { + step := &steps[i] step.prefix = html.UnescapeString(step.prefix) } diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go index cf0d22fea..49255dee0 100644 --- a/lib/logstorage/pipe_extract_test.go +++ b/lib/logstorage/pipe_extract_test.go @@ -9,41 +9,56 @@ func TestExtractFormatApply(t *testing.T) { f := func(format, s string, resultsExpected []string) { t.Helper() - ef, err := parseExtractFormat(format) + steps, err := parseExtractFormatSteps(format) if err != nil { t.Fatalf("unexpected error: %s", err) } + ef := newExtractFormat(steps) ef.apply(s) - if !reflect.DeepEqual(ef.results, resultsExpected) { - t.Fatalf("unexpected results; got %q; want %q", ef.results, resultsExpected) + if len(ef.fields) != len(resultsExpected) { + t.Fatalf("unexpected number of results; got %d; want %d", len(ef.fields), len(resultsExpected)) + } + for i, f := range ef.fields { + if v := *f.value; v != resultsExpected[i] { + t.Fatalf("unexpected value for field %q; got %q; want %q", f.name, v, resultsExpected[i]) + } } } f("", "", []string{""}) f("", "abc", []string{"abc"}) - f("bar", "", []string{"", ""}) - f("bar", "bar", []string{"", ""}) - f("bar", "bazbar", []string{"baz", ""}) - f("bar", "a bazbar xdsf", []string{"a baz", " xdsf"}) + f("bar", "", []string{""}) + f("bar", "bar", []string{""}) + f("bar", "bazbar", []string{"baz"}) + f("bar", "a bazbar xdsf", []string{"a baz"}) + f("bar<>", "a bazbar xdsf", []string{"a baz"}) + f("bar<>x", "a bazbar xdsf", []string{"a baz"}) f("foo", "", []string{""}) f("foo", "foo", []string{""}) f("foo", "a foo xdf sdf", []string{" xdf sdf"}) f("foo", "a foo foobar", []string{" foobar"}) - f("foobaz", "a foo foobar", []string{"", ""}) - f("foobaz", "a foo foobar baz", []string{" foobar ", ""}) - f("foobaz", "a foo foobar bazabc", []string{" foobar ", "abc"}) - f("ip= <> path= ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "method=GET host='abc'", "/foo/bar", "some tail here"}) - f("ip=<>", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4", " bar"}) + f("foobaz", "a foo foobar", []string{""}) + f("foobaz", "a foobaz bar", []string{""}) + f("foobaz", "a foo foobar baz", []string{" foobar "}) + f("foobaz", "a foo foobar bazabc", []string{" foobar "}) + + f("ip= <> path= ", "x=a, ip=1.2.3.4 method=GET host='abc' path=/foo/bar some tail here", []string{"1.2.3.4", "/foo/bar"}) + + // escaped format + f("ip=<>", "foo ip=<1.2.3.4> bar", []string{"1.2.3.4"}) + f("ip=<>", "foo ip= bar", []string{"foo&bar"}) // quoted fields - f(`"msg":,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar`+"\n\t", `"baz":"x"}`}) + f(`"msg":,`, `{"foo":"bar","msg":"foo,b\"ar\n\t","baz":"x"}`, []string{`foo,b"ar` + "\n\t"}) f(`foo=`, "foo=`bar baz,abc` def", []string{"bar baz,abc"}) - f(`foo= `, "foo=`bar baz,abc` def", []string{"bar baz,abc", "def"}) + f(`foo= `, "foo=`bar baz,abc` def", []string{"bar baz,abc"}) + f(``, `"foo,\"bar"`, []string{`foo,"bar`}) + f(`,"bar`, `"foo,\"bar"`, []string{`foo,"bar`}) } func TestParseExtractFormatStepsSuccess(t *testing.T) { - f := func(s string, stepsExpected []*extractFormatStep) { + f := func(s string, stepsExpected []extractFormatStep) { t.Helper() steps, err := parseExtractFormatSteps(s) @@ -55,21 +70,12 @@ func TestParseExtractFormatStepsSuccess(t *testing.T) { } } - f("", []*extractFormatStep{ + f("", []extractFormatStep{ { field: "foo", }, }) - f("<_>", []*extractFormatStep{ - {}, - }) - f("<*>", []*extractFormatStep{ - {}, - }) - f("<>", []*extractFormatStep{ - {}, - }) - f("bar", []*extractFormatStep{ + f("bar", []extractFormatStep{ { field: "foo", }, @@ -77,59 +83,59 @@ func TestParseExtractFormatStepsSuccess(t *testing.T) { prefix: "bar", }, }) - f("<>bar", []*extractFormatStep{ + f("<>bar", []extractFormatStep{ {}, { prefix: "bar", - field: "foo", + field: "foo", }, }) - f("bar", []*extractFormatStep{ + f("bar", []extractFormatStep{ { prefix: "bar", - field: "foo", + field: "foo", }, }) - f("barabc", []*extractFormatStep{ + f("barabc", []extractFormatStep{ { prefix: "bar", - field: "foo", + field: "foo", }, { prefix: "abc", }, }) - f("barabc<_>", []*extractFormatStep{ + f("barabc<_>", []extractFormatStep{ { prefix: "bar", - field: "foo", + field: "foo", }, { prefix: "abc", }, }) - f("bar", []*extractFormatStep{ + f("bar", []extractFormatStep{ { field: "foo", }, { prefix: "bar", - field: "baz", + field: "baz", }, }) - f("barbaz", []*extractFormatStep{ + f("barbaz", []extractFormatStep{ { prefix: "bar", - field: "foo", + field: "foo", }, { prefix: "baz", }, }) - f("<&gt;", []*extractFormatStep{ + f("<&gt;", []extractFormatStep{ { prefix: "<", - field: "foo", + field: "foo", }, { prefix: ">", @@ -153,8 +159,17 @@ func TestParseExtractFormatStepFailure(t *testing.T) { // zero fields f("foobar") + // Zero named fields + f("<>") + f("foo<>") + f("<>foo") + f("foo<_>bar<*>baz<>xxx") + // missing delimiter between fields f("") + f("<>") + f("<>") + f("bb<>aa") f("aa") f("aabb") diff --git a/lib/logstorage/pipe_extract_timing_test.go b/lib/logstorage/pipe_extract_timing_test.go new file mode 100644 index 000000000..ea2dea926 --- /dev/null +++ b/lib/logstorage/pipe_extract_timing_test.go @@ -0,0 +1,80 @@ +package logstorage + +import ( + "testing" +) + +func BenchmarkExtractFormatApply(b *testing.B) { + a := []string{ + `{"level":"error","ts":1716113701.63973,"caller":"gcm/export.go:498","msg":"Failed to export self-observability metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).startSelfObservability\n\tcloud/kubernetes/metrics/common/gcm/export.go:498","foo":"bar"}`, + `{"level":"error","ts":1716113370.2321634,"caller":"gcm/export.go:434","msg":"Failed to export metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).exportBuffer\n\tcloud/kubernetes/metrics/common/gcm/export.go:434\ngoogle3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).flush\n\tcloud/kubernetes/metrics/common/gcm/export.go:383\ngoogle3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).Flush\n\tcloud/kubernetes/metrics/common/gcm/export.go:365\ngoogle3/cloud/kubernetes/metrics/components/collector/adapter/adapter.(*adapter).Finalize\n\tcloud/kubernetes/metrics/components/collector/adapter/consume.go:131\ngoogle3/cloud/kubernetes/metrics/components/collector/prometheus/prometheus.(*parser).ParseText\n\tcloud/kubernetes/metrics/components/collector/prometheus/parse.go:158\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.runScrapeLoop\n\tcloud/kubernetes/metrics/components/collector/collector.go:103\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Run\n\tcloud/kubernetes/metrics/components/collector/collector.go:81\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Start.func1\n\tcloud/kubernetes/metrics/components/collector/multi_target_collector.go:45","foo":"bar"}`, + `{"level":"error","ts":1716113127.7496774,"caller":"collector/collector.go:105","msg":"Failed to process metrics","scrape_target":"http://localhost:8093/metrics","error":"failed to finalize exporting: \"2 errors occurred:\\n\\t* failed to export 1 (out of 1) batches of metrics to Cloud Monitoring\\n\\t* failed to export 1 (out of 1) batches of metrics to Cloud Monitoring\\n\\n\"","stacktrace":"google3/cloud/kubernetes/metrics/components/collector/collector.runScrapeLoop\n\tcloud/kubernetes/metrics/components/collector/collector.go:105\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Run\n\tcloud/kubernetes/metrics/components/collector/collector.go:81\ngoogle3/cloud/kubernetes/metrics/components/collector/collector.Start.func1\n\tcloud/kubernetes/metrics/components/collector/multi_target_collector.go:45","foo":"bar"}`, + `{"level":"error","ts":1716113547.6429873,"caller":"gcm/export.go:498","msg":"Failed to export self-observability metrics to Cloud Monitoring","error":"rpc error: code = PermissionDenied desc = Permission monitoring.timeSeries.create denied (or the resource may not exist).","stacktrace":"google3/cloud/kubernetes/metrics/common/gcm/gcm.(*exporter).startSelfObservability\n\tcloud/kubernetes/metrics/common/gcm/export.go:498", "foo":"bar"}`, + `{"level":"error","ts":1716113541.4445803,"caller":"periodicexporter/periodic_exporter.go:180","msg":"Failed to flush metrics to Cloud Monitoring","error":"1 error occurred:\n\t* failed to export 1 (out of 1) batches of metrics to Cloud Monitoring\n\n","stacktrace":"google3/cloud/kubernetes/metrics/common/periodicexporter/periodicexporter.(*Exporter).exportAll\n\tcloud/kubernetes/metrics/common/periodicexporter/periodic_exporter.go:180\ngoogle3/cloud/kubernetes/metrics/common/periodicexporter/periodicexporter.(*Exporter).periodicExporter\n\tcloud/kubernetes/metrics/common/periodicexporter/periodic_exporter.go:157","foo":"bar"}`, + } + + b.Run("single-small-field-at-start", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"level":""`, a) + }) + b.Run("single-small-field-at-start-unquote", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"level":`, a) + }) + b.Run("single-small-field-at-end", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"foo":""`, a) + }) + b.Run("single-small-field-at-end-unquote", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"foo":`, a) + }) + b.Run("single-medium-field", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"msg":""`, a) + }) + b.Run("single-medium-field-unquote", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"msg":`, a) + }) + b.Run("single-large-field", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"stacktrace":""`, a) + }) + b.Run("single-large-field-unquote", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"stacktrace":`, a) + }) + b.Run("two-fields", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"level":"",<_>"msg":""`, a) + }) + b.Run("two-fields-unquote", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"level":,<_>"msg":`, a) + }) + b.Run("many-fields", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"level":"","ts":"","caller":"","msg":"","error":""`, a) + }) + b.Run("many-fields-unquote", func(b *testing.B) { + benchmarkExtractFormatApply(b, `"level":,"ts":,"caller":,"msg":,"error":`, a) + }) +} + +func benchmarkExtractFormatApply(b *testing.B, format string, a []string) { + steps, err := parseExtractFormatSteps(format) + if err != nil { + b.Fatalf("unexpected error: %s", err) + } + + n := 0 + for _, s := range a { + n += len(s) + } + + b.ReportAllocs() + b.SetBytes(int64(n)) + b.RunParallel(func(pb *testing.PB) { + sink := 0 + ef := newExtractFormat(steps) + for pb.Next() { + for _, s := range a { + ef.apply(s) + for _, v := range ef.matches { + sink += len(v) + } + } + } + GlobalSink.Add(uint64(sink)) + }) +}