This commit is contained in:
Aliaksandr Valialkin 2024-05-10 15:51:39 +02:00
parent 0dd32f5144
commit 54cc81602e
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 46 additions and 2 deletions

View file

@ -1491,6 +1491,14 @@ over logs for the last 5 minutes:
_time:5m | stats uniq_values(ip) unique_ips
```
It is possible to specify the limit on the number of returned unique values by adding `limit N` just after `uniq_values()` and before the resulting column name.
For example, the following query returns up to `100` unique values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
over the logs for the last 5 minutes. Note that it may return an arbitrary subset of unique `ip` values:
```logsql
_time:5m | stats uniq_values(ip) limit 100 as unique_ips_100
```
See also:

- [`count_uniq`](#count_uniq-stats)

View file

@ -912,9 +912,10 @@ func TestParseQuerySuccess(t *testing.T) {
// stats pipe uniq_values // stats pipe uniq_values
f(`* | stats uniq_values(foo) bar`, `* | stats uniq_values(foo) as bar`) f(`* | stats uniq_values(foo) bar`, `* | stats uniq_values(foo) as bar`)
f(`* | stats uniq_values(foo) limit 10 bar`, `* | stats uniq_values(foo) limit 10 as bar`)
f(`* | stats by(x, y) uniq_values(foo, bar) as baz`, `* | stats by (x, y) uniq_values(foo, bar) as baz`) f(`* | stats by(x, y) uniq_values(foo, bar) as baz`, `* | stats by (x, y) uniq_values(foo, bar) as baz`)
f(`* | stats by(x) uniq_values(*) y`, `* | stats by (x) uniq_values(*) as y`) f(`* | stats by(x) uniq_values(*) y`, `* | stats by (x) uniq_values(*) as y`)
f(`* | stats by(x) uniq_values() y`, `* | stats by (x) uniq_values(*) as y`) f(`* | stats by(x) uniq_values() limit 1_000 AS y`, `* | stats by (x) uniq_values(*) limit 1000 as y`)
f(`* | stats by(x) uniq_values(a,*,b) y`, `* | stats by (x) uniq_values(*) as y`) f(`* | stats by(x) uniq_values(a,*,b) y`, `* | stats by (x) uniq_values(*) as y`)
// stats pipe multiple funcs // stats pipe multiple funcs
@ -1230,6 +1231,8 @@ func TestParseQueryFailure(t *testing.T) {
// invalid stats uniq_values // invalid stats uniq_values
f(`foo | stats uniq_values`) f(`foo | stats uniq_values`)
f(`foo | stats uniq_values()`) f(`foo | stats uniq_values()`)
f(`foo | stats uniq_values() limit`)
f(`foo | stats uniq_values(a) limit foo`)
// invalid stats grouping fields // invalid stats grouping fields
f(`foo | stats by(foo:bar) count() baz`) f(`foo | stats by(foo:bar) count() baz`)

View file

@ -1,6 +1,7 @@
package logstorage package logstorage
import ( import (
"fmt"
"slices" "slices"
"strconv" "strconv"
"strings" "strings"
@ -12,10 +13,15 @@ import (
type statsUniqValues struct { type statsUniqValues struct {
fields []string fields []string
containsStar bool containsStar bool
limit uint64
} }
func (su *statsUniqValues) String() string { func (su *statsUniqValues) String() string {
return "uniq_values(" + fieldNamesString(su.fields) + ")" s := "uniq_values(" + fieldNamesString(su.fields) + ")"
if su.limit > 0 {
s += fmt.Sprintf(" limit %d", su.limit)
}
return s
} }
func (su *statsUniqValues) neededFields() []string { func (su *statsUniqValues) neededFields() []string {
@ -38,6 +44,11 @@ type statsUniqValuesProcessor struct {
} }
func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int { func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int {
if sup.limitReached() {
// Limit on the number of unique values has been reached
return 0
}
stateSizeIncrease := 0 stateSizeIncrease := 0
if sup.su.containsStar { if sup.su.containsStar {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
@ -106,6 +117,11 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC
} }
func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
if sup.limitReached() {
// Limit on the number of unique values has been reached
return 0
}
stateSizeIncrease := 0 stateSizeIncrease := 0
if sup.su.containsStar { if sup.su.containsStar {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
@ -168,6 +184,10 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum
} }
func (sup *statsUniqValuesProcessor) mergeState(sfp statsProcessor) { func (sup *statsUniqValuesProcessor) mergeState(sfp statsProcessor) {
if sup.limitReached() {
return
}
src := sfp.(*statsUniqValuesProcessor) src := sfp.(*statsUniqValuesProcessor)
m := sup.m m := sup.m
for k := range src.m { for k := range src.m {
@ -211,6 +231,10 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string {
return bytesutil.ToUnsafeString(b) return bytesutil.ToUnsafeString(b)
} }
// limitReached reports whether the number of collected unique values has
// reached the configured limit. A limit of 0 disables the check, so this
// always returns false in that case.
func (sup *statsUniqValuesProcessor) limitReached() bool {
	limit := sup.su.limit
	if limit == 0 {
		// No limit configured.
		return false
	}
	return uint64(len(sup.m)) >= limit
}
func compareValues(a, b string) int { func compareValues(a, b string) int {
fA, okA := tryParseFloat64(a) fA, okA := tryParseFloat64(a)
fB, okB := tryParseFloat64(b) fB, okB := tryParseFloat64(b)
@ -241,5 +265,14 @@ func parseStatsUniqValues(lex *lexer) (*statsUniqValues, error) {
fields: fields, fields: fields,
containsStar: slices.Contains(fields, "*"), containsStar: slices.Contains(fields, "*"),
} }
if lex.isKeyword("limit") {
lex.nextToken()
n, ok := tryParseUint64(lex.token)
if !ok {
return nil, fmt.Errorf("cannot parse 'limit %s' for 'uniq_values': %w", lex.token, err)
}
lex.nextToken()
su.limit = n
}
return su, nil return su, nil
} }