diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 452a928c6..39c46b937 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1033,12 +1033,12 @@ Performance tips: ## Pipes Additionally to [filters](#filters), LogsQL query may contain arbitrary mix of '|'-delimited actions known as `pipes`. -For example, the following query uses [`stats`](#stats-pipe), [`sort`](#sort-pipe) and [`head`](#head-pipe) pipes +For example, the following query uses [`stats`](#stats-pipe), [`sort`](#sort-pipe) and [`limit`](#limit-pipe) pipes for returning top 10 [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with the biggest number of logs during the last 5 minutes: ```logsql -_time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs desc) | head 10 +_time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs desc) | limit 10 ``` LogsQL supports the following pipes: @@ -1046,9 +1046,9 @@ LogsQL supports the following pipes: - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). -- [`head`](#head-pipe) limits the number selected logs. +- [`limit`](#limit-pipe) limits the number of selected logs. +- [`offset`](#offset-pipe) skips the given number of selected logs. - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). -- [`skip`](#skip-pipe) skips the given number of selected logs. - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). 
- [`stats`](#stats-pipe) calculates various stats over the selected logs. @@ -1107,20 +1107,36 @@ See also: - [`rename` pipe](#rename-pipe) - [`delete` pipe](#delete-pipe) -### head pipe +### limit pipe -If only a subset of selected logs must be processed, then `| head N` [pipe](#pipes) can be used. For example, the following query returns up to 100 logs over the last 5 minutes: +If only a subset of selected logs must be processed, then `| limit N` [pipe](#pipes) can be used. For example, the following query returns up to 100 logs over the last 5 minutes: ```logsql -_time:5m | head 100 +_time:5m | limit 100 ``` By default rows are selected in arbitrary order because of performance reasons, so the query above can return different sets of logs every time it is executed. -[`sort` pipe](#sort-pipe) can be used for making sure the logs are in the same order before applying `head ...` to them. +[`sort` pipe](#sort-pipe) can be used for making sure the logs are in the same order before applying `limit ...` to them. See also: -- [`skip` pipe](#skip-pipe) +- [`offset` pipe](#offset-pipe) + +### offset pipe + +If some selected logs must be skipped after [`sort`](#sort-pipe), then `| offset N` [pipe](#pipes) can be used. For example, the following query skips the first 100 logs +over the last 5 minutes after sorting them by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field): + +```logsql +_time:5m | sort by (_time) | offset 100 +``` + +Note that skipping rows without sorting makes little sense, since they can be returned in arbitrary order because of performance reasons. +Rows can be sorted with [`sort` pipe](#sort-pipe). + +See also: + +- [`limit` pipe](#limit-pipe) ### rename pipe @@ -1143,22 +1159,6 @@ See also: - [`fields` pipe](#fields-pipe) - [`delete` pipe](#delete-pipe) -### skip pipe -
-If some number of selected logs must be skipped after [`sort`](#sort-pipe), then `| skip N` [pipe](#pipes) can be used. 
For example, the following query skips the first 100 logs -over the last 5 minutes after soring them by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field): - -```logsql -_time:5m | sort by (_time) | skip 100 -``` - -Note that skipping rows without sorting has little sense, since they can be returned in arbitrary order because of performance reasons. -Rows can be sorted with [`sort` pipe](#sort-pipe). - -See also: - -- [`head` pipe](#head-pipe) - ### sort pipe By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) must be used. @@ -1184,8 +1184,8 @@ It is recommended limiting the number of logs before sorting with the following See also: - [`stats` pipe](#stats-pipe) -- [`head` pipe](#head-pipe) -- [`skip` pipe](#skip-pipe) +- [`limit` pipe](#limit-pipe) +- [`offset` pipe](#offset-pipe) ### stats pipe @@ -1545,7 +1545,7 @@ Use [`sort` pipe](#sort-pipe) for sorting the results. LogsQL provides the following [pipes](#pipes) for limiting the number of returned log entries: - [`fields`](#fields-pipe) and [`delete`](#delete-pipe) pipes allow limiting the set of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) to return. -- [`head` pipe](#head-pipe) allows limiting the number of log entries to return. +- [`limit` pipe](#limit-pipe) allows limiting the number of log entries to return. 
## Querying specific fields diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 3fc9750c7..881611284 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -80,16 +80,16 @@ func parsePipes(lex *lexer) ([]pipe, error) { return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) } pipes = append(pipes, ps) - case lex.isKeyword("head"): - ph, err := parsePipeHead(lex) + case lex.isKeyword("limit", "head"): + pl, err := parsePipeLimit(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err) + return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err) } - pipes = append(pipes, ph) - case lex.isKeyword("skip"): - ps, err := parsePipeSkip(lex) + pipes = append(pipes, pl) + case lex.isKeyword("offset", "skip"): + ps, err := parsePipeOffset(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err) + return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err) } pipes = append(pipes, ps) case lex.isKeyword("fields"): @@ -98,19 +98,19 @@ func parsePipes(lex *lexer) ([]pipe, error) { return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err) } pipes = append(pipes, pf) - case lex.isKeyword("copy"): + case lex.isKeyword("copy", "cp"): pc, err := parsePipeCopy(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err) } pipes = append(pipes, pc) - case lex.isKeyword("rename"): + case lex.isKeyword("rename", "mv"): pr, err := parsePipeRename(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err) } pipes = append(pipes, pr) - case lex.isKeyword("delete"): + case lex.isKeyword("delete", "del", "rm"): pd, err := parsePipeDelete(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go index 474287cf0..cc0ae3d79 100644 --- a/lib/logstorage/pipe_copy.go +++ b/lib/logstorage/pipe_copy.go @@ -9,7 +9,7 @@ import ( // pipeCopy implements '| copy ...' pipe. 
// -// See https://docs.victoriametrics.com/victorialogs/logsql/#transformations +// See https://docs.victoriametrics.com/victorialogs/logsql/#copy-pipe type pipeCopy struct { // srcFields contains a list of source fields to copy srcFields []string @@ -61,8 +61,8 @@ func (pcp *pipeCopyProcessor) flush() error { } func parsePipeCopy(lex *lexer) (*pipeCopy, error) { - if !lex.isKeyword("copy") { - return nil, fmt.Errorf("expecting 'copy'; got %q", lex.token) + if !lex.isKeyword("copy", "cp") { + return nil, fmt.Errorf("expecting 'copy' or 'cp'; got %q", lex.token) } var srcFields []string diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index 7b32f83d0..29d2074c5 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -8,7 +8,7 @@ import ( // pipeDelete implements '| delete ...' pipe. // -// See https://docs.victoriametrics.com/victorialogs/logsql/#transformations +// See https://docs.victoriametrics.com/victorialogs/logsql/#delete-pipe type pipeDelete struct { // fields contains a list of fields to delete fields []string @@ -52,8 +52,8 @@ func (pdp *pipeDeleteProcessor) flush() error { } func parsePipeDelete(lex *lexer) (*pipeDelete, error) { - if !lex.isKeyword("delete") { - return nil, fmt.Errorf("expecting 'delete'; got %q", lex.token) + if !lex.isKeyword("delete", "del", "rm") { + return nil, fmt.Errorf("expecting 'delete', 'del' or 'rm'; got %q", lex.token) } var fields []string diff --git a/lib/logstorage/pipe_head.go b/lib/logstorage/pipe_head.go deleted file mode 100644 index 110267e62..000000000 --- a/lib/logstorage/pipe_head.go +++ /dev/null @@ -1,86 +0,0 @@ -package logstorage - -import ( - "fmt" - "sync/atomic" -) - -// pipeHead implements '| head ...' pipe. 
-// -// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters -type pipeHead struct { - n uint64 -} - -func (ph *pipeHead) String() string { - return fmt.Sprintf("head %d", ph.n) -} - -func (ph *pipeHead) getNeededFields() ([]string, map[string][]string) { - return []string{"*"}, nil -} - -func (ph *pipeHead) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { - if ph.n == 0 { - // Special case - notify the caller to stop writing data to the returned pipeHeadProcessor - cancel() - } - return &pipeHeadProcessor{ - ph: ph, - cancel: cancel, - ppBase: ppBase, - } -} - -type pipeHeadProcessor struct { - ph *pipeHead - cancel func() - ppBase pipeProcessor - - rowsProcessed atomic.Uint64 -} - -func (php *pipeHeadProcessor) writeBlock(workerID uint, br *blockResult) { - rowsProcessed := php.rowsProcessed.Add(uint64(len(br.timestamps))) - if rowsProcessed <= php.ph.n { - // Fast path - write all the rows to ppBase. - php.ppBase.writeBlock(workerID, br) - return - } - - // Slow path - overflow. Write the remaining rows if needed. - rowsProcessed -= uint64(len(br.timestamps)) - if rowsProcessed >= php.ph.n { - // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. - return - } - - // Write remaining rows. - keepRows := php.ph.n - rowsProcessed - br.truncateRows(int(keepRows)) - php.ppBase.writeBlock(workerID, br) - - // Notify the caller that it should stop passing more data to writeBlock(). 
- php.cancel() -} - -func (php *pipeHeadProcessor) flush() error { - return nil -} - -func parsePipeHead(lex *lexer) (*pipeHead, error) { - if !lex.isKeyword("head") { - return nil, fmt.Errorf("expecting 'head'; got %q", lex.token) - } - - lex.nextToken() - n, err := parseUint(lex.token) - if err != nil { - return nil, fmt.Errorf("cannot parse the number of head rows to return from %q: %w", lex.token, err) - } - lex.nextToken() - ph := &pipeHead{ - n: n, - } - return ph, nil -} diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go new file mode 100644 index 000000000..6f53975bb --- /dev/null +++ b/lib/logstorage/pipe_limit.go @@ -0,0 +1,86 @@ +package logstorage + +import ( + "fmt" + "sync/atomic" +) + +// pipeLimit implements '| limit ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe +type pipeLimit struct { + n uint64 +} + +func (pl *pipeLimit) String() string { + return fmt.Sprintf("limit %d", pl.n) +} + +func (pl *pipeLimit) getNeededFields() ([]string, map[string][]string) { + return []string{"*"}, nil +} + +func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + if pl.n == 0 { + // Special case - notify the caller to stop writing data to the returned pipeLimitProcessor + cancel() + } + return &pipeLimitProcessor{ + pl: pl, + cancel: cancel, + ppBase: ppBase, + } +} + +type pipeLimitProcessor struct { + pl *pipeLimit + cancel func() + ppBase pipeProcessor + + rowsProcessed atomic.Uint64 +} + +func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { + rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps))) + if rowsProcessed <= plp.pl.n { + // Fast path - write all the rows to ppBase. + plp.ppBase.writeBlock(workerID, br) + return + } + + // Slow path - overflow. Write the remaining rows if needed. + rowsProcessed -= uint64(len(br.timestamps)) + if rowsProcessed >= plp.pl.n { + // Nothing to write. 
There is no need in cancel() call, since it has been called by another goroutine. + return + } + + // Write remaining rows. + keepRows := plp.pl.n - rowsProcessed + br.truncateRows(int(keepRows)) + plp.ppBase.writeBlock(workerID, br) + + // Notify the caller that it should stop passing more data to writeBlock(). + plp.cancel() +} + +func (plp *pipeLimitProcessor) flush() error { + return nil +} + +func parsePipeLimit(lex *lexer) (*pipeLimit, error) { + if !lex.isKeyword("limit", "head") { + return nil, fmt.Errorf("expecting 'limit' or 'head'; got %q", lex.token) + } + + lex.nextToken() + n, err := parseUint(lex.token) + if err != nil { + return nil, fmt.Errorf("cannot parse rows limit from %q: %w", lex.token, err) + } + lex.nextToken() + pl := &pipeLimit{ + n: n, + } + return pl, nil +} diff --git a/lib/logstorage/pipe_offset.go b/lib/logstorage/pipe_offset.go new file mode 100644 index 000000000..6b37a340a --- /dev/null +++ b/lib/logstorage/pipe_offset.go @@ -0,0 +1,73 @@ +package logstorage + +import ( + "fmt" + "sync/atomic" +) + +// pipeOffset implements '| offset ...' pipe. 
+ +// See https://docs.victoriametrics.com/victorialogs/logsql/#offset-pipe +type pipeOffset struct { + n uint64 +} + +func (po *pipeOffset) String() string { + return fmt.Sprintf("offset %d", po.n) +} + +func (po *pipeOffset) getNeededFields() ([]string, map[string][]string) { + return []string{"*"}, nil +} + +func (po *pipeOffset) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + return &pipeOffsetProcessor{ + po: po, + ppBase: ppBase, + } +} + +type pipeOffsetProcessor struct { + po *pipeOffset + ppBase pipeProcessor + + rowsProcessed atomic.Uint64 +} + +func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) { + rowsProcessed := pop.rowsProcessed.Add(uint64(len(br.timestamps))) + if rowsProcessed <= pop.po.n { + return + } + + rowsProcessed -= uint64(len(br.timestamps)) + if rowsProcessed >= pop.po.n { + pop.ppBase.writeBlock(workerID, br) + return + } + + rowsSkip := pop.po.n - rowsProcessed + br.skipRows(int(rowsSkip)) + pop.ppBase.writeBlock(workerID, br) +} + +func (pop *pipeOffsetProcessor) flush() error { + return nil +} + +func parsePipeOffset(lex *lexer) (*pipeOffset, error) { + if !lex.isKeyword("offset", "skip") { + return nil, fmt.Errorf("expecting 'offset' or 'skip'; got %q", lex.token) + } + + lex.nextToken() + n, err := parseUint(lex.token) + if err != nil { + return nil, fmt.Errorf("cannot parse the number of rows to skip from %q: %w", lex.token, err) + } + lex.nextToken() + po := &pipeOffset{ + n: n, + } + return po, nil +} diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index c1a19aa00..f98356d74 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -9,7 +9,7 @@ import ( // pipeRename implements '| rename ...' pipe. 
// -// See https://docs.victoriametrics.com/victorialogs/logsql/#transformations +// See https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe type pipeRename struct { // srcFields contains a list of source fields to rename srcFields []string @@ -66,8 +66,8 @@ func (prp *pipeRenameProcessor) flush() error { } func parsePipeRename(lex *lexer) (*pipeRename, error) { - if !lex.isKeyword("rename") { - return nil, fmt.Errorf("expecting 'rename'; got %q", lex.token) + if !lex.isKeyword("rename", "mv") { + return nil, fmt.Errorf("expecting 'rename' or 'mv'; got %q", lex.token) } var srcFields []string diff --git a/lib/logstorage/pipe_skip.go b/lib/logstorage/pipe_skip.go deleted file mode 100644 index 70f27c873..000000000 --- a/lib/logstorage/pipe_skip.go +++ /dev/null @@ -1,73 +0,0 @@ -package logstorage - -import ( - "fmt" - "sync/atomic" -) - -// pipeSkip implements '| skip ...' pipe. -// -// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters -type pipeSkip struct { - n uint64 -} - -func (ps *pipeSkip) String() string { - return fmt.Sprintf("skip %d", ps.n) -} - -func (ps *pipeSkip) getNeededFields() ([]string, map[string][]string) { - return []string{"*"}, nil -} - -func (ps *pipeSkip) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - return &pipeSkipProcessor{ - ps: ps, - ppBase: ppBase, - } -} - -type pipeSkipProcessor struct { - ps *pipeSkip - ppBase pipeProcessor - - rowsProcessed atomic.Uint64 -} - -func (psp *pipeSkipProcessor) writeBlock(workerID uint, br *blockResult) { - rowsProcessed := psp.rowsProcessed.Add(uint64(len(br.timestamps))) - if rowsProcessed <= psp.ps.n { - return - } - - rowsProcessed -= uint64(len(br.timestamps)) - if rowsProcessed >= psp.ps.n { - psp.ppBase.writeBlock(workerID, br) - return - } - - rowsSkip := psp.ps.n - rowsProcessed - br.skipRows(int(rowsSkip)) - psp.ppBase.writeBlock(workerID, br) -} - -func (psp *pipeSkipProcessor) flush() error { - return 
nil -} - -func parsePipeSkip(lex *lexer) (*pipeSkip, error) { - if !lex.isKeyword("skip") { - return nil, fmt.Errorf("expecting 'rename'; got %q", lex.token) - } - - lex.nextToken() - n, err := parseUint(lex.token) - if err != nil { - return nil, fmt.Errorf("cannot parse the number of rows to skip from %q: %w", lex.token, err) - } - lex.nextToken() - ps := &pipeSkip{ - n: n, - } - return ps, nil -} diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 240d88174..bcf499d0e 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -15,7 +15,7 @@ import ( // pipeStats processes '| stats ...' queries. // -// See https://docs.victoriametrics.com/victorialogs/logsql/#stats +// See https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe type pipeStats struct { // byFields contains field names with optional buckets from 'by(...)' clause. byFields []*byField