diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 1c8d7f19f..885d5885e 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -20,11 +20,16 @@ type pipeStats struct { // byFields contains field names with optional buckets from 'by(...)' clause. byFields []*byStatsField - // resultNames contains names of output results generated by funcs. - resultNames []string - // funcs contains stats functions to execute. - funcs []statsFunc + funcs []pipeStatsFunc +} + +type pipeStatsFunc struct { + // f is stats function to execute + f statsFunc + + // resultName is the name of the output generated by f + resultName string } type statsFunc interface { @@ -77,7 +82,7 @@ func (ps *pipeStats) String() string { } a := make([]string, len(ps.funcs)) for i, f := range ps.funcs { - a[i] = f.String() + " as " + quoteTokenIfNeeded(ps.resultNames[i]) + a[i] = f.f.String() + " as " + quoteTokenIfNeeded(f.resultName) } s += strings.Join(a, ", ") return s @@ -92,9 +97,9 @@ func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) neededFields.add(bf.name) } - for i, resultName := range ps.resultNames { - if neededFieldsOrig.contains(resultName) && !unneededFields.contains(resultName) { - funcFields := ps.funcs[i].neededFields() + for i, f := range ps.funcs { + if neededFieldsOrig.contains(f.resultName) && !unneededFields.contains(f.resultName) { + funcFields := ps.funcs[i].f.neededFields() neededFields.addAll(funcFields) } } @@ -267,7 +272,7 @@ func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGr sfps := make([]statsProcessor, len(shard.ps.funcs)) for i, f := range shard.ps.funcs { - sfp, stateSize := f.newStatsProcessor() + sfp, stateSize := f.f.newStatsProcessor() sfps[i] = sfp shard.stateSizeBudget -= stateSize } @@ -361,15 +366,15 @@ func (psp *pipeStatsProcessor) flush() error { m = shards[0].m } - rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.resultNames)) + rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs)) for _, bf := range byFields { rcs = append(rcs, resultColumn{ name: bf.name, }) } - for _, resultName := range psp.ps.resultNames { + for _, f := range psp.ps.funcs { rcs = append(rcs, resultColumn{ - name: resultName, + name: f.resultName, }) } var br blockResult @@ -447,22 +452,31 @@ func parsePipeStats(lex *lexer) (*pipeStats, error) { ps.byFields = bfs } - var resultNames []string - var funcs []statsFunc + var funcs []pipeStatsFunc for { sf, err := parseStatsFunc(lex) if err != nil { return nil, err } + /* + if lex.isKeyword("if") { + ifQuery, err := parseIfQuery(lex) + if err != nil { + return fmt.Errorf("cannot parse 'if' query for %s: %w", sf, err) + } + } + */ resultName, err := parseResultName(lex) if err != nil { return nil, fmt.Errorf("cannot parse result name for %s: %w", sf, err) } - resultNames = append(resultNames, resultName) - funcs = append(funcs, sf) + funcs = append(funcs, pipeStatsFunc{ + f: sf, + resultName: resultName, + }) + if lex.isKeyword("|", ")", "") { - ps.resultNames = resultNames ps.funcs = funcs return &ps, nil }