From f9e23bf8e3c4a3db57a095aabbd80c57de39d652 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Nov 2024 14:22:13 +0100 Subject: [PATCH] lib/logstorage: add `join` pipe for joining multiple query results --- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/LogsQL.md | 56 +++++++- lib/logstorage/column_names.go | 6 +- lib/logstorage/parser_test.go | 3 + lib/logstorage/pipe.go | 7 + lib/logstorage/pipe_join.go | 181 ++++++++++++++++++++++++++ lib/logstorage/pipe_join_test.go | 56 ++++++++ lib/logstorage/storage_search.go | 121 ++++++++++++++++- lib/logstorage/storage_search_test.go | 24 ++++ lib/logstorage/values_encoder.go | 4 +- 10 files changed, 444 insertions(+), 15 deletions(-) create mode 100644 lib/logstorage/pipe_join.go create mode 100644 lib/logstorage/pipe_join_test.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 5081f06b0..d94adf417 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe), which can be used for performing SQL-like joins. * FEATURE: support returning historical logs from [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) via `start_offset` query arg. For example, request to `/select/logsql/tail?query=*&start_offset=5m` returns logs for the last 5 minutes before starting returning live tailing logs for the given `query`. * FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters). * BUGFIX: Properly parse structured metadata when ingesting logs with Loki ingestion protocol. An issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 6ed83594b..8658c6d40 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1304,6 +1304,7 @@ LogsQL supports the following pipes: - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`filter`](#filter-pipe) applies additional [filters](#filters) to results. - [`format`](#format-pipe) formats output field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`join`](#join-pipe) joins query results by the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`len`](#len-pipe) calculates byte length of the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. - [`limit`](#limit-pipe) limits the number selected logs. - [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1759,9 +1760,54 @@ only if `ip` and `host` [fields](https://docs.victoriametrics.com/victorialogs/k _time:5m | format if (ip:* and host:*) "request from :" as message ``` +### join pipe + +The `| join by () ()` pipe joins the current results with the `` results by the given set of comma-separated ``. +This pipe works in the following way: + +1. It executes the `` and remembers its' results. It may contain arbitrary [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/). +1. For each input row it searches for matching rows in the `` results by the given ``. +1. If the `` results have no matching rows, then the input row is sent to the output as is. +1. If the `` results has matching rows, then for each matching row the input row is extended + with new fields seen at the matching row, and the result is sent to the output. + +This logic is similar to `LEFT JOIN` in SQL. For example, the following query returns the number of per-user logs across two applications - `app1` and `app2` ( +see [stream filters](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter) for details on `{...}` filter): + +```logsql +_time:1d {app="app1"} | stats by (user) count() app1_hits + | join by (user) ( + _time:1d {app="app2"} | stats by (user) count() app2_hits + ) +``` + +If you need results similar to `JOIN` in SQL, then apply [`filter` pipe](#filter-pipe) with [`*` filter](https://docs.victoriametrics.com/victorialogs/logsql/#any-value-filter) +on fields, which must be non-empty after the join. For example, the following query returns stats only for users, which exist in both applications `app1` and `app2`: + +```logsql +_time:1d {app="app1"} | stats by (user) count() app1_hits + | join by (user) ( + _time:1d {app="app2"} | stats by (user) count() app2_hits + ) + | filter app2_hits:* +``` + +**Performance tips**: + +- Make sure that the `` in the `join` pipe returns relatively small number of results, since they are kept in RAM during execution of `join` pipe. +- [Conditional `stats`](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters) is usually faster to execute. + They usually require less RAM than the equivalent `join` pipe. + +See also: + +- [`stats` pipe](#stats-pipe) +- [conditional `stats`](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters) +- [`filter` pipe](#filter-pipe) + + ### len pipe -The `| len(field) as result` pipe stores byte length of the given `field` value into the `result` field. +The `| len(field) as result` [pipe](#pipes) stores byte length of the given `field` value into the `result` field. For example, the following query shows top 5 log entries with the maximum byte length of `_msg` field across logs for the last 5 minutes: @@ -2230,6 +2276,7 @@ See also: - [`sort` pipe](#sort-pipe) - [`uniq` pipe](#uniq-pipe) - [`top` pipe](#top-pipe) +- [`join` pipe](#join-pipe) #### Stats by fields @@ -2347,6 +2394,13 @@ _time:5m | stats count() total ``` +If zero input rows match the given `if (...)` filter, then zero result is returned for the given stats function. + +See also: + +- [`join` pipe](#join-pipe) +- [`stats` pipe functions](#stats-pipe-functions) + ### stream_context pipe `| stream_context ...` [pipe](#pipes) allows selecting surrounding logs for the matching logs in [logs stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) diff --git a/lib/logstorage/column_names.go b/lib/logstorage/column_names.go index e11fab68a..ed683b4dd 100644 --- a/lib/logstorage/column_names.go +++ b/lib/logstorage/column_names.go @@ -5,7 +5,6 @@ import ( "io" "strings" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -52,10 +51,7 @@ func getColumnNameIDs(columnNames []string) (map[string]uint64, error) { func marshalColumnNames(dst []byte, columnNames []string) []byte { data := encoding.MarshalVarUint64(nil, uint64(len(columnNames))) - - for _, name := range columnNames { - data = encoding.MarshalBytes(data, bytesutil.ToUnsafeBytes(name)) - } + data = marshalStrings(data, columnNames) dst = encoding.CompressZSTDLevel(dst, data, 1) diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 9c9e18e2b..efc99246d 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -2047,6 +2047,7 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | unpack_logfmt if (q:w p:a) from x fields(a,b) | count() r1`, `p,q`, ``) f(`* | unroll (a, b) | count() r1`, `a,b`, ``) f(`* | unroll if (q:w p:a) (a, b) | count() r1`, `a,b,p,q`, ``) + f(`* | join on (a, b) (xxx) | count() r1`, `a,b`, ``) } func TestQueryClone(t *testing.T) { @@ -2167,6 +2168,7 @@ func TestQueryCanLiveTail(t *testing.T) { f("* | unpack_logfmt", true) f("* | unpack_syslog", true) f("* | unroll by (a)", true) + f("* | join by (a) (b)", true) } func TestQueryDropAllPipes(t *testing.T) { @@ -2355,6 +2357,7 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) { f(`foo | count() | unpack_logfmt`) f(`foo | count() | unpack_syslog`) f(`foo | count() | unroll by (x)`) + f(`foo | count() | join by (x) (y)`) // drop by(...) field f(`* | by (x) count() as rows | math rows * 10, rows / 10 | drop x`) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index d2af609c1..d324f0cd8 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -165,6 +165,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'format' pipe: %w", err) } return pf, nil + case lex.isKeyword("join"): + pj, err := parsePipeJoin(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'join' pipe: %w", err) + } + return pj, nil case lex.isKeyword("len"): pl, err := parsePipeLen(lex) if err != nil { @@ -307,6 +313,7 @@ var pipeNames = func() map[string]struct{} { "fields", "keep", "filter", "where", "format", + "join", "len", "limit", "head", "math", "eval", diff --git a/lib/logstorage/pipe_join.go b/lib/logstorage/pipe_join.go new file mode 100644 index 000000000..b4b73ab15 --- /dev/null +++ b/lib/logstorage/pipe_join.go @@ -0,0 +1,181 @@ +package logstorage + +import ( + "fmt" + "slices" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" +) + +// pipeJoin processes '| join ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe +type pipeJoin struct { + // byFields contains fields to use for join on q results + byFields []string + + // q is a query for obtaining results for joining + q *Query + + // m contains results for joining. They are automatically initialized during query execution + m map[string][][]Field +} + +func (pj *pipeJoin) String() string { + return fmt.Sprintf("join by (%s) (%s)", fieldNamesString(pj.byFields), pj.q.String()) +} + +func (pj *pipeJoin) canLiveTail() bool { + return true +} + +func (pj *pipeJoin) optimize() { + pj.q.Optimize() +} + +func (pj *pipeJoin) hasFilterInWithQuery() bool { + return false +} + +func (pj *pipeJoin) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + return pj, nil +} + +func (pj *pipeJoin) initJoinMap(getJoinMapFunc getJoinMapFunc) (pipe, error) { + m, err := getJoinMapFunc(pj.q, pj.byFields) + if err != nil { + return nil, fmt.Errorf("cannot execute query at pipe [%s]: %w", pj, err) + } + pjNew := *pj + pjNew.m = m + return &pjNew, nil +} + +func (pj *pipeJoin) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + unneededFields.removeFields(pj.byFields) + } else { + neededFields.addFields(pj.byFields) + } +} + +func (pj *pipeJoin) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + return &pipeJoinProcessor{ + pj: pj, + stopCh: stopCh, + ppNext: ppNext, + + shards: make([]pipeJoinProcessorShard, workersCount), + } +} + +type pipeJoinProcessor struct { + pj *pipeJoin + stopCh <-chan struct{} + ppNext pipeProcessor + + shards []pipeJoinProcessorShard +} + +type pipeJoinProcessorShard struct { + pipeJoinProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeJoinProcessorShardNopad{})%128]byte +} + +type pipeJoinProcessorShardNopad struct { + wctx pipeUnpackWriteContext + + byValues []string + tmpBuf []byte +} + +func (pjp *pipeJoinProcessor) writeBlock(workerID uint, br *blockResult) { + if br.rowsLen == 0 { + return + } + + pj := pjp.pj + shard := &pjp.shards[workerID] + shard.wctx.init(workerID, pjp.ppNext, true, true, br) + + shard.byValues = slicesutil.SetLength(shard.byValues, len(pj.byFields)) + byValues := shard.byValues + + cs := br.getColumns() + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { + clear(byValues) + for i := range cs { + name := cs[i].name + if cIdx := slices.Index(pj.byFields, name); cIdx >= 0 { + byValues[cIdx] = cs[i].getValueAtRow(br, rowIdx) + } + } + + shard.tmpBuf = marshalStrings(shard.tmpBuf[:0], byValues) + matchingRows := pj.m[string(shard.tmpBuf)] + + if len(matchingRows) == 0 { + shard.wctx.writeRow(rowIdx, nil) + continue + } + for _, extraFields := range matchingRows { + shard.wctx.writeRow(rowIdx, extraFields) + } + } + + shard.wctx.flush() + shard.wctx.reset() +} + +func (pjp *pipeJoinProcessor) flush() error { + return nil +} + +func parsePipeJoin(lex *lexer) (*pipeJoin, error) { + if !lex.isKeyword("join") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "join") + } + lex.nextToken() + + // parse by (...) + if lex.isKeyword("by", "on") { + lex.nextToken() + } + + byFields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'by(...)' at 'join': %w", err) + } + if len(byFields) == 0 { + return nil, fmt.Errorf("'by(...)' at 'join' must contain at least a single field") + } + if slices.Contains(byFields, "*") { + return nil, fmt.Errorf("join by '*' isn't supported") + } + + // Parse join query + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing '(' in front of join query") + } + lex.nextToken() + + q, err := parseQuery(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse join query: %w", err) + } + + if !lex.isKeyword(")") { + return nil, fmt.Errorf("missing ')' after the join query [%s]", q) + } + lex.nextToken() + + pj := &pipeJoin{ + byFields: byFields, + q: q, + } + + return pj, nil +} diff --git a/lib/logstorage/pipe_join_test.go b/lib/logstorage/pipe_join_test.go new file mode 100644 index 000000000..644a80e46 --- /dev/null +++ b/lib/logstorage/pipe_join_test.go @@ -0,0 +1,56 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeJoinSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`join by (foo) (error)`) + f(`join by (foo, bar) (a:b | fields x, y)`) +} + +func TestParsePipeJoinFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`join`) + f(`join by () (abc)`) + f(`join by (*) (abc)`) + f(`join by (f, *) (abc)`) + f(`join by (x)`) + f(`join by`) + f(`join (`) + f(`join by (foo) bar`) + f(`join by (x) ()`) + f(`join by (x) (`) + f(`join by (x) (abc`) +} + +func TestPipeJoinUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("join on (x, y) (abc)", "*", "", "*", "") + + // all the needed fields, unneeded fields do not intersect with src + f("join on (x, y) (abc)", "*", "f1,f2", "*", "f1,f2") + + // all the needed fields, unneeded fields intersect with src + f("join on (x, y) (abc)", "*", "f2,x", "*", "f2") + + // needed fields do not intersect with src + f("join on (x, y) (abc)", "f1,f2", "", "f1,f2,x,y", "") + + // needed fields intersect with src + f("join on (x, y) (abc)", "f2,x", "", "f2,x,y", "") +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 4144475ee..d6c2b92ea 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -9,7 +9,9 @@ import ( "strings" "sync" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -80,11 +82,6 @@ type WriteBlockFunc func(workerID uint, timestamps []int64, columns []BlockColum // RunQuery runs the given q and calls writeBlock for results. func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error { - qNew, err := s.initFilterInValues(ctx, tenantIDs, q) - if err != nil { - return err - } - writeBlockResult := func(workerID uint, br *blockResult) { if br.rowsLen == 0 { return @@ -109,10 +106,20 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, putBlockRows(brs) } - return s.runQuery(ctx, tenantIDs, qNew, writeBlockResult) + return s.runQuery(ctx, tenantIDs, q, writeBlockResult) } func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error { + qNew, err := s.initFilterInValues(ctx, tenantIDs, q) + if err != nil { + return err + } + qNew, err = s.initJoinMaps(ctx, tenantIDs, qNew) + if err != nil { + return err + } + q = qNew + streamIDs := q.getStreamIDs() sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i].less(&streamIDs[j]) @@ -210,7 +217,70 @@ func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Qu return s.runValuesWithHitsQuery(ctx, tenantIDs, q) } +func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query, byFields []string) (map[string][][]Field, error) { + // TODO: track memory usage + + m := make(map[string][][]Field) + var mLock sync.Mutex + writeBlockResult := func(_ uint, br *blockResult) { + if br.rowsLen == 0 { + return + } + + cs := br.getColumns() + columnNames := make([]string, len(cs)) + for i := range cs { + columnNames[i] = strings.Clone(cs[i].name) + } + + byValues := make([]string, len(byFields)) + var tmpBuf []byte + + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { + fields := make([]Field, 0, len(cs)) + clear(byValues) + for j := range cs { + name := columnNames[j] + v := cs[j].getValueAtRow(br, rowIdx) + if cIdx := slices.Index(byFields, name); cIdx >= 0 { + byValues[cIdx] = v + continue + } + if v == "" { + continue + } + fields = append(fields, Field{ + Name: name, + Value: strings.Clone(v), + }) + } + + tmpBuf = marshalStrings(tmpBuf[:0], byValues) + k := string(tmpBuf) + + mLock.Lock() + m[k] = append(m[k], fields) + mLock.Unlock() + } + } + + if err := s.runQuery(ctx, tenantIDs, q, writeBlockResult); err != nil { + return nil, err + } + + return m, nil +} + +func marshalStrings(dst []byte, a []string) []byte { + for _, v := range a { + dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(v)) + } + return dst +} + func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string) ([]string, error) { + // TODO: track memory usage + pipes := append([]pipe{}, q.pipes...) quotedFieldName := quoteTokenIfNeeded(fieldName) pipeStr := fmt.Sprintf("uniq by (%s)", quotedFieldName) @@ -456,6 +526,45 @@ func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, return qNew, nil } +type getJoinMapFunc func(q *Query, byFields []string) (map[string][][]Field, error) + +func (s *Storage) initJoinMaps(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) { + if !hasJoinPipes(q.pipes) { + return q, nil + } + + getJoinMap := func(q *Query, byFields []string) (map[string][][]Field, error) { + return s.getJoinMap(ctx, tenantIDs, q, byFields) + } + + pipesNew := make([]pipe, len(q.pipes)) + for i := range q.pipes { + p := q.pipes[i] + if pj, ok := p.(*pipeJoin); ok { + pNew, err := pj.initJoinMap(getJoinMap) + if err != nil { + return nil, err + } + p = pNew + } + pipesNew[i] = p + } + qNew := &Query{ + f: q.f, + pipes: pipesNew, + } + return qNew, nil +} + +func hasJoinPipes(pipes []pipe) bool { + for _, p := range pipes { + if _, ok := p.(*pipeJoin); ok { + return true + } + } + return false +} + func (iff *ifFilter) hasFilterInWithQuery() bool { if iff == nil { return false diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 8e5375b9a..06beb6407 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -729,6 +729,30 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) + t.Run("pipe-join-single", func(t *testing.T) { + f(t, `'message 5' | stats by (instance) count() x + | join on (instance) ( + 'block 0' instance:host-1 | stats by (instance) + count() total, + count_uniq(stream-id) streams, + count_uniq(stream-id) x + )`, [][]Field{ + { + {"instance", "host-0:234"}, + {"x", "55"}, + }, + { + {"instance", "host-2:234"}, + {"x", "55"}, + }, + { + {"instance", "host-1:234"}, + {"x", "55"}, + {"total", "77"}, + {"streams", "1"}, + }, + }) + }) // Close the storage and delete its data s.MustClose() diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 9185f8a74..778e4fda8 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -1110,9 +1110,7 @@ func (vd *valuesDict) marshal(dst []byte) []byte { logger.Panicf("BUG: valuesDict may contain max %d items; got %d items", maxDictLen, len(values)) } dst = append(dst, byte(len(values))) - for _, v := range values { - dst = encoding.MarshalBytes(dst, bytesutil.ToUnsafeBytes(v)) - } + dst = marshalStrings(dst, values) return dst }