diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 1eb73ab75..55874b3d3 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -927,8 +927,8 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats quantile(0, foo) bar`, `* | stats quantile(0, foo) as bar`) f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`) f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`) - f(`* | stats quantile(0.99, *) bar`, `* | stats quantile(0.99, *) as bar`) - f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99, *) as bar`) + f(`* | stats quantile(0.99) bar`, `* | stats quantile(0.99) as bar`) + f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99) as bar`) // stats pipe median f(`* | stats Median(foo) bar`, `* | stats median(foo) as bar`) @@ -1359,7 +1359,6 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats quantile`) f(`foo | stats quantile() foo`) f(`foo | stats quantile(bar, baz) foo`) - f(`foo | stats quantile(0.5) foo`) f(`foo | stats quantile(-1, x) foo`) f(`foo | stats quantile(10, x) foo`) @@ -1549,6 +1548,49 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f2) r1, count(r1) r2 | fields r2`, `f1,f2,f3,f4`, ``) f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f3) r1, count(r1) r2 | fields r1`, `f3,f4`, ``) + f(`* | stats avg() q`, `*`, ``) + f(`* | stats avg(*) q`, `*`, ``) + f(`* | stats avg(x) q`, `x`, ``) + f(`* | stats count_empty() q`, `*`, ``) + f(`* | stats count_empty(*) q`, `*`, ``) + f(`* | stats count_empty(x) q`, `x`, ``) + f(`* | stats count() q`, ``, ``) + f(`* | stats count(*) q`, ``, ``) + f(`* | stats count(x) q`, `x`, ``) + f(`* | stats count_uniq() q`, `*`, ``) + f(`* | stats count_uniq(*) q`, `*`, ``) + f(`* | stats count_uniq(x) q`, `x`, ``) + f(`* | stats fields_max(a) q`, `*`, ``) + f(`* | stats fields_max(a, *) q`, `*`, ``) + f(`* | stats fields_max(a, x) q`, `a,x`, ``) + f(`* | stats fields_min(a) q`, `*`, ``) + f(`* | stats fields_min(a, *) q`, `*`, ``) + f(`* | stats fields_min(a, x) q`, `a,x`, ``) + f(`* | stats min() q`, `*`, ``) + f(`* | stats min(*) q`, `*`, ``) + f(`* | stats min(x) q`, `x`, ``) + f(`* | stats median() q`, `*`, ``) + f(`* | stats median(*) q`, `*`, ``) + f(`* | stats median(x) q`, `x`, ``) + f(`* | stats max() q`, `*`, ``) + f(`* | stats max(*) q`, `*`, ``) + f(`* | stats max(x) q`, `x`, ``) + f(`* | stats quantile(0.5) q`, `*`, ``) + f(`* | stats quantile(0.5, *) q`, `*`, ``) + f(`* | stats quantile(0.5, x) q`, `x`, ``) + f(`* | stats sum() q`, `*`, ``) + f(`* | stats sum(*) q`, `*`, ``) + f(`* | stats sum(x) q`, `x`, ``) + f(`* | stats sum_len() q`, `*`, ``) + f(`* | stats sum_len(*) q`, `*`, ``) + f(`* | stats sum_len(x) q`, `x`, ``) + f(`* | stats uniq_values() q`, `*`, ``) + f(`* | stats uniq_values(*) q`, `*`, ``) + f(`* | stats uniq_values(x) q`, `x`, ``) + f(`* | stats values() q`, `*`, ``) + f(`* | stats values(*) q`, `*`, ``) + f(`* | stats values(x) q`, `x`, ``) + f(`_time:5m | stats by(_time:day) count() r1 | stats values(_time) r2`, `_time`, ``) f(`_time:1y | stats (_time:1w) count() r1 | stats count() r2`, `_time`, ``) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 0ae44335f..4643017ae 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -2,7 +2,6 @@ package logstorage import ( "fmt" - "slices" "strings" "sync/atomic" "unsafe" @@ -831,24 +830,6 @@ func tryParseBucketSize(s string) (float64, bool) { return 0, false } -// parseFieldNamesForStatsFunc parses field names for statsFunc. -// -// It returns ["*"] if the fields names list is empty or if it contains "*" field. -func parseFieldNamesForStatsFunc(lex *lexer, funcName string) ([]string, error) { - if !lex.isKeyword(funcName) { - return nil, fmt.Errorf("unexpected func; got %q; want %q", lex.token, funcName) - } - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse %q args: %w", funcName, err) - } - if len(fields) == 0 || slices.Contains(fields, "*") { - fields = []string{"*"} - } - return fields, nil -} - func parseFieldNamesInParens(lex *lexer) ([]string, error) { if !lex.isKeyword("(") { return nil, fmt.Errorf("missing `(`") diff --git a/lib/logstorage/stats_avg.go b/lib/logstorage/stats_avg.go index cdf7d6477..b300ee7fb 100644 --- a/lib/logstorage/stats_avg.go +++ b/lib/logstorage/stats_avg.go @@ -1,22 +1,23 @@ package logstorage import ( + "fmt" "slices" "strconv" + "strings" "unsafe" ) type statsAvg struct { - fields []string - containsStar bool + fields []string } func (sa *statsAvg) String() string { - return "avg(" + fieldNamesString(sa.fields) + ")" + return "avg(" + statsFuncFieldsToString(sa.fields) + ")" } func (sa *statsAvg) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(sa.fields) + updateNeededFieldsForStatsFunc(neededFields, sa.fields) } func (sa *statsAvg) newStatsProcessor() (statsProcessor, int) { @@ -34,7 +35,8 @@ type statsAvgProcessor struct { } func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int { - if sap.sa.containsStar { + fields := sap.sa.fields + if len(fields) == 0 { // Scan all the columns for _, c := range br.getColumns() { f, count := c.sumValues(br) @@ -43,7 +45,7 @@ func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int { } } else { // Scan the requested columns - for _, field := range sap.sa.fields { + for _, field := range fields { c := br.getColumnByName(field) f, count := c.sumValues(br) sap.sum += f @@ -54,7 +56,8 @@ func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int { } func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { - if sap.sa.containsStar { + fields := sap.sa.fields + if len(fields) == 0 { // Scan all the fields for the given row for _, c := range br.getColumns() { f, ok := c.getFloatValueAtRow(br, rowIdx) @@ -65,7 +68,7 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int } } else { // Scan only the given fields for the given row - for _, field := range sap.sa.fields { + for _, field := range fields { c := br.getColumnByName(field) f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { @@ -89,13 +92,46 @@ func (sap *statsAvgProcessor) finalizeStats() string { } func parseStatsAvg(lex *lexer) (*statsAvg, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "avg") + fields, err := parseStatsFuncFields(lex, "avg") if err != nil { return nil, err } sa := &statsAvg{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } return sa, nil } + +func parseStatsFuncFields(lex *lexer, funcName string) ([]string, error) { + if !lex.isKeyword(funcName) { + return nil, fmt.Errorf("unexpected func; got %q; want %q", lex.token, funcName) + } + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse %q args: %w", funcName, err) + } + if len(fields) == 0 || slices.Contains(fields, "*") { + fields = nil + } + return fields, nil +} + +func statsFuncFieldsToString(fields []string) string { + if len(fields) == 0 { + return "*" + } + + a := make([]string, len(fields)) + for i, f := range fields { + a[i] = quoteTokenIfNeeded(f) + } + return strings.Join(a, ", ") +} + +func updateNeededFieldsForStatsFunc(neededFields fieldsSet, fields []string) { + if len(fields) == 0 { + neededFields.add("*") + } + neededFields.addFields(fields) +} diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go index 1de39031b..8b852ef0e 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -9,16 +9,15 @@ import ( ) type statsCount struct { - fields []string - containsStar bool + fields []string } func (sc *statsCount) String() string { - return "count(" + fieldNamesString(sc.fields) + ")" + return "count(" + statsFuncFieldsToString(sc.fields) + ")" } func (sc *statsCount) updateNeededFields(neededFields fieldsSet) { - if sc.containsStar { + if len(sc.fields) == 0 { // There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as len(blockResult.timestamps) return } @@ -40,7 +39,7 @@ type statsCountProcessor struct { func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { fields := scp.sc.fields - if scp.sc.containsStar { + if len(fields) == 0 { // Fast path - unconditionally count all the columns. scp.rowsCount += uint64(len(br.timestamps)) return 0 @@ -138,7 +137,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { fields := scp.sc.fields - if scp.sc.containsStar { + if len(fields) == 0 { // Fast path - unconditionally count the given column scp.rowsCount++ return 0 @@ -200,13 +199,12 @@ func (scp *statsCountProcessor) finalizeStats() string { } func parseStatsCount(lex *lexer) (*statsCount, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "count") + fields, err := parseStatsFuncFields(lex, "count") if err != nil { return nil, err } sc := &statsCount{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } return sc, nil } diff --git a/lib/logstorage/stats_count_empty.go b/lib/logstorage/stats_count_empty.go index 7065e57a4..69c180911 100644 --- a/lib/logstorage/stats_count_empty.go +++ b/lib/logstorage/stats_count_empty.go @@ -9,16 +9,15 @@ import ( ) type statsCountEmpty struct { - fields []string - containsStar bool + fields []string } func (sc *statsCountEmpty) String() string { - return "count_empty(" + fieldNamesString(sc.fields) + ")" + return "count_empty(" + statsFuncFieldsToString(sc.fields) + ")" } func (sc *statsCountEmpty) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(sc.fields) + updateNeededFieldsForStatsFunc(neededFields, sc.fields) } func (sc *statsCountEmpty) newStatsProcessor() (statsProcessor, int) { @@ -36,7 +35,7 @@ type statsCountEmptyProcessor struct { func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int { fields := scp.sc.fields - if scp.sc.containsStar { + if len(fields) == 0 { bm := getBitmap(len(br.timestamps)) bm.setBits() for _, c := range br.getColumns() { @@ -133,7 +132,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { fields := scp.sc.fields - if scp.sc.containsStar { + if len(fields) == 0 { for _, c := range br.getColumns() { if v := c.getValueAtRow(br, rowIdx); v != "" { return 0 @@ -197,13 +196,12 @@ func (scp *statsCountEmptyProcessor) finalizeStats() string { } func parseStatsCountEmpty(lex *lexer) (*statsCountEmpty, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "count_empty") + fields, err := parseStatsFuncFields(lex, "count_empty") if err != nil { return nil, err } sc := &statsCountEmpty{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } return sc, nil } diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index 7ad3af47f..f61d461db 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -2,7 +2,6 @@ package logstorage import ( "fmt" - "slices" "strconv" "unsafe" @@ -11,13 +10,12 @@ import ( ) type statsCountUniq struct { - fields []string - containsStar bool - limit uint64 + fields []string + limit uint64 } func (su *statsCountUniq) String() string { - s := "count_uniq(" + fieldNamesString(su.fields) + ")" + s := "count_uniq(" + statsFuncFieldsToString(su.fields) + ")" if su.limit > 0 { s += fmt.Sprintf(" limit %d", su.limit) } @@ -25,7 +23,7 @@ func (su *statsCountUniq) String() string { } func (su *statsCountUniq) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(su.fields) + updateNeededFieldsForStatsFunc(neededFields, su.fields) } func (su *statsCountUniq) newStatsProcessor() (statsProcessor, int) { @@ -52,17 +50,23 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { } fields := sup.su.fields - m := sup.m stateSizeIncrease := 0 - if sup.su.containsStar { + if len(fields) == 0 { // Count unique rows cs := br.getColumns() + + columnValues := sup.columnValues[:0] + for _, c := range cs { + values := c.getValues(br) + columnValues = append(columnValues, values) + } + sup.columnValues = columnValues + keyBuf := sup.keyBuf[:0] for i := range br.timestamps { seenKey := true - for _, c := range cs { - values := c.getValues(br) + for _, values := range columnValues { if i == 0 || values[i-1] != values[i] { seenKey = false break @@ -75,23 +79,20 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { allEmptyValues := true keyBuf = keyBuf[:0] - for _, c := range cs { - v := c.getValueAtRow(br, i) + for j, values := range columnValues { + v := values[i] if v != "" { allEmptyValues = false } // Put column name into key, since every block can contain different set of columns for '*' selector. - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(cs[j].name)) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) } if allEmptyValues { // Do not count empty values continue } - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) } sup.keyBuf = keyBuf return stateSizeIncrease @@ -112,10 +113,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { } keyBuf = append(keyBuf[:0], 1) keyBuf = encoding.MarshalInt64(keyBuf, timestamp) - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) } sup.keyBuf = keyBuf return stateSizeIncrease @@ -130,10 +128,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { keyBuf := sup.keyBuf[:0] keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf, v...) - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) sup.keyBuf = keyBuf return stateSizeIncrease } @@ -147,10 +142,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { } keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf, v...) - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) } sup.keyBuf = keyBuf return stateSizeIncrease @@ -170,10 +162,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { } keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf, v...) - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) } sup.keyBuf = keyBuf return stateSizeIncrease @@ -216,10 +205,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { // Do not count empty values continue } - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) } sup.keyBuf = keyBuf return stateSizeIncrease @@ -231,10 +217,9 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in } fields := sup.su.fields - m := sup.m stateSizeIncrease := 0 - if sup.su.containsStar { + if len(fields) == 0 { // Count unique rows allEmptyValues := true keyBuf := sup.keyBuf[:0] @@ -253,10 +238,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in // Do not count empty values return stateSizeIncrease } - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) return stateSizeIncrease } if len(fields) == 1 { @@ -269,10 +251,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in keyBuf := sup.keyBuf[:0] keyBuf = append(keyBuf[:0], 1) keyBuf = encoding.MarshalInt64(keyBuf, br.timestamps[rowIdx]) - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) sup.keyBuf = keyBuf return stateSizeIncrease } @@ -286,10 +265,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in keyBuf := sup.keyBuf[:0] keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf, v...) - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) sup.keyBuf = keyBuf return stateSizeIncrease } @@ -305,10 +281,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in keyBuf := sup.keyBuf[:0] keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf, v...) - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) sup.keyBuf = keyBuf return stateSizeIncrease } @@ -322,10 +295,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in keyBuf := sup.keyBuf[:0] keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf, v...) - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) sup.keyBuf = keyBuf return stateSizeIncrease } @@ -347,10 +317,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in // Do not count empty values return stateSizeIncrease } - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } + stateSizeIncrease += sup.updateState(keyBuf) return stateSizeIncrease } @@ -376,19 +343,27 @@ func (sup *statsCountUniqProcessor) finalizeStats() string { return strconv.FormatUint(n, 10) } +func (sup *statsCountUniqProcessor) updateState(v []byte) int { + stateSizeIncrease := 0 + if _, ok := sup.m[string(v)]; !ok { + sup.m[string(v)] = struct{}{} + stateSizeIncrease += len(v) + int(unsafe.Sizeof("")) + } + return stateSizeIncrease +} + func (sup *statsCountUniqProcessor) limitReached() bool { limit := sup.su.limit return limit > 0 && uint64(len(sup.m)) >= limit } func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "count_uniq") + fields, err := parseStatsFuncFields(lex, "count_uniq") if err != nil { return nil, err } su := &statsCountUniq{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } if lex.isKeyword("limit") { lex.nextToken() diff --git a/lib/logstorage/stats_count_uniq_test.go b/lib/logstorage/stats_count_uniq_test.go new file mode 100644 index 000000000..a237d9cf5 --- /dev/null +++ b/lib/logstorage/stats_count_uniq_test.go @@ -0,0 +1,373 @@ +package logstorage + +import ( + "testing" +) + +func TestParseStatsCountUniqSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncSuccess(t, pipeStr) + } + + f(`count_uniq(*)`) + f(`count_uniq(a)`) + f(`count_uniq(a, b)`) + f(`count_uniq(*) limit 10`) + f(`count_uniq(a) limit 20`) + f(`count_uniq(a, b) limit 5`) +} + +func TestParseStatsCountUniqFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParseStatsFuncFailure(t, pipeStr) + } + + f(`count_uniq`) + f(`count_uniq(a b)`) + f(`count_uniq(x) y`) + f(`count_uniq(x) limit`) + f(`count_uniq(x) limit N`) +} + +func TestStatsCountUniq(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f("stats count_uniq(*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + {}, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", "3"}, + }, + }) + + f("stats count_uniq(*) limit 2 as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + {}, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", "2"}, + }, + }) + + f("stats count_uniq(*) limit 10 as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + {}, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", "3"}, + }, + }) + + f("stats count_uniq(b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + {}, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", "2"}, + }, + }) + + f("stats count_uniq(a, b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + {}, + { + {"aa", `3`}, + {"bb", `54`}, + }, + }, [][]Field{ + { + {"x", "2"}, + }, + }) + + f("stats count_uniq(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", "0"}, + }, + }) + + f("stats count_uniq(a) if (b:*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"b", `54`}, + }, + }, [][]Field{ + { + {"x", "1"}, + }, + }) + + f("stats by (a) count_uniq(b) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"b", `5`}, + }, + { + {"a", `3`}, + {"b", `7`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", "1"}, + }, + { + {"a", "3"}, + {"x", "2"}, + }, + }) + + f("stats by (a) count_uniq(b) if (!c:foo) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"b", "aadf"}, + {"c", "foo"}, + }, + { + {"a", `3`}, + {"b", `5`}, + {"c", "bar"}, + }, + { + {"a", `3`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", "1"}, + }, + { + {"a", "3"}, + {"x", "1"}, + }, + }) + + f("stats by (a) count_uniq(*) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "3"}, + }, + {}, + { + {"a", `3`}, + {"b", `5`}, + }, + }, [][]Field{ + { + {"a", ""}, + {"x", "0"}, + }, + { + {"a", "1"}, + {"x", "2"}, + }, + { + {"a", "3"}, + {"x", "1"}, + }, + }) + + f("stats by (a) count_uniq(c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `3`}, + {"c", `5`}, + }, + { + {"a", `3`}, + {"b", `7`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", "0"}, + }, + { + {"a", "3"}, + {"x", "1"}, + }, + }) + + f("stats by (a) count_uniq(a, b, c) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "3"}, + }, + { + {"a", `3`}, + {"b", `5`}, + }, + { + {"foo", "bar"}, + }, + { + {"a", `3`}, + {"b", `7`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", "2"}, + }, + { + {"a", ""}, + {"x", "0"}, + }, + { + {"a", "3"}, + {"x", "2"}, + }, + }) + + f("stats by (a, b) count_uniq(a) as x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `1`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"c", "3"}, + }, + { + {"c", `3`}, + {"b", `5`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"b", "3"}, + {"x", "1"}, + }, + { + {"a", "1"}, + {"b", ""}, + {"x", "1"}, + }, + { + {"a", ""}, + {"b", "5"}, + {"x", "0"}, + }, + }) +} diff --git a/lib/logstorage/stats_fields_max.go b/lib/logstorage/stats_fields_max.go index 98d290260..599cf48b0 100644 --- a/lib/logstorage/stats_fields_max.go +++ b/lib/logstorage/stats_fields_max.go @@ -14,23 +14,23 @@ import ( type statsFieldsMax struct { srcField string - resultFields []string + fetchFields []string } func (sm *statsFieldsMax) String() string { s := "fields_max(" + quoteTokenIfNeeded(sm.srcField) - if len(sm.resultFields) > 0 { - s += ", " + fieldNamesString(sm.resultFields) + if len(sm.fetchFields) > 0 { + s += ", " + fieldNamesString(sm.fetchFields) } s += ")" return s } func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) { - if len(sm.resultFields) == 0 { + if len(sm.fetchFields) == 0 { neededFields.add("*") } else { - neededFields.addFields(sm.resultFields) + neededFields.addFields(sm.fetchFields) } neededFields.add(sm.srcField) } @@ -177,7 +177,8 @@ func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowId clear(fields) fields = fields[:0] - if len(smp.sm.resultFields) == 0 { + fetchFields := smp.sm.fetchFields + if len(fetchFields) == 0 { cs := br.getColumns() for _, c := range cs { v := c.getValueAtRow(br, rowIdx) @@ -188,7 +189,7 @@ func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowId stateSizeIncrease += len(c.name) + len(v) } } else { - for _, field := range smp.sm.resultFields { + for _, field := range fetchFields { c := br.getColumnByName(field) v := c.getValueAtRow(br, rowIdx) fields = append(fields, Field{ @@ -227,14 +228,14 @@ func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) { } srcField := fields[0] - resultFields := fields[1:] - if slices.Contains(resultFields, "*") { - resultFields = nil + fetchFields := fields[1:] + if slices.Contains(fetchFields, "*") { + fetchFields = nil } sm := &statsFieldsMax{ - srcField: srcField, - resultFields: resultFields, + srcField: srcField, + fetchFields: fetchFields, } return sm, nil } diff --git a/lib/logstorage/stats_fields_min.go b/lib/logstorage/stats_fields_min.go index 25ae60ede..e57b466e8 100644 --- a/lib/logstorage/stats_fields_min.go +++ b/lib/logstorage/stats_fields_min.go @@ -14,23 +14,23 @@ import ( type statsFieldsMin struct { srcField string - resultFields []string + fetchFields []string } func (sm *statsFieldsMin) String() string { s := "fields_min(" + quoteTokenIfNeeded(sm.srcField) - if len(sm.resultFields) > 0 { - s += ", " + fieldNamesString(sm.resultFields) + if len(sm.fetchFields) > 0 { + s += ", " + fieldNamesString(sm.fetchFields) } s += ")" return s } func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) { - if len(sm.resultFields) == 0 { + if len(sm.fetchFields) == 0 { neededFields.add("*") } else { - neededFields.addFields(sm.resultFields) + neededFields.addFields(sm.fetchFields) } neededFields.add(sm.srcField) } @@ -177,7 +177,8 @@ func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowId clear(fields) fields = fields[:0] - if len(smp.sm.resultFields) == 0 { + fetchFields := smp.sm.fetchFields + if len(fetchFields) == 0 { cs := br.getColumns() for _, c := range cs { v := c.getValueAtRow(br, rowIdx) @@ -188,7 +189,7 @@ func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowId stateSizeIncrease += len(c.name) + len(v) } } else { - for _, field := range smp.sm.resultFields { + for _, field := range fetchFields { c := br.getColumnByName(field) v := c.getValueAtRow(br, rowIdx) fields = append(fields, Field{ @@ -227,14 +228,14 @@ func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) { } srcField := fields[0] - resultFields := fields[1:] - if slices.Contains(resultFields, "*") { - resultFields = nil + fetchFields := fields[1:] + if slices.Contains(fetchFields, "*") { + fetchFields = nil } sm := &statsFieldsMin{ - srcField: srcField, - resultFields: resultFields, + srcField: srcField, + fetchFields: fetchFields, } return sm, nil } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index b77082325..c92cd7a75 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -2,7 +2,6 @@ package logstorage import ( "math" - "slices" "strings" "unsafe" @@ -15,14 +14,11 @@ type statsMax struct { } func (sm *statsMax) String() string { - if len(sm.fields) == 0 { - return "max(*)" - } - return "max(" + fieldNamesString(sm.fields) + ")" + return "max(" + statsFuncFieldsToString(sm.fields) + ")" } func (sm *statsMax) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(sm.fields) + updateNeededFieldsForStatsFunc(neededFields, sm.fields) } func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { @@ -168,13 +164,10 @@ func (smp *statsMaxProcessor) finalizeStats() string { } func parseStatsMax(lex *lexer) (*statsMax, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "max") + fields, err := parseStatsFuncFields(lex, "max") if err != nil { return nil, err } - if slices.Contains(fields, "*") { - fields = nil - } sm := &statsMax{ fields: fields, } diff --git a/lib/logstorage/stats_median.go b/lib/logstorage/stats_median.go index baeb240e3..92b95fc02 100644 --- a/lib/logstorage/stats_median.go +++ b/lib/logstorage/stats_median.go @@ -1,30 +1,27 @@ package logstorage import ( - "slices" "unsafe" ) type statsMedian struct { - fields []string - containsStar bool + fields []string } func (sm *statsMedian) String() string { - return "median(" + fieldNamesString(sm.fields) + ")" + return "median(" + statsFuncFieldsToString(sm.fields) + ")" } func (sm *statsMedian) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(sm.fields) + updateNeededFieldsForStatsFunc(neededFields, sm.fields) } func (sm *statsMedian) newStatsProcessor() (statsProcessor, int) { smp := &statsMedianProcessor{ sqp: &statsQuantileProcessor{ sq: &statsQuantile{ - fields: sm.fields, - containsStar: sm.containsStar, - phi: 0.5, + fields: sm.fields, + phi: 0.5, }, }, } @@ -53,13 +50,12 @@ func (smp *statsMedianProcessor) finalizeStats() string { } func parseStatsMedian(lex *lexer) (*statsMedian, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "median") + fields, err := parseStatsFuncFields(lex, "median") if err != nil { return nil, err } sm := &statsMedian{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } return sm, nil } diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 4ddb0929d..fe9890dab 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -2,7 +2,6 @@ package logstorage import ( "math" - "slices" "strings" "unsafe" @@ -15,14 +14,11 @@ type statsMin struct { } func (sm *statsMin) String() string { - if len(sm.fields) == 0 { - return "min(*)" - } - return "min(" + fieldNamesString(sm.fields) + ")" + return "min(" + statsFuncFieldsToString(sm.fields) + ")" } func (sm *statsMin) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(sm.fields) + updateNeededFieldsForStatsFunc(neededFields, sm.fields) } func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { @@ -41,14 +37,15 @@ type statsMinProcessor struct { func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { minLen := len(smp.min) - if len(smp.sm.fields) == 0 { + fields := smp.sm.fields + if len(fields) == 0 { // Find the minimum value across all the columns for _, c := range br.getColumns() { smp.updateStateForColumn(br, c) } } else { // Find the minimum value across the requested columns - for _, field := range smp.sm.fields { + for _, field := range fields { c := br.getColumnByName(field) smp.updateStateForColumn(br, c) } @@ -60,7 +57,8 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { minLen := len(smp.min) - if len(smp.sm.fields) == 0 { + fields := smp.sm.fields + if len(fields) == 0 { // Find the minimum value across all the fields for the given row for _, c := range br.getColumns() { v := c.getValueAtRow(br, rowIdx) @@ -68,7 +66,7 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int } } else { // Find the minimum value across the requested fields for the given row - for _, field := range smp.sm.fields { + for _, field := range fields { c := br.getColumnByName(field) v := c.getValueAtRow(br, rowIdx) smp.updateStateString(v) @@ -168,13 +166,10 @@ func (smp *statsMinProcessor) finalizeStats() string { } func parseStatsMin(lex *lexer) (*statsMin, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "min") + fields, err := parseStatsFuncFields(lex, "min") if err != nil { return nil, err } - if slices.Contains(fields, "*") { - fields = nil - } sm := &statsMin{ fields: fields, } diff --git a/lib/logstorage/stats_quantile.go b/lib/logstorage/stats_quantile.go index 685281bb4..4bf988f28 100644 --- a/lib/logstorage/stats_quantile.go +++ b/lib/logstorage/stats_quantile.go @@ -14,19 +14,23 @@ import ( ) type statsQuantile struct { - fields []string - containsStar bool + fields []string phi float64 phiStr string } func (sq *statsQuantile) String() string { - return fmt.Sprintf("quantile(%s, %s)", sq.phiStr, fieldNamesString(sq.fields)) + s := "quantile(" + sq.phiStr + if len(sq.fields) > 0 { + s += ", " + fieldNamesString(sq.fields) + } + s += ")" + return s } func (sq *statsQuantile) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(sq.fields) + updateNeededFieldsForStatsFunc(neededFields, sq.fields) } func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) { @@ -45,12 +49,13 @@ type statsQuantileProcessor struct { func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int { stateSizeIncrease := 0 - if sqp.sq.containsStar { + fields := sqp.sq.fields + if len(fields) == 0 { for _, c := range br.getColumns() { stateSizeIncrease += sqp.updateStateForColumn(br, c) } } else { - for _, field := range sqp.sq.fields { + for _, field := range fields { c := br.getColumnByName(field) stateSizeIncrease += sqp.updateStateForColumn(br, c) } @@ -63,7 +68,8 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int h := &sqp.h stateSizeIncrease := 0 - if sqp.sq.containsStar { + fields := sqp.sq.fields + if len(fields) == 0 { for _, c := range br.getColumns() { f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { @@ -71,7 +77,7 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int } } } else { - for _, field := range sqp.sq.fields { + for _, field := range fields { c := br.getColumnByName(field) f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { @@ -182,8 +188,8 @@ func parseStatsQuantile(lex *lexer) (*statsQuantile, error) { if err != nil { return nil, fmt.Errorf("cannot parse 'quantile' args: %w", err) } - if len(fields) < 2 { - return nil, fmt.Errorf("'quantile' must have at least two args: phi and field name") + if len(fields) < 1 { + return nil, fmt.Errorf("'quantile' must have at least phi arg") } // Parse phi @@ -199,12 +205,11 @@ func parseStatsQuantile(lex *lexer) (*statsQuantile, error) { // Parse fields fields = fields[1:] if slices.Contains(fields, "*") { - fields = []string{"*"} + fields = nil } sq := &statsQuantile{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, phi: phi, phiStr: phiStr, diff --git a/lib/logstorage/stats_quantile_test.go b/lib/logstorage/stats_quantile_test.go index 9cd8baa9c..580b6c6fb 100644 --- a/lib/logstorage/stats_quantile_test.go +++ b/lib/logstorage/stats_quantile_test.go @@ -11,7 +11,7 @@ func TestParseStatsQuantileSuccess(t *testing.T) { expectParseStatsFuncSuccess(t, pipeStr) } - f(`quantile(0.3, *)`) + f(`quantile(0.3)`) f(`quantile(1, a)`) f(`quantile(0.99, a, b)`) } @@ -36,7 +36,7 @@ func TestStatsQuantile(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } - f("stats quantile(0.9, *) as x", [][]Field{ + f("stats quantile(0.9) as x", [][]Field{ { {"_msg", `abc`}, {"a", `2`}, @@ -211,7 +211,7 @@ func TestStatsQuantile(t *testing.T) { }, }) - f("stats by (a) quantile(0.9, *) as x", [][]Field{ + f("stats by (a) quantile(0.9) as x", [][]Field{ { {"_msg", `abc`}, {"a", `1`}, diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go index b0421c661..abab7fbc8 100644 --- a/lib/logstorage/stats_sum.go +++ b/lib/logstorage/stats_sum.go @@ -2,22 +2,20 @@ package logstorage import ( "math" - "slices" "strconv" "unsafe" ) type statsSum struct { - fields []string - containsStar bool + fields []string } func (ss *statsSum) String() string { - return "sum(" + fieldNamesString(ss.fields) + ")" + return "sum(" + statsFuncFieldsToString(ss.fields) + ")" } func (ss *statsSum) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(ss.fields) + updateNeededFieldsForStatsFunc(neededFields, ss.fields) } func (ss *statsSum) newStatsProcessor() (statsProcessor, int) { @@ -35,14 +33,15 @@ type statsSumProcessor struct { } func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int { - if ssp.ss.containsStar { + fields := ssp.ss.fields + if len(fields) == 0 { // Sum all the columns for _, c := range br.getColumns() { ssp.updateStateForColumn(br, c) } } else { // Sum the requested columns - for _, field := range ssp.ss.fields { + for _, field := range fields { c := br.getColumnByName(field) ssp.updateStateForColumn(br, c) } @@ -51,7 +50,8 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int { } func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { - if ssp.ss.containsStar { + fields := ssp.ss.fields + if len(fields) == 0 { // Sum all the fields for the given row for _, c := range br.getColumns() { f, ok := c.getFloatValueAtRow(br, rowIdx) @@ -61,7 +61,7 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int } } else { // Sum only the given fields for the given row - for _, field := range ssp.ss.fields { + for _, field := range fields { c := br.getColumnByName(field) f, ok := c.getFloatValueAtRow(br, rowIdx) if ok { @@ -99,13 +99,12 @@ func (ssp *statsSumProcessor) finalizeStats() string { } func parseStatsSum(lex *lexer) (*statsSum, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "sum") + fields, err := parseStatsFuncFields(lex, "sum") if err != nil { return nil, err } ss := &statsSum{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } return ss, nil } diff --git a/lib/logstorage/stats_sum_len.go b/lib/logstorage/stats_sum_len.go index 0b27f45c2..fd83bc30e 100644 --- a/lib/logstorage/stats_sum_len.go +++ b/lib/logstorage/stats_sum_len.go @@ -1,22 +1,20 @@ package logstorage import ( - "slices" "strconv" "unsafe" ) type statsSumLen struct { - fields []string - containsStar bool + fields []string } func (ss *statsSumLen) String() string { - return "sum_len(" + fieldNamesString(ss.fields) + ")" + return "sum_len(" + statsFuncFieldsToString(ss.fields) + ")" } func (ss *statsSumLen) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(ss.fields) + updateNeededFieldsForStatsFunc(neededFields, ss.fields) } func (ss *statsSumLen) newStatsProcessor() (statsProcessor, int) { @@ -34,14 +32,15 @@ type statsSumLenProcessor struct { } func (ssp *statsSumLenProcessor) updateStatsForAllRows(br *blockResult) int { - if ssp.ss.containsStar { + fields := ssp.ss.fields + if len(fields) == 0 { // Sum all the columns for _, c := range br.getColumns() { ssp.sumLen += c.sumLenValues(br) } } else { // Sum the requested columns - for _, field := range ssp.ss.fields { + for _, field := range fields { c := br.getColumnByName(field) ssp.sumLen += c.sumLenValues(br) } @@ -50,7 +49,8 @@ func (ssp *statsSumLenProcessor) updateStatsForAllRows(br *blockResult) int { } func (ssp *statsSumLenProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { - if ssp.ss.containsStar { + fields := ssp.ss.fields + if len(fields) == 0 { // Sum all the fields for the given row for _, c := range br.getColumns() { v := c.getValueAtRow(br, rowIdx) @@ -58,7 +58,7 @@ func (ssp *statsSumLenProcessor) updateStatsForRow(br *blockResult, rowIdx int) } } else { // Sum only the given fields for the given row - for _, field := range ssp.ss.fields { + for _, field := range fields { c := br.getColumnByName(field) v := c.getValueAtRow(br, rowIdx) ssp.sumLen += uint64(len(v)) @@ -77,13 +77,12 @@ func (ssp *statsSumLenProcessor) finalizeStats() string { } func parseStatsSumLen(lex *lexer) (*statsSumLen, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "sum_len") + fields, err := parseStatsFuncFields(lex, "sum_len") if err != nil { return nil, err } ss := &statsSumLen{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } return ss, nil } diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index 0d2778798..4e07ab4fb 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -11,13 +11,12 @@ import ( ) type statsUniqValues struct { - fields []string - containsStar bool - limit uint64 + fields []string + limit uint64 } func (su *statsUniqValues) String() string { - s := "uniq_values(" + fieldNamesString(su.fields) + ")" + s := "uniq_values(" + statsFuncFieldsToString(su.fields) + ")" if su.limit > 0 { s += fmt.Sprintf(" limit %d", su.limit) } @@ -25,7 +24,7 @@ func (su *statsUniqValues) String() string { } func (su *statsUniqValues) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(su.fields) + updateNeededFieldsForStatsFunc(neededFields, su.fields) } func (su *statsUniqValues) newStatsProcessor() (statsProcessor, int) { @@ -50,12 +49,13 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int } stateSizeIncrease := 0 - if sup.su.containsStar { + fields := sup.su.fields + if len(fields) == 0 { for _, c := range br.getColumns() { stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br) } } else { - for _, field := range sup.su.fields { + for _, field := range fields { c := br.getColumnByName(field) stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br) } @@ -64,7 +64,6 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int } func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int { - m := sup.m stateSizeIncrease := 0 if c.isConst { // collect unique const values @@ -73,11 +72,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC // skip empty values return stateSizeIncrease } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) - } + stateSizeIncrease += sup.updateState(v) return stateSizeIncrease } if c.valueType == valueTypeDict { @@ -87,11 +82,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC // skip empty values continue } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) - } + stateSizeIncrease += sup.updateState(v) } return stateSizeIncrease } @@ -107,11 +98,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC // This value has been already counted. continue } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) - } + stateSizeIncrease += sup.updateState(v) } return stateSizeIncrease } @@ -123,12 +110,13 @@ func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx i } stateSizeIncrease := 0 - if sup.su.containsStar { + fields := sup.su.fields + if len(fields) == 0 { for _, c := range br.getColumns() { stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx) } } else { - for _, field := range sup.su.fields { + for _, field := range fields { c := br.getColumnByName(field) stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx) } @@ -137,7 +125,6 @@ func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx i } func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int { - m := sup.m stateSizeIncrease := 0 if c.isConst { // collect unique const values @@ -146,11 +133,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum // skip empty values return stateSizeIncrease } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) - } + stateSizeIncrease += sup.updateState(v) return stateSizeIncrease } if c.valueType == valueTypeDict { @@ -162,11 +145,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum // skip empty values return stateSizeIncrease } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) - } + stateSizeIncrease += sup.updateState(v) return stateSizeIncrease } @@ -176,11 +155,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum // skip empty values return stateSizeIncrease } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) - } + stateSizeIncrease += sup.updateState(v) return stateSizeIncrease } @@ -190,10 +165,9 @@ func (sup *statsUniqValuesProcessor) mergeState(sfp statsProcessor) { } src := sfp.(*statsUniqValuesProcessor) - m := sup.m for k := range src.m { - if _, ok := m[k]; !ok { - m[k] = struct{}{} + if _, ok := sup.m[k]; !ok { + sup.m[k] = struct{}{} } } } @@ -228,6 +202,16 @@ func sortStrings(a []string) { }) } +func (sup *statsUniqValuesProcessor) updateState(v string) int { + stateSizeIncrease := 0 + if _, ok := sup.m[v]; !ok { + vCopy := strings.Clone(v) + sup.m[vCopy] = struct{}{} + stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) + } + return stateSizeIncrease +} + func (sup *statsUniqValuesProcessor) limitReached() bool { limit := sup.su.limit return limit > 0 && uint64(len(sup.m)) >= limit @@ -255,13 +239,12 @@ func marshalJSONArray(items []string) string { } func parseStatsUniqValues(lex *lexer) (*statsUniqValues, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "uniq_values") + fields, err := parseStatsFuncFields(lex, "uniq_values") if err != nil { return nil, err } su := &statsUniqValues{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } if lex.isKeyword("limit") { lex.nextToken() diff --git a/lib/logstorage/stats_values.go b/lib/logstorage/stats_values.go index dd57a6ddc..ceab48998 100644 --- a/lib/logstorage/stats_values.go +++ b/lib/logstorage/stats_values.go @@ -2,19 +2,17 @@ package logstorage import ( "fmt" - "slices" "strings" "unsafe" ) type statsValues struct { - fields []string - containsStar bool - limit uint64 + fields []string + limit uint64 } func (sv *statsValues) String() string { - s := "values(" + fieldNamesString(sv.fields) + ")" + s := "values(" + statsFuncFieldsToString(sv.fields) + ")" if sv.limit > 0 { s += fmt.Sprintf(" limit %d", sv.limit) } @@ -22,7 +20,7 @@ func (sv *statsValues) String() string { } func (sv *statsValues) updateNeededFields(neededFields fieldsSet) { - neededFields.addFields(sv.fields) + updateNeededFieldsForStatsFunc(neededFields, sv.fields) } func (sv *statsValues) newStatsProcessor() (statsProcessor, int) { @@ -45,12 +43,13 @@ func (svp *statsValuesProcessor) updateStatsForAllRows(br *blockResult) int { } stateSizeIncrease := 0 - if svp.sv.containsStar { + fields := svp.sv.fields + if len(fields) == 0 { for _, c := range br.getColumns() { stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br) } } else { - for _, field := range svp.sv.fields { + for _, field := range fields { c := br.getColumnByName(field) stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br) } @@ -112,12 +111,13 @@ func (svp *statsValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) } stateSizeIncrease := 0 - if svp.sv.containsStar { + fields := svp.sv.fields + if len(fields) == 0 { for _, c := range br.getColumns() { stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx) } } else { - for _, field := range svp.sv.fields { + for _, field := range fields { c := br.getColumnByName(field) stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx) } @@ -188,13 +188,12 @@ func (svp *statsValuesProcessor) limitReached() bool { } func parseStatsValues(lex *lexer) (*statsValues, error) { - fields, err := parseFieldNamesForStatsFunc(lex, "values") + fields, err := parseStatsFuncFields(lex, "values") if err != nil { return nil, err } sv := &statsValues{ - fields: fields, - containsStar: slices.Contains(fields, "*"), + fields: fields, } if lex.isKeyword("limit") { lex.nextToken()