lib/logstorage: add ability to store sorted log position into a separate field with `sort ... rank <fieldName>` syntax

Aliaksandr Valialkin 2024-07-01 01:44:17 +02:00
parent dc291d8980
commit bb0deb7ac4
6 changed files with 169 additions and 8 deletions

docs/VictoriaLogs/CHANGELOG.md

@@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: add ability to return the log position (aka rank) after sorting logs with the [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). This can be done by adding `rank as <fieldName>` to the end of the `| sort ...` pipe. For example, `_time:5m | sort by (_time) rank as position` stores the position of every sorted log line in the `position` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
* FEATURE: add a delimiter log entry with the `---` message between log chunks returned by the [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe). This should simplify investigation of the returned logs.
* FEATURE: reduce memory usage when a big number of context logs is requested from the [`stream_context` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe).

docs/VictoriaLogs/LogsQL.md

@@ -2115,6 +2115,14 @@ and then returns the next 20 sorted logs for the last 5 minutes:
_time:1h | sort by (request_duration desc) offset 10 limit 20
```
It is possible to return the rank (sort order number) for every sorted log by adding `rank as <fieldName>` to the end of the `| sort ...` pipe.
For example, the following query stores the rank of logs sorted by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field)
in the `position` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
```logsql
_time:5m | sort by (_time) rank as position
```
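For trying the new syntax programmatically, here is a minimal Go sketch. It assumes a local VictoriaLogs instance listening on `localhost:9428` and uses its HTTP query endpoint documented at [querying docs](https://docs.victoriametrics.com/victorialogs/querying/); adjust the address for your setup:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// Store the position of every log line sorted by _time into the `position` field.
	q := `_time:5m | sort by (_time) rank as position`

	// Assumes a local VictoriaLogs instance with the /select/logsql/query endpoint.
	resp, err := http.Post(
		"http://localhost:9428/select/logsql/query",
		"application/x-www-form-urlencoded",
		strings.NewReader(url.Values{"query": {q}}.Encode()),
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The response is a stream of JSON lines; each line should contain
	// the `position` field with the rank of the sorted log entry.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```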
Note that sorting a big number of logs can be slow and can consume a lot of additional memory.
It is recommended to limit the number of logs before sorting, using the following approaches:

lib/logstorage/parser_test.go

@@ -2070,6 +2070,7 @@ func TestQueryCanLiveTail(t *testing.T) {
f("* | replace_regexp ('foo', 'bar')", true)
f("* | sort by (a)", false)
f("* | stats count() rows", false)
f("* | stream_context after 10", false)
f("* | top 10 by (x)", false)
f("* | uniq by (a)", false)
f("* | unpack_json", true)

lib/logstorage/pipe_sort.go

@@ -11,6 +11,7 @@ import (
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
)
@@ -32,6 +33,9 @@ type pipeSort struct {
//
// if zero, then all the results are returned
limit uint64
// The name of the field to store the row rank.
rankName string
}
func (ps *pipeSort) String() string {
@@ -52,6 +56,9 @@ func (ps *pipeSort) String() string {
if ps.limit > 0 {
s += fmt.Sprintf(" limit %d", ps.limit)
}
if ps.rankName != "" {
s += " rank as " + quoteTokenIfNeeded(ps.rankName)
}
return s
}
@@ -64,6 +71,13 @@ func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) {
return
}
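// The rank field is produced by the sort pipe itself, so it must not be requested from the input.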
if ps.rankName != "" {
neededFields.remove(ps.rankName)
if neededFields.contains("*") {
unneededFields.add(ps.rankName)
}
}
if len(ps.byFields) == 0 {
neededFields.add("*")
unneededFields.reset()
@@ -505,6 +519,9 @@ type pipeSortWriteContext struct {
rcs []resultColumn
br blockResult
// buf is a temporary buffer for non-flushed block.
buf []byte
// rowsWritten is the total number of rows passed to writeNextRow.
rowsWritten uint64
@@ -517,6 +534,11 @@ type pipeSortWriteContext struct {
func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
ps := shard.ps
rankName := ps.rankName
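// rankFields is the number of extra leading result columns (0 or 1) occupied by the rank.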
rankFields := 0
if rankName != "" {
rankFields = 1
}
rowIdx := shard.rowRefNext
shard.rowRefNext++
@@ -532,10 +554,10 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
byFields := ps.byFields
rcs := wctx.rcs
-areEqualColumns := len(rcs) == len(byFields)+len(b.otherColumns)
+areEqualColumns := len(rcs) == rankFields+len(byFields)+len(b.otherColumns)
if areEqualColumns {
for i, c := range b.otherColumns {
-if rcs[len(byFields)+i].name != c.name {
+if rcs[rankFields+len(byFields)+i].name != c.name {
areEqualColumns = false
break
}
@@ -546,6 +568,9 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
wctx.flush()
rcs = wctx.rcs[:0]
if rankName != "" {
rcs = appendResultColumnWithName(rcs, rankName)
}
for _, bf := range byFields {
rcs = appendResultColumnWithName(rcs, bf.name)
}
@@ -555,17 +580,24 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
wctx.rcs = rcs
}
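// Write the rank into the first result column. Ranks are 1-based and count rows skipped via `offset` (see the `offset 1 rank x` test in pipe_sort_test.go).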
if rankName != "" {
bufLen := len(wctx.buf)
wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten)
v := bytesutil.ToUnsafeString(wctx.buf[bufLen:])
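// v references wctx.buf; it stays valid until flush(), which resets buf together with the result columns.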
rcs[0].addValue(v)
}
br := b.br
byColumns := b.byColumns
for i := range byFields {
v := byColumns[i].c.getValueAtRow(br, rr.rowIdx)
-rcs[i].addValue(v)
+rcs[rankFields+i].addValue(v)
wctx.valuesLen += len(v)
}
for i, c := range b.otherColumns {
v := c.getValueAtRow(br, rr.rowIdx)
-rcs[len(byFields)+i].addValue(v)
+rcs[rankFields+len(byFields)+i].addValue(v)
wctx.valuesLen += len(v)
}
@@ -589,6 +621,7 @@ func (wctx *pipeSortWriteContext) flush() {
for i := range rcs {
rcs[i].resetValues()
}
wctx.buf = wctx.buf[:0]
}
type pipeSortProcessorShardsHeap []*pipeSortProcessorShard
@@ -763,6 +796,16 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) {
return nil, fmt.Errorf("duplicate 'limit'; the previous one is %d; the new one is %s", ps.limit, s)
}
ps.limit = n
case lex.isKeyword("rank"):
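// The `as` keyword is optional: both `rank foo` and `rank as foo` are accepted (see parser tests).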
lex.nextToken()
if lex.isKeyword("as") {
lex.nextToken()
}
rankName, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot read rank field name: %s", err)
}
ps.rankName = rankName
default:
return &ps, nil
}

lib/logstorage/pipe_sort_test.go

@@ -11,9 +11,11 @@ func TestParsePipeSortSuccess(t *testing.T) {
}
f(`sort`)
f(`sort rank as foo`)
f(`sort by (x)`)
f(`sort by (x) limit 10`)
f(`sort by (x) offset 20 limit 10`)
f(`sort by (x) offset 20 limit 10 rank as bar`)
f(`sort by (x desc, y) desc`)
}
@@ -24,6 +26,7 @@ func TestParsePipeSortFailure(t *testing.T) {
}
f(`sort a`)
f(`sort rank`)
f(`sort by`)
f(`sort by(x) foo`)
f(`sort by(x) limit`)
@@ -59,6 +62,29 @@ func TestPipeSort(t *testing.T) {
},
})
// Sort by all fields with rank
f("sort rank x", [][]Field{
{
{"_msg", `def`},
{"a", `1`},
},
{
{"_msg", `abc`},
{"a", `2`},
},
}, [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"x", "1"},
},
{
{"_msg", `def`},
{"a", `1`},
{"x", "2"},
},
})
// Sort by a single field
f("sort by (a asc) asc", [][]Field{
{
@@ -205,6 +231,35 @@ func TestPipeSort(t *testing.T) {
},
})
// Sort by multiple fields with offset and rank
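// Note: the rank counts all sorted rows, including the ones skipped by `offset`, so the first returned row gets rank 2.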
f("sort by (a, b) offset 1 rank x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
{"x", "2"},
},
{
{"a", `2`},
{"b", `54`},
{"x", "3"},
},
})
// Sort by multiple fields with offset and limit
f("sort by (a, b) offset 1 limit 1", [][]Field{
{
@@ -228,6 +283,30 @@ func TestPipeSort(t *testing.T) {
},
})
// Sort by multiple fields with offset, limit and rank
f("sort by (a, b) offset 1 limit 1 rank x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
{"x", "2"},
},
})
// Sort by multiple fields with offset and limit
f("sort by (a, b) desc offset 2 limit 100", [][]Field{
{
@@ -259,17 +338,27 @@ func TestPipeSortUpdateNeededFields(t *testing.T) {
}
// all the needed fields
f("sort", "*", "", "*", "")
f("sort rank x", "*", "", "*", "")
f("sort by(s1,s2)", "*", "", "*", "")
f("sort by(s1,s2) rank as x", "*", "", "*", "x")
f("sort by(x,s2) rank as x", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src
f("sort by(s1,s2)", "*", "f1,f2", "*", "f1,f2")
f("sort by(s1,s2) rank as x", "*", "f1,f2", "*", "f1,f2,x")
f("sort by(x,s2) rank as x", "*", "f1,f2", "*", "f1,f2")
// all the needed fields, unneeded fields intersect with src
f("sort by(s1,s2)", "*", "s1,f1,f2", "*", "f1,f2")
f("sort by(s1,s2) rank as x", "*", "s1,f1,f2", "*", "f1,f2,x")
f("sort by(x,s2) rank as x", "*", "s1,f1,f2", "*", "f1,f2,s1")
// needed fields do not intersect with src
f("sort by(s1,s2)", "f1,f2", "", "s1,s2,f1,f2", "")
f("sort by(s1,s2) rank as x", "f1,f2", "", "s1,s2,f1,f2", "")
// needed fields intersect with src
f("sort by(s1,s2)", "s1,f1,f2", "", "s1,s2,f1,f2", "")
f("sort by(s1,s2) rank as x", "s1,f1,f2,x", "", "s1,s2,f1,f2", "")
}

lib/logstorage/pipe_topk.go

@@ -425,6 +425,9 @@ type pipeTopkWriteContext struct {
rcs []resultColumn
br blockResult
// buf is a temporary buffer for non-flushed block.
buf []byte
// rowsWritten is the total number of rows passed to writeNextRow.
rowsWritten uint64
@@ -437,6 +440,11 @@ type pipeTopkWriteContext struct {
func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool {
ps := shard.ps
rankName := ps.rankName
rankFields := 0
if rankName != "" {
rankFields = 1
}
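// The rank handling below mirrors pipeSortWriteContext.writeNextRow in pipe_sort.go.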
rowIdx := shard.rowNext
shard.rowNext++
@@ -454,10 +462,10 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
byFields := ps.byFields
rcs := wctx.rcs
-areEqualColumns := len(rcs) == len(byFields)+len(r.otherColumns)
+areEqualColumns := len(rcs) == rankFields+len(byFields)+len(r.otherColumns)
if areEqualColumns {
for i, c := range r.otherColumns {
-if rcs[len(byFields)+i].name != c.Name {
+if rcs[rankFields+len(byFields)+i].name != c.Name {
areEqualColumns = false
break
}
@@ -468,6 +476,9 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
wctx.flush()
rcs = wctx.rcs[:0]
if rankName != "" {
rcs = appendResultColumnWithName(rcs, rankName)
}
for _, bf := range byFields {
rcs = appendResultColumnWithName(rcs, bf.name)
}
@@ -477,6 +488,13 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
wctx.rcs = rcs
}
if rankName != "" {
bufLen := len(wctx.buf)
wctx.buf = marshalUint64String(wctx.buf, wctx.rowsWritten)
v := bytesutil.ToUnsafeString(wctx.buf[bufLen:])
rcs[0].addValue(v)
}
byColumns := r.byColumns
byColumnsIsTime := r.byColumnsIsTime
for i := range byFields {
@@ -484,13 +502,13 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
if byColumnsIsTime[i] {
v = string(marshalTimestampRFC3339NanoString(nil, r.timestamp))
}
-rcs[i].addValue(v)
+rcs[rankFields+i].addValue(v)
wctx.valuesLen += len(v)
}
for i, c := range r.otherColumns {
v := c.Value
-rcs[len(byFields)+i].addValue(v)
+rcs[rankFields+len(byFields)+i].addValue(v)
wctx.valuesLen += len(v)
}
@@ -516,6 +534,7 @@ func (wctx *pipeTopkWriteContext) flush() {
for i := range rcs {
rcs[i].resetValues()
}
wctx.buf = wctx.buf[:0]
}
type pipeTopkProcessorShardsHeap []*pipeTopkProcessorShard