diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 21ed4ae46..c850cee73 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -166,24 +166,32 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) { } func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { - // Add _time column - br.addTimeColumn() + unneededColumnNames := bs.bsw.so.unneededColumnNames - // Add _stream column - if !br.addStreamColumn(bs) { - // Skip the current block, since the associated stream tags are missing. - br.reset() - return + if !slices.Contains(unneededColumnNames, "_time") { + // Add _time column + br.addTimeColumn() } - // Add _msg column - v := bs.csh.getConstColumnValue("_msg") - if v != "" { - br.addConstColumn("_msg", v) - } else if ch := bs.csh.getColumnHeader("_msg"); ch != nil { - br.addColumn(bs, ch, bm) - } else { - br.addConstColumn("_msg", "") + if !slices.Contains(unneededColumnNames, "_stream") { + // Add _stream column + if !br.addStreamColumn(bs) { + // Skip the current block, since the associated stream tags are missing. + br.reset() + return + } + } + + if !slices.Contains(unneededColumnNames, "_msg") { + // Add _msg column + v := bs.csh.getConstColumnValue("_msg") + if v != "" { + br.addConstColumn("_msg", v) + } else if ch := bs.csh.getColumnHeader("_msg"); ch != nil { + br.addColumn(bs, ch, bm) + } else { + br.addConstColumn("_msg", "") + } } // Add other const columns @@ -191,7 +199,9 @@ func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { if isMsgFieldName(cc.Name) { continue } - br.addConstColumn(cc.Name, cc.Value) + if !slices.Contains(unneededColumnNames, cc.Name) { + br.addConstColumn(cc.Name, cc.Value) + } } // Add other non-const columns @@ -201,7 +211,9 @@ func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { if isMsgFieldName(ch.name) { continue } - br.addColumn(bs, ch, bm) + if !slices.Contains(unneededColumnNames, ch.name) { + br.addColumn(bs, ch, bm) + } } } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index d842e59f2..67faad890 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -206,7 +206,7 @@ func (q *Query) String() string { return s } -func (q *Query) getNeededColumns() []string { +func (q *Query) getNeededColumns() ([]string, []string) { neededFields := newFieldsSet() neededFields.add("*") unneededFields := newFieldsSet() @@ -216,7 +216,7 @@ func (q *Query) getNeededColumns() []string { pipes[i].updateNeededFields(neededFields, unneededFields) } - return neededFields.getAll() + return neededFields.getAll(), unneededFields.getAll() } // ParseQuery parses s. diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index d0795e670..de68cc1ee 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -1256,7 +1256,7 @@ func TestParseQueryFailure(t *testing.T) { } func TestQueryGetNeededColumns(t *testing.T) { - f := func(s, neededColumnsExpected string) { + f := func(s, neededColumnsExpected, unneededColumnsExpected string) { t.Helper() q, err := ParseQuery(s) @@ -1264,96 +1264,105 @@ func TestQueryGetNeededColumns(t *testing.T) { t.Fatalf("cannot parse query %s: %s", s, err) } - columns := q.getNeededColumns() - neededColumns := strings.Join(columns, ",") + needed, unneeded := q.getNeededColumns() + neededColumns := strings.Join(needed, ",") + unneededColumns := strings.Join(unneeded, ",") + if neededColumns != neededColumnsExpected { t.Fatalf("unexpected neededColumns; got %q; want %q", neededColumns, neededColumnsExpected) } + if unneededColumns != unneededColumnsExpected { + t.Fatalf("unexpected unneededColumns; got %q; want %q", unneededColumns, unneededColumnsExpected) + } } - f(`*`, `*`) - f(`foo bar`, `*`) - f(`foo:bar _time:5m baz`, `*`) + f(`*`, `*`, ``) + f(`foo bar`, `*`, ``) + f(`foo:bar _time:5m baz`, `*`, ``) - f(`* | fields *`, `*`) - f(`* | fields * | offset 10`, `*`) - f(`* | fields * | offset 10 | limit 20`, `*`) - f(`* | fields foo`, `foo`) - f(`* | fields foo, bar`, `bar,foo`) - f(`* | fields foo, bar | fields baz, bar`, `bar`) - f(`* | fields foo, bar | fields baz, a`, ``) - f(`* | fields f1, f2 | rm f3, f4`, `f1,f2`) - f(`* | fields f1, f2 | rm f2, f3`, `f1`) - f(`* | fields f1, f2 | rm f1, f2, f3`, ``) - f(`* | fields f1, f2 | cp f1 f2, f3 f4`, `f1`) - f(`* | fields f1, f2 | cp f1 f3, f4 f5`, `f1,f2`) - f(`* | fields f1, f2 | cp f2 f3, f4 f5`, `f1,f2`) - f(`* | fields f1, f2 | cp f2 f3, f4 f1`, `f2`) - f(`* | fields f1, f2 | mv f1 f2, f3 f4`, `f1`) - f(`* | fields f1, f2 | mv f1 f3, f4 f5`, `f1,f2`) - f(`* | fields f1, f2 | mv f2 f3, f4 f5`, `f1,f2`) - f(`* | fields f1, f2 | mv f2 f3, f4 f1`, `f2`) - f(`* | fields f1, f2 | stats count() r1`, ``) - f(`* | fields f1, f2 | stats count_uniq() r1`, `f1,f2`) - f(`* | fields f1, f2 | stats count(f1) r1`, `f1`) - f(`* | fields f1, f2 | stats count(f1,f2,f3) r1`, `f1,f2`) - f(`* | fields f1, f2 | stats by(b1) count() r1`, ``) - f(`* | fields f1, f2 | stats by(b1,f1) count() r1`, `f1`) - f(`* | fields f1, f2 | stats by(b1,f1) count(f1) r1`, `f1`) - f(`* | fields f1, f2 | stats by(b1,f1) count(f1,f2,f3) r1`, `f1,f2`) - f(`* | fields f1, f2 | sort by(f3)`, `f1,f2`) - f(`* | fields f1, f2 | sort by(f1,f3)`, `f1,f2`) - f(`* | fields f1, f2 | sort by(f3) | stats count() r1`, ``) - f(`* | fields f1, f2 | sort by(f1) | stats count() r1`, `f1`) - f(`* | fields f1, f2 | sort by(f1) | stats count(f2,f3) r1`, `f1,f2`) - f(`* | fields f1, f2 | sort by(f3) | fields f2`, `f2`) - f(`* | fields f1, f2 | sort by(f1,f3) | fields f2`, `f1,f2`) + f(`* | fields *`, `*`, ``) + f(`* | fields * | offset 10`, `*`, ``) + f(`* | fields * | offset 10 | limit 20`, `*`, ``) + f(`* | fields foo`, `foo`, ``) + f(`* | fields foo, bar`, `bar,foo`, ``) + f(`* | fields foo, bar | fields baz, bar`, `bar`, ``) + f(`* | fields foo, bar | fields baz, a`, ``, ``) + f(`* | fields f1, f2 | rm f3, f4`, `f1,f2`, ``) + f(`* | fields f1, f2 | rm f2, f3`, `f1`, ``) + f(`* | fields f1, f2 | rm f1, f2, f3`, ``, ``) + f(`* | fields f1, f2 | cp f1 f2, f3 f4`, `f1`, ``) + f(`* | fields f1, f2 | cp f1 f3, f4 f5`, `f1,f2`, ``) + f(`* | fields f1, f2 | cp f2 f3, f4 f5`, `f1,f2`, ``) + f(`* | fields f1, f2 | cp f2 f3, f4 f1`, `f2`, ``) + f(`* | fields f1, f2 | mv f1 f2, f3 f4`, `f1`, ``) + f(`* | fields f1, f2 | mv f1 f3, f4 f5`, `f1,f2`, ``) + f(`* | fields f1, f2 | mv f2 f3, f4 f5`, `f1,f2`, ``) + f(`* | fields f1, f2 | mv f2 f3, f4 f1`, `f2`, ``) + f(`* | fields f1, f2 | stats count() r1`, ``, ``) + f(`* | fields f1, f2 | stats count_uniq() r1`, `f1,f2`, ``) + f(`* | fields f1, f2 | stats count(f1) r1`, `f1`, ``) + f(`* | fields f1, f2 | stats count(f1,f2,f3) r1`, `f1,f2`, ``) + f(`* | fields f1, f2 | stats by(b1) count() r1`, ``, ``) + f(`* | fields f1, f2 | stats by(b1,f1) count() r1`, `f1`, ``) + f(`* | fields f1, f2 | stats by(b1,f1) count(f1) r1`, `f1`, ``) + f(`* | fields f1, f2 | stats by(b1,f1) count(f1,f2,f3) r1`, `f1,f2`, ``) + f(`* | fields f1, f2 | sort by(f3)`, `f1,f2`, ``) + f(`* | fields f1, f2 | sort by(f1,f3)`, `f1,f2`, ``) + f(`* | fields f1, f2 | sort by(f3) | stats count() r1`, ``, ``) + f(`* | fields f1, f2 | sort by(f1) | stats count() r1`, `f1`, ``) + f(`* | fields f1, f2 | sort by(f1) | stats count(f2,f3) r1`, `f1,f2`, ``) + f(`* | fields f1, f2 | sort by(f3) | fields f2`, `f2`, ``) + f(`* | fields f1, f2 | sort by(f1,f3) | fields f2`, `f1,f2`, ``) - f(`* | cp foo bar`, `*`) - f(`* | cp foo bar, baz a`, `*`) - f(`* | cp foo bar, baz a | fields foo,a,b`, `b,baz,foo`) - f(`* | cp foo bar, baz a | fields bar,a,b`, `b,baz,foo`) - f(`* | cp foo bar, baz a | fields baz,a,b`, `b,baz`) - f(`* | cp foo bar | fields bar,a`, `a,foo`) - f(`* | cp foo bar | fields baz,a`, `a,baz`) - f(`* | cp foo bar | fields foo,a`, `a,foo`) - f(`* | cp f1 f2 | rm f1`, `*`) - f(`* | cp f1 f2 | rm f2`, `*`) - f(`* | cp f1 f2 | rm f3`, `*`) + f(`* | cp foo bar`, `*`, `bar`) + f(`* | cp foo bar, baz a`, `*`, `a,bar`) + f(`* | cp foo bar, baz a | fields foo,a,b`, `b,baz,foo`, ``) + f(`* | cp foo bar, baz a | fields bar,a,b`, `b,baz,foo`, ``) + f(`* | cp foo bar, baz a | fields baz,a,b`, `b,baz`, ``) + f(`* | cp foo bar | fields bar,a`, `a,foo`, ``) + f(`* | cp foo bar | fields baz,a`, `a,baz`, ``) + f(`* | cp foo bar | fields foo,a`, `a,foo`, ``) + f(`* | cp f1 f2 | rm f1`, `*`, `f2`) + f(`* | cp f1 f2 | rm f2`, `*`, `f2`) + f(`* | cp f1 f2 | rm f3`, `*`, `f2,f3`) - f(`* | mv foo bar`, `*`) - f(`* | mv foo bar, baz a`, `*`) - f(`* | mv foo bar, baz a | fields foo,a,b`, `b,baz`) - f(`* | mv foo bar, baz a | fields bar,a,b`, `b,baz,foo`) - f(`* | mv foo bar, baz a | fields baz,a,b`, `b,baz`) - f(`* | mv foo bar, baz a | fields baz,foo,b`, `b`) - f(`* | mv foo bar | fields bar,a`, `a,foo`) - f(`* | mv foo bar | fields baz,a`, `a,baz`) - f(`* | mv foo bar | fields foo,a`, `a`) - f(`* | mv f1 f2 | rm f1`, `*`) - f(`* | mv f1 f2 | rm f2`, `*`) - f(`* | mv f1 f2 | rm f3`, `*`) + f(`* | mv foo bar`, `*`, `bar`) + f(`* | mv foo bar, baz a`, `*`, `a,bar`) + f(`* | mv foo bar, baz a | fields foo,a,b`, `b,baz`, ``) + f(`* | mv foo bar, baz a | fields bar,a,b`, `b,baz,foo`, ``) + f(`* | mv foo bar, baz a | fields baz,a,b`, `b,baz`, ``) + f(`* | mv foo bar, baz a | fields baz,foo,b`, `b`, ``) + f(`* | mv foo bar | fields bar,a`, `a,foo`, ``) + f(`* | mv foo bar | fields baz,a`, `a,baz`, ``) + f(`* | mv foo bar | fields foo,a`, `a`, ``) + f(`* | mv f1 f2 | rm f1`, `*`, `f2`) + f(`* | mv f1 f2 | rm f2,f3`, `*`, `f1,f2,f3`) + f(`* | mv f1 f2 | rm f3`, `*`, `f2,f3`) - f(`* | sort by (f1)`, `*`) - f(`* | sort by (f1) | fields f2`, `f1,f2`) - f(`* | sort by (f1) | fields *`, `*`) - f(`* | sort by (f1) | sort by (f2,f3 desc) desc`, `*`) - f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4`, `f1,f2,f3,f4`) - f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4 | rm f1,f2,f5`, `f1,f2,f3,f4`) + f(`* | sort by (f1)`, `*`, ``) + f(`* | sort by (f1) | fields f2`, `f1,f2`, ``) + f(`* | sort by (f1) | fields *`, `*`, ``) + f(`* | sort by (f1) | sort by (f2,f3 desc) desc`, `*`, ``) + f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4`, `f1,f2,f3,f4`, ``) + f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4 | rm f1,f2,f5`, `f1,f2,f3,f4`, ``) - f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2`, `f1,f2,f3,f4`) - f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields f1`, ``) - f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields r1`, `f1,f2`) - f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields r2,r3`, `f1,f3,f4`) + f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2`, `f1,f2,f3,f4`, ``) + f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields f1`, ``, ``) + f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields r1`, `f1,f2`, ``) + f(`* | stats by(f1) count(f2) r1, count(f3,f4) r2 | fields r2,r3`, `f1,f3,f4`, ``) - f(`* | rm f1, f2`, `*`) - f(`* | rm f1, f2 | fields f3`, `f3`) - f(`* | rm f1, f2 | fields f1,f3`, `f3`) - f(`* | rm f1, f2 | stats count() f1`, ``) - f(`* | rm f1, f2 | stats count(f3) r1`, `f3`) - f(`* | rm f1, f2 | stats count(f1) r1`, ``) - f(`* | rm f1, f2 | stats count(f1,f3) r1`, `f3`) - f(`* | rm f1, f2 | stats by(f1) count(f2) r1`, ``) - f(`* | rm f1, f2 | stats by(f3) count(f2) r1`, `f3`) - f(`* | rm f1, f2 | stats by(f3) count(f4) r1`, `f3,f4`) + f(`* | rm f1, f2`, `*`, `f1,f2`) + f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`) + f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`) + f(`* | rm f1, f2 | mv f2 f3 | sort by(f4)`, `*`, `f1,f2,f3`) + f(`* | rm f1, f2 | mv f2 f3 | sort by(f1)`, `*`, `f1,f2,f3`) + f(`* | rm f1, f2 | fields f3`, `f3`, ``) + f(`* | rm f1, f2 | fields f1,f3`, `f3`, ``) + f(`* | rm f1, f2 | stats count() f1`, ``, ``) + f(`* | rm f1, f2 | stats count(f3) r1`, `f3`, ``) + f(`* | rm f1, f2 | stats count(f1) r1`, ``, ``) + f(`* | rm f1, f2 | stats count(f1,f3) r1`, `f3`, ``) + f(`* | rm f1, f2 | stats by(f1) count(f2) r1`, ``, ``) + f(`* | rm f1, f2 | stats by(f3) count(f2) r1`, `f3`, ``) + f(`* | rm f1, f2 | stats by(f3) count(f4) r1`, `f3,f4`, ``) } diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go index a9730b4c1..cd4cccfe7 100644 --- a/lib/logstorage/pipe_copy.go +++ b/lib/logstorage/pipe_copy.go @@ -32,26 +32,26 @@ func (pc *pipeCopy) String() string { } func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) { - m := make(map[string]int) + neededSrcFields := make([]bool, len(pc.srcFields)) for i, dstField := range pc.dstFields { if neededFields.contains(dstField) && !unneededFields.contains(dstField) { - m[pc.srcFields[i]]++ + neededSrcFields[i] = true } } if neededFields.contains("*") { // update only unneeded fields unneededFields.addAll(pc.dstFields) for i, srcField := range pc.srcFields { - if m[srcField] > 0 { - unneededFields.remove(pc.srcFields[i]) + if neededSrcFields[i] { + unneededFields.remove(srcField) } } } else { // update only needed fields and reset unneeded fields neededFields.removeAll(pc.dstFields) for i, srcField := range pc.srcFields { - if m[srcField] > 0 { - neededFields.add(pc.srcFields[i]) + if neededSrcFields[i] { + neededFields.add(srcField) } } unneededFields.reset() diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index 1716ee28b..0eb48af39 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -32,28 +32,30 @@ func (pr *pipeRename) String() string { } func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) { - m := make(map[string]int) + neededSrcFields := make([]bool, len(pr.srcFields)) for i, dstField := range pr.dstFields { if neededFields.contains(dstField) && !unneededFields.contains(dstField) { - m[pr.srcFields[i]]++ + neededSrcFields[i] = true } } if neededFields.contains("*") { // update only unneeded fields unneededFields.addAll(pr.dstFields) for i, srcField := range pr.srcFields { - if m[srcField] > 0 { - unneededFields.remove(pr.srcFields[i]) + if neededSrcFields[i] { + unneededFields.remove(srcField) + } else { + unneededFields.add(srcField) } } } else { // update only needed fields and reset unneeded fields neededFields.removeAll(pr.dstFields) for i, srcField := range pr.srcFields { - if m[srcField] > 0 { - neededFields.add(pr.srcFields[i]) + if neededSrcFields[i] { + neededFields.add(srcField) } else { - neededFields.remove(pr.srcFields[i]) + neededFields.remove(srcField) } } unneededFields.reset() diff --git a/lib/logstorage/pipe_rename_test.go b/lib/logstorage/pipe_rename_test.go index eda949214..5011e3333 100644 --- a/lib/logstorage/pipe_rename_test.go +++ b/lib/logstorage/pipe_rename_test.go @@ -32,11 +32,11 @@ func TestPipeRenameUpdateNeededFields(t *testing.T) { f("rename s1 d1, s2 d2", "*", "s1,f1,f2", "*", "d1,d2,f1,f2") // all the needed fields, unneeded fields intersect with dst - f("rename s1 d1, s2 d2", "*", "d2,f1,f2", "*", "d1,d2,f1,f2") + f("rename s1 d1, s2 d2", "*", "d2,f1,f2", "*", "d1,d2,f1,f2,s2") // all the needed fields, unneeded fields intersect with src and dst f("rename s1 d1, s2 d2", "*", "s1,d1,f1,f2", "*", "d1,d2,f1,f2,s1") - f("rename s1 d1, s2 d2", "*", "s1,d2,f1,f2", "*", "d1,d2,f1,f2") + f("rename s1 d1, s2 d2", "*", "s1,d2,f1,f2", "*", "d1,d2,f1,f2,s2") // needed fields do not intersect with src and dst f("rename s1 d1, s2 d2", "f1,f2", "", "f1,f2", "") diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 4240915e3..2f4979ae7 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -19,10 +19,15 @@ type genericSearchOptions struct { // filter is the filter to use for the search filter filter - // neededColumnNames is names of columns to return in the result + // neededColumnNames contains names of columns to return in the result neededColumnNames []string - // needAllColumns is set to true when all the columns must be returned in the result + // unneededColumnNames contains names of columns, which mustn't be returned in the result. + // + // This list is consulted if needAllColumns=true + unneededColumnNames []string + + // needAllColumns is set to true when all the columns except of unneededColumnNames must be returned in the result needAllColumns bool } @@ -44,21 +49,27 @@ type searchOptions struct { // filter is the filter to use for the search filter filter - // neededColumnNames is names of columns to return in the result + // neededColumnNames contains names of columns to return in the result neededColumnNames []string - // needAllColumns is set to true when all the columns must be returned in the result + // unneededColumnNames contains names of columns, which mustn't be returned in the result. + // + // This list is consulted when needAllColumns=true. + unneededColumnNames []string + + // needAllColumns is set to true when all the columns except of unneededColumnNames must be returned in the result needAllColumns bool } // RunQuery runs the given q and calls writeBlock for results. func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) error { - neededColumnNames := q.getNeededColumns() + neededColumnNames, unneededColumnNames := q.getNeededColumns() so := &genericSearchOptions{ - tenantIDs: tenantIDs, - filter: q.f, - neededColumnNames: neededColumnNames, - needAllColumns: slices.Contains(neededColumnNames, "*"), + tenantIDs: tenantIDs, + filter: q.f, + neededColumnNames: neededColumnNames, + unneededColumnNames: unneededColumnNames, + needAllColumns: slices.Contains(neededColumnNames, "*"), } workersCount := cgroup.AvailableCPUs() @@ -317,13 +328,14 @@ func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *gene f = initStreamFilters(tenantIDs, pt.idb, f) } soInternal := &searchOptions{ - tenantIDs: tenantIDs, - streamIDs: streamIDs, - minTimestamp: ft.minTimestamp, - maxTimestamp: ft.maxTimestamp, - filter: f, - neededColumnNames: so.neededColumnNames, - needAllColumns: so.needAllColumns, + tenantIDs: tenantIDs, + streamIDs: streamIDs, + minTimestamp: ft.minTimestamp, + maxTimestamp: ft.maxTimestamp, + filter: f, + neededColumnNames: so.neededColumnNames, + unneededColumnNames: so.unneededColumnNames, + needAllColumns: so.needAllColumns, } return pt.ddb.search(soInternal, workCh, stopCh) }