diff --git a/lib/logstorage/logfmt_parser.go b/lib/logstorage/logfmt_parser.go new file mode 100644 index 000000000..5eede8bbb --- /dev/null +++ b/lib/logstorage/logfmt_parser.go @@ -0,0 +1,77 @@ +package logstorage + +import ( + "strings" + "sync" +) + +type logfmtParser struct { + fields []Field +} + +func (p *logfmtParser) reset() { + clear(p.fields) + p.fields = p.fields[:0] +} + +func (p *logfmtParser) addField(name, value string) { + p.fields = append(p.fields, Field{ + Name: name, + Value: value, + }) +} + +func (p *logfmtParser) parse(s string) { + for { + // Search for field name + n := strings.IndexByte(s, '=') + if n < 0 { + // field name couldn't be read + return + } + + name := strings.TrimSpace(s[:n]) + s = s[n+1:] + if len(s) == 0 { + p.addField(name, "") + return + } + + // Search for field value + value, nOffset := tryUnquoteString(s) + if nOffset >= 0 { + p.addField(name, value) + s = s[nOffset:] + if len(s) == 0 { + return + } + if s[0] != ' ' { + return + } + s = s[1:] + } else { + n := strings.IndexByte(s, ' ') + if n < 0 { + p.addField(name, s) + return + } + p.addField(name, s[:n]) + s = s[n+1:] + } + } +} + +func getLogfmtParser() *logfmtParser { + v := logfmtParserPool.Get() + if v == nil { + return &logfmtParser{} + } + return v.(*logfmtParser) +} + +func putLogfmtParser(p *logfmtParser) { + p.reset() + logfmtParserPool.Put(p) +} + +var logfmtParserPool sync.Pool diff --git a/lib/logstorage/logfmt_parser_test.go b/lib/logstorage/logfmt_parser_test.go new file mode 100644 index 000000000..60161271e --- /dev/null +++ b/lib/logstorage/logfmt_parser_test.go @@ -0,0 +1,30 @@ +package logstorage + +import ( + "testing" +) + +func TestLogfmtParser(t *testing.T) { + f := func(s, resultExpected string) { + t.Helper() + + p := getLogfmtParser() + defer putLogfmtParser(p) + + p.parse(s) + result := marshalFieldsToJSON(nil, p.fields) + if string(result) != resultExpected { + t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected) + } + } + + f(``, `{}`) + f(`foo=bar`, `{"foo":"bar"}`) + f(`foo="bar=baz x=y"`, `{"foo":"bar=baz x=y"}`) + f(`foo=`, `{"foo":""}`) + f(`foo=bar baz="x y" a=b`, `{"foo":"bar","baz":"x y","a":"b"}`) + + // errors + f(`foo`, `{}`) + f(`foo=bar baz=x z qwe`, `{"foo":"bar","baz":"x"}`) +} diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index dd6cdab3e..790950c45 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -44,15 +44,44 @@ func (pu *pipeUnpackJSON) String() string { } func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { + updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.iff, neededFields, unneededFields) +} + +func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, iff *ifFilter, neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { - unneededFields.remove(pu.fromField) - if pu.iff != nil { - unneededFields.removeFields(pu.iff.neededFields) + unneededFieldsOrig := unneededFields.clone() + unneededFieldsCount := 0 + if len(outFields) > 0 { + for _, f := range outFields { + if unneededFieldsOrig.contains(f) { + unneededFieldsCount++ + } + unneededFields.add(f) + } + } + if len(outFields) == 0 || unneededFieldsCount < len(outFields) { + unneededFields.remove(fromField) + if iff != nil { + unneededFields.removeFields(iff.neededFields) + } } } else { - neededFields.add(pu.fromField) - if pu.iff != nil { - neededFields.addFields(pu.iff.neededFields) + neededFieldsOrig := neededFields.clone() + needFromField := len(outFields) == 0 + if len(outFields) > 0 { + needFromField = false + for _, f := range outFields { + if neededFieldsOrig.contains(f) { + needFromField = true + } + neededFields.remove(f) + } + } + if needFromField { + neededFields.add(fromField) + if iff != nil { + neededFields.addFields(iff.neededFields) + } } } } @@ -64,19 +93,29 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, return } p := GetJSONParser() - if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil { + err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)) + if err != nil { + for _, fieldName := range pu.fields { + uctx.addField(fieldName, "") + } + } else { if len(pu.fields) == 0 { for _, f := range p.Fields { uctx.addField(f.Name, f.Value) } } else { for _, fieldName := range pu.fields { + addedField := false for _, f := range p.Fields { if f.Name == fieldName { uctx.addField(f.Name, f.Value) + addedField = true break } } + if !addedField { + uctx.addField(fieldName, "") + } } } } diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index d0c06d01d..53650f071 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -64,6 +64,7 @@ func TestPipeUnpackJSON(t *testing.T) { { {"_msg", `{"foo":"bar","z":"q","a":"b"}`}, {"foo", "bar"}, + {"b", ""}, }, }) @@ -465,24 +466,34 @@ func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) { // all the needed fields f("unpack_json from x", "*", "", "*", "") f("unpack_json if (y:z) from x", "*", "", "*", "") + f("unpack_json if (y:z) from x fields (a, b)", "*", "", "*", "a,b") // all the needed fields, unneeded fields do not intersect with src f("unpack_json from x", "*", "f1,f2", "*", "f1,f2") f("unpack_json if (y:z) from x", "*", "f1,f2", "*", "f1,f2") f("unpack_json if (f1:z) from x", "*", "f1,f2", "*", "f2") + f("unpack_json if (y:z) from x fields (f3)", "*", "f1,f2", "*", "f1,f2,f3") + f("unpack_json if (y:z) from x fields (f1)", "*", "f1,f2", "*", "f1,f2") // all the needed fields, unneeded fields intersect with src f("unpack_json from x", "*", "f2,x", "*", "f2") f("unpack_json if (y:z) from x", "*", "f2,x", "*", "f2") f("unpack_json if (f2:z) from x", "*", "f1,f2,x", "*", "f1") + f("unpack_json if (f2:z) from x fields (f3)", "*", "f1,f2,x", "*", "f1,f3") // needed fields do not intersect with src f("unpack_json from x", "f1,f2", "", "f1,f2,x", "") f("unpack_json if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "") f("unpack_json if (f1:z) from x", "f1,f2", "", "f1,f2,x", "") + f("unpack_json if (y:z) from x fields (f3)", "f1,f2", "", "f1,f2", "") + f("unpack_json if (y:z) from x fields (f2)", "f1,f2", "", "f1,x,y", "") + f("unpack_json if (f2:z) from x fields (f2)", "f1,f2", "", "f1,f2,x", "") // needed fields intersect with src f("unpack_json from x", "f2,x", "", "f2,x", "") f("unpack_json if (y:z) from x", "f2,x", "", "f2,x,y", "") f("unpack_json if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "") + f("unpack_json if (y:z) from x fields (f1)", "f2,x", "", "f2,x", "") + f("unpack_json if (y:z) from x fields (f2)", "f2,x", "", "x,y", "") + f("unpack_json if (y:z) from x fields (x)", "f2,x", "", "f2,x,y", "") } diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index 984df2498..bc294a9b5 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -3,7 +3,6 @@ package logstorage import ( "fmt" "slices" - "strings" ) // pipeUnpackLogfmt processes '| unpack_logfmt ...' pipe. @@ -43,63 +42,35 @@ func (pu *pipeUnpackLogfmt) String() string { } func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) { - if neededFields.contains("*") { - unneededFields.remove(pu.fromField) - if pu.iff != nil { - unneededFields.removeFields(pu.iff.neededFields) - } - } else { - neededFields.add(pu.fromField) - if pu.iff != nil { - neededFields.addFields(pu.iff.neededFields) - } - } + updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.iff, neededFields, unneededFields) } func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - addField := func(uctx *fieldsUnpackerContext, name, value string) { - if len(pu.fields) == 0 || slices.Contains(pu.fields, name) { - uctx.addField(name, value) - } - } - unpackLogfmt := func(uctx *fieldsUnpackerContext, s string) { - for { - // Search for field name - n := strings.IndexByte(s, '=') - if n < 0 { - // field name couldn't be read - return - } + p := getLogfmtParser() - name := strings.TrimSpace(s[:n]) - s = s[n+1:] - if len(s) == 0 { - addField(uctx, name, "") + p.parse(s) + if len(pu.fields) == 0 { + for _, f := range p.fields { + uctx.addField(f.Name, f.Value) } - - // Search for field value - value, nOffset := tryUnquoteString(s) - if nOffset >= 0 { - addField(uctx, name, value) - s = s[nOffset:] - if len(s) == 0 { - return + } else { + for _, fieldName := range pu.fields { + addedField := false + for _, f := range p.fields { + if f.Name == fieldName { + uctx.addField(f.Name, f.Value) + addedField = true + break + } } - if s[0] != ' ' { - return + if !addedField { + uctx.addField(fieldName, "") } - s = s[1:] - } else { - n := strings.IndexByte(s, ' ') - if n < 0 { - addField(uctx, name, s) - return - } - addField(uctx, name, s[:n]) - s = s[n+1:] } } + + putLogfmtParser(p) } return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff) diff --git a/lib/logstorage/pipe_unpack_logfmt_test.go b/lib/logstorage/pipe_unpack_logfmt_test.go index ff1711d69..a5c7426f8 100644 --- a/lib/logstorage/pipe_unpack_logfmt_test.go +++ b/lib/logstorage/pipe_unpack_logfmt_test.go @@ -63,6 +63,7 @@ func TestPipeUnpackLogfmt(t *testing.T) { {"_msg", `foo=bar baz="x y=z" a=b`}, {"foo", "bar"}, {"a", "b"}, + {"b", ""}, }, })