This commit is contained in:
Aliaksandr Valialkin 2024-05-03 13:44:57 +02:00
parent 63d9a02d46
commit 85a60e0743
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 48 additions and 22 deletions

View file

@ -857,10 +857,14 @@ func TestParseQuerySuccess(t *testing.T) {
// stats pipe uniq // stats pipe uniq
f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`) f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`)
f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`) f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`)
f(`* | stats by(x) uniq(*) z`, `* | stats by (x) uniq(*) as z`)
f(`* | stats by(x) uniq() z`, `* | stats by (x) uniq() as z`)
// stats pipe uniq_array // stats pipe uniq_array
f(`* | stats uniq_array(foo) bar`, `* | stats uniq_array(foo) as bar`) f(`* | stats uniq_array(foo) bar`, `* | stats uniq_array(foo) as bar`)
f(`* | stats by(x, y) uniq_array(foo) as baz`, `* | stats by (x, y) uniq_array(foo) as baz`) f(`* | stats by(x, y) uniq_array(foo, bar) as baz`, `* | stats by (x, y) uniq_array(foo, bar) as baz`)
f(`* | stats by(x) uniq_array(*) y`, `* | stats by (x) uniq_array(*) as y`)
f(`* | stats by(x) uniq_array() y`, `* | stats by (x) uniq_array() as y`)
// stats pipe multiple funcs // stats pipe multiple funcs
f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count() as "foo.bar:baz", uniq(a) as bar`) f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count() as "foo.bar:baz", uniq(a) as bar`)
@ -1143,9 +1147,6 @@ func TestParseQueryFailure(t *testing.T) {
// invalid stats uniq_array // invalid stats uniq_array
f(`foo | stats uniq_array`) f(`foo | stats uniq_array`)
f(`foo | stats uniq_array()`) f(`foo | stats uniq_array()`)
f(`foo | stats uniq_array() as foo`)
f(`foo | stats uniq_array(a,b) as foo`)
f(`foo | stats uniq_array(*) as foo`)
// invalid grouping fields // invalid grouping fields
f(`foo | stats by(foo:bar) count() baz`) f(`foo | stats by(foo:bar) count() baz`)

View file

@ -1,7 +1,7 @@
package logstorage package logstorage
import ( import (
"fmt" "slices"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -11,15 +11,16 @@ import (
) )
type statsUniqArray struct { type statsUniqArray struct {
field string fields []string
containsStar bool
} }
func (su *statsUniqArray) String() string { func (su *statsUniqArray) String() string {
return "uniq_array(" + quoteTokenIfNeeded(su.field) + ")" return "uniq_array(" + fieldNamesString(su.fields) + ")"
} }
func (su *statsUniqArray) neededFields() []string { func (su *statsUniqArray) neededFields() []string {
return []string{su.field} return su.fields
} }
func (su *statsUniqArray) newStatsProcessor() (statsProcessor, int) { func (su *statsUniqArray) newStatsProcessor() (statsProcessor, int) {
@ -38,11 +39,26 @@ type statsUniqArrayProcessor struct {
} }
func (sup *statsUniqArrayProcessor) updateStatsForAllRows(br *blockResult) int { func (sup *statsUniqArrayProcessor) updateStatsForAllRows(br *blockResult) int {
field := sup.su.field fields := sup.su.fields
m := sup.m
stateSizeIncrease := 0 stateSizeIncrease := 0
c := br.getColumnByName(field) if len(fields) == 0 || sup.su.containsStar {
columns := br.getColumns()
for i := range columns {
stateSizeIncrease += sup.updateStatsForAllRowsColumn(&columns[i], br)
}
} else {
for _, field := range fields {
c := br.getColumnByName(field)
stateSizeIncrease += sup.updateStatsForAllRowsColumn(&c, br)
}
}
return stateSizeIncrease
}
func (sup *statsUniqArrayProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int {
m := sup.m
stateSizeIncrease := 0
if c.isConst { if c.isConst {
// collect unique const values // collect unique const values
v := c.encodedValues[0] v := c.encodedValues[0]
@ -94,11 +110,26 @@ func (sup *statsUniqArrayProcessor) updateStatsForAllRows(br *blockResult) int {
} }
func (sup *statsUniqArrayProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (sup *statsUniqArrayProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
field := sup.su.field fields := sup.su.fields
m := sup.m
stateSizeIncrease := 0 stateSizeIncrease := 0
c := br.getColumnByName(field) if len(fields) == 0 || sup.su.containsStar {
columns := br.getColumns()
for i := range columns {
stateSizeIncrease += sup.updateStatsForRowColumn(&columns[i], br, rowIdx)
}
} else {
for _, field := range fields {
c := br.getColumnByName(field)
stateSizeIncrease += sup.updateStatsForRowColumn(&c, br, rowIdx)
}
}
return stateSizeIncrease
}
func (sup *statsUniqArrayProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int {
m := sup.m
stateSizeIncrease := 0
if c.isConst { if c.isConst {
// collect unique const values // collect unique const values
v := c.encodedValues[0] v := c.encodedValues[0]
@ -192,15 +223,9 @@ func parseStatsUniqArray(lex *lexer) (*statsUniqArray, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(fields) != 1 {
return nil, fmt.Errorf("'uniq_array' needs exactly one field; got %d fields: [%s]", len(fields), fields)
}
field := fields[0]
if field == "*" {
return nil, fmt.Errorf("'uniq_array' cannot contain '*'")
}
su := &statsUniqArray{ su := &statsUniqArray{
field: field, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
return su, nil return su, nil
} }