This commit is contained in:
Aliaksandr Valialkin 2024-05-22 20:53:31 +02:00
parent ff260230ea
commit c09d4e17ff
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 184 additions and 55 deletions

View file

@ -0,0 +1,77 @@
package logstorage
import (
"strings"
"sync"
)
type logfmtParser struct {
fields []Field
}
func (p *logfmtParser) reset() {
clear(p.fields)
p.fields = p.fields[:0]
}
func (p *logfmtParser) addField(name, value string) {
p.fields = append(p.fields, Field{
Name: name,
Value: value,
})
}
func (p *logfmtParser) parse(s string) {
for {
// Search for field name
n := strings.IndexByte(s, '=')
if n < 0 {
// field name couldn't be read
return
}
name := strings.TrimSpace(s[:n])
s = s[n+1:]
if len(s) == 0 {
p.addField(name, "")
return
}
// Search for field value
value, nOffset := tryUnquoteString(s)
if nOffset >= 0 {
p.addField(name, value)
s = s[nOffset:]
if len(s) == 0 {
return
}
if s[0] != ' ' {
return
}
s = s[1:]
} else {
n := strings.IndexByte(s, ' ')
if n < 0 {
p.addField(name, s)
return
}
p.addField(name, s[:n])
s = s[n+1:]
}
}
}
func getLogfmtParser() *logfmtParser {
v := logfmtParserPool.Get()
if v == nil {
return &logfmtParser{}
}
return v.(*logfmtParser)
}
func putLogfmtParser(p *logfmtParser) {
p.reset()
logfmtParserPool.Put(p)
}
var logfmtParserPool sync.Pool

View file

@ -0,0 +1,30 @@
package logstorage
import (
"testing"
)
func TestLogfmtParser(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
p := getLogfmtParser()
defer putLogfmtParser(p)
p.parse(s)
result := marshalFieldsToJSON(nil, p.fields)
if string(result) != resultExpected {
t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected)
}
}
f(``, `{}`)
f(`foo=bar`, `{"foo":"bar"}`)
f(`foo="bar=baz x=y"`, `{"foo":"bar=baz x=y"}`)
f(`foo=`, `{"foo":""}`)
f(`foo=bar baz="x y" a=b`, `{"foo":"bar","baz":"x y","a":"b"}`)
// errors
f(`foo`, `{}`)
f(`foo=bar baz=x z qwe`, `{"foo":"bar","baz":"x"}`)
}

View file

@ -44,15 +44,44 @@ func (pu *pipeUnpackJSON) String() string {
}
func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.iff, neededFields, unneededFields)
}
func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, iff *ifFilter, neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
unneededFields.remove(pu.fromField)
if pu.iff != nil {
unneededFields.removeFields(pu.iff.neededFields)
unneededFieldsOrig := unneededFields.clone()
unneededFieldsCount := 0
if len(outFields) > 0 {
for _, f := range outFields {
if unneededFieldsOrig.contains(f) {
unneededFieldsCount++
}
unneededFields.add(f)
}
}
if len(outFields) == 0 || unneededFieldsCount < len(outFields) {
unneededFields.remove(fromField)
if iff != nil {
unneededFields.removeFields(iff.neededFields)
}
}
} else {
neededFields.add(pu.fromField)
if pu.iff != nil {
neededFields.addFields(pu.iff.neededFields)
neededFieldsOrig := neededFields.clone()
needFromField := len(outFields) == 0
if len(outFields) > 0 {
needFromField = false
for _, f := range outFields {
if neededFieldsOrig.contains(f) {
needFromField = true
}
neededFields.remove(f)
}
}
if needFromField {
neededFields.add(fromField)
if iff != nil {
neededFields.addFields(iff.neededFields)
}
}
}
}
@ -64,19 +93,29 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{},
return
}
p := GetJSONParser()
if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil {
err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s))
if err != nil {
for _, fieldName := range pu.fields {
uctx.addField(fieldName, "")
}
} else {
if len(pu.fields) == 0 {
for _, f := range p.Fields {
uctx.addField(f.Name, f.Value)
}
} else {
for _, fieldName := range pu.fields {
addedField := false
for _, f := range p.Fields {
if f.Name == fieldName {
uctx.addField(f.Name, f.Value)
addedField = true
break
}
}
if !addedField {
uctx.addField(fieldName, "")
}
}
}
}

View file

