diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 06d289151..72cdaf667 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -834,47 +834,63 @@ func TestParseQuerySuccess(t *testing.T) { f(`foo | skip 10 | skip 100`, `foo | skip 10 | skip 100`) // stats pipe count - f(`* | Stats count() AS foo`, `* | stats count() as foo`) f(`* | STATS bY (foo, b.a/r, "b az") count(*) XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`) f(`* | stats by() COUNT(x, 'a).b,c|d') as qwert`, `* | stats count(x, "a).b,c|d") as qwert`) + f(`* | stats count() x`, `* | stats count(*) as x`) + f(`* | stats count(*) x`, `* | stats count(*) as x`) + f(`* | stats count(foo,*,bar) x`, `* | stats count(*) as x`) // stats pipe sum f(`* | stats Sum(foo) bar`, `* | stats sum(foo) as bar`) f(`* | stats BY(x, y, ) SUM(foo,bar,) bar`, `* | stats by (x, y) sum(foo, bar) as bar`) + f(`* | stats sum() x`, `* | stats sum(*) as x`) + f(`* | stats sum(*) x`, `* | stats sum(*) as x`) + f(`* | stats sum(foo,*,bar) x`, `* | stats sum(*) as x`) // stats pipe max f(`* | stats Max(foo) bar`, `* | stats max(foo) as bar`) f(`* | stats BY(x, y, ) MAX(foo,bar,) bar`, `* | stats by (x, y) max(foo, bar) as bar`) + f(`* | stats max() x`, `* | stats max(*) as x`) + f(`* | stats max(*) x`, `* | stats max(*) as x`) + f(`* | stats max(foo,*,bar) x`, `* | stats max(*) as x`) // stats pipe min f(`* | stats Min(foo) bar`, `* | stats min(foo) as bar`) f(`* | stats BY(x, y, ) MIN(foo,bar,) bar`, `* | stats by (x, y) min(foo, bar) as bar`) + f(`* | stats min() x`, `* | stats min(*) as x`) + f(`* | stats min(*) x`, `* | stats min(*) as x`) + f(`* | stats min(foo,*,bar) x`, `* | stats min(*) as x`) // stats pipe avg f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`) f(`* | stats BY(x, y, ) AVG(foo,bar,) bar`, `* | stats by (x, y) avg(foo, bar) as bar`) + f(`* | stats avg() x`, `* | stats avg(*) as x`) + f(`* | stats avg(*) x`, `* | stats avg(*) as x`) + f(`* | stats avg(foo,*,bar) x`, `* | stats avg(*) as x`) // stats pipe uniq f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`) f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`) f(`* | stats by(x) uniq(*) z`, `* | stats by (x) uniq(*) as z`) - f(`* | stats by(x) uniq() z`, `* | stats by (x) uniq() as z`) + f(`* | stats by(x) uniq() z`, `* | stats by (x) uniq(*) as z`) + f(`* | stats by(x) uniq(a,*,b) z`, `* | stats by (x) uniq(*) as z`) // stats pipe uniq_array f(`* | stats uniq_array(foo) bar`, `* | stats uniq_array(foo) as bar`) f(`* | stats by(x, y) uniq_array(foo, bar) as baz`, `* | stats by (x, y) uniq_array(foo, bar) as baz`) f(`* | stats by(x) uniq_array(*) y`, `* | stats by (x) uniq_array(*) as y`) - f(`* | stats by(x) uniq_array() y`, `* | stats by (x) uniq_array() as y`) + f(`* | stats by(x) uniq_array() y`, `* | stats by (x) uniq_array(*) as y`) + f(`* | stats by(x) uniq_array(a,*,b) y`, `* | stats by (x) uniq_array(*) as y`) // stats pipe multiple funcs - f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count() as "foo.bar:baz", uniq(a) as bar`) + f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count(*) as "foo.bar:baz", uniq(a) as bar`) f(`* | stats by (x, y) count(*) foo, uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, uniq(a, b) as bar`) // stats pipe with grouping buckets - f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`) - f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count() as foo`) - f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count() as foo`) - f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count() as foo`) + f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count(*) as foo`) + f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count(*) as foo`) + f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count(*) as foo`) + f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`) // multiple different pipes f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`) @@ -1123,22 +1139,18 @@ func TestParseQueryFailure(t *testing.T) { // invalid stats sum f(`foo | stats sum`) f(`foo | stats sum()`) - f(`foo | stats sum() as abc`) // invalid stats max f(`foo | stats max`) f(`foo | stats max()`) - f(`foo | stats max() as abc`) // invalid stats min f(`foo | stats min`) f(`foo | stats min()`) - f(`foo | stats min() as abc`) // invalid stats avg f(`foo | stats avg`) f(`foo | stats avg()`) - f(`foo | stats avg() as abc`) // invalid stats uniq f(`foo | stats uniq`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 86a6ee5d0..56d8e418c 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -2,6 +2,7 @@ package logstorage import ( "fmt" + "slices" "strings" "sync/atomic" "unsafe" @@ -659,7 +660,7 @@ func tryParseBucketSize(s string) (float64, bool) { return 0, false } -func parseFieldNamesForFunc(lex *lexer, funcName string) ([]string, error) { +func parseFieldNamesForStatsFunc(lex *lexer, funcName string) ([]string, error) { if !lex.isKeyword(funcName) { return nil, fmt.Errorf("unexpected func; got %q; want %q", lex.token, funcName) } @@ -668,6 +669,9 @@ func parseFieldNamesForFunc(lex *lexer, funcName string) ([]string, error) { if err != nil { return nil, fmt.Errorf("cannot parse %q args: %w", funcName, err) } + if len(fields) == 0 || slices.Contains(fields, "*") { + fields = []string{"*"} + } return fields, nil } diff --git a/lib/logstorage/stats_avg.go b/lib/logstorage/stats_avg.go index edeb2a2a5..517fb6837 100644 --- a/lib/logstorage/stats_avg.go +++ b/lib/logstorage/stats_avg.go @@ -1,7 +1,6 @@ package logstorage import ( - "fmt" "math" "slices" "strconv" @@ -43,15 +42,14 @@ func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int { sap.sum += f sap.count += uint64(count) } - return 0 - } - - // Scan the requested columns - for _, field := range sap.sa.fields { - c := br.getColumnByName(field) - f, count := c.sumValues(br) - sap.sum += f - sap.count += uint64(count) + } else { + // Scan the requested columns + for _, field := range sap.sa.fields { + c := br.getColumnByName(field) + f, count := c.sumValues(br) + sap.sum += f + sap.count += uint64(count) + } } return 0 } @@ -66,16 +64,15 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int sap.count++ } } - return 0 - } - - // Scan only the given fields for the given row - for _, field := range sap.sa.fields { - c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if !math.IsNaN(f) { - sap.sum += f - sap.count++ + } else { + // Scan only the given fields for the given row + for _, field := range sap.sa.fields { + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if !math.IsNaN(f) { + sap.sum += f + sap.count++ + } } } return 0 @@ -93,13 +90,10 @@ func (sap *statsAvgProcessor) finalizeStats() string { } func parseStatsAvg(lex *lexer) (*statsAvg, error) { - fields, err := parseFieldNamesForFunc(lex, "avg") + fields, err := parseFieldNamesForStatsFunc(lex, "avg") if err != nil { return nil, err } - if len(fields) == 0 { - return nil, fmt.Errorf("'avg' must contain at least one arg") - } sa := &statsAvg{ fields: fields, containsStar: slices.Contains(fields, "*"), diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go index 81d9ebf0c..c87587a03 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -36,7 +36,7 @@ type statsCountProcessor struct { func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { fields := scp.sc.fields - if len(fields) == 0 || scp.sc.containsStar { + if scp.sc.containsStar { // Fast path - unconditionally count all the columns. scp.rowsCount += uint64(len(br.timestamps)) return 0 @@ -134,7 +134,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { fields := scp.sc.fields - if len(fields) == 0 || scp.sc.containsStar { + if scp.sc.containsStar { // Fast path - unconditionally count the given column scp.rowsCount++ return 0 @@ -194,7 +194,7 @@ func (scp *statsCountProcessor) finalizeStats() string { } func parseStatsCount(lex *lexer) (*statsCount, error) { - fields, err := parseFieldNamesForFunc(lex, "count") + fields, err := parseFieldNamesForStatsFunc(lex, "count") if err != nil { return nil, err } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index b842810de..bfbd959c2 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -1,7 +1,6 @@ package logstorage import ( - "fmt" "math" "slices" "strconv" @@ -44,15 +43,14 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { smp.max = f } } - return 0 - } - - // Find the maximum value across the requested columns - for _, field := range smp.sm.fields { - c := br.getColumnByName(field) - f := c.getMaxValue(br) - if f > smp.max || math.IsNaN(smp.max) { - smp.max = f + } else { + // Find the maximum value across the requested columns + for _, field := range smp.sm.fields { + c := br.getColumnByName(field) + f := c.getMaxValue(br) + if f > smp.max || math.IsNaN(smp.max) { + smp.max = f + } } } return 0 @@ -67,15 +65,14 @@ func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int smp.max = f } } - return 0 - } - - // Find the maximum value across the requested fields for the given row - for _, field := range smp.sm.fields { - c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if f > smp.max || math.IsNaN(smp.max) { - smp.max = f + } else { + // Find the maximum value across the requested fields for the given row + for _, field := range smp.sm.fields { + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if f > smp.max || math.IsNaN(smp.max) { + smp.max = f + } } } return 0 @@ -93,13 +90,10 @@ func (smp *statsMaxProcessor) finalizeStats() string { } func parseStatsMax(lex *lexer) (*statsMax, error) { - fields, err := parseFieldNamesForFunc(lex, "max") + fields, err := parseFieldNamesForStatsFunc(lex, "max") if err != nil { return nil, err } - if len(fields) == 0 { - return nil, fmt.Errorf("'max' must contain at least one arg") - } sm := &statsMax{ fields: fields, containsStar: slices.Contains(fields, "*"), diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index e15c47794..5aa20a6d0 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -1,7 +1,6 @@ package logstorage import ( - "fmt" "math" "slices" "strconv" @@ -44,15 +43,14 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { smp.min = f } } - return 0 - } - - // Find the minimum value across the requested columns - for _, field := range smp.sm.fields { - c := br.getColumnByName(field) - f := c.getMinValue(br) - if f < smp.min || math.IsNaN(smp.min) { - smp.min = f + } else { + // Find the minimum value across the requested columns + for _, field := range smp.sm.fields { + c := br.getColumnByName(field) + f := c.getMinValue(br) + if f < smp.min || math.IsNaN(smp.min) { + smp.min = f + } } } return 0 @@ -67,15 +65,14 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int smp.min = f } } - return 0 - } - - // Find the minimum value across the requested fields for the given row - for _, field := range smp.sm.fields { - c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if f < smp.min || math.IsNaN(smp.min) { - smp.min = f + } else { + // Find the minimum value across the requested fields for the given row + for _, field := range smp.sm.fields { + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if f < smp.min || math.IsNaN(smp.min) { + smp.min = f + } } } return 0 @@ -93,13 +90,10 @@ func (smp *statsMinProcessor) finalizeStats() string { } func parseStatsMin(lex *lexer) (*statsMin, error) { - fields, err := parseFieldNamesForFunc(lex, "min") + fields, err := parseFieldNamesForStatsFunc(lex, "min") if err != nil { return nil, err } - if len(fields) == 0 { - return nil, fmt.Errorf("'min' must contain at least one arg") - } sm := &statsMin{ fields: fields, containsStar: slices.Contains(fields, "*"), diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go index 5ed33418a..bdb97cbd5 100644 --- a/lib/logstorage/stats_sum.go +++ b/lib/logstorage/stats_sum.go @@ -1,7 +1,6 @@ package logstorage import ( - "fmt" "math" "slices" "strconv" @@ -48,18 +47,17 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int { } } } - return 0 - } - - // Sum the requested columns - for _, field := range ssp.ss.fields { - c := br.getColumnByName(field) - f, count := c.sumValues(br) - if count > 0 { - if math.IsNaN(ssp.sum) { - ssp.sum = f - } else { - ssp.sum += f + } else { + // Sum the requested columns + for _, field := range ssp.ss.fields { + c := br.getColumnByName(field) + f, count := c.sumValues(br) + if count > 0 { + if math.IsNaN(ssp.sum) { + ssp.sum = f + } else { + ssp.sum += f + } } } } @@ -79,18 +77,17 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int } } } - return 0 - } - - // Sum only the given fields for the given row - for _, field := range ssp.ss.fields { - c := br.getColumnByName(field) - f := c.getFloatValueAtRow(rowIdx) - if !math.IsNaN(f) { - if math.IsNaN(ssp.sum) { - ssp.sum = f - } else { - ssp.sum += f + } else { + // Sum only the given fields for the given row + for _, field := range ssp.ss.fields { + c := br.getColumnByName(field) + f := c.getFloatValueAtRow(rowIdx) + if !math.IsNaN(f) { + if math.IsNaN(ssp.sum) { + ssp.sum = f + } else { + ssp.sum += f + } } } } @@ -107,13 +104,10 @@ func (ssp *statsSumProcessor) finalizeStats() string { } func parseStatsSum(lex *lexer) (*statsSum, error) { - fields, err := parseFieldNamesForFunc(lex, "sum") + fields, err := parseFieldNamesForStatsFunc(lex, "sum") if err != nil { return nil, err } - if len(fields) == 0 { - return nil, fmt.Errorf("'sum' must contain at least one arg") - } ss := &statsSum{ fields: fields, containsStar: slices.Contains(fields, "*"), diff --git a/lib/logstorage/stats_uniq.go b/lib/logstorage/stats_uniq.go index 36700b8c5..448696c8c 100644 --- a/lib/logstorage/stats_uniq.go +++ b/lib/logstorage/stats_uniq.go @@ -45,7 +45,7 @@ func (sup *statsUniqProcessor) updateStatsForAllRows(br *blockResult) int { m := sup.m stateSizeIncrease := 0 - if len(fields) == 0 || sup.su.containsStar { + if sup.su.containsStar { // Count unique rows columns := br.getColumns() keyBuf := sup.keyBuf[:0] @@ -222,7 +222,7 @@ func (sup *statsUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) in m := sup.m stateSizeIncrease := 0 - if len(fields) == 0 || sup.su.containsStar { + if sup.su.containsStar { // Count unique rows allEmptyValues := true keyBuf := sup.keyBuf[:0] @@ -358,7 +358,7 @@ func (sup *statsUniqProcessor) finalizeStats() string { } func parseStatsUniq(lex *lexer) (*statsUniq, error) { - fields, err := parseFieldNamesForFunc(lex, "uniq") + fields, err := parseFieldNamesForStatsFunc(lex, "uniq") if err != nil { return nil, err } diff --git a/lib/logstorage/stats_uniq_array.go b/lib/logstorage/stats_uniq_array.go index 5b07c5706..7b4423572 100644 --- a/lib/logstorage/stats_uniq_array.go +++ b/lib/logstorage/stats_uniq_array.go @@ -39,16 +39,14 @@ type statsUniqArrayProcessor struct { } func (sup *statsUniqArrayProcessor) updateStatsForAllRows(br *blockResult) int { - fields := sup.su.fields - stateSizeIncrease := 0 - if len(fields) == 0 || sup.su.containsStar { + if sup.su.containsStar { columns := br.getColumns() for i := range columns { stateSizeIncrease += sup.updateStatsForAllRowsColumn(&columns[i], br) } } else { - for _, field := range fields { + for _, field := range sup.su.fields { c := br.getColumnByName(field) stateSizeIncrease += sup.updateStatsForAllRowsColumn(&c, br) } @@ -110,16 +108,14 @@ func (sup *statsUniqArrayProcessor) updateStatsForAllRowsColumn(c *blockResultCo } func (sup *statsUniqArrayProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { - fields := sup.su.fields - stateSizeIncrease := 0 - if len(fields) == 0 || sup.su.containsStar { + if sup.su.containsStar { columns := br.getColumns() for i := range columns { stateSizeIncrease += sup.updateStatsForRowColumn(&columns[i], br, rowIdx) } } else { - for _, field := range fields { + for _, field := range sup.su.fields { c := br.getColumnByName(field) stateSizeIncrease += sup.updateStatsForRowColumn(&c, br, rowIdx) } @@ -219,7 +215,7 @@ func (sup *statsUniqArrayProcessor) finalizeStats() string { } func parseStatsUniqArray(lex *lexer) (*statsUniqArray, error) { - fields, err := parseFieldNamesForFunc(lex, "uniq_array") + fields, err := parseFieldNamesForStatsFunc(lex, "uniq_array") if err != nil { return nil, err }