Aliaksandr Valialkin 2024-04-29 01:32:11 +02:00
parent 53c3384bf7
commit fcbad5ac1b
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 22 additions and 29 deletions

View file

@@ -1,7 +1,6 @@
 package logstorage

 import (
-	"slices"
 	"strconv"
 	"sync"
 	"time"
@@ -118,11 +117,10 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
 	}

 	// fetch the requested columns to bs.br.
-	columnNames := bs.bsw.so.resultColumnNames
-	if slices.Contains(columnNames, "*") {
+	if bs.bsw.so.needAllColumns {
 		bs.br.fetchAllColumns(bs, bm)
 	} else {
-		bs.br.fetchRequestedColumns(bs, bm, columnNames)
+		bs.br.fetchRequestedColumns(bs, bm)
 	}
 }

@@ -362,8 +360,8 @@ func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *filterBitmap) {
 	br.columnNames = br.columnNamesBuf
 }

-func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *filterBitmap, columnNames []string) {
-	for _, columnName := range columnNames {
+func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *filterBitmap) {
+	for _, columnName := range bs.bsw.so.resultColumnNames {
 		switch columnName {
 		case "_stream":
 			if !br.addStreamColumn(bs) {
@@ -387,7 +385,7 @@ func (br *blockResult) fetchRequestedColumns(bs *blockSearch, bm *filterBitmap,
 			}
 		}
 	}
-	br.columnNames = columnNames
+	br.columnNames = bs.bsw.so.resultColumnNames
 }

 func (br *blockResult) RowsCount() int {
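The net effect of the two hunks above: the per-block slices.Contains(columnNames, "*") scan is replaced by the needAllColumns flag, which the third file below computes once per query. A minimal runnable sketch of the hoisting idea; the loop and the block count are illustrative stand-ins, not code from this commit:

package main

import (
	"fmt"
	"slices"
)

func main() {
	resultColumnNames := []string{"_time", "_msg"}
	blocks := 3 // stand-in for the number of searched blocks

	// Before: the column list was scanned for "*" once per searched block.
	for i := 0; i < blocks; i++ {
		fmt.Println(slices.Contains(resultColumnNames, "*"))
	}

	// After: the scan happens once per query; each block tests a plain bool.
	needAllColumns := slices.Contains(resultColumnNames, "*")
	for i := 0; i < blocks; i++ {
		fmt.Println(needAllColumns)
	}
}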

View file

@@ -353,12 +353,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	}
 	if len(byFields) == 1 {
 		// Special case for grouping by a single column.
-		idx := getBlockColumnIndex(columns, byFields[0])
-		if idx < 0 {
-			logger.Panicf("BUG: columnIdx must be positive")
-		}
-
-		values := columns[idx].Values
+		values := getValuesForBlockColumn(columns, byFields[0], len(timestamps))
 		if isConstValue(values) {
 			// Fast path for column with constant value.
 			shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
@@ -385,7 +380,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	}

 	// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
-	shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields)
+	shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields, len(timestamps))
 	columnValues := shard.columnValues

 	if areConstValues(columnValues) {
@@ -407,9 +402,6 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 		// verify whether the key for 'by (...)' fields equals the previous key
 		sameValue := sfps != nil
 		for _, values := range columnValues {
-			if values == nil {
-				logger.Panicf("BUG: values cannot be nil here!")
-			}
 			if i <= 0 || values[i-1] != values[i] {
 				sameValue = false
 				break
@@ -441,6 +433,7 @@ func areConstValues(valuess [][]string) bool {
 func isConstValue(values []string) bool {
 	if len(values) == 0 {
+		// Return false, since it is impossible to get values[0] value from empty values.
 		return false
 	}
 	vFirst := values[0]
@@ -759,7 +752,7 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
 	// Slow path for multiple columns.

 	// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
-	sfup.columnValues = appendBlockColumnValues(sfup.columnValues[:0], columns, fields)
+	sfup.columnValues = appendBlockColumnValues(sfup.columnValues[:0], columns, fields, len(timestamps))
 	columnValues := sfup.columnValues

 	keyBuf := sfup.keyBuf
@@ -767,10 +760,7 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
 		allEmptyValues := true
 		keyBuf = keyBuf[:0]
 		for _, values := range columnValues {
-			v := ""
-			if values != nil {
-				v = values[i]
-			}
+			v := values[i]
 			if v != "" {
 				allEmptyValues = false
 			}
@@ -1104,13 +1094,9 @@ func getFieldsIgnoreStar(fields []string) []string {
 	return result
 }

-func appendBlockColumnValues(dst [][]string, columns []BlockColumn, fields []string) [][]string {
+func appendBlockColumnValues(dst [][]string, columns []BlockColumn, fields []string, rowsCount int) [][]string {
 	for _, f := range fields {
-		idx := getBlockColumnIndex(columns, f)
-		var values []string
-		if idx >= 0 {
-			values = columns[idx].Values
-		}
+		values := getValuesForBlockColumn(columns, f, rowsCount)
 		dst = append(dst, values)
 	}
 	return dst
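getValuesForBlockColumn is called in the hunks above, but its body is not part of this diff. Given that appendBlockColumnValues now receives rowsCount and the values == nil / values != nil checks are deleted, the helper presumably returns the column's values when the column exists in the block, and a non-nil slice of rowsCount empty strings otherwise, so callers may index values[i] unconditionally. A plausible sketch reusing this file's BlockColumn and getBlockColumnIndex; this is an assumption, not the actual implementation:

// getValuesForBlockColumn returns the values for the column with the given name.
// If the column is missing from the block, it returns rowsCount empty strings,
// so the result is always non-nil and always has exactly rowsCount entries.
func getValuesForBlockColumn(columns []BlockColumn, name string, rowsCount int) []string {
	idx := getBlockColumnIndex(columns, name)
	if idx >= 0 {
		return columns[idx].Values
	}
	return make([]string, rowsCount) // rowsCount empty strings for a missing column
}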

View file

@@ -3,6 +3,7 @@ package logstorage
 import (
 	"context"
 	"math"
+	"slices"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -18,8 +19,11 @@ type genericSearchOptions struct {
 	// filter is the filter to use for the search
 	filter filter

-	// resultColumnNames is names of columns to return in the result.
+	// resultColumnNames is names of columns to return in the result
 	resultColumnNames []string
+
+	// needAllColumns is set to true when all the columns must be returned in the result
+	needAllColumns bool
 }

 type searchOptions struct {
@@ -42,6 +46,9 @@ type searchOptions struct {
 	// resultColumnNames is names of columns to return in the result
 	resultColumnNames []string
+
+	// needAllColumns is set to true when all the columns must be returned in the result
+	needAllColumns bool
 }

 // RunQuery runs the given q and calls writeBlock for results.
@@ -51,6 +58,7 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
 		tenantIDs:         tenantIDs,
 		filter:            q.f,
 		resultColumnNames: resultColumnNames,
+		needAllColumns:    slices.Contains(resultColumnNames, "*"),
 	}

 	workersCount := cgroup.AvailableCPUs()
@@ -316,6 +324,7 @@ func (pt *partition) search(tf *timeFilter, sf *StreamFilter, f filter, so *gene
 		maxTimestamp:      tf.maxTimestamp,
 		filter:            f,
 		resultColumnNames: so.resultColumnNames,
+		needAllColumns:    so.needAllColumns,
 	}
 	return pt.ddb.search(soInternal, workCh, stopCh)
 }
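Taken together: needAllColumns is derived once in RunQuery via slices.Contains and then copied verbatim through genericSearchOptions and the per-partition search options down to blockSearch.search in the first file. A small self-contained illustration of that threading, using trimmed stand-ins for the two option structs rather than the real ones:

package main

import (
	"fmt"
	"slices" // requires Go 1.21+
)

// Trimmed stand-ins for the option structs in this file.
type genericSearchOptions struct {
	resultColumnNames []string
	needAllColumns    bool
}

type searchOptions struct {
	resultColumnNames []string
	needAllColumns    bool
}

func main() {
	// Derived once per query, as in RunQuery.
	resultColumnNames := []string{"*"}
	so := &genericSearchOptions{
		resultColumnNames: resultColumnNames,
		needAllColumns:    slices.Contains(resultColumnNames, "*"),
	}

	// Copied verbatim per partition, as in partition.search.
	soInternal := &searchOptions{
		resultColumnNames: so.resultColumnNames,
		needAllColumns:    so.needAllColumns,
	}
	fmt.Println(soInternal.needAllColumns) // true: fetch all columns per block
}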