mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git

commit ef504e8d9a (parent 7fd9d31e90): wip

4 changed files with 233 additions and 8 deletions
@@ -858,6 +858,10 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`)
 	f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`)
+
+	// stats pipe uniq_array
+	f(`* | stats uniq_array(foo) bar`, `* | stats uniq_array(foo) as bar`)
+	f(`* | stats by(x, y) uniq_array(foo) as baz`, `* | stats by (x, y) uniq_array(foo) as baz`)
 
 	// stats pipe multiple funcs
 	f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count() as "foo.bar:baz", uniq(a) as bar`)
 	f(`* | stats by (x, y) count(*) foo, uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, uniq(a, b) as bar`)
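These success tests assert that a query parses and re-serializes into a canonical form (explicit `as`, normalized spacing). A minimal sketch of that helper pattern, assuming f wraps the package's ParseQuery entry point and compares String() output (the helper internals are an assumption; they are not shown in this diff):

	f := func(s, resultExpected string) {
		t.Helper()
		q, err := ParseQuery(s) // assumed LogsQL parser entry point
		if err != nil {
			t.Fatalf("cannot parse %q: %s", s, err)
		}
		if result := q.String(); result != resultExpected {
			t.Fatalf("unexpected canonical form; got %q; want %q", result, resultExpected)
		}
	}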
@@ -1136,6 +1140,13 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | stats uniq`)
 	f(`foo | stats uniq()`)
+
+	// invalid stats uniq_array
+	f(`foo | stats uniq_array`)
+	f(`foo | stats uniq_array()`)
+	f(`foo | stats uniq_array() as foo`)
+	f(`foo | stats uniq_array(a,b) as foo`)
+	f(`foo | stats uniq_array(*) as foo`)
 
 	// invalid grouping fields
 	f(`foo | stats by(foo:bar) count() baz`)
 	f(`foo | stats by(foo:/bar) count() baz`)
@@ -439,17 +439,17 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
 	var sf statsFunc
 	switch {
 	case lex.isKeyword("count"):
-		sfc, err := parseStatsCount(lex)
+		scs, err := parseStatsCount(lex)
 		if err != nil {
 			return nil, "", fmt.Errorf("cannot parse 'count' func: %w", err)
 		}
-		sf = sfc
+		sf = scs
 	case lex.isKeyword("uniq"):
-		sfu, err := parseStatsUniq(lex)
+		sus, err := parseStatsUniq(lex)
 		if err != nil {
 			return nil, "", fmt.Errorf("cannot parse 'uniq' func: %w", err)
 		}
-		sf = sfu
+		sf = sus
 	case lex.isKeyword("sum"):
 		sfs, err := parseStatsSum(lex)
 		if err != nil {
@@ -474,6 +474,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
 			return nil, "", fmt.Errorf("cannot parse 'avg' func: %w", err)
 		}
 		sf = sas
+	case lex.isKeyword("uniq_array"):
+		sus, err := parseStatsUniqArray(lex)
+		if err != nil {
+			return nil, "", fmt.Errorf("cannot parse 'uniq_array' func: %w", err)
+		}
+		sf = sus
 	default:
 		return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
 	}
@@ -90,7 +90,7 @@ func (sup *statsUniqProcessor) updateStatsForAllRows(br *blockResult) int {
 	// Fast path for a single column.
 	// The unique key is formed as "<is_time> <value_type>? <encodedValue>",
 	// where <value_type> is skipped if <is_time> == 1.
-	// This guarantees that keys do not clash for different column types acorss blocks.
+	// This guarantees that keys do not clash for different column types across blocks.
 	c := br.getColumnByName(fields[0])
 	if c.isTime {
 		// Count unique br.timestamps
@@ -251,7 +251,7 @@ func (sup *statsUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) in
 	// Fast path for a single column.
 	// The unique key is formed as "<is_time> <value_type>? <encodedValue>",
 	// where <value_type> is skipped if <is_time> == 1.
-	// This guarantees that keys do not clash for different column types acorss blocks.
+	// This guarantees that keys do not clash for different column types across blocks.
 	c := br.getColumnByName(fields[0])
 	if c.isTime {
 		// Count unique br.timestamps
@@ -300,7 +300,7 @@ func (sup *statsUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) in
 		return stateSizeIncrease
 	}
 
-	// Count unique values across encodedValues
+	// Count unique values for the given rowIdx
 	encodedValues := c.getEncodedValues(br)
 	v := encodedValues[rowIdx]
 	if c.valueType == valueTypeString && v == "" {
@@ -346,8 +346,10 @@ func (sup *statsUniqProcessor) mergeState(sfp statsProcessor) {
 	src := sfp.(*statsUniqProcessor)
 	m := sup.m
 	for k := range src.m {
+		if _, ok := m[k]; !ok {
 			m[k] = struct{}{}
+		}
 	}
 }
 
 func (sup *statsUniqProcessor) finalizeStats() string {
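The added existence check makes the merge skip keys already present in the destination set. As a standalone sketch, this set-union pattern over map[string]struct{} looks like the following (names here are illustrative, not from the codebase):

	// mergeSets adds every key from src into dst, skipping keys dst already holds.
	func mergeSets(dst, src map[string]struct{}) {
		for k := range src {
			if _, ok := dst[k]; !ok {
				dst[k] = struct{}{}
			}
		}
	}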
206	lib/logstorage/stats_uniq_array.go	(new file)
@@ -0,0 +1,206 @@
+package logstorage
+
+import (
+	"fmt"
+	"sort"
+	"strconv"
+	"strings"
+	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+)
+
+type statsUniqArray struct {
+	field string
+}
+
+func (su *statsUniqArray) String() string {
+	return "uniq_array(" + quoteTokenIfNeeded(su.field) + ")"
+}
+
+func (su *statsUniqArray) neededFields() []string {
+	return []string{su.field}
+}
+
+func (su *statsUniqArray) newStatsProcessor() (statsProcessor, int) {
+	sup := &statsUniqArrayProcessor{
+		su: su,
+
+		m: make(map[string]struct{}),
+	}
+	return sup, int(unsafe.Sizeof(*sup))
+}
+
+type statsUniqArrayProcessor struct {
+	su *statsUniqArray
+
+	m map[string]struct{}
+}
+
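newStatsProcessor returns the processor together with its base size, so the engine can account for state memory. unsafe.Sizeof counts only the struct header (a pointer plus a map header); per-key growth is charged later via stateSizeIncrease. A runnable illustration with a stand-in struct (the type here is illustrative, not the real processor):

	package main

	import (
		"fmt"
		"unsafe"
	)

	type proc struct {
		owner *int
		m     map[string]struct{}
	}

	func main() {
		p := &proc{m: make(map[string]struct{})}
		// Prints the fixed struct size only; map contents are not included.
		fmt.Println(int(unsafe.Sizeof(*p)))
	}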
+func (sup *statsUniqArrayProcessor) updateStatsForAllRows(br *blockResult) int {
+	field := sup.su.field
+	m := sup.m
+
+	stateSizeIncrease := 0
+	c := br.getColumnByName(field)
+	if c.isConst {
+		// collect unique const values
+		v := c.encodedValues[0]
+		if v == "" {
+			// skip empty values
+			return stateSizeIncrease
+		}
+		if _, ok := m[v]; !ok {
+			vCopy := strings.Clone(v)
+			m[vCopy] = struct{}{}
+			stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+		}
+		return stateSizeIncrease
+	}
+	if c.valueType == valueTypeDict {
+		// collect unique non-zero c.dictValues
+		for _, v := range c.dictValues {
+			if v == "" {
+				// skip empty values
+				continue
+			}
+			if _, ok := m[v]; !ok {
+				vCopy := strings.Clone(v)
+				m[vCopy] = struct{}{}
+				stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+			}
+		}
+		return stateSizeIncrease
+	}
+
+	// slow path - collect unique values across all rows
+	values := c.getValues(br)
+	for i, v := range values {
+		if v == "" {
+			// skip empty values
+			continue
+		}
+		if i > 0 && values[i-1] == v {
+			// This value has already been counted.
+			continue
+		}
+		if _, ok := m[v]; !ok {
+			vCopy := strings.Clone(v)
+			m[vCopy] = struct{}{}
+			stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+		}
+	}
+	return stateSizeIncrease
+}
+
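In the slow path above, the values[i-1] == v check exploits the fact that column values often arrive in runs: it skips the map lookup for repeated neighbors, while the map still dedupes non-adjacent repeats. A tiny standalone sketch of the effect:

	values := []string{"a", "a", "b", "a"}
	m := map[string]struct{}{}
	for i, v := range values {
		if i > 0 && values[i-1] == v {
			continue // adjacent duplicate: no map lookup needed
		}
		m[v] = struct{}{}
	}
	// m now holds {"a", "b"}: index 1 was skipped cheaply,
	// and the final "a" was deduped by the map itself.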
+func (sup *statsUniqArrayProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
+	field := sup.su.field
+	m := sup.m
+
+	stateSizeIncrease := 0
+	c := br.getColumnByName(field)
+	if c.isConst {
+		// collect unique const values
+		v := c.encodedValues[0]
+		if v == "" {
+			// skip empty values
+			return stateSizeIncrease
+		}
+		if _, ok := m[v]; !ok {
+			vCopy := strings.Clone(v)
+			m[vCopy] = struct{}{}
+			stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+		}
+		return stateSizeIncrease
+	}
+	if c.valueType == valueTypeDict {
+		// collect unique non-zero c.dictValues
+		dictIdx := c.encodedValues[rowIdx][0]
+		v := c.dictValues[dictIdx]
+		if v == "" {
+			// skip empty values
+			return stateSizeIncrease
+		}
+		if _, ok := m[v]; !ok {
+			vCopy := strings.Clone(v)
+			m[vCopy] = struct{}{}
+			stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+		}
+		return stateSizeIncrease
+	}
+
+	// collect unique values for the given rowIdx.
+	v := c.getValueAtRow(br, rowIdx)
+	if v == "" {
+		// skip empty values
+		return stateSizeIncrease
+	}
+	if _, ok := m[v]; !ok {
+		vCopy := strings.Clone(v)
+		m[vCopy] = struct{}{}
+		stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
+	}
+	return stateSizeIncrease
+}
+
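The dict fast path in updateStatsForRow reads one row as two slice indexings: the row's encoded value is a one-byte index into c.dictValues. A hedged standalone sketch of that representation (the layout mirrors the code above, but the literal values are made up):

	// Illustrative dict-encoded column: a per-row byte indexes a small dictionary.
	dictValues := []string{"", "GET", "POST"}
	encodedValues := []string{"\x01", "\x02", "\x01"} // rows 0..2
	dictIdx := encodedValues[1][0]
	v := dictValues[dictIdx] // "POST"
	_ = v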
+func (sup *statsUniqArrayProcessor) mergeState(sfp statsProcessor) {
+	src := sfp.(*statsUniqArrayProcessor)
+	m := sup.m
+	for k := range src.m {
+		if _, ok := m[k]; !ok {
+			m[k] = struct{}{}
+		}
+	}
+}
+
+func (sup *statsUniqArrayProcessor) finalizeStats() string {
+	if len(sup.m) == 0 {
+		return "[]"
+	}
+
+	// Sort unique items
+	items := make([]string, 0, len(sup.m))
+	for k := range sup.m {
+		items = append(items, k)
+	}
+	sort.Strings(items)
+
+	// Marshal items into a JSON array.
+
+	// Pre-allocate the buffer for serialized items.
+	// Assume there is no need to quote items; otherwise additional
+	// reallocations of the buffer are possible.
+	bufSize := len(items) + 1
+	for _, item := range items {
+		bufSize += len(item)
+	}
+	b := make([]byte, 0, bufSize)
+
+	b = append(b, '[')
+	b = strconv.AppendQuote(b, items[0])
+	for _, item := range items[1:] {
+		b = append(b, ',')
+		b = strconv.AppendQuote(b, item)
+	}
+	b = append(b, ']')
+
+	return bytesutil.ToUnsafeString(b)
+}
+
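finalizeStats builds the JSON array by hand with strconv.AppendQuote instead of encoding/json, after sorting the keys. A runnable sketch of the same marshaling, minus the zero-copy bytesutil.ToUnsafeString conversion:

	package main

	import (
		"fmt"
		"sort"
		"strconv"
	)

	func main() {
		items := []string{"error", "debug", "info"}
		sort.Strings(items)

		b := append([]byte(nil), '[')
		b = strconv.AppendQuote(b, items[0])
		for _, item := range items[1:] {
			b = append(b, ',')
			b = strconv.AppendQuote(b, item)
		}
		b = append(b, ']')
		fmt.Println(string(b)) // ["debug","error","info"]
	}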
+func parseStatsUniqArray(lex *lexer) (*statsUniqArray, error) {
+	fields, err := parseFieldNamesForFunc(lex, "uniq_array")
+	if err != nil {
+		return nil, err
+	}
+	if len(fields) != 1 {
+		return nil, fmt.Errorf("'uniq_array' needs exactly one field; got %d fields: [%s]", len(fields), fields)
+	}
+	field := fields[0]
+	if field == "*" {
+		return nil, fmt.Errorf("'uniq_array' cannot contain '*'")
+	}
+	su := &statsUniqArray{
+		field: field,
+	}
+	return su, nil
+}
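Taken together with the parser tests above, the new pipe accepts exactly one non-* field and collects its distinct non-empty values into a sorted JSON array, so a query such as

	* | stats by (app) uniq_array(level) as levels

should produce one levels value per app group, e.g. ["debug","error","info"] (result shape inferred from finalizeStats).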