This commit is contained in:
Aliaksandr Valialkin 2024-05-22 11:25:49 +02:00
parent 1eddb95c12
commit df6110bf06
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
18 changed files with 650 additions and 276 deletions

View file

@ -927,8 +927,8 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats quantile(0, foo) bar`, `* | stats quantile(0, foo) as bar`) f(`* | stats quantile(0, foo) bar`, `* | stats quantile(0, foo) as bar`)
f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`) f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`)
f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`) f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`)
f(`* | stats quantile(0.99, *) bar`, `* | stats quantile(0.99, *) as bar`) f(`* | stats quantile(0.99) bar`, `* | stats quantile(0.99) as bar`)
f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99, *) as bar`) f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99) as bar`)
// stats pipe median // stats pipe median
f(`* | stats Median(foo) bar`, `* | stats median(foo) as bar`) f(`* | stats Median(foo) bar`, `* | stats median(foo) as bar`)
@ -1359,7 +1359,6 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats quantile`) f(`foo | stats quantile`)
f(`foo | stats quantile() foo`) f(`foo | stats quantile() foo`)
f(`foo | stats quantile(bar, baz) foo`) f(`foo | stats quantile(bar, baz) foo`)
f(`foo | stats quantile(0.5) foo`)
f(`foo | stats quantile(-1, x) foo`) f(`foo | stats quantile(-1, x) foo`)
f(`foo | stats quantile(10, x) foo`) f(`foo | stats quantile(10, x) foo`)
@ -1549,6 +1548,49 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f2) r1, count(r1) r2 | fields r2`, `f1,f2,f3,f4`, ``) f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f2) r1, count(r1) r2 | fields r2`, `f1,f2,f3,f4`, ``)
f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f3) r1, count(r1) r2 | fields r1`, `f3,f4`, ``) f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f3) r1, count(r1) r2 | fields r1`, `f3,f4`, ``)
f(`* | stats avg() q`, `*`, ``)
f(`* | stats avg(*) q`, `*`, ``)
f(`* | stats avg(x) q`, `x`, ``)
f(`* | stats count_empty() q`, `*`, ``)
f(`* | stats count_empty(*) q`, `*`, ``)
f(`* | stats count_empty(x) q`, `x`, ``)
f(`* | stats count() q`, ``, ``)
f(`* | stats count(*) q`, ``, ``)
f(`* | stats count(x) q`, `x`, ``)
f(`* | stats count_uniq() q`, `*`, ``)
f(`* | stats count_uniq(*) q`, `*`, ``)
f(`* | stats count_uniq(x) q`, `x`, ``)
f(`* | stats fields_max(a) q`, `*`, ``)
f(`* | stats fields_max(a, *) q`, `*`, ``)
f(`* | stats fields_max(a, x) q`, `a,x`, ``)
f(`* | stats fields_min(a) q`, `*`, ``)
f(`* | stats fields_min(a, *) q`, `*`, ``)
f(`* | stats fields_min(a, x) q`, `a,x`, ``)
f(`* | stats min() q`, `*`, ``)
f(`* | stats min(*) q`, `*`, ``)
f(`* | stats min(x) q`, `x`, ``)
f(`* | stats median() q`, `*`, ``)
f(`* | stats median(*) q`, `*`, ``)
f(`* | stats median(x) q`, `x`, ``)
f(`* | stats max() q`, `*`, ``)
f(`* | stats max(*) q`, `*`, ``)
f(`* | stats max(x) q`, `x`, ``)
f(`* | stats quantile(0.5) q`, `*`, ``)
f(`* | stats quantile(0.5, *) q`, `*`, ``)
f(`* | stats quantile(0.5, x) q`, `x`, ``)
f(`* | stats sum() q`, `*`, ``)
f(`* | stats sum(*) q`, `*`, ``)
f(`* | stats sum(x) q`, `x`, ``)
f(`* | stats sum_len() q`, `*`, ``)
f(`* | stats sum_len(*) q`, `*`, ``)
f(`* | stats sum_len(x) q`, `x`, ``)
f(`* | stats uniq_values() q`, `*`, ``)
f(`* | stats uniq_values(*) q`, `*`, ``)
f(`* | stats uniq_values(x) q`, `x`, ``)
f(`* | stats values() q`, `*`, ``)
f(`* | stats values(*) q`, `*`, ``)
f(`* | stats values(x) q`, `x`, ``)
f(`_time:5m | stats by(_time:day) count() r1 | stats values(_time) r2`, `_time`, ``) f(`_time:5m | stats by(_time:day) count() r1 | stats values(_time) r2`, `_time`, ``)
f(`_time:1y | stats (_time:1w) count() r1 | stats count() r2`, `_time`, ``) f(`_time:1y | stats (_time:1w) count() r1 | stats count() r2`, `_time`, ``)

View file

