From bb0deb7ac496c39c8298f8b76d953b9b8e4f05be Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 1 Jul 2024 01:44:17 +0200 Subject: [PATCH] lib/logstorage: add ability to store sorted log position into a separate field with `sort ... rank <fieldName>` syntax --- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/LogsQL.md | 8 +++ lib/logstorage/parser_test.go | 1 + lib/logstorage/pipe_sort.go | 51 ++++++++++++++++-- lib/logstorage/pipe_sort_test.go | 89 ++++++++++++++++++++++++++++++++ lib/logstorage/pipe_sort_topk.go | 27 ++++++++-- 6 files changed, 169 insertions(+), 8 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 9966b30202..ce886ec1e8 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add ability to return log position (aka rank) after sorting logs with [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). This can be done by adding `rank as <fieldName>` to the end of `| sort ...` pipe. For example, `_time:5m | sort by (_time) rank as position` instructs storing position of every sorted log line into `position` [field name](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). * FEATURE: add delimiter log with `---` message between log chunks returned by [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This should simplify investigation of the returned logs. * FEATURE: reduce memory usage when big number of context logs are requested from [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). 
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index b6e73cfd0c..c09424c0a8 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -2115,6 +2115,14 @@ and then returns the next 20 sorted logs for the last 5 minutes: _time:1h | sort by (request_duration desc) offset 10 limit 20 ``` +It is possible to return a rank (sort order number) for every sorted log by adding `rank as <fieldName>` to the end of `| sort ...` pipe. +For example, the following query stores rank for sorted by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) logs +into `position` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model): + +```logsql +_time:5m | sort by (_time) rank as position +``` + Note that sorting of big number of logs can be slow and can consume a lot of additional memory. It is recommended limiting the number of logs before sorting with the following approaches: diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 78558e329e..1d3abf15b6 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -2070,6 +2070,7 @@ func TestQueryCanLiveTail(t *testing.T) { f("* | replace_regexp ('foo', 'bar')", true) f("* | sort by (a)", false) f("* | stats count() rows", false) + f("* | stream_context after 10", false) f("* | top 10 by (x)", false) f("* | uniq by (a)", false) f("* | unpack_json", true) diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index df0d8f8500..849a721eba 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "unsafe" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" ) @@ -32,6 +33,9 @@ type pipeSort struct { // // if zero, then all the results are returned limit uint64 + + // The name of the field to store the row rank. 
+ rankName string } func (ps *pipeSort) String() string { @@ -52,6 +56,9 @@ func (ps *pipeSort) String() string { if ps.limit > 0 { s += fmt.Sprintf(" limit %d", ps.limit) } + if ps.rankName != "" { + s += " rank as " + quoteTokenIfNeeded(ps.rankName) + } return s } @@ -64,6 +71,13 @@ func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) { return } + if ps.rankName != "" { + neededFields.remove(ps.rankName) + if neededFields.contains("*") { + unneededFields.add(ps.rankName) + } + } + if len(ps.byFields) == 0 { neededFields.add("*") unneededFields.reset() @@ -505,6 +519,9 @@ type pipeSortWriteContext struct { rcs []resultColumn br blockResult + // buf is a temporary buffer for non-flushed block. + buf []byte + // rowsWritten is the total number of rows passed to writeNextRow. rowsWritten uint64 @@ -517,6 +534,11 @@ type pipeSortWriteContext struct { func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { ps := shard.ps + rankName := ps.rankName + rankFields := 0 + if rankName != "" { + rankFields = 1 + } rowIdx := shard.rowRefNext shard.rowRefNext++ @@ -532,10 +554,10 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { byFields := ps.byFields rcs := wctx.rcs - areEqualColumns := len(rcs) == len(byFields)+len(b.otherColumns) + areEqualColumns := len(rcs) == rankFields+len(byFields)+len(b.otherColumns) if areEqualColumns { for i, c := range b.otherColumns { - if rcs[len(byFields)+i].name != c.name { + if rcs[rankFields+len(byFields)+i].name != c.name { areEqualColumns = false break } @@ -546,6 +568,9 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { wctx.flush() rcs = wctx.rcs[:0] + if rankName != "" { + rcs = appendResultColumnWithName(rcs, rankName) + } for _, bf := range byFields { rcs = appendResultColumnWithName(rcs, bf.name) } @@ -555,17 +580,24 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { wctx.rcs = rcs } + if 
rankName != "" { + bufLen := len(wctx.buf) + wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten) + v := bytesutil.ToUnsafeString(wctx.buf[bufLen:]) + rcs[0].addValue(v) + } + br := b.br byColumns := b.byColumns for i := range byFields { v := byColumns[i].c.getValueAtRow(br, rr.rowIdx) - rcs[i].addValue(v) + rcs[rankFields+i].addValue(v) wctx.valuesLen += len(v) } for i, c := range b.otherColumns { v := c.getValueAtRow(br, rr.rowIdx) - rcs[len(byFields)+i].addValue(v) + rcs[rankFields+len(byFields)+i].addValue(v) wctx.valuesLen += len(v) } @@ -589,6 +621,7 @@ func (wctx *pipeSortWriteContext) flush() { for i := range rcs { rcs[i].resetValues() } + wctx.buf = wctx.buf[:0] } type pipeSortProcessorShardsHeap []*pipeSortProcessorShard @@ -763,6 +796,16 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) { return nil, fmt.Errorf("duplicate 'limit'; the previous one is %d; the new one is %s", ps.limit, s) } ps.limit = n + case lex.isKeyword("rank"): + lex.nextToken() + if lex.isKeyword("as") { + lex.nextToken() + } + rankName, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot read rank field name: %s", err) + } + ps.rankName = rankName default: return &ps, nil } diff --git a/lib/logstorage/pipe_sort_test.go b/lib/logstorage/pipe_sort_test.go index f14a0d2aa4..671f4b0e77 100644 --- a/lib/logstorage/pipe_sort_test.go +++ b/lib/logstorage/pipe_sort_test.go @@ -11,9 +11,11 @@ func TestParsePipeSortSuccess(t *testing.T) { } f(`sort`) + f(`sort rank as foo`) f(`sort by (x)`) f(`sort by (x) limit 10`) f(`sort by (x) offset 20 limit 10`) + f(`sort by (x) offset 20 limit 10 rank as bar`) f(`sort by (x desc, y) desc`) } @@ -24,6 +26,7 @@ func TestParsePipeSortFailure(t *testing.T) { } f(`sort a`) + f(`sort rank`) f(`sort by`) f(`sort by(x) foo`) f(`sort by(x) limit`) @@ -59,6 +62,29 @@ func TestPipeSort(t *testing.T) { }, }) + // Sort by all fields with rank + f("sort rank x", [][]Field{ + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"_msg", 
`abc`}, + {"a", `2`}, + }, + }, [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"x", "1"}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"x", "2"}, + }, + }) + // Sort by a single field f("sort by (a asc) asc", [][]Field{ { @@ -205,6 +231,35 @@ func TestPipeSort(t *testing.T) { }, }) + // Sort by multiple fields with offset and rank + f("sort by (a, b) offset 1 rank x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + {"x", "2"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"x", "3"}, + }, + }) + // Sort by multiple fields with offset and limit f("sort by (a, b) offset 1 limit 1", [][]Field{ { @@ -228,6 +283,30 @@ func TestPipeSort(t *testing.T) { }, }) + // Sort by multiple fields with offset, limit and rank + f("sort by (a, b) offset 1 limit 1 rank x", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + {"x", "2"}, + }, + }) + // Sort by multiple fields with offset and limit f("sort by (a, b) desc offset 2 limit 100", [][]Field{ { @@ -259,17 +338,27 @@ func TestPipeSortUpdateNeededFields(t *testing.T) { } // all the needed fields + f("sort", "*", "", "*", "") + f("sort rank x", "*", "", "*", "") f("sort by(s1,s2)", "*", "", "*", "") + f("sort by(s1,s2) rank as x", "*", "", "*", "x") + f("sort by(x,s2) rank as x", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src f("sort by(s1,s2)", "*", "f1,f2", "*", "f1,f2") + f("sort by(s1,s2) rank as x", "*", "f1,f2", "*", "f1,f2,x") + f("sort by(x,s2) rank as x", "*", "f1,f2", "*", "f1,f2") // all the needed fields, unneeded fields intersect with src f("sort by(s1,s2)", "*", "s1,f1,f2", "*", "f1,f2") + f("sort by(s1,s2) rank as x", "*", "s1,f1,f2", "*", "f1,f2,x") + 
f("sort by(x,s2) rank as x", "*", "s1,f1,f2", "*", "f1,f2,s1") // needed fields do not intersect with src f("sort by(s1,s2)", "f1,f2", "", "s1,s2,f1,f2", "") + f("sort by(s1,s2) rank as x", "f1,f2", "", "s1,s2,f1,f2", "") // needed fields intersect with src f("sort by(s1,s2)", "s1,f1,f2", "", "s1,s2,f1,f2", "") + f("sort by(s1,s2) rank as x", "s1,f1,f2,x", "", "s1,s2,f1,f2", "") } diff --git a/lib/logstorage/pipe_sort_topk.go b/lib/logstorage/pipe_sort_topk.go index 57938b2fe2..6d9d26c208 100644 --- a/lib/logstorage/pipe_sort_topk.go +++ b/lib/logstorage/pipe_sort_topk.go @@ -425,6 +425,9 @@ type pipeTopkWriteContext struct { rcs []resultColumn br blockResult + // buf is a temporary buffer for non-flushed block. + buf []byte + // rowsWritten is the total number of rows passed to writeNextRow. rowsWritten uint64 @@ -437,6 +440,11 @@ type pipeTopkWriteContext struct { func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool { ps := shard.ps + rankName := ps.rankName + rankFields := 0 + if rankName != "" { + rankFields = 1 + } rowIdx := shard.rowNext shard.rowNext++ @@ -454,10 +462,10 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo byFields := ps.byFields rcs := wctx.rcs - areEqualColumns := len(rcs) == len(byFields)+len(r.otherColumns) + areEqualColumns := len(rcs) == rankFields+len(byFields)+len(r.otherColumns) if areEqualColumns { for i, c := range r.otherColumns { - if rcs[len(byFields)+i].name != c.Name { + if rcs[rankFields+len(byFields)+i].name != c.Name { areEqualColumns = false break } @@ -468,6 +476,9 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo wctx.flush() rcs = wctx.rcs[:0] + if rankName != "" { + rcs = appendResultColumnWithName(rcs, rankName) + } for _, bf := range byFields { rcs = appendResultColumnWithName(rcs, bf.name) } @@ -477,6 +488,13 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo wctx.rcs = rcs } + if rankName 
!= "" { + bufLen := len(wctx.buf) + wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten) + v := bytesutil.ToUnsafeString(wctx.buf[bufLen:]) + rcs[0].addValue(v) + } + byColumns := r.byColumns byColumnsIsTime := r.byColumnsIsTime for i := range byFields { @@ -484,13 +502,13 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo if byColumnsIsTime[i] { v = string(marshalTimestampRFC3339NanoString(nil, r.timestamp)) } - rcs[i].addValue(v) + rcs[rankFields+i].addValue(v) wctx.valuesLen += len(v) } for i, c := range r.otherColumns { v := c.Value - rcs[len(byFields)+i].addValue(v) + rcs[rankFields+len(byFields)+i].addValue(v) wctx.valuesLen += len(v) } @@ -516,6 +534,7 @@ func (wctx *pipeTopkWriteContext) flush() { for i := range rcs { rcs[i].resetValues() } + wctx.buf = wctx.buf[:0] } type pipeTopkProcessorShardsHeap []*pipeTopkProcessorShard