diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 5b0ea5ae5..fd50f0c71 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -22,7 +22,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default. * FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#querying-specific-fields). * FEATURE: add support for calculating the number of matching logs and the number of logs with non-empty [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details. -* FEATURE: add support for returning the first `N` results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters). +* FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters). * FEATURE: optimize performance for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/), which contains multiple filters for [words](https://docs.victoriametrics.com/victorialogs/logsql/#word-filter) or [phrases](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) delimited with [`AND` operator](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, `foo AND bar` query must find [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with `foo` and `bar` words at faster speed. * BUGFIX: prevent from additional CPU usage for up to a few seconds after canceling the query. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index a3671e7af..985bf1ebf 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1092,11 +1092,13 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo ## Limiters -LogsQL provides the following functionality for limiting the amounts of returned log entries: +LogsQL provides the following functionality for limiting the number of returned log entries: - `error | head 10` - returns up to 10 log entries with the `error` [word](#word). +- `error | skip 10` - skips the first 10 log entris with the `error` [word](#word). -LogsQL will support the ability to page the returned results. +It is recommended [sorting](#sorting) entries before limiting the number of returned log entries, +in order to get consistent results. It is possible to limit the returned results with `head`, `tail`, `less`, etc. Unix commands according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line). diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index c4fbea08e..b3bca1482 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -820,6 +820,12 @@ func TestParseQuerySuccess(t *testing.T) { // multiple head pipes f(`foo | head 100 | head 10 | head 234`, `foo | head 100 | head 10 | head 234`) + // skip pipe + f(`foo | skip 10`, `foo | skip 10`) + + // multiple skip pipes + f(`foo | skip 10 | skip 100`, `foo | skip 10 | skip 100`) + // stats count pipe f(`* | Stats count() AS foo`, `* | stats count() as foo`) f(`* | STATS bY (foo, b.a/r, "b az") count(*) as XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`) @@ -827,6 +833,7 @@ func TestParseQuerySuccess(t *testing.T) { // multiple different pipes f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`) + f(`* | skip 100 | head 20 | skip 10`, `* | skip 100 | head 20 | skip 10`) } func TestParseQueryFailure(t *testing.T) { @@ -1046,6 +1053,13 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | head bar`) f(`foo | head -123`) + // missing skip pipe value + f(`foo | skip`) + + // invalid skip pipe value + f(`foo | skip bar`) + f(`foo | skip -10`) + // missing stats f(`foo | stats`) diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index d757016bb..db24673e1 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -86,6 +86,12 @@ func parsePipes(lex *lexer) ([]pipe, error) { return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err) } pipes = append(pipes, hp) + case lex.isKeyword("skip"): + sp, err := parseSkipPipe(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err) + } + pipes = append(pipes, sp) default: return nil, fmt.Errorf("unexpected pipe %q", lex.token) } @@ -647,7 +653,7 @@ func parseHeadPipe(lex *lexer) (*headPipe, error) { } n, err := strconv.ParseUint(lex.token, 10, 64) if err != nil { - return nil, fmt.Errorf("cannot parse %q: %w", lex.token, err) + return nil, fmt.Errorf("cannot parse the number of head rows to return %q: %w", lex.token, err) } lex.nextToken() hp := &headPipe{ @@ -656,6 +662,73 @@ func parseHeadPipe(lex *lexer) (*headPipe, error) { return hp, nil } +type skipPipe struct { + n uint64 +} + +func (sp *skipPipe) String() string { + return fmt.Sprintf("skip %d", sp.n) +} + +func (sp *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + return &skipPipeProcessor{ + sp: sp, + cancel: cancel, + ppBase: ppBase, + } +} + +type skipPipeProcessor struct { + sp *skipPipe + cancel func() + ppBase pipeProcessor + + rowsSkipped atomic.Uint64 +} + +func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { + rowsSkipped := spp.rowsSkipped.Add(uint64(len(timestamps))) + if rowsSkipped <= spp.sp.n { + return + } + + rowsSkipped -= uint64(len(timestamps)) + if rowsSkipped >= spp.sp.n { + spp.ppBase.writeBlock(workerID, timestamps, columns) + return + } + + rowsRemaining := spp.sp.n - rowsSkipped + cs := make([]BlockColumn, len(columns)) + for i, c := range columns { + cDst := &cs[i] + cDst.Name = c.Name + cDst.Values = c.Values[rowsRemaining:] + } + timestamps = timestamps[rowsRemaining:] + spp.ppBase.writeBlock(workerID, timestamps, cs) +} + +func (spp *skipPipeProcessor) flush() { + spp.cancel() + spp.ppBase.flush() +} + +func parseSkipPipe(lex *lexer) (*skipPipe, error) { + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing the number of rows to skip") + } + n, err := strconv.ParseUint(lex.token, 10, 64) + if err != nil { + return nil, fmt.Errorf("cannot parse the number of rows to skip %q: %w", lex.token, err) + } + lex.nextToken() + sp := &skipPipe{ + n: n, + } + return sp, nil +} + func parseFieldNamesInParens(lex *lexer) ([]string, error) { if !lex.isKeyword("(") { return nil, fmt.Errorf("missing `(`")