lib/logstorage: add an ability to add prefix to resulting query field names in join pipe

See https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe
This commit is contained in:
Aliaksandr Valialkin 2024-11-07 13:15:52 +01:00
parent 30dd4cdc0d
commit 5a6531b329
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 90 additions and 16 deletions

View file

@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
* FEATURE: [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe): add an ability to add prefix to all the log field names from the joined query, by using `| join by (<by_fields>) (<query>) prefix "some_prefix"` syntax.
## [v0.41.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.41.0-victorialogs) ## [v0.41.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.41.0-victorialogs)
Released at 2024-11-06 Released at 2024-11-06

View file

@ -1817,6 +1817,16 @@ _time:1d {app="app1"} | stats by (user) count() app1_hits
| filter app2_hits:* | filter app2_hits:*
``` ```
It is possible adding a prefix to all the field names returned by the `<query>` by specifying the needed prefix after the `<query>`.
For example, the following query adds `app2.` prefix to all `<query>` log fields:
```logsql
_time:1d {app="app1"} | stats by (user) count() app1_hits
| join by (user) (
_time:1d {app="app2"} | stats by (user) count() app2_hits
) prefix "app2."
```
**Performance tips**: **Performance tips**:
- Make sure that the `<query>` in the `join` pipe returns relatively small number of results, since they are kept in RAM during execution of `join` pipe. - Make sure that the `<query>` in the `join` pipe returns relatively small number of results, since they are kept in RAM during execution of `join` pipe.

View file

@ -18,12 +18,19 @@ type pipeJoin struct {
// q is a query for obtaining results for joining // q is a query for obtaining results for joining
q *Query q *Query
// prefix is the prefix to add to log fields from q query
prefix string
// m contains results for joining. They are automatically initialized during query execution // m contains results for joining. They are automatically initialized during query execution
m map[string][][]Field m map[string][][]Field
} }
func (pj *pipeJoin) String() string { func (pj *pipeJoin) String() string {
return fmt.Sprintf("join by (%s) (%s)", fieldNamesString(pj.byFields), pj.q.String()) s := fmt.Sprintf("join by (%s) (%s)", fieldNamesString(pj.byFields), pj.q.String())
if pj.prefix != "" {
s += " prefix " + quoteTokenIfNeeded(pj.prefix)
}
return s
} }
func (pj *pipeJoin) canLiveTail() bool { func (pj *pipeJoin) canLiveTail() bool {
@ -43,7 +50,7 @@ func (pj *pipeJoin) initFilterInValues(_ map[string][]string, _ getFieldValuesFu
} }
func (pj *pipeJoin) initJoinMap(getJoinMapFunc getJoinMapFunc) (pipe, error) { func (pj *pipeJoin) initJoinMap(getJoinMapFunc getJoinMapFunc) (pipe, error) {
m, err := getJoinMapFunc(pj.q, pj.byFields) m, err := getJoinMapFunc(pj.q, pj.byFields, pj.prefix)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot execute query at pipe [%s]: %w", pj, err) return nil, fmt.Errorf("cannot execute query at pipe [%s]: %w", pj, err)
} }
@ -89,6 +96,7 @@ type pipeJoinProcessorShardNopad struct {
wctx pipeUnpackWriteContext wctx pipeUnpackWriteContext
byValues []string byValues []string
byValuesIdxs []int
tmpBuf []byte tmpBuf []byte
} }
@ -105,12 +113,19 @@ func (pjp *pipeJoinProcessor) writeBlock(workerID uint, br *blockResult) {
byValues := shard.byValues byValues := shard.byValues
cs := br.getColumns() cs := br.getColumns()
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ { shard.byValuesIdxs = slicesutil.SetLength(shard.byValuesIdxs, len(cs))
clear(byValues) byValuesIdxs := shard.byValuesIdxs
for i := range cs { for i := range cs {
name := cs[i].name name := cs[i].name
if cIdx := slices.Index(pj.byFields, name); cIdx >= 0 { byValuesIdxs[i] = slices.Index(pj.byFields, name)
byValues[cIdx] = cs[i].getValueAtRow(br, rowIdx)
}
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
clear(byValues)
for j := range cs {
if cIdx := byValuesIdxs[j]; cIdx >= 0 {
byValues[cIdx] = cs[j].getValueAtRow(br, rowIdx)
} }
} }
@ -180,5 +195,14 @@ func parsePipeJoin(lex *lexer) (*pipeJoin, error) {
q: q, q: q,
} }
if lex.isKeyword("prefix") {
lex.nextToken()
prefix, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot read prefix for [%s]: %w", pj, err)
}
pj.prefix = prefix
}
return pj, nil return pj, nil
} }

View file

