This commit is contained in:
Aliaksandr Valialkin 2024-05-20 14:09:39 +02:00
parent ef2df6889e
commit fba053b34d
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 244 additions and 346 deletions

View file

@ -210,7 +210,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string,
} }
line = sc.Bytes() line = sc.Bytes()
p := logstorage.GetJSONParser() p := logstorage.GetJSONParser()
if err := p.ParseLogMessage(line, ""); err != nil { if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
} }

View file

@ -105,7 +105,7 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f
} }
p := logstorage.GetJSONParser() p := logstorage.GetJSONParser()
if err := p.ParseLogMessage(line, ""); err != nil { if err := p.ParseLogMessage(line); err != nil {
return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err)
} }
ts, err := extractTimestampFromFields(timeField, p.Fields) ts, err := extractTimestampFromFields(timeField, p.Fields)

View file

@ -33,16 +33,10 @@ type JSONParser struct {
} }
func (p *JSONParser) reset() { func (p *JSONParser) reset() {
p.resetNobuf()
p.buf = p.buf[:0]
}
func (p *JSONParser) resetNobuf() {
clear(p.Fields) clear(p.Fields)
p.Fields = p.Fields[:0] p.Fields = p.Fields[:0]
p.prefixBuf = p.prefixBuf[:0] p.buf = p.buf[:0]
} }
// GetJSONParser returns JSONParser ready to parse JSON lines. // GetJSONParser returns JSONParser ready to parse JSON lines.
@ -66,36 +60,20 @@ func PutJSONParser(p *JSONParser) {
var parserPool sync.Pool var parserPool sync.Pool
// ParseLogMessageNoResetBuf parses the given JSON log message msg into p.Fields.
//
// It adds the given prefix to all the parsed field names.
//
// The p.Fields remains valid until the next call to PutJSONParser().
func (p *JSONParser) ParseLogMessageNoResetBuf(msg, prefix string) error {
p.resetNobuf()
return p.parseLogMessage(msg, prefix)
}
// ParseLogMessage parses the given JSON log message msg into p.Fields. // ParseLogMessage parses the given JSON log message msg into p.Fields.
// //
// It adds the given prefix to all the parsed field names.
//
// The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser(). // The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser().
func (p *JSONParser) ParseLogMessage(msg []byte, prefix string) error { func (p *JSONParser) ParseLogMessage(msg []byte) error {
msgStr := bytesutil.ToUnsafeString(msg)
p.reset() p.reset()
return p.parseLogMessage(msgStr, prefix)
}
func (p *JSONParser) parseLogMessage(msg, prefix string) error { msgStr := bytesutil.ToUnsafeString(msg)
v, err := p.p.Parse(msg) v, err := p.p.Parse(msgStr)
if err != nil { if err != nil {
return fmt.Errorf("cannot parse json: %w", err) return fmt.Errorf("cannot parse json: %w", err)
} }
if t := v.Type(); t != fastjson.TypeObject { if t := v.Type(); t != fastjson.TypeObject {
return fmt.Errorf("expecting json dictionary; got %s", t) return fmt.Errorf("expecting json dictionary; got %s", t)
} }
p.prefixBuf = append(p.prefixBuf[:0], prefix...)
p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v) p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v)
return nil return nil
} }

View file

