diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 87e5f11cb..be23a41dd 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -850,6 +850,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats Min(foo) bar`, `* | stats min(foo) as bar`) f(`* | stats BY(x, y, ) MIN(foo,bar,) bar`, `* | stats by (x, y) min(foo, bar) as bar`) + // stats pipe avg + f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`) + f(`* | stats BY(x, y, ) AVG(foo,bar,) bar`, `* | stats by (x, y) avg(foo, bar) as bar`) + // stats pipe uniq f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`) f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`) @@ -1117,6 +1121,11 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats min()`) f(`foo | stats min() as abc`) + // invalid stats avg + f(`foo | stats avg`) + f(`foo | stats avg()`) + f(`foo | stats avg() as abc`) + // invalid stats uniq f(`foo | stats uniq`) f(`foo | stats uniq()`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index e22fcd60a..60ebc9281 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -446,6 +446,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) { return nil, "", fmt.Errorf("cannot parse 'min' func: %w", err) } sf = sms + case lex.isKeyword("avg"): + sas, err := parseStatsAvg(lex) + if err != nil { + return nil, "", fmt.Errorf("cannot parse 'avg' func: %w", err) + } + sf = sas default: return nil, "", fmt.Errorf("unknown stats func %q", lex.token) } diff --git a/lib/logstorage/stats_avg.go b/lib/logstorage/stats_avg.go new file mode 100644 index 000000000..1e05ab830 --- /dev/null +++ b/lib/logstorage/stats_avg.go @@ -0,0 +1,109 @@ +package logstorage + +import ( + "fmt" + "math" + "slices" + "strconv" + "unsafe" +) + +type statsAvg struct { + fields []string + containsStar bool +} + +func (sa *statsAvg) String() string { + return "avg(" + fieldNamesString(sa.fields) + ")" +} + +func (sa *statsAvg) neededFields() []string { + return sa.fields +} + +func (sa *statsAvg) newStatsProcessor() (statsProcessor, int) { + sap := &statsAvgProcessor{ + sa: sa, + } + return sap, int(unsafe.Sizeof(*sap)) +} + +type statsAvgProcessor struct { + sa *statsAvg + + sum float64 + count uint64 +} + +func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int { + if sap.sa.containsStar { + // Scan all the columns + for _, c := range br.getColumns() { + f, count := c.sumValues(br) + sap.sum += f + sap.count += uint64(count) + } + return 0 + } + + // Scan the requested columns + for _, field := range sap.sa.fields { + c := br.getColumnByName(field) + f, count := c.sumValues(br) + sap.sum += f + sap.count += uint64(count) + } + return 0 +} + +func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + if sap.sa.containsStar { + // Scan all the fields for the given row + for _, c := range br.getColumns() { + f := c.getFloatValueAtRow(rowIdx) + if !math.IsNaN(f) { + sap.sum += f + sap.count++ + } + } + return 0 + } + + // Scan only the given fields for the given row + for _, field := range sap.sa.fields { + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if !math.IsNaN(f) { + sap.sum += f + sap.count++ + } + } + return 0 +} + +func (sap *statsAvgProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsAvgProcessor) + sap.sum += src.sum + sap.count += src.count +} + +func (sap *statsAvgProcessor) finalizeStats() string { + avg := sap.sum / float64(sap.count) + return strconv.FormatFloat(avg, 'f', -1, 64) +} + +func parseStatsAvg(lex *lexer) (*statsAvg, error) { + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'avg' args: %w", err) + } + if len(fields) == 0 { + return nil, fmt.Errorf("'avg' must contain at least one arg") + } + sa := &statsAvg{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + } + return sa, nil +}