diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 4e35b4e22..182d31d9a 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe), which can be used for unrolling JSON arrays stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). * FEATURE: add [`replace_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#replace_regexp-pipe), which allows updating [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. * FEATURE: improve performance for [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) and [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) pipes. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 8a0cf0d52..359c3e4c2 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1088,6 +1088,7 @@ LogsQL supports the following pipes: - [`uniq`](#uniq-pipe) returns unique log entires. - [`unpack_json`](#unpack_json-pipe) unpacks JSON fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). 
### copy pipe @@ -1880,6 +1881,7 @@ See also: - [Conditional `unpack_json`](#conditional-unpack_json) - [`unpack_logfmt` pipe](#unpack_logfmt-pipe) - [`extract` pipe](#extract-pipe) +- [`unroll` pipe](#unroll-pipe) #### Conditional unpack_json @@ -1974,6 +1976,24 @@ only if `ip` field in the current log entry isn't set or empty: _time:5m | unpack_logfmt if (ip:"") from foo ``` +### unroll pipe + +`| unroll by (field1, ..., fieldN)` [pipe](#pipes) can be used for unrolling JSON arrays from `field1`, ..., `fieldN` +[log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into separate rows. + +For example, the following query unrolls `timestamp` and `value` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) from logs for the last 5 minutes: + +```logsql +_time:5m | unroll by (timestamp, value) +``` + +See also: + +- [`unpack_json` pipe](#unpack_json-pipe) +- [`extract` pipe](#extract-pipe) +- [`uniq_values` stats function](#uniq_values-stats) +- [`values` stats function](#values-stats) + ## stats pipe functions LogsQL supports the following functions for [`stats` pipe](#stats-pipe): @@ -2278,6 +2298,8 @@ over logs for the last 5 minutes: _time:5m | stats uniq_values(ip) unique_ips ``` +The returned unique ip addresses can be unrolled into distinct log entries with [`unroll` pipe](#unroll-pipe). + Every unique value is stored in memory during query execution. Big number of unique values may require a lot of memory. Sometimes it is enough to return only a subset of unique values. In this case add `limit N` after `uniq_values(...)` in order to limit the number of returned unique values to `N`, while limiting the maximum memory usage. @@ -2310,6 +2332,8 @@ over logs for the last 5 minutes: _time:5m | stats values(ip) ips ``` +The returned ip addresses can be unrolled into distinct log entries with [`unroll` pipe](#unroll-pipe). 
+ See also: - [`uniq_values`](#uniq_values-stats) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index dbd852ba8..54f77e137 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -194,6 +194,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err) } return pu, nil + case lex.isKeyword("unroll"): + pu, err := parsePipeUnroll(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'unroll' pipe: %w", err) + } + return pu, nil default: return nil, fmt.Errorf("unexpected pipe %q", lex.token) } diff --git a/lib/logstorage/pipe_unroll.go b/lib/logstorage/pipe_unroll.go new file mode 100644 index 000000000..0e803d264 --- /dev/null +++ b/lib/logstorage/pipe_unroll.go @@ -0,0 +1,284 @@ +package logstorage + +import ( + "fmt" + "slices" + "unsafe" + + "github.com/valyala/fastjson" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" +) + +// pipeUnroll processes '| unroll ...' pipe. 
+// +// See https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe +type pipeUnroll struct { + // fields to unroll + fields []string + + // iff is an optional filter for skipping the unroll + iff *ifFilter +} + +func (pu *pipeUnroll) String() string { + s := "unroll" + if pu.iff != nil { + s += " " + pu.iff.String() + } + s += " by (" + fieldNamesString(pu.fields) + ")" + return s +} + +func (pu *pipeUnroll) optimize() { + pu.iff.optimizeFilterIn() +} + +func (pu *pipeUnroll) hasFilterInWithQuery() bool { + return pu.iff.hasFilterInWithQuery() +} + +func (pu *pipeUnroll) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + puNew := *pu + puNew.iff = iffNew + return &puNew, nil +} + +func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + unneededFieldsCount := 0 + for _, f := range pu.fields { + if unneededFields.contains(f) { + unneededFieldsCount++ + } + } + if unneededFieldsCount < len(pu.fields) && pu.iff != nil { + unneededFields.removeFields(pu.iff.neededFields) + } + } else { + needIfFields := false + for _, f := range pu.fields { + if neededFields.contains(f) { + needIfFields = true + } + } + if needIfFields && pu.iff != nil { + neededFields.addFields(pu.iff.neededFields) + } + } +} + +func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + return &pipeUnrollProcessor{ + pu: pu, + ppBase: ppBase, + + shards: make([]pipeUnrollProcessorShard, workersCount), + } +} + +type pipeUnrollProcessor struct { + pu *pipeUnroll + ppBase pipeProcessor + + shards []pipeUnrollProcessorShard +} + +type pipeUnrollProcessorShard struct { + pipeUnrollProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . 
+ _ [128 - unsafe.Sizeof(pipeUnrollProcessorShardNopad{})%128]byte +} + +type pipeUnrollProcessorShardNopad struct { + bm bitmap + + wctx pipeUnpackWriteContext + a arena + + columnValues [][]string + unrolledValues [][]string + valuesBuf []string + fields []Field +} + +func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + pu := pup.pu + shard := &pup.shards[workerID] + shard.wctx.init(workerID, pup.ppBase, false, false, br) + + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + if iff := pu.iff; iff != nil { + iff.f.applyToBlockResult(br, bm) + if bm.isZero() { + pup.ppBase.writeBlock(workerID, br) + return + } + } + + shard.columnValues = slicesutil.SetLength(shard.columnValues, len(pu.fields)) + columnValues := shard.columnValues + for i, f := range pu.fields { + c := br.getColumnByName(f) + columnValues[i] = c.getValues(br) + } + + fields := shard.fields + for rowIdx := range br.timestamps { + if bm.isSetBit(rowIdx) { + shard.writeUnrolledFields(br, pu.fields, columnValues, rowIdx) + } else { + fields = fields[:0] + for i, f := range pu.fields { + v := columnValues[i][rowIdx] + fields = append(fields, Field{ + Name: f, + Value: v, + }) + } + shard.wctx.writeRow(rowIdx, fields) + } + } + + shard.wctx.flush() + shard.wctx.reset() + shard.a.reset() +} + +func (shard *pipeUnrollProcessorShard) writeUnrolledFields(br *blockResult, fieldNames []string, columnValues [][]string, rowIdx int) { + // unroll values at rowIdx row + + shard.unrolledValues = slicesutil.SetLength(shard.unrolledValues, len(columnValues)) + unrolledValues := shard.unrolledValues + + valuesBuf := shard.valuesBuf[:0] + for i, values := range columnValues { + v := values[rowIdx] + valuesBufLen := len(valuesBuf) + valuesBuf = unpackJSONArray(valuesBuf, &shard.a, v) + unrolledValues[i] = valuesBuf[valuesBufLen:] + } + shard.valuesBuf = valuesBuf + + // find the number of rows across unrolled values + rows := 
len(unrolledValues[0]) + for _, values := range unrolledValues[1:] { + if len(values) > rows { + rows = len(values) + } + } + if rows == 0 { + // Unroll to a single row with empty unrolled values. + rows = 1 + } + + // write unrolled values to the next pipe. + fields := shard.fields + for unrollIdx := 0; unrollIdx < rows; unrollIdx++ { + fields = fields[:0] + for i, values := range unrolledValues { + v := "" + if unrollIdx < len(values) { + v = values[unrollIdx] + } + fields = append(fields, Field{ + Name: fieldNames[i], + Value: v, + }) + } + shard.wctx.writeRow(rowIdx, fields) + } +} + +func (pup *pipeUnrollProcessor) flush() error { + return nil +} + +func parsePipeUnroll(lex *lexer) (*pipeUnroll, error) { + if !lex.isKeyword("unroll") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unroll") + } + lex.nextToken() + + // parse optional if (...) + var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + + // parse by (...) 
+ if lex.isKeyword("by") { + lex.nextToken() + } + + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'by(...)' at 'unroll': %w", err) + } + if len(fields) == 0 { + return nil, fmt.Errorf("'by(...)' at 'unroll' must contain at least a single field") + } + if slices.Contains(fields, "*") { + return nil, fmt.Errorf("unroll by '*' isn't supported") + } + + pu := &pipeUnroll{ + fields: fields, + iff: iff, + } + + return pu, nil +} + +func unpackJSONArray(dst []string, a *arena, s string) []string { + if s == "" || s[0] != '[' { + return dst + } + + p := jspp.Get() + defer jspp.Put(p) + + jsv, err := p.Parse(s) + if err != nil { + return dst + } + jsa, err := jsv.Array() + if err != nil { + return dst + } + for _, jsv := range jsa { + if jsv.Type() == fastjson.TypeString { + sb, err := jsv.StringBytes() + if err != nil { + logger.Panicf("BUG: unexpected error returned from StringBytes(): %s", err) + } + v := a.copyBytesToString(sb) + dst = append(dst, v) + } else { + bLen := len(a.b) + a.b = jsv.MarshalTo(a.b) + v := bytesutil.ToUnsafeString(a.b[bLen:]) + dst = append(dst, v) + } + } + return dst +} + +var jspp fastjson.ParserPool diff --git a/lib/logstorage/pipe_unroll_test.go b/lib/logstorage/pipe_unroll_test.go new file mode 100644 index 000000000..f30dff562 --- /dev/null +++ b/lib/logstorage/pipe_unroll_test.go @@ -0,0 +1,261 @@ +package logstorage + +import ( + "reflect" + "testing" +) + +func TestParsePipeUnrollSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`unroll by (foo)`) + f(`unroll if (x:y) by (foo, bar)`) +} + +func TestParsePipeUnrollFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`unroll`) + f(`unroll by ()`) + f(`unroll by (*)`) + f(`unroll by (f, *)`) + f(`unroll by`) + f(`unroll (`) + f(`unroll by (foo) bar`) + f(`unroll by (x) if (a:b)`) +} + +func TestPipeUnroll(t 
*testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // unroll by missing field + f("unroll (x)", [][]Field{ + { + {"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`}, + {"q", "w"}, + }, + }, [][]Field{ + { + {"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`}, + {"q", "w"}, + {"x", ""}, + }, + }) + + // unroll by field without JSON array + f("unroll (q)", [][]Field{ + { + {"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`}, + {"q", "w"}, + }, + }, [][]Field{ + { + {"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`}, + {"q", ""}, + }, + }) + + // unroll by a single field + f("unroll (a)", [][]Field{ + { + {"a", `["foo",1,{"baz":"x"},[1,2],null,NaN]`}, + {"q", "w"}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "foo"}, + {"q", "w"}, + }, + { + {"a", "1"}, + {"q", "w"}, + }, + { + {"a", `{"baz":"x"}`}, + {"q", "w"}, + }, + { + {"a", "[1,2]"}, + {"q", "w"}, + }, + { + {"a", "null"}, + {"q", "w"}, + }, + { + {"a", "NaN"}, + {"q", "w"}, + }, + { + {"a", ""}, + {"c", "d"}, + }, + }) + + // unroll by multiple fields + f("unroll by (timestamp, value)", [][]Field{ + { + {"timestamp", "[1,2,3]"}, + {"value", `["foo","bar","baz"]`}, + {"other", "abc"}, + {"x", "y"}, + }, + { + {"timestamp", "[1]"}, + {"value", `["foo","bar"]`}, + }, + { + {"timestamp", "[1]"}, + {"value", `bar`}, + {"q", "w"}, + }, + }, [][]Field{ + { + {"timestamp", "1"}, + {"value", "foo"}, + {"other", "abc"}, + {"x", "y"}, + }, + { + {"timestamp", "2"}, + {"value", "bar"}, + {"other", "abc"}, + {"x", "y"}, + }, + { + {"timestamp", "3"}, + {"value", "baz"}, + {"other", "abc"}, + {"x", "y"}, + }, + { + {"timestamp", "1"}, + {"value", "foo"}, + }, + { + {"timestamp", ""}, + {"value", "bar"}, + }, + { + {"timestamp", "1"}, + {"value", ""}, + {"q", "w"}, + }, + }) + + // conditional unroll by missing field + f("unroll if (q:abc) (a)", [][]Field{ + { + {"a", `asd`}, + {"q", "w"}, + }, + { + {"a", `["foo",123]`}, + {"q", 
"abc"}, + }, + }, [][]Field{ + { + {"a", `asd`}, + {"q", "w"}, + }, + { + {"a", "foo"}, + {"q", "abc"}, + }, + { + {"a", "123"}, + {"q", "abc"}, + }, + }) + + // unroll by non-existing field + f("unroll (a)", [][]Field{ + { + {"a", `asd`}, + {"q", "w"}, + }, + { + {"a", `["foo",123]`}, + {"q", "abc"}, + }, + }, [][]Field{ + { + {"a", ``}, + {"q", "w"}, + }, + { + {"a", "foo"}, + {"q", "abc"}, + }, + { + {"a", "123"}, + {"q", "abc"}, + }, + }) + +} + +func TestPipeUnrollUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("unroll (x)", "*", "", "*", "") + f("unroll (x, y)", "*", "", "*", "") + f("unroll if (y:z) (a, b)", "*", "", "*", "") + + // all the needed fields, unneeded fields do not intersect with src + f("unroll (x)", "*", "f1,f2", "*", "f1,f2") + f("unroll if (a:b) (x)", "*", "f1,f2", "*", "f1,f2") + f("unroll if (f1:b) (x)", "*", "f1,f2", "*", "f2") + + // all the needed fields, unneeded fields intersect with src + f("unroll (x)", "*", "f2,x", "*", "f2,x") + f("unroll if (a:b) (x)", "*", "f2,x", "*", "f2,x") + f("unroll if (f2:b) (x)", "*", "f2,x", "*", "f2,x") + + // needed fields do not intersect with src + f("unroll (x)", "f1,f2", "", "f1,f2", "") + f("unroll if (a:b) (x)", "f1,f2", "", "f1,f2", "") + + // needed fields intersect with src + f("unroll (x)", "f2,x", "", "f2,x", "") + f("unroll if (a:b) (x)", "f2,x", "", "a,f2,x", "") +} + +func TestUnpackJSONArray(t *testing.T) { + f := func(s string, resultExpected []string) { + t.Helper() + + var a arena + result := unpackJSONArray(nil, &a, s) + if !reflect.DeepEqual(result, resultExpected) { + t.Fatalf("unexpected result for unpackJSONArray(%q)\ngot\n%q\nwant\n%q", s, result, resultExpected) + } + } + + f("", nil) + f("123", nil) + f("foo", nil) + f(`"foo"`, 
nil) + f(`{"foo":"bar"}`, nil) + f(`[foo`, nil) + f(`[]`, nil) + f(`[1]`, []string{"1"}) + f(`[1,"foo",["bar",12],{"baz":"x"},NaN,null]`, []string{"1", "foo", `["bar",12]`, `{"baz":"x"}`, "NaN", "null"}) +}