This commit is contained in:
Aliaksandr Valialkin 2024-04-29 03:20:43 +02:00
parent 449eade980
commit d9fdbf907c
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 499 additions and 475 deletions

View file

@ -2,9 +2,7 @@ package logstorage
import ( import (
"fmt" "fmt"
"math"
"slices" "slices"
"strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
"unsafe" "unsafe"
@ -593,19 +591,19 @@ func parseStatsPipe(lex *lexer) (*statsPipe, error) {
func parseStatsFunc(lex *lexer) (statsFunc, error) { func parseStatsFunc(lex *lexer) (statsFunc, error) {
switch { switch {
case lex.isKeyword("count"): case lex.isKeyword("count"):
sfc, err := parseStatsFuncCount(lex) sfc, err := parseStatsCount(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'count' func: %w", err) return nil, fmt.Errorf("cannot parse 'count' func: %w", err)
} }
return sfc, nil return sfc, nil
case lex.isKeyword("uniq"): case lex.isKeyword("uniq"):
sfu, err := parseStatsFuncUniq(lex) sfu, err := parseStatsUniq(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'uniq' func: %w", err) return nil, fmt.Errorf("cannot parse 'uniq' func: %w", err)
} }
return sfu, nil return sfu, nil
case lex.isKeyword("sum"): case lex.isKeyword("sum"):
sfs, err := parseStatsFuncSum(lex) sfs, err := parseStatsSum(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'sum' func: %w", err) return nil, fmt.Errorf("cannot parse 'sum' func: %w", err)
} }
@ -615,476 +613,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
} }
} }
type statsFuncCount struct {
fields []string
containsStar bool
resultName string
}
func (sfc *statsFuncCount) String() string {
return "count(" + fieldNamesString(sfc.fields) + ") as " + quoteTokenIfNeeded(sfc.resultName)
}
func (sfc *statsFuncCount) neededFields() []string {
return getFieldsIgnoreStar(sfc.fields)
}
func (sfc *statsFuncCount) newStatsFuncProcessor() (statsFuncProcessor, int) {
sfcp := &statsFuncCountProcessor{
sfc: sfc,
}
return sfcp, int(unsafe.Sizeof(*sfcp))
}
type statsFuncCountProcessor struct {
sfc *statsFuncCount
rowsCount uint64
}
func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
fields := sfcp.sfc.fields
if len(fields) == 0 || sfcp.sfc.containsStar {
// Fast path - count all the columns.
sfcp.rowsCount += uint64(len(timestamps))
return 0
}
// Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count().
bm := getFilterBitmap(len(timestamps))
defer putFilterBitmap(bm)
bm.setBits()
for _, f := range fields {
if idx := getBlockColumnIndex(columns, f); idx >= 0 {
values := columns[idx].Values
bm.forEachSetBit(func(i int) bool {
return values[i] == ""
})
}
}
emptyValues := 0
bm.forEachSetBit(func(i int) bool {
emptyValues++
return true
})
sfcp.rowsCount += uint64(len(timestamps) - emptyValues)
return 0
}
func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
fields := sfcp.sfc.fields
if len(fields) == 0 || sfcp.sfc.containsStar {
// Fast path - count the given column
sfcp.rowsCount++
return 0
}
// Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty
for _, f := range fields {
if idx := getBlockColumnIndex(columns, f); idx >= 0 && columns[idx].Values[rowIdx] != "" {
sfcp.rowsCount++
return 0
}
}
return 0
}
func (sfcp *statsFuncCountProcessor) mergeState(sfp statsFuncProcessor) {
src := sfp.(*statsFuncCountProcessor)
sfcp.rowsCount += src.rowsCount
}
func (sfcp *statsFuncCountProcessor) finalizeStats() (string, string) {
value := strconv.FormatUint(sfcp.rowsCount, 10)
return sfcp.sfc.resultName, value
}
func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
sfc := &statsFuncCount{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return sfc, nil
}
type statsFuncSum struct {
fields []string
containsStar bool
resultName string
}
func (sfs *statsFuncSum) String() string {
return "sum(" + fieldNamesString(sfs.fields) + ") as " + quoteTokenIfNeeded(sfs.resultName)
}
func (sfs *statsFuncSum) neededFields() []string {
return sfs.fields
}
func (sfs *statsFuncSum) newStatsFuncProcessor() (statsFuncProcessor, int) {
sfsp := &statsFuncSumProcessor{
sfs: sfs,
}
return sfsp, int(unsafe.Sizeof(*sfsp))
}
type statsFuncSumProcessor struct {
sfs *statsFuncSum
sum float64
}
func (sfsp *statsFuncSumProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
if sfsp.sfs.containsStar {
// Sum all the columns
for _, c := range columns {
sfsp.sum += sumValues(c.Values)
}
return 0
}
// Sum the requested columns
for _, field := range sfsp.sfs.fields {
if idx := getBlockColumnIndex(columns, field); idx >= 0 {
sfsp.sum += sumValues(columns[idx].Values)
}
}
return 0
}
func sumValues(values []string) float64 {
sum := float64(0)
f := float64(0)
for i, v := range values {
if i == 0 || values[i-1] != v {
f, _ = tryParseFloat64(v)
if math.IsNaN(f) {
// Ignore NaN values, since this is the expected behaviour by most users.
f = 0
}
}
sum += f
}
return sum
}
func (sfsp *statsFuncSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
if sfsp.sfs.containsStar {
// Sum all the fields for the given row
for _, c := range columns {
v := c.Values[rowIdx]
f, _ := tryParseFloat64(v)
if !math.IsNaN(f) {
sfsp.sum += f
}
}
return 0
}
// Sum only the given fields for the given row
for _, field := range sfsp.sfs.fields {
if idx := getBlockColumnIndex(columns, field); idx >= 0 {
v := columns[idx].Values[rowIdx]
f, _ := tryParseFloat64(v)
if !math.IsNaN(f) {
sfsp.sum += f
}
}
}
return 0
}
func (sfsp *statsFuncSumProcessor) mergeState(sfp statsFuncProcessor) {
src := sfp.(*statsFuncSumProcessor)
sfsp.sum += src.sum
}
func (sfsp *statsFuncSumProcessor) finalizeStats() (string, string) {
value := strconv.FormatFloat(sfsp.sum, 'g', -1, 64)
return sfsp.sfs.resultName, value
}
func parseStatsFuncSum(lex *lexer) (*statsFuncSum, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum' args: %w", err)
}
if len(fields) == 0 {
return nil, fmt.Errorf("'sum' must contain at least one arg")
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
sfs := &statsFuncSum{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return sfs, nil
}
type statsFuncUniq struct {
fields []string
containsStar bool
resultName string
}
func (sfu *statsFuncUniq) String() string {
return "uniq(" + fieldNamesString(sfu.fields) + ") as " + quoteTokenIfNeeded(sfu.resultName)
}
func (sfu *statsFuncUniq) neededFields() []string {
return sfu.fields
}
func (sfu *statsFuncUniq) newStatsFuncProcessor() (statsFuncProcessor, int) {
sfup := &statsFuncUniqProcessor{
sfu: sfu,
m: make(map[string]struct{}),
}
return sfup, int(unsafe.Sizeof(*sfup))
}
type statsFuncUniqProcessor struct {
sfu *statsFuncUniq
m map[string]struct{}
columnValues [][]string
keyBuf []byte
}
func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
fields := sfup.sfu.fields
m := sfup.m
stateSizeIncrease := 0
if len(fields) == 0 || sfup.sfu.containsStar {
// Count unique rows
keyBuf := sfup.keyBuf
for i := range timestamps {
seenKey := true
for _, c := range columns {
values := c.Values
if i == 0 || values[i-1] != values[i] {
seenKey = false
break
}
}
if seenKey {
continue
}
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, c := range columns {
v := c.Values[i]
if v != "" {
allEmptyValues = false
}
// Put column name into key, since every block can contain different set of columns for '*' selector.
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
if allEmptyValues {
// Do not count empty values
continue
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sfup.keyBuf = keyBuf
return stateSizeIncrease
}
if len(fields) == 1 {
// Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
values := columns[idx].Values
for i, v := range values {
if v == "" {
// Do not count empty values
continue
}
if i > 0 && values[i-1] == v {
continue
}
if _, ok := m[v]; !ok {
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
}
}
return stateSizeIncrease
}
// Slow path for multiple columns.
// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
sfup.columnValues = appendBlockColumnValues(sfup.columnValues[:0], columns, fields, len(timestamps))
columnValues := sfup.columnValues
keyBuf := sfup.keyBuf
for i := range timestamps {
seenKey := true
for _, values := range columnValues {
if i == 0 || values[i-1] != values[i] {
seenKey = false
}
}
if seenKey {
continue
}
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, values := range columnValues {
v := values[i]
if v != "" {
allEmptyValues = false
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
if allEmptyValues {
// Do not count empty values
continue
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sfup.keyBuf = keyBuf
return stateSizeIncrease
}
func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) int {
fields := sfup.sfu.fields
m := sfup.m
stateSizeIncrease := 0
if len(fields) == 0 || sfup.sfu.containsStar {
// Count unique rows
allEmptyValues := true
keyBuf := sfup.keyBuf[:0]
for _, c := range columns {
v := c.Values[rowIdx]
if v != "" {
allEmptyValues = false
}
// Put column name into key, since every block can contain different set of columns for '*' selector.
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
sfup.keyBuf = keyBuf
if allEmptyValues {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
if len(fields) == 1 {
// Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
v := columns[idx].Values[rowIdx]
if v == "" {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[v]; !ok {
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
}
return stateSizeIncrease
}
// Slow path for multiple columns.
allEmptyValues := true
keyBuf := sfup.keyBuf[:0]
for _, f := range fields {
v := ""
if idx := getBlockColumnIndex(columns, f); idx >= 0 {
v = columns[idx].Values[rowIdx]
}
if v != "" {
allEmptyValues = false
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
sfup.keyBuf = keyBuf
if allEmptyValues {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
func (sfup *statsFuncUniqProcessor) mergeState(sfp statsFuncProcessor) {
src := sfp.(*statsFuncUniqProcessor)
m := sfup.m
for k := range src.m {
m[k] = struct{}{}
}
}
func (sfup *statsFuncUniqProcessor) finalizeStats() (string, string) {
n := uint64(len(sfup.m))
value := strconv.FormatUint(n, 10)
return sfup.sfu.resultName, value
}
func parseStatsFuncUniq(lex *lexer) (*statsFuncUniq, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err)
}
if len(fields) == 0 {
return nil, fmt.Errorf("'uniq' must contain at least a single arg")
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
sfu := &statsFuncUniq{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return sfu, nil
}
func parseResultName(lex *lexer) (string, error) { func parseResultName(lex *lexer) (string, error) {
if lex.isKeyword("as") { if lex.isKeyword("as") {
if !lex.mustNextToken() { if !lex.mustNextToken() {

View file

@ -0,0 +1,114 @@
package logstorage
import (
"fmt"
"slices"
"strconv"
"unsafe"
)
type statsCount struct {
fields []string
containsStar bool
resultName string
}
func (sc *statsCount) String() string {
return "count(" + fieldNamesString(sc.fields) + ") as " + quoteTokenIfNeeded(sc.resultName)
}
func (sc *statsCount) neededFields() []string {
return getFieldsIgnoreStar(sc.fields)
}
func (sc *statsCount) newStatsFuncProcessor() (statsFuncProcessor, int) {
scp := &statsCountProcessor{
sc: sc,
}
return scp, int(unsafe.Sizeof(*scp))
}
type statsCountProcessor struct {
sc *statsCount
rowsCount uint64
}
func (scp *statsCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
fields := scp.sc.fields
if len(fields) == 0 || scp.sc.containsStar {
// Fast path - count all the columns.
scp.rowsCount += uint64(len(timestamps))
return 0
}
// Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count().
bm := getFilterBitmap(len(timestamps))
defer putFilterBitmap(bm)
bm.setBits()
for _, f := range fields {
if idx := getBlockColumnIndex(columns, f); idx >= 0 {
values := columns[idx].Values
bm.forEachSetBit(func(i int) bool {
return values[i] == ""
})
}
}
emptyValues := 0
bm.forEachSetBit(func(i int) bool {
emptyValues++
return true
})
scp.rowsCount += uint64(len(timestamps) - emptyValues)
return 0
}
func (scp *statsCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
fields := scp.sc.fields
if len(fields) == 0 || scp.sc.containsStar {
// Fast path - count the given column
scp.rowsCount++
return 0
}
// Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty
for _, f := range fields {
if idx := getBlockColumnIndex(columns, f); idx >= 0 && columns[idx].Values[rowIdx] != "" {
scp.rowsCount++
return 0
}
}
return 0
}
func (scp *statsCountProcessor) mergeState(sfp statsFuncProcessor) {
src := sfp.(*statsCountProcessor)
scp.rowsCount += src.rowsCount
}
func (scp *statsCountProcessor) finalizeStats() (string, string) {
value := strconv.FormatUint(scp.rowsCount, 10)
return scp.sc.resultName, value
}
func parseStatsCount(lex *lexer) (*statsCount, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
sc := &statsCount{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return sc, nil
}

127
lib/logstorage/stats_sum.go Normal file
View file

@ -0,0 +1,127 @@
package logstorage
import (
"fmt"
"math"
"slices"
"strconv"
"unsafe"
)
type statsSum struct {
fields []string
containsStar bool
resultName string
}
func (ss *statsSum) String() string {
return "sum(" + fieldNamesString(ss.fields) + ") as " + quoteTokenIfNeeded(ss.resultName)
}
func (ss *statsSum) neededFields() []string {
return ss.fields
}
func (ss *statsSum) newStatsFuncProcessor() (statsFuncProcessor, int) {
ssp := &statsSumProcessor{
ss: ss,
}
return ssp, int(unsafe.Sizeof(*ssp))
}
type statsSumProcessor struct {
ss *statsSum
sum float64
}
func (ssp *statsSumProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
if ssp.ss.containsStar {
// Sum all the columns
for _, c := range columns {
ssp.sum += sumValues(c.Values)
}
return 0
}
// Sum the requested columns
for _, field := range ssp.ss.fields {
if idx := getBlockColumnIndex(columns, field); idx >= 0 {
ssp.sum += sumValues(columns[idx].Values)
}
}
return 0
}
func sumValues(values []string) float64 {
sum := float64(0)
f := float64(0)
for i, v := range values {
if i == 0 || values[i-1] != v {
f, _ = tryParseFloat64(v)
if math.IsNaN(f) {
// Ignore NaN values, since this is the expected behaviour by most users.
f = 0
}
}
sum += f
}
return sum
}
func (ssp *statsSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
if ssp.ss.containsStar {
// Sum all the fields for the given row
for _, c := range columns {
v := c.Values[rowIdx]
f, _ := tryParseFloat64(v)
if !math.IsNaN(f) {
ssp.sum += f
}
}
return 0
}
// Sum only the given fields for the given row
for _, field := range ssp.ss.fields {
if idx := getBlockColumnIndex(columns, field); idx >= 0 {
v := columns[idx].Values[rowIdx]
f, _ := tryParseFloat64(v)
if !math.IsNaN(f) {
ssp.sum += f
}
}
}
return 0
}
func (ssp *statsSumProcessor) mergeState(sfp statsFuncProcessor) {
src := sfp.(*statsSumProcessor)
ssp.sum += src.sum
}
func (ssp *statsSumProcessor) finalizeStats() (string, string) {
value := strconv.FormatFloat(ssp.sum, 'g', -1, 64)
return ssp.ss.resultName, value
}
func parseStatsSum(lex *lexer) (*statsSum, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum' args: %w", err)
}
if len(fields) == 0 {
return nil, fmt.Errorf("'sum' must contain at least one arg")
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
ss := &statsSum{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return ss, nil
}

View file

@ -0,0 +1,255 @@
package logstorage
import (
"fmt"
"slices"
"strconv"
"strings"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
type statsUniq struct {
fields []string
containsStar bool
resultName string
}
func (su *statsUniq) String() string {
return "uniq(" + fieldNamesString(su.fields) + ") as " + quoteTokenIfNeeded(su.resultName)
}
func (su *statsUniq) neededFields() []string {
return su.fields
}
func (su *statsUniq) newStatsFuncProcessor() (statsFuncProcessor, int) {
sup := &statsUniqProcessor{
su: su,
m: make(map[string]struct{}),
}
return sup, int(unsafe.Sizeof(*sup))
}
type statsUniqProcessor struct {
su *statsUniq
m map[string]struct{}
columnValues [][]string
keyBuf []byte
}
func (sup *statsUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
fields := sup.su.fields
m := sup.m
stateSizeIncrease := 0
if len(fields) == 0 || sup.su.containsStar {
// Count unique rows
keyBuf := sup.keyBuf
for i := range timestamps {
seenKey := true
for _, c := range columns {
values := c.Values
if i == 0 || values[i-1] != values[i] {
seenKey = false
break
}
}
if seenKey {
continue
}
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, c := range columns {
v := c.Values[i]
if v != "" {
allEmptyValues = false
}
// Put column name into key, since every block can contain different set of columns for '*' selector.
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
if allEmptyValues {
// Do not count empty values
continue
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sup.keyBuf = keyBuf
return stateSizeIncrease
}
if len(fields) == 1 {
// Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
values := columns[idx].Values
for i, v := range values {
if v == "" {
// Do not count empty values
continue
}
if i > 0 && values[i-1] == v {
continue
}
if _, ok := m[v]; !ok {
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
}
}
return stateSizeIncrease
}
// Slow path for multiple columns.
// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
sup.columnValues = appendBlockColumnValues(sup.columnValues[:0], columns, fields, len(timestamps))
columnValues := sup.columnValues
keyBuf := sup.keyBuf
for i := range timestamps {
seenKey := true
for _, values := range columnValues {
if i == 0 || values[i-1] != values[i] {
seenKey = false
}
}
if seenKey {
continue
}
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, values := range columnValues {
v := values[i]
if v != "" {
allEmptyValues = false
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
if allEmptyValues {
// Do not count empty values
continue
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sup.keyBuf = keyBuf
return stateSizeIncrease
}
func (sup *statsUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) int {
fields := sup.su.fields
m := sup.m
stateSizeIncrease := 0
if len(fields) == 0 || sup.su.containsStar {
// Count unique rows
allEmptyValues := true
keyBuf := sup.keyBuf[:0]
for _, c := range columns {
v := c.Values[rowIdx]
if v != "" {
allEmptyValues = false
}
// Put column name into key, since every block can contain different set of columns for '*' selector.
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
sup.keyBuf = keyBuf
if allEmptyValues {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
if len(fields) == 1 {
// Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
v := columns[idx].Values[rowIdx]
if v == "" {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[v]; !ok {
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
}
return stateSizeIncrease
}
// Slow path for multiple columns.
allEmptyValues := true
keyBuf := sup.keyBuf[:0]
for _, f := range fields {
v := ""
if idx := getBlockColumnIndex(columns, f); idx >= 0 {
v = columns[idx].Values[rowIdx]
}
if v != "" {
allEmptyValues = false
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
sup.keyBuf = keyBuf
if allEmptyValues {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
func (sup *statsUniqProcessor) mergeState(sfp statsFuncProcessor) {
src := sfp.(*statsUniqProcessor)
m := sup.m
for k := range src.m {
m[k] = struct{}{}
}
}
func (sup *statsUniqProcessor) finalizeStats() (string, string) {
n := uint64(len(sup.m))
value := strconv.FormatUint(n, 10)
return sup.su.resultName, value
}
func parseStatsUniq(lex *lexer) (*statsUniq, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err)
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
su := &statsUniq{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return su, nil
}