VictoriaMetrics/lib/logstorage/stats_unique.go

366 lines
9.5 KiB
Go
Raw Normal View History

2024-04-29 01:20:43 +00:00
package logstorage
import (
"fmt"
"slices"
"strconv"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
// statsUniq implements the `uniq(...)` stats function, which counts
// the number of unique values across the given fields.
type statsUniq struct {
	// fields is the list of field names to count unique values over.
	fields []string
	// containsStar is set when fields contains the '*' selector,
	// meaning unique rows are counted across all the columns.
	containsStar bool
}
func (su *statsUniq) String() string {
2024-04-30 23:19:22 +00:00
return "uniq(" + fieldNamesString(su.fields) + ")"
2024-04-29 01:20:43 +00:00
}
// neededFields reports the fields this stats function reads from each row.
func (su *statsUniq) neededFields() []string {
	return su.fields
}
2024-04-29 01:23:41 +00:00
func (su *statsUniq) newStatsProcessor() (statsProcessor, int) {
2024-04-29 01:20:43 +00:00
sup := &statsUniqProcessor{
su: su,
m: make(map[string]struct{}),
}
return sup, int(unsafe.Sizeof(*sup))
}
// statsUniqProcessor holds the per-pipeline state for the `uniq` stats function.
type statsUniqProcessor struct {
	// su points back to the parsed `uniq(...)` definition.
	su *statsUniq
	// m is the set of unique keys seen so far; map[K]struct{} is used as a set.
	m map[string]struct{}
	// columnValues is a reusable scratch buffer for the multi-column slow path.
	columnValues [][]string
	// keyBuf is a reusable scratch buffer for building set keys.
	keyBuf []byte
}
2024-04-30 21:03:34 +00:00
func (sup *statsUniqProcessor) updateStatsForAllRows(br *blockResult) int {
2024-04-29 01:20:43 +00:00
fields := sup.su.fields
m := sup.m
stateSizeIncrease := 0
if len(fields) == 0 || sup.su.containsStar {
// Count unique rows
2024-04-30 21:03:34 +00:00
columns := br.getColumns()
keyBuf := sup.keyBuf[:0]
for i := range br.timestamps {
2024-04-29 01:20:43 +00:00
seenKey := true
for _, c := range columns {
2024-04-30 21:03:34 +00:00
values := c.getValues(br)
2024-04-29 01:20:43 +00:00
if i == 0 || values[i-1] != values[i] {
seenKey = false
break
}
}
if seenKey {
2024-04-30 21:03:34 +00:00
// This key has been already counted.
2024-04-29 01:20:43 +00:00
continue
}
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, c := range columns {
2024-04-30 21:03:34 +00:00
v := c.getValueAtRow(br, i)
2024-04-29 01:20:43 +00:00
if v != "" {
allEmptyValues = false
}
// Put column name into key, since every block can contain different set of columns for '*' selector.
2024-04-30 21:03:34 +00:00
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
2024-04-29 01:20:43 +00:00
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
if allEmptyValues {
// Do not count empty values
continue
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sup.keyBuf = keyBuf
return stateSizeIncrease
}
if len(fields) == 1 {
2024-04-30 21:03:34 +00:00
// Fast path for a single column.
// The unique key is formed as "<is_time> <value_type>? <encodedValue>",
// where <value_type> is skipped if <is_time> == 1.
// This guarantees that keys do not clash for different column types acorss blocks.
c := br.getColumnByName(fields[0])
if c.isTime {
// Count unique br.timestamps
timestamps := br.timestamps
keyBuf := sup.keyBuf[:0]
for i, timestamp := range timestamps {
if i > 0 && timestamps[i-1] == timestamps[i] {
// This timestamp has been already counted.
2024-04-29 01:20:43 +00:00
continue
}
2024-04-30 21:03:34 +00:00
keyBuf = append(keyBuf[:0], 1)
keyBuf = encoding.MarshalInt64(keyBuf, timestamp)
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sup.keyBuf = keyBuf
return stateSizeIncrease
}
if c.isConst {
// count unique const values
v := c.encodedValues[0]
if v == "" {
// Do not count empty values
return stateSizeIncrease
}
keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 0, byte(valueTypeString))
keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
sup.keyBuf = keyBuf
return stateSizeIncrease
}
if c.valueType == valueTypeDict {
// count unique non-zero c.dictValues
keyBuf := sup.keyBuf[:0]
for i, v := range c.dictValues {
if v == "" {
// Do not count empty values
2024-04-29 01:20:43 +00:00
continue
}
2024-04-30 21:03:34 +00:00
keyBuf = append(keyBuf[:0], 0, byte(valueTypeDict))
keyBuf = append(keyBuf, byte(i))
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
2024-04-29 01:20:43 +00:00
}
}
2024-04-30 21:03:34 +00:00
sup.keyBuf = keyBuf
return stateSizeIncrease
}
// Count unique values across encodedValues
encodedValues := c.getEncodedValues(br)
isStringValueType := c.valueType == valueTypeString
keyBuf := sup.keyBuf[:0]
for i, v := range encodedValues {
if isStringValueType && v == "" {
// Do not count empty values
continue
}
if i > 0 && encodedValues[i-1] == v {
// This value has been already counted.
continue
}
keyBuf = append(keyBuf[:0], 0, byte(c.valueType))
keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
2024-04-29 01:20:43 +00:00
}
2024-04-30 21:03:34 +00:00
keyBuf = sup.keyBuf
2024-04-29 01:20:43 +00:00
return stateSizeIncrease
}
// Slow path for multiple columns.
// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
2024-04-30 21:03:34 +00:00
sup.columnValues = br.appendColumnValues(sup.columnValues[:0], fields)
2024-04-29 01:20:43 +00:00
columnValues := sup.columnValues
2024-04-30 21:03:34 +00:00
keyBuf := sup.keyBuf[:0]
for i := range br.timestamps {
2024-04-29 01:20:43 +00:00
seenKey := true
for _, values := range columnValues {
if i == 0 || values[i-1] != values[i] {
seenKey = false
2024-04-30 21:03:34 +00:00
break
2024-04-29 01:20:43 +00:00
}
}
if seenKey {
continue
}
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, values := range columnValues {
v := values[i]
if v != "" {
allEmptyValues = false
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
if allEmptyValues {
// Do not count empty values
continue
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sup.keyBuf = keyBuf
return stateSizeIncrease
}
2024-04-30 21:03:34 +00:00
func (sup *statsUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
2024-04-29 01:20:43 +00:00
fields := sup.su.fields
m := sup.m
stateSizeIncrease := 0
if len(fields) == 0 || sup.su.containsStar {
// Count unique rows
allEmptyValues := true
keyBuf := sup.keyBuf[:0]
2024-04-30 21:03:34 +00:00
for _, c := range br.getColumns() {
v := c.getValueAtRow(br, rowIdx)
2024-04-29 01:20:43 +00:00
if v != "" {
allEmptyValues = false
}
// Put column name into key, since every block can contain different set of columns for '*' selector.
2024-04-30 21:03:34 +00:00
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
2024-04-29 01:20:43 +00:00
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
sup.keyBuf = keyBuf
if allEmptyValues {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
if len(fields) == 1 {
2024-04-30 21:03:34 +00:00
// Fast path for a single column.
// The unique key is formed as "<is_time> <value_type>? <encodedValue>",
// where <value_type> is skipped if <is_time> == 1.
// This guarantees that keys do not clash for different column types acorss blocks.
c := br.getColumnByName(fields[0])
if c.isTime {
// Count unique br.timestamps
keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 1)
keyBuf = encoding.MarshalInt64(keyBuf, br.timestamps[rowIdx])
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
sup.keyBuf = keyBuf
return stateSizeIncrease
}
if c.isConst {
// count unique const values
v := c.encodedValues[0]
2024-04-29 01:20:43 +00:00
if v == "" {
// Do not count empty values
return stateSizeIncrease
}
2024-04-30 21:03:34 +00:00
keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 0, byte(valueTypeString))
keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
2024-04-29 01:20:43 +00:00
}
2024-04-30 21:03:34 +00:00
sup.keyBuf = keyBuf
return stateSizeIncrease
2024-04-29 01:20:43 +00:00
}
2024-04-30 21:03:34 +00:00
if c.valueType == valueTypeDict {
// count unique non-zero c.dictValues
dictIdx := c.encodedValues[rowIdx][0]
if c.dictValues[dictIdx] == "" {
// Do not count empty values
return stateSizeIncrease
}
keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 0, byte(valueTypeDict))
keyBuf = append(keyBuf, dictIdx)
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
sup.keyBuf = keyBuf
return stateSizeIncrease
}
// Count unique values across encodedValues
encodedValues := c.getEncodedValues(br)
v := encodedValues[rowIdx]
if c.valueType == valueTypeString && v == "" {
// Do not count empty values
return stateSizeIncrease
}
keyBuf := sup.keyBuf[:0]
keyBuf = append(keyBuf[:0], 0, byte(c.valueType))
keyBuf = append(keyBuf, v...)
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
keyBuf = sup.keyBuf
2024-04-29 01:20:43 +00:00
return stateSizeIncrease
}
// Slow path for multiple columns.
allEmptyValues := true
keyBuf := sup.keyBuf[:0]
for _, f := range fields {
2024-04-30 21:03:34 +00:00
c := br.getColumnByName(f)
v := c.getValueAtRow(br, rowIdx)
2024-04-29 01:20:43 +00:00
if v != "" {
allEmptyValues = false
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
sup.keyBuf = keyBuf
if allEmptyValues {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
2024-04-29 01:23:41 +00:00
func (sup *statsUniqProcessor) mergeState(sfp statsProcessor) {
2024-04-29 01:20:43 +00:00
src := sfp.(*statsUniqProcessor)
m := sup.m
for k := range src.m {
m[k] = struct{}{}
}
}
2024-04-30 23:19:22 +00:00
func (sup *statsUniqProcessor) finalizeStats() string {
2024-04-29 01:20:43 +00:00
n := uint64(len(sup.m))
2024-04-30 23:19:22 +00:00
return strconv.FormatUint(n, 10)
2024-04-29 01:20:43 +00:00
}
// parseStatsUniq parses a `uniq(...)` stats function starting at the current lex token.
//
// The current token must be `uniq`; it is consumed before parsing the
// parenthesized field list.
func parseStatsUniq(lex *lexer) (*statsUniq, error) {
	lex.nextToken()
	fields, err := parseFieldNamesInParens(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err)
	}
	return &statsUniq{
		fields:       fields,
		containsStar: slices.Contains(fields, "*"),
	}, nil
}