This commit is contained in:
Aliaksandr Valialkin 2024-04-28 22:52:15 +02:00
parent a941bdcdbd
commit 0850e13eb3
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -111,6 +111,9 @@ func parsePipes(lex *lexer) ([]pipe, error) {
type fieldsPipe struct { type fieldsPipe struct {
// fields contains list of fields to fetch // fields contains list of fields to fetch
fields []string fields []string
// whether fields contains star
containsStar bool
} }
func (fp *fieldsPipe) String() string { func (fp *fieldsPipe) String() string {
@ -133,7 +136,7 @@ type fieldsPipeProcessor struct {
} }
func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
if slices.Contains(fpp.fp.fields, "*") || areSameBlockColumns(columns, fpp.fp.fields) { if fpp.fp.containsStar || areSameBlockColumns(columns, fpp.fp.fields) {
// Fast path - there is no need in additional transformations before writing the block to ppBase. // Fast path - there is no need in additional transformations before writing the block to ppBase.
fpp.ppBase.writeBlock(workerID, timestamps, columns) fpp.ppBase.writeBlock(workerID, timestamps, columns)
return return
@ -176,6 +179,7 @@ func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) {
case lex.isKeyword("|", ")", ""): case lex.isKeyword("|", ")", ""):
fp := &fieldsPipe{ fp := &fieldsPipe{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
return fp, nil return fp, nil
case lex.isKeyword(","): case lex.isKeyword(","):
@ -552,6 +556,8 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
type statsFuncCount struct { type statsFuncCount struct {
fields []string fields []string
containsStar bool
resultName string resultName string
} }
@ -578,7 +584,7 @@ type statsFuncCountProcessor struct {
func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
fields := sfcp.sfc.fields fields := sfcp.sfc.fields
if len(fields) == 0 || slices.Contains(fields, "*") { if len(fields) == 0 || sfcp.sfc.containsStar {
// Fast path - count all the columns. // Fast path - count all the columns.
sfcp.rowsCount += uint64(len(timestamps)) sfcp.rowsCount += uint64(len(timestamps))
return 0 return 0
@ -608,7 +614,7 @@ func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, c
func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int { func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
fields := sfcp.sfc.fields fields := sfcp.sfc.fields
if len(fields) == 0 || slices.Contains(fields, "*") { if len(fields) == 0 || sfcp.sfc.containsStar {
// Fast path - count the given column // Fast path - count the given column
sfcp.rowsCount++ sfcp.rowsCount++
return 0 return 0
@ -815,6 +821,7 @@ func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
} }
sfc := &statsFuncCount{ sfc := &statsFuncCount{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName, resultName: resultName,
} }
return sfc, nil return sfc, nil