diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index de0051109..3d103791c 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -210,7 +210,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, } line = sc.Bytes() p := logstorage.GetJSONParser() - if err := p.ParseLogMessage(line, ""); err != nil { + if err := p.ParseLogMessage(line); err != nil { return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index 9d1e7ebf8..33d832789 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -105,7 +105,7 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f } p := logstorage.GetJSONParser() - if err := p.ParseLogMessage(line, ""); err != nil { + if err := p.ParseLogMessage(line); err != nil { return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } ts, err := extractTimestampFromFields(timeField, p.Fields) diff --git a/lib/logstorage/json_parser.go b/lib/logstorage/json_parser.go index 150a6a379..ccffcf315 100644 --- a/lib/logstorage/json_parser.go +++ b/lib/logstorage/json_parser.go @@ -33,16 +33,10 @@ type JSONParser struct { } func (p *JSONParser) reset() { - p.resetNobuf() - - p.buf = p.buf[:0] -} - -func (p *JSONParser) resetNobuf() { clear(p.Fields) p.Fields = p.Fields[:0] - p.prefixBuf = p.prefixBuf[:0] + p.buf = p.buf[:0] } // GetJSONParser returns JSONParser ready to parse JSON lines. @@ -66,36 +60,20 @@ func PutJSONParser(p *JSONParser) { var parserPool sync.Pool -// ParseLogMessageNoResetBuf parses the given JSON log message msg into p.Fields. -// -// It adds the given prefix to all the parsed field names. -// -// The p.Fields remains valid until the next call to PutJSONParser(). -func (p *JSONParser) ParseLogMessageNoResetBuf(msg, prefix string) error { - p.resetNobuf() - return p.parseLogMessage(msg, prefix) -} - // ParseLogMessage parses the given JSON log message msg into p.Fields. // -// It adds the given prefix to all the parsed field names. -// // The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser(). -func (p *JSONParser) ParseLogMessage(msg []byte, prefix string) error { - msgStr := bytesutil.ToUnsafeString(msg) +func (p *JSONParser) ParseLogMessage(msg []byte) error { p.reset() - return p.parseLogMessage(msgStr, prefix) -} -func (p *JSONParser) parseLogMessage(msg, prefix string) error { - v, err := p.p.Parse(msg) + msgStr := bytesutil.ToUnsafeString(msg) + v, err := p.p.Parse(msgStr) if err != nil { return fmt.Errorf("cannot parse json: %w", err) } if t := v.Type(); t != fastjson.TypeObject { return fmt.Errorf("expecting json dictionary; got %s", t) } - p.prefixBuf = append(p.prefixBuf[:0], prefix...) p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v) return nil } diff --git a/lib/logstorage/json_parser_test.go b/lib/logstorage/json_parser_test.go index 4c294615f..818fd4976 100644 --- a/lib/logstorage/json_parser_test.go +++ b/lib/logstorage/json_parser_test.go @@ -10,7 +10,7 @@ func TestJSONParserFailure(t *testing.T) { t.Helper() p := GetJSONParser() - err := p.ParseLogMessage([]byte(data), "") + err := p.ParseLogMessage([]byte(data)) if err == nil { t.Fatalf("expecting non-nil error") } @@ -23,11 +23,11 @@ func TestJSONParserFailure(t *testing.T) { } func TestJSONParserSuccess(t *testing.T) { - f := func(data, prefix string, fieldsExpected []Field) { + f := func(data string, fieldsExpected []Field) { t.Helper() p := GetJSONParser() - err := p.ParseLogMessage([]byte(data), prefix) + err := p.ParseLogMessage([]byte(data)) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -37,23 +37,21 @@ func TestJSONParserSuccess(t *testing.T) { PutJSONParser(p) } - f("{}", "", nil) - f(`{"foo":"bar"}`, "", []Field{ + f("{}", nil) + f(`{"foo":"bar"}`, []Field{ { Name: "foo", Value: "bar", }, }) - f(`{"foo":"bar"}`, "prefix_", []Field{ + f(`{"foo":{"bar":{"x":"y","z":["foo"]}},"a":1,"b":true,"c":[1,2],"d":false}`, []Field{ { - Name: "prefix_foo", - Value: "bar", + Name: "foo.bar.x", + Value: "y", }, - }) - f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, "", []Field{ { - Name: "foo.bar", - Value: "baz", + Name: "foo.bar.z", + Value: `["foo"]`, }, { Name: "a", @@ -72,26 +70,4 @@ func TestJSONParserSuccess(t *testing.T) { Value: "false", }, }) - f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, "prefix_", []Field{ - { - Name: "prefix_foo.bar", - Value: "baz", - }, - { - Name: "prefix_a", - Value: "1", - }, - { - Name: "prefix_b", - Value: "true", - }, - { - Name: "prefix_c", - Value: "[1,2]", - }, - { - Name: "prefix_d", - Value: "false", - }, - }) } diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go new file mode 100644 index 000000000..855b04140 --- /dev/null +++ b/lib/logstorage/pipe_unpack.go @@ -0,0 +1,184 @@ +package logstorage + +import ( + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +type fieldsUnpackerContext struct { + fields []Field + a arena +} + +func (uctx *fieldsUnpackerContext) reset() { + uctx.resetFields() + uctx.a.reset() +} + +func (uctx *fieldsUnpackerContext) resetFields() { + clear(uctx.fields) + uctx.fields = uctx.fields[:0] +} + +func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) { + nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name)) + copy(nameBuf, fieldPrefix) + copy(nameBuf[len(fieldPrefix):], name) + nameCopy := bytesutil.ToUnsafeString(nameBuf) + + valueCopy := uctx.a.copyString(value) + + uctx.fields = append(uctx.fields, Field{ + Name: nameCopy, + Value: valueCopy, + }) +} + +func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string), ppBase pipeProcessor, fromField, fieldPrefix string) *pipeUnpackProcessor { + return &pipeUnpackProcessor{ + unpackFunc: unpackFunc, + ppBase: ppBase, + + shards: make([]pipeUnpackProcessorShard, workersCount), + + fromField: fromField, + fieldPrefix: fieldPrefix, + } +} + +type pipeUnpackProcessor struct { + unpackFunc func(uctx *fieldsUnpackerContext, s, fieldPrefix string) + ppBase pipeProcessor + + shards []pipeUnpackProcessorShard + + fromField string + fieldPrefix string +} + +type pipeUnpackProcessorShard struct { + pipeUnpackProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeUnpackProcessorShardNopad{})%128]byte +} + +type pipeUnpackProcessorShardNopad struct { + uctx fieldsUnpackerContext + wctx pipeUnpackWriteContext +} + +func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &pup.shards[workerID] + shard.wctx.init(br, pup.ppBase) + + c := br.getColumnByName(pup.fromField) + if c.isConst { + v := c.valuesEncoded[0] + shard.uctx.resetFields() + pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix) + for rowIdx := range br.timestamps { + shard.wctx.writeRow(rowIdx, shard.uctx.fields) + } + } else { + values := c.getValues(br) + for i, v := range values { + if i == 0 || values[i-1] != v { + shard.uctx.resetFields() + pup.unpackFunc(&shard.uctx, v, pup.fieldPrefix) + } + shard.wctx.writeRow(i, shard.uctx.fields) + } + } + + shard.wctx.flush() + shard.uctx.reset() +} + +func (pup *pipeUnpackProcessor) flush() error { + return nil +} + +type pipeUnpackWriteContext struct { + brSrc *blockResult + csSrc []*blockResultColumn + ppBase pipeProcessor + + rcs []resultColumn + br blockResult + + valuesLen int +} + +func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) { + wctx.brSrc = brSrc + wctx.csSrc = brSrc.getColumns() + wctx.ppBase = ppBase +} + +func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) { + csSrc := wctx.csSrc + rcs := wctx.rcs + + areEqualColumns := len(rcs) == len(csSrc)+len(extraFields) + if areEqualColumns { + for i, f := range extraFields { + if rcs[len(csSrc)+i].name != f.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // send the current block to bbBase and construct a block with new set of columns + wctx.flush() + + rcs = wctx.rcs[:0] + for _, c := range csSrc { + rcs = appendResultColumnWithName(rcs, c.name) + } + for _, f := range extraFields { + rcs = appendResultColumnWithName(rcs, f.Name) + } + wctx.rcs = rcs + } + + brSrc := wctx.brSrc + for i, c := range csSrc { + v := c.getValueAtRow(brSrc, rowIdx) + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } + for i, f := range extraFields { + v := f.Value + rcs[len(csSrc)+i].addValue(v) + wctx.valuesLen += len(v) + } + if wctx.valuesLen >= 1_000_000 { + wctx.flush() + } +} + +func (wctx *pipeUnpackWriteContext) flush() { + rcs := wctx.rcs + + wctx.valuesLen = 0 + + if len(rcs) == 0 { + return + } + + // Flush rcs to ppBase + br := &wctx.br + br.setResultColumns(rcs) + wctx.ppBase.writeBlock(0, br) + br.reset() + for i := range rcs { + rcs[i].resetValues() + } +} diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index f9a44556c..951bae0f7 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -2,7 +2,8 @@ package logstorage import ( "fmt" - "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeUnpackJSON processes '| unpack_json ...' pipe. @@ -34,83 +35,21 @@ func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fields } func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - shards := make([]pipeUnpackJSONProcessorShard, workersCount) - - pup := &pipeUnpackJSONProcessor{ - pu: pu, - ppBase: ppBase, - - shards: shards, - } - return pup + return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix) } -type pipeUnpackJSONProcessor struct { - pu *pipeUnpackJSON - ppBase pipeProcessor - - shards []pipeUnpackJSONProcessorShard -} - -type pipeUnpackJSONProcessorShard struct { - pipeUnpackJSONProcessorShardNopad - - // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . - _ [128 - unsafe.Sizeof(pipeUnpackJSONProcessorShardNopad{})%128]byte -} - -type pipeUnpackJSONProcessorShardNopad struct { - p JSONParser - - wctx pipeUnpackWriteContext -} - -func (shard *pipeUnpackJSONProcessorShard) parseJSON(v, resultPrefix string) []Field { - if len(v) == 0 || v[0] != '{' { +func unpackJSON(uctx *fieldsUnpackerContext, s, fieldPrefix string) { + if len(s) == 0 || s[0] != '{' { // This isn't a JSON object - return nil - } - if err := shard.p.ParseLogMessageNoResetBuf(v, resultPrefix); err != nil { - // Cannot parse v - return nil - } - return shard.p.Fields -} - -func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { return } - - resultPrefix := pup.pu.resultPrefix - shard := &pup.shards[workerID] - wctx := &shard.wctx - wctx.init(br, pup.ppBase) - - c := br.getColumnByName(pup.pu.fromField) - if c.isConst { - v := c.valuesEncoded[0] - extraFields := shard.parseJSON(v, resultPrefix) - for rowIdx := range br.timestamps { - wctx.writeRow(rowIdx, extraFields) - } - } else { - values := c.getValues(br) - var extraFields []Field - for i, v := range values { - if i == 0 || values[i-1] != v { - extraFields = shard.parseJSON(v, resultPrefix) - } - wctx.writeRow(i, extraFields) + p := GetJSONParser() + if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil { + for _, f := range p.Fields { + uctx.addField(f.Name, f.Value, fieldPrefix) } } - - wctx.flush() - shard.p.reset() -} - -func (pup *pipeUnpackJSONProcessor) flush() error { - return nil + PutJSONParser(p) } func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index 37f508839..0859a3f48 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -3,9 +3,6 @@ package logstorage import ( "fmt" "strings" - "unsafe" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeUnpackLogfmt processes '| unpack_logfmt ...' pipe. @@ -37,71 +34,46 @@ func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fiel } func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - shards := make([]pipeUnpackLogfmtProcessorShard, workersCount) - - pup := &pipeUnpackLogfmtProcessor{ - pu: pu, - ppBase: ppBase, - - shards: shards, - } - return pup + return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix) } -type pipeUnpackLogfmtProcessor struct { - pu *pipeUnpackLogfmt - ppBase pipeProcessor - - shards []pipeUnpackLogfmtProcessorShard -} - -type pipeUnpackLogfmtProcessorShard struct { - pipeUnpackLogfmtProcessorShardNopad - - // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . - _ [128 - unsafe.Sizeof(pipeUnpackLogfmtProcessorShardNopad{})%128]byte -} - -type pipeUnpackLogfmtProcessorShardNopad struct { - p logfmtParser - - wctx pipeUnpackWriteContext -} - -func (pup *pipeUnpackLogfmtProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { - return - } - - resultPrefix := pup.pu.resultPrefix - shard := &pup.shards[workerID] - wctx := &shard.wctx - wctx.init(br, pup.ppBase) - - c := br.getColumnByName(pup.pu.fromField) - if c.isConst { - v := c.valuesEncoded[0] - extraFields := shard.p.parse(v, resultPrefix) - for rowIdx := range br.timestamps { - wctx.writeRow(rowIdx, extraFields) +func unpackLogfmt(uctx *fieldsUnpackerContext, s, fieldPrefix string) { + for { + // Search for field name + n := strings.IndexByte(s, '=') + if n < 0 { + // field name couldn't be read + return } - } else { - values := c.getValues(br) - var extraFields []Field - for i, v := range values { - if i == 0 || values[i-1] != v { - extraFields = shard.p.parse(v, resultPrefix) + + name := strings.TrimSpace(s[:n]) + s = s[n+1:] + if len(s) == 0 { + uctx.addField(name, "", fieldPrefix) + } + + // Search for field value + value, nOffset := tryUnquoteString(s) + if nOffset >= 0 { + uctx.addField(name, value, fieldPrefix) + s = s[nOffset:] + if len(s) == 0 { + return } - wctx.writeRow(i, extraFields) + if s[0] != ' ' { + return + } + s = s[1:] + } else { + n := strings.IndexByte(s, ' ') + if n < 0 { + uctx.addField(name, s, fieldPrefix) + return + } + uctx.addField(name, s[:n], fieldPrefix) + s = s[n+1:] } } - - wctx.flush() - shard.p.reset() -} - -func (pup *pipeUnpackLogfmtProcessor) flush() error { - return nil } func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { @@ -136,154 +108,3 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { } return pu, nil } - -type pipeUnpackWriteContext struct { - brSrc *blockResult - csSrc []*blockResultColumn - ppBase pipeProcessor - - rcs []resultColumn - br blockResult - - valuesLen int -} - -func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) { - wctx.brSrc = brSrc - wctx.csSrc = brSrc.getColumns() - wctx.ppBase = ppBase -} - -func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) { - csSrc := wctx.csSrc - rcs := wctx.rcs - - areEqualColumns := len(rcs) == len(csSrc)+len(extraFields) - if areEqualColumns { - for i, f := range extraFields { - if rcs[len(csSrc)+i].name != f.Name { - areEqualColumns = false - break - } - } - } - if !areEqualColumns { - // send the current block to bbBase and construct a block with new set of columns - wctx.flush() - - rcs = wctx.rcs[:0] - for _, c := range csSrc { - rcs = appendResultColumnWithName(rcs, c.name) - } - for _, f := range extraFields { - rcs = appendResultColumnWithName(rcs, f.Name) - } - wctx.rcs = rcs - } - - brSrc := wctx.brSrc - for i, c := range csSrc { - v := c.getValueAtRow(brSrc, rowIdx) - rcs[i].addValue(v) - wctx.valuesLen += len(v) - } - for i, f := range extraFields { - v := f.Value - rcs[len(csSrc)+i].addValue(v) - wctx.valuesLen += len(v) - } - if wctx.valuesLen >= 1_000_000 { - wctx.flush() - } -} - -func (wctx *pipeUnpackWriteContext) flush() { - rcs := wctx.rcs - - wctx.valuesLen = 0 - - if len(rcs) == 0 { - return - } - - // Flush rcs to ppBase - br := &wctx.br - br.setResultColumns(rcs) - wctx.ppBase.writeBlock(0, br) - br.reset() - for i := range rcs { - rcs[i].resetValues() - } -} - -type logfmtParser struct { - Fields []Field - - buf []byte -} - -func (p *logfmtParser) reset() { - clear(p.Fields) - p.Fields = p.Fields[:0] - - p.buf = p.buf[:0] -} - -func (p *logfmtParser) parse(s, resultPrefix string) []Field { - clear(p.Fields) - p.Fields = p.Fields[:0] - - for { - // Search for field name - n := strings.IndexByte(s, '=') - if n < 0 { - // field name couldn't be read - return p.Fields - } - - name := strings.TrimSpace(s[:n]) - s = s[n+1:] - if len(s) == 0 { - p.addField(name, "", resultPrefix) - return p.Fields - } - - // Search for field value - value, nOffset := tryUnquoteString(s) - if nOffset >= 0 { - p.addField(name, value, resultPrefix) - s = s[nOffset:] - if len(s) == 0 { - return p.Fields - } - if s[0] != ' ' { - return p.Fields - } - s = s[1:] - } else { - n := strings.IndexByte(s, ' ') - if n < 0 { - p.addField(name, s, resultPrefix) - return p.Fields - } - p.addField(name, s[:n], resultPrefix) - s = s[n+1:] - } - } -} - -func (p *logfmtParser) addField(name, value, resultPrefix string) { - if resultPrefix != "" { - buf := p.buf - bufLen := len(buf) - buf = append(buf, resultPrefix...) - buf = append(buf, name...) - p.buf = buf - - name = bytesutil.ToUnsafeString(buf[bufLen:]) - } - p.Fields = append(p.Fields, Field{ - Name: name, - Value: value, - }) -}