From d3047859369a2ac798808eb68f61de4f351ef050 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 25 Apr 2024 02:16:06 +0200 Subject: [PATCH] wip --- lib/logstorage/block_search.go | 121 +++++++++++++++++++++++-------- lib/logstorage/parser.go | 2 + lib/logstorage/storage_search.go | 2 +- 3 files changed, 95 insertions(+), 30 deletions(-) diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index b0f7a324d..d0de2acb8 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -117,29 +117,11 @@ func (bs *blockSearch) search(bsw *blockSearchWork) { } // fetch the requested columns to bs.br. - for _, columnName := range bs.bsw.so.resultColumnNames { - switch columnName { - case "_stream": - if !bs.br.addStreamColumn(bs) { - // Skip the current block, since the associated stream tags are missing. - bs.br.reset() - return - } - case "_time": - bs.br.addTimeColumn() - default: - v := bs.csh.getConstColumnValue(columnName) - if v != "" { - bs.br.addConstColumn(v) - continue - } - ch := bs.csh.getColumnHeader(columnName) - if ch == nil { - bs.br.addConstColumn("") - } else { - bs.br.addColumn(bs, ch, bm) - } - } + columnNames := bs.bsw.so.resultColumnNames + if len(columnNames) == 1 && columnNames[0] == "*" { + bs.br.fetchAllColumns(bs, bm) + } else { + bs.br.fetchRequestedColumns(bs, bm, columnNames) } } @@ -299,21 +281,26 @@ type blockResult struct { // streamID is streamID for the given blockResult streamID streamID - // cs contain values for result columns + // cs contain values for the requested columns. + // + // The corresponding requested column names are stored at columnsNames. cs []blockResultColumn // timestamps contain timestamps for the selected log entries timestamps []int64 + + // columnNamesBuf is used only if all the columns must be fetched. + columnNamesBuf []string + + // columnNames references the list of names for cs columns. + columnNames []string } func (br *blockResult) reset() { br.buf = br.buf[:0] - vb := br.valuesBuf - for i := range vb { - vb[i] = "" - } - br.valuesBuf = vb[:0] + clear(br.valuesBuf) + br.valuesBuf = br.valuesBuf[:0] br.streamID.reset() @@ -324,6 +311,82 @@ func (br *blockResult) reset() { br.cs = cs[:0] br.timestamps = br.timestamps[:0] + + clear(br.columnNamesBuf) + br.columnNamesBuf = br.columnNamesBuf[:0] + + br.columnNames = nil +} + +func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *filterBitmap) { + // Add _stream column + br.columnNamesBuf = append(br.columnNamesBuf, "_stream") + if !br.addStreamColumn(bs) { + // Skip the current block, since the associated stream tags are missing. + br.reset() + return + } + + // Add _time column + br.columnNamesBuf = append(br.columnNamesBuf, "_time") + br.addTimeColumn() + + // Add _msg column + v := bs.csh.getConstColumnValue("_msg") + if v != "" { + br.columnNamesBuf = append(br.columnNamesBuf, "_msg") + br.addConstColumn(v) + } else if ch := bs.csh.getColumnHeader("_msg"); ch != nil { + br.columnNamesBuf = append(br.columnNamesBuf, "_msg") + br.addColumn(bs, ch, bm) + } + + for _, cc := range bs.csh.constColumns { + if isMsgFieldName(cc.Name) { + continue + } + br.columnNamesBuf = append(br.columnNamesBuf, cc.Name) + br.addConstColumn(cc.Value) + } + + chs := bs.csh.columnHeaders + for i := range chs { + ch := &chs[i] + if isMsgFieldName(ch.name) { + continue + } + br.columnNamesBuf = append(br.columnNamesBuf, ch.name) + br.addColumn(bs, ch, bm) + } + br.columnNames = br.columnNamesBuf +} + +func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *filterBitmap, columnNames []string) { + for _, columnName := range columnNames { + switch columnName { + case "_stream": + if !br.addStreamColumn(bs) { + // Skip the current block, since the associated stream tags are missing. + br.reset() + return + } + case "_time": + br.addTimeColumn() + default: + v := bs.csh.getConstColumnValue(columnName) + if v != "" { + br.addConstColumn(v) + continue + } + ch := bs.csh.getColumnHeader(columnName) + if ch == nil { + br.addConstColumn("") + } else { + br.addColumn(bs, ch, bm) + } + } + } + br.columnNames = columnNames } func (br *blockResult) RowsCount() int { diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 104491a01..a7155ff83 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -195,6 +195,8 @@ func (q *Query) String() string { } func (q *Query) getResultColumnNames() []string { + return []string{"*"} + m := make(map[string]struct{}) q.f.updateReferencedColumnNames(m) diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 74e56c7fb..6a95db2ee 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -55,7 +55,7 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{ brs := getBlockRows() cs := brs.cs - for i, columnName := range resultColumnNames { + for i, columnName := range br.columnNames { cs = append(cs, BlockColumn{ Name: columnName, Values: br.getColumnValues(i),