From 7a623c225fca6f21e6c74a6d95e9c1d86ec624a3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 29 Oct 2024 16:43:07 +0100 Subject: [PATCH] lib/logstorage: follow-up for af831a6c906158f371f1b6810706fa0a54b78386 Sync the code between top and sort pipes regarding the code related to rank. --- docs/VictoriaLogs/LogsQL.md | 6 +++-- lib/logstorage/pipe_sort.go | 30 ++++++++++------------ lib/logstorage/pipe_sort_test.go | 2 +- lib/logstorage/pipe_sort_topk.go | 10 ++++---- lib/logstorage/pipe_top.go | 44 +++++++++++++++++++++----------- lib/logstorage/pipe_top_test.go | 15 +++++------ 6 files changed, 59 insertions(+), 48 deletions(-) diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index f610d909c..9e71c20c0 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -2169,6 +2169,7 @@ It is recommended limiting the number of logs before sorting with the following See also: +- [`top` pipe](#top-pipe) - [`stats` pipe](#stats-pipe) - [`limit` pipe](#limit-pipe) - [`offset` pipe](#offset-pipe) @@ -2403,19 +2404,20 @@ _time:5m | fields ip | top It is possible to set `rank` field per each returned entry for `top` pipe by adding `rank`. For example, the following query sets the `rank` field per each returned `ip`: ```logsql -_time:5m | top 10 by (ip) with rank +_time:5m | top 10 by (ip) rank ``` The `rank` field can have other name. 
For example, the following query uses the `position` field name instead of `rank` field name in the output: ```logsql -_time:5m | top 10 by (ip) with rank as position +_time:5m | top 10 by (ip) rank as position ``` See also: - [`uniq` pipe](#uniq-pipe) - [`stats` pipe](#stats-pipe) +- [`sort` pipe](#sort-pipe) ### uniq pipe diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index cffb12c56..7a0567339 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -36,7 +36,7 @@ type pipeSort struct { limit uint64 // The name of the field to store the row rank. - rankName string + rankFieldName string } func (ps *pipeSort) String() string { @@ -57,8 +57,8 @@ func (ps *pipeSort) String() string { if ps.limit > 0 { s += fmt.Sprintf(" limit %d", ps.limit) } - if ps.rankName != "" { - s += " rank as " + quoteTokenIfNeeded(ps.rankName) + if ps.rankFieldName != "" { + s += rankFieldNameString(ps.rankFieldName) } return s } @@ -72,10 +72,10 @@ func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) { return } - if ps.rankName != "" { - neededFields.remove(ps.rankName) + if ps.rankFieldName != "" { + neededFields.remove(ps.rankFieldName) if neededFields.contains("*") { - unneededFields.add(ps.rankName) + unneededFields.add(ps.rankFieldName) } } @@ -533,9 +533,9 @@ type pipeSortWriteContext struct { func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { ps := shard.ps - rankName := ps.rankName + rankFieldName := ps.rankFieldName rankFields := 0 - if rankName != "" { + if rankFieldName != "" { rankFields = 1 } @@ -567,8 +567,8 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { wctx.flush() rcs = wctx.rcs[:0] - if rankName != "" { - rcs = appendResultColumnWithName(rcs, rankName) + if rankFieldName != "" { + rcs = appendResultColumnWithName(rcs, rankFieldName) } for _, bf := range byFields { rcs = appendResultColumnWithName(rcs, bf.name) @@ -579,7 +579,7 @@ func 
(wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { wctx.rcs = rcs } - if rankName != "" { + if rankFieldName != "" { bufLen := len(wctx.buf) wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten) v := bytesutil.ToUnsafeString(wctx.buf[bufLen:]) @@ -798,15 +798,11 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) { } ps.limit = n case lex.isKeyword("rank"): - lex.nextToken() - if lex.isKeyword("as") { - lex.nextToken() - } - rankName, err := getCompoundToken(lex) + rankFieldName, err := parseRankFieldName(lex) if err != nil { return nil, fmt.Errorf("cannot read rank field name: %s", err) } - ps.rankName = rankName + ps.rankFieldName = rankFieldName default: return &ps, nil } diff --git a/lib/logstorage/pipe_sort_test.go b/lib/logstorage/pipe_sort_test.go index 0385a51c0..8d2dbf27b 100644 --- a/lib/logstorage/pipe_sort_test.go +++ b/lib/logstorage/pipe_sort_test.go @@ -11,6 +11,7 @@ func TestParsePipeSortSuccess(t *testing.T) { } f(`sort`) + f(`sort rank`) f(`sort rank as foo`) f(`sort by (x)`) f(`sort by (x) limit 10`) @@ -26,7 +27,6 @@ func TestParsePipeSortFailure(t *testing.T) { } f(`sort a`) - f(`sort rank`) f(`sort by`) f(`sort by(x) foo`) f(`sort by(x) limit`) diff --git a/lib/logstorage/pipe_sort_topk.go b/lib/logstorage/pipe_sort_topk.go index ccccd6e0f..29cd24e8a 100644 --- a/lib/logstorage/pipe_sort_topk.go +++ b/lib/logstorage/pipe_sort_topk.go @@ -440,9 +440,9 @@ type pipeTopkWriteContext struct { func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool { ps := shard.ps - rankName := ps.rankName + rankFieldName := ps.rankFieldName rankFields := 0 - if rankName != "" { + if rankFieldName != "" { rankFields = 1 } @@ -476,8 +476,8 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo wctx.flush() rcs = wctx.rcs[:0] - if rankName != "" { - rcs = appendResultColumnWithName(rcs, rankName) + if rankFieldName != "" { + rcs = appendResultColumnWithName(rcs, rankFieldName) } for 
_, bf := range byFields { rcs = appendResultColumnWithName(rcs, bf.name) @@ -488,7 +488,7 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo wctx.rcs = rcs } - if rankName != "" { + if rankFieldName != "" { bufLen := len(wctx.buf) wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten) v := bytesutil.ToUnsafeString(wctx.buf[bufLen:]) diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go index 915f35a83..e33a57840 100644 --- a/lib/logstorage/pipe_top.go +++ b/lib/logstorage/pipe_top.go @@ -51,10 +51,7 @@ func (pt *pipeTop) String() string { s += " by (" + fieldNamesString(pt.byFields) + ")" } if pt.rankFieldName != "" { - s += " with rank" - if pt.rankFieldName != "rank" { - s += " as " + pt.rankFieldName - } + s += rankFieldNameString(pt.rankFieldName) } return s } @@ -685,26 +682,43 @@ func parsePipeTop(lex *lexer) (*pipeTop, error) { hitsFieldName: hitsFieldName, } - if !lex.isKeyword("with") { - return pt, nil + if lex.isKeyword("rank") { + rankFieldName, err := parseRankFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse rank field name in [%s]: %w", pt, err) + } + pt.rankFieldName = rankFieldName } + return pt, nil +} - lex.nextToken() +func parseRankFieldName(lex *lexer) (string, error) { if !lex.isKeyword("rank") { - return nil, fmt.Errorf("missing 'rank' word after 'with' in [%s]", pt) + return "", fmt.Errorf("unexpected token: %q; want 'rank'", lex.token) } lex.nextToken() - pt.rankFieldName = "rank" + + rankFieldName := "rank" if lex.isKeyword("as") { lex.nextToken() if lex.isKeyword("", "|", ")", "(") { - return nil, fmt.Errorf("missing rank name in [%s as]", pt) + return "", fmt.Errorf("missing rank name") } } - if !lex.isKeyword("", "|", ")") { - pt.rankFieldName = lex.token - lex.nextToken() + if !lex.isKeyword("", "|", ")", "limit") { + s, err := getCompoundToken(lex) + if err != nil { + return "", err + } + rankFieldName = s } - - return pt, nil + return rankFieldName, nil +} + 
+func rankFieldNameString(rankFieldName string) string { + s := " rank" + if rankFieldName != "rank" { + s += " as " + quoteTokenIfNeeded(rankFieldName) + } + return s } diff --git a/lib/logstorage/pipe_top_test.go b/lib/logstorage/pipe_top_test.go index 42c2d6be5..a7ea9f569 100644 --- a/lib/logstorage/pipe_top_test.go +++ b/lib/logstorage/pipe_top_test.go @@ -11,15 +11,15 @@ func TestParsePipeTopSuccess(t *testing.T) { } f(`top`) - f(`top with rank`) + f(`top rank`) f(`top 5`) - f(`top 5 with rank as foo`) + f(`top 5 rank as foo`) f(`top by (x)`) f(`top 5 by (x)`) f(`top by (x, y)`) f(`top 5 by (x, y)`) - f(`top by (x) with rank`) - f(`top by (x) with rank as foo`) + f(`top by (x) rank`) + f(`top by (x) rank as foo`) } func TestParsePipeTopFailure(t *testing.T) { @@ -34,8 +34,7 @@ func TestParsePipeTopFailure(t *testing.T) { f(`top 5foo`) f(`top foo`) f(`top by`) - f(`top (x) with`) - f(`top (x) with rank as`) + f(`top (x) rank a b`) } func TestPipeTop(t *testing.T) { @@ -72,7 +71,7 @@ func TestPipeTop(t *testing.T) { }, }) - f("top with rank", [][]Field{ + f("top rank", [][]Field{ { {"a", `2`}, {"b", `3`}, @@ -170,7 +169,7 @@ func TestPipeTop(t *testing.T) { }, }) - f("top by (b) rank as x", [][]Field{ + f("top by (b) rank as x", [][]Field{ { {"a", `2`}, {"b", `3`},