diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index bebd889324..5b0ea5ae5f 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -22,6 +22,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default. * FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#querying-specific-fields). * FEATURE: add support for calculating the number of matching logs and the number of logs with non-empty [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details. +* FEATURE: add support for returning the first `N` results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters). * FEATURE: optimize performance for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/), which contains multiple filters for [words](https://docs.victoriametrics.com/victorialogs/logsql/#word-filter) or [phrases](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) delimited with [`AND` operator](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). 
For example, `foo AND bar` query must find [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with `foo` and `bar` words at faster speed. * BUGFIX: prevent from additional CPU usage for up to a few seconds after canceling the query. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index b5dd58ae92..a3671e7afe 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1092,12 +1092,15 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo ## Limiters +LogsQL provides the following functionality for limiting the number of returned log entries: + +- `error | head 10` - returns up to 10 log entries with the `error` [word](#word). + +LogsQL will support the ability to page the returned results. + It is possible to limit the returned results with `head`, `tail`, `less`, etc. Unix commands according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line). -LogsQL will support the ability to limit the number of returned results alongside the ability to page the returned results. -Additionally, LogsQL will provide the ability to select fields, which must be returned in the response. - See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details. 
## Querying specific fields diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 369b8afd66..c4fbea08e5 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -813,10 +813,20 @@ func TestParseQuerySuccess(t *testing.T) { // multiple fields pipes f(`foo | fields bar | fields baz, abc`, `foo | fields bar | fields baz, abc`) + // head pipe + f(`foo | head 10`, `foo | head 10`) + f(`foo | HEAD 1123432`, `foo | head 1123432`) + + // multiple head pipes + f(`foo | head 100 | head 10 | head 234`, `foo | head 100 | head 10 | head 234`) + // stats count pipe f(`* | Stats count() AS foo`, `* | stats count() as foo`) f(`* | STATS bY (foo, b.a/r, "b az") count(*) as XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`) f(`* | stats by() COUNT(x, 'a).b,c|d') as qwert`, `* | stats count(x, "a).b,c|d") as qwert`) + + // multiple different pipes + f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`) } func TestParseQueryFailure(t *testing.T) { @@ -1028,4 +1038,35 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | fields ,`) f(`foo | fields bar,`) f(`foo | fields bar,,`) + + // missing head pipe value + f(`foo | head`) + + // invalid head pipe value + f(`foo | head bar`) + f(`foo | head -123`) + + // missing stats + f(`foo | stats`) + + // invalid stats + f(`foo | stats bar`) + + // invalid count + f(`foo | stats count`) + f(`foo | stats count(`) + f(`foo | stats count bar`) + f(`foo | stats count(bar`) + f(`foo | stats count(bar)`) + f(`foo | stats count() bar`) + f(`foo | stats count() as`) + f(`foo | stats count() as |`) + + // invalid by clause + f(`foo | stats by`) + f(`foo | stats by bar`) + f(`foo | stats by(`) + f(`foo | stats by(bar`) + f(`foo | stats by(bar,`) + f(`foo | stats by(bar)`) } diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 645628d413..d757016bbf 100644 --- 
a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -5,6 +5,7 @@ import ( "slices" "strconv" "strings" + "sync/atomic" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" @@ -23,7 +24,7 @@ type pipe interface { // If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds. // It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds. // - // The returned pipeProcessor may call cancel() at any time in order to stop ppBase. + // The returned pipeProcessor may call cancel() at any time in order to notify writeBlock callers that it doesn't accept more data. newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor } @@ -39,7 +40,7 @@ type pipeProcessor interface { // flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor. // - // The pipeProcessor must call ppBase.flush() and cancel(), which has been passed to newPipeProcessor, before returning from the flush. + // The pipeProcessor must call cancel() and ppBase.flush(), which has been passed to newPipeProcessor, before returning from the flush. 
flush() } @@ -79,6 +80,12 @@ func parsePipes(lex *lexer) ([]pipe, error) { return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) } pipes = append(pipes, sp) + case lex.isKeyword("head"): + hp, err := parseHeadPipe(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err) + } + pipes = append(pipes, hp) default: return nil, fmt.Errorf("unexpected pipe %q", lex.token) } @@ -135,8 +142,8 @@ func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, co } func (fpp *fieldsPipeProcessor) flush() { - fpp.ppBase.flush() fpp.cancel() + fpp.ppBase.flush() } func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) { @@ -148,7 +155,10 @@ func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) { if lex.isKeyword(",") { return nil, fmt.Errorf("unexpected ','; expecting field name") } - field := parseFieldName(lex) + field, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse field name: %w", err) + } fields = append(fields, field) switch { case lex.isKeyword("|", ")", ""): @@ -320,8 +330,8 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col func (spp *statsPipeProcessor) flush() { defer func() { - spp.ppBase.flush() spp.cancel() + spp.ppBase.flush() }() // Merge states across shards @@ -560,7 +570,10 @@ func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) { if !lex.mustNextToken() { return nil, fmt.Errorf("missing token after 'as' keyword") } - resultName := parseFieldName(lex) + resultName, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'as' field name: %w", err) + } sfc := &statsFuncCount{ fields: fields, @@ -569,6 +582,80 @@ func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) { return sfc, nil } +type headPipe struct { + n uint64 +} + +func (hp *headPipe) String() string { + return fmt.Sprintf("head %d", hp.n) +} + +func (hp *headPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase 
pipeProcessor) pipeProcessor { + return &headPipeProcessor{ + hp: hp, + cancel: cancel, + ppBase: ppBase, + } +} + +type headPipeProcessor struct { + hp *headPipe + cancel func() + ppBase pipeProcessor + + rowsWritten atomic.Uint64 +} + +func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { + rowsWritten := hpp.rowsWritten.Add(uint64(len(timestamps))) + if rowsWritten <= hpp.hp.n { + // Fast path - write all the rows to ppBase. + hpp.ppBase.writeBlock(workerID, timestamps, columns) + return + } + + // Slow path - overflow. Write the remaining rows if needed. + rowsWritten -= uint64(len(timestamps)) + if rowsWritten >= hpp.hp.n { + // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. + return + } + + // Write remaining rows. + rowsRemaining := hpp.hp.n - rowsWritten + cs := make([]BlockColumn, len(columns)) + for i, c := range columns { + cDst := &cs[i] + cDst.Name = c.Name + cDst.Values = c.Values[:rowsRemaining] + } + timestamps = timestamps[:rowsRemaining] + hpp.ppBase.writeBlock(workerID, timestamps, cs) + + // Notify the caller that it should stop passing more data to writeBlock(). 
+ hpp.cancel() +} + +func (hpp *headPipeProcessor) flush() { + hpp.cancel() + hpp.ppBase.flush() +} + +func parseHeadPipe(lex *lexer) (*headPipe, error) { + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing the number of head rows to return") + } + n, err := strconv.ParseUint(lex.token, 10, 64) + if err != nil { + return nil, fmt.Errorf("cannot parse %q: %w", lex.token, err) + } + lex.nextToken() + hp := &headPipe{ + n: n, + } + return hp, nil +} + func parseFieldNamesInParens(lex *lexer) ([]string, error) { if !lex.isKeyword("(") { return nil, fmt.Errorf("missing `(`") @@ -585,7 +672,10 @@ func parseFieldNamesInParens(lex *lexer) ([]string, error) { if lex.isKeyword(",") { return nil, fmt.Errorf("unexpected `,`") } - field := parseFieldName(lex) + field, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse field name: %w", err) + } fields = append(fields, field) switch { case lex.isKeyword(")"): @@ -598,14 +688,12 @@ func parseFieldNamesInParens(lex *lexer) ([]string, error) { } } -func parseFieldName(lex *lexer) string { - s := lex.token - lex.nextToken() - for !lex.isSkippedSpace && !lex.isKeyword(",", "|", ")", "") { - s += lex.rawToken - lex.nextToken() +func parseFieldName(lex *lexer) (string, error) { + if lex.isKeyword(",", "(", ")", "[", "]", "|", "") { + return "", fmt.Errorf("unexpected token: %q", lex.token) } - return s + token := getCompoundToken(lex) + return token, nil } func fieldNamesString(fields []string) string {