@ -2,7 +2,6 @@ package logstorage
import ( import (
"fmt" "fmt"
"slices"
"strings" "strings"
"sync/atomic" "sync/atomic"
"unsafe" "unsafe"
@ -831,24 +830,6 @@ func tryParseBucketSize(s string) (float64, bool) {
return 0, false return 0, false
} }
// parseFieldNamesForStatsFunc parses field names for statsFunc.
//
// It returns ["*"] if the fields names list is empty or if it contains "*" field.
func parseFieldNamesForStatsFunc(lex *lexer, funcName string) ([]string, error) {
if !lex.isKeyword(funcName) {
return nil, fmt.Errorf("unexpected func; got %q; want %q", lex.token, funcName)
}
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse %q args: %w", funcName, err)
}
if len(fields) == 0 || slices.Contains(fields, "*") {
fields = []string{"*"}
}
return fields, nil
}
func parseFieldNamesInParens(lex *lexer) ([]string, error) { func parseFieldNamesInParens(lex *lexer) ([]string, error) {
if !lex.isKeyword("(") { if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing `(`") return nil, fmt.Errorf("missing `(`")

View file

@ -1,22 +1,23 @@
package logstorage package logstorage
import ( import (
"fmt"
"slices" "slices"
"strconv" "strconv"
"strings"
"unsafe" "unsafe"
) )
type statsAvg struct { type statsAvg struct {
fields []string fields []string
containsStar bool
} }
func (sa *statsAvg) String() string { func (sa *statsAvg) String() string {
return "avg(" + fieldNamesString(sa.fields) + ")" return "avg(" + statsFuncFieldsToString(sa.fields) + ")"
} }
func (sa *statsAvg) updateNeededFields(neededFields fieldsSet) { func (sa *statsAvg) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(sa.fields) updateNeededFieldsForStatsFunc(neededFields, sa.fields)
} }
func (sa *statsAvg) newStatsProcessor() (statsProcessor, int) { func (sa *statsAvg) newStatsProcessor() (statsProcessor, int) {
@ -34,7 +35,8 @@ type statsAvgProcessor struct {
} }
func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int { func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int {
if sap.sa.containsStar { fields := sap.sa.fields
if len(fields) == 0 {
// Scan all the columns // Scan all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f, count := c.sumValues(br) f, count := c.sumValues(br)
@ -43,7 +45,7 @@ func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int {
} }
} else { } else {
// Scan the requested columns // Scan the requested columns
for _, field := range sap.sa.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f, count := c.sumValues(br) f, count := c.sumValues(br)
sap.sum += f sap.sum += f
@ -54,7 +56,8 @@ func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int {
} }
func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
if sap.sa.containsStar { fields := sap.sa.fields
if len(fields) == 0 {
// Scan all the fields for the given row // Scan all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f, ok := c.getFloatValueAtRow(br, rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
@ -65,7 +68,7 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
} }
} else { } else {
// Scan only the given fields for the given row // Scan only the given fields for the given row
for _, field := range sap.sa.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f, ok := c.getFloatValueAtRow(br, rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
@ -89,13 +92,46 @@ func (sap *statsAvgProcessor) finalizeStats() string {
} }
func parseStatsAvg(lex *lexer) (*statsAvg, error) { func parseStatsAvg(lex *lexer) (*statsAvg, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "avg") fields, err := parseStatsFuncFields(lex, "avg")
if err != nil { if err != nil {
return nil, err return nil, err
} }
sa := &statsAvg{ sa := &statsAvg{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
return sa, nil return sa, nil
} }
func parseStatsFuncFields(lex *lexer, funcName string) ([]string, error) {
if !lex.isKeyword(funcName) {
return nil, fmt.Errorf("unexpected func; got %q; want %q", lex.token, funcName)
}
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse %q args: %w", funcName, err)
}
if len(fields) == 0 || slices.Contains(fields, "*") {
fields = nil
}
return fields, nil
}
func statsFuncFieldsToString(fields []string) string {
if len(fields) == 0 {
return "*"
}
a := make([]string, len(fields))
for i, f := range fields {
a[i] = quoteTokenIfNeeded(f)
}
return strings.Join(a, ", ")
}
func updateNeededFieldsForStatsFunc(neededFields fieldsSet, fields []string) {
if len(fields) == 0 {
neededFields.add("*")
}
neededFields.addFields(fields)
}

View file

@ -10,15 +10,14 @@ import (
type statsCount struct { type statsCount struct {
fields []string fields []string
containsStar bool
} }
func (sc *statsCount) String() string { func (sc *statsCount) String() string {
return "count(" + fieldNamesString(sc.fields) + ")" return "count(" + statsFuncFieldsToString(sc.fields) + ")"
} }
func (sc *statsCount) updateNeededFields(neededFields fieldsSet) { func (sc *statsCount) updateNeededFields(neededFields fieldsSet) {
if sc.containsStar { if len(sc.fields) == 0 {
// There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as len(blockResult.timestamps) // There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as len(blockResult.timestamps)
return return
} }
@ -40,7 +39,7 @@ type statsCountProcessor struct {
func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
fields := scp.sc.fields fields := scp.sc.fields
if scp.sc.containsStar { if len(fields) == 0 {
// Fast path - unconditionally count all the columns. // Fast path - unconditionally count all the columns.
scp.rowsCount += uint64(len(br.timestamps)) scp.rowsCount += uint64(len(br.timestamps))
return 0 return 0
@ -138,7 +137,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
fields := scp.sc.fields fields := scp.sc.fields
if scp.sc.containsStar { if len(fields) == 0 {
// Fast path - unconditionally count the given column // Fast path - unconditionally count the given column
scp.rowsCount++ scp.rowsCount++
return 0 return 0
@ -200,13 +199,12 @@ func (scp *statsCountProcessor) finalizeStats() string {
} }
func parseStatsCount(lex *lexer) (*statsCount, error) { func parseStatsCount(lex *lexer) (*statsCount, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "count") fields, err := parseStatsFuncFields(lex, "count")
if err != nil { if err != nil {
return nil, err return nil, err
} }
sc := &statsCount{ sc := &statsCount{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
return sc, nil return sc, nil
} }

View file

@ -10,15 +10,14 @@ import (
type statsCountEmpty struct { type statsCountEmpty struct {
fields []string fields []string
containsStar bool
} }
func (sc *statsCountEmpty) String() string { func (sc *statsCountEmpty) String() string {
return "count_empty(" + fieldNamesString(sc.fields) + ")" return "count_empty(" + statsFuncFieldsToString(sc.fields) + ")"
} }
func (sc *statsCountEmpty) updateNeededFields(neededFields fieldsSet) { func (sc *statsCountEmpty) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(sc.fields) updateNeededFieldsForStatsFunc(neededFields, sc.fields)
} }
func (sc *statsCountEmpty) newStatsProcessor() (statsProcessor, int) { func (sc *statsCountEmpty) newStatsProcessor() (statsProcessor, int) {
@ -36,7 +35,7 @@ type statsCountEmptyProcessor struct {
func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int { func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int {
fields := scp.sc.fields fields := scp.sc.fields
if scp.sc.containsStar { if len(fields) == 0 {
bm := getBitmap(len(br.timestamps)) bm := getBitmap(len(br.timestamps))
bm.setBits() bm.setBits()
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
@ -133,7 +132,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int
func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
fields := scp.sc.fields fields := scp.sc.fields
if scp.sc.containsStar { if len(fields) == 0 {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
if v := c.getValueAtRow(br, rowIdx); v != "" { if v := c.getValueAtRow(br, rowIdx); v != "" {
return 0 return 0
@ -197,13 +196,12 @@ func (scp *statsCountEmptyProcessor) finalizeStats() string {
} }
func parseStatsCountEmpty(lex *lexer) (*statsCountEmpty, error) { func parseStatsCountEmpty(lex *lexer) (*statsCountEmpty, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "count_empty") fields, err := parseStatsFuncFields(lex, "count_empty")
if err != nil { if err != nil {
return nil, err return nil, err
} }
sc := &statsCountEmpty{ sc := &statsCountEmpty{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
return sc, nil return sc, nil
} }

View file

@ -2,7 +2,6 @@ package logstorage
import ( import (
"fmt" "fmt"
"slices"
"strconv" "strconv"
"unsafe" "unsafe"
@ -12,12 +11,11 @@ import (
type statsCountUniq struct { type statsCountUniq struct {
fields []string fields []string
containsStar bool
limit uint64 limit uint64
} }
func (su *statsCountUniq) String() string { func (su *statsCountUniq) String() string {
s := "count_uniq(" + fieldNamesString(su.fields) + ")" s := "count_uniq(" + statsFuncFieldsToString(su.fields) + ")"
if su.limit > 0 { if su.limit > 0 {
s += fmt.Sprintf(" limit %d", su.limit) s += fmt.Sprintf(" limit %d", su.limit)
} }
@ -25,7 +23,7 @@ func (su *statsCountUniq) String() string {
} }
func (su *statsCountUniq) updateNeededFields(neededFields fieldsSet) { func (su *statsCountUniq) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(su.fields) updateNeededFieldsForStatsFunc(neededFields, su.fields)
} }
func (su *statsCountUniq) newStatsProcessor() (statsProcessor, int) { func (su *statsCountUniq) newStatsProcessor() (statsProcessor, int) {
@ -52,17 +50,23 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
} }
fields := sup.su.fields fields := sup.su.fields
m := sup.m
stateSizeIncrease := 0 stateSizeIncrease := 0
if sup.su.containsStar { if len(fields) == 0 {
// Count unique rows // Count unique rows
cs := br.getColumns() cs := br.getColumns()
columnValues := sup.columnValues[:0]
for _, c := range cs {
values := c.getValues(br)
columnValues = append(columnValues, values)
}
sup.columnValues = columnValues
keyBuf := sup.keyBuf[:0] keyBuf := sup.keyBuf[:0]
for i := range br.timestamps { for i := range br.timestamps {
seenKey := true seenKey := true
for _, c := range cs { for _, values := range columnValues {
values := c.getValues(br)
if i == 0 || values[i-1] != values[i] { if i == 0 || values[i-1] != values[i] {
seenKey = false seenKey = false
break break
@ -75,23 +79,20 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
allEmptyValues := true allEmptyValues := true
keyBuf = keyBuf[:0] keyBuf = keyBuf[:0]
for _, c := range cs { for j, values := range columnValues {
v := c.getValueAtRow(br, i) v := values[i]
if v != "" { if v != "" {
allEmptyValues = false allEmptyValues = false
} }
// Put column name into key, since every block can contain different set of columns for '*' selector. // Put column name into key, since every block can contain different set of columns for '*' selector.
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name)) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(cs[j].name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
} }
if allEmptyValues { if allEmptyValues {
// Do not count empty values // Do not count empty values
continue continue
} }
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
} }
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
@ -112,10 +113,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
} }
keyBuf = append(keyBuf[:0], 1) keyBuf = append(keyBuf[:0], 1)
keyBuf = encoding.MarshalInt64(keyBuf, timestamp) keyBuf = encoding.MarshalInt64(keyBuf, timestamp)
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
} }
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
@ -130,10 +128,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
keyBuf := sup.keyBuf[:0] keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf[:0], 0)
keyBuf = append(keyBuf, v...) keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
} }
@ -147,10 +142,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
} }
keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf[:0], 0)
keyBuf = append(keyBuf, v...) keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
} }
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
@ -170,10 +162,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
} }
keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf[:0], 0)
keyBuf = append(keyBuf, v...) keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
} }
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
@ -216,10 +205,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
// Do not count empty values // Do not count empty values
continue continue
} }
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
} }
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
@ -231,10 +217,9 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
} }
fields := sup.su.fields fields := sup.su.fields
m := sup.m
stateSizeIncrease := 0 stateSizeIncrease := 0
if sup.su.containsStar { if len(fields) == 0 {
// Count unique rows // Count unique rows
allEmptyValues := true allEmptyValues := true
keyBuf := sup.keyBuf[:0] keyBuf := sup.keyBuf[:0]
@ -253,10 +238,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
// Do not count empty values // Do not count empty values
return stateSizeIncrease return stateSizeIncrease
} }
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease return stateSizeIncrease
} }
if len(fields) == 1 { if len(fields) == 1 {
@ -269,10 +251,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
keyBuf := sup.keyBuf[:0] keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 1) keyBuf = append(keyBuf[:0], 1)
keyBuf = encoding.MarshalInt64(keyBuf, br.timestamps[rowIdx]) keyBuf = encoding.MarshalInt64(keyBuf, br.timestamps[rowIdx])
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
} }
@ -286,10 +265,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
keyBuf := sup.keyBuf[:0] keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf[:0], 0)
keyBuf = append(keyBuf, v...) keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
} }
@ -305,10 +281,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
keyBuf := sup.keyBuf[:0] keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf[:0], 0)
keyBuf = append(keyBuf, v...) keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
} }
@ -322,10 +295,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
keyBuf := sup.keyBuf[:0] keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 0) keyBuf = append(keyBuf[:0], 0)
keyBuf = append(keyBuf, v...) keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
} }
@ -347,10 +317,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
// Do not count empty values // Do not count empty values
return stateSizeIncrease return stateSizeIncrease
} }
if _, ok := m[string(keyBuf)]; !ok { stateSizeIncrease += sup.updateState(keyBuf)
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease return stateSizeIncrease
} }
@ -376,19 +343,27 @@ func (sup *statsCountUniqProcessor) finalizeStats() string {
return strconv.FormatUint(n, 10) return strconv.FormatUint(n, 10)
} }
func (sup *statsCountUniqProcessor) updateState(v []byte) int {
stateSizeIncrease := 0
if _, ok := sup.m[string(v)]; !ok {
sup.m[string(v)] = struct{}{}
stateSizeIncrease += len(v) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
func (sup *statsCountUniqProcessor) limitReached() bool { func (sup *statsCountUniqProcessor) limitReached() bool {
limit := sup.su.limit limit := sup.su.limit
return limit > 0 && uint64(len(sup.m)) >= limit return limit > 0 && uint64(len(sup.m)) >= limit
} }
func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) { func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "count_uniq") fields, err := parseStatsFuncFields(lex, "count_uniq")
if err != nil { if err != nil {
return nil, err return nil, err
} }
su := &statsCountUniq{ su := &statsCountUniq{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
if lex.isKeyword("limit") { if lex.isKeyword("limit") {
lex.nextToken() lex.nextToken()

View file

@ -0,0 +1,373 @@
package logstorage
import (
"testing"
)
func TestParseStatsCountUniqSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncSuccess(t, pipeStr)
}
f(`count_uniq(*)`)
f(`count_uniq(a)`)
f(`count_uniq(a, b)`)
f(`count_uniq(*) limit 10`)
f(`count_uniq(a) limit 20`)
f(`count_uniq(a, b) limit 5`)
}
func TestParseStatsCountUniqFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncFailure(t, pipeStr)
}
f(`count_uniq`)
f(`count_uniq(a b)`)
f(`count_uniq(x) y`)
f(`count_uniq(x) limit`)
f(`count_uniq(x) limit N`)
}
func TestStatsCountUniq(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("stats count_uniq(*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{},
{
{"a", `3`},
{"b", `54`},
},
}, [][]Field{
{
{"x", "3"},
},
})
f("stats count_uniq(*) limit 2 as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{},
{
{"a", `3`},
{"b", `54`},
},
}, [][]Field{
{
{"x", "2"},
},
})
f("stats count_uniq(*) limit 10 as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{},
{
{"a", `3`},
{"b", `54`},
},
}, [][]Field{
{
{"x", "3"},
},
})
f("stats count_uniq(b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{},
{
{"a", `3`},
{"b", `54`},
},
}, [][]Field{
{
{"x", "2"},
},
})
f("stats count_uniq(a, b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{},
{
{"aa", `3`},
{"bb", `54`},
},
}, [][]Field{
{
{"x", "2"},
},
})
f("stats count_uniq(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"b", `54`},
},
}, [][]Field{
{
{"x", "0"},
},
})
f("stats count_uniq(a) if (b:*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"b", `54`},
},
}, [][]Field{
{
{"x", "1"},
},
})
f("stats by (a) count_uniq(b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"b", `5`},
},
{
{"a", `3`},
{"b", `7`},
},
}, [][]Field{
{
{"a", "1"},
{"x", "1"},
},
{
{"a", "3"},
{"x", "2"},
},
})
f("stats by (a) count_uniq(b) if (!c:foo) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
{"b", "aadf"},
{"c", "foo"},
},
{
{"a", `3`},
{"b", `5`},
{"c", "bar"},
},
{
{"a", `3`},
},
}, [][]Field{
{
{"a", "1"},
{"x", "1"},
},
{
{"a", "3"},
{"x", "1"},
},
})
f("stats by (a) count_uniq(*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
{"c", "3"},
},
{},
{
{"a", `3`},
{"b", `5`},
},
}, [][]Field{
{
{"a", ""},
{"x", "0"},
},
{
{"a", "1"},
{"x", "2"},
},
{
{"a", "3"},
{"x", "1"},
},
})
f("stats by (a) count_uniq(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `3`},
{"c", `5`},
},
{
{"a", `3`},
{"b", `7`},
},
}, [][]Field{
{
{"a", "1"},
{"x", "0"},
},
{
{"a", "3"},
{"x", "1"},
},
})
f("stats by (a) count_uniq(a, b, c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
{"c", "3"},
},
{
{"a", `3`},
{"b", `5`},
},
{
{"foo", "bar"},
},
{
{"a", `3`},
{"b", `7`},
},
}, [][]Field{
{
{"a", "1"},
{"x", "2"},
},
{
{"a", ""},
{"x", "0"},
},
{
{"a", "3"},
{"x", "2"},
},
})
f("stats by (a, b) count_uniq(a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
{"c", "3"},
},
{
{"c", `3`},
{"b", `5`},
},
}, [][]Field{
{
{"a", "1"},
{"b", "3"},
{"x", "1"},
},
{
{"a", "1"},
{"b", ""},
{"x", "1"},
},
{
{"a", ""},
{"b", "5"},
{"x", "0"},
},
})
}

