From 3c06d083ea90715b0c8d3ba71fb7dad3ea5e96c6 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 29 Oct 2024 15:37:07 +0100 Subject: [PATCH] lib/logstorage: add an ability to return rank from `top` pipe results --- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/LogsQL.md | 12 +++++++ lib/logstorage/pipe_top.go | 54 +++++++++++++++++++++++++--- lib/logstorage/pipe_top_test.go | 63 +++++++++++++++++++++++++++++++++ 4 files changed, 126 insertions(+), 4 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 36affa5b7..9ce8cbbe6 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: improve performance for queries over large volume of logs with big number of [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) (aka `wide events`). * FEATURE: improve performance for [`/select/logsql/field_values` HTTP endpoint](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values). * FEATURE: improve performance for [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe) when it is applied directly to [log filter](https://docs.victoriametrics.com/victorialogs/logsql/#filters). +* FEATURE: add an ability to return `rank` field from [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe). For example, the following query returns `1..5` rank per each returned `ip` with the biggest number of logs over the last 5 minute: `_time:5m | top 5 by (ip) with rank`. * BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): fix various glitches with updating query responses. The issue was introduced in [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7279). diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index be981f085..f610d909c 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -2400,6 +2400,18 @@ For example, the following query is equivalent to the previous one: _time:5m | fields ip | top ``` +It is possible to set `rank` field per each returned entry for `top` pipe by adding `with rank`. For example, the following query sets the `rank` field per each returned `ip`: + +```logsql +_time:5m | top 10 by (ip) with rank +``` + +The `rank` field can have other name. For example, the following query uses the `position` field name instead of `rank` field name in the output: + +```logsql +_time:5m | top 10 by (ip) with rank as position +``` + See also: - [`uniq` pipe](#uniq-pipe) diff --git a/lib/logstorage/pipe_top.go b/lib/logstorage/pipe_top.go index df1f59a3c..915f35a83 100644 --- a/lib/logstorage/pipe_top.go +++ b/lib/logstorage/pipe_top.go @@ -5,6 +5,7 @@ import ( "fmt" "slices" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -34,8 +35,11 @@ type pipeTop struct { // limitStr is string representation of the limit. limitStr string - // if hitsFieldName isn't empty, then the number of hits per each unique value is returned in this field. + // the number of hits per each unique value is returned in this field. hitsFieldName string + + // if rankFieldName isn't empty, then the rank per each unique value is returned in this field. + rankFieldName string } func (pt *pipeTop) String() string { @@ -46,6 +50,12 @@ func (pt *pipeTop) String() string { if len(pt.byFields) > 0 { s += " by (" + fieldNamesString(pt.byFields) + ")" } + if pt.rankFieldName != "" { + s += " with rank" + if pt.rankFieldName != "rank" { + s += " as " + pt.rankFieldName + } + } return s } @@ -273,8 +283,20 @@ func (ptp *pipeTopProcessor) flush() error { return dst } + addRankField := func(dst []Field, rank int) []Field { + if ptp.pt.rankFieldName == "" { + return dst + } + rankStr := strconv.Itoa(rank + 1) + dst = append(dst, Field{ + Name: ptp.pt.rankFieldName, + Value: rankStr, + }) + return dst + } + if len(byFields) == 0 { - for _, e := range entries { + for i, e := range entries { if needStop(ptp.stopCh) { return nil } @@ -300,11 +322,12 @@ func (ptp *pipeTopProcessor) flush() error { }) } rowFields = addHitsField(rowFields, e.hits) + rowFields = addRankField(rowFields, i) wctx.writeRow(rowFields) } } else if len(byFields) == 1 { fieldName := byFields[0] - for _, e := range entries { + for i, e := range entries { if needStop(ptp.stopCh) { return nil } @@ -314,10 +337,11 @@ func (ptp *pipeTopProcessor) flush() error { Value: e.k, }) rowFields = addHitsField(rowFields, e.hits) + rowFields = addRankField(rowFields, i) wctx.writeRow(rowFields) } } else { - for _, e := range entries { + for i, e := range entries { if needStop(ptp.stopCh) { return nil } @@ -339,6 +363,7 @@ func (ptp *pipeTopProcessor) flush() error { fieldIdx++ } rowFields = addHitsField(rowFields, e.hits) + rowFields = addRankField(rowFields, i) wctx.writeRow(rowFields) } } @@ -660,5 +685,26 @@ func parsePipeTop(lex *lexer) (*pipeTop, error) { hitsFieldName: hitsFieldName, } + if !lex.isKeyword("with") { + return pt, nil + } + + lex.nextToken() + if !lex.isKeyword("rank") { + return nil, fmt.Errorf("missing 'rank' word after 'with' in [%s]", pt) + } + lex.nextToken() + pt.rankFieldName = "rank" + if lex.isKeyword("as") { + lex.nextToken() + if lex.isKeyword("", "|", ")", "(") { + return nil, fmt.Errorf("missing rank name in [%s as]", pt) + } + } + if !lex.isKeyword("", "|", ")") { + pt.rankFieldName = lex.token + lex.nextToken() + } + return pt, nil } diff --git a/lib/logstorage/pipe_top_test.go b/lib/logstorage/pipe_top_test.go index edcd4db46..42c2d6be5 100644 --- a/lib/logstorage/pipe_top_test.go +++ b/lib/logstorage/pipe_top_test.go @@ -11,11 +11,15 @@ func TestParsePipeTopSuccess(t *testing.T) { } f(`top`) + f(`top with rank`) f(`top 5`) + f(`top 5 with rank as foo`) f(`top by (x)`) f(`top 5 by (x)`) f(`top by (x, y)`) f(`top 5 by (x, y)`) + f(`top by (x) with rank`) + f(`top by (x) with rank as foo`) } func TestParsePipeTopFailure(t *testing.T) { @@ -30,6 +34,8 @@ func TestParsePipeTopFailure(t *testing.T) { f(`top 5foo`) f(`top foo`) f(`top by`) + f(`top (x) with`) + f(`top (x) with rank as`) } func TestPipeTop(t *testing.T) { @@ -66,6 +72,36 @@ func TestPipeTop(t *testing.T) { }, }) + f("top with rank", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "2"}, + {"rank", "1"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + {"hits", "1"}, + {"rank", "2"}, + }, + }) + f("top 1", [][]Field{ { {"a", `2`}, @@ -134,6 +170,33 @@ func TestPipeTop(t *testing.T) { }, }) + f("top by (b) with rank as x", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"hits", "2"}, + {"x", "1"}, + }, + { + {"b", "54"}, + {"hits", "1"}, + {"x", "2"}, + }, + }) + f("top by (hits)", [][]Field{ { {"a", `2`},