From 8bb2ffa9b9c5a2d7239b9652a6d728d448dac4ce Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 22 May 2024 18:14:59 +0200
Subject: [PATCH] wip

---
 docs/VictoriaLogs/LogsQL.md             | 21 ++++--
 lib/logstorage/pipe_unpack_json.go      | 87 ++++++++++++++++++-------
 lib/logstorage/pipe_unpack_json_test.go | 44 +++++++++----
 lib/logstorage/stats_avg.go             |  3 +
 4 files changed, 111 insertions(+), 44 deletions(-)

diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 71de7e9d7..07a8635bf 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1630,8 +1630,8 @@ See also:
 
 ### unpack_json pipe
 
-`| unpack_json from field_name` pipe unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given `field_name` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
-into `k1`, ... `kN` field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
+`| unpack_json from field_name` pipe unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+into `k1`, ... `kN` output field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
 
 Nested JSON is unpacked according to the rules defined [here](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 
@@ -1648,21 +1648,28 @@ The following query is equivalent to the previous one:
 _time:5m | unpack_json
 ```
 
-If you want to make sure that the unpacked JSON fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON,
-by adding `result_prefix "prefix_name"` to `unpack_json`. For example, the following query adds `foo_` prefix for all the unpacked fields
-form [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
+If only some fields must be extracted from JSON, then they can be enumerated inside `fields (...)`. For example, the following query unpacks only `foo` and `bar`
+fields from the JSON value stored in the `my_json` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
 
 ```logsql
-_time:5m | unpack_json result_prefix "foo_"
+_time:5m | unpack_json from my_json fields (foo, bar)
 ```
 
 Performance tip: if you need extracting a single field from long JSON, it is faster to use [`extract` pipe](#extract-pipe). For example, the following query extracts `"ip"` field from JSON
-stored in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
+stored in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) at maximum speed:
 
 ```
 _time:5m | extract '"ip":'
 ```
 
+If you want to make sure that the unpacked JSON fields do not clash with the existing fields, then specify a common prefix for all the fields extracted from JSON,
+by adding `result_prefix "prefix_name"` to `unpack_json`. For example, the following query adds `foo_` prefix for all the unpacked fields
+from `foo`:
+
+```logsql
+_time:5m | unpack_json from foo result_prefix "foo_"
+```
+
 See also:
 
 - [Conditional `unpack_json`](#conditional-unpack_json)
diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go
index 3b58f9a41..84b574693 100644
--- a/lib/logstorage/pipe_unpack_json.go
+++ b/lib/logstorage/pipe_unpack_json.go
@@ -2,6 +2,7 @@ package logstorage
 
 import (
 	"fmt"
+	"slices"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
@@ -13,6 +14,11 @@ type pipeUnpackJSON struct {
 	// fromField is the field to unpack json fields from
 	fromField string
 
+	// fields is an optional list of fields to extract from json.
+	//
+	// if it is empty, then all the fields are extracted.
+	fields []string
+
 	// resultPrefix is prefix to add to unpacked field names
 	resultPrefix string
 
@@ -22,15 +28,18 @@ type pipeUnpackJSON struct {
 
 func (pu *pipeUnpackJSON) String() string {
 	s := "unpack_json"
+	if pu.iff != nil {
+		s += " " + pu.iff.String()
+	}
 	if !isMsgFieldName(pu.fromField) {
 		s += " from " + quoteTokenIfNeeded(pu.fromField)
 	}
+	if len(pu.fields) > 0 {
+		s += " fields (" + fieldsToString(pu.fields) + ")"
+	}
 	if pu.resultPrefix != "" {
 		s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
 	}
-	if pu.iff != nil {
-		s += " " + pu.iff.String()
-	}
 	return s
 }
 
@@ -49,21 +58,35 @@ func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
 }
 
 func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
-	return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff)
-}
-
-func unpackJSON(uctx *fieldsUnpackerContext, s string) {
-	if len(s) == 0 || s[0] != '{' {
-		// This isn't a JSON object
-		return
-	}
-	p := GetJSONParser()
-	if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil {
-		for _, f := range p.Fields {
-			uctx.addField(f.Name, f.Value)
+	unpackJSON := func(uctx *fieldsUnpackerContext, s string) {
+		if len(s) == 0 || s[0] != '{' {
+			// This isn't a JSON object
+			return
 		}
+		p := GetJSONParser()
+		if err := p.ParseLogMessage(bytesutil.ToUnsafeBytes(s)); err == nil {
+			if len(pu.fields) == 0 {
+				for _, f := range p.Fields {
+					uctx.addField(f.Name, f.Value)
+				}
+			} else {
+				for _, fieldName := range pu.fields {
+					found := false
+					for _, f := range p.Fields {
+						if f.Name == fieldName {
+							uctx.addField(f.Name, f.Value)
+							found = true
+						}
+					}
+					if !found {
+						uctx.addField(fieldName, "")
+					}
+				}
+			}
+		}
+		PutJSONParser(p)
 	}
-	PutJSONParser(p)
+	return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.iff)
 }
 
 func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
@@ -72,6 +95,15 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
 	}
 	lex.nextToken()
 
+	var iff *ifFilter
+	if lex.isKeyword("if") {
+		f, err := parseIfFilter(lex)
+		if err != nil {
+			return nil, err
+		}
+		iff = f
+	}
+
 	fromField := "_msg"
 	if lex.isKeyword("from") {
 		lex.nextToken()
@@ -82,6 +114,19 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {
 		fromField = f
 	}
 
+	var fields []string
+	if lex.isKeyword("fields") {
+		lex.nextToken()
+		fs, err := parseFieldNamesInParens(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'fields': %w", err)
+		}
+		fields = fs
+		if slices.Contains(fields, "*") {
+			fields = nil
+		}
+	}
+
 	resultPrefix := ""
 	if lex.isKeyword("result_prefix") {
 		lex.nextToken()
@@ -94,15
+139,9 @@ func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { pu := &pipeUnpackJSON{ fromField: fromField, + fields: fields, resultPrefix: resultPrefix, - } - - if lex.isKeyword("if") { - iff, err := parseIfFilter(lex) - if err != nil { - return nil, err - } - pu.iff = iff + iff: iff, } return pu, nil diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index 4e7fbe2f3..a69c47036 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -15,13 +15,16 @@ func TestParsePipeUnpackJSONSuccess(t *testing.T) { } f(`unpack_json`) + f(`unpack_json fields (a)`) + f(`unpack_json fields (a, b, c)`) f(`unpack_json if (a:x)`) f(`unpack_json from x`) - f(`unpack_json from x if (a:x)`) + f(`unpack_json from x fields (a, b)`) + f(`unpack_json if (a:x) from x fields (a, b)`) f(`unpack_json from x result_prefix abc`) - f(`unpack_json from x result_prefix abc if (a:x)`) + f(`unpack_json if (a:x) from x fields (a, b) result_prefix abc`) f(`unpack_json result_prefix abc`) - f(`unpack_json result_prefix abc if (a:x)`) + f(`unpack_json if (a:x) fields (a, b) result_prefix abc`) } func TestParsePipeUnpackJSONFailure(t *testing.T) { @@ -32,6 +35,8 @@ func TestParsePipeUnpackJSONFailure(t *testing.T) { f(`unpack_json foo`) f(`unpack_json if`) + f(`unpack_json fields`) + f(`unpack_json fields x`) f(`unpack_json if (x:y) foobar`) f(`unpack_json from`) f(`unpack_json from x y`) @@ -50,6 +55,19 @@ func TestPipeUnpackJSON(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + // unpack only the requested fields + f("unpack_json fields (foo, b)", [][]Field{ + { + {"_msg", `{"foo":"bar","z":"q","a":"b"}`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar","z":"q","a":"b"}`}, + {"foo", "bar"}, + {"b", ""}, + }, + }) + // single row, unpack from _msg f("unpack_json", [][]Field{ { @@ -194,7 +212,7 @@ func TestPipeUnpackJSON(t *testing.T) { }) // multiple rows with distinct number of fields with result_prefix and if condition - f("unpack_json from x result_prefix qwe_ if (y:abc)", [][]Field{ + f("unpack_json if (y:abc) from x result_prefix qwe_", [][]Field{ { {"x", `{"foo":"bar","baz":"xyz"}`}, {"y", `abc`}, @@ -447,25 +465,25 @@ func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) { // all the needed fields f("unpack_json from x", "*", "", "*", "") - f("unpack_json from x if (y:z)", "*", "", "*", "") + f("unpack_json if (y:z) from x", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src f("unpack_json from x", "*", "f1,f2", "*", "f1,f2") - f("unpack_json from x if (y:z)", "*", "f1,f2", "*", "f1,f2") - f("unpack_json from x if (f1:z)", "*", "f1,f2", "*", "f2") + f("unpack_json if (y:z) from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_json if (f1:z) from x", "*", "f1,f2", "*", "f2") // all the needed fields, unneeded fields intersect with src f("unpack_json from x", "*", "f2,x", "*", "f2") - f("unpack_json from x if (y:z)", "*", "f2,x", "*", "f2") - f("unpack_json from x if (f2:z)", "*", "f1,f2,x", "*", "f1") + f("unpack_json if (y:z) from x", "*", "f2,x", "*", "f2") + f("unpack_json if (f2:z) from x", "*", "f1,f2,x", "*", "f1") // needed fields do not intersect with src f("unpack_json from x", "f1,f2", "", "f1,f2,x", "") - f("unpack_json from x if (y:z)", "f1,f2", "", "f1,f2,x,y", "") - f("unpack_json from x if (f1:z)", "f1,f2", "", "f1,f2,x", "") + f("unpack_json if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "") + f("unpack_json if (f1:z) from x", "f1,f2", "", "f1,f2,x", "") // 
needed fields intersect with src f("unpack_json from x", "f2,x", "", "f2,x", "") - f("unpack_json from x if (y:z)", "f2,x", "", "f2,x,y", "") - f("unpack_json from x if (f2:z y:qwe)", "f2,x", "", "f2,x,y", "") + f("unpack_json if (y:z) from x", "f2,x", "", "f2,x,y", "") + f("unpack_json if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "") } diff --git a/lib/logstorage/stats_avg.go b/lib/logstorage/stats_avg.go index b300ee7fb..9fcb6b1f5 100644 --- a/lib/logstorage/stats_avg.go +++ b/lib/logstorage/stats_avg.go @@ -121,7 +121,10 @@ func statsFuncFieldsToString(fields []string) string { if len(fields) == 0 { return "*" } + return fieldsToString(fields) +} +func fieldsToString(fields []string) string { a := make([]string, len(fields)) for i, f := range fields { a[i] = quoteTokenIfNeeded(f)
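
For reference, the updated `unpack_json` grammar accepts its options in the order `if (...)`, `from ...`, `fields (...)`, `result_prefix ...`, as exercised by `TestParsePipeUnpackJSONSuccess` above, and fields listed in `fields (...)` that are missing from the JSON are added with empty values. A hedged usage sketch combining the options (the `level:error` filter, the `my_json` source field, the `ip` and `user_id` field names, and the `app_` prefix are illustrative, not taken from the patch):

```logsql
_time:5m | unpack_json if (level:error) from my_json fields (ip, user_id) result_prefix "app_"
```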