diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 7df246af9..5b701957a 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -858,6 +858,10 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`)
 	f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`)
 
+	// stats pipe uniq_array
+	f(`* | stats uniq_array(foo) bar`, `* | stats uniq_array(foo) as bar`)
+	f(`* | stats by(x, y) uniq_array(foo) as baz`, `* | stats by (x, y) uniq_array(foo) as baz`)
+
 	// stats pipe multiple funcs
 	f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count() as "foo.bar:baz", uniq(a) as bar`)
 	f(`* | stats by (x, y) count(*) foo, uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, uniq(a, b) as bar`)
@@ -1136,6 +1140,13 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | stats uniq`)
 	f(`foo | stats uniq()`)
 
+	// invalid stats uniq_array
+	f(`foo | stats uniq_array`)
+	f(`foo | stats uniq_array()`)
+	f(`foo | stats uniq_array() as foo`)
+	f(`foo | stats uniq_array(a,b) as foo`)
+	f(`foo | stats uniq_array(*) as foo`)
+
 	// invalid grouping fields
 	f(`foo | stats by(foo:bar) count() baz`)
 	f(`foo | stats by(foo:/bar) count() baz`)
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index 37f67a99b..86a6ee5d0 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -439,17 +439,17 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
 	var sf statsFunc
 	switch {
 	case lex.isKeyword("count"):
-		sfc, err := parseStatsCount(lex)
+		scs, err := parseStatsCount(lex)
 		if err != nil {
 			return nil, "", fmt.Errorf("cannot parse 'count' func: %w", err)
 		}
-		sf = sfc
+		sf = scs
 	case lex.isKeyword("uniq"):
-		sfu, err := parseStatsUniq(lex)
+		sus, err := parseStatsUniq(lex)
 		if err != nil {
 			return nil, "", fmt.Errorf("cannot parse 'uniq' func: %w", err)
 		}
-		sf = sfu
+		sf = sus
 	case lex.isKeyword("sum"):
 		sfs, err := parseStatsSum(lex)
 		if err != nil {
@@ -474,6 +474,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
 			return nil, "", fmt.Errorf("cannot parse 'avg' func: %w", err)
 		}
 		sf = sas
+	case lex.isKeyword("uniq_array"):
+		sus, err := parseStatsUniqArray(lex)
+		if err != nil {
+			return nil, "", fmt.Errorf("cannot parse 'uniq_array' func: %w", err)
+		}
+		sf = sus
 	default:
 		return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
 	}
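Note on the parseStatsFunc change above: the function dispatches on the current lexer token, one case per stats function, and wraps each branch's error with the function name. Below is a minimal, runnable sketch of that dispatch pattern; the lexer type and parseStatsFuncName here are illustrative stand-ins written for this note, not the real lib/logstorage API.

package main

import (
	"fmt"
	"strings"
)

// lexer is a stand-in for illustration only; the real lexer in
// lib/logstorage tracks position, quoting and more.
type lexer struct{ token string }

func (lex *lexer) isKeyword(s string) bool { return strings.EqualFold(lex.token, s) }

// parseStatsFuncName mirrors the switch in parseStatsFunc: one case per
// supported stats function, with unknown tokens reported verbatim.
func parseStatsFuncName(lex *lexer) (string, error) {
	switch {
	case lex.isKeyword("count"):
		return "count", nil
	case lex.isKeyword("uniq"):
		return "uniq", nil
	case lex.isKeyword("uniq_array"):
		return "uniq_array", nil
	default:
		return "", fmt.Errorf("unknown stats func %q", lex.token)
	}
}

func main() {
	name, err := parseStatsFuncName(&lexer{token: "uniq_array"})
	fmt.Println(name, err) // uniq_array <nil>
}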
diff --git a/lib/logstorage/stats_unique.go b/lib/logstorage/stats_uniq.go
similarity index 98%
rename from lib/logstorage/stats_unique.go
rename to lib/logstorage/stats_uniq.go
index dfb486807..36700b8c5 100644
--- a/lib/logstorage/stats_unique.go
+++ b/lib/logstorage/stats_uniq.go
@@ -90,7 +90,7 @@ func (sup *statsUniqProcessor) updateStatsForAllRows(br *blockResult) int {
 		// Fast path for a single column.
 		// The unique key is formed as " ? ",
 		// where is skipped if == 1.
-		// This guarantees that keys do not clash for different column types acorss blocks.
+		// This guarantees that keys do not clash for different column types across blocks.
 		c := br.getColumnByName(fields[0])
 		if c.isTime {
 			// Count unique br.timestamps
@@ -251,7 +251,7 @@ func (sup *statsUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
 		// Fast path for a single column.
 		// The unique key is formed as " ? ",
 		// where is skipped if == 1.
-		// This guarantees that keys do not clash for different column types acorss blocks.
+		// This guarantees that keys do not clash for different column types across blocks.
 		c := br.getColumnByName(fields[0])
 		if c.isTime {
 			// Count unique br.timestamps
@@ -300,7 +300,7 @@ func (sup *statsUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
 		return stateSizeIncrease
 	}
 
-	// Count unique values across encodedValues
+	// Count unique values for the given rowIdx
 	encodedValues := c.getEncodedValues(br)
 	v := encodedValues[rowIdx]
 	if c.valueType == valueTypeString && v == "" {
@@ -346,7 +346,9 @@ func (sup *statsUniqProcessor) mergeState(sfp statsProcessor) {
 	src := sfp.(*statsUniqProcessor)
 	m := sup.m
 	for k := range src.m {
-		m[k] = struct{}{}
+		if _, ok := m[k]; !ok {
+			m[k] = struct{}{}
+		}
 	}
 }
 
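The mergeState change above inserts a key from the source set only when it is absent from the destination set. A standalone sketch of the same merge follows; mergeSets is an illustrative name, not part of the package. For a map[string]struct{} the extra lookup does not change the resulting set, it only skips redundant writes for overlapping keys.

package main

import "fmt"

// mergeSets mirrors the updated statsUniqProcessor.mergeState: a key from
// src is inserted into dst only when it is not already present.
func mergeSets(dst, src map[string]struct{}) {
	for k := range src {
		if _, ok := dst[k]; !ok {
			dst[k] = struct{}{}
		}
	}
}

func main() {
	a := map[string]struct{}{"x": {}, "y": {}}
	b := map[string]struct{}{"y": {}, "z": {}}
	mergeSets(a, b)
	fmt.Println(len(a)) // 3
}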
diff --git a/lib/logstorage/stats_uniq_array.go b/lib/logstorage/stats_uniq_array.go
new file mode 100644
index 000000000..25f2ae98d
--- /dev/null
+++ b/lib/logstorage/stats_uniq_array.go
@@ -0,0 +1,206 @@
+package logstorage
+
+import (
+	"fmt"
+	"sort"
+	"strconv"
+	"strings"
+	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+)
+
+type statsUniqArray struct {
+	field string
+}
+
+func (su *statsUniqArray) String() string {
+	return "uniq_array(" + quoteTokenIfNeeded(su.field) + ")"
+}
+
+func (su *statsUniqArray) neededFields() []string {
+	return []string{su.field}
+}
+
+func (su *statsUniqArray) newStatsProcessor() (statsProcessor, int) {
+	sup := &statsUniqArrayProcessor{
+		su: su,
+
+		m: make(map[string]struct{}),
+	}
+	return sup, int(unsafe.Sizeof(*sup))
+}
+
+type statsUniqArrayProcessor struct {
+	su *statsUniqArray
+
+	m map[string]struct{}
+}
+
+func (sup *statsUniqArrayProcessor) updateStatsForAllRows(br *blockResult) int {
+	field := sup.su.field
+	m := sup.m
+
+	stateSizeIncrease := 0
+	c := br.getColumnByName(field)
+	if c.isConst {
+		// collect unique const values
+		v := c.encodedValues[0]
+		if v == "" {
+			// skip empty values
+			return stateSizeIncrease
+		}
+		if _, ok := m[v]; !ok {
+			vCopy := strings.Clone(v)
+			m[vCopy] = struct{}{}
+			stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+		}
+		return stateSizeIncrease
+	}
+	if c.valueType == valueTypeDict {
+		// collect unique non-empty c.dictValues
+		for _, v := range c.dictValues {
+			if v == "" {
+				// skip empty values
+				continue
+			}
+			if _, ok := m[v]; !ok {
+				vCopy := strings.Clone(v)
+				m[vCopy] = struct{}{}
+				stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+			}
+		}
+		return stateSizeIncrease
+	}
+
+	// slow path - collect unique values across all rows
+	values := c.getValues(br)
+	for i, v := range values {
+		if v == "" {
+			// skip empty values
+			continue
+		}
+		if i > 0 && values[i-1] == v {
+			// This value has already been counted.
+			continue
+		}
+		if _, ok := m[v]; !ok {
+			vCopy := strings.Clone(v)
+			m[vCopy] = struct{}{}
+			stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+		}
+	}
+	return stateSizeIncrease
+}
+
+func (sup *statsUniqArrayProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
+	field := sup.su.field
+	m := sup.m
+
+	stateSizeIncrease := 0
+	c := br.getColumnByName(field)
+	if c.isConst {
+		// collect unique const values
+		v := c.encodedValues[0]
+		if v == "" {
+			// skip empty values
+			return stateSizeIncrease
+		}
+		if _, ok := m[v]; !ok {
+			vCopy := strings.Clone(v)
+			m[vCopy] = struct{}{}
+			stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+		}
+		return stateSizeIncrease
+	}
+	if c.valueType == valueTypeDict {
+		// collect unique non-empty c.dictValues
+		dictIdx := c.encodedValues[rowIdx][0]
+		v := c.dictValues[dictIdx]
+		if v == "" {
+			// skip empty values
+			return stateSizeIncrease
+		}
+		if _, ok := m[v]; !ok {
+			vCopy := strings.Clone(v)
+			m[vCopy] = struct{}{}
+			stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+		}
+		return stateSizeIncrease
+	}
+
+	// collect unique values for the given rowIdx.
+	v := c.getValueAtRow(br, rowIdx)
+	if v == "" {
+		// skip empty values
+		return stateSizeIncrease
+	}
+	if _, ok := m[v]; !ok {
+		vCopy := strings.Clone(v)
+		m[vCopy] = struct{}{}
+		stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+	}
+	return stateSizeIncrease
+}
+
+func (sup *statsUniqArrayProcessor) mergeState(sfp statsProcessor) {
+	src := sfp.(*statsUniqArrayProcessor)
+	m := sup.m
+	for k := range src.m {
+		if _, ok := m[k]; !ok {
+			m[k] = struct{}{}
+		}
+	}
+}
+
+func (sup *statsUniqArrayProcessor) finalizeStats() string {
+	if len(sup.m) == 0 {
+		return "[]"
+	}
+
+	// Sort unique items
+	items := make([]string, 0, len(sup.m))
+	for k := range sup.m {
+		items = append(items, k)
+	}
+	sort.Strings(items)
+
+	// Marshal items into JSON array.
+
+	// Pre-allocate buffer for serialized items.
+	// Assume that the items do not need quoting. Otherwise additional reallocations
+	// of the allocated buffer are possible.
+	bufSize := len(items) + 1
+	for _, item := range items {
+		bufSize += len(item)
+	}
+	b := make([]byte, 0, bufSize)
+
+	b = append(b, '[')
+	b = strconv.AppendQuote(b, items[0])
+	for _, item := range items[1:] {
+		b = append(b, ',')
+		b = strconv.AppendQuote(b, item)
+	}
+	b = append(b, ']')
+
+	return bytesutil.ToUnsafeString(b)
+}
+
+func parseStatsUniqArray(lex *lexer) (*statsUniqArray, error) {
+	fields, err := parseFieldNamesForFunc(lex, "uniq_array")
+	if err != nil {
+		return nil, err
+	}
+	if len(fields) != 1 {
+		return nil, fmt.Errorf("'uniq_array' needs exactly one field; got %d fields: %q", len(fields), fields)
+	}
+	field := fields[0]
+	if field == "*" {
+		return nil, fmt.Errorf("'uniq_array' cannot contain '*'")
+	}
+	su := &statsUniqArray{
+		field: field,
+	}
+	return su, nil
+}
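finalizeStats above sorts the collected values and serializes them as a JSON array, pre-sizing the buffer under the assumption that items rarely need quote-escaping. The following standalone sketch reproduces that serialization step; marshalSortedJSONArray is an illustrative name, not part of the package.

package main

import (
	"fmt"
	"sort"
	"strconv"
)

// marshalSortedJSONArray mirrors statsUniqArrayProcessor.finalizeStats:
// the collected set is sorted, then serialized as a JSON array of quoted
// strings, with the buffer pre-sized for brackets, commas and raw items
// (quotes and escapes, when needed, trigger a reallocation).
func marshalSortedJSONArray(m map[string]struct{}) string {
	if len(m) == 0 {
		return "[]"
	}
	items := make([]string, 0, len(m))
	for k := range m {
		items = append(items, k)
	}
	sort.Strings(items)

	bufSize := len(items) + 1 // len-1 commas plus two brackets
	for _, item := range items {
		bufSize += len(item)
	}
	b := make([]byte, 0, bufSize)
	b = append(b, '[')
	b = strconv.AppendQuote(b, items[0])
	for _, item := range items[1:] {
		b = append(b, ',')
		b = strconv.AppendQuote(b, item)
	}
	b = append(b, ']')
	return string(b)
}

func main() {
	fmt.Println(marshalSortedJSONArray(map[string]struct{}{"b": {}, "a": {}}))
	// ["a","b"]
}

With the parser case in pipe_stats.go wired up, the new function is used exactly as in the tests above, e.g. '* | stats uniq_array(foo) as bar' or grouped as '* | stats by (x, y) uniq_array(foo) as baz'; when no non-empty values are found, the result is the literal string [].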