This commit is contained in:
Aliaksandr Valialkin 2024-05-09 15:18:27 +02:00
parent 9e4abde51d
commit 7b72c6df5b
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 167 additions and 132 deletions

View file

@ -166,24 +166,32 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
}
func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
// Add _time column
br.addTimeColumn()
unneededColumnNames := bs.bsw.so.unneededColumnNames
// Add _stream column
if !br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
br.reset()
return
if !slices.Contains(unneededColumnNames, "_time") {
// Add _time column
br.addTimeColumn()
}
// Add _msg column
v := bs.csh.getConstColumnValue("_msg")
if v != "" {
br.addConstColumn("_msg", v)
} else if ch := bs.csh.getColumnHeader("_msg"); ch != nil {
br.addColumn(bs, ch, bm)
} else {
br.addConstColumn("_msg", "")
if !slices.Contains(unneededColumnNames, "_stream") {
// Add _stream column
if !br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
br.reset()
return
}
}
if !slices.Contains(unneededColumnNames, "_msg") {
// Add _msg column
v := bs.csh.getConstColumnValue("_msg")
if v != "" {
br.addConstColumn("_msg", v)
} else if ch := bs.csh.getColumnHeader("_msg"); ch != nil {
br.addColumn(bs, ch, bm)
} else {
br.addConstColumn("_msg", "")
}
}
// Add other const columns
@ -191,7 +199,9 @@ func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
if isMsgFieldName(cc.Name) {
continue
}
br.addConstColumn(cc.Name, cc.Value)
if !slices.Contains(unneededColumnNames, cc.Name) {
br.addConstColumn(cc.Name, cc.Value)
}
}
// Add other non-const columns
@ -201,7 +211,9 @@ func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
if isMsgFieldName(ch.name) {
continue
}
br.addColumn(bs, ch, bm)
if !slices.Contains(unneededColumnNames, ch.name) {
br.addColumn(bs, ch, bm)
}
}
}

View file

