From b7c062ac61d75765c4fe92f6389541102b97cd4c Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 30 May 2024 14:26:05 +0200
Subject: [PATCH] wip

---
 docs/VictoriaLogs/CHANGELOG.md | 2 ++
 docs/VictoriaLogs/LogsQL.md | 11 +++++++--
 lib/logstorage/parser_test.go | 33 ++++++++++++++++---------
 lib/logstorage/pipe_stats.go | 45 ++++++++++++++++++++++------------
 4 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 1c2cfbc8b..d02c15869 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta

 ## tip

+* FEATURE: allow omitting the result name in the [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). In this case the result name is automatically set to the string representation of the corresponding [stats function expression](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). For example, `_time:5m | count(*)` is now a valid [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/). It is equivalent to `_time:5m | stats count(*) as "count(*)"`.
+
 * BUGFIX: properly calculate the number of matching rows in `* | field_values x | stats count() rows` and in `* | unroll (x) | stats count() rows` queries.

 ## [v0.14.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.14.0-victorialogs)
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index a0f9c3c73..ea48a5e1c 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1885,7 +1885,7 @@ See also:
 uses [`count` stats function](#count-stats) for calculating the number of logs for the last 5 minutes:

 ```logsql
-_time:5m | stats count() logs_total
+_time:5m | stats count() as logs_total
 ```

 `| stats ...` pipe has the following basic format:
@@ -1909,12 +1909,19 @@ For example, the following query calculates the following stats for logs over th
 _time:5m | stats count() logs_total, count_uniq(_stream) streams_total
 ```

-It is allowed to omit `stats` prefix for convenience. So the following query is equivalent to the previous one:
+It is allowed to omit the `stats` prefix for convenience. So the following query is equivalent to the previous one:

 ```logsql
 _time:5m | count() logs_total, count_uniq(_stream) streams_total
 ```

+It is allowed to omit the result name. In this case the result name is set to the string representation of the used [stats function](#stats-pipe-functions).
+For example, the following query returns the same stats as the previous one, but uses `count()` and `count_uniq(_stream)` names for the returned fields:
+
+```logsql
+_time:5m | count(), count_uniq(_stream)
+```
+
 See also:

 - [stats by fields](#stats-by-fields)
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index a1f177ff5..2ab4aaf1c 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -909,15 +909,18 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats count(foo,*,bar) x`, `* | stats count(*) as x`)
 	f(`* | stats count('') foo`, `* | stats count(_msg) as foo`)
 	f(`* | stats count(foo) ''`, `* | stats count(foo) as _msg`)
+	f(`* | count()`, `* | stats count(*) as "count(*)"`)

 	// stats pipe count_empty
 	f(`* | stats count_empty() x`, `* | stats count_empty(*) as x`)
-	f(`* | stats by (x, y) count_empty(a,b,c) x`, `* | stats by (x, y) count_empty(a, b, c) as x`)
+	f(`* | stats by (x, y) count_empty(a,b,c) z`, `* | stats by (x, y) count_empty(a, b, c) as z`)
+	f(`* | count_empty()`, `* | stats count_empty(*) as "count_empty(*)"`)

 	// stats pipe sum
 	f(`* | stats Sum(foo) bar`, `* | stats sum(foo) as bar`)
 	f(`* | stats BY(x, y, ) SUM(foo,bar,) bar`, `* | stats by (x, y) sum(foo, bar) as bar`)
 	f(`* | stats sum() x`, `* | stats sum(*) as x`)
+	f(`* | sum()`, `* | stats sum(*) as "sum(*)"`)
 	f(`* | stats sum(*) x`, `* | stats sum(*) as x`)
 	f(`* | stats sum(foo,*,bar) x`, `* | stats sum(*) as x`)

@@ -925,6 +928,7 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats Max(foo) bar`, `* | stats max(foo) as bar`)
 	f(`* | stats BY(x, y, ) MAX(foo,bar,) bar`, `* | stats by (x, y) max(foo, bar) as bar`)
 	f(`* | stats max() x`, `* | stats max(*) as x`)
+	f(`* | max()`, `* | stats max(*) as "max(*)"`)
 	f(`* | stats max(*) x`, `* | stats max(*) as x`)
 	f(`* | stats max(foo,*,bar) x`, `* | stats max(*) as x`)

@@ -932,22 +936,26 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats Min(foo) bar`, `* | stats min(foo) as bar`)
 	f(`* | stats BY(x, y, ) MIN(foo,bar,) bar`, `* | stats by (x, y) min(foo, bar) as bar`)
 	f(`* | stats min() x`, `* | stats min(*) as x`)
+	f(`* | min()`, `* | stats min(*) as "min(*)"`)
 	f(`* | stats min(*) x`, `* | stats min(*) as x`)
 	f(`* | stats min(foo,*,bar) x`, `* | stats min(*) as x`)

 	// stats pipe fields_min
 	f(`* | stats fields_Min(foo) bar`, `* | stats fields_min(foo) as bar`)
+	f(`* | fields_Min(foo)`, `* | stats fields_min(foo) as "fields_min(foo)"`)
 	f(`* | stats BY(x, y, ) fields_MIN(foo,bar,) bar`, `* | stats by (x, y) fields_min(foo, bar) as bar`)

 	// stats pipe avg
 	f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`)
 	f(`* | stats BY(x, y, ) AVG(foo,bar,) bar`, `* | stats by (x, y) avg(foo, bar) as bar`)
 	f(`* | stats avg() x`, `* | stats avg(*) as x`)
+	f(`* | avg()`, `* | stats avg(*) as "avg(*)"`)
 	f(`* | stats avg(*) x`, `* | stats avg(*) as x`)
 	f(`* | stats avg(foo,*,bar) x`, `* | stats avg(*) as x`)

 	// stats pipe count_uniq
 	f(`* | stats count_uniq(foo) bar`, `* | stats count_uniq(foo) as bar`)
+	f(`* | count_uniq(foo)`, `* | stats count_uniq(foo) as "count_uniq(foo)"`)
 	f(`* | stats by(x, y) count_uniq(foo,bar) LiMit 10 As baz`, `* | stats by (x, y) count_uniq(foo, bar) limit 10 as baz`)
 	f(`* | stats by(x) count_uniq(*) z`, `* | stats by (x) count_uniq(*) as z`)
 	f(`* | stats by(x) count_uniq() z`, `* | stats by (x) count_uniq(*) as z`)
@@ -955,6 +963,7 @@ func TestParseQuerySuccess(t *testing.T) {

 	// stats pipe uniq_values
 	f(`* | stats uniq_values(foo) bar`, `* | stats uniq_values(foo) as bar`)
+	f(`* 
| uniq_values(foo)`, `* | stats uniq_values(foo) as "uniq_values(foo)"`) f(`* | stats uniq_values(foo) limit 10 bar`, `* | stats uniq_values(foo) limit 10 as bar`) f(`* | stats by(x, y) uniq_values(foo, bar) as baz`, `* | stats by (x, y) uniq_values(foo, bar) as baz`) f(`* | stats by(x) uniq_values(*) y`, `* | stats by (x) uniq_values(*) as y`) @@ -963,6 +972,7 @@ func TestParseQuerySuccess(t *testing.T) { // stats pipe values f(`* | stats values(foo) bar`, `* | stats values(foo) as bar`) + f(`* | values(foo)`, `* | stats values(foo) as "values(foo)"`) f(`* | stats values(foo) limit 10 bar`, `* | stats values(foo) limit 10 as bar`) f(`* | stats by(x, y) values(foo, bar) as baz`, `* | stats by (x, y) values(foo, bar) as baz`) f(`* | stats by(x) values(*) y`, `* | stats by (x) values(*) as y`) @@ -973,6 +983,7 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats Sum_len(foo) bar`, `* | stats sum_len(foo) as bar`) f(`* | stats BY(x, y, ) SUM_Len(foo,bar,) bar`, `* | stats by (x, y) sum_len(foo, bar) as bar`) f(`* | stats sum_len() x`, `* | stats sum_len(*) as x`) + f(`* | sum_len()`, `* | stats sum_len(*) as "sum_len(*)"`) f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`) f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`) @@ -981,12 +992,14 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`) f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`) f(`* | stats quantile(0.99) bar`, `* | stats quantile(0.99) as bar`) + f(`* | quantile(0.99)`, `* | stats quantile(0.99) as "quantile(0.99)"`) f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99) as bar`) // stats pipe median f(`* | stats Median(foo) bar`, `* | stats median(foo) as bar`) f(`* | stats BY(x, y, ) MEDIAN(foo,bar,) bar`, `* | stats by (x, y) median(foo, bar) as bar`) f(`* | stats median() x`, `* | stats median(*) as x`) + f(`* | median()`, `* | stats median(*) as "median(*)"`) f(`* | stats median(*) x`, `* | stats median(*) as x`) f(`* | stats median(foo,*,bar) x`, `* | stats median(*) as x`) @@ -995,7 +1008,7 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`) // stats pipe with grouping buckets - f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count(*) as foo`) + f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as bar`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count(*) as bar`) f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count(*) as foo`) f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count(*) as foo`) f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`) @@ -1357,7 +1370,6 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats count(`) f(`foo | stats count bar`) f(`foo | stats count(bar`) - f(`foo | stats count(bar)`) f(`foo | stats count() as`) f(`foo | stats count() as |`) @@ -1368,27 +1380,21 @@ func TestParseQueryFailure(t *testing.T) { // invalid stats sum f(`foo | stats sum`) - f(`foo | stats sum()`) // invalid stats max f(`foo | stats max`) - f(`foo | stats max()`) // invalid stats min f(`foo | stats min`) - f(`foo | stats 
min()`)

 	// invalid stats min
 	f(`foo | stats fields_min`)
-	f(`foo | stats fields_min()`)

 	// invalid stats avg
 	f(`foo | stats avg`)
-	f(`foo | stats avg()`)

 	// invalid stats count_uniq
 	f(`foo | stats count_uniq`)
-	f(`foo | stats count_uniq()`)
 	f(`foo | stats count_uniq() limit`)
 	f(`foo | stats count_uniq() limit foo`)
 	f(`foo | stats count_uniq() limit 0.5`)
@@ -1396,7 +1402,6 @@ func TestParseQueryFailure(t *testing.T) {

 	// invalid stats uniq_values
 	f(`foo | stats uniq_values`)
-	f(`foo | stats uniq_values()`)
 	f(`foo | stats uniq_values() limit`)
 	f(`foo | stats uniq_values(a) limit foo`)
 	f(`foo | stats uniq_values(a) limit 0.5`)
@@ -1404,7 +1409,6 @@ func TestParseQueryFailure(t *testing.T) {

 	// invalid stats values
 	f(`foo | stats values`)
-	f(`foo | stats values()`)
 	f(`foo | stats values() limit`)
 	f(`foo | stats values(a) limit foo`)
 	f(`foo | stats values(a) limit 0.5`)
@@ -1412,7 +1416,6 @@ func TestParseQueryFailure(t *testing.T) {

 	// invalid stats sum_len
 	f(`foo | stats sum_len`)
-	f(`foo | stats sum_len()`)

 	// invalid stats quantile
 	f(`foo | stats quantile`)
@@ -1436,6 +1439,12 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | stats by(bar,`)
 	f(`foo | stats by(bar)`)

+	// duplicate stats result names
+	f(`foo | stats min() x, max() x`)
+
+	// stats result names identical to by fields
+	f(`foo | stats by (x) count() x`)
+
 	// invalid sort pipe
 	f(`foo | sort bar`)
 	f(`foo | sort by`)
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index 31506bfa3..b73744fbb 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -545,9 +545,17 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
 		ps.byFields = bfs
 	}

+	seenByFields := make(map[string]*byStatsField, len(ps.byFields))
+	for _, bf := range ps.byFields {
+		seenByFields[bf.name] = bf
+	}
+
+	seenResultNames := make(map[string]statsFunc)
+
 	var funcs []pipeStatsFunc
 	for {
 		var f pipeStatsFunc
+
 		sf, err := parseStatsFunc(lex)
 		if err != nil {
 			return nil, err
@@ -557,15 +565,31 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
 		if lex.isKeyword("if") {
 			iff, err := parseIfFilter(lex)
 			if err != nil {
-				return nil, err
+				return nil, fmt.Errorf("cannot parse 'if' filter for [%s]: %w", sf, err)
 			}
 			f.iff = iff
 		}

-		resultName, err := parseResultName(lex)
-		if err != nil {
-			return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err)
+		resultName := ""
+		if lex.isKeyword(",", "|", ")", "") {
+			resultName = sf.String()
+		} else {
+			if lex.isKeyword("as") {
+				lex.nextToken()
+			}
+			fieldName, err := parseFieldName(lex)
+			if err != nil {
+				return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err)
+			}
+			resultName = fieldName
 		}
+		if bf := seenByFields[resultName]; bf != nil {
+			return nil, fmt.Errorf("%q is used as 'by' field [%s], so it cannot be used as a result name for [%s]", resultName, bf, sf)
+		}
+		if sfPrev := seenResultNames[resultName]; sfPrev != nil {
+			return nil, fmt.Errorf("cannot use identical result name %q for [%s] and [%s]", resultName, sfPrev, sf)
+		}
+		seenResultNames[resultName] = sf
 		f.resultName = resultName

 		funcs = append(funcs, f)
@@ -575,7 +599,7 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
 			return &ps, nil
 		}
 		if !lex.isKeyword(",") {
-			return nil, fmt.Errorf("unexpected token %q; want ',', '|' or ')'", lex.token)
+			return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", lex.token, sf)
 		}
 		lex.nextToken()
 	}
@@ -672,17 +696,6 @@ func 
parseStatsFunc(lex *lexer) (statsFunc, error) { } } -func parseResultName(lex *lexer) (string, error) { - if lex.isKeyword("as") { - lex.nextToken() - } - resultName, err := parseFieldName(lex) - if err != nil { - return "", err - } - return resultName, nil -} - var zeroByStatsField = &byStatsField{} // byStatsField represents 'by (...)' part of the pipeStats.
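
Below is a minimal end-to-end sketch of the new behavior, outside the patch itself. It assumes the exported `ParseQuery` function and the `Query.String` method in `lib/logstorage` keep their current signatures; the expected canonical forms mirror the assertions added to `TestParseQuerySuccess` above.

```go
package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

func main() {
	// A stats pipe without explicit result names: each result is now named
	// after the string representation of its stats function expression.
	q, err := logstorage.ParseQuery(`_time:5m | count(), count_uniq(_stream)`)
	if err != nil {
		log.Fatalf("cannot parse query: %s", err)
	}
	// Prints the canonical form:
	// _time:5m | stats count(*) as "count(*)", count_uniq(_stream) as "count_uniq(_stream)"
	fmt.Println(q.String())

	// Result names that collide with a 'by(...)' field (or with each other)
	// are now rejected at parse time.
	if _, err := logstorage.ParseQuery(`* | stats by (x) count() x`); err != nil {
		fmt.Printf("expected parse error: %s\n", err)
	}
}
```

This mirrors how the `f()` helper in `parser_test.go` verifies the parser: parse the query, then compare its canonical string form against the expected output.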