From 54cc81602e9890f1e7159fbb5975029f60c65909 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 10 May 2024 15:51:39 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/LogsQL.md | 8 +++++++ lib/logstorage/parser_test.go | 5 ++++- lib/logstorage/stats_uniq_values.go | 35 ++++++++++++++++++++++++++++- 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 8b2d7f2ae..34044bb40 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1491,6 +1491,14 @@ over logs for the last 5 minutes: _time:5m | stats uniq_values(ip) unique_ips ``` +It is possible to specify the limit on the number of returned unique values by adding `limit N` just after `uniq_values()` and before the resulting column name. +For example, the following query returns up to `100` unique values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +over the logs for the last 5 minutes. Note that it may return arbitrary subset of unique `ip` values: + +```logsql +_time:5m | stats uniq_values(ip) limit 100 as unique_ips_100 +``` + See also: - [`count_uniq`](#count_uniq-stats) diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index de68cc1ee..542f42179 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -912,9 +912,10 @@ func TestParseQuerySuccess(t *testing.T) { // stats pipe uniq_values f(`* | stats uniq_values(foo) bar`, `* | stats uniq_values(foo) as bar`) + f(`* | stats uniq_values(foo) limit 10 bar`, `* | stats uniq_values(foo) limit 10 as bar`) f(`* | stats by(x, y) uniq_values(foo, bar) as baz`, `* | stats by (x, y) uniq_values(foo, bar) as baz`) f(`* | stats by(x) uniq_values(*) y`, `* | stats by (x) uniq_values(*) as y`) - f(`* | stats by(x) uniq_values() y`, `* | stats by (x) uniq_values(*) as y`) + f(`* | stats by(x) uniq_values() limit 1_000 AS y`, `* | stats by (x) uniq_values(*) limit 1000 as y`) f(`* | stats by(x) uniq_values(a,*,b) y`, `* | stats by (x) uniq_values(*) as y`) // stats pipe multiple funcs @@ -1230,6 +1231,8 @@ func TestParseQueryFailure(t *testing.T) { // invalid stats uniq_values f(`foo | stats uniq_values`) f(`foo | stats uniq_values()`) + f(`foo | stats uniq_values() limit`) + f(`foo | stats uniq_values(a) limit foo`) // invalid stats grouping fields f(`foo | stats by(foo:bar) count() baz`) diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index b2376cb5d..1da95e1ea 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -1,6 +1,7 @@ package logstorage import ( + "fmt" "slices" "strconv" "strings" @@ -12,10 +13,15 @@ import ( type statsUniqValues struct { fields []string containsStar bool + limit uint64 } func (su *statsUniqValues) String() string { - return "uniq_values(" + fieldNamesString(su.fields) + ")" + s := "uniq_values(" + fieldNamesString(su.fields) + ")" + if su.limit > 0 { + s += fmt.Sprintf(" limit %d", su.limit) + } + return s } func (su *statsUniqValues) neededFields() []string { @@ -38,6 +44,11 @@ type statsUniqValuesProcessor struct { } func (sup *statsUniqValuesProcessor) updateStatsForAllRows(br *blockResult) int { + if sup.limitReached() { + // Limit on the number of unique values has been reached + return 0 + } + stateSizeIncrease := 0 if sup.su.containsStar { for _, c := range br.getColumns() { @@ -106,6 +117,11 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC } func (sup *statsUniqValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + if sup.limitReached() { + // Limit on the number of unique values has been reached + return 0 + } + stateSizeIncrease := 0 if sup.su.containsStar { for _, c := range br.getColumns() { @@ -168,6 +184,10 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum } func (sup *statsUniqValuesProcessor) mergeState(sfp statsProcessor) { + if sup.limitReached() { + return + } + src := sfp.(*statsUniqValuesProcessor) m := sup.m for k := range src.m { @@ -211,6 +231,10 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string { return bytesutil.ToUnsafeString(b) } +func (sup *statsUniqValuesProcessor) limitReached() bool { + return sup.su.limit > 0 && uint64(len(sup.m)) >= sup.su.limit +} + func compareValues(a, b string) int { fA, okA := tryParseFloat64(a) fB, okB := tryParseFloat64(b) @@ -241,5 +265,14 @@ func parseStatsUniqValues(lex *lexer) (*statsUniqValues, error) { fields: fields, containsStar: slices.Contains(fields, "*"), } + if lex.isKeyword("limit") { + lex.nextToken() + n, ok := tryParseUint64(lex.token) + if !ok { + return nil, fmt.Errorf("cannot parse 'limit %s' for 'uniq_values': %w", lex.token, err) + } + lex.nextToken() + su.limit = n + } return su, nil }