diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index fdded45c8..62b6a4979 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1128,6 +1128,7 @@ By default rows are selected in arbitrary order because of performance reasons, See also: +- [`sort` pipe](#sort-pipe) - [`offset` pipe](#offset-pipe) ### offset pipe @@ -1147,6 +1148,7 @@ Rows can be sorted with [`sort` pipe](#sort-pipe). See also: - [`limit` pipe](#limit-pipe) +- [`sort` pipe](#sort-pipe) ### rename pipe @@ -1208,6 +1210,14 @@ for the `request_duration` [field](https://docs.victoriametrics.com/VictoriaLogs _time:1h | sort by (request_duration desc) limit 10 ``` +If the first `N` sorted results must be skipped, then `offset N` can be added to `sort` pipe. For example, +the following query skips the first 10 logs with the biggest `request_duration` [field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model), +and then returns the next 20 sorted logs for the last 5 minutes: + +```logsql +_time:1h | sort by (request_duration desc) offset 10 limit 20 +``` + Note that sorting of big number of logs can be slow and can consume a lot of additional memory. It is recommended limiting the number of logs before sorting with the following approaches: diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 5e7a15c61..1f0289e58 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -975,9 +975,14 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`) f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`) f(`* | sort limit 10`, `* | sort limit 10`) + f(`* | sort offset 20 limit 10`, `* | sort offset 20 limit 10`) f(`* | sort desc limit 10`, `* | sort desc limit 10`) + f(`* | sort desc offset 20 limit 10`, `* | sort desc offset 20 limit 10`) f(`* | sort by (foo desc, bar) limit 10`, `* | sort by (foo desc, bar) limit 10`) + f(`* | sort by (foo desc, bar) oFFset 20 limit 10`, `* | sort by (foo desc, bar) offset 20 limit 10`) f(`* | sort by (foo desc, bar) desc limit 10`, `* | sort by (foo desc, bar) desc limit 10`) + f(`* | sort by (foo desc, bar) desc OFFSET 30 limit 10`, `* | sort by (foo desc, bar) desc offset 30 limit 10`) + f(`* | sort by (foo desc, bar) desc limit 10 OFFSET 30`, `* | sort by (foo desc, bar) desc offset 30 limit 10`) // uniq pipe f(`* | uniq`, `* | uniq`) @@ -1338,6 +1343,12 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | sort by(bar) limit foo`) f(`foo | sort by(bar) limit -1234`) f(`foo | sort by(bar) limit 12.34`) + f(`foo | sort by(bar) limit 10 limit 20`) + f(`foo | sort by(bar) offset`) + f(`foo | sort by(bar) offset limit`) + f(`foo | sort by(bar) offset -1234`) + f(`foo | sort by(bar) offset 12.34`) + f(`foo | sort by(bar) offset 10 offset 20`) // invalid uniq pipe f(`foo | uniq bar`) diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 16aded0da..4787b56bb 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -26,6 +26,9 @@ type pipeSort struct { // whether to apply descending order isDesc bool + // how many results to skip + offset uint64 + // how many results to return // // if zero, then all the results are returned @@ -44,6 +47,9 @@ func (ps *pipeSort) String() string { if ps.isDesc { s += " desc" } + if ps.offset > 0 { + s += fmt.Sprintf(" offset %d", ps.offset) + } if ps.limit > 0 { s += fmt.Sprintf(" limit %d", ps.limit) } @@ -470,16 +476,25 @@ type pipeSortWriteContext struct { rcs []resultColumn br blockResult - valuesLen int + rowsWritten uint64 + valuesLen int } func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { + ps := shard.ps + rowIdx := shard.rowRefNext shard.rowRefNext++ + + wctx.rowsWritten++ + if wctx.rowsWritten <= ps.offset { + return + } + rr := shard.rowRefs[rowIdx] b := &shard.blocks[rr.blockIdx] - byFields := shard.ps.byFields + byFields := ps.byFields rcs := wctx.rcs areEqualColumns := len(rcs) == len(byFields)+len(b.otherColumns) @@ -688,18 +703,36 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) { ps.isDesc = true } - switch { - case lex.isKeyword("limit"): - lex.nextToken() - n, ok := tryParseUint64(lex.token) - lex.nextToken() - if !ok { - return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token) + for { + switch { + case lex.isKeyword("offset"): + lex.nextToken() + s := lex.token + n, ok := tryParseUint64(s) + lex.nextToken() + if !ok { + return nil, fmt.Errorf("cannot parse 'offset %s'", s) + } + if ps.offset > 0 { + return nil, fmt.Errorf("duplicate 'offset'; the previous one is %d; the new one is %s", ps.offset, s) + } + ps.offset = n + case lex.isKeyword("limit"): + lex.nextToken() + s := lex.token + n, ok := tryParseUint64(s) + lex.nextToken() + if !ok { + return nil, fmt.Errorf("cannot parse 'limit %s'", s) + } + if ps.limit > 0 { + return nil, fmt.Errorf("duplicate 'limit'; the previous one is %d; the new one is %s", ps.limit, s) + } + ps.limit = n + default: + return &ps, nil } - ps.limit = n } - - return &ps, nil } // bySortField represents 'by (...)' part of the pipeSort. diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index 1b2018c4e..42b18df6a 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -410,16 +410,22 @@ type pipeTopkWriteContext struct { } func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool { - if wctx.rowsWritten >= wctx.ptp.ps.limit { - return false - } - wctx.rowsWritten++ + ps := shard.ps rowIdx := shard.rowNext shard.rowNext++ + + wctx.rowsWritten++ + if wctx.rowsWritten <= ps.offset { + return true + } + if wctx.rowsWritten > ps.offset+ps.limit { + return false + } + r := shard.rows[rowIdx] - byFields := shard.ps.byFields + byFields := ps.byFields rcs := wctx.rcs areEqualColumns := len(rcs) == len(byFields)+len(r.otherColumns)