diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 07a8635bf..20791f36a 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1679,17 +1679,17 @@ See also: #### Conditional unpack_json If the [`unpack_json` pipe](#unpack_json-pipe) musn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), -then add `if ()` to the end of `unpack_json ...`. -The `` can contain arbitrary [filters](#filters). For example, the following query unpacks JSON fields only if `ip` field in the current log entry isn't set or empty: +then add `if ()` after `unpack_json`. +The `` can contain arbitrary [filters](#filters). For example, the following query unpacks JSON fields from `foo` field only if `ip` field in the current log entry isn't set or empty: ```logsql -_time:5m | unpack_json if (ip:"") +_time:5m | unpack_json if (ip:"") from foo ``` ### unpack_logfmt pipe `| unpack_logfmt from field_name` pipe unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields -from the given `field_name` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names +from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched. For example, the following query unpacks [logfmt](https://brandur.org/logfmt) fields from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) @@ -1706,12 +1706,11 @@ The following query is equivalent to the previous one: _time:5m | unpack_logfmt ``` -If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON, -by adding `result_prefix "prefix_name"` to `unpack_logfmt`. For example, the following query adds `foo_` prefix for all the unpacked fields -from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field): +If only some fields must be unpacked from logfmt, then they can be enumerated inside `fields (...)`. For example, the following query extracts only `foo` and `bar` fields +from logfmt stored in the `my_logfmt` field: ```logsql -_time:5m | unpack_logfmt result_prefix "foo_" +_time:5m | unpack_logfmt from my_logfmt fields (foo, bar) ``` Performance tip: if you need extracting a single field from long [logfmt](https://brandur.org/logfmt) line, it is faster to use [`extract` pipe](#extract-pipe). @@ -1722,6 +1721,14 @@ in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#mes _time:5m | extract ' ip=' ``` +If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON, +by adding `result_prefix "prefix_name"` to `unpack_logfmt`. For example, the following query adds `foo_` prefix for all the unpacked fields +from `foo` field: + +```logsql +_time:5m | unpack_logfmt from foo result_prefix "foo_" +``` + See also: - [Conditional unpack_logfmt](#conditional-unpack_logfmt) @@ -1731,11 +1738,12 @@ See also: #### Conditional unpack_logfmt If the [`unpack_logfmt` pipe](#unpack_logfmt-pipe) musn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), -then add `if ()` to the end of `unpack_logfmt ...`. -The `` can contain arbitrary [filters](#filters). For example, the following query unpacks logfmt fields only if `ip` field in the current log entry isn't set or empty: +then add `if ()` after `unpack_logfmt`. +The `` can contain arbitrary [filters](#filters). For example, the following query unpacks logfmt fields from `foo` field +only if `ip` field in the current log entry isn't set or empty: ```logsql -_time:5m | unpack_logfmt if (ip:"") +_time:5m | unpack_logfmt if (ip:"") from foo ``` ## stats pipe functions diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index 84b574693..dd6cdab3e 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -71,16 +71,12 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, } } else { for _, fieldName := range pu.fields { - found := false for _, f := range p.Fields { if f.Name == fieldName { uctx.addField(f.Name, f.Value) - found = true + break } } - if !found { - uctx.addField(fieldName, "") - } } } } diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index a69c47036..d0c06d01d 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -64,7 +64,6 @@ func TestPipeUnpackJSON(t *testing.T) { { {"_msg", `{"foo":"bar","z":"q","a":"b"}`}, {"foo", "bar"}, - {"b", ""}, }, }) diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index d371b011f..984df2498 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "slices" "strings" ) @@ -12,6 +13,11 @@ type pipeUnpackLogfmt struct { // fromField is the field to unpack logfmt fields from fromField string + // fields is an optional list of fields to extract from logfmt. + // + // if it is empty, then all the fields are extracted. + fields []string + // resultPrefix is prefix to add to unpacked field names resultPrefix string @@ -21,15 +27,18 @@ type pipeUnpackLogfmt struct { func (pu *pipeUnpackLogfmt) String() string { s := "unpack_logfmt" + if pu.iff != nil { + s += " " + pu.iff.String() + } if !isMsgFieldName(pu.fromField) { s += " from " + quoteTokenIfNeeded(pu.fromField) } + if len(pu.fields) > 0 { + s += " fields (" + fieldsToString(pu.fields) + ")" + } if pu.resultPrefix != "" { s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) } - if pu.iff != nil { - s += " " + pu.iff.String() - } return s } @@ -48,46 +57,53 @@ func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fiel } func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff) -} - -func unpackLogfmt(uctx *fieldsUnpackerContext, s string) { - for { - // Search for field name - n := strings.IndexByte(s, '=') - if n < 0 { - // field name couldn't be read - return - } - - name := strings.TrimSpace(s[:n]) - s = s[n+1:] - if len(s) == 0 { - uctx.addField(name, "") - } - - // Search for field value - value, nOffset := tryUnquoteString(s) - if nOffset >= 0 { + addField := func(uctx *fieldsUnpackerContext, name, value string) { + if len(pu.fields) == 0 || slices.Contains(pu.fields, name) { uctx.addField(name, value) - s = s[nOffset:] - if len(s) == 0 { - return - } - if s[0] != ' ' { - return - } - s = s[1:] - } else { - n := strings.IndexByte(s, ' ') - if n < 0 { - uctx.addField(name, s) - return - } - uctx.addField(name, s[:n]) - s = s[n+1:] } } + + unpackLogfmt := func(uctx *fieldsUnpackerContext, s string) { + for { + // Search for field name + n := strings.IndexByte(s, '=') + if n < 0 { + // field name couldn't be read + return + } + + name := strings.TrimSpace(s[:n]) + s = s[n+1:] + if len(s) == 0 { + addField(uctx, name, "") + } + + // Search for field value + value, nOffset := tryUnquoteString(s) + if nOffset >= 0 { + addField(uctx, name, value) + s = s[nOffset:] + if len(s) == 0 { + return + } + if s[0] != ' ' { + return + } + s = s[1:] + } else { + n := strings.IndexByte(s, ' ') + if n < 0 { + addField(uctx, name, s) + return + } + addField(uctx, name, s[:n]) + s = s[n+1:] + } + } + } + + return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff) + } func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { @@ -96,6 +112,15 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { } lex.nextToken() + var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + fromField := "_msg" if lex.isKeyword("from") { lex.nextToken() @@ -106,6 +131,19 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { fromField = f } + var fields []string + if lex.isKeyword("fields") { + lex.nextToken() + fs, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'fields': %w", err) + } + fields = fs + if slices.Contains(fields, "*") { + fields = nil + } + } + resultPrefix := "" if lex.isKeyword("result_prefix") { lex.nextToken() @@ -118,15 +156,9 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { pu := &pipeUnpackLogfmt{ fromField: fromField, + fields: fields, resultPrefix: resultPrefix, - } - - if lex.isKeyword("if") { - iff, err := parseIfFilter(lex) - if err != nil { - return nil, err - } - pu.iff = iff + iff: iff, } return pu, nil diff --git a/lib/logstorage/pipe_unpack_logfmt_test.go b/lib/logstorage/pipe_unpack_logfmt_test.go index d76575e31..ff1711d69 100644 --- a/lib/logstorage/pipe_unpack_logfmt_test.go +++ b/lib/logstorage/pipe_unpack_logfmt_test.go @@ -11,13 +11,19 @@ func TestParsePipeUnpackLogfmtSuccess(t *testing.T) { } f(`unpack_logfmt`) + f(`unpack_logfmt fields (a, b)`) f(`unpack_logfmt if (a:x)`) + f(`unpack_logfmt if (a:x) fields (a, b)`) f(`unpack_logfmt from x`) - f(`unpack_logfmt from x if (a:x)`) + f(`unpack_logfmt from x fields (a, b)`) + f(`unpack_logfmt if (a:x) from x`) + f(`unpack_logfmt if (a:x) from x fields (a, b)`) f(`unpack_logfmt from x result_prefix abc`) - f(`unpack_logfmt from x result_prefix abc if (a:x)`) + f(`unpack_logfmt if (a:x) from x result_prefix abc`) + f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc`) f(`unpack_logfmt result_prefix abc`) - f(`unpack_logfmt result_prefix abc if (a:x)`) + f(`unpack_logfmt if (a:x) result_prefix abc`) + f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc`) } func TestParsePipeUnpackLogfmtFailure(t *testing.T) { @@ -27,6 +33,7 @@ func TestParsePipeUnpackLogfmtFailure(t *testing.T) { } f(`unpack_logfmt foo`) + f(`unpack_logfmt fields`) f(`unpack_logfmt if`) f(`unpack_logfmt if (x:y) foobar`) f(`unpack_logfmt from`) @@ -46,6 +53,19 @@ func TestPipeUnpackLogfmt(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + // unpack a subset of fields + f("unpack_logfmt fields (foo, a, b)", [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar baz="x y=z" a=b`}, + {"foo", "bar"}, + {"a", "b"}, + }, + }) + // single row, unpack from _msg f("unpack_logfmt", [][]Field{ { @@ -184,7 +204,7 @@ func TestPipeUnpackLogfmt(t *testing.T) { }) // multiple rows with distinct number of fields, with result_prefix and if condition - f("unpack_logfmt from x result_prefix qwe_ if (y:abc)", [][]Field{ + f("unpack_logfmt if (y:abc) from x result_prefix qwe_", [][]Field{ { {"x", `foo=bar baz=xyz`}, {"y", `abc`}, @@ -222,25 +242,25 @@ func TestPipeUnpackLogfmtUpdateNeededFields(t *testing.T) { // all the needed fields f("unpack_logfmt from x", "*", "", "*", "") - f("unpack_logfmt from x if (y:z)", "*", "", "*", "") + f("unpack_logfmt if (y:z) from x", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src f("unpack_logfmt from x", "*", "f1,f2", "*", "f1,f2") - f("unpack_logfmt from x if (y:z)", "*", "f1,f2", "*", "f1,f2") - f("unpack_logfmt from x if (f1:z)", "*", "f1,f2", "*", "f2") + f("unpack_logfmt if (y:z) from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_logfmt if (f1:z) from x", "*", "f1,f2", "*", "f2") // all the needed fields, unneeded fields intersect with src f("unpack_logfmt from x", "*", "f2,x", "*", "f2") - f("unpack_logfmt from x if (y:z)", "*", "f2,x", "*", "f2") - f("unpack_logfmt from x if (f2:z)", "*", "f1,f2,x", "*", "f1") + f("unpack_logfmt if (y:z) from x", "*", "f2,x", "*", "f2") + f("unpack_logfmt if (f2:z) from x", "*", "f1,f2,x", "*", "f1") // needed fields do not intersect with src f("unpack_logfmt from x", "f1,f2", "", "f1,f2,x", "") - f("unpack_logfmt from x if (y:z)", "f1,f2", "", "f1,f2,x,y", "") - f("unpack_logfmt from x if (f1:z)", "f1,f2", "", "f1,f2,x", "") + f("unpack_logfmt if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "") + f("unpack_logfmt if (f1:z) from x", "f1,f2", "", "f1,f2,x", "") // needed fields intersect with src f("unpack_logfmt from x", "f2,x", "", "f2,x", "") - f("unpack_logfmt from x if (y:z)", "f2,x", "", "f2,x,y", "") - f("unpack_logfmt from x if (f2:z y:qwe)", "f2,x", "", "f2,x,y", "") + f("unpack_logfmt if (y:z) from x", "f2,x", "", "f2,x,y", "") + f("unpack_logfmt if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "") }