From 5512787b72b7ff14d1f7a885843907cb931cf5a9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 25 Apr 2024 03:44:07 +0200 Subject: [PATCH] lib/logstorage: add support for fields modifier --- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/LogsQL.md | 39 ++----- docs/VictoriaLogs/data-ingestion/README.md | 24 ++-- docs/VictoriaLogs/querying/README.md | 6 +- lib/logstorage/block_search.go | 3 +- lib/logstorage/filters.go | 83 -------------- lib/logstorage/parser.go | 126 +++++++++++++++------ lib/logstorage/parser_test.go | 23 ++++ 8 files changed, 134 insertions(+), 171 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 79b9d84a4..3516dfac6 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -20,6 +20,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip * FEATURE: optimize performance for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/), which contains multiple filters for [words](https://docs.victoriametrics.com/victorialogs/logsql/#word-filter) or [phrases](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) delimited with [`AND` operator](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, `foo AND bar` query must find [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with `foo` and `bar` words at faster speed. +* FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default. 
If only some fields must be returned, then they can be listed in `| fields ...` section as described in [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#querying-specific-fields). * BUGFIX: prevent from additional CPU usage for up to a few seconds after canceling the query. * BUGFIX: prevent from returning log entries with emtpy `_stream` field in the form `"_stream":""` in [search query results](https://docs.victoriametrics.com/victorialogs/querying/). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042). diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 77fff6fdd..d4b662657 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -83,20 +83,8 @@ So LogsQL allows omitting `AND` words. For example, the following query is equiv error _time:5m ``` -The query returns the following [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) by default: - -- [`_msg` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field) -- [`_stream` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) -- [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) - -Logs may contain arbitrary number of other fields. If you need obtaining some of these fields in query results, -then just refer them in the query with `field_name:*` [filter](#any-value-filter). See [these docs](#querying-specific-fields) for more details. - -For example, the following query returns `host.hostname` field additionally to `_msg`, `_stream` and `_time` fields: - -```logsql -error _time:5m host.hostname:* -``` +The query returns all the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) by default. +See [how to query specific fields](#querying-specific-fields). 
Suppose the query above selects too many rows because some buggy app pushes invalid error logs to VictoriaLogs. Suppose the app adds `buggy_app` [word](#word) to every log line. Then the following query removes all the logs from the buggy app, allowing us paying attention to the real errors: @@ -1107,24 +1095,15 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo ## Querying specific fields -By default VictoriaLogs query response contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), -[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and -[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields. +By default VictoriaLogs query response contains all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). -If you want selecting other fields from the ingested [structured logs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model), -then they must be mentioned in query filters. For example, if you want selecting `log.level` field, and this field isn't mentioned in the query yet, then add -`log.level:*` [filter](#any-value-filter) filter to the end of the query. -The `field_name:*` filter doesn't return log entries with empty or missing `field_name`. If you want returning log entries -with and without the given field, then `(field_name:* OR field_name:"")` filter can be used. -See the following docs for details: +If you want to select some specific fields, then add `| fields field1, field2, ... fieldN` to the end of the query.
+For example, the following query returns only [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field), +[`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), `host` and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields: -- [Any value filter](#any-value-filter) -- [Empty value filter](#empty-value-filter) -- [Logical filter](#logical-filter) - -In the future LogsQL will support `| fields field1, field2, ... fieldN` syntax for selecting the listed fields. -It will also support the ability to select all the fields for the matching log entries with `| fields *` syntax. -See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details. +```logsql +error | fields _time, _stream, host, _msg +``` ## Performance tips diff --git a/docs/VictoriaLogs/data-ingestion/README.md b/docs/VictoriaLogs/data-ingestion/README.md index 6d1bdc63b..8ec0e8b3c 100644 --- a/docs/VictoriaLogs/data-ingestion/README.md +++ b/docs/VictoriaLogs/data-ingestion/README.md @@ -79,10 +79,8 @@ The command should return the following response: {"_msg":"cannot open file","_stream":"{}","_time":"2023-06-21T04:24:24Z","host.name":"host123"} ``` -The response by default contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), -[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and -[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields plus the explicitly mentioned fields. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields) for details. +The response by default contains all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +See [how to query specific fields](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields). 
The duration of requests to `/insert/elasticsearch/_bulk` can be monitored with `vl_http_request_duration_seconds{path="/insert/elasticsearch/_bulk"}` metric. @@ -133,10 +131,8 @@ The command should return the following response: {"_msg":"oh no!","_stream":"{stream=\"stream1\"}","_time":"2023-06-20T15:32:10.567Z","log.level":"error"} ``` -The response by default contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), -[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and -[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields plus the explicitly mentioned fields. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields) for details. +The response by default contains all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +See [how to query specific fields](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields). The duration of requests to `/insert/jsonline` can be monitored with `vl_http_request_duration_seconds{path="/insert/jsonline"}` metric. @@ -174,10 +170,8 @@ The command should return the following response: {"_msg":"foo fizzbuzz bar","_stream":"{instance=\"host123\",job=\"app42\"}","_time":"2023-07-20T23:01:19.288676497Z"} ``` -The response by default contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), -[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and -[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields plus the explicitly mentioned fields. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields) for details. +The response by default contains all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). 
+See [how to query specific fields](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields). The duration of requests to `/insert/loki/api/v1/push` can be monitored with `vl_http_request_duration_seconds{path="/insert/loki/api/v1/push"}` metric. @@ -232,10 +226,8 @@ using [any value filter](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.ht while `head` cancels query execution after reading the first 10 log lines. See [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line) for more details on how `head` integrates with VictoriaLogs. -The response by default contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), -[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and -[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields plus the explicitly mentioned fields. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields) for details. +The response by default contains all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +See [how to query specific fields](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields). 
VictoriaLogs provides the following command-line flags, which can help debugging data ingestion issues: diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index a1aaff819..77ef0004f 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -31,10 +31,8 @@ For example, the following query returns all the log entries with the `error` wo curl http://localhost:9428/select/logsql/query -d 'query=error' ``` -The response by default contains [`_msg`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), -[`_stream`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) and -[`_time`](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) fields plus the explicitly mentioned fields. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields) for details. +The response by default contains all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +See [how to query specific fields](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#querying-specific-fields). The `query` argument can be passed either in the request url itself (aka HTTP GET request) or via request body with the `x-www-form-urlencoded` encoding (aka HTTP POST request). The HTTP POST is useful for sending long queries diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index d0de2acb8..a0ea0e3dd 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -1,6 +1,7 @@ package logstorage import ( + "slices" "strconv" "sync" "time" @@ -118,7 +119,7 @@ func (bs *blockSearch) search(bsw *blockSearchWork) { // fetch the requested columns to bs.br. 
columnNames := bs.bsw.so.resultColumnNames - if len(columnNames) == 1 && columnNames[0] == "*" { + if slices.Contains(columnNames, "*") { bs.br.fetchAllColumns(bs, bm) } else { bs.br.fetchRequestedColumns(bs, bm, columnNames) diff --git a/lib/logstorage/filters.go b/lib/logstorage/filters.go index aab59d591..acc638b3f 100644 --- a/lib/logstorage/filters.go +++ b/lib/logstorage/filters.go @@ -140,9 +140,6 @@ type filter interface { // String returns string representation of the filter String() string - // updateReferencedColumnNames updates m with the column names referenced by the filter - updateReferencedColumnNames(m map[string]struct{}) - // apply must update bm according to the filter applied to the given bs block apply(bs *blockSearch, bm *filterBitmap) } @@ -155,10 +152,6 @@ func (nf *noopFilter) String() string { return "" } -func (nf *noopFilter) updateReferencedColumnNames(_ map[string]struct{}) { - // nothing to do -} - func (nf *noopFilter) apply(_ *blockSearch, _ *filterBitmap) { // nothing to do } @@ -180,12 +173,6 @@ func (of *orFilter) String() string { return strings.Join(a, " or ") } -func (of *orFilter) updateReferencedColumnNames(m map[string]struct{}) { - for _, f := range of.filters { - f.updateReferencedColumnNames(m) - } -} - func (of *orFilter) apply(bs *blockSearch, bm *filterBitmap) { bmResult := getFilterBitmap(bm.bitsLen) bmTmp := getFilterBitmap(bm.bitsLen) @@ -233,12 +220,6 @@ func (af *andFilter) String() string { return strings.Join(a, " ") } -func (af *andFilter) updateReferencedColumnNames(m map[string]struct{}) { - for _, f := range af.filters { - f.updateReferencedColumnNames(m) - } -} - func (af *andFilter) apply(bs *blockSearch, bm *filterBitmap) { if tokens := af.getMsgTokens(); len(tokens) > 0 { // Verify whether af tokens for the _msg field match bloom filter. @@ -316,10 +297,6 @@ func (nf *notFilter) String() string { return "!" 
+ s } -func (nf *notFilter) updateReferencedColumnNames(m map[string]struct{}) { - nf.f.updateReferencedColumnNames(m) -} - func (nf *notFilter) apply(bs *blockSearch, bm *filterBitmap) { // Minimize the number of rows to check by the filter by applying it // only to the rows, which match the bm, e.g. they may change the bm result. @@ -367,10 +344,6 @@ func (sf *streamFilter) initStreamIDs() { sf.streamIDs = m } -func (sf *streamFilter) updateReferencedColumnNames(m map[string]struct{}) { - m["_stream"] = struct{}{} -} - func (sf *streamFilter) apply(bs *blockSearch, bm *filterBitmap) { if sf.f.isEmpty() { return @@ -396,10 +369,6 @@ func (tf *timeFilter) String() string { return "_time:" + tf.stringRepr } -func (tf *timeFilter) updateReferencedColumnNames(m map[string]struct{}) { - m["_time"] = struct{}{} -} - func (tf *timeFilter) apply(bs *blockSearch, bm *filterBitmap) { minTimestamp := tf.minTimestamp maxTimestamp := tf.maxTimestamp @@ -475,10 +444,6 @@ func (sf *sequenceFilter) initNonEmptyPhrases() { sf.nonEmptyPhrases = result } -func (sf *sequenceFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[sf.fieldName] = struct{}{} -} - func (sf *sequenceFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := sf.fieldName phrases := sf.getNonEmptyPhrases() @@ -556,10 +521,6 @@ func (ef *exactPrefixFilter) initTokens() { ef.tokens = getTokensSkipLast(ef.prefix) } -func (ef *exactPrefixFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[ef.fieldName] = struct{}{} -} - func (ef *exactPrefixFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := ef.fieldName prefix := ef.prefix @@ -632,10 +593,6 @@ func (ef *exactFilter) initTokens() { ef.tokens = tokenizeStrings(nil, []string{ef.value}) } -func (ef *exactFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[ef.fieldName] = struct{}{} -} - func (ef *exactFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := ef.fieldName value := ef.value @@ -923,10 +880,6 
@@ func (af *inFilter) initTimestampISO8601Values() { af.timestampISO8601Values = m } -func (af *inFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[af.fieldName] = struct{}{} -} - func (af *inFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := af.fieldName @@ -1006,10 +959,6 @@ func (rf *ipv4RangeFilter) String() string { return fmt.Sprintf("%sipv4_range(%s, %s)", quoteFieldNameIfNeeded(rf.fieldName), toIPv4String(nil, minValue), toIPv4String(nil, maxValue)) } -func (rf *ipv4RangeFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[rf.fieldName] = struct{}{} -} - func (rf *ipv4RangeFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := rf.fieldName minValue := rf.minValue @@ -1076,10 +1025,6 @@ func (rf *stringRangeFilter) String() string { return fmt.Sprintf("%sstring_range(%s, %s)", quoteFieldNameIfNeeded(rf.fieldName), quoteTokenIfNeeded(rf.minValue), quoteTokenIfNeeded(rf.maxValue)) } -func (rf *stringRangeFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[rf.fieldName] = struct{}{} -} - func (rf *stringRangeFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := rf.fieldName minValue := rf.minValue @@ -1144,10 +1089,6 @@ func (rf *lenRangeFilter) String() string { return quoteFieldNameIfNeeded(rf.fieldName) + fmt.Sprintf("len_range(%d,%d)", rf.minLen, rf.maxLen) } -func (rf *lenRangeFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[rf.fieldName] = struct{}{} -} - func (rf *lenRangeFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := rf.fieldName minLen := rf.minLen @@ -1215,10 +1156,6 @@ func (rf *rangeFilter) String() string { return quoteFieldNameIfNeeded(rf.fieldName) + "range" + rf.stringRepr } -func (rf *rangeFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[rf.fieldName] = struct{}{} -} - func (rf *rangeFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := rf.fieldName minValue := rf.minValue @@ -1281,10 +1218,6 @@ func (rf 
*regexpFilter) String() string { return fmt.Sprintf("%sre(%q)", quoteFieldNameIfNeeded(rf.fieldName), rf.re.String()) } -func (rf *regexpFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[rf.fieldName] = struct{}{} -} - func (rf *regexpFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := rf.fieldName re := rf.re @@ -1361,10 +1294,6 @@ func (pf *anyCasePrefixFilter) initTokens() { pf.tokens = getTokensSkipLast(pf.prefix) } -func (pf *anyCasePrefixFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[pf.fieldName] = struct{}{} -} - func (pf *anyCasePrefixFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := pf.fieldName prefixLowercase := strings.ToLower(pf.prefix) @@ -1442,10 +1371,6 @@ func (pf *prefixFilter) initTokens() { pf.tokens = getTokensSkipLast(pf.prefix) } -func (pf *prefixFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[pf.fieldName] = struct{}{} -} - func (pf *prefixFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := pf.fieldName prefix := pf.prefix @@ -1517,10 +1442,6 @@ func (pf *anyCasePhraseFilter) initTokens() { pf.tokens = tokenizeStrings(nil, []string{pf.phrase}) } -func (pf *anyCasePhraseFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[pf.fieldName] = struct{}{} -} - func (pf *anyCasePhraseFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := pf.fieldName phraseLowercase := strings.ToLower(pf.phrase) @@ -1603,10 +1524,6 @@ func (pf *phraseFilter) initTokens() { pf.tokens = tokenizeStrings(nil, []string{pf.phrase}) } -func (pf *phraseFilter) updateReferencedColumnNames(m map[string]struct{}) { - m[pf.fieldName] = struct{}{} -} - func (pf *phraseFilter) apply(bs *blockSearch, bm *filterBitmap) { fieldName := pf.fieldName phrase := pf.phrase diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index a7155ff83..0b31dc995 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -4,7 +4,6 @@ import ( "fmt" "math" "regexp" - 
"sort" "strconv" "strings" "time" @@ -187,37 +186,34 @@ func (lex *lexer) nextToken() { // Query represents LogsQL query. type Query struct { f filter + + // fields contains optional list of fields to fetch + fields []string } // String returns string representation for q. func (q *Query) String() string { - return q.f.String() + s := q.f.String() + + if len(q.fields) > 0 { + a := make([]string, len(q.fields)) + for i, f := range q.fields { + if f != "*" { + f = quoteTokenIfNeeded(f) + } + a[i] = f + } + s += " | fields " + strings.Join(a, ", ") + } + + return s } func (q *Query) getResultColumnNames() []string { + if len(q.fields) > 0 { + return q.fields + } return []string{"*"} - - m := make(map[string]struct{}) - q.f.updateReferencedColumnNames(m) - - // Substitute an empty column name with _msg column - if _, ok := m[""]; ok { - delete(m, "") - m["_msg"] = struct{}{} - } - - // unconditionally select _time, _stream and _msg columns - // TODO: add the ability to filter out these columns - m["_time"] = struct{}{} - m["_stream"] = struct{}{} - m["_msg"] = struct{}{} - - columnNames := make([]string, 0, len(m)) - for k := range m { - columnNames = append(columnNames, k) - } - sort.Strings(columnNames) - return columnNames } // ParseQuery parses s. 
@@ -226,18 +222,74 @@ func ParseQuery(s string) (*Query, error) { f, err := parseFilter(lex) if err != nil { - return nil, fmt.Errorf("cannot parse filter expression: %w; context: %s", err, lex.context()) + return nil, fmt.Errorf("%w; context: %s", err, lex.context()) } - if !lex.isEnd() { - return nil, fmt.Errorf("unexpected tail: %q", lex.s) - } - q := &Query{ f: f, } + + if err := q.parsePipes(lex); err != nil { + return nil, fmt.Errorf("%w; context: %s", err, lex.context()) + } + return q, nil } +func (q *Query) parsePipes(lex *lexer) error { + for { + if lex.isEnd() { + return nil + } + if !lex.isKeyword("|") { + return fmt.Errorf("expecting '|'") + } + if !lex.mustNextToken() { + return fmt.Errorf("missing token after '|'") + } + switch { + case lex.isKeyword("fields"): + if err := q.parseFieldsPipe(lex); err != nil { + return fmt.Errorf("cannot parse fields pipe: %w", err) + } + default: + return fmt.Errorf("unexpected pipe %q", lex.token) + } + } +} + +func (q *Query) parseFieldsPipe(lex *lexer) error { + var fields []string + + for { + if !lex.mustNextToken() { + return fmt.Errorf("missing field name") + } + if lex.isKeyword(",") { + return fmt.Errorf("unexpected ','; expecting field name") + } + field := parseFieldName(lex) + fields = append(fields, field) + switch { + case lex.isKeyword("|", ""): + q.fields = fields + return nil + case lex.isKeyword(","): + default: + return fmt.Errorf("unexpected token: %q; expecting ','", lex.token) + } + } +} + +func parseFieldName(lex *lexer) string { + s := lex.token + lex.nextToken() + for !lex.isSkippedSpace && !lex.isKeyword(",", "|", "") { + s += lex.rawToken + lex.nextToken() + } + return s +} + func parseFilter(lex *lexer) (filter, error) { if !lex.mustNextToken() || lex.isKeyword("|") { return nil, fmt.Errorf("missing query") @@ -344,25 +396,25 @@ func parseGenericFilter(lex *lexer, fieldName string) (filter, error) { case lex.isKeyword(",", ")", "[", "]"): return nil, fmt.Errorf("unexpected token %q", 
lex.token) } - phrase := getCompoundPhrase(lex, fieldName) + phrase := getCompoundPhrase(lex, fieldName == "") return parseFilterForPhrase(lex, phrase, fieldName) } -func getCompoundPhrase(lex *lexer, fieldName string) string { +func getCompoundPhrase(lex *lexer, stopOnColon bool) string { phrase := lex.token rawPhrase := lex.rawToken lex.nextToken() - suffix := getCompoundSuffix(lex, fieldName) + suffix := getCompoundSuffix(lex, stopOnColon) if suffix == "" { return phrase } return rawPhrase + suffix } -func getCompoundSuffix(lex *lexer, fieldName string) string { +func getCompoundSuffix(lex *lexer, stopOnColon bool) string { s := "" stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", ""} - if fieldName == "" { + if stopOnColon { stopTokens = append(stopTokens, ":") } for !lex.isSkippedSpace && !lex.isKeyword(stopTokens...) { @@ -495,7 +547,7 @@ func parseFuncArgMaybePrefix(lex *lexer, funcName, fieldName string, callback fu phrase := lex.token lex.nextToken() if !lex.isKeyword("(") { - phrase += getCompoundSuffix(lex, fieldName) + phrase += getCompoundSuffix(lex, fieldName == "") return parseFilterForPhrase(lex, phrase, fieldName) } if !lex.mustNextToken() { @@ -673,7 +725,7 @@ func parseRangeFilter(lex *lexer, fieldName string) (filter, error) { case lex.isKeyword("["): includeMinValue = true default: - phrase := funcName + getCompoundSuffix(lex, fieldName) + phrase := funcName + getCompoundSuffix(lex, fieldName == "") return parseFilterForPhrase(lex, phrase, fieldName) } if !lex.mustNextToken() { @@ -756,7 +808,7 @@ func parseFuncArgs(lex *lexer, fieldName string, callback func(args []string) (f funcName := lex.token lex.nextToken() if !lex.isKeyword("(") { - phrase := funcName + getCompoundSuffix(lex, fieldName) + phrase := funcName + getCompoundSuffix(lex, fieldName == "") return parseFilterForPhrase(lex, phrase, fieldName) } if !lex.mustNextToken() { diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index ca510a785..8b01ceae5 
100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -803,6 +803,13 @@ func TestParseQuerySuccess(t *testing.T) { and (_stream:{job="a"} or _stream:{instance!="b"}) and (err* or ip:(ipv4_range(1.2.3.0, 1.2.3.255) and not 1.2.3.4))`, `(_time:(2023-04-20,now] or _time:[-10m,-1m)) (_stream:{job="a"} or _stream:{instance!="b"}) (err* or ip:ipv4_range(1.2.3.0, 1.2.3.255) !ip:1.2.3.4)`) + + // fields pipe + f(`foo | fields *`, `foo | fields *`) + f(`foo | fields bar`, `foo | fields bar`) + f(`foo | FIELDS bar,Baz , "a,b|c"`, `foo | fields bar, Baz, "a,b|c"`) + f(`foo | Fields x.y:z/a, _b$c`, `foo | fields "x.y:z/a", "_b$c"`) + f(`foo | fields bar | fields baz, abc`, `foo | fields baz, abc`) } func TestParseQueryFailure(t *testing.T) { @@ -998,4 +1005,20 @@ func TestParseQueryFailure(t *testing.T) { f(`string_range(foo, bar`) f(`string_range(foo)`) f(`string_range(foo, bar, baz)`) + + // missing filter + f(`| fields *`) + + // missing pipe keyword + f(`foo |`) + + // unknown pipe keyword + f(`foo | bar`) + f(`foo | fields bar | baz`) + + // missing field in fields pipe + f(`foo | fields`) + f(`foo | fields ,`) + f(`foo | fields bar,`) + f(`foo | fields bar,,`) }