mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
402 lines
9.8 KiB
Go
402 lines
9.8 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"fmt"
|
|
"slices"
|
|
"strconv"
|
|
"unsafe"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
)
|
|
|
|
type statsCountUniq struct {
|
|
fields []string
|
|
containsStar bool
|
|
limit uint64
|
|
}
|
|
|
|
func (su *statsCountUniq) String() string {
|
|
s := "count_uniq(" + fieldNamesString(su.fields) + ")"
|
|
if su.limit > 0 {
|
|
s += fmt.Sprintf(" limit %d", su.limit)
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (su *statsCountUniq) neededFields() []string {
|
|
return su.fields
|
|
}
|
|
|
|
func (su *statsCountUniq) newStatsProcessor() (statsProcessor, int) {
|
|
sup := &statsCountUniqProcessor{
|
|
su: su,
|
|
|
|
m: make(map[string]struct{}),
|
|
}
|
|
return sup, int(unsafe.Sizeof(*sup))
|
|
}
|
|
|
|
type statsCountUniqProcessor struct {
|
|
su *statsCountUniq
|
|
|
|
m map[string]struct{}
|
|
|
|
columnValues [][]string
|
|
keyBuf []byte
|
|
}
|
|
|
|
func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
|
|
if sup.limitReached() {
|
|
return 0
|
|
}
|
|
|
|
fields := sup.su.fields
|
|
m := sup.m
|
|
|
|
stateSizeIncrease := 0
|
|
if sup.su.containsStar {
|
|
// Count unique rows
|
|
cs := br.getColumns()
|
|
keyBuf := sup.keyBuf[:0]
|
|
for i := range br.timestamps {
|
|
seenKey := true
|
|
for _, c := range cs {
|
|
values := c.getValues(br)
|
|
if i == 0 || values[i-1] != values[i] {
|
|
seenKey = false
|
|
break
|
|
}
|
|
}
|
|
if seenKey {
|
|
// This key has been already counted.
|
|
continue
|
|
}
|
|
|
|
allEmptyValues := true
|
|
keyBuf = keyBuf[:0]
|
|
for _, c := range cs {
|
|
v := c.getValueAtRow(br, i)
|
|
if v != "" {
|
|
allEmptyValues = false
|
|
}
|
|
// Put column name into key, since every block can contain different set of columns for '*' selector.
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
|
|
}
|
|
if allEmptyValues {
|
|
// Do not count empty values
|
|
continue
|
|
}
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
if len(fields) == 1 {
|
|
// Fast path for a single column.
|
|
// The unique key is formed as "<is_time> <value>",
|
|
// This guarantees that keys do not clash for different column types across blocks.
|
|
c := br.getColumnByName(fields[0])
|
|
if c.isTime {
|
|
// Count unique br.timestamps
|
|
timestamps := br.timestamps
|
|
keyBuf := sup.keyBuf[:0]
|
|
for i, timestamp := range timestamps {
|
|
if i > 0 && timestamps[i-1] == timestamps[i] {
|
|
// This timestamp has been already counted.
|
|
continue
|
|
}
|
|
keyBuf = append(keyBuf[:0], 1)
|
|
keyBuf = encoding.MarshalInt64(keyBuf, timestamp)
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
if c.isConst {
|
|
// count unique const values
|
|
v := c.encodedValues[0]
|
|
if v == "" {
|
|
// Do not count empty values
|
|
return stateSizeIncrease
|
|
}
|
|
keyBuf := sup.keyBuf[:0]
|
|
keyBuf = append(keyBuf[:0], 0)
|
|
keyBuf = append(keyBuf, v...)
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
if c.valueType == valueTypeDict {
|
|
// count unique non-zero c.dictValues
|
|
keyBuf := sup.keyBuf[:0]
|
|
for _, v := range c.dictValues {
|
|
if v == "" {
|
|
// Do not count empty values
|
|
continue
|
|
}
|
|
keyBuf = append(keyBuf[:0], 0)
|
|
keyBuf = append(keyBuf, v...)
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
// Count unique values across encodedValues
|
|
values := c.getValues(br)
|
|
keyBuf := sup.keyBuf[:0]
|
|
for i, v := range values {
|
|
if v == "" {
|
|
// Do not count empty values
|
|
continue
|
|
}
|
|
if i > 0 && values[i-1] == v {
|
|
// This value has been already counted.
|
|
continue
|
|
}
|
|
keyBuf = append(keyBuf[:0], 0)
|
|
keyBuf = append(keyBuf, v...)
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
// Slow path for multiple columns.
|
|
|
|
// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
|
|
columnValues := sup.columnValues[:0]
|
|
for _, f := range fields {
|
|
c := br.getColumnByName(f)
|
|
values := c.getValues(br)
|
|
columnValues = append(columnValues, values)
|
|
}
|
|
sup.columnValues = columnValues
|
|
|
|
keyBuf := sup.keyBuf[:0]
|
|
for i := range br.timestamps {
|
|
seenKey := true
|
|
for _, values := range columnValues {
|
|
if i == 0 || values[i-1] != values[i] {
|
|
seenKey = false
|
|
break
|
|
}
|
|
}
|
|
if seenKey {
|
|
continue
|
|
}
|
|
|
|
allEmptyValues := true
|
|
keyBuf = keyBuf[:0]
|
|
for _, values := range columnValues {
|
|
v := values[i]
|
|
if v != "" {
|
|
allEmptyValues = false
|
|
}
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
|
|
}
|
|
if allEmptyValues {
|
|
// Do not count empty values
|
|
continue
|
|
}
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
|
|
if sup.limitReached() {
|
|
return 0
|
|
}
|
|
|
|
fields := sup.su.fields
|
|
m := sup.m
|
|
|
|
stateSizeIncrease := 0
|
|
if sup.su.containsStar {
|
|
// Count unique rows
|
|
allEmptyValues := true
|
|
keyBuf := sup.keyBuf[:0]
|
|
for _, c := range br.getColumns() {
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
if v != "" {
|
|
allEmptyValues = false
|
|
}
|
|
// Put column name into key, since every block can contain different set of columns for '*' selector.
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
|
|
if allEmptyValues {
|
|
// Do not count empty values
|
|
return stateSizeIncrease
|
|
}
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
return stateSizeIncrease
|
|
}
|
|
if len(fields) == 1 {
|
|
// Fast path for a single column.
|
|
// The unique key is formed as "<is_time> <value>",
|
|
// This guarantees that keys do not clash for different column types across blocks.
|
|
c := br.getColumnByName(fields[0])
|
|
if c.isTime {
|
|
// Count unique br.timestamps
|
|
keyBuf := sup.keyBuf[:0]
|
|
keyBuf = append(keyBuf[:0], 1)
|
|
keyBuf = encoding.MarshalInt64(keyBuf, br.timestamps[rowIdx])
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
if c.isConst {
|
|
// count unique const values
|
|
v := c.encodedValues[0]
|
|
if v == "" {
|
|
// Do not count empty values
|
|
return stateSizeIncrease
|
|
}
|
|
keyBuf := sup.keyBuf[:0]
|
|
keyBuf = append(keyBuf[:0], 0)
|
|
keyBuf = append(keyBuf, v...)
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
if c.valueType == valueTypeDict {
|
|
// count unique non-zero c.dictValues
|
|
dictIdx := c.encodedValues[rowIdx][0]
|
|
v := c.dictValues[dictIdx]
|
|
if v == "" {
|
|
// Do not count empty values
|
|
return stateSizeIncrease
|
|
}
|
|
keyBuf := sup.keyBuf[:0]
|
|
keyBuf = append(keyBuf[:0], 0)
|
|
keyBuf = append(keyBuf, v...)
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
// Count unique values for the given rowIdx
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
if v == "" {
|
|
// Do not count empty values
|
|
return stateSizeIncrease
|
|
}
|
|
keyBuf := sup.keyBuf[:0]
|
|
keyBuf = append(keyBuf[:0], 0)
|
|
keyBuf = append(keyBuf, v...)
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
// Slow path for multiple columns.
|
|
allEmptyValues := true
|
|
keyBuf := sup.keyBuf[:0]
|
|
for _, f := range fields {
|
|
c := br.getColumnByName(f)
|
|
v := c.getValueAtRow(br, rowIdx)
|
|
if v != "" {
|
|
allEmptyValues = false
|
|
}
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
|
|
}
|
|
sup.keyBuf = keyBuf
|
|
|
|
if allEmptyValues {
|
|
// Do not count empty values
|
|
return stateSizeIncrease
|
|
}
|
|
if _, ok := m[string(keyBuf)]; !ok {
|
|
m[string(keyBuf)] = struct{}{}
|
|
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
|
|
}
|
|
return stateSizeIncrease
|
|
}
|
|
|
|
func (sup *statsCountUniqProcessor) mergeState(sfp statsProcessor) {
|
|
if sup.limitReached() {
|
|
return
|
|
}
|
|
|
|
src := sfp.(*statsCountUniqProcessor)
|
|
m := sup.m
|
|
for k := range src.m {
|
|
if _, ok := m[k]; !ok {
|
|
m[k] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (sup *statsCountUniqProcessor) finalizeStats() string {
|
|
n := uint64(len(sup.m))
|
|
if limit := sup.su.limit; limit > 0 && n > limit {
|
|
n = limit
|
|
}
|
|
return strconv.FormatUint(n, 10)
|
|
}
|
|
|
|
func (sup *statsCountUniqProcessor) limitReached() bool {
|
|
limit := sup.su.limit
|
|
return limit > 0 && uint64(len(sup.m)) >= limit
|
|
}
|
|
|
|
func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) {
|
|
fields, err := parseFieldNamesForStatsFunc(lex, "count_uniq")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
su := &statsCountUniq{
|
|
fields: fields,
|
|
containsStar: slices.Contains(fields, "*"),
|
|
}
|
|
if lex.isKeyword("limit") {
|
|
lex.nextToken()
|
|
n, ok := tryParseUint64(lex.token)
|
|
if !ok {
|
|
return nil, fmt.Errorf("cannot parse 'limit %s' for 'count_uniq': %w", lex.token, err)
|
|
}
|
|
lex.nextToken()
|
|
su.limit = n
|
|
}
|
|
return su, nil
|
|
}
|