This commit is contained in:
Aliaksandr Valialkin 2024-05-05 03:06:01 +02:00
parent bc7dfd5ba4
commit 90390cdc02
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 93 additions and 11 deletions

View file

@ -973,6 +973,10 @@ func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) {
} }
br.csOffset = csOffset br.csOffset = csOffset
br.cs = cs br.cs = cs
for _, dstColumnName := range dstColumnNames {
br.createMissingColumnByName(dstColumnName)
}
} }
// renameColumns renames columns from srcColumnNames to dstColumnNames. // renameColumns renames columns from srcColumnNames to dstColumnNames.
@ -995,6 +999,10 @@ func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) {
} }
br.csOffset = csOffset br.csOffset = csOffset
br.cs = cs br.cs = cs
for _, dstColumnName := range dstColumnNames {
br.createMissingColumnByName(dstColumnName)
}
} }
// deleteColumns deletes columns with the given columnNames. // deleteColumns deletes columns with the given columnNames.
@ -1047,9 +1055,7 @@ func (br *blockResult) areSameColumns(columnNames []string) bool {
func (br *blockResult) getColumnByName(columnName string) blockResultColumn { func (br *blockResult) getColumnByName(columnName string) blockResultColumn {
cs := br.getColumns() cs := br.getColumns()
for i := range cs {
// iterate columns in reverse order, so overridden column results are returned instead of original column results.
for i := len(cs) - 1; i >= 0; i-- {
if cs[i].name == columnName { if cs[i].name == columnName {
return cs[i] return cs[i]
} }
@ -1062,6 +1068,21 @@ func (br *blockResult) getColumnByName(columnName string) blockResultColumn {
} }
} }
func (br *blockResult) createMissingColumnByName(columnName string) {
cs := br.getColumns()
for i := range cs {
if cs[i].name == columnName {
return
}
}
br.cs = append(br.cs, blockResultColumn{
name: columnName,
isConst: true,
encodedValues: getEmptyStrings(1),
})
}
func (br *blockResult) getColumns() []blockResultColumn { func (br *blockResult) getColumns() []blockResultColumn {
return br.cs[br.csOffset:] return br.cs[br.csOffset:]
} }

View file

@ -203,11 +203,53 @@ func (q *Query) String() string {
func (q *Query) getResultColumnNames() []string { func (q *Query) getResultColumnNames() []string {
input := []string{"*"} input := []string{"*"}
dropFields := make(map[string]struct{})
pipes := q.pipes pipes := q.pipes
for i := len(pipes) - 1; i >= 0; i-- { for i := len(pipes) - 1; i >= 0; i-- {
fields, m := pipes[i].getNeededFields() neededFields, m := pipes[i].getNeededFields()
if len(fields) == 0 { neededFields = normalizeFields(neededFields)
referredFields := make(map[string]int)
for _, a := range m {
for _, f := range a {
referredFields[f]++
}
}
for k := range dropFields {
for _, f := range m[k] {
referredFields[f]--
}
}
for k, v := range referredFields {
if v == 0 {
dropFields[k] = struct{}{}
}
}
dropFieldsNext := make(map[string]struct{})
for k := range m {
if referredFields[k] == 0 {
dropFieldsNext[k] = struct{}{}
}
}
for k := range dropFields {
if referredFields[k] == 0 {
dropFieldsNext[k] = struct{}{}
}
}
if len(dropFields) > 0 {
var neededFieldsDst []string
for _, f := range neededFields {
if _, ok := dropFields[f]; !ok {
neededFieldsDst = append(neededFieldsDst, f)
}
}
neededFields = neededFieldsDst
}
if len(neededFields) == 0 {
input = nil input = nil
} }
if len(input) == 0 { if len(input) == 0 {
@ -225,21 +267,31 @@ func (q *Query) getResultColumnNames() []string {
} }
} }
input = normalizeFields(dst) input = normalizeFields(dst)
if len(input) == 0 {
break
}
} }
// intersect fields with input // intersect neededFields with input
if fields[0] != "*" { if neededFields[0] != "*" {
clear(dropFields)
if input[0] == "*" {
input = neededFields
continue
}
m := make(map[string]struct{}) m := make(map[string]struct{})
for _, f := range input { for _, f := range input {
m[f] = struct{}{} m[f] = struct{}{}
} }
var dst []string var dst []string
for _, f := range fields { for _, f := range neededFields {
if _, ok := m[f]; ok { if _, ok := m[f]; ok {
dst = append(dst, f) dst = append(dst, f)
} }
} }
input = normalizeFields(dst) input = dst
} else {
dropFields = dropFieldsNext
} }
} }

View file

@ -23,7 +23,11 @@ func (pd *pipeDelete) String() string {
} }
func (pd *pipeDelete) getNeededFields() ([]string, map[string][]string) { func (pd *pipeDelete) getNeededFields() ([]string, map[string][]string) {
return []string{"*"}, nil m := make(map[string][]string, len(pd.fields))
for _, f := range pd.fields {
m[f] = nil
}
return []string{"*"}, m
} }
func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {

View file

@ -32,10 +32,15 @@ func (pr *pipeRename) String() string {
} }
func (pr *pipeRename) getNeededFields() ([]string, map[string][]string) { func (pr *pipeRename) getNeededFields() ([]string, map[string][]string) {
m := make(map[string][]string, len(pr.srcFields)) m := make(map[string][]string, len(pr.srcFields)+len(pr.dstFields))
for i, dstField := range pr.dstFields { for i, dstField := range pr.dstFields {
m[dstField] = append(m[dstField], pr.srcFields[i]) m[dstField] = append(m[dstField], pr.srcFields[i])
} }
for _, srcField := range pr.srcFields {
if _, ok := m[srcField]; !ok {
m[srcField] = nil
}
}
return []string{"*"}, m return []string{"*"}, m
} }