lib/logstorage: support skipping _stream: prefix for stream filters

'_stream:{...}' can be written as '{...}'

This simplifies writing queries with stream filters and makes them more familiar to Loki users.
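
The accepted forms can be illustrated with a minimal, string-based sketch. It is not the lexer-based code in lib/logstorage: `normalizeStreamFilter` is a hypothetical helper introduced here for illustration only, and it assumes the input is a single stream filter with an optional field-name prefix. It mirrors the rule added to `parseGenericFilter`: `{...}` is accepted with no prefix or with the `_stream` prefix, and rejected for any other field name.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeStreamFilter is a hypothetical helper, not part of lib/logstorage.
// It accepts either `{...}` or `_stream:{...}` and returns the `{...}` part.
// The selector inside the braces is returned as-is and is not validated.
func normalizeStreamFilter(s string) (string, error) {
	fieldName := ""
	rest := strings.TrimSpace(s)

	// Split off an optional `fieldName:` prefix, unless the input
	// already starts with the stream selector itself.
	if !strings.HasPrefix(rest, "{") {
		if i := strings.Index(rest, ":"); i >= 0 {
			fieldName = strings.TrimSpace(rest[:i])
			rest = strings.TrimSpace(rest[i+1:])
		}
	}
	if !strings.HasPrefix(rest, "{") {
		return "", fmt.Errorf("not a stream filter: %q", s)
	}

	// Same rule as in the commit: only an empty prefix or `_stream` is allowed.
	if fieldName != "" && fieldName != "_stream" {
		return "", fmt.Errorf("stream filter cannot be applied to %q field; it can be applied only to _stream field", fieldName)
	}
	return rest, nil
}
```

Under these assumptions, both `_stream:{app="nginx"}` and `{app="nginx"}` normalize to `{app="nginx"}`, while `app:{x="y"}` returns an error.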
Aliaksandr Valialkin 2024-09-27 12:05:40 +02:00
parent fbde238cdc
commit 76c1b0b8ea
7 changed files with 50 additions and 21 deletions

View file

@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): keep selected columns in table view on page reloads. Before, selected columns were reset on each update. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7016). * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): keep selected columns in table view on page reloads. Before, selected columns were reset on each update. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7016).
* FEATURE: allow skipping `_stream:` prefix in [stream filters](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter). This simplifies writing queries with stream filters. Now `{foo="bar"}` is the recommended format for stream filters over the `_stream:{foo="bar"}` format.
## [v0.30.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.30.1-victorialogs) ## [v0.30.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.30.1-victorialogs)

View file

@ -177,7 +177,7 @@ and query performance when querying the needed streams via [`_stream` filter](#s
If the `app` field is associated with the log stream, then the query above can be rewritten to more performant one: If the `app` field is associated with the log stream, then the query above can be rewritten to more performant one:
```logsql ```logsql
_time:5m log.level:error _stream:{app!~"buggy_app|foobar"} _time:5m log.level:error {app!~"buggy_app|foobar"}
``` ```
This query skips scanning for [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) from `buggy_app` and `foobar` apps. This query skips scanning for [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) from `buggy_app` and `foobar` apps.
@ -428,14 +428,14 @@ See also:
### Stream filter ### Stream filter
VictoriaLogs provides an optimized way to select logs, which belong to particular [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). VictoriaLogs provides an optimized way to select logs, which belong to particular [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
This can be done via `_stream:{...}` filter. The `{...}` may contain arbitrary This can be done via `{...}` filter, which may contain arbitrary
[Prometheus-compatible label selector](https://docs.victoriametrics.com/keyconcepts/#filtering) [Prometheus-compatible label selector](https://docs.victoriametrics.com/keyconcepts/#filtering)
over fields associated with [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). over fields associated with [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
For example, the following query selects [log entries](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) For example, the following query selects [log entries](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with `app` field equal to `nginx`: with `app` field equal to `nginx`:
```logsql ```logsql
_stream:{app="nginx"} {app="nginx"}
``` ```
This query is equivalent to the following [`exact` filter](#exact-filter) query, but the upper query usually works much faster: This query is equivalent to the following [`exact` filter](#exact-filter) query, but the upper query usually works much faster:
@ -444,13 +444,19 @@ This query is equivalent to the following [`exact` filter](#exact-filter) query,
app:="nginx" app:="nginx"
``` ```
It is allowed to add `_stream:` prefix in front of `{...}` filter. The following filter is equivalent to `{app="nginx"}`:
```logsql
_stream:{app="nginx"}
```
Performance tips: Performance tips:
- It is recommended using the most specific `_stream:{...}` filter matching the smallest number of log streams, - It is recommended using the most specific `{...}` filter matching the smallest number of log streams,
which needs to be scanned by the rest of filters in the query. which needs to be scanned by the rest of filters in the query.
- While LogsQL supports arbitrary number of `_stream:{...}` filters at any level of [logical filters](#logical-filter), - While LogsQL supports arbitrary number of `{...}` filters at any level of [logical filters](#logical-filter),
it is recommended specifying a single `_stream:...` filter at the top level of the query. it is recommended specifying a single `{...}` filter at the top level of the query.
- See [other performance tips](#performance-tips). - See [other performance tips](#performance-tips).

View file

@ -146,7 +146,7 @@ to the query. For example, the following query selects logs with `error` and `ku
from [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) containing `container="my-app"` field, over the last hour: from [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) containing `container="my-app"` field, over the last hour:
```logsql ```logsql
error kubernetes _stream:{container="my-app"} _time:1h error kubernetes {container="my-app"} _time:1h
``` ```
The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
@ -220,14 +220,14 @@ For example, if the application contains `job="app-42"` and `instance="host-123:
then the following query selects all the logs from this application: then the following query selects all the logs from this application:
```logsql ```logsql
_stream:{job="app-42",instance="host-123:5678"} {job="app-42",instance="host-123:5678"}
``` ```
If the number of returned logs is too big, it is recommended adding [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) If the number of returned logs is too big, it is recommended adding [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
to the query in order to reduce the number of matching logs. For example, the following query returns logs for the given application for the last day: to the query in order to reduce the number of matching logs. For example, the following query returns logs for the given application for the last day:
```logsql ```logsql
_stream:{job="app-42",instance="host-123:5678"} _time:1d {job="app-42",instance="host-123:5678"} _time:1d
``` ```
If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
@ -236,7 +236,7 @@ which contain `error` [word](https://docs.victoriametrics.com/victorialogs/logsq
over the last day: over the last day:
```logsql ```logsql
_stream:{job="app-42",instance="host-123:5678"} error _time:1d {job="app-42",instance="host-123:5678"} error _time:1d
``` ```
The logs are returned in arbitrary order because of performance reasons. Use [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) The logs are returned in arbitrary order because of performance reasons. Use [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
@ -244,7 +244,7 @@ for sorting the returned logs by the needed fields. For example, the following q
by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field): by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
```logsql ```logsql
_stream:{job="app-42",instance="host-123:5678"} _time:1d | sort by (_time) {job="app-42",instance="host-123:5678"} _time:1d | sort by (_time)
``` ```
See also: See also:

View file

@ -876,7 +876,7 @@ received from [streams](https://docs.victoriametrics.com/victorialogs/keyconcept
during the last 5 minutes: during the last 5 minutes:
```sh ```sh
curl http://localhost:9428/select/logsql/query -d 'query=_stream:{app="nginx"} AND _time:5m AND error' | wc -l curl http://localhost:9428/select/logsql/query -d 'query={app="nginx"} AND _time:5m AND error' | wc -l
``` ```
See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter) about `_stream` filter, See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter) about `_stream` filter,
@ -886,7 +886,7 @@ and [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#logical-f
Alternatively, you can count the number of matching logs at VictoriaLogs side with [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe): Alternatively, you can count the number of matching logs at VictoriaLogs side with [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe):
```sh ```sh
curl http://localhost:9428/select/logsql/query -d 'query=_stream:{app="nginx"} AND _time:5m AND error | stats count() logs_with_error' curl http://localhost:9428/select/logsql/query -d 'query={app="nginx"} AND _time:5m AND error | stats count() logs_with_error'
``` ```
The following example shows how to sort query results by the [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) with traditional Unix tools: The following example shows how to sort query results by the [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) with traditional Unix tools:

View file

@ -6,7 +6,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
// filterStream is the filter for `_stream:{...}` // filterStream is the filter for `{}` aka `_stream:{...}`
type filterStream struct { type filterStream struct {
// f is the filter to apply // f is the filter to apply
f *StreamFilter f *StreamFilter
@ -22,7 +22,7 @@ type filterStream struct {
} }
func (fs *filterStream) String() string { func (fs *filterStream) String() string {
return "_stream:" + fs.f.String() return fs.f.String()
} }
func (fs *filterStream) updateNeededFields(neededFields fieldsSet) { func (fs *filterStream) updateNeededFields(neededFields fieldsSet) {

View file

@ -822,6 +822,11 @@ func parseFilterAnd(lex *lexer, fieldName string) (filter, error) {
func parseGenericFilter(lex *lexer, fieldName string) (filter, error) { func parseGenericFilter(lex *lexer, fieldName string) (filter, error) {
// Check for special keywords // Check for special keywords
switch { switch {
case lex.isKeyword("{"):
if fieldName != "" && fieldName != "_stream" {
return nil, fmt.Errorf("stream filter cannot be applied to %q field; it can be applied only to _stream field", fieldName)
}
return parseFilterStream(lex)
case lex.isKeyword(":"): case lex.isKeyword(":"):
if !lex.mustNextToken() { if !lex.mustNextToken() {
return nil, fmt.Errorf("missing filter after ':'") return nil, fmt.Errorf("missing filter after ':'")

View file

@ -29,6 +29,8 @@ func TestLexer(t *testing.T) {
f("foo:bar", []string{"foo", ":", "bar"}) f("foo:bar", []string{"foo", ":", "bar"})
f(` re ( "тест(\":" ) `, []string{"re", "(", `тест(":`, ")"}) f(` re ( "тест(\":" ) `, []string{"re", "(", `тест(":`, ")"})
f(" `foo, bar`* AND baz:(abc or 'd\\'\"ЙЦУК `'*)", []string{"foo, bar", "*", "AND", "baz", ":", "(", "abc", "or", `d'"ЙЦУК ` + "`", "*", ")"}) f(" `foo, bar`* AND baz:(abc or 'd\\'\"ЙЦУК `'*)", []string{"foo, bar", "*", "AND", "baz", ":", "(", "abc", "or", `d'"ЙЦУК ` + "`", "*", ")"})
f(`{foo="bar",a=~"baz", b != 'cd',"d,}a"!~abc} def`,
[]string{"{", "foo", "=", "bar", ",", "a", "=~", "baz", ",", "b", "!=", "cd", ",", "d,}a", "!~", "abc", "}", "def"})
f(`_stream:{foo="bar",a=~"baz", b != 'cd',"d,}a"!~abc}`, f(`_stream:{foo="bar",a=~"baz", b != 'cd',"d,}a"!~abc}`,
[]string{"_stream", ":", "{", "foo", "=", "bar", ",", "a", "=~", "baz", ",", "b", "!=", "cd", ",", "d,}a", "!~", "abc", "}"}) []string{"_stream", ":", "{", "foo", "=", "bar", ",", "a", "=~", "baz", ",", "b", "!=", "cd", ",", "d,}a", "!~", "abc", "}"})
} }
@ -700,10 +702,14 @@ func TestParseQuerySuccess(t *testing.T) {
f(`_stream_id:in(_time:5m | fields _stream_id)`, `_stream_id:in(_time:5m | fields _stream_id)`) f(`_stream_id:in(_time:5m | fields _stream_id)`, `_stream_id:in(_time:5m | fields _stream_id)`)
// _stream filters // _stream filters
f(`_stream:{}`, `_stream:{}`) f(`_stream:{}`, `{}`)
f(`_stream:{foo="bar", baz=~"x" OR or!="b", "x=},"="d}{"}`, `_stream:{foo="bar",baz=~"x" or "or"!="b","x=},"="d}{"}`) f(`_stream:{foo="bar", baz=~"x" OR or!="b", "x=},"="d}{"}`, `{foo="bar",baz=~"x" or "or"!="b","x=},"="d}{"}`)
f(`_stream:{or=a or ","="b"}`, `_stream:{"or"="a" or ","="b"}`) f(`_stream:{or=a or ","="b"}`, `{"or"="a" or ","="b"}`)
f("_stream : { foo = bar , } ", `_stream:{foo="bar"}`) f("_stream : { foo = bar , } ", `{foo="bar"}`)
// _stream filter without _stream prefix
f(`{}`, `{}`)
f(`{foo="bar", baz=~"x" OR or!="b", "x=},"="d}{"}`, `{foo="bar",baz=~"x" or "or"!="b","x=},"="d}{"}`)
// _time filters // _time filters
f(`_time:[-5m,now)`, `_time:[-5m,now)`) f(`_time:[-5m,now)`, `_time:[-5m,now)`)
@ -942,11 +948,11 @@ func TestParseQuerySuccess(t *testing.T) {
// complex queries // complex queries
f(`_time:[-1h, now] _stream:{job="foo",env=~"prod|staging"} level:(error or warn*) and not "connection reset by peer"`, f(`_time:[-1h, now] _stream:{job="foo",env=~"prod|staging"} level:(error or warn*) and not "connection reset by peer"`,
`_time:[-1h,now] _stream:{job="foo",env=~"prod|staging"} (level:error or level:warn*) !"connection reset by peer"`) `_time:[-1h,now] {job="foo",env=~"prod|staging"} (level:error or level:warn*) !"connection reset by peer"`)
f(`(_time:(2023-04-20, now] or _time:[-10m, -1m)) f(`(_time:(2023-04-20, now] or _time:[-10m, -1m))
and (_stream:{job="a"} or _stream:{instance!="b"}) and (_stream:{job="a"} or _stream:{instance!="b"})
and (err* or ip:(ipv4_range(1.2.3.0, 1.2.3.255) and not 1.2.3.4))`, and (err* or ip:(ipv4_range(1.2.3.0, 1.2.3.255) and not 1.2.3.4))`,
`(_time:(2023-04-20,now] or _time:[-10m,-1m)) (_stream:{job="a"} or _stream:{instance!="b"}) (err* or ip:ipv4_range(1.2.3.0, 1.2.3.255) !ip:1.2.3.4)`) `(_time:(2023-04-20,now] or _time:[-10m,-1m)) ({job="a"} or {instance!="b"}) (err* or ip:ipv4_range(1.2.3.0, 1.2.3.255) !ip:1.2.3.4)`)
// fields pipe // fields pipe
f(`foo|fields *`, `foo | fields *`) f(`foo|fields *`, `foo | fields *`)
@ -1273,6 +1279,17 @@ func TestParseQueryFailure(t *testing.T) {
f("_stream:(foo)") f("_stream:(foo)")
f("_stream:[foo]") f("_stream:[foo]")
// invalid _stream filters without _stream: prefix
f("{")
f(`{foo`)
f(`{foo}`)
f(`{foo=`)
f(`{foo=}`)
f(`{foo="bar`)
f(`{foo='bar`)
f(`{foo="bar}`)
f(`{foo='bar}`)
// invalid _time filters // invalid _time filters
f("_time:") f("_time:")
f("_time:[") f("_time:[")