diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index bb3af749c..ba0b2c4f6 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -1,7 +1,6 @@ package logstorage import ( - "slices" "strconv" "sync" "time" @@ -118,11 +117,10 @@ func (bs *blockSearch) search(bsw *blockSearchWork) { } // fetch the requested columns to bs.br. - columnNames := bs.bsw.so.resultColumnNames - if slices.Contains(columnNames, "*") { + if bs.bsw.so.needAllColumns { bs.br.fetchAllColumns(bs, bm) } else { - bs.br.fetchRequestedColumns(bs, bm, columnNames) + bs.br.fetchRequestedColumns(bs, bm) } } @@ -362,8 +360,8 @@ func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *filterBitmap) { br.columnNames = br.columnNamesBuf } -func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *filterBitmap, columnNames []string) { - for _, columnName := range columnNames { +func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *filterBitmap) { + for _, columnName := range bs.bsw.so.resultColumnNames { switch columnName { case "_stream": if !br.addStreamColumn(bs) { @@ -387,7 +385,7 @@ func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *filterBitmap, } } } - br.columnNames = columnNames + br.columnNames = bs.bsw.so.resultColumnNames } func (br *blockResult) RowsCount() int { diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 0f2e6b35c..043e205bf 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -353,12 +353,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col } if len(byFields) == 1 { // Special case for grouping by a single column. - idx := getBlockColumnIndex(columns, byFields[0]) - if idx < 0 { - logger.Panicf("BUG: columnIdx must be positive") - } - values := columns[idx].Values - + values := getValuesForBlockColumn(columns, byFields[0], len(timestamps)) if isConstValue(values) { // Fast path for column with constant value. shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) @@ -385,7 +380,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col } // Pre-calculate column values for byFields in order to speed up building group key in the loop below. - shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields) + shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields, len(timestamps)) columnValues := shard.columnValues if areConstValues(columnValues) { @@ -407,9 +402,6 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col // verify whether the key for 'by (...)' fields equals the previous key sameValue := sfps != nil for _, values := range columnValues { - if values == nil { - logger.Panicf("BUG: values cannot be nil here!") - } if i <= 0 || values[i-1] != values[i] { sameValue = false break @@ -441,6 +433,7 @@ func areConstValues(valuess [][]string) bool { func isConstValue(values []string) bool { if len(values) == 0 { + // Return false, since it is impossible to get values[0] value from empty values. return false } vFirst := values[0] @@ -759,7 +752,7 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co // Slow path for multiple columns. // Pre-calculate column values for byFields in order to speed up building group key in the loop below. - sfup.columnValues = appendBlockColumnValues(sfup.columnValues[:0], columns, fields) + sfup.columnValues = appendBlockColumnValues(sfup.columnValues[:0], columns, fields, len(timestamps)) columnValues := sfup.columnValues keyBuf := sfup.keyBuf @@ -767,10 +760,7 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co allEmptyValues := true keyBuf = keyBuf[:0] for _, values := range columnValues { - v := "" - if values != nil { - v = values[i] - } + v := values[i] if v != "" { allEmptyValues = false } @@ -1104,13 +1094,9 @@ func getFieldsIgnoreStar(fields []string) []string { return result } -func appendBlockColumnValues(dst [][]string, columns []BlockColumn, fields []string) [][]string { +func appendBlockColumnValues(dst [][]string, columns []BlockColumn, fields []string, rowsCount int) [][]string { for _, f := range fields { - idx := getBlockColumnIndex(columns, f) - var values []string - if idx >= 0 { - values = columns[idx].Values - } + values := getValuesForBlockColumn(columns, f, rowsCount) dst = append(dst, values) } return dst diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index f96bb64ec..804064ecc 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -3,6 +3,7 @@ package logstorage import ( "context" "math" + "slices" "sort" "sync" "sync/atomic" @@ -18,8 +19,11 @@ type genericSearchOptions struct { // filter is the filter to use for the search filter filter - // resultColumnNames is names of columns to return in the result. + // resultColumnNames is names of columns to return in the result resultColumnNames []string + + // needAllColumns is set to true when all the columns must be returned in the result + needAllColumns bool } type searchOptions struct { @@ -42,6 +46,9 @@ type searchOptions struct { // resultColumnNames is names of columns to return in the result resultColumnNames []string + + // needAllColumns is set to true when all the columns must be returned in the result + needAllColumns bool } // RunQuery runs the given q and calls writeBlock for results. @@ -51,6 +58,7 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, tenantIDs: tenantIDs, filter: q.f, resultColumnNames: resultColumnNames, + needAllColumns: slices.Contains(resultColumnNames, "*"), } workersCount := cgroup.AvailableCPUs() @@ -316,6 +324,7 @@ func (pt *partition) search(tf *timeFilter, sf *StreamFilter, f filter, so *gene maxTimestamp: tf.maxTimestamp, filter: f, resultColumnNames: so.resultColumnNames, + needAllColumns: so.needAllColumns, } return pt.ddb.search(soInternal, workCh, stopCh) }