@ -12,6 +12,7 @@ func TestParsePipeJoinSuccess(t *testing.T) {
f(`join by (foo) (error)`) f(`join by (foo) (error)`)
f(`join by (foo, bar) (a:b | fields x, y)`) f(`join by (foo, bar) (a:b | fields x, y)`)
f(`join by (foo) (a:b) prefix c`)
} }
func TestParsePipeJoinFailure(t *testing.T) { func TestParsePipeJoinFailure(t *testing.T) {
@ -31,6 +32,8 @@ func TestParsePipeJoinFailure(t *testing.T) {
f(`join by (x) ()`) f(`join by (x) ()`)
f(`join by (x) (`) f(`join by (x) (`)
f(`join by (x) (abc`) f(`join by (x) (abc`)
f(`join (x) (y) prefix`)
f(`join (x) (y) prefix |`)
} }
func TestPipeJoinUpdateNeededFields(t *testing.T) { func TestPipeJoinUpdateNeededFields(t *testing.T) {

View file

@ -217,9 +217,11 @@ func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Qu
return s.runValuesWithHitsQuery(ctx, tenantIDs, q) return s.runValuesWithHitsQuery(ctx, tenantIDs, q)
} }
func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query, byFields []string) (map[string][][]Field, error) { func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query, byFields []string, prefix string) (map[string][][]Field, error) {
// TODO: track memory usage // TODO: track memory usage
logger.Infof("DEBUG: byFields=%q, prefix=%q", byFields, prefix)
m := make(map[string][][]Field) m := make(map[string][][]Field)
var mLock sync.Mutex var mLock sync.Mutex
writeBlockResult := func(_ uint, br *blockResult) { writeBlockResult := func(_ uint, br *blockResult) {
@ -229,8 +231,15 @@ func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query
cs := br.getColumns() cs := br.getColumns()
columnNames := make([]string, len(cs)) columnNames := make([]string, len(cs))
byValuesIdxs := make([]int, len(cs))
for i := range cs { for i := range cs {
columnNames[i] = strings.Clone(cs[i].name) name := strings.Clone(cs[i].name)
idx := slices.Index(byFields, name)
if prefix != "" && idx < 0 {
name = prefix + name
}
columnNames[i] = name
byValuesIdxs[i] = idx
} }
byValues := make([]string, len(byFields)) byValues := make([]string, len(byFields))
@ -242,16 +251,17 @@ func (s *Storage) getJoinMap(ctx context.Context, tenantIDs []TenantID, q *Query
for j := range cs { for j := range cs {
name := columnNames[j] name := columnNames[j]
v := cs[j].getValueAtRow(br, rowIdx) v := cs[j].getValueAtRow(br, rowIdx)
if cIdx := slices.Index(byFields, name); cIdx >= 0 { if cIdx := byValuesIdxs[j]; cIdx >= 0 {
byValues[cIdx] = v byValues[cIdx] = v
continue continue
} }
if v == "" { if v == "" {
continue continue
} }
value := strings.Clone(v)
fields = append(fields, Field{ fields = append(fields, Field{
Name: name, Name: name,
Value: strings.Clone(v), Value: value,
}) })
} }
@ -526,15 +536,15 @@ func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID,
return qNew, nil return qNew, nil
} }
type getJoinMapFunc func(q *Query, byFields []string) (map[string][][]Field, error) type getJoinMapFunc func(q *Query, byFields []string, prefix string) (map[string][][]Field, error)
func (s *Storage) initJoinMaps(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) { func (s *Storage) initJoinMaps(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) {
if !hasJoinPipes(q.pipes) { if !hasJoinPipes(q.pipes) {
return q, nil return q, nil
} }
getJoinMap := func(q *Query, byFields []string) (map[string][][]Field, error) { getJoinMap := func(q *Query, byFields []string, prefix string) (map[string][][]Field, error) {
return s.getJoinMap(ctx, tenantIDs, q, byFields) return s.getJoinMap(ctx, tenantIDs, q, byFields, prefix)
} }
pipesNew := make([]pipe, len(q.pipes)) pipesNew := make([]pipe, len(q.pipes))

View file

@ -729,7 +729,7 @@ func TestStorageRunQuery(t *testing.T) {
}, },
}) })
}) })
t.Run("pipe-join-single", func(t *testing.T) { t.Run("pipe-join", func(t *testing.T) {
f(t, `'message 5' | stats by (instance) count() x f(t, `'message 5' | stats by (instance) count() x
| join on (instance) ( | join on (instance) (
'block 0' instance:host-1 | stats by (instance) 'block 0' instance:host-1 | stats by (instance)
@ -753,6 +753,31 @@ func TestStorageRunQuery(t *testing.T) {
}, },
}) })
}) })
t.Run("pipe-join-prefix", func(t *testing.T) {
f(t, `'message 5' | stats by (instance) count() x
| join on (instance) (
'block 0' instance:host-1 | stats by (instance)
count() total,
count_uniq(stream-id) streams,
count_uniq(stream-id) x
) prefix "abc."`, [][]Field{
{
{"instance", "host-0:234"},
{"x", "55"},
},
{
{"instance", "host-2:234"},
{"x", "55"},
},
{
{"instance", "host-1:234"},
{"x", "55"},
{"abc.total", "77"},
{"abc.streams", "1"},
{"abc.x", "1"},
},
})
})
// Close the storage and delete its data // Close the storage and delete its data
s.MustClose() s.MustClose()