diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 265d74009..c0ccb5c6c 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1081,8 +1081,11 @@ LogsQL supports calculating the following stats: - The number of unique values for the given set of [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Examples: - `error | stats uniq(client_ip) as unique_user_ips` returns the number of unique values for `client_ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across log messages with the `error` [word](#word). - - `error | stats by (app) uniq(path, host) as unique_path_hosts` - returns the number of unique pairs of `path` and `host` [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) - across log messages with the `error` [word](#word), grouped by `app` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). + - `error | stats by (app) uniq(path, host) as unique_path_hosts` - returns the number of unique `(path, host)` pairs + for [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across log messages with the `error` [word](#word), + grouped by `app` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). + - `error | fields path, host | stats uniq(*)` - returns the number of unique `(path, host)` pairs + for [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across log messages with the `error` [word](#word). Stats' calculation can be combined in a single query. 
For example, the following query calculates the number of log messages with the `error` [word](#word), the number of unique values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) and the number of unique values diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 043e205bf..266207e40 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -392,6 +392,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col for _, sfp := range shard.getStatsFuncProcessors(keyBuf) { shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) } + shard.keyBuf = keyBuf return } @@ -696,8 +697,9 @@ func (sfcp *statsFuncCountProcessor) finalizeStats() (string, string) { } type statsFuncUniq struct { - fields []string - resultName string + fields []string + containsStar bool + resultName string } func (sfu *statsFuncUniq) String() string { @@ -731,6 +733,33 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co m := sfup.m stateSizeIncrease := 0 + if len(fields) == 0 || sfup.sfu.containsStar { + // Count unique rows + keyBuf := sfup.keyBuf + for i := range timestamps { + allEmptyValues := true + keyBuf = keyBuf[:0] + for _, c := range columns { + v := c.Values[i] + if v != "" { + allEmptyValues = false + } + // Put the column name into the key, since every block can contain a different set of columns for the '*' selector. 
+ keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + if allEmptyValues { + // Do not count rows where all the values are empty + continue + } + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + } + sfup.keyBuf = keyBuf + return stateSizeIncrease + } if len(fields) == 1 { // Fast path for a single column if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { @@ -784,6 +813,31 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column m := sfup.m stateSizeIncrease := 0 + if len(fields) == 0 || sfup.sfu.containsStar { + // Count unique rows + allEmptyValues := true + keyBuf := sfup.keyBuf[:0] + for _, c := range columns { + v := c.Values[rowIdx] + if v != "" { + allEmptyValues = false + } + // Put the column name into the key, since every block can contain a different set of columns for the '*' selector. + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + sfup.keyBuf = keyBuf + + if allEmptyValues { + // Do not count rows where all the values are empty + return stateSizeIncrease + } + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + return stateSizeIncrease + } if len(fields) == 1 { // Fast path for a single column if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { @@ -803,7 +857,7 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column // Slow path for multiple columns. 
allEmptyValues := true - keyBuf := sfup.keyBuf + keyBuf := sfup.keyBuf[:0] for _, f := range fields { v := "" if idx := getBlockColumnIndex(columns, f); idx >= 0 { @@ -855,8 +909,9 @@ func parseStatsFuncUniq(lex *lexer) (*statsFuncUniq, error) { return nil, fmt.Errorf("cannot parse result name: %w", err) } sfu := &statsFuncUniq{ - fields: fields, - resultName: resultName, + fields: fields, + containsStar: slices.Contains(fields, "*"), + resultName: resultName, } return sfu, nil }