This commit is contained in:
Aliaksandr Valialkin 2024-04-27 02:50:19 +02:00
parent 51b869d458
commit 2270c42c82
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 150 additions and 17 deletions

View file

@ -22,6 +22,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
* FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default.
* FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#querying-specific-fields).
* FEATURE: add support for calculating the number of matching logs and the number of logs with non-empty [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details.
* FEATURE: add support for returning the first `N` results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters).
* FEATURE: optimize performance for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/), which contains multiple filters for [words](https://docs.victoriametrics.com/victorialogs/logsql/#word-filter) or [phrases](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) delimited with [`AND` operator](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, `foo AND bar` query must find [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with `foo` and `bar` words at faster speed.
* BUGFIX: prevent from additional CPU usage for up to a few seconds after canceling the query.

View file

@ -1092,12 +1092,15 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo
## Limiters
LogsQL provides the following functionality for limiting the amounts of returned log entries:
- `error | head 10` - returns up to 10 log entries with the `error` [word](#word).
LogsQL will support the ability to page the returned results.
It is possible to limit the returned results with `head`, `tail`, `less`, etc. Unix commands
according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
LogsQL will support the ability to limit the number of returned results alongside the ability to page the returned results.
Additionally, LogsQL will provide the ability to select fields, which must be returned in the response.
See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details.
## Querying specific fields

View file

@ -813,10 +813,20 @@ func TestParseQuerySuccess(t *testing.T) {
// multiple fields pipes
f(`foo | fields bar | fields baz, abc`, `foo | fields bar | fields baz, abc`)
// head pipe
f(`foo | head 10`, `foo | head 10`)
f(`foo | HEAD 1123432`, `foo | head 1123432`)
// multiple head pipes
f(`foo | head 100 | head 10 | head 234`, `foo | head 100 | head 10 | head 234`)
// stats count pipe
f(`* | Stats count() AS foo`, `* | stats count() as foo`)
f(`* | STATS bY (foo, b.a/r, "b az") count(*) as XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`)
f(`* | stats by() COUNT(x, 'a).b,c|d') as qwert`, `* | stats count(x, "a).b,c|d") as qwert`)
// multiple different pipes
f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`)
}
func TestParseQueryFailure(t *testing.T) {
@ -1028,4 +1038,35 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | fields ,`)
f(`foo | fields bar,`)
f(`foo | fields bar,,`)
// missing head pipe value
f(`foo | head`)
// invalid head pipe value
f(`foo | head bar`)
f(`foo | head -123`)
// missing stats
f(`foo | stats`)
// invalid stats
f(`foo | stats bar`)
// invalid count
f(`foo | stats count`)
f(`foo | stats count(`)
f(`foo | stats count bar`)
f(`foo | stats count(bar`)
f(`foo | stats count(bar)`)
f(`foo | stats count() bar`)
f(`foo | stats count() as`)
f(`foo | stats count() as |`)
// invalid by clause
f(`foo | stats by`)
f(`foo | stats by bar`)
f(`foo | stats by(`)
f(`foo | stats by(bar`)
f(`foo | stats by(bar,`)
f(`foo | stats by(bar)`)
}

View file

@ -5,6 +5,7 @@ import (
"slices"
"strconv"
"strings"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -23,7 +24,7 @@ type pipe interface {
// If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds.
// It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds.
//
// The returned pipeProcessor may call cancel() at any time in order to stop ppBase.
// The returned pipeProcessor may call cancel() at any time in order to notify writeBlock callers that it doesn't accept more data.
newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor
}
@ -39,7 +40,7 @@ type pipeProcessor interface {
// flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor.
//
// The pipeProcessor must call ppBase.flush() and cancel(), which has been passed to newPipeProcessor, before returning from the flush.
// The pipeProcessor must call cancel() and ppBase.flush(), which has been passed to newPipeProcessor, before returning from the flush.
flush()
}
@ -79,6 +80,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
}
pipes = append(pipes, sp)
case lex.isKeyword("head"):
hp, err := parseHeadPipe(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err)
}
pipes = append(pipes, hp)
default:
return nil, fmt.Errorf("unexpected pipe %q", lex.token)
}
@ -135,8 +142,8 @@ func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, co
}
func (fpp *fieldsPipeProcessor) flush() {
fpp.ppBase.flush()
fpp.cancel()
fpp.ppBase.flush()
}
func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) {
@ -148,7 +155,10 @@ func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) {
if lex.isKeyword(",") {
return nil, fmt.Errorf("unexpected ','; expecting field name")
}
field := parseFieldName(lex)
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field name: %w", err)
}
fields = append(fields, field)
switch {
case lex.isKeyword("|", ")", ""):
@ -320,8 +330,8 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
func (spp *statsPipeProcessor) flush() {
defer func() {
spp.ppBase.flush()
spp.cancel()
spp.ppBase.flush()
}()
// Merge states across shards
@ -560,7 +570,10 @@ func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing token after 'as' keyword")
}
resultName := parseFieldName(lex)
resultName, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'as' field name: %w", err)
}
sfc := &statsFuncCount{
fields: fields,
@ -569,6 +582,80 @@ func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
return sfc, nil
}
type headPipe struct {
n uint64
}
func (hp *headPipe) String() string {
return fmt.Sprintf("head %d", hp.n)
}
func (hp *headPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
return &headPipeProcessor{
hp: hp,
cancel: cancel,
ppBase: ppBase,
}
}
type headPipeProcessor struct {
hp *headPipe
cancel func()
ppBase pipeProcessor
rowsWritten atomic.Uint64
}
func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
rowsWritten := hpp.rowsWritten.Add(uint64(len(timestamps)))
if rowsWritten <= hpp.hp.n {
// Fast path - write all the rows to ppBase.
hpp.ppBase.writeBlock(workerID, timestamps, columns)
return
}
// Slow path - overflow. Write the remaining rows if needed.
rowsWritten -= uint64(len(timestamps))
if rowsWritten >= hpp.hp.n {
// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
return
}
// Write remaining rows.
rowsRemaining := hpp.hp.n - rowsWritten
cs := make([]BlockColumn, len(columns))
for i, c := range columns {
cDst := &cs[i]
cDst.Name = c.Name
cDst.Values = c.Values[:rowsRemaining]
}
timestamps = timestamps[:rowsRemaining]
hpp.ppBase.writeBlock(workerID, timestamps, cs)
// Notify the caller that it should stop passing more data to writeBlock().
hpp.cancel()
}
func (hpp *headPipeProcessor) flush() {
hpp.cancel()
hpp.ppBase.flush()
}
func parseHeadPipe(lex *lexer) (*headPipe, error) {
if !lex.mustNextToken() {
return nil, fmt.Errorf("missing the number of head rows to return")
}
n, err := strconv.ParseUint(lex.token, 10, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse %q: %w", lex.token, err)
}
lex.nextToken()
hp := &headPipe{
n: n,
}
return hp, nil
}
func parseFieldNamesInParens(lex *lexer) ([]string, error) {
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing `(`")
@ -585,7 +672,10 @@ func parseFieldNamesInParens(lex *lexer) ([]string, error) {
if lex.isKeyword(",") {
return nil, fmt.Errorf("unexpected `,`")
}
field := parseFieldName(lex)
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field name: %w", err)
}
fields = append(fields, field)
switch {
case lex.isKeyword(")"):
@ -598,14 +688,12 @@ func parseFieldNamesInParens(lex *lexer) ([]string, error) {
}
}
func parseFieldName(lex *lexer) string {
s := lex.token
lex.nextToken()
for !lex.isSkippedSpace && !lex.isKeyword(",", "|", ")", "") {
s += lex.rawToken
lex.nextToken()
func parseFieldName(lex *lexer) (string, error) {
if lex.isKeyword(",", "(", ")", "[", "]", "|", "") {
return "", fmt.Errorf("unexpected token: %q", lex.token)
}
return s
token := getCompoundToken(lex)
return token, nil
}
func fieldNamesString(fields []string) string {