@ -206,7 +206,7 @@ func (q *Query) String() string {
return s
}
func (q *Query) getNeededColumns() []string {
func (q *Query) getNeededColumns() ([]string, []string) {
neededFields := newFieldsSet()
neededFields.add("*")
unneededFields := newFieldsSet()
@ -216,7 +216,7 @@ func (q *Query) getNeededColumns() []string {
pipes[i].updateNeededFields(neededFields, unneededFields)
}
return neededFields.getAll()
return neededFields.getAll(), unneededFields.getAll()
}
// ParseQuery parses s.

View file

@ -1256,7 +1256,7 @@ func TestParseQueryFailure(t *testing.T) {
}
func TestQueryGetNeededColumns(t *testing.T) {
f := func(s, neededColumnsExpected string) {
f := func(s, neededColumnsExpected, unneededColumnsExpected string) {
t.Helper()
q, err := ParseQuery(s)
@ -1264,96 +1264,105 @@ func TestQueryGetNeededColumns(t *testing.T) {
t.Fatalf("cannot parse query %s: %s", s, err)
}
columns := q.getNeededColumns()
neededColumns := strings.Join(columns, ",")
needed, unneeded := q.getNeededColumns()
neededColumns := strings.Join(needed, ",")
unneededColumns := strings.Join(unneeded, ",")
if neededColumns != neededColumnsExpected {
t.Fatalf("unexpected neededColumns; got %q; want %q", neededColumns, neededColumnsExpected)
}
if unneededColumns != unneededColumnsExpected {
t.Fatalf("unexpected unneededColumns; got %q; want %q", unneededColumns, unneededColumnsExpected)
}
}
f(`*`, `*`)
f(`foo bar`, `*`)
f(`foo:bar _time:5m baz`, `*`)
f(`*`, `*`, ``)
f(`foo bar`, `*`, ``)
f(`foo:bar _time:5m baz`, `*`, ``)
f(`* | fields *`, `*`)
f(`* | fields * | offset 10`, `*`)
f(`* | fields * | offset 10 | limit 20`, `*`)
f(`* | fields foo`, `foo`)
f(`* | fields foo, bar`, `bar,foo`)
f(`* | fields foo, bar | fields baz, bar`, `bar`)
f(`* | fields foo, bar | fields baz, a`, ``)
f(`* | fields f1, f2 | rm f3, f4`, `f1,f2`)
f(`* | fields f1, f2 | rm f2, f3`, `f1`)
f(`* | fields f1, f2 | rm f1, f2, f3`, ``)
f(`* | fields f1, f2 | cp f1 f2, f3 f4`, `f1`)
f(`* | fields f1, f2 | cp f1 f3, f4 f5`, `f1,f2`)
f(`* | fields f1, f2 | cp f2 f3, f4 f5`, `f1,f2`)
f(`* | fields f1, f2 | cp f2 f3, f4 f1`, `f2`)
f(`* | fields f1, f2 | mv f1 f2, f3 f4`, `f1`)
f(`* | fields f1, f2 | mv f1 f3, f4 f5`, `f1,f2`)
f(`* | fields f1, f2 | mv f2 f3, f4 f5`, `f1,f2`)
f(`* | fields f1, f2 | mv f2 f3, f4 f1`, `f2`)
f(`* | fields f1, f2 | stats count() r1`, ``)
f(`* | fields f1, f2 | stats count_uniq() r1`, `f1,f2`)
f(`* | fields f1, f2 | stats count(f1) r1`, `f1`)
f(`* | fields f1, f2 | stats count(f1,f2,f3) r1`, `f1,f2`)
f(`* | fields f1, f2 | stats by(b1) count() r1`, ``)
f(`* | fields f1, f2 | stats by(b1,f1) count() r1`, `f1`)
f(`* | fields f1, f2 | stats by(b1,f1) count(f1) r1`, `f1`)
f(`* | fields f1, f2 | stats by(b1,f1) count(f1,f2,f3) r1`, `f1,f2`)
f(`* | fields f1, f2 | sort by(f3)`, `f1,f2`)
f(`* | fields f1, f2 | sort by(f1,f3)`, `f1,f2`)
f(`* | fields f1, f2 | sort by(f3) | stats count() r1`, ``)
f(`* | fields f1, f2 | sort by(f1) | stats count() r1`, `f1`)
f(`* | fields f1, f2 | sort by(f1) | stats count(f2,f3) r1`, `f1,f2`)
f(`* | fields f1, f2 | sort by(f3) | fields f2`, `f2`)
f(`* | fields f1, f2 | sort by(f1,f3) | fields f2`, `f1,f2`)
f(`* | fields *`, `*`, ``)
f(`* | fields * | offset 10`, `*`, ``)
f(`* | fields * | offset 10 | limit 20`, `*`, ``)
f(`* | fields foo`, `foo`, ``)
f(`* | fields foo, bar`, `bar,foo`, ``)
f(`* | fields foo, bar | fields baz, bar`, `bar`, ``)
f(`* | fields foo, bar | fields baz, a`, ``, ``)
f(`* | fields f1, f2 | rm f3, f4`, `f1,f2`, ``)
f(`* | fields f1, f2 | rm f2, f3`, `f1`, ``)
f(`* | fields f1, f2 | rm f1, f2, f3`, ``, ``)
f(`* | fields f1, f2 | cp f1 f2, f3 f4`, `f1`, ``)
f(`* | fields f1, f2 | cp f1 f3, f4 f5`, `f1,f2`, ``)
f(`* | fields f1, f2 | cp f2 f3, f4 f5`, `f1,f2`, ``)
f(`* | fields f1, f2 | cp f2 f3, f4 f1`, `f2`, ``)
f(`* | fields f1, f2 | mv f1 f2, f3 f4`, `f1`, ``)
f(`* | fields f1, f2 | mv f1 f3, f4 f5`, `f1,f2`, ``)
f(`* | fields f1, f2 | mv f2 f3, f4 f5`, `f1,f2`, ``)
f(`* | fields f1, f2 | mv f2 f3, f4 f1`, `f2`, ``)
f(`* | fields f1, f2 | stats count() r1`, ``, ``)
f(`* | fields f1, f2 | stats count_uniq() r1`, `f1,f2`, ``)
f(`* | fields f1, f2 | stats count(f1) r1`, `f1`, ``)
f(`* | fields f1, f2 | stats count(f1,f2,f3) r1`, `f1,f2`, ``)
f(`* | fields f1, f2 | stats by(b1) count() r1`, ``, ``)
f(`* | fields f1, f2 | stats by(b1,f1) count() r1`, `f1`, ``)
f(`* | fields f1, f2 | stats by(b1,f1) count(f1) r1`, `f1`, ``)
f(`* | fields f1, f2 | stats by(b1,f1) count(f1,f2,f3) r1`, `f1,f2`, ``)
f(`* | fields f1, f2 | sort by(f3)`, `f1,f2`, ``)
f(`* | fields f1, f2 | sort by(f1,f3)`, `f1,f2`, ``)
f(`* | fields f1, f2 | sort by(f3) | stats count() r1`, ``, ``)
f(`* | fields f1, f2 | sort by(f1) | stats count() r1`, `f1`, ``)
f(`* | fields f1, f2 | sort by(f1) | stats count(f2,f3) r1`, `f1,f2`, ``)
f(`* | fields f1, f2 | sort by(f3) | fields f2`, `f2`, ``)
f(`* | fields f1, f2 | sort by(f1,f3) | fields f2`, `f1,f2`, ``)
f(`* | cp foo bar`, `*`)
f(`* | cp foo bar, baz a`, `*`)
f(`* | cp foo bar, baz a | fields foo,a,b`, `b,baz,foo`)
f(`* | cp foo bar, baz a | fields bar,a,b`, `b,baz,foo`)
f(`* | cp foo bar, baz a | fields baz,a,b`, `b,baz`)
f(`* | cp foo bar | fields bar,a`, `a,foo`)
f(`* | cp foo bar | fields baz,a`, `a,baz`)
f(`* | cp foo bar | fields foo,a`, `a,foo`)
f(`* | cp f1 f2 | rm f1`, `*`)
f(`* | cp f1 f2 | rm f2`, `*`)
f(`* | cp f1 f2 | rm f3`, `*`)
f(`* | cp foo bar`, `*`, `bar`)
f(`* | cp foo bar, baz a`, `*`, `a,bar`)
f(`* | cp foo bar, baz a | fields foo,a,b`, `b,baz,foo`, ``)
f(`* | cp foo bar, baz a | fields bar,a,b`, `b,baz,foo`, ``)
f(`* | cp foo bar, baz a | fields baz,a,b`, `b,baz`, ``)
f(`* | cp foo bar | fields bar,a`, `a,foo`, ``)
f(`* | cp foo bar | fields baz,a`, `a,baz`, ``)
f(`* | cp foo bar | fields foo,a`, `a,foo`, ``)
f(`* | cp f1 f2 | rm f1`, `*`, `f2`)
f(`* | cp f1 f2 | rm f2`, `*`, `f2`)
f(`* | cp f1 f2 | rm f3`, `*`, `f2,f3`)
f(`* | mv foo bar`, `*`)
f(`* | mv foo bar, baz a`, `*`)
f(`* | mv foo bar, baz a | fields foo,a,b`, `b,baz`)
f(`* | mv foo bar, baz a | fields bar,a,b`, `b,baz,foo`)
f(`* | mv foo bar, baz a | fields baz,a,b`, `b,baz`)
f(`* | mv foo bar, baz a | fields baz,foo,b`, `b`)
f(`* | mv foo bar | fields bar,a`, `a,foo`)
f(`* | mv foo bar | fields baz,a`, `a,baz`)
f(`* | mv foo bar | fields foo,a`, `a`)
f(`* | mv f1 f2 | rm f1`, `*`)
f(`* | mv f1 f2 | rm f2`, `*`)
f(`* | mv f1 f2 | rm f3`, `*`)
f(`* | mv foo bar`, `*`, `bar`)
f(`* | mv foo bar, baz a`, `*`, `a,bar`)
f(`* | mv foo bar, baz a | fields foo,a,b`, `b,baz`, ``)
f(`* | mv foo bar, baz a | fields bar,a,b`, `b,baz,foo`, ``)
f(`* | mv foo bar, baz a | fields baz,a,b`, `b,baz`, ``)
f(`* | mv foo bar, baz a | fields baz,foo,b`, `b`, ``)
f(`* | mv foo bar | fields bar,a`, `a,foo`, ``)
f(`* | mv foo bar | fields baz,a`, `a,baz`, ``)
f(`* | mv foo bar | fields foo,a`, `a`, ``)
f(`* | mv f1 f2 | rm f1`, `*`, `f2`)
f(`* | mv f1 f2 | rm f2,f3`, `*`, `f1,f2,f3`)
f(`* | mv f1 f2 | rm f3`, `*`, `f2,f3`)
f(`* | sort by (f1)`, `*`)
f(`* | sort by (f1) | fields f2`, `f1,f2`)
f(`* | sort by (f1) | fields *`, `*`)
f(`* | sort by (f1) | sort by (f2,f3 desc) desc`, `*`)
f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4`, `f1,f2,f3,f4`)
f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4 | rm f1,f2,f5`, `f1,f2,f3,f4`)
f(`* | sort by (f1)`, `*`, ``)
f(`* | sort by (f1) | fields f2`, `f1,f2`, ``)
f(`* | sort by (f1) | fields *`, `*`, ``)
f(`* | sort by (f1) | sort by (f2,f3 desc) desc`, `*`, ``)
f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4`, `f1,f2,f3,f4`, ``)
f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4 | rm f1,f2,f5`, `f1,f2,f3,f4`, ``)
f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2`, `f1,f2,f3,f4`)
f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields f1`, ``)
f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields r1`, `f1,f2`)
f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields r2,r3`, `f1,f3,f4`)
f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2`, `f1,f2,f3,f4`, ``)
f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields f1`, ``, ``)
f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields r1`, `f1,f2`, ``)
f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields r2,r3`, `f1,f3,f4`, ``)
f(`* | rm f1, f2`, `*`)
f(`* | rm f1, f2 | fields f3`, `f3`)
f(`* | rm f1, f2 | fields f1,f3`, `f3`)
f(`* | rm f1, f2 | stats count() f1`, ``)
f(`* | rm f1, f2 | stats count(f3) r1`, `f3`)
f(`* | rm f1, f2 | stats count(f1) r1`, ``)
f(`* | rm f1, f2 | stats count(f1,f3) r1`, `f3`)
f(`* | rm f1, f2 | stats by(f1) count(f2) r1`, ``)
f(`* | rm f1, f2 | stats by(f3) count(f2) r1`, `f3`)
f(`* | rm f1, f2 | stats by(f3) count(f4) r1`, `f3,f4`)
f(`* | rm f1, f2`, `*`, `f1,f2`)
f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)
f(`* | rm f1, f2 | mv f2 f3 | sort by(f4)`, `*`, `f1,f2,f3`)
f(`* | rm f1, f2 | mv f2 f3 | sort by(f1)`, `*`, `f1,f2,f3`)
f(`* | rm f1, f2 | fields f3`, `f3`, ``)
f(`* | rm f1, f2 | fields f1,f3`, `f3`, ``)
f(`* | rm f1, f2 | stats count() f1`, ``, ``)
f(`* | rm f1, f2 | stats count(f3) r1`, `f3`, ``)
f(`* | rm f1, f2 | stats count(f1) r1`, ``, ``)
f(`* | rm f1, f2 | stats count(f1,f3) r1`, `f3`, ``)
f(`* | rm f1, f2 | stats by(f1) count(f2) r1`, ``, ``)
f(`* | rm f1, f2 | stats by(f3) count(f2) r1`, `f3`, ``)
f(`* | rm f1, f2 | stats by(f3) count(f4) r1`, `f3,f4`, ``)
}

View file

@ -32,26 +32,26 @@ func (pc *pipeCopy) String() string {
}
func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) {
m := make(map[string]int)
neededSrcFields := make([]bool, len(pc.srcFields))
for i, dstField := range pc.dstFields {
if neededFields.contains(dstField) && !unneededFields.contains(dstField) {
m[pc.srcFields[i]]++
neededSrcFields[i] = true
}
}
if neededFields.contains("*") {
// update only unneeded fields
unneededFields.addAll(pc.dstFields)
for i, srcField := range pc.srcFields {
if m[srcField] > 0 {
unneededFields.remove(pc.srcFields[i])
if neededSrcFields[i] {
unneededFields.remove(srcField)
}
}
} else {
// update only needed fields and reset unneeded fields
neededFields.removeAll(pc.dstFields)
for i, srcField := range pc.srcFields {
if m[srcField] > 0 {
neededFields.add(pc.srcFields[i])
if neededSrcFields[i] {
neededFields.add(srcField)
}
}
unneededFields.reset()

View file

@ -32,28 +32,30 @@ func (pr *pipeRename) String() string {
}
func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) {
m := make(map[string]int)
neededSrcFields := make([]bool, len(pr.srcFields))
for i, dstField := range pr.dstFields {
if neededFields.contains(dstField) && !unneededFields.contains(dstField) {
m[pr.srcFields[i]]++
neededSrcFields[i] = true
}
}
if neededFields.contains("*") {
// update only unneeded fields
unneededFields.addAll(pr.dstFields)
for i, srcField := range pr.srcFields {
if m[srcField] > 0 {
unneededFields.remove(pr.srcFields[i])
if neededSrcFields[i] {
unneededFields.remove(srcField)
} else {
unneededFields.add(srcField)
}
}
} else {
// update only needed fields and reset unneeded fields
neededFields.removeAll(pr.dstFields)
for i, srcField := range pr.srcFields {
if m[srcField] > 0 {
neededFields.add(pr.srcFields[i])
if neededSrcFields[i] {
neededFields.add(srcField)
} else {
neededFields.remove(pr.srcFields[i])
neededFields.remove(srcField)
}
}
unneededFields.reset()

View file

@ -32,11 +32,11 @@ func TestPipeRenameUpdateNeededFields(t *testing.T) {
f("rename s1 d1, s2 d2", "*", "s1,f1,f2", "*", "d1,d2,f1,f2")
// all the needed fields, unneeded fields intersect with dst
f("rename s1 d1, s2 d2", "*", "d2,f1,f2", "*", "d1,d2,f1,f2")
f("rename s1 d1, s2 d2", "*", "d2,f1,f2", "*", "d1,d2,f1,f2,s2")
// all the needed fields, unneeded fields intersect with src and dst
f("rename s1 d1, s2 d2", "*", "s1,d1,f1,f2", "*", "d1,d2,f1,f2,s1")
f("rename s1 d1, s2 d2", "*", "s1,d2,f1,f2", "*", "d1,d2,f1,f2")
f("rename s1 d1, s2 d2", "*", "s1,d2,f1,f2", "*", "d1,d2,f1,f2,s2")
// needed fields do not intersect with src and dst
f("rename s1 d1, s2 d2", "f1,f2", "", "f1,f2", "")

View file

@ -19,10 +19,15 @@ type genericSearchOptions struct {
// filter is the filter to use for the search
filter filter
// neededColumnNames is names of columns to return in the result
// neededColumnNames contains names of columns to return in the result
neededColumnNames []string
// needAllColumns is set to true when all the columns must be returned in the result
// unneededColumnNames contains names of columns that must not be returned in the result.
//
// This list is consulted when needAllColumns=true.
unneededColumnNames []string
// needAllColumns is set to true when all the columns except for unneededColumnNames must be returned in the result
needAllColumns bool
}
@ -44,21 +49,27 @@ type searchOptions struct {
// filter is the filter to use for the search
filter filter
// neededColumnNames is names of columns to return in the result
// neededColumnNames contains names of columns to return in the result
neededColumnNames []string
// needAllColumns is set to true when all the columns must be returned in the result
// unneededColumnNames contains names of columns that must not be returned in the result.
//
// This list is consulted when needAllColumns=true.
unneededColumnNames []string
// needAllColumns is set to true when all the columns except for unneededColumnNames must be returned in the result
needAllColumns bool
}
// RunQuery runs the given q and calls writeBlock for results.
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) error {
neededColumnNames := q.getNeededColumns()
neededColumnNames, unneededColumnNames := q.getNeededColumns()
so := &genericSearchOptions{
tenantIDs: tenantIDs,
filter: q.f,
neededColumnNames: neededColumnNames,
needAllColumns: slices.Contains(neededColumnNames, "*"),
tenantIDs: tenantIDs,
filter: q.f,
neededColumnNames: neededColumnNames,
unneededColumnNames: unneededColumnNames,
needAllColumns: slices.Contains(neededColumnNames, "*"),
}
workersCount := cgroup.AvailableCPUs()
@ -317,13 +328,14 @@ func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *gene
f = initStreamFilters(tenantIDs, pt.idb, f)
}
soInternal := &searchOptions{
tenantIDs: tenantIDs,
streamIDs: streamIDs,
minTimestamp: ft.minTimestamp,
maxTimestamp: ft.maxTimestamp,
filter: f,
neededColumnNames: so.neededColumnNames,
needAllColumns: so.needAllColumns,
tenantIDs: tenantIDs,
streamIDs: streamIDs,
minTimestamp: ft.minTimestamp,
maxTimestamp: ft.maxTimestamp,
filter: f,
neededColumnNames: so.neededColumnNames,
unneededColumnNames: so.unneededColumnNames,
needAllColumns: so.needAllColumns,
}
return pt.ddb.search(soInternal, workCh, stopCh)
}