View file

@ -14,23 +14,23 @@ import (
type statsFieldsMax struct { type statsFieldsMax struct {
srcField string srcField string
resultFields []string fetchFields []string
} }
func (sm *statsFieldsMax) String() string { func (sm *statsFieldsMax) String() string {
s := "fields_max(" + quoteTokenIfNeeded(sm.srcField) s := "fields_max(" + quoteTokenIfNeeded(sm.srcField)
if len(sm.resultFields) > 0 { if len(sm.fetchFields) > 0 {
s += ", " + fieldNamesString(sm.resultFields) s += ", " + fieldNamesString(sm.fetchFields)
} }
s += ")" s += ")"
return s return s
} }
func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) { func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) {
if len(sm.resultFields) == 0 { if len(sm.fetchFields) == 0 {
neededFields.add("*") neededFields.add("*")
} else { } else {
neededFields.addFields(sm.resultFields) neededFields.addFields(sm.fetchFields)
} }
neededFields.add(sm.srcField) neededFields.add(sm.srcField)
} }
@ -177,7 +177,8 @@ func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowId
clear(fields) clear(fields)
fields = fields[:0] fields = fields[:0]
if len(smp.sm.resultFields) == 0 { fetchFields := smp.sm.fetchFields
if len(fetchFields) == 0 {
cs := br.getColumns() cs := br.getColumns()
for _, c := range cs { for _, c := range cs {
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
@ -188,7 +189,7 @@ func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowId
stateSizeIncrease += len(c.name) + len(v) stateSizeIncrease += len(c.name) + len(v)
} }
} else { } else {
for _, field := range smp.sm.resultFields { for _, field := range fetchFields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{ fields = append(fields, Field{
@ -227,14 +228,14 @@ func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) {
} }
srcField := fields[0] srcField := fields[0]
resultFields := fields[1:] fetchFields := fields[1:]
if slices.Contains(resultFields, "*") { if slices.Contains(fetchFields, "*") {
resultFields = nil fetchFields = nil
} }
sm := &statsFieldsMax{ sm := &statsFieldsMax{
srcField: srcField, srcField: srcField,
resultFields: resultFields, fetchFields: fetchFields,
} }
return sm, nil return sm, nil
} }

View file

@ -14,23 +14,23 @@ import (
type statsFieldsMin struct { type statsFieldsMin struct {
srcField string srcField string
resultFields []string fetchFields []string
} }
func (sm *statsFieldsMin) String() string { func (sm *statsFieldsMin) String() string {
s := "fields_min(" + quoteTokenIfNeeded(sm.srcField) s := "fields_min(" + quoteTokenIfNeeded(sm.srcField)
if len(sm.resultFields) > 0 { if len(sm.fetchFields) > 0 {
s += ", " + fieldNamesString(sm.resultFields) s += ", " + fieldNamesString(sm.fetchFields)
} }
s += ")" s += ")"
return s return s
} }
func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) { func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) {
if len(sm.resultFields) == 0 { if len(sm.fetchFields) == 0 {
neededFields.add("*") neededFields.add("*")
} else { } else {
neededFields.addFields(sm.resultFields) neededFields.addFields(sm.fetchFields)
} }
neededFields.add(sm.srcField) neededFields.add(sm.srcField)
} }
@ -177,7 +177,8 @@ func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowId
clear(fields) clear(fields)
fields = fields[:0] fields = fields[:0]
if len(smp.sm.resultFields) == 0 { fetchFields := smp.sm.fetchFields
if len(fetchFields) == 0 {
cs := br.getColumns() cs := br.getColumns()
for _, c := range cs { for _, c := range cs {
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
@ -188,7 +189,7 @@ func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowId
stateSizeIncrease += len(c.name) + len(v) stateSizeIncrease += len(c.name) + len(v)
} }
} else { } else {
for _, field := range smp.sm.resultFields { for _, field := range fetchFields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{ fields = append(fields, Field{
@ -227,14 +228,14 @@ func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) {
} }
srcField := fields[0] srcField := fields[0]
resultFields := fields[1:] fetchFields := fields[1:]
if slices.Contains(resultFields, "*") { if slices.Contains(fetchFields, "*") {
resultFields = nil fetchFields = nil
} }
sm := &statsFieldsMin{ sm := &statsFieldsMin{
srcField: srcField, srcField: srcField,
resultFields: resultFields, fetchFields: fetchFields,
} }
return sm, nil return sm, nil
} }

View file

@ -2,7 +2,6 @@ package logstorage
import ( import (
"math" "math"
"slices"
"strings" "strings"
"unsafe" "unsafe"
@ -15,14 +14,11 @@ type statsMax struct {
} }
func (sm *statsMax) String() string { func (sm *statsMax) String() string {
if len(sm.fields) == 0 { return "max(" + statsFuncFieldsToString(sm.fields) + ")"
return "max(*)"
}
return "max(" + fieldNamesString(sm.fields) + ")"
} }
func (sm *statsMax) updateNeededFields(neededFields fieldsSet) { func (sm *statsMax) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(sm.fields) updateNeededFieldsForStatsFunc(neededFields, sm.fields)
} }
func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { func (sm *statsMax) newStatsProcessor() (statsProcessor, int) {
@ -168,13 +164,10 @@ func (smp *statsMaxProcessor) finalizeStats() string {
} }
func parseStatsMax(lex *lexer) (*statsMax, error) { func parseStatsMax(lex *lexer) (*statsMax, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "max") fields, err := parseStatsFuncFields(lex, "max")
if err != nil { if err != nil {
return nil, err return nil, err
} }
if slices.Contains(fields, "*") {
fields = nil
}
sm := &statsMax{ sm := &statsMax{
fields: fields, fields: fields,
} }

View file

@ -1,21 +1,19 @@
package logstorage package logstorage
import ( import (
"slices"
"unsafe" "unsafe"
) )
type statsMedian struct { type statsMedian struct {
fields []string fields []string
containsStar bool
} }
func (sm *statsMedian) String() string { func (sm *statsMedian) String() string {
return "median(" + fieldNamesString(sm.fields) + ")" return "median(" + statsFuncFieldsToString(sm.fields) + ")"
} }
func (sm *statsMedian) updateNeededFields(neededFields fieldsSet) { func (sm *statsMedian) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(sm.fields) updateNeededFieldsForStatsFunc(neededFields, sm.fields)
} }
func (sm *statsMedian) newStatsProcessor() (statsProcessor, int) { func (sm *statsMedian) newStatsProcessor() (statsProcessor, int) {
@ -23,7 +21,6 @@ func (sm *statsMedian) newStatsProcessor() (statsProcessor, int) {
sqp: &statsQuantileProcessor{ sqp: &statsQuantileProcessor{
sq: &statsQuantile{ sq: &statsQuantile{
fields: sm.fields, fields: sm.fields,
containsStar: sm.containsStar,
phi: 0.5, phi: 0.5,
}, },
}, },
@ -53,13 +50,12 @@ func (smp *statsMedianProcessor) finalizeStats() string {
} }
func parseStatsMedian(lex *lexer) (*statsMedian, error) { func parseStatsMedian(lex *lexer) (*statsMedian, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "median") fields, err := parseStatsFuncFields(lex, "median")
if err != nil { if err != nil {
return nil, err return nil, err
} }
sm := &statsMedian{ sm := &statsMedian{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
return sm, nil return sm, nil
} }

View file

@ -2,7 +2,6 @@ package logstorage
import ( import (
"math" "math"
"slices"
"strings" "strings"
"unsafe" "unsafe"
@ -15,14 +14,11 @@ type statsMin struct {
} }
func (sm *statsMin) String() string { func (sm *statsMin) String() string {
if len(sm.fields) == 0 { return "min(" + statsFuncFieldsToString(sm.fields) + ")"
return "min(*)"
}
return "min(" + fieldNamesString(sm.fields) + ")"
} }
func (sm *statsMin) updateNeededFields(neededFields fieldsSet) { func (sm *statsMin) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(sm.fields) updateNeededFieldsForStatsFunc(neededFields, sm.fields)
} }
func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { func (sm *statsMin) newStatsProcessor() (statsProcessor, int) {
@ -41,14 +37,15 @@ type statsMinProcessor struct {
func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
minLen := len(smp.min) minLen := len(smp.min)
if len(smp.sm.fields) == 0 { fields := smp.sm.fields
if len(fields) == 0 {
// Find the minimum value across all the columns // Find the minimum value across all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
smp.updateStateForColumn(br, c) smp.updateStateForColumn(br, c)
} }
} else { } else {
// Find the minimum value across the requested columns // Find the minimum value across the requested columns
for _, field := range smp.sm.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
smp.updateStateForColumn(br, c) smp.updateStateForColumn(br, c)
} }
@ -60,7 +57,8 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
minLen := len(smp.min) minLen := len(smp.min)
if len(smp.sm.fields) == 0 { fields := smp.sm.fields
if len(fields) == 0 {
// Find the minimum value across all the fields for the given row // Find the minimum value across all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
@ -68,7 +66,7 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
} }
} else { } else {
// Find the minimum value across the requested fields for the given row // Find the minimum value across the requested fields for the given row
for _, field := range smp.sm.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
smp.updateStateString(v) smp.updateStateString(v)
@ -168,13 +166,10 @@ func (smp *statsMinProcessor) finalizeStats() string {
} }
func parseStatsMin(lex *lexer) (*statsMin, error) { func parseStatsMin(lex *lexer) (*statsMin, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "min") fields, err := parseStatsFuncFields(lex, "min")
if err != nil { if err != nil {
return nil, err return nil, err
} }
if slices.Contains(fields, "*") {
fields = nil
}
sm := &statsMin{ sm := &statsMin{
fields: fields, fields: fields,
} }

View file

@ -15,18 +15,22 @@ import (
type statsQuantile struct { type statsQuantile struct {
fields []string fields []string
containsStar bool
phi float64 phi float64
phiStr string phiStr string
} }
func (sq *statsQuantile) String() string { func (sq *statsQuantile) String() string {
return fmt.Sprintf("quantile(%s, %s)", sq.phiStr, fieldNamesString(sq.fields)) s := "quantile(" + sq.phiStr
if len(sq.fields) > 0 {
s += ", " + fieldNamesString(sq.fields)
}
s += ")"
return s
} }
func (sq *statsQuantile) updateNeededFields(neededFields fieldsSet) { func (sq *statsQuantile) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(sq.fields) updateNeededFieldsForStatsFunc(neededFields, sq.fields)
} }
func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) { func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) {
@ -45,12 +49,13 @@ type statsQuantileProcessor struct {
func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int { func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
if sqp.sq.containsStar { fields := sqp.sq.fields
if len(fields) == 0 {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
stateSizeIncrease += sqp.updateStateForColumn(br, c) stateSizeIncrease += sqp.updateStateForColumn(br, c)
} }
} else { } else {
for _, field := range sqp.sq.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
stateSizeIncrease += sqp.updateStateForColumn(br, c) stateSizeIncrease += sqp.updateStateForColumn(br, c)
} }
@ -63,7 +68,8 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int
h := &sqp.h h := &sqp.h
stateSizeIncrease := 0 stateSizeIncrease := 0
if sqp.sq.containsStar { fields := sqp.sq.fields
if len(fields) == 0 {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f, ok := c.getFloatValueAtRow(br, rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
@ -71,7 +77,7 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int
} }
} }
} else { } else {
for _, field := range sqp.sq.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f, ok := c.getFloatValueAtRow(br, rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
@ -182,8 +188,8 @@ func parseStatsQuantile(lex *lexer) (*statsQuantile, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'quantile' args: %w", err) return nil, fmt.Errorf("cannot parse 'quantile' args: %w", err)
} }
if len(fields) < 2 { if len(fields) < 1 {
return nil, fmt.Errorf("'quantile' must have at least two args: phi and field name") return nil, fmt.Errorf("'quantile' must have at least phi arg")
} }
// Parse phi // Parse phi
@ -199,12 +205,11 @@ func parseStatsQuantile(lex *lexer) (*statsQuantile, error) {
// Parse fields // Parse fields
fields = fields[1:] fields = fields[1:]
if slices.Contains(fields, "*") { if slices.Contains(fields, "*") {
fields = []string{"*"} fields = nil
} }
sq := &statsQuantile{ sq := &statsQuantile{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
phi: phi, phi: phi,
phiStr: phiStr, phiStr: phiStr,

View file

@ -11,7 +11,7 @@ func TestParseStatsQuantileSuccess(t *testing.T) {
expectParseStatsFuncSuccess(t, pipeStr) expectParseStatsFuncSuccess(t, pipeStr)
} }
f(`quantile(0.3, *)`) f(`quantile(0.3)`)
f(`quantile(1, a)`) f(`quantile(1, a)`)
f(`quantile(0.99, a, b)`) f(`quantile(0.99, a, b)`)
} }
@ -36,7 +36,7 @@ func TestStatsQuantile(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected) expectPipeResults(t, pipeStr, rows, rowsExpected)
} }
f("stats quantile(0.9, *) as x", [][]Field{ f("stats quantile(0.9) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `2`}, {"a", `2`},
@ -211,7 +211,7 @@ func TestStatsQuantile(t *testing.T) {
}, },
}) })
f("stats by (a) quantile(0.9, *) as x", [][]Field{ f("stats by (a) quantile(0.9) as x", [][]Field{
{ {
{"_msg", `abc`}, {"_msg", `abc`},
{"a", `1`}, {"a", `1`},

View file

@ -2,22 +2,20 @@ package logstorage
import ( import (
"math" "math"
"slices"
"strconv" "strconv"
"unsafe" "unsafe"
) )
type statsSum struct { type statsSum struct {
fields []string fields []string
containsStar bool
} }
func (ss *statsSum) String() string { func (ss *statsSum) String() string {
return "sum(" + fieldNamesString(ss.fields) + ")" return "sum(" + statsFuncFieldsToString(ss.fields) + ")"
} }
func (ss *statsSum) updateNeededFields(neededFields fieldsSet) { func (ss *statsSum) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(ss.fields) updateNeededFieldsForStatsFunc(neededFields, ss.fields)
} }
func (ss *statsSum) newStatsProcessor() (statsProcessor, int) { func (ss *statsSum) newStatsProcessor() (statsProcessor, int) {
@ -35,14 +33,15 @@ type statsSumProcessor struct {
} }
func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int { func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int {
if ssp.ss.containsStar { fields := ssp.ss.fields
if len(fields) == 0 {
// Sum all the columns // Sum all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
ssp.updateStateForColumn(br, c) ssp.updateStateForColumn(br, c)
} }
} else { } else {
// Sum the requested columns // Sum the requested columns
for _, field := range ssp.ss.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
ssp.updateStateForColumn(br, c) ssp.updateStateForColumn(br, c)
} }
@ -51,7 +50,8 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int {
} }
func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
if ssp.ss.containsStar { fields := ssp.ss.fields
if len(fields) == 0 {
// Sum all the fields for the given row // Sum all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f, ok := c.getFloatValueAtRow(br, rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
@ -61,7 +61,7 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
} }
} else { } else {
// Sum only the given fields for the given row // Sum only the given fields for the given row
for _, field := range ssp.ss.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f, ok := c.getFloatValueAtRow(br, rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
@ -99,13 +99,12 @@ func (ssp *statsSumProcessor) finalizeStats() string {
} }
func parseStatsSum(lex *lexer) (*statsSum, error) { func parseStatsSum(lex *lexer) (*statsSum, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "sum") fields, err := parseStatsFuncFields(lex, "sum")
if err != nil { if err != nil {
return nil, err return nil, err
} }
ss := &statsSum{ ss := &statsSum{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
return ss, nil return ss, nil
} }

View file

@ -1,22 +1,20 @@
package logstorage package logstorage
import ( import (
"slices"
"strconv" "strconv"
"unsafe" "unsafe"
) )
type statsSumLen struct { type statsSumLen struct {
fields []string fields []string
containsStar bool
} }
func (ss *statsSumLen) String() string { func (ss *statsSumLen) String() string {
return "sum_len(" + fieldNamesString(ss.fields) + ")" return "sum_len(" + statsFuncFieldsToString(ss.fields) + ")"
} }
func (ss *statsSumLen) updateNeededFields(neededFields fieldsSet) { func (ss *statsSumLen) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(ss.fields) updateNeededFieldsForStatsFunc(neededFields, ss.fields)
} }
func (ss *statsSumLen) newStatsProcessor() (statsProcessor, int) { func (ss *statsSumLen) newStatsProcessor() (statsProcessor, int) {
@ -34,14 +32,15 @@ type statsSumLenProcessor struct {
} }
func (ssp *statsSumLenProcessor) updateStatsForAllRows(br *blockResult) int { func (ssp *statsSumLenProcessor) updateStatsForAllRows(br *blockResult) int {
if ssp.ss.containsStar { fields := ssp.ss.fields
if len(fields) == 0 {
// Sum all the columns // Sum all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
ssp.sumLen += c.sumLenValues(br) ssp.sumLen += c.sumLenValues(br)
} }
} else { } else {
// Sum the requested columns // Sum the requested columns
for _, field := range ssp.ss.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
ssp.sumLen += c.sumLenValues(br) ssp.sumLen += c.sumLenValues(br)
} }
@ -50,7 +49,8 @@ func (ssp *statsSumLenProcessor) updateStatsForAllRows(br *blockResult) int {
} }
func (ssp *statsSumLenProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (ssp *statsSumLenProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
if ssp.ss.containsStar { fields := ssp.ss.fields
if len(fields) == 0 {
// Sum all the fields for the given row // Sum all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
@ -58,7 +58,7 @@ func (ssp *statsSumLenProcessor) updateStatsForRow(br *blockResult, rowIdx int)
} }
} else { } else {
// Sum only the given fields for the given row // Sum only the given fields for the given row
for _, field := range ssp.ss.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
ssp.sumLen += uint64(len(v)) ssp.sumLen += uint64(len(v))
@ -77,13 +77,12 @@ func (ssp *statsSumLenProcessor) finalizeStats() string {
} }
func parseStatsSumLen(lex *lexer) (*statsSumLen, error) { func parseStatsSumLen(lex *lexer) (*statsSumLen, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "sum_len") fields, err := parseStatsFuncFields(lex, "sum_len")
if err != nil { if err != nil {
return nil, err return nil, err
} }
ss := &statsSumLen{ ss := &statsSumLen{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
return ss, nil return ss, nil
} }

View file

@ -12,12 +12,11 @@ import (
type statsUniqValues struct { type statsUniqValues struct {
fields []string fields []string
containsStar bool
limit uint64 limit uint64
} }
func (su *statsUniqValues) String() string { func (su *statsUniqValues) String() string {
s := "uniq_values(" + fieldNamesString(su.fields) + ")" s := "uniq_values(" + statsFuncFieldsToString(su.fields) + ")"
if su.limit > 0 { if su.limit > 0 {
s += fmt.Sprintf(" limit %d", su.limit) s += fmt.Sprintf(" limit %d", su.limit)
} }
@ -25,7 +24,7 @@ func (su *statsUniqValues) String() string {
} }
func (su *statsUniqValues) updateNeededFields(neededFields fieldsSet) { func (su *statsUniqValues) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(su.fields) updateNeededFieldsForStatsFunc(neededFields, su.fields)
} }
func (su *statsUniqValues) newStatsProcessor() (statsProcessor, int) { func (su *statsUniqValues) newStatsProcessor() (statsProcessor, int) {
@ -50,12 +49,13 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int
} }
stateSizeIncrease := 0 stateSizeIncrease := 0
if sup.su.containsStar { fields := sup.su.fields
if len(fields) == 0 {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br) stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br)
} }
} else { } else {
for _, field := range sup.su.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br) stateSizeIncrease += sup.updateStatsForAllRowsColumn(c, br)
} }
@ -64,7 +64,6 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int
} }
func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int { func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int {
m := sup.m
stateSizeIncrease := 0 stateSizeIncrease := 0
if c.isConst { if c.isConst {
// collect unique const values // collect unique const values
@ -73,11 +72,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC
// skip empty values // skip empty values
return stateSizeIncrease return stateSizeIncrease
} }
if _, ok := m[v]; !ok { stateSizeIncrease += sup.updateState(v)
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
return stateSizeIncrease return stateSizeIncrease
} }
if c.valueType == valueTypeDict { if c.valueType == valueTypeDict {
@ -87,11 +82,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC
// skip empty values // skip empty values
continue continue
} }
if _, ok := m[v]; !ok { stateSizeIncrease += sup.updateState(v)
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
} }
return stateSizeIncrease return stateSizeIncrease
} }
@ -107,11 +98,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC
// This value has been already counted. // This value has been already counted.
continue continue
} }
if _, ok := m[v]; !ok { stateSizeIncrease += sup.updateState(v)
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
} }
return stateSizeIncrease return stateSizeIncrease
} }
@ -123,12 +110,13 @@ func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx i
} }
stateSizeIncrease := 0 stateSizeIncrease := 0
if sup.su.containsStar { fields := sup.su.fields
if len(fields) == 0 {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx) stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx)
} }
} else { } else {
for _, field := range sup.su.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx) stateSizeIncrease += sup.updateStatsForRowColumn(c, br, rowIdx)
} }
@ -137,7 +125,6 @@ func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx i
} }
func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int { func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int {
m := sup.m
stateSizeIncrease := 0 stateSizeIncrease := 0
if c.isConst { if c.isConst {
// collect unique const values // collect unique const values
@ -146,11 +133,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum
// skip empty values // skip empty values
return stateSizeIncrease return stateSizeIncrease
} }
if _, ok := m[v]; !ok { stateSizeIncrease += sup.updateState(v)
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
return stateSizeIncrease return stateSizeIncrease
} }
if c.valueType == valueTypeDict { if c.valueType == valueTypeDict {
@ -162,11 +145,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum
// skip empty values // skip empty values
return stateSizeIncrease return stateSizeIncrease
} }
if _, ok := m[v]; !ok { stateSizeIncrease += sup.updateState(v)
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
return stateSizeIncrease return stateSizeIncrease
} }
@ -176,11 +155,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum
// skip empty values // skip empty values
return stateSizeIncrease return stateSizeIncrease
} }
if _, ok := m[v]; !ok { stateSizeIncrease += sup.updateState(v)
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
return stateSizeIncrease return stateSizeIncrease
} }
@ -190,10 +165,9 @@ func (sup *statsUniqValuesProcessor) mergeState(sfp statsProcessor) {
} }
src := sfp.(*statsUniqValuesProcessor) src := sfp.(*statsUniqValuesProcessor)
m := sup.m
for k := range src.m { for k := range src.m {
if _, ok := m[k]; !ok { if _, ok := sup.m[k]; !ok {
m[k] = struct{}{} sup.m[k] = struct{}{}
} }
} }
} }
@ -228,6 +202,16 @@ func sortStrings(a []string) {
}) })
} }
func (sup *statsUniqValuesProcessor) updateState(v string) int {
stateSizeIncrease := 0
if _, ok := sup.m[v]; !ok {
vCopy := strings.Clone(v)
sup.m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
return stateSizeIncrease
}
func (sup *statsUniqValuesProcessor) limitReached() bool { func (sup *statsUniqValuesProcessor) limitReached() bool {
limit := sup.su.limit limit := sup.su.limit
return limit > 0 && uint64(len(sup.m)) >= limit return limit > 0 && uint64(len(sup.m)) >= limit
@ -255,13 +239,12 @@ func marshalJSONArray(items []string) string {
} }
func parseStatsUniqValues(lex *lexer) (*statsUniqValues, error) { func parseStatsUniqValues(lex *lexer) (*statsUniqValues, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "uniq_values") fields, err := parseStatsFuncFields(lex, "uniq_values")
if err != nil { if err != nil {
return nil, err return nil, err
} }
su := &statsUniqValues{ su := &statsUniqValues{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
if lex.isKeyword("limit") { if lex.isKeyword("limit") {
lex.nextToken() lex.nextToken()

View file

@ -2,19 +2,17 @@ package logstorage
import ( import (
"fmt" "fmt"
"slices"
"strings" "strings"
"unsafe" "unsafe"
) )
type statsValues struct { type statsValues struct {
fields []string fields []string
containsStar bool
limit uint64 limit uint64
} }
func (sv *statsValues) String() string { func (sv *statsValues) String() string {
s := "values(" + fieldNamesString(sv.fields) + ")" s := "values(" + statsFuncFieldsToString(sv.fields) + ")"
if sv.limit > 0 { if sv.limit > 0 {
s += fmt.Sprintf(" limit %d", sv.limit) s += fmt.Sprintf(" limit %d", sv.limit)
} }
@ -22,7 +20,7 @@ func (sv *statsValues) String() string {
} }
func (sv *statsValues) updateNeededFields(neededFields fieldsSet) { func (sv *statsValues) updateNeededFields(neededFields fieldsSet) {
neededFields.addFields(sv.fields) updateNeededFieldsForStatsFunc(neededFields, sv.fields)
} }
func (sv *statsValues) newStatsProcessor() (statsProcessor, int) { func (sv *statsValues) newStatsProcessor() (statsProcessor, int) {
@ -45,12 +43,13 @@ func (svp *statsValuesProcessor) updateStatsForAllRows(br *blockResult) int {
} }
stateSizeIncrease := 0 stateSizeIncrease := 0
if svp.sv.containsStar { fields := svp.sv.fields
if len(fields) == 0 {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br) stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br)
} }
} else { } else {
for _, field := range svp.sv.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br) stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br)
} }
@ -112,12 +111,13 @@ func (svp *statsValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int)
} }
stateSizeIncrease := 0 stateSizeIncrease := 0
if svp.sv.containsStar { fields := svp.sv.fields
if len(fields) == 0 {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx) stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx)
} }
} else { } else {
for _, field := range svp.sv.fields { for _, field := range fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx) stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx)
} }
@ -188,13 +188,12 @@ func (svp *statsValuesProcessor) limitReached() bool {
} }
func parseStatsValues(lex *lexer) (*statsValues, error) { func parseStatsValues(lex *lexer) (*statsValues, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "values") fields, err := parseStatsFuncFields(lex, "values")
if err != nil { if err != nil {
return nil, err return nil, err
} }
sv := &statsValues{ sv := &statsValues{
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"),
} }
if lex.isKeyword("limit") { if lex.isKeyword("limit") {
lex.nextToken() lex.nextToken()