Author: Aliaksandr Valialkin
Date:   2024-05-05 12:43:38 +02:00
parent f2214f5073
commit c856b528a8
10 changed files with 207 additions and 207 deletions


@@ -1033,12 +1033,12 @@ Performance tips:

 ## Pipes

 Additionally to [filters](#filters), LogsQL query may contain arbitrary mix of '|'-delimited actions known as `pipes`.
-For example, the following query uses [`stats`](#stats-pipe), [`sort`](#sort-pipe) and [`head`](#head-pipe) pipes
+For example, the following query uses [`stats`](#stats-pipe), [`sort`](#sort-pipe) and [`limit`](#limit-pipe) pipes
 for returning top 10 [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
 with the biggest number of logs during the last 5 minutes:

 ```logsql
-_time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs desc) | head 10
+_time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs desc) | limit 10
 ```

 LogsQL supports the following pipes:
@@ -1046,9 +1046,9 @@ LogsQL supports the following pipes:

 - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
 - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
 - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
-- [`head`](#head-pipe) limits the number selected logs.
+- [`limit`](#limit-pipe) limits the number of selected logs.
+- [`offset`](#offset-pipe) skips the given number of selected logs.
 - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
-- [`skip`](#skip-pipe) skips the given number of selected logs.
 - [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
 - [`stats`](#stats-pipe) calculates various stats over the selected logs.
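
For instance, a minimal sketch chaining the [`fields`](#fields-pipe) and [`limit`](#limit-pipe) pipes from the list above, using the standard `_stream` and `_msg` fields:

```logsql
_time:5m | fields _stream, _msg | limit 5
```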
@@ -1107,20 +1107,36 @@ See also:

 - [`rename` pipe](#rename-pipe)
 - [`delete` pipe](#delete-pipe)

-### head pipe
+### limit pipe

-If only a subset of selected logs must be processed, then `| head N` [pipe](#pipes) can be used. For example, the following query returns up to 100 logs over the last 5 minutes:
+If only a subset of selected logs must be processed, then `| limit N` [pipe](#pipes) can be used. For example, the following query returns up to 100 logs over the last 5 minutes:

 ```logsql
-_time:5m | head 100
+_time:5m | limit 100
 ```

 By default rows are selected in arbitrary order because of performance reasons, so the query above can return different sets of logs every time it is executed.
-[`sort` pipe](#sort-pipe) can be used for making sure the logs are in the same order before applying `head ...` to them.
+[`sort` pipe](#sort-pipe) can be used for making sure the logs are in the same order before applying `limit ...` to them.

 See also:

-- [`skip` pipe](#skip-pipe)
+- [`offset` pipe](#offset-pipe)
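
Since `limit` alone gives no ordering guarantee, here is a sketch of the sort-then-limit pattern described above, ordering by the standard `_time` field (newest first):

```logsql
_time:5m | sort by (_time desc) | limit 100
```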
+### offset pipe
+
+If some selected logs must be skipped after [`sort`](#sort-pipe), then `| offset N` [pipe](#pipes) can be used. For example, the following query skips the first 100 logs
+over the last 5 minutes after sorting them by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
+
+```logsql
+_time:5m | sort by (_time) | offset 100
+```
+
+Note that skipping rows without sorting makes little sense, since they can be returned in arbitrary order because of performance reasons.
+Rows can be sorted with [`sort` pipe](#sort-pipe).
+
+See also:
+
+- [`limit` pipe](#limit-pipe)
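
Combining `offset` with [`limit`](#limit-pipe) after [`sort`](#sort-pipe) gives a simple pagination sketch, for example the second page of 10 logs in stable `_time` order:

```logsql
_time:5m | sort by (_time) | offset 10 | limit 10
```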

 ### rename pipe
@@ -1143,22 +1159,6 @@ See also:

 - [`fields` pipe](#fields-pipe)
 - [`delete` pipe](#delete-pipe)

-### skip pipe
-
-If some number of selected logs must be skipped after [`sort`](#sort-pipe), then `| skip N` [pipe](#pipes) can be used. For example, the following query skips the first 100 logs
-over the last 5 minutes after soring them by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
-
-```logsql
-_time:5m | sort by (_time) | skip 100
-```
-
-Note that skipping rows without sorting has little sense, since they can be returned in arbitrary order because of performance reasons.
-Rows can be sorted with [`sort` pipe](#sort-pipe).
-
-See also:
-
-- [`head` pipe](#head-pipe)

 ### sort pipe

 By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) must be used.
@@ -1184,8 +1184,8 @@ It is recommended limiting the number of logs before sorting with the following

 See also:

 - [`stats` pipe](#stats-pipe)
-- [`head` pipe](#head-pipe)
-- [`skip` pipe](#skip-pipe)
+- [`limit` pipe](#limit-pipe)
+- [`offset` pipe](#offset-pipe)
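
A sketch of the recommendation mentioned in the hunk header above: narrow the selection before sorting, here via a tighter `_time` filter plus a hypothetical `error` word filter:

```logsql
_time:1m error | sort by (_time)
```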

 ### stats pipe
@@ -1545,7 +1545,7 @@ Use [`sort` pipe](#sort-pipe) for sorting the results.

 LogsQL provides the following [pipes](#pipes) for limiting the number of returned log entries:

 - [`fields`](#fields-pipe) and [`delete`](#delete-pipe) pipes allow limiting the set of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) to return.
-- [`head` pipe](#head-pipe) allows limiting the number of log entries to return.
+- [`limit` pipe](#limit-pipe) allows limiting the number of log entries to return.

 ## Querying specific fields
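
A sketch combining both kinds of limiters from this list, assuming a hypothetical `user_id` field to drop:

```logsql
_time:5m | delete user_id | limit 20
```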


@@ -80,16 +80,16 @@ func parsePipes(lex *lexer) ([]pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
 		}
 		pipes = append(pipes, ps)
-	case lex.isKeyword("head"):
-		ph, err := parsePipeHead(lex)
+	case lex.isKeyword("limit", "head"):
+		pl, err := parsePipeLimit(lex)
 		if err != nil {
-			return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err)
+			return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err)
 		}
-		pipes = append(pipes, ph)
-	case lex.isKeyword("skip"):
-		ps, err := parsePipeSkip(lex)
+		pipes = append(pipes, pl)
+	case lex.isKeyword("offset", "skip"):
+		ps, err := parsePipeOffset(lex)
 		if err != nil {
-			return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err)
+			return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err)
 		}
 		pipes = append(pipes, ps)
 	case lex.isKeyword("fields"):
@@ -98,19 +98,19 @@ func parsePipes(lex *lexer) ([]pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
 		}
 		pipes = append(pipes, pf)
-	case lex.isKeyword("copy"):
+	case lex.isKeyword("copy", "cp"):
 		pc, err := parsePipeCopy(lex)
 		if err != nil {
 			return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
 		}
 		pipes = append(pipes, pc)
-	case lex.isKeyword("rename"):
+	case lex.isKeyword("rename", "mv"):
 		pr, err := parsePipeRename(lex)
 		if err != nil {
 			return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err)
 		}
 		pipes = append(pipes, pr)
-	case lex.isKeyword("delete"):
+	case lex.isKeyword("delete", "del", "rm"):
 		pd, err := parsePipeDelete(lex)
 		if err != nil {
 			return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)


@@ -9,7 +9,7 @@ import (

 // pipeCopy implements '| copy ...' pipe.
 //
-// See https://docs.victoriametrics.com/victorialogs/logsql/#transformations
+// See https://docs.victoriametrics.com/victorialogs/logsql/#copy-pipe
 type pipeCopy struct {
 	// srcFields contains a list of source fields to copy
 	srcFields []string
@@ -61,8 +61,8 @@ func (pcp *pipeCopyProcessor) flush() error {
 }

 func parsePipeCopy(lex *lexer) (*pipeCopy, error) {
-	if !lex.isKeyword("copy") {
-		return nil, fmt.Errorf("expecting 'copy'; got %q", lex.token)
+	if !lex.isKeyword("copy", "cp") {
+		return nil, fmt.Errorf("expecting 'copy' or 'cp'; got %q", lex.token)
 	}

 	var srcFields []string


@@ -8,7 +8,7 @@ import (

 // pipeDelete implements '| delete ...' pipe.
 //
-// See https://docs.victoriametrics.com/victorialogs/logsql/#transformations
+// See https://docs.victoriametrics.com/victorialogs/logsql/#delete-pipe
 type pipeDelete struct {
 	// fields contains a list of fields to delete
 	fields []string
@@ -52,8 +52,8 @@ func (pdp *pipeDeleteProcessor) flush() error {
 }

 func parsePipeDelete(lex *lexer) (*pipeDelete, error) {
-	if !lex.isKeyword("delete") {
-		return nil, fmt.Errorf("expecting 'delete'; got %q", lex.token)
+	if !lex.isKeyword("delete", "del", "rm") {
+		return nil, fmt.Errorf("expecting 'delete', 'del' or 'rm'; got %q", lex.token)
 	}

 	var fields []string


@@ -1,86 +0,0 @@
-package logstorage
-
-import (
-	"fmt"
-	"sync/atomic"
-)
-
-// pipeHead implements '| head ...' pipe.
-//
-// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters
-type pipeHead struct {
-	n uint64
-}
-
-func (ph *pipeHead) String() string {
-	return fmt.Sprintf("head %d", ph.n)
-}
-
-func (ph *pipeHead) getNeededFields() ([]string, map[string][]string) {
-	return []string{"*"}, nil
-}
-
-func (ph *pipeHead) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
-	if ph.n == 0 {
-		// Special case - notify the caller to stop writing data to the returned pipeHeadProcessor
-		cancel()
-	}
-	return &pipeHeadProcessor{
-		ph:     ph,
-		cancel: cancel,
-		ppBase: ppBase,
-	}
-}
-
-type pipeHeadProcessor struct {
-	ph     *pipeHead
-	cancel func()
-	ppBase pipeProcessor
-
-	rowsProcessed atomic.Uint64
-}
-
-func (php *pipeHeadProcessor) writeBlock(workerID uint, br *blockResult) {
-	rowsProcessed := php.rowsProcessed.Add(uint64(len(br.timestamps)))
-	if rowsProcessed <= php.ph.n {
-		// Fast path - write all the rows to ppBase.
-		php.ppBase.writeBlock(workerID, br)
-		return
-	}
-
-	// Slow path - overflow. Write the remaining rows if needed.
-	rowsProcessed -= uint64(len(br.timestamps))
-	if rowsProcessed >= php.ph.n {
-		// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
-		return
-	}
-
-	// Write remaining rows.
-	keepRows := php.ph.n - rowsProcessed
-	br.truncateRows(int(keepRows))
-	php.ppBase.writeBlock(workerID, br)
-
-	// Notify the caller that it should stop passing more data to writeBlock().
-	php.cancel()
-}
-
-func (php *pipeHeadProcessor) flush() error {
-	return nil
-}
-
-func parsePipeHead(lex *lexer) (*pipeHead, error) {
-	if !lex.isKeyword("head") {
-		return nil, fmt.Errorf("expecting 'head'; got %q", lex.token)
-	}
-	lex.nextToken()
-	n, err := parseUint(lex.token)
-	if err != nil {
-		return nil, fmt.Errorf("cannot parse the number of head rows to return from %q: %w", lex.token, err)
-	}
-	lex.nextToken()
-	ph := &pipeHead{
-		n: n,
-	}
-	return ph, nil
-}


@@ -0,0 +1,86 @@
+package logstorage
+
+import (
+	"fmt"
+	"sync/atomic"
+)
+
+// pipeLimit implements '| limit ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
+type pipeLimit struct {
+	n uint64
+}
+
+func (pl *pipeLimit) String() string {
+	return fmt.Sprintf("limit %d", pl.n)
+}
+
+func (pl *pipeLimit) getNeededFields() ([]string, map[string][]string) {
+	return []string{"*"}, nil
+}
+
+func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+	if pl.n == 0 {
+		// Special case - notify the caller to stop writing data to the returned pipeLimitProcessor
+		cancel()
+	}
+	return &pipeLimitProcessor{
+		pl:     pl,
+		cancel: cancel,
+		ppBase: ppBase,
+	}
+}
+
+type pipeLimitProcessor struct {
+	pl     *pipeLimit
+	cancel func()
+	ppBase pipeProcessor
+
+	rowsProcessed atomic.Uint64
+}
+
+func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
+	rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps)))
+	if rowsProcessed <= plp.pl.n {
+		// Fast path - write all the rows to ppBase.
+		plp.ppBase.writeBlock(workerID, br)
+		return
+	}
+
+	// Slow path - overflow. Write the remaining rows if needed.
+	rowsProcessed -= uint64(len(br.timestamps))
+	if rowsProcessed >= plp.pl.n {
+		// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
+		return
+	}
+
+	// Write remaining rows.
+	keepRows := plp.pl.n - rowsProcessed
+	br.truncateRows(int(keepRows))
+	plp.ppBase.writeBlock(workerID, br)
+
+	// Notify the caller that it should stop passing more data to writeBlock().
+	plp.cancel()
+}
+
+func (plp *pipeLimitProcessor) flush() error {
+	return nil
+}
+
+func parsePipeLimit(lex *lexer) (*pipeLimit, error) {
+	if !lex.isKeyword("limit", "head") {
+		return nil, fmt.Errorf("expecting 'limit' or 'head'; got %q", lex.token)
+	}
+	lex.nextToken()
+	n, err := parseUint(lex.token)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse rows limit from %q: %w", lex.token, err)
+	}
+	lex.nextToken()
+	pl := &pipeLimit{
+		n: n,
+	}
+	return pl, nil
+}


@@ -0,0 +1,73 @@
+package logstorage
+
+import (
+	"fmt"
+	"sync/atomic"
+)
+
+// pipeOffset implements '| offset ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#offset-pipe
+type pipeOffset struct {
+	n uint64
+}
+
+func (po *pipeOffset) String() string {
+	return fmt.Sprintf("offset %d", po.n)
+}
+
+func (po *pipeOffset) getNeededFields() ([]string, map[string][]string) {
+	return []string{"*"}, nil
+}
+
+func (po *pipeOffset) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+	return &pipeOffsetProcessor{
+		po:     po,
+		ppBase: ppBase,
+	}
+}
+
+type pipeOffsetProcessor struct {
+	po     *pipeOffset
+	ppBase pipeProcessor
+
+	rowsProcessed atomic.Uint64
+}
+
+func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) {
+	rowsProcessed := pop.rowsProcessed.Add(uint64(len(br.timestamps)))
+	if rowsProcessed <= pop.po.n {
+		// Fast path - drop the whole block, since it is fully inside the offset.
+		return
+	}
+
+	rowsProcessed -= uint64(len(br.timestamps))
+	if rowsProcessed >= pop.po.n {
+		// Fast path - write the whole block to ppBase, since the offset has already been skipped.
+		pop.ppBase.writeBlock(workerID, br)
+		return
+	}
+
+	// Slow path - drop the rows remaining inside the offset and write the rest to ppBase.
+	rowsSkip := pop.po.n - rowsProcessed
+	br.skipRows(int(rowsSkip))
+	pop.ppBase.writeBlock(workerID, br)
+}
+
+func (pop *pipeOffsetProcessor) flush() error {
+	return nil
+}
+
+func parsePipeOffset(lex *lexer) (*pipeOffset, error) {
+	if !lex.isKeyword("offset", "skip") {
+		return nil, fmt.Errorf("expecting 'offset' or 'skip'; got %q", lex.token)
+	}
+	lex.nextToken()
+	n, err := parseUint(lex.token)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse the number of rows to skip from %q: %w", lex.token, err)
+	}
+	lex.nextToken()
+	po := &pipeOffset{
+		n: n,
+	}
+	return po, nil
+}


@@ -9,7 +9,7 @@ import (

 // pipeRename implements '| rename ...' pipe.
 //
-// See https://docs.victoriametrics.com/victorialogs/logsql/#transformations
+// See https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe
 type pipeRename struct {
 	// srcFields contains a list of source fields to rename
 	srcFields []string
@@ -66,8 +66,8 @@ func (prp *pipeRenameProcessor) flush() error {
 }

 func parsePipeRename(lex *lexer) (*pipeRename, error) {
-	if !lex.isKeyword("rename") {
-		return nil, fmt.Errorf("expecting 'rename'; got %q", lex.token)
+	if !lex.isKeyword("rename", "mv") {
+		return nil, fmt.Errorf("expecting 'rename' or 'mv'; got %q", lex.token)
 	}

 	var srcFields []string


@@ -1,73 +0,0 @@
-package logstorage
-
-import (
-	"fmt"
-	"sync/atomic"
-)
-
-// pipeSkip implements '| skip ...' pipe.
-//
-// See https://docs.victoriametrics.com/victorialogs/logsql/#limiters
-type pipeSkip struct {
-	n uint64
-}
-
-func (ps *pipeSkip) String() string {
-	return fmt.Sprintf("skip %d", ps.n)
-}
-
-func (ps *pipeSkip) getNeededFields() ([]string, map[string][]string) {
-	return []string{"*"}, nil
-}
-
-func (ps *pipeSkip) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
-	return &pipeSkipProcessor{
-		ps:     ps,
-		ppBase: ppBase,
-	}
-}
-
-type pipeSkipProcessor struct {
-	ps     *pipeSkip
-	ppBase pipeProcessor
-
-	rowsProcessed atomic.Uint64
-}
-
-func (psp *pipeSkipProcessor) writeBlock(workerID uint, br *blockResult) {
-	rowsProcessed := psp.rowsProcessed.Add(uint64(len(br.timestamps)))
-	if rowsProcessed <= psp.ps.n {
-		return
-	}
-
-	rowsProcessed -= uint64(len(br.timestamps))
-	if rowsProcessed >= psp.ps.n {
-		psp.ppBase.writeBlock(workerID, br)
-		return
-	}
-
-	rowsSkip := psp.ps.n - rowsProcessed
-	br.skipRows(int(rowsSkip))
-	psp.ppBase.writeBlock(workerID, br)
-}
-
-func (psp *pipeSkipProcessor) flush() error {
-	return nil
-}
-
-func parsePipeSkip(lex *lexer) (*pipeSkip, error) {
-	if !lex.isKeyword("skip") {
-		return nil, fmt.Errorf("expecting 'rename'; got %q", lex.token)
-	}
-	lex.nextToken()
-	n, err := parseUint(lex.token)
-	if err != nil {
-		return nil, fmt.Errorf("cannot parse the number of rows to skip from %q: %w", lex.token, err)
-	}
-	lex.nextToken()
-	ps := &pipeSkip{
-		n: n,
-	}
-	return ps, nil
-}


@@ -15,7 +15,7 @@ import (

 // pipeStats processes '| stats ...' queries.
 //
-// See https://docs.victoriametrics.com/victorialogs/logsql/#stats
+// See https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe
 type pipeStats struct {
 	// byFields contains field names with optional buckets from 'by(...)' clause.
 	byFields []*byField