From 90390cdc023cb2aa951eb0a5628d5d06aefc9160 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 5 May 2024 03:06:01 +0200 Subject: [PATCH] wip --- lib/logstorage/block_result.go | 27 ++++++++++++-- lib/logstorage/parser.go | 64 ++++++++++++++++++++++++++++++---- lib/logstorage/pipe_delete.go | 6 +++- lib/logstorage/pipe_rename.go | 7 +++- 4 files changed, 93 insertions(+), 11 deletions(-) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index ae65d2a38..9941d939c 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -973,6 +973,10 @@ func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) { } br.csOffset = csOffset br.cs = cs + + for _, dstColumnName := range dstColumnNames { + br.createMissingColumnByName(dstColumnName) + } } // renameColumns renames columns from srcColumnNames to dstColumnNames. @@ -995,6 +999,10 @@ func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) { } br.csOffset = csOffset br.cs = cs + + for _, dstColumnName := range dstColumnNames { + br.createMissingColumnByName(dstColumnName) + } } // deleteColumns deletes columns with the given columnNames. @@ -1047,9 +1055,7 @@ func (br *blockResult) areSameColumns(columnNames []string) bool { func (br *blockResult) getColumnByName(columnName string) blockResultColumn { cs := br.getColumns() - - // iterate columns in reverse order, so overridden column results are returned instead of original column results. - for i := len(cs) - 1; i >= 0; i-- { + for i := range cs { if cs[i].name == columnName { return cs[i] } @@ -1062,6 +1068,21 @@ func (br *blockResult) getColumnByName(columnName string) blockResultColumn { } } +func (br *blockResult) createMissingColumnByName(columnName string) { + cs := br.getColumns() + for i := range cs { + if cs[i].name == columnName { + return + } + } + + br.cs = append(br.cs, blockResultColumn{ + name: columnName, + isConst: true, + encodedValues: getEmptyStrings(1), + }) +} + func (br *blockResult) getColumns() []blockResultColumn { return br.cs[br.csOffset:] } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 5161ccb1c..ff3b998ca 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -203,11 +203,53 @@ func (q *Query) String() string { func (q *Query) getResultColumnNames() []string { input := []string{"*"} + dropFields := make(map[string]struct{}) pipes := q.pipes for i := len(pipes) - 1; i >= 0; i-- { - fields, m := pipes[i].getNeededFields() - if len(fields) == 0 { + neededFields, m := pipes[i].getNeededFields() + neededFields = normalizeFields(neededFields) + + referredFields := make(map[string]int) + for _, a := range m { + for _, f := range a { + referredFields[f]++ + } + } + + for k := range dropFields { + for _, f := range m[k] { + referredFields[f]-- + } + } + for k, v := range referredFields { + if v == 0 { + dropFields[k] = struct{}{} + } + } + dropFieldsNext := make(map[string]struct{}) + for k := range m { + if referredFields[k] == 0 { + dropFieldsNext[k] = struct{}{} + } + } + for k := range dropFields { + if referredFields[k] == 0 { + dropFieldsNext[k] = struct{}{} + } + } + + if len(dropFields) > 0 { + var neededFieldsDst []string + for _, f := range neededFields { + if _, ok := dropFields[f]; !ok { + neededFieldsDst = append(neededFieldsDst, f) + } + } + neededFields = neededFieldsDst + } + + if len(neededFields) == 0 { input = nil } if len(input) == 0 { @@ -225,21 +267,31 @@ func (q *Query) getResultColumnNames() []string { } } input = normalizeFields(dst) + if len(input) == 0 { + break + } } - // intersect fields with input - if fields[0] != "*" { + // intersect neededFields with input + if neededFields[0] != "*" { + clear(dropFields) + if input[0] == "*" { + input = neededFields + continue + } m := make(map[string]struct{}) for _, f := range input { m[f] = struct{}{} } var dst []string - for _, f := range fields { + for _, f := range neededFields { if _, ok := m[f]; ok { dst = append(dst, f) } } - input = normalizeFields(dst) + input = dst + } else { + dropFields = dropFieldsNext } } diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 0809f41dc..7b32f83d0 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -23,7 +23,11 @@ func (pd *pipeDelete) String() string { } func (pd *pipeDelete) getNeededFields() ([]string, map[string][]string) { - return []string{"*"}, nil + m := make(map[string][]string, len(pd.fields)) + for _, f := range pd.fields { + m[f] = nil + } + return []string{"*"}, m } func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index 1bfaba889..c1a19aa00 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -32,10 +32,15 @@ func (pr *pipeRename) String() string { } func (pr *pipeRename) getNeededFields() ([]string, map[string][]string) { - m := make(map[string][]string, len(pr.srcFields)) + m := make(map[string][]string, len(pr.srcFields)+len(pr.dstFields)) for i, dstField := range pr.dstFields { m[dstField] = append(m[dstField], pr.srcFields[i]) } + for _, srcField := range pr.srcFields { + if _, ok := m[srcField]; !ok { + m[srcField] = nil + } + } return []string{"*"}, m }