From 5a6531b3298a23b6fe49cc22927ef591be2b283a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 7 Nov 2024 13:15:52 +0100 Subject: [PATCH] lib/logstorage: add an ability to add prefix to resulting query field names in `join` pipe See https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe --- docs/VictoriaLogs/CHANGELOG.md | 2 ++ docs/VictoriaLogs/LogsQL.md | 10 +++++++ lib/logstorage/pipe_join.go | 40 +++++++++++++++++++++------ lib/logstorage/pipe_join_test.go | 3 ++ lib/logstorage/storage_search.go | 24 +++++++++++----- lib/logstorage/storage_search_test.go | 27 +++++++++++++++++- 6 files changed, 90 insertions(+), 16 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 3e0cda7ca..5de4b2234 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe): add an ability to add prefix to all the log field names from the joined query, by using `| join by () () prefix "some_prefix"` syntax. + ## [v0.41.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.41.0-victorialogs) Released at 2024-11-06 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 9c1f3023e..166b9d4b9 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1817,6 +1817,16 @@ _time:1d {app="app1"} | stats by (user) count() app1_hits | filter app2_hits:* ``` +It is possible adding a prefix to all the field names returned by the `` by specifying the needed prefix after the ``. +For example, the following query adds `app2.` prefix to all `` log fields: + +```logsql +_time:1d {app="app1"} | stats by (user) count() app1_hits + | join by (user) ( + _time:1d {app="app2"} | stats by (user) count() app2_hits + ) prefix "app2." +``` + **Performance tips**: - Make sure that the `` in the `join` pipe returns relatively small number of results, since they are kept in RAM during execution of `join` pipe. diff --git a/lib/logstorage/pipe_join.go b/lib/logstorage/pipe_join.go index 82a584053..606e64e40 100644 --- a/lib/logstorage/pipe_join.go +++ b/lib/logstorage/pipe_join.go @@ -18,12 +18,19 @@ type pipeJoin struct { // q is a query for obtaining results for joining q *Query + // prefix is the prefix to add to log fields from q query + prefix string + // m contains results for joining. They are automatically initialized during query execution m map[string][][]Field } func (pj *pipeJoin) String() string { - return fmt.Sprintf("join by (%s) (%s)", fieldNamesString(pj.byFields), pj.q.String()) + s := fmt.Sprintf("join by (%s) (%s)", fieldNamesString(pj.byFields), pj.q.String()) + if pj.prefix != "" { + s += " prefix " + quoteTokenIfNeeded(pj.prefix) + } + return s } func (pj *pipeJoin) canLiveTail() bool { @@ -43,7 +50,7 @@ func (pj *pipeJoin) initFilterInValues(_ map[string][]string, _ getFieldValuesFu } func (pj *pipeJoin) initJoinMap(getJoinMapFunc getJoinMapFunc) (pipe, error) { - m, err := getJoinMapFunc(pj.q, pj.byFields) + m, err := getJoinMapFunc(pj.q, pj.byFields, pj.prefix) if err != nil { return nil, fmt.Errorf("cannot execute query at pipe [%s]: %w", pj, err) } @@ -88,8 +95,9 @@ type pipeJoinProcessorShard struct { type pipeJoinProcessorShardNopad struct { wctx pipeUnpackWriteContext - byValues []string - tmpBuf []byte + byValues []string + byValuesIdxs []int + tmpBuf []byte } func (pjp *pipeJoinProcessor) writeBlock(workerID uint, br *blockResult) { @@ -105,12 +113,19 @@ func (pjp *pipeJoinProcessor) writeBlock(workerID uint, br *blockResult) { byValues := shard.byValues cs := br.getColumns() + shard.byValuesIdxs = slicesutil.SetLength(shard.byValuesIdxs, len(cs)) + byValuesIdxs := shard.byValuesIdxs + for i := range cs { + name := cs[i].name + byValuesIdxs[i] = slices.Index(pj.byFields, name) + + } + for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { clear(byValues) - for i := range cs { - name := cs[i].name - if cIdx := slices.Index(pj.byFields, name); cIdx >= 0 { - byValues[cIdx] = cs[i].getValueAtRow(br, rowIdx) + for j := range cs { + if cIdx := byValuesIdxs[j]; cIdx >= 0 { + byValues[cIdx] = cs[j].getValueAtRow(br, rowIdx) } } @@ -180,5 +195,14 @@ func parsePipeJoin(lex *lexer) (*pipeJoin, error) { q: q, } + if lex.isKeyword("prefix") { + lex.nextToken() + prefix, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot read prefix for [%s]: %w", pj, err) + } + pj.prefix = prefix + } + return pj, nil } diff --git a/lib/logstorage/pipe_join_test.go b/lib/logstorage/pipe_join_test.go index 644a80e46..f1fced32a 100644 --- a/lib/logstorage/pipe_join_test.go +++ b/lib/logstorage/pipe_join_test.go @@ -12,6 +12,7 @@ func TestParsePipeJoinSuccess(t *testing.T) { f(`join by (foo) (error)`) f(`join by (foo, bar) (a:b | fields x, y)`) + f(`join by (foo) (a:b) prefix c`) } func TestParsePipeJoinFailure(t *testing.T) { @@ -31,6 +32,8 @@ func TestParsePipeJoinFailure(t *testing.T) { f(`join by (x) ()`) f(`join by (x) (`) f(`join by (x) (abc`) + f(`join (x) (y) prefix`) + f(`join (x) (y) prefix |`) } func TestPipeJoinUpdateNeededFields(t *testing.T) { diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index d6c2b92ea..54fe066e8 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -217,9 +217,11 @@ func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Qu return s.runValuesWithHitsQuery(ctx, tenantIDs, q) } -func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query, byFields []string) (map[string][][]Field, error) { +func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query, byFields []string, prefix string) (map[string][][]Field, error) { // TODO: track memory usage + logger.Infof("DEBUG: byFields=%q, prefix=%q", byFields, prefix) + m := make(map[string][][]Field) var mLock sync.Mutex writeBlockResult := func(_ uint, br *blockResult) { @@ -229,8 +231,15 @@ func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query cs := br.getColumns() columnNames := make([]string, len(cs)) + byValuesIdxs := make([]int, len(cs)) for i := range cs { - columnNames[i] = strings.Clone(cs[i].name) + name := strings.Clone(cs[i].name) + idx := slices.Index(byFields, name) + if prefix != "" && idx < 0 { + name = prefix + name + } + columnNames[i] = name + byValuesIdxs[i] = idx } byValues := make([]string, len(byFields)) @@ -242,16 +251,17 @@ func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query for j := range cs { name := columnNames[j] v := cs[j].getValueAtRow(br, rowIdx) - if cIdx := slices.Index(byFields, name); cIdx >= 0 { + if cIdx := byValuesIdxs[j]; cIdx >= 0 { byValues[cIdx] = v continue } if v == "" { continue } + value := strings.Clone(v) fields = append(fields, Field{ Name: name, - Value: strings.Clone(v), + Value: value, }) } @@ -526,15 +536,15 @@ func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, return qNew, nil } -type getJoinMapFunc func(q *Query, byFields []string) (map[string][][]Field, error) +type getJoinMapFunc func(q *Query, byFields []string, prefix string) (map[string][][]Field, error) func (s *Storage) initJoinMaps(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) { if !hasJoinPipes(q.pipes) { return q, nil } - getJoinMap := func(q *Query, byFields []string) (map[string][][]Field, error) { - return s.getJoinMap(ctx, tenantIDs, q, byFields) + getJoinMap := func(q *Query, byFields []string, prefix string) (map[string][][]Field, error) { + return s.getJoinMap(ctx, tenantIDs, q, byFields, prefix) } pipesNew := make([]pipe, len(q.pipes)) diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 06beb6407..f08468429 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -729,7 +729,7 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) - t.Run("pipe-join-single", func(t *testing.T) { + t.Run("pipe-join", func(t *testing.T) { f(t, `'message 5' | stats by (instance) count() x | join on (instance) ( 'block 0' instance:host-1 | stats by (instance) @@ -753,6 +753,31 @@ func TestStorageRunQuery(t *testing.T) { }, }) }) + t.Run("pipe-join-prefix", func(t *testing.T) { + f(t, `'message 5' | stats by (instance) count() x + | join on (instance) ( + 'block 0' instance:host-1 | stats by (instance) + count() total, + count_uniq(stream-id) streams, + count_uniq(stream-id) x + ) prefix "abc."`, [][]Field{ + { + {"instance", "host-0:234"}, + {"x", "55"}, + }, + { + {"instance", "host-2:234"}, + {"x", "55"}, + }, + { + {"instance", "host-1:234"}, + {"x", "55"}, + {"abc.total", "77"}, + {"abc.streams", "1"}, + {"abc.x", "1"}, + }, + }) + }) // Close the storage and delete its data s.MustClose()