@ -64,6 +64,7 @@ func TestPipeUnpackJSON(t *testing.T) {
{
{"_msg", `{"foo":"bar","z":"q","a":"b"}`},
{"foo", "bar"},
{"b", ""},
},
})
@ -465,24 +466,34 @@ func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) {
// all the needed fields
f("unpack_json from x", "*", "", "*", "")
f("unpack_json if (y:z) from x", "*", "", "*", "")
f("unpack_json if (y:z) from x fields (a, b)", "*", "", "*", "a,b")
// all the needed fields, unneeded fields do not intersect with src
f("unpack_json from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_json if (y:z) from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_json if (f1:z) from x", "*", "f1,f2", "*", "f2")
f("unpack_json if (y:z) from x fields (f3)", "*", "f1,f2", "*", "f1,f2,f3")
f("unpack_json if (y:z) from x fields (f1)", "*", "f1,f2", "*", "f1,f2")
// all the needed fields, unneeded fields intersect with src
f("unpack_json from x", "*", "f2,x", "*", "f2")
f("unpack_json if (y:z) from x", "*", "f2,x", "*", "f2")
f("unpack_json if (f2:z) from x", "*", "f1,f2,x", "*", "f1")
f("unpack_json if (f2:z) from x fields (f3)", "*", "f1,f2,x", "*", "f1,f3")
// needed fields do not intersect with src
f("unpack_json from x", "f1,f2", "", "f1,f2,x", "")
f("unpack_json if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "")
f("unpack_json if (f1:z) from x", "f1,f2", "", "f1,f2,x", "")
f("unpack_json if (y:z) from x fields (f3)", "f1,f2", "", "f1,f2", "")
f("unpack_json if (y:z) from x fields (f2)", "f1,f2", "", "f1,x,y", "")
f("unpack_json if (f2:z) from x fields (f2)", "f1,f2", "", "f1,f2,x", "")
// needed fields intersect with src
f("unpack_json from x", "f2,x", "", "f2,x", "")
f("unpack_json if (y:z) from x", "f2,x", "", "f2,x,y", "")
f("unpack_json if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "")
f("unpack_json if (y:z) from x fields (f1)", "f2,x", "", "f2,x", "")
f("unpack_json if (y:z) from x fields (f2)", "f2,x", "", "x,y", "")
f("unpack_json if (y:z) from x fields (x)", "f2,x", "", "f2,x,y", "")
}

View file

@ -3,7 +3,6 @@ package logstorage
import (
"fmt"
"slices"
"strings"
)
// pipeUnpackLogfmt processes '| unpack_logfmt ...' pipe.
@ -43,63 +42,35 @@ func (pu *pipeUnpackLogfmt) String() string {
}
func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
unneededFields.remove(pu.fromField)
if pu.iff != nil {
unneededFields.removeFields(pu.iff.neededFields)
}
} else {
neededFields.add(pu.fromField)
if pu.iff != nil {
neededFields.addFields(pu.iff.neededFields)
}
}
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.iff, neededFields, unneededFields)
}
func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
addField := func(uctx *fieldsUnpackerContext, name, value string) {
if len(pu.fields) == 0 || slices.Contains(pu.fields, name) {
uctx.addField(name, value)
}
}
unpackLogfmt := func(uctx *fieldsUnpackerContext, s string) {
for {
// Search for field name
n := strings.IndexByte(s, '=')
if n < 0 {
// field name couldn't be read
return
}
p := getLogfmtParser()
name := strings.TrimSpace(s[:n])
s = s[n+1:]
if len(s) == 0 {
addField(uctx, name, "")
p.parse(s)
if len(pu.fields) == 0 {
for _, f := range p.fields {
uctx.addField(f.Name, f.Value)
}
// Search for field value
value, nOffset := tryUnquoteString(s)
if nOffset >= 0 {
addField(uctx, name, value)
s = s[nOffset:]
if len(s) == 0 {
return
} else {
for _, fieldName := range pu.fields {
addedField := false
for _, f := range p.fields {
if f.Name == fieldName {
uctx.addField(f.Name, f.Value)
addedField = true
break
}
}
if s[0] != ' ' {
return
if !addedField {
uctx.addField(fieldName, "")
}
s = s[1:]
} else {
n := strings.IndexByte(s, ' ')
if n < 0 {
addField(uctx, name, s)
return
}
addField(uctx, name, s[:n])
s = s[n+1:]
}
}
putLogfmtParser(p)
}
return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff)

View file

@ -63,6 +63,7 @@ func TestPipeUnpackLogfmt(t *testing.T) {
{"_msg", `foo=bar baz="x y=z" a=b`},
{"foo", "bar"},
{"a", "b"},
{"b", ""},
},
})