diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 66418ffe4..f708ef8f8 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: allow specifying fields, which must be packed into JSON in [`pack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe) via `pack_json fields (field1, ..., fieldN)` syntax. + ## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs) Released at 2024-05-28 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 72317d1a4..ba80f23c2 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1560,7 +1560,7 @@ See also: ### math pipe `| math ...` [pipe](#pipes) performs mathematical calculations over numeric values stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). -For example, the following query divides `duration_msecs` field value by 1000, then rounds them to integer and stores the result in the `duration_secs` field: +For example, the following query divides `duration_msecs` field value by 1000, then rounds it to integer and stores the result in the `duration_secs` field: ```logsql _time:5m | math round(duration_msecs / 1000) as duration_secs @@ -1639,7 +1639,14 @@ The following query is equivalent to the previous one: _time:5m | pack_json ``` -The `pack_json` doesn't touch other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_json` pipe. For example, the following query +If only a subset of labels must be packed into JSON, then it must be listed inside `fields (...)` after `pack_json`. For example, the following query builds JSON with `foo` and `bar` fields +only and stores the result in `baz` field: + +```logsql +_time:5m | pack_json fields (foo, bar) as baz +``` + +The `pack_json` doesn't modify or delete other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_json` pipe. For example, the following query leaves only the `foo` label with the original log fields packed into JSON: ```logsql diff --git a/lib/logstorage/pipe_pack_json.go b/lib/logstorage/pipe_pack_json.go index d320fb6fb..af44c8750 100644 --- a/lib/logstorage/pipe_pack_json.go +++ b/lib/logstorage/pipe_pack_json.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "slices" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -12,10 +13,15 @@ import ( // See https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe type pipePackJSON struct { resultField string + + fields []string } func (pp *pipePackJSON) String() string { s := "pack_json" + if len(pp.fields) > 0 { + s += " fields (" + fieldsToString(pp.fields) + ")" + } if !isMsgFieldName(pp.resultField) { s += " as " + quoteTokenIfNeeded(pp.resultField) } @@ -25,11 +31,19 @@ func (pp *pipePackJSON) String() string { func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { if neededFields.contains("*") { if !unneededFields.contains(pp.resultField) { - unneededFields.reset() + if len(pp.fields) > 0 { + unneededFields.removeFields(pp.fields) + } else { + unneededFields.reset() + } } } else { if neededFields.contains(pp.resultField) { - neededFields.add("*") + if len(pp.fields) > 0 { + neededFields.addFields(pp.fields) + } else { + neededFields.add("*") + } } } } @@ -74,6 +88,8 @@ type pipePackJSONProcessorShardNopad struct { buf []byte fields []Field + + cs []*blockResultColumn } func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { @@ -85,7 +101,17 @@ func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { shard.rc.name = ppp.pp.resultField - cs := br.getColumns() + cs := shard.cs[:0] + if len(ppp.pp.fields) == 0 { + csAll := br.getColumns() + cs = append(cs, csAll...) + } else { + for _, f := range ppp.pp.fields { + c := br.getColumnByName(f) + cs = append(cs, c) + } + } + shard.cs = cs buf := shard.buf[:0] fields := shard.fields @@ -122,10 +148,25 @@ func parsePackJSON(lex *lexer) (*pipePackJSON, error) { } lex.nextToken() + var fields []string + if lex.isKeyword("fields") { + lex.nextToken() + fs, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse fields: %w", err) + } + if slices.Contains(fs, "*") { + fs = nil + } + fields = fs + } + // parse optional 'as ...` part resultField := "_msg" if lex.isKeyword("as") { lex.nextToken() + } + if !lex.isKeyword("|", ")", "") { field, err := parseFieldName(lex) if err != nil { return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err) @@ -135,6 +176,7 @@ func parsePackJSON(lex *lexer) (*pipePackJSON, error) { pp := &pipePackJSON{ resultField: resultField, + fields: fields, } return pp, nil diff --git a/lib/logstorage/pipe_pack_json_test.go b/lib/logstorage/pipe_pack_json_test.go index baf137447..b57f150c8 100644 --- a/lib/logstorage/pipe_pack_json_test.go +++ b/lib/logstorage/pipe_pack_json_test.go @@ -12,6 +12,8 @@ func TestParsePipePackJSONSuccess(t *testing.T) { f(`pack_json`) f(`pack_json as x`) + f(`pack_json fields (a, b)`) + f(`pack_json fields (a, b) as x`) } func TestParsePipePackJSONFailure(t *testing.T) { @@ -21,6 +23,7 @@ func TestParsePipePackJSONFailure(t *testing.T) { } f(`pack_json foo bar`) + f(`pack_json fields`) } func TestPipePackJSON(t *testing.T) { @@ -76,6 +79,30 @@ func TestPipePackJSON(t *testing.T) { {"c", "d"}, }, }) + + // pack only the needed fields + f(`pack_json fields (foo, baz) a`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `x`}, + {"foo", `abc`}, + {"bar", `cde`}, + {"a", `{"foo":"abc","baz":""}`}, + }, + { + {"a", `{"foo":"","baz":""}`}, + {"c", "d"}, + }, + }) } func TestPipePackJSONUpdateNeededFields(t *testing.T) {