@ -10,7 +10,7 @@ func TestJSONParserFailure(t *testing.T) {
t.Helper() t.Helper()
p := GetJSONParser() p := GetJSONParser()
err := p.ParseLogMessage([]byte(data), "") err := p.ParseLogMessage([]byte(data))
if err == nil { if err == nil {
t.Fatalf("expecting non-nil error") t.Fatalf("expecting non-nil error")
} }
@ -23,11 +23,11 @@ func TestJSONParserFailure(t *testing.T) {
} }
func TestJSONParserSuccess(t *testing.T) { func TestJSONParserSuccess(t *testing.T) {
f := func(data, prefix string, fieldsExpected []Field) { f := func(data string, fieldsExpected []Field) {
t.Helper() t.Helper()
p := GetJSONParser() p := GetJSONParser()
err := p.ParseLogMessage([]byte(data), prefix) err := p.ParseLogMessage([]byte(data))
if err != nil { if err != nil {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
} }
@ -37,23 +37,21 @@ func TestJSONParserSuccess(t *testing.T) {
PutJSONParser(p) PutJSONParser(p)
} }
f("{}", "", nil) f("{}", nil)
f(`{"foo":"bar"}`, "", []Field{ f(`{"foo":"bar"}`, []Field{
{ {
Name: "foo", Name: "foo",
Value: "bar", Value: "bar",
}, },
}) })
f(`{"foo":"bar"}`, "prefix_", []Field{ f(`{"foo":{"bar":{"x":"y","z":["foo"]}},"a":1,"b":true,"c":[1,2],"d":false}`, []Field{
{ {
Name: "prefix_foo", Name: "foo.bar.x",
Value: "bar", Value: "y",
}, },
})
f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, "", []Field{
{ {
Name: "foo.bar", Name: "foo.bar.z",
Value: "baz", Value: `["foo"]`,
}, },
{ {
Name: "a", Name: "a",
@ -72,26 +70,4 @@ func TestJSONParserSuccess(t *testing.T) {
Value: "false", Value: "false",
}, },
}) })
f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, "prefix_", []Field{
{
Name: "prefix_foo.bar",
Value: "baz",
},
{
Name: "prefix_a",
Value: "1",
},
{
Name: "prefix_b",
Value: "true",
},
{
Name: "prefix_c",
Value: "[1,2]",
},
{
Name: "prefix_d",
Value: "false",
},
})
} }

View file

@ -0,0 +1,184 @@
package logstorage
import (
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
type fieldsUnpackerContext struct {
fields []Field
a arena
}
func (uctx *fieldsUnpackerContext) reset() {
uctx.resetFields()
uctx.a.reset()
}
func (uctx *fieldsUnpackerContext) resetFields() {
clear(uctx.fields)
uctx.fields = uctx.fields[:0]
}
func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) {
nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
copy(nameBuf, fieldPrefix)
copy(nameBuf[len(fieldPrefix):], name)
nameCopy := bytesutil.ToUnsafeString(nameBuf)
valueCopy := uctx.a.copyString(value)
uctx.fields = append(uctx.fields, Field{
Name: nameCopy,
Value: valueCopy,
})
}
func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string), ppBase pipeProcessor, fromField, fieldPrefix string) *pipeUnpackProcessor {
return &pipeUnpackProcessor{
unpackFunc: unpackFunc,
ppBase: ppBase,
shards: make([]pipeUnpackProcessorShard, workersCount),
fromField: fromField,
fieldPrefix: fieldPrefix,
}
}
type pipeUnpackProcessor struct {
unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string)
ppBase pipeProcessor
shards []pipeUnpackProcessorShard
fromField string
fieldPrefix string
}
type pipeUnpackProcessorShard struct {
pipeUnpackProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeUnpackProcessorShardNopad{})%128]byte
}
type pipeUnpackProcessorShardNopad struct {
uctx fieldsUnpackerContext
wctx pipeUnpackWriteContext
}
func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pup.shards[workerID]
shard.wctx.init(br, pup.ppBase)
c := br.getColumnByName(pup.fromField)
if c.isConst {
v := c.valuesEncoded[0]
shard.uctx.resetFields()
pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix)
for rowIdx := range br.timestamps {
shard.wctx.writeRow(rowIdx, shard.uctx.fields)
}
} else {
values := c.getValues(br)
for i, v := range values {
if i == 0 || values[i-1] != v {
shard.uctx.resetFields()
pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix)
}
shard.wctx.writeRow(i, shard.uctx.fields)
}
}
shard.wctx.flush()
shard.uctx.reset()
}
func (pup *pipeUnpackProcessor) flush() error {
return nil
}
type pipeUnpackWriteContext struct {
brSrc *blockResult
csSrc []*blockResultColumn
ppBase pipeProcessor
rcs []resultColumn
br blockResult
valuesLen int
}
func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) {
wctx.brSrc = brSrc
wctx.csSrc = brSrc.getColumns()
wctx.ppBase = ppBase
}
func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
csSrc := wctx.csSrc
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(csSrc)+len(extraFields)
if areEqualColumns {
for i, f := range extraFields {
if rcs[len(csSrc)+i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to bbBase and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
for _, c := range csSrc {
rcs = appendResultColumnWithName(rcs, c.name)
}
for _, f := range extraFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
brSrc := wctx.brSrc
for i, c := range csSrc {
v := c.getValueAtRow(brSrc, rowIdx)
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
for i, f := range extraFields {
v := f.Value
rcs[len(csSrc)+i].addValue(v)
wctx.valuesLen += len(v)
}
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeUnpackWriteContext) flush() {
rcs := wctx.rcs
wctx.valuesLen = 0
if len(rcs) == 0 {
return
}
// Flush rcs to ppBase
br := &wctx.br
br.setResultColumns(rcs)
wctx.ppBase.writeBlock(0, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}

View file

@ -2,7 +2,8 @@ package logstorage
import ( import (
"fmt" "fmt"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
) )
// pipeUnpackJSON processes '| unpack_json ...' pipe. // pipeUnpackJSON processes '| unpack_json ...' pipe.
@ -34,83 +35,21 @@ func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fields
} }
func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeUnpackJSONProcessorShard, workersCount) return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix)
pup := &pipeUnpackJSONProcessor{
pu: pu,
ppBase: ppBase,
shards: shards,
}
return pup
} }
type pipeUnpackJSONProcessor struct { func unpackJSON(uctx *fieldsUnpackerContext, s, fieldPrefix string) {
pu *pipeUnpackJSON if len(s) == 0 || s[0] != '{' {
ppBase pipeProcessor
shards []pipeUnpackJSONProcessorShard
}
type pipeUnpackJSONProcessorShard struct {
pipeUnpackJSONProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeUnpackJSONProcessorShardNopad{})%128]byte
}
type pipeUnpackJSONProcessorShardNopad struct {
p JSONParser
wctx pipeUnpackWriteContext
}
func (shard *pipeUnpackJSONProcessorShard) parseJSON(v, resultPrefix string) []Field {
if len(v) == 0 || v[0] != '{' {
// This isn't a JSON object // This isn't a JSON object
return nil
}
if err := shard.p.ParseLogMessageNoResetBuf(v, resultPrefix); err != nil {
// Cannot parse v
return nil
}
return shard.p.Fields
}
func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return return
} }
p := GetJSONParser()
resultPrefix := pup.pu.resultPrefix if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil {
shard := &pup.shards[workerID] for _, f := range p.Fields {
wctx := &shard.wctx uctx.addField(f.Name, f.Value, fieldPrefix)
wctx.init(br, pup.ppBase)
c := br.getColumnByName(pup.pu.fromField)
if c.isConst {
v := c.valuesEncoded[0]
extraFields := shard.parseJSON(v, resultPrefix)
for rowIdx := range br.timestamps {
wctx.writeRow(rowIdx, extraFields)
}
} else {
values := c.getValues(br)
var extraFields []Field
for i, v := range values {
if i == 0 || values[i-1] != v {
extraFields = shard.parseJSON(v, resultPrefix)
}
wctx.writeRow(i, extraFields)
} }
} }
PutJSONParser(p)
wctx.flush()
shard.p.reset()
}
func (pup *pipeUnpackJSONProcessor) flush() error {
return nil
} }
func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {

View file

@ -3,9 +3,6 @@ package logstorage
import ( import (
"fmt" "fmt"
"strings" "strings"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
) )
// pipeUnpackLogfmt processes '| unpack_logfmt ...' pipe. // pipeUnpackLogfmt processes '| unpack_logfmt ...' pipe.
@ -37,71 +34,46 @@ func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fiel
} }
func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeUnpackLogfmtProcessorShard, workersCount) return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix)
pup := &pipeUnpackLogfmtProcessor{
pu: pu,
ppBase: ppBase,
shards: shards,
}
return pup
} }
type pipeUnpackLogfmtProcessor struct { func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) {
pu *pipeUnpackLogfmt for {
ppBase pipeProcessor // Search for field name
n := strings.IndexByte(s, '=')
shards []pipeUnpackLogfmtProcessorShard if n < 0 {
} // field name couldn't be read
type pipeUnpackLogfmtProcessorShard struct {
pipeUnpackLogfmtProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeUnpackLogfmtProcessorShardNopad{})%128]byte
}
type pipeUnpackLogfmtProcessorShardNopad struct {
p logfmtParser
wctx pipeUnpackWriteContext
}
func (pup *pipeUnpackLogfmtProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return return
} }
resultPrefix := pup.pu.resultPrefix name := strings.TrimSpace(s[:n])
shard := &pup.shards[workerID] s = s[n+1:]
wctx := &shard.wctx if len(s) == 0 {
wctx.init(br, pup.ppBase) uctx.addField(name, "", fieldPrefix)
c := br.getColumnByName(pup.pu.fromField)
if c.isConst {
v := c.valuesEncoded[0]
extraFields := shard.p.parse(v, resultPrefix)
for rowIdx := range br.timestamps {
wctx.writeRow(rowIdx, extraFields)
} }
// Search for field value
value, nOffset := tryUnquoteString(s)
if nOffset >= 0 {
uctx.addField(name, value, fieldPrefix)
s = s[nOffset:]
if len(s) == 0 {
return
}
if s[0] != ' ' {
return
}
s = s[1:]
} else { } else {
values := c.getValues(br) n := strings.IndexByte(s, ' ')
var extraFields []Field if n < 0 {
for i, v := range values { uctx.addField(name, s, fieldPrefix)
if i == 0 || values[i-1] != v { return
extraFields = shard.p.parse(v, resultPrefix)
} }
wctx.writeRow(i, extraFields) uctx.addField(name, s[:n], fieldPrefix)
s = s[n+1:]
} }
} }
wctx.flush()
shard.p.reset()
}
func (pup *pipeUnpackLogfmtProcessor) flush() error {
return nil
} }
func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
@ -136,154 +108,3 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
} }
return pu, nil return pu, nil
} }
type pipeUnpackWriteContext struct {
brSrc *blockResult
csSrc []*blockResultColumn
ppBase pipeProcessor
rcs []resultColumn
br blockResult
valuesLen int
}
func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) {
wctx.brSrc = brSrc
wctx.csSrc = brSrc.getColumns()
wctx.ppBase = ppBase
}
func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
csSrc := wctx.csSrc
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(csSrc)+len(extraFields)
if areEqualColumns {
for i, f := range extraFields {
if rcs[len(csSrc)+i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to bbBase and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
for _, c := range csSrc {
rcs = appendResultColumnWithName(rcs, c.name)
}
for _, f := range extraFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
brSrc := wctx.brSrc
for i, c := range csSrc {
v := c.getValueAtRow(brSrc, rowIdx)
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
for i, f := range extraFields {
v := f.Value
rcs[len(csSrc)+i].addValue(v)
wctx.valuesLen += len(v)
}
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeUnpackWriteContext) flush() {
rcs := wctx.rcs
wctx.valuesLen = 0
if len(rcs) == 0 {
return
}
// Flush rcs to ppBase
br := &wctx.br
br.setResultColumns(rcs)
wctx.ppBase.writeBlock(0, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}
type logfmtParser struct {
Fields []Field
buf []byte
}
func (p *logfmtParser) reset() {
clear(p.Fields)
p.Fields = p.Fields[:0]
p.buf = p.buf[:0]
}
func (p *logfmtParser) parse(s, resultPrefix string) []Field {
clear(p.Fields)
p.Fields = p.Fields[:0]
for {
// Search for field name
n := strings.IndexByte(s, '=')
if n < 0 {
// field name couldn't be read
return p.Fields
}
name := strings.TrimSpace(s[:n])
s = s[n+1:]
if len(s) == 0 {
p.addField(name, "", resultPrefix)
return p.Fields
}
// Search for field value
value, nOffset := tryUnquoteString(s)
if nOffset >= 0 {
p.addField(name, value, resultPrefix)
s = s[nOffset:]
if len(s) == 0 {
return p.Fields
}
if s[0] != ' ' {
return p.Fields
}
s = s[1:]
} else {
n := strings.IndexByte(s, ' ')
if n < 0 {
p.addField(name, s, resultPrefix)
return p.Fields
}
p.addField(name, s[:n], resultPrefix)
s = s[n+1:]
}
}
}
func (p *logfmtParser) addField(name, value, resultPrefix string) {
if resultPrefix != "" {
buf := p.buf
bufLen := len(buf)
buf = append(buf, resultPrefix...)
buf = append(buf, name...)
p.buf = buf
name = bytesutil.ToUnsafeString(buf[bufLen:])
}
p.Fields = append(p.Fields, Field{
Name: name,
Value: value,
})
}