From b4fd20f17a8903aa96045d5183a230c2520dab8f Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 10 May 2024 16:14:42 +0200
Subject: [PATCH] wip

---
 docs/VictoriaLogs/LogsQL.md        | 15 +++++++++++++--
 lib/logstorage/parser_test.go      |  8 +++++++-
 lib/logstorage/pipe_stats.go       |  4 ++--
 lib/logstorage/stats_count_uniq.go | 33 ++++++++++++++++++++++++++++++++-
 4 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 34044bb40..ece64fba2 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1414,6 +1414,15 @@ over the last 5 minutes:
 _time:5m | stats count_uniq(host, path) unique_host_path_pairs
 ```
 
+Every unique value is stored in memory during query execution. A big number of unique values may require a lot of memory.
+Sometimes it is only necessary to know whether the number of unique values reaches some limit. In this case add `limit N` just after `count_uniq(...)`
+in order to limit the number of counted unique values to `N`, while limiting the maximum memory usage. For example, the following query counts
+up to `1_000_000` unique values for the `ip` field:
+
+```logsql
+_time:5m | stats count_uniq(ip) limit 1_000_000 as ips_1_000_000
+```
+
 See also:
 
 - [`uniq_values`](#uniq_values-stats)
@@ -1491,9 +1500,11 @@ over logs for the last 5 minutes:
 _time:5m | stats uniq_values(ip) unique_ips
 ```
 
-It is possible to specify the limit on the number of returned unique values by adding `limit N` just after `uniq_values()` and before the resulting column name.
+Every unique value is stored in memory during query execution. A big number of unique values may require a lot of memory. Sometimes it is enough to return
+only a subset of unique values. In this case add `limit N` after `uniq_values(...)` in order to limit the number of returned unique values to `N`,
+while limiting the maximum memory usage.
 For example, the following query returns up to `100` unique values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
-over the logs for the last 5 minutes. Note that it may return arbitrary subset of unique `ip` values:
+over the logs for the last 5 minutes. Note that an arbitrary subset of unique `ip` values is returned each time:
 
 ```logsql
 _time:5m | stats uniq_values(ip) limit 100 as unique_ips_100
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 542f42179..6b01168db 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -905,7 +905,7 @@ func TestParseQuerySuccess(t *testing.T) {
 
 	// stats pipe count_uniq
 	f(`* | stats count_uniq(foo) bar`, `* | stats count_uniq(foo) as bar`)
-	f(`* | stats by(x, y) count_uniq(foo,bar) as baz`, `* | stats by (x, y) count_uniq(foo, bar) as baz`)
+	f(`* | stats by(x, y) count_uniq(foo,bar) LiMit 10 As baz`, `* | stats by (x, y) count_uniq(foo, bar) limit 10 as baz`)
 	f(`* | stats by(x) count_uniq(*) z`, `* | stats by (x) count_uniq(*) as z`)
 	f(`* | stats by(x) count_uniq() z`, `* | stats by (x) count_uniq(*) as z`)
 	f(`* | stats by(x) count_uniq(a,*,b) z`, `* | stats by (x) count_uniq(*) as z`)
@@ -1227,12 +1227,18 @@ func TestParseQueryFailure(t *testing.T) {
 	// invalid stats count_uniq
 	f(`foo | stats count_uniq`)
 	f(`foo | stats count_uniq()`)
+	f(`foo | stats count_uniq() limit`)
+	f(`foo | stats count_uniq() limit foo`)
+	f(`foo | stats count_uniq() limit 0.5`)
+	f(`foo | stats count_uniq() limit -1`)
 
 	// invalid stats uniq_values
 	f(`foo | stats uniq_values`)
 	f(`foo | stats uniq_values()`)
 	f(`foo | stats uniq_values() limit`)
 	f(`foo | stats uniq_values(a) limit foo`)
+	f(`foo | stats uniq_values(a) limit 0.5`)
+	f(`foo | stats uniq_values(a) limit -1`)
 
 	// invalid stats grouping fields
 	f(`foo | stats by(foo:bar) count() baz`)
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index 1bdc06446..bcc34c47a 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -517,7 +517,7 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
 
 	resultName, err := parseResultName(lex)
 	if err != nil {
-		return nil, "", fmt.Errorf("cannot parse result name: %w", err)
+		return nil, "", fmt.Errorf("cannot parse result name for %s: %w", sf, err)
 	}
 	return sf, resultName, nil
 }
@@ -528,7 +528,7 @@ func parseResultName(lex *lexer) (string, error) {
 	}
 	resultName, err := parseFieldName(lex)
 	if err != nil {
-		return "", fmt.Errorf("cannot parse 'as' field name: %w", err)
+		return "", err
 	}
 	return resultName, nil
 }
diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go
index 931535667..c0a6e0fe4 100644
--- a/lib/logstorage/stats_count_uniq.go
+++ b/lib/logstorage/stats_count_uniq.go
@@ -1,6 +1,7 @@
 package logstorage
 
 import (
+	"fmt"
 	"slices"
 	"strconv"
 	"unsafe"
@@ -12,10 +13,15 @@ import (
 type statsCountUniq struct {
 	fields       []string
 	containsStar bool
+	limit        uint64
 }
 
 func (su *statsCountUniq) String() string {
-	return "count_uniq(" + fieldNamesString(su.fields) + ")"
+	s := "count_uniq(" + fieldNamesString(su.fields) + ")"
+	if su.limit > 0 {
+		s += fmt.Sprintf(" limit %d", su.limit)
+	}
+	return s
 }
 
 func (su *statsCountUniq) neededFields() []string {
@@ -41,6 +47,10 @@ type statsCountUniqProcessor struct {
 }
 
 func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
+	if sup.limitReached() {
+		return 0
+	}
+
 	fields := sup.su.fields
 	m := sup.m
 
@@ -216,6 +226,10 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
 }
 
 func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
+	if sup.limitReached() {
+		return 0
+	}
+
 	fields := sup.su.fields
 	m := sup.m
 
@@ -340,6 +354,10 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
 }
 
 func (sup *statsCountUniqProcessor) mergeState(sfp statsProcessor) {
+	if sup.limitReached() {
+		return
+	}
+
 	src := sfp.(*statsCountUniqProcessor)
 	m := sup.m
 	for k := range src.m {
@@ -354,6 +372,10 @@ func (sup *statsCountUniqProcessor) finalizeStats() string {
 	return strconv.FormatUint(n, 10)
 }
 
+func (sup *statsCountUniqProcessor) limitReached() bool {
+	return sup.su.limit > 0 && uint64(len(sup.m)) >= sup.su.limit
+}
+
 func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) {
 	fields, err := parseFieldNamesForStatsFunc(lex, "count_uniq")
 	if err != nil {
@@ -363,5 +385,14 @@ func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) {
 		fields:       fields,
 		containsStar: slices.Contains(fields, "*"),
 	}
+	if lex.isKeyword("limit") {
+		lex.nextToken()
+		n, ok := tryParseUint64(lex.token)
+		if !ok {
+			return nil, fmt.Errorf("cannot parse 'limit %s' for 'count_uniq'", lex.token)
+		}
+		lex.nextToken()
+		su.limit = n
+	}
 	return su, nil
 }
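
As a quick sanity check of the new `limit` option, a table-driven round trip in the style of `TestParseQuerySuccess` can confirm the parse and `String()` behavior. This is a minimal sketch, not part of the patch: the test name is hypothetical, `ParseQuery` is the existing entry point in `lib/logstorage`, and the `limit 0` expectation follows from the `su.limit > 0` guard in `String()`.

```go
package logstorage

import "testing"

// TestCountUniqLimitRoundTrip is a hypothetical test (not part of the patch)
// verifying that 'limit N' on count_uniq survives a parse -> String() round trip.
func TestCountUniqLimitRoundTrip(t *testing.T) {
	f := func(s, resultExpected string) {
		t.Helper()
		q, err := ParseQuery(s)
		if err != nil {
			t.Fatalf("unexpected error when parsing %q: %s", s, err)
		}
		if result := q.String(); result != resultExpected {
			t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
		}
	}

	// 'limit' is matched case-insensitively and normalized to lowercase on output.
	f(`* | stats count_uniq(ip) LIMIT 42 as ips`, `* | stats count_uniq(ip) limit 42 as ips`)

	// A zero limit disables the cap, so String() omits the 'limit' clause.
	f(`* | stats count_uniq(ip) limit 0 as ips`, `* | stats count_uniq(ip) as ips`)
}
```

Note that once the limit is reached, `updateStatsForAllRows`, `updateStatsForRow` and `mergeState` stop collecting new values, so the reported count is capped near `N` rather than exact, which matches the "counts up to `N` unique values" wording in the docs.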