This commit is contained in:
Aliaksandr Valialkin 2024-05-15 03:55:46 +02:00
parent ff2b6fbe35
commit 12fe2b265c
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 77 additions and 17 deletions

View file

@ -1128,6 +1128,7 @@ By default rows are selected in arbitrary order because of performance reasons,
See also:
- [`sort` pipe](#sort-pipe)
- [`offset` pipe](#offset-pipe)
### offset pipe
@ -1147,6 +1148,7 @@ Rows can be sorted with [`sort` pipe](#sort-pipe).
See also:
- [`limit` pipe](#limit-pipe)
- [`sort` pipe](#sort-pipe)
### rename pipe
@ -1208,6 +1210,14 @@ for the `request_duration` [field](https://docs.victoriametrics.com/VictoriaLogs
_time:1h | sort by (request_duration desc) limit 10
```
If the first `N` sorted results must be skipped, then `offset N` can be added to the `sort` pipe. For example,
the following query skips the first 10 logs with the biggest `request_duration` [field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model),
and then returns the next 20 sorted logs for the last hour:
```logsql
_time:1h | sort by (request_duration desc) offset 10 limit 20
```
Note that sorting a big number of logs can be slow and can consume a lot of additional memory.
It is recommended to limit the number of logs before sorting with the following approaches:

View file

@ -975,9 +975,14 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`)
f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`)
f(`* | sort limit 10`, `* | sort limit 10`)
f(`* | sort offset 20 limit 10`, `* | sort offset 20 limit 10`)
f(`* | sort desc limit 10`, `* | sort desc limit 10`)
f(`* | sort desc offset 20 limit 10`, `* | sort desc offset 20 limit 10`)
f(`* | sort by (foo desc, bar) limit 10`, `* | sort by (foo desc, bar) limit 10`)
f(`* | sort by (foo desc, bar) oFFset 20 limit 10`, `* | sort by (foo desc, bar) offset 20 limit 10`)
f(`* | sort by (foo desc, bar) desc limit 10`, `* | sort by (foo desc, bar) desc limit 10`)
f(`* | sort by (foo desc, bar) desc OFFSET 30 limit 10`, `* | sort by (foo desc, bar) desc offset 30 limit 10`)
f(`* | sort by (foo desc, bar) desc limit 10 OFFSET 30`, `* | sort by (foo desc, bar) desc offset 30 limit 10`)
// uniq pipe
f(`* | uniq`, `* | uniq`)
@ -1338,6 +1343,12 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | sort by(bar) limit foo`)
f(`foo | sort by(bar) limit -1234`)
f(`foo | sort by(bar) limit 12.34`)
f(`foo | sort by(bar) limit 10 limit 20`)
f(`foo | sort by(bar) offset`)
f(`foo | sort by(bar) offset limit`)
f(`foo | sort by(bar) offset -1234`)
f(`foo | sort by(bar) offset 12.34`)
f(`foo | sort by(bar) offset 10 offset 20`)
// invalid uniq pipe
f(`foo | uniq bar`)

View file

@ -26,6 +26,9 @@ type pipeSort struct {
// whether to apply descending order
isDesc bool
// how many results to skip
offset uint64
// how many results to return
//
// if zero, then all the results are returned
@ -44,6 +47,9 @@ func (ps *pipeSort) String() string {
if ps.isDesc {
s += " desc"
}
if ps.offset > 0 {
s += fmt.Sprintf(" offset %d", ps.offset)
}
if ps.limit > 0 {
s += fmt.Sprintf(" limit %d", ps.limit)
}
@ -470,16 +476,25 @@ type pipeSortWriteContext struct {
rcs []resultColumn
br blockResult
rowsWritten uint64
valuesLen int
}
func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
ps := shard.ps
rowIdx := shard.rowRefNext
shard.rowRefNext++
wctx.rowsWritten++
if wctx.rowsWritten <= ps.offset {
return
}
rr := shard.rowRefs[rowIdx]
b := &shard.blocks[rr.blockIdx]
byFields := shard.ps.byFields
byFields := ps.byFields
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(byFields)+len(b.otherColumns)
@ -688,18 +703,36 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) {
ps.isDesc = true
}
for {
switch {
case lex.isKeyword("limit"):
case lex.isKeyword("offset"):
lex.nextToken()
n, ok := tryParseUint64(lex.token)
s := lex.token
n, ok := tryParseUint64(s)
lex.nextToken()
if !ok {
return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token)
return nil, fmt.Errorf("cannot parse 'offset %s'", s)
}
if ps.offset > 0 {
return nil, fmt.Errorf("duplicate 'offset'; the previous one is %d; the new one is %s", ps.offset, s)
}
ps.offset = n
case lex.isKeyword("limit"):
lex.nextToken()
s := lex.token
n, ok := tryParseUint64(s)
lex.nextToken()
if !ok {
return nil, fmt.Errorf("cannot parse 'limit %s'", s)
}
if ps.limit > 0 {
return nil, fmt.Errorf("duplicate 'limit'; the previous one is %d; the new one is %s", ps.limit, s)
}
ps.limit = n
}
default:
return &ps, nil
}
}
}
// bySortField represents 'by (...)' part of the pipeSort.

View file

@ -410,16 +410,22 @@ type pipeTopkWriteContext struct {
}
func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool {
if wctx.rowsWritten >= wctx.ptp.ps.limit {
return false
}
wctx.rowsWritten++
ps := shard.ps
rowIdx := shard.rowNext
shard.rowNext++
wctx.rowsWritten++
if wctx.rowsWritten <= ps.offset {
return true
}
if wctx.rowsWritten > ps.offset+ps.limit {
return false
}
r := shard.rows[rowIdx]
byFields := shard.ps.byFields
byFields := ps.byFields
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(byFields)+len(r.otherColumns)