diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 182d31d9a..4e645be87 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`pack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe), which packs all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a JSON object and stores it into the given field. * FEATURE: add [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe), which can be used for unrolling JSON arrays stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). * FEATURE: add [`replace_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe), which allows updating [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. * FEATURE: improve performance for [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) and [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) pipes. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 359c3e4c2..a34ed69fe 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1080,6 +1080,7 @@ LogsQL supports the following pipes: - [`format`](#format-pipe) formats ouptut field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`limit`](#limit-pipe) limits the number selected logs. - [`offset`](#offset-pipe) skips the given number of selected logs. +- [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object. - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. @@ -1428,6 +1429,37 @@ See also: - [`limit` pipe](#limit-pipe) - [`sort` pipe](#sort-pipe) +### pack_json pipe + +`| pack_json as field_name` [pipe](#pipe) packs all [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object +and stores its as a string in the given `field_name`. + +For example, the following query packs all the fields into JSON object and stores it into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +for logs over the last 5 minutes: + +```logsql +_time:5m | pack_json as _msg +``` + +The `as _msg` part can be omitted if packed JSON object is stored into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). +The following query is equivalent to the previous one: + +```logsql +_time:5m | pack_json +``` + +The `pack_json` doesn't touch other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_json` pipe. For example, the following query +leaves only the `foo` label with the original log fields packed into JSON: + +```logsql +_time:5m | pack_json as foo | fields foo +``` + +See also: + +- [`unpack_json` pipe](#unpack_json-pipe) + + ### rename pipe If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be renamed, then `| rename src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used. @@ -1882,6 +1914,7 @@ See also: - [`unpack_logfmt` pipe](#unpack_logfmt-pipe) - [`extract` pipe](#extract-pipe) - [`unroll` pipe](#unroll-pipe) +- [`pack_json` pipe](#pack_json-pipe) #### Conditional unpack_json @@ -1994,6 +2027,16 @@ See also: - [`uniq_values` stats function](#uniq_values-stats) - [`values` stats function](#values-stats) +#### Conditional unroll + +If the [`unroll` pipe](#unpack_logfmt-pipe) musn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), +then add `if ()` after `unroll`. +The `` can contain arbitrary [filters](#filters). For example, the following query unrolls `value` field only if `value_type` field equals to `json_array`: + +```logsql +_time:5m | unroll if (value_type:="json_array") (value) +``` + ## stats pipe functions LogsQL supports the following functions for [`stats` pipe](#stats-pipe): diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 54f77e137..0806063b9 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -146,6 +146,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err) } return ps, nil + case lex.isKeyword("pack_json"): + pp, err := parsePackJSON(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'pack_json' pipe: %w", err) + } + return pp, nil case lex.isKeyword("rename", "mv"): pr, err := parsePipeRename(lex) if err != nil { diff --git a/lib/logstorage/pipe_pack_json.go b/lib/logstorage/pipe_pack_json.go new file mode 100644 index 000000000..fb6988a1d --- /dev/null +++ b/lib/logstorage/pipe_pack_json.go @@ -0,0 +1,140 @@ +package logstorage + +import ( + "fmt" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +// pipePackJSON processes '| pack_json ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe +type pipePackJSON struct { + resultField string +} + +func (pp *pipePackJSON) String() string { + s := "pack_json" + if !isMsgFieldName(pp.resultField) { + s += " as " + quoteTokenIfNeeded(pp.resultField) + } + return s +} + +func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + if !unneededFields.contains(pp.resultField) { + unneededFields.reset() + } + } else { + if neededFields.contains(pp.resultField) { + neededFields.add("*") + } + } +} + +func (pp *pipePackJSON) optimize() { + // nothing to do +} + +func (pp *pipePackJSON) hasFilterInWithQuery() bool { + return false +} + +func (pp *pipePackJSON) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pp, nil +} + +func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + return &pipePackJSONProcessor{ + pp: pp, + ppBase: ppBase, + + shards: make([]pipePackJSONProcessorShard, workersCount), + } +} + +type pipePackJSONProcessor struct { + pp *pipePackJSON + ppBase pipeProcessor + + shards []pipePackJSONProcessorShard +} + +type pipePackJSONProcessorShard struct { + pipePackJSONProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipePackJSONProcessorShardNopad{})%128]byte +} + +type pipePackJSONProcessorShardNopad struct { + rc resultColumn + + buf []byte + fields []Field +} + +func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &ppp.shards[workerID] + + shard.rc.name = ppp.pp.resultField + + cs := br.getColumns() + + buf := shard.buf[:0] + fields := shard.fields + for rowIdx := range br.timestamps { + fields = fields[:0] + for _, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: c.name, + Value: v, + }) + } + + bufLen := len(buf) + buf = marshalFieldsToJSON(buf, fields) + v := bytesutil.ToUnsafeString(buf[bufLen:]) + shard.rc.addValue(v) + } + + br.addResultColumn(&shard.rc) + ppp.ppBase.writeBlock(workerID, br) + + shard.rc.reset() +} + +func (ppp *pipePackJSONProcessor) flush() error { + return nil +} + +func parsePackJSON(lex *lexer) (*pipePackJSON, error) { + if !lex.isKeyword("pack_json") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "pack_json") + } + lex.nextToken() + + // parse optional 'as ...` part + resultField := "_msg" + if lex.isKeyword("as") { + lex.nextToken() + field, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err) + } + resultField = field + } + + pp := &pipePackJSON{ + resultField: resultField, + } + + return pp, nil +} diff --git a/lib/logstorage/pipe_pack_json_test.go b/lib/logstorage/pipe_pack_json_test.go new file mode 100644 index 000000000..baf137447 --- /dev/null +++ b/lib/logstorage/pipe_pack_json_test.go @@ -0,0 +1,101 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipePackJSONSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`pack_json`) + f(`pack_json as x`) +} + +func TestParsePipePackJSONFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`pack_json foo bar`) +} + +func TestPipePackJSON(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // pack into _msg + f(`pack_json`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `{"_msg":"x","foo":"abc","bar":"cde"}`}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"_msg", `{"a":"b","c":"d"}`}, + {"a", "b"}, + {"c", "d"}, + }, + }) + + // pack into other field + f(`pack_json as a`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `x`}, + {"foo", `abc`}, + {"bar", `cde`}, + {"a", `{"_msg":"x","foo":"abc","bar":"cde"}`}, + }, + { + {"a", `{"a":"b","c":"d"}`}, + {"c", "d"}, + }, + }) +} + +func TestPipePackJSONUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`pack_json as x`, "*", "", "*", "") + + // unneeded fields do not intersect with output + f(`pack_json as x`, "*", "f1,f2", "*", "") + + // unneeded fields intersect with output + f(`pack_json as f1`, "*", "f1,f2", "*", "f1,f2") + + // needed fields do not intersect with output + f(`pack_json f1`, "x,y", "", "x,y", "") + + // needed fields intersect with output + f(`pack_json as f2`, "f2,y", "", "*", "") +}