This commit is contained in:
Aliaksandr Valialkin 2024-04-25 02:16:06 +02:00
parent 098067c7cd
commit d304785936
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 95 additions and 30 deletions

View file

@ -117,29 +117,11 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
} }
// fetch the requested columns to bs.br. // fetch the requested columns to bs.br.
for _, columnName := range bs.bsw.so.resultColumnNames { columnNames := bs.bsw.so.resultColumnNames
switch columnName { if len(columnNames) == 1 && columnNames[0] == "*" {
case "_stream": bs.br.fetchAllColumns(bs, bm)
if !bs.br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
bs.br.reset()
return
}
case "_time":
bs.br.addTimeColumn()
default:
v := bs.csh.getConstColumnValue(columnName)
if v != "" {
bs.br.addConstColumn(v)
continue
}
ch := bs.csh.getColumnHeader(columnName)
if ch == nil {
bs.br.addConstColumn("")
} else { } else {
bs.br.addColumn(bs, ch, bm) bs.br.fetchRequestedColumns(bs, bm, columnNames)
}
}
} }
} }
@ -299,21 +281,26 @@ type blockResult struct {
// streamID is streamID for the given blockResult // streamID is streamID for the given blockResult
streamID streamID streamID streamID
// cs contain values for result columns // cs contain values for the requested columns.
//
// The corresponding requested column names are stored at columnsNames.
cs []blockResultColumn cs []blockResultColumn
// timestamps contain timestamps for the selected log entries // timestamps contain timestamps for the selected log entries
timestamps []int64 timestamps []int64
// columnNamesBuf is used only if all the columns must be fetched.
columnNamesBuf []string
// columnNames references the list of names for cs columns.
columnNames []string
} }
func (br *blockResult) reset() { func (br *blockResult) reset() {
br.buf = br.buf[:0] br.buf = br.buf[:0]
vb := br.valuesBuf clear(br.valuesBuf)
for i := range vb { br.valuesBuf = br.valuesBuf[:0]
vb[i] = ""
}
br.valuesBuf = vb[:0]
br.streamID.reset() br.streamID.reset()
@ -324,6 +311,82 @@ func (br *blockResult) reset() {
br.cs = cs[:0] br.cs = cs[:0]
br.timestamps = br.timestamps[:0] br.timestamps = br.timestamps[:0]
clear(br.columnNamesBuf)
br.columnNamesBuf = br.columnNamesBuf[:0]
br.columnNames = nil
}
func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *filterBitmap) {
// Add _stream column
br.columnNamesBuf = append(br.columnNamesBuf, "_stream")
if !br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
br.reset()
return
}
// Add _time column
br.columnNamesBuf = append(br.columnNamesBuf, "_time")
br.addTimeColumn()
// Add _msg column
v := bs.csh.getConstColumnValue("_msg")
if v != "" {
br.columnNamesBuf = append(br.columnNamesBuf, "_msg")
br.addConstColumn(v)
} else if ch := bs.csh.getColumnHeader("_msg"); ch != nil {
br.columnNamesBuf = append(br.columnNamesBuf, "_msg")
br.addColumn(bs, ch, bm)
}
for _, cc := range bs.csh.constColumns {
if isMsgFieldName(cc.Name) {
continue
}
br.columnNamesBuf = append(br.columnNamesBuf, cc.Name)
br.addConstColumn(cc.Value)
}
chs := bs.csh.columnHeaders
for i := range chs {
ch := &chs[i]
if isMsgFieldName(ch.name) {
continue
}
br.columnNamesBuf = append(br.columnNamesBuf, ch.name)
br.addColumn(bs, ch, bm)
}
br.columnNames = br.columnNamesBuf
}
func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *filterBitmap, columnNames []string) {
for _, columnName := range columnNames {
switch columnName {
case "_stream":
if !br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
br.reset()
return
}
case "_time":
br.addTimeColumn()
default:
v := bs.csh.getConstColumnValue(columnName)
if v != "" {
br.addConstColumn(v)
continue
}
ch := bs.csh.getColumnHeader(columnName)
if ch == nil {
br.addConstColumn("")
} else {
br.addColumn(bs, ch, bm)
}
}
}
br.columnNames = columnNames
} }
func (br *blockResult) RowsCount() int { func (br *blockResult) RowsCount() int {

View file

@ -195,6 +195,8 @@ func (q *Query) String() string {
} }
func (q *Query) getResultColumnNames() []string { func (q *Query) getResultColumnNames() []string {
return []string{"*"}
m := make(map[string]struct{}) m := make(map[string]struct{})
q.f.updateReferencedColumnNames(m) q.f.updateReferencedColumnNames(m)

View file

@ -55,7 +55,7 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{
brs := getBlockRows() brs := getBlockRows()
cs := brs.cs cs := brs.cs
for i, columnName := range resultColumnNames { for i, columnName := range br.columnNames {
cs = append(cs, BlockColumn{ cs = append(cs, BlockColumn{
Name: columnName, Name: columnName,
Values: br.getColumnValues(i), Values: br.getColumnValues(i),