This commit is contained in:
Aliaksandr Valialkin 2024-05-22 18:14:59 +02:00
parent 2ff9cf9f43
commit 8bb2ffa9b9
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 111 additions and 44 deletions

View file

@ -1630,8 +1630,8 @@ See also:
### unpack_json pipe ### unpack_json pipe
`| unpack_json from field_name` pipe unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given `field_name` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) `| unpack_json from field_name` pipe unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
into `k1`, ... `kN` field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched. into `k1`, ... `kN` output field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
Nested JSON is unpacked according to the rules defined [here](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Nested JSON is unpacked according to the rules defined [here](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -1648,21 +1648,28 @@ The following query is equivalent to the previous one:
_time:5m | unpack_json _time:5m | unpack_json
``` ```
If only some fields must be extracted from JSON, then they can be enumerated inside `fields (...)`. For example, the following query unpacks only `foo` and `bar`
fields from JSON value stored in `my_json` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):

```logsql
_time:5m | unpack_json from my_json fields (foo, bar)
```
Performance tip: if you need extracting a single field from long JSON, it is faster to use [`extract` pipe](#extract-pipe). For example, the following query extracts `"ip"` field from JSON Performance tip: if you need extracting a single field from long JSON, it is faster to use [`extract` pipe](#extract-pipe). For example, the following query extracts `"ip"` field from JSON
stored in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field): stored in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) at the maximum speed:
``` ```
_time:5m | extract '"ip":<ip>' _time:5m | extract '"ip":<ip>'
``` ```
If you want to make sure that the unpacked JSON fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON,
by adding `result_prefix "prefix_name"` to `unpack_json`. For example, the following query adds `foo_` prefix for all the unpacked fields
from `foo`:
```logsql
_time:5m | unpack_json from foo result_prefix "foo_"
```
See also: See also:
- [Conditional `unpack_json`](#conditional-unpack_json) - [Conditional `unpack_json`](#conditional-unpack_json)

View file

@ -2,6 +2,7 @@ package logstorage
import ( import (
"fmt" "fmt"
"slices"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
) )
@ -13,6 +14,11 @@ type pipeUnpackJSON struct {
// fromField is the field to unpack json fields from // fromField is the field to unpack json fields from
fromField string fromField string
// fields is an optional list of fields to extract from json.
//
// if it is empty, then all the fields are extracted.
fields []string
// resultPrefix is prefix to add to unpacked field names // resultPrefix is prefix to add to unpacked field names
resultPrefix string resultPrefix string
@ -22,15 +28,18 @@ type pipeUnpackJSON struct {
func (pu *pipeUnpackJSON) String() string { func (pu *pipeUnpackJSON) String() string {
s := "unpack_json" s := "unpack_json"
if pu.iff != nil {
s += " " + pu.iff.String()
}
if !isMsgFieldName(pu.fromField) { if !isMsgFieldName(pu.fromField) {
s += " from " + quoteTokenIfNeeded(pu.fromField) s += " from " + quoteTokenIfNeeded(pu.fromField)
} }
if len(pu.fields) > 0 {
s += " fields (" + fieldsToString(pu.fields) + ")"
}
if pu.resultPrefix != "" { if pu.resultPrefix != "" {
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
} }
if pu.iff != nil {
s += " " + pu.iff.String()
}
return s return s
} }
@ -49,21 +58,35 @@ func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fields
} }
func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff) unpackJSON := func(uctx *fieldsUnpackerContext, s string) {
} if len(s) == 0 || s[0] != '{' {
// This isn't a JSON object
func unpackJSON(uctx *fieldsUnpackerContext, s string) { return
if len(s) == 0 || s[0] != '{' {
// This isn't a JSON object
return
}
p := GetJSONParser()
if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil {
for _, f := range p.Fields {
uctx.addField(f.Name, f.Value)
} }
p := GetJSONParser()
if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil {
if len(pu.fields) == 0 {
for _, f := range p.Fields {
uctx.addField(f.Name, f.Value)
}
} else {
for _, fieldName := range pu.fields {
found := false
for _, f := range p.Fields {
if f.Name == fieldName {
uctx.addField(f.Name, f.Value)
found = true
}
}
if !found {
uctx.addField(fieldName, "")
}
}
}
}
PutJSONParser(p)
} }
PutJSONParser(p) return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff)
} }
func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
@ -72,6 +95,15 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
} }
lex.nextToken() lex.nextToken()
var iff *ifFilter
if lex.isKeyword("if") {
f, err := parseIfFilter(lex)
if err != nil {
return nil, err
}
iff = f
}
fromField := "_msg" fromField := "_msg"
if lex.isKeyword("from") { if lex.isKeyword("from") {
lex.nextToken() lex.nextToken()
@ -82,6 +114,19 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
fromField = f fromField = f
} }
var fields []string
if lex.isKeyword("fields") {
lex.nextToken()
fs, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields': %w", err)
}
fields = fs
if slices.Contains(fields, "*") {
fields = nil
}
}
resultPrefix := "" resultPrefix := ""
if lex.isKeyword("result_prefix") { if lex.isKeyword("result_prefix") {
lex.nextToken() lex.nextToken()
@ -94,15 +139,9 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
pu := &pipeUnpackJSON{ pu := &pipeUnpackJSON{
fromField: fromField, fromField: fromField,
fields: fields,
resultPrefix: resultPrefix, resultPrefix: resultPrefix,
} iff: iff,
if lex.isKeyword("if") {
iff, err := parseIfFilter(lex)
if err != nil {
return nil, err
}
pu.iff = iff
} }
return pu, nil return pu, nil

View file

@ -15,13 +15,16 @@ func TestParsePipeUnpackJSONSuccess(t *testing.T) {
} }
f(`unpack_json`) f(`unpack_json`)
f(`unpack_json fields (a)`)
f(`unpack_json fields (a, b, c)`)
f(`unpack_json if (a:x)`) f(`unpack_json if (a:x)`)
f(`unpack_json from x`) f(`unpack_json from x`)
f(`unpack_json from x if (a:x)`) f(`unpack_json from x fields (a, b)`)
f(`unpack_json if (a:x) from x fields (a, b)`)
f(`unpack_json from x result_prefix abc`) f(`unpack_json from x result_prefix abc`)
f(`unpack_json from x result_prefix abc if (a:x)`) f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc`)
f(`unpack_json result_prefix abc`) f(`unpack_json result_prefix abc`)
f(`unpack_json result_prefix abc if (a:x)`) f(`unpack_json if (a:x) fields (a, b) result_prefix abc`)
} }
func TestParsePipeUnpackJSONFailure(t *testing.T) { func TestParsePipeUnpackJSONFailure(t *testing.T) {
@ -32,6 +35,8 @@ func TestParsePipeUnpackJSONFailure(t *testing.T) {
f(`unpack_json foo`) f(`unpack_json foo`)
f(`unpack_json if`) f(`unpack_json if`)
f(`unpack_json fields`)
f(`unpack_json fields x`)
f(`unpack_json if (x:y) foobar`) f(`unpack_json if (x:y) foobar`)
f(`unpack_json from`) f(`unpack_json from`)
f(`unpack_json from x y`) f(`unpack_json from x y`)
@ -50,6 +55,19 @@ func TestPipeUnpackJSON(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected) expectPipeResults(t, pipeStr, rows, rowsExpected)
} }
// unpack only the requested fields
f("unpack_json fields (foo, b)", [][]Field{
{
{"_msg", `{"foo":"bar","z":"q","a":"b"}`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar","z":"q","a":"b"}`},
{"foo", "bar"},
{"b", ""},
},
})
// single row, unpack from _msg // single row, unpack from _msg
f("unpack_json", [][]Field{ f("unpack_json", [][]Field{
{ {
@ -194,7 +212,7 @@ func TestPipeUnpackJSON(t *testing.T) {
}) })
// multiple rows with distinct number of fields with result_prefix and if condition // multiple rows with distinct number of fields with result_prefix and if condition
f("unpack_json from x result_prefix qwe_ if (y:abc)", [][]Field{ f("unpack_json if (y:abc) from x result_prefix qwe_", [][]Field{
{ {
{"x", `{"foo":"bar","baz":"xyz"}`}, {"x", `{"foo":"bar","baz":"xyz"}`},
{"y", `abc`}, {"y", `abc`},
@ -447,25 +465,25 @@ func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) {
// all the needed fields // all the needed fields
f("unpack_json from x", "*", "", "*", "") f("unpack_json from x", "*", "", "*", "")
f("unpack_json from x if (y:z)", "*", "", "*", "") f("unpack_json if (y:z) from x", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src // all the needed fields, unneeded fields do not intersect with src
f("unpack_json from x", "*", "f1,f2", "*", "f1,f2") f("unpack_json from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_json from x if (y:z)", "*", "f1,f2", "*", "f1,f2") f("unpack_json if (y:z) from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_json from x if (f1:z)", "*", "f1,f2", "*", "f2") f("unpack_json if (f1:z) from x", "*", "f1,f2", "*", "f2")
// all the needed fields, unneeded fields intersect with src // all the needed fields, unneeded fields intersect with src
f("unpack_json from x", "*", "f2,x", "*", "f2") f("unpack_json from x", "*", "f2,x", "*", "f2")
f("unpack_json from x if (y:z)", "*", "f2,x", "*", "f2") f("unpack_json if (y:z) from x", "*", "f2,x", "*", "f2")
f("unpack_json from x if (f2:z)", "*", "f1,f2,x", "*", "f1") f("unpack_json if (f2:z) from x", "*", "f1,f2,x", "*", "f1")
// needed fields do not intersect with src // needed fields do not intersect with src
f("unpack_json from x", "f1,f2", "", "f1,f2,x", "") f("unpack_json from x", "f1,f2", "", "f1,f2,x", "")
f("unpack_json from x if (y:z)", "f1,f2", "", "f1,f2,x,y", "") f("unpack_json if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "")
f("unpack_json from x if (f1:z)", "f1,f2", "", "f1,f2,x", "") f("unpack_json if (f1:z) from x", "f1,f2", "", "f1,f2,x", "")
// needed fields intersect with src // needed fields intersect with src
f("unpack_json from x", "f2,x", "", "f2,x", "") f("unpack_json from x", "f2,x", "", "f2,x", "")
f("unpack_json from x if (y:z)", "f2,x", "", "f2,x,y", "") f("unpack_json if (y:z) from x", "f2,x", "", "f2,x,y", "")
f("unpack_json from x if (f2:z y:qwe)", "f2,x", "", "f2,x,y", "") f("unpack_json if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "")
} }

View file

@ -121,7 +121,10 @@ func statsFuncFieldsToString(fields []string) string {
if len(fields) == 0 { if len(fields) == 0 {
return "*" return "*"
} }
return fieldsToString(fields)
}
func fieldsToString(fields []string) string {
a := make([]string, len(fields)) a := make([]string, len(fields))
for i, f := range fields { for i, f := range fields {
a[i] = quoteTokenIfNeeded(f) a[i] = quoteTokenIfNeeded(f)