This commit is contained in:
Aliaksandr Valialkin 2024-05-16 00:49:11 +02:00
parent e4d89f3584
commit 7e4769abad
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -20,11 +20,16 @@ type pipeStats struct {
// byFields contains field names with optional buckets from 'by(...)' clause. // byFields contains field names with optional buckets from 'by(...)' clause.
byFields []*byStatsField byFields []*byStatsField
// resultNames contains names of output results generated by funcs.
resultNames []string
// funcs contains stats functions to execute. // funcs contains stats functions to execute.
funcs []statsFunc funcs []pipeStatsFunc
}
type pipeStatsFunc struct {
// f is stats function to execute
f statsFunc
// resultName is the name of the output generated by f
resultName string
} }
type statsFunc interface { type statsFunc interface {
@ -77,7 +82,7 @@ func (ps *pipeStats) String() string {
} }
a := make([]string, len(ps.funcs)) a := make([]string, len(ps.funcs))
for i, f := range ps.funcs { for i, f := range ps.funcs {
a[i] = f.String() + " as " + quoteTokenIfNeeded(ps.resultNames[i]) a[i] = f.f.String() + " as " + quoteTokenIfNeeded(f.resultName)
} }
s += strings.Join(a, ", ") s += strings.Join(a, ", ")
return s return s
@ -92,9 +97,9 @@ func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet)
neededFields.add(bf.name) neededFields.add(bf.name)
} }
for i, resultName := range ps.resultNames { for i, f := range ps.funcs {
if neededFieldsOrig.contains(resultName) && !unneededFields.contains(resultName) { if neededFieldsOrig.contains(f.resultName) && !unneededFields.contains(f.resultName) {
funcFields := ps.funcs[i].neededFields() funcFields := ps.funcs[i].f.neededFields()
neededFields.addAll(funcFields) neededFields.addAll(funcFields)
} }
} }
@ -267,7 +272,7 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr
sfps := make([]statsProcessor, len(shard.ps.funcs)) sfps := make([]statsProcessor, len(shard.ps.funcs))
for i, f := range shard.ps.funcs { for i, f := range shard.ps.funcs {
sfp, stateSize := f.newStatsProcessor() sfp, stateSize := f.f.newStatsProcessor()
sfps[i] = sfp sfps[i] = sfp
shard.stateSizeBudget -= stateSize shard.stateSizeBudget -= stateSize
} }
@ -361,15 +366,15 @@ func (psp *pipeStatsProcessor) flush() error {
m = shards[0].m m = shards[0].m
} }
rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.resultNames)) rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs))
for _, bf := range byFields { for _, bf := range byFields {
rcs = append(rcs, resultColumn{ rcs = append(rcs, resultColumn{
name: bf.name, name: bf.name,
}) })
} }
for _, resultName := range psp.ps.resultNames { for _, f := range psp.ps.funcs {
rcs = append(rcs, resultColumn{ rcs = append(rcs, resultColumn{
name: resultName, name: f.resultName,
}) })
} }
var br blockResult var br blockResult
@ -447,22 +452,31 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) {
ps.byFields = bfs ps.byFields = bfs
} }
var resultNames []string var funcs []pipeStatsFunc
var funcs []statsFunc
for { for {
sf, err := parseStatsFunc(lex) sf, err := parseStatsFunc(lex)
if err != nil { if err != nil {
return nil, err return nil, err
} }
/*
if lex.isKeyword("if") {
ifQuery, err := parseIfQuery(lex)
if err != nil {
return fmt.Errorf("cannot parse 'if' query for %s: %w", sf, err)
}
}
*/
resultName, err := parseResultName(lex) resultName, err := parseResultName(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse result name for %s: %w", sf, err) return nil, fmt.Errorf("cannot parse result name for %s: %w", sf, err)
} }
resultNames = append(resultNames, resultName) funcs = append(funcs, pipeStatsFunc{
funcs = append(funcs, sf) f: sf,
resultName: resultName,
})
if lex.isKeyword("|", ")", "") { if lex.isKeyword("|", ")", "") {
ps.resultNames = resultNames
ps.funcs = funcs ps.funcs = funcs
return &ps, nil return &ps, nil
} }