diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index 8d5c03e12..de0051109 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -20,7 +20,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -210,8 +209,8 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, return false, fmt.Errorf(`missing log message after the "create" or "index" command`) } line = sc.Bytes() - p := logjson.GetParser() - if err := p.ParseLogMessage(line); err != nil { + p := logstorage.GetJSONParser() + if err := p.ParseLogMessage(line, ""); err != nil { return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } @@ -224,7 +223,7 @@ func readBulkLine(sc *bufio.Scanner, timeField, msgField string, } p.RenameField(msgField, "_msg") processLogMessage(ts, p.Fields) - logjson.PutParser(p) + logstorage.PutJSONParser(p) return true, nil } diff --git a/app/vlinsert/jsonline/jsonline.go b/app/vlinsert/jsonline/jsonline.go index caa86ccbf..9d1e7ebf8 100644 --- a/app/vlinsert/jsonline/jsonline.go +++ b/app/vlinsert/jsonline/jsonline.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter" @@ -105,8 +104,8 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f line = sc.Bytes() } - p := logjson.GetParser() - if err := p.ParseLogMessage(line); err != nil { + p := logstorage.GetJSONParser() + if err := p.ParseLogMessage(line, ""); err != nil { return false, fmt.Errorf("cannot parse json-encoded log entry: %w", err) } ts, err := extractTimestampFromFields(timeField, p.Fields) @@ -118,7 +117,7 @@ func readLine(sc *bufio.Scanner, timeField, msgField string, processLogMessage f } p.RenameField(msgField, "_msg") processLogMessage(ts, p.Fields) - logjson.PutParser(p) + logstorage.PutJSONParser(p) return true, nil } diff --git a/lib/logjson/parser_test.go b/lib/logjson/parser_test.go deleted file mode 100644 index ee4f40faf..000000000 --- a/lib/logjson/parser_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package logjson - -import ( - "reflect" - "testing" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" -) - -func TestParserFailure(t *testing.T) { - f := func(data string) { - t.Helper() - - p := GetParser() - err := p.ParseLogMessage([]byte(data)) - if err == nil { - t.Fatalf("expecting non-nil error") - } - PutParser(p) - } - f("") - f("{foo") - f("[1,2,3]") - f(`{"foo",}`) -} - -func TestParserSuccess(t *testing.T) { - f := func(data string, fieldsExpected []logstorage.Field) { - t.Helper() - - p := GetParser() - err := p.ParseLogMessage([]byte(data)) - if err != nil { - t.Fatalf("unexpected error: %s", err) - } - if !reflect.DeepEqual(p.Fields, fieldsExpected) { - t.Fatalf("unexpected fields;\ngot\n%s\nwant\n%s", p.Fields, fieldsExpected) - } - PutParser(p) - } - - f("{}", nil) - f(`{"foo":"bar"}`, []logstorage.Field{ - { - Name: "foo", - Value: "bar", - }, - }) - f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, []logstorage.Field{ - { - Name: "foo.bar", - Value: "baz", - }, - { - Name: "a", - Value: "1", - }, - { - Name: "b", - Value: "true", - }, - { - Name: "c", - Value: "[1,2]", - }, - { - Name: "d", - Value: "false", - }, - }) -} diff --git a/lib/logjson/parser.go b/lib/logstorage/json_parser.go similarity index 67% rename from lib/logjson/parser.go rename to lib/logstorage/json_parser.go index aa669181a..3cf366774 100644 --- a/lib/logjson/parser.go +++ b/lib/logstorage/json_parser.go @@ -1,4 +1,4 @@ -package logjson +package logstorage import ( "fmt" @@ -6,21 +6,20 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/valyala/fastjson" ) -// Parser parses a single JSON log message into Fields. +// JSONParser parses a single JSON log message into Fields. // // See https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model // // Use GetParser() for obtaining the parser. -type Parser struct { +type JSONParser struct { // Fields contains the parsed JSON line after Parse() call // // The Fields are valid until the next call to ParseLogMessage() // or until the parser is returned to the pool with PutParser() call. - Fields []logstorage.Field + Fields []Field // p is used for fast JSON parsing p fastjson.Parser @@ -33,59 +32,72 @@ type Parser struct { prefixBuf []byte } -func (p *Parser) reset() { - fields := p.Fields - for i := range fields { - lf := &fields[i] - lf.Name = "" - lf.Value = "" - } - p.Fields = fields[:0] +func (p *JSONParser) reset() { + clear(p.Fields) + p.Fields = p.Fields[:0] p.buf = p.buf[:0] p.prefixBuf = p.prefixBuf[:0] } -// GetParser returns Parser ready to parse JSON lines. +// GetJSONParser returns JSONParser ready to parse JSON lines. // -// Return the parser to the pool when it is no longer needed by calling PutParser(). -func GetParser() *Parser { +// Return the parser to the pool when it is no longer needed by calling PutJSONParser(). +func GetJSONParser() *JSONParser { v := parserPool.Get() if v == nil { - return &Parser{} + return &JSONParser{} } - return v.(*Parser) + return v.(*JSONParser) } -// PutParser returns the parser to the pool. +// PutJSONParser returns the parser to the pool. // // The parser cannot be used after returning to the pool. -func PutParser(p *Parser) { +func PutJSONParser(p *JSONParser) { p.reset() parserPool.Put(p) } var parserPool sync.Pool +// ParseLogMessageNoResetBuf parses the given JSON log message msg into p.Fields. +// +// It adds the given prefix to all the parsed field names. +// +// The p.Fields remains valid until the next call to PutJSONParser(). +func (p *JSONParser) ParseLogMessageNoResetBuf(msg, prefix string) error { + return p.parseLogMessage(msg, prefix, false) +} + // ParseLogMessage parses the given JSON log message msg into p.Fields. // -// The p.Fields remains valid until the next call to ParseLogMessage() or PutParser(). -func (p *Parser) ParseLogMessage(msg []byte) error { - s := bytesutil.ToUnsafeString(msg) - v, err := p.p.Parse(s) +// It adds the given prefix to all the parsed field names. +// +// The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser(). +func (p *JSONParser) ParseLogMessage(msg []byte, prefix string) error { + msgStr := bytesutil.ToUnsafeString(msg) + return p.parseLogMessage(msgStr, prefix, true) +} + +func (p *JSONParser) parseLogMessage(msg, prefix string, resetBuf bool) error { + v, err := p.p.Parse(msg) if err != nil { return fmt.Errorf("cannot parse json: %w", err) } if t := v.Type(); t != fastjson.TypeObject { return fmt.Errorf("expecting json dictionary; got %s", t) } - p.reset() + if resetBuf { + p.reset() + } + p.prefixBuf = append(p.prefixBuf[:0], prefix...) p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v) return nil } // RenameField renames field with the oldName to newName in p.Fields -func (p *Parser) RenameField(oldName, newName string) { +func (p *JSONParser) RenameField(oldName, newName string) { if oldName == "" { return } @@ -99,7 +111,7 @@ func (p *Parser) RenameField(oldName, newName string) { } } -func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]logstorage.Field, []byte, []byte) { +func appendLogFields(dst []Field, dstBuf, prefixBuf []byte, v *fastjson.Value) ([]Field, []byte, []byte) { o := v.GetObject() o.Visit(func(k []byte, v *fastjson.Value) { t := v.Type() @@ -133,13 +145,13 @@ func appendLogFields(dst []logstorage.Field, dstBuf, prefixBuf []byte, v *fastjs return dst, dstBuf, prefixBuf } -func appendLogField(dst []logstorage.Field, dstBuf, prefixBuf, k, value []byte) ([]logstorage.Field, []byte) { +func appendLogField(dst []Field, dstBuf, prefixBuf, k, value []byte) ([]Field, []byte) { dstBufLen := len(dstBuf) dstBuf = append(dstBuf, prefixBuf...) dstBuf = append(dstBuf, k...) name := dstBuf[dstBufLen:] - dst = append(dst, logstorage.Field{ + dst = append(dst, Field{ Name: bytesutil.ToUnsafeString(name), Value: bytesutil.ToUnsafeString(value), }) diff --git a/lib/logstorage/json_parser_test.go b/lib/logstorage/json_parser_test.go new file mode 100644 index 000000000..4c294615f --- /dev/null +++ b/lib/logstorage/json_parser_test.go @@ -0,0 +1,97 @@ +package logstorage + +import ( + "reflect" + "testing" +) + +func TestJSONParserFailure(t *testing.T) { + f := func(data string) { + t.Helper() + + p := GetJSONParser() + err := p.ParseLogMessage([]byte(data), "") + if err == nil { + t.Fatalf("expecting non-nil error") + } + PutJSONParser(p) + } + f("") + f("{foo") + f("[1,2,3]") + f(`{"foo",}`) +} + +func TestJSONParserSuccess(t *testing.T) { + f := func(data, prefix string, fieldsExpected []Field) { + t.Helper() + + p := GetJSONParser() + err := p.ParseLogMessage([]byte(data), prefix) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !reflect.DeepEqual(p.Fields, fieldsExpected) { + t.Fatalf("unexpected fields;\ngot\n%s\nwant\n%s", p.Fields, fieldsExpected) + } + PutJSONParser(p) + } + + f("{}", "", nil) + f(`{"foo":"bar"}`, "", []Field{ + { + Name: "foo", + Value: "bar", + }, + }) + f(`{"foo":"bar"}`, "prefix_", []Field{ + { + Name: "prefix_foo", + Value: "bar", + }, + }) + f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, "", []Field{ + { + Name: "foo.bar", + Value: "baz", + }, + { + Name: "a", + Value: "1", + }, + { + Name: "b", + Value: "true", + }, + { + Name: "c", + Value: "[1,2]", + }, + { + Name: "d", + Value: "false", + }, + }) + f(`{"foo":{"bar":"baz"},"a":1,"b":true,"c":[1,2],"d":false}`, "prefix_", []Field{ + { + Name: "prefix_foo.bar", + Value: "baz", + }, + { + Name: "prefix_a", + Value: "1", + }, + { + Name: "prefix_b", + Value: "true", + }, + { + Name: "prefix_c", + Value: "[1,2]", + }, + { + Name: "prefix_d", + Value: "false", + }, + }) +} diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index 256e96345..c26e25d68 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -3,10 +3,6 @@ package logstorage import ( "fmt" "unsafe" - - "github.com/valyala/fastjson" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeUnpackJSON processes '| unpack_json ...' pipe. @@ -64,9 +60,7 @@ type pipeUnpackJSONProcessorShard struct { } type pipeUnpackJSONProcessorShardNopad struct { - jsonParser fastjson.Parser - jsonFields []Field - jsonValuesBuf []byte + jsonParser JSONParser rcs []resultColumn br blockResult @@ -134,38 +128,16 @@ func (shard *pipeUnpackJSONProcessorShard) flush(ppBase pipeProcessor) { } } -func (shard *pipeUnpackJSONProcessorShard) parseJSONFields(resultPrefix, v string) { - clear(shard.jsonFields) - shard.jsonFields = shard.jsonFields[:0] - shard.jsonValuesBuf = shard.jsonValuesBuf[:0] - - jsv, err := shard.jsonParser.Parse(v) - if err != nil { - return +func (shard *pipeUnpackJSONProcessorShard) parseJSON(v, resultPrefix string) []Field { + if len(v) == 0 || v[0] != '{' { + // This isn't a JSON object + return nil } - jso := jsv.GetObject() - buf := shard.jsonValuesBuf - jso.Visit(func(k []byte, v *fastjson.Value) { - var bv []byte - if v.Type() == fastjson.TypeString { - bv = v.GetStringBytes() - } else { - bufLen := len(buf) - buf = v.MarshalTo(buf) - bv = buf[bufLen:] - } - if resultPrefix != "" { - bufLen := len(buf) - buf = append(buf, resultPrefix...) - buf = append(buf, k...) - k = buf[bufLen:] - } - shard.jsonFields = append(shard.jsonFields, Field{ - Name: bytesutil.ToUnsafeString(k), - Value: bytesutil.ToUnsafeString(bv), - }) - }) - shard.jsonValuesBuf = buf + if err := shard.jsonParser.ParseLogMessageNoResetBuf(v, resultPrefix); err != nil { + // Cannot parse v + return nil + } + return shard.jsonParser.Fields } func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) { @@ -180,21 +152,23 @@ func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) { c := br.getColumnByName(pup.pu.fromField) if c.isConst { v := c.valuesEncoded[0] - shard.parseJSONFields(resultPrefix, v) + extraFields := shard.parseJSON(v, resultPrefix) for rowIdx := range br.timestamps { - shard.writeRow(pup.ppBase, br, cs, rowIdx, shard.jsonFields) + shard.writeRow(pup.ppBase, br, cs, rowIdx, extraFields) } } else { values := c.getValues(br) + var extraFields []Field for i, v := range values { if i == 0 || values[i-1] != v { - shard.parseJSONFields(resultPrefix, v) + extraFields = shard.parseJSON(v, resultPrefix) } - shard.writeRow(pup.ppBase, br, cs, i, shard.jsonFields) + shard.writeRow(pup.ppBase, br, cs, i, extraFields) } } shard.flush(pup.ppBase) + shard.jsonParser.reset() } func (pup *pipeUnpackJSONProcessor) flush() error { diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index b3a02da64..01af44831 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -91,7 +91,7 @@ func TestPipeUnpackJSON(t *testing.T) { // single row, unpack from named field f("unpack_json from x", [][]Field{ { - {"x", `{"foo":"bar","baz":"xyz","a":123,"b":["foo","bar"],"x":NaN}`}, + {"x", `{"foo":"bar","baz":"xyz","a":123,"b":["foo","bar"],"x":NaN,"y":{"z":{"a":"b"}}}`}, }, }, [][]Field{ { @@ -100,6 +100,7 @@ func TestPipeUnpackJSON(t *testing.T) { {"baz", "xyz"}, {"a", "123"}, {"b", `["foo","bar"]`}, + {"y.z.a", "b"}, }, })