This commit is contained in:
Aliaksandr Valialkin 2024-04-29 02:04:42 +02:00
parent fcbad5ac1b
commit ccb4031e62
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 65 additions and 7 deletions

View file

@ -1081,8 +1081,11 @@ LogsQL supports calculating the following stats:
- The number of unique values for the given set of [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Examples:
- `error | stats uniq(client_ip) as unique_user_ips` returns the number of unique values for `client_ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across log messages with the `error` [word](#word).
- `error | stats by (app) uniq(path, host) as unique_path_hosts` - returns the number of unique pairs of `path` and `host` [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across log messages with the `error` [word](#word), grouped by `app` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- `error | stats by (app) uniq(path, host) as unique_path_hosts` - returns the number of unique `(path, host)` pairs
for [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across log messages with the `error` [word](#word),
grouped by `app` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- `error | fields path, host | stats uniq(*)` - returns the number of unique `(path, host)` pairs
for [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across log messages with the `error` [word](#word).
Stats' calculation can be combined in a single query. For example, the following query calculates the number of log messages with the `error` [word](#word),
the number of unique values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) and the number of unique values

View file

@ -392,6 +392,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
for _, sfp := range shard.getStatsFuncProcessors(keyBuf) {
shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
}
shard.keyBuf = keyBuf
return
}
@ -696,8 +697,9 @@ func (sfcp *statsFuncCountProcessor) finalizeStats() (string, string) {
}
type statsFuncUniq struct {
fields []string
resultName string
fields []string
containsStar bool
resultName string
}
func (sfu *statsFuncUniq) String() string {
@ -731,6 +733,33 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
m := sfup.m
stateSizeIncrease := 0
if len(fields) == 0 || sfup.sfu.containsStar {
// Count unique rows
keyBuf := sfup.keyBuf
for i := range timestamps {
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, c := range columns {
v := c.Values[i]
if v != "" {
allEmptyValues = false
}
// Put column name into key, since every block can contain different set of columns for '*' selector.
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
if allEmptyValues {
// Do not count empty values
continue
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sfup.keyBuf = keyBuf
return stateSizeIncrease
}
if len(fields) == 1 {
// Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
@ -784,6 +813,31 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column
m := sfup.m
stateSizeIncrease := 0
if len(fields) == 0 || sfup.sfu.containsStar {
// Count unique rows
allEmptyValues := true
keyBuf := sfup.keyBuf[:0]
for _, c := range columns {
v := c.Values[rowIdx]
if v != "" {
allEmptyValues = false
}
// Put column name into key, since every block can contain different set of columns for '*' selector.
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
sfup.keyBuf = keyBuf
if allEmptyValues {
// Do not count empty values
return stateSizeIncrease
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
if len(fields) == 1 {
// Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
@ -803,7 +857,7 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column
// Slow path for multiple columns.
allEmptyValues := true
keyBuf := sfup.keyBuf
keyBuf := sfup.keyBuf[:0]
for _, f := range fields {
v := ""
if idx := getBlockColumnIndex(columns, f); idx >= 0 {
@ -855,8 +909,9 @@ func parseStatsFuncUniq(lex *lexer) (*statsFuncUniq, error) {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
sfu := &statsFuncUniq{
fields: fields,
resultName: resultName,
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return sfu, nil
}