This commit is contained in:
Aliaksandr Valialkin 2024-05-18 12:33:34 +02:00
parent 31a66e5e9a
commit 22f35a7340
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
10 changed files with 291 additions and 85 deletions

View file

@ -19,9 +19,11 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
## tip ## tip
* FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases. * FEATURE: add support for post-filtering of query results with [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe).
* FEATURE: allow applying individual [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) per each [stats function](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters). * FEATURE: allow applying individual [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) per each [stats function](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters).
* FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them. * FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them.
* FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases.
* FEATURE: allow using more convenient syntax for [`range` filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter) if upper or lower bound isn't needed. For example, it is possible to write `response_size:>=10KiB` instead of `response_size:range[10KiB, inf)`, or `temperature:<42` instead of `temperature:range(-inf, 42)`.
* BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`. * BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`.

View file

@ -184,7 +184,7 @@ For example, the following query selects all the logs for the last 5 minutes by
_time:5m _time:5m
``` ```
Additionally to filters, LogQL query may contain arbitrary mix of optional actions for processing the selected logs. These actions are delimited by `|` and are known as `pipes`. Additionally to filters, LogQL query may contain arbitrary mix of optional actions for processing the selected logs. These actions are delimited by `|` and are known as [`pipes`](#pipes).
For example, the following query uses [`stats` pipe](#stats-pipe) for returning the number of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) For example, the following query uses [`stats` pipe](#stats-pipe) for returning the number of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
with the `error` [word](#word) for the last 5 minutes: with the `error` [word](#word) for the last 5 minutes:
@ -213,7 +213,6 @@ single quotes `'` and backticks:
If doubt, it is recommended quoting field names and filter args. If doubt, it is recommended quoting field names and filter args.
The list of LogsQL filters: The list of LogsQL filters:
- [Time filter](#time-filter) - matches logs with [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) in the given time range - [Time filter](#time-filter) - matches logs with [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) in the given time range
@ -850,7 +849,7 @@ Note that the `range()` filter doesn't match [log fields](https://docs.victoriam
with non-numeric values alongside numeric values. For example, `range(1, 10)` doesn't match `the request took 4.2 seconds` with non-numeric values alongside numeric values. For example, `range(1, 10)` doesn't match `the request took 4.2 seconds`
[log message](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), since the `4.2` number is surrounded by other text. [log message](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), since the `4.2` number is surrounded by other text.
Extract the numeric value from the message with `parse(_msg, "the request took <request_duration> seconds")` [transformation](#transformations) Extract the numeric value from the message with `parse(_msg, "the request took <request_duration> seconds")` [transformation](#transformations)
and then apply the `range()` [post-filter](#post-filters) to the extracted `request_duration` field. and then apply the `range()` [filter pipe](#filter-pipe) to the extracted `request_duration` field.
Performance tips: Performance tips:
@ -892,7 +891,7 @@ user.ip:ipv4_range("1.2.3.4")
Note that the `ipv4_range()` doesn't match a string with IPv4 address if this string contains other text. For example, `ipv4_range("127.0.0.0/24")` Note that the `ipv4_range()` doesn't match a string with IPv4 address if this string contains other text. For example, `ipv4_range("127.0.0.0/24")`
doesn't match `request from 127.0.0.1: done` [log message](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), doesn't match `request from 127.0.0.1: done` [log message](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field),
since the `127.0.0.1` ip is surrounded by other text. Extract the IP from the message with `parse(_msg, "request from <ip>: done")` [transformation](#transformations) since the `127.0.0.1` ip is surrounded by other text. Extract the IP from the message with `parse(_msg, "request from <ip>: done")` [transformation](#transformations)
and then apply the `ipv4_range()` [post-filter](#post-filters) to the extracted `ip` field. and then apply the `ipv4_range()` [filter pipe](#filter-pipe) to the extracted `ip` field.
Hints: Hints:
@ -1054,6 +1053,7 @@ LogsQL supports the following pipes:
- [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
- [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
- [`filter`](#filter-pipe) applies additional [filters](#filters) to results.
- [`limit`](#limit-pipe) limits the number selected logs. - [`limit`](#limit-pipe) limits the number selected logs.
- [`offset`](#offset-pipe) skips the given number of selected logs. - [`offset`](#offset-pipe) skips the given number of selected logs.
- [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
@ -1120,6 +1120,22 @@ See also:
- [`rename` pipe](#rename-pipe) - [`rename` pipe](#rename-pipe)
- [`delete` pipe](#delete-pipe) - [`delete` pipe](#delete-pipe)
### filter pipe
Sometimes it is needed to apply additional filters on the calculated results. This can be done with `| filter ...` [pipe](#pipes).
The `filter` pipe can contain arbitrary [filters](#filters).
For example, the following query returns `host` [field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) values
if the number of log messages with the `error` [word](#word) for them over the last hour exceeds `1_000`:
```logsql
_time:1h error | stats by (host) count() logs_count | filter logs_count:> 1_000
```
See also:
- [`stats` pipe](#stats-pipe)
### limit pipe ### limit pipe
If only a subset of selected logs must be processed, then `| limit N` [pipe](#pipes) can be used, where `N` can contain any [supported integer numeric value](#numeric-values). If only a subset of selected logs must be processed, then `| limit N` [pipe](#pipes) can be used, where `N` can contain any [supported integer numeric value](#numeric-values).
@ -1730,24 +1746,16 @@ LogsQL will support the following transformations for the [selected](#filters) l
according to the provided format. according to the provided format.
- Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
- Parsing duration strings into floating-point seconds for further [stats calculations](#stats-pipe). - Parsing duration strings into floating-point seconds for further [stats calculations](#stats-pipe).
- Creating a boolean field with the result of arbitrary [post-filters](#post-filters) applied to the current fields.
- Creating an integer field with the length of the given field value. This can be useful for [stats calculations](#stats-pipe).
See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details. See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details.
## Post-filters ## Post-filters
It is possible to perform post-filtering on the [selected log entries](#filters) at client side with `grep` or similar Unix commands Post-filtering of query results can be performed at any step by using [`filter` pipe](#filter-pipe).
It is also possible to perform post-filtering of the [selected log entries](#filters) at client side with `grep` and similar Unix commands
according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line). according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
LogsQL will support post-filtering on the original [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
and fields created by various [transformations](#transformations). The following post-filters will be supported:
- Full-text [filtering](#filters).
- [Logical filtering](#logical-filter).
See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details.
## Stats ## Stats
Stats over the selected logs can be calculated via [`stats` pipe](#stats-pipe). Stats over the selected logs can be calculated via [`stats` pipe](#stats-pipe).

View file

@ -37,7 +37,6 @@ The following functionality is planned in the future versions of VictoriaLogs:
- Add missing functionality to [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html): - Add missing functionality to [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html):
- [Stream context](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#stream-context). - [Stream context](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#stream-context).
- [Transformation functions](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#transformations). - [Transformation functions](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#transformations).
- [Post-filtering](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#post-filters).
- The ability to use subqueries inside [in()](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#multi-exact-filter) function. - The ability to use subqueries inside [in()](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#multi-exact-filter) function.
- Live tailing for [LogsQL filters](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#filters) aka `tail -f`. - Live tailing for [LogsQL filters](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#filters) aka `tail -f`.
- Web UI with the following abilities: - Web UI with the following abilities:

View file

@ -99,10 +99,10 @@ func (br *blockResult) clone() *blockResult {
return brNew return brNew
} }
// initFromNeededColumns initializes br from brSrc, by copying only the given neededColumns for rows identified by set bits at bm. // initFromFilterAllColumns initializes br from brSrc by copying rows identified by set bets at bm.
// //
// The br valid until brSrc or bm is updated. // The br is valid until brSrc or bm is updated.
func (br *blockResult) initFromNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) { func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) {
br.reset() br.reset()
srcTimestamps := brSrc.timestamps srcTimestamps := brSrc.timestamps
@ -112,47 +112,63 @@ func (br *blockResult) initFromNeededColumns(brSrc *blockResult, bm *bitmap, nee
}) })
br.timestamps = dstTimestamps br.timestamps = dstTimestamps
if len(br.timestamps) == 0 { for _, cSrc := range brSrc.getColumns() {
// There is no need in initializing columns for zero rows. br.appendFilteredColumn(brSrc, cSrc, bm)
return
} }
}
// initFromFilterNeededColumns initializes br from brSrc by copying only the given neededColumns for rows identified by set bits at bm.
//
// The br is valid until brSrc or bm is updated.
func (br *blockResult) initFromFilterNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) {
br.reset()
srcTimestamps := brSrc.timestamps
dstTimestamps := br.timestamps[:0]
bm.forEachSetBitReadonly(func(idx int) {
dstTimestamps = append(dstTimestamps, srcTimestamps[idx])
})
br.timestamps = dstTimestamps
for _, neededColumn := range neededColumns { for _, neededColumn := range neededColumns {
cSrc := brSrc.getColumnByName(neededColumn) cSrc := brSrc.getColumnByName(neededColumn)
br.appendFilteredColumn(brSrc, cSrc, bm)
cDst := blockResultColumn{
name: cSrc.name,
}
if cSrc.isConst {
cDst.isConst = true
cDst.valuesEncoded = cSrc.valuesEncoded
} else if cSrc.isTime {
cDst.isTime = true
} else {
cDst.valueType = cSrc.valueType
cDst.minValue = cSrc.minValue
cDst.maxValue = cSrc.maxValue
cDst.dictValues = cSrc.dictValues
cDst.newValuesEncodedFunc = func(br *blockResult) []string {
valuesEncodedSrc := cSrc.getValuesEncoded(brSrc)
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
bm.forEachSetBitReadonly(func(idx int) {
valuesBuf = append(valuesBuf, valuesEncodedSrc[idx])
})
br.valuesBuf = valuesBuf
return valuesBuf[valuesBufLen:]
}
}
br.csBuf = append(br.csBuf, cDst)
br.csInitialized = false
} }
} }
func (br *blockResult) appendFilteredColumn(brSrc *blockResult, cSrc *blockResultColumn, bm *bitmap) {
cDst := blockResultColumn{
name: cSrc.name,
}
if cSrc.isConst {
cDst.isConst = true
cDst.valuesEncoded = cSrc.valuesEncoded
} else if cSrc.isTime {
cDst.isTime = true
} else {
cDst.valueType = cSrc.valueType
cDst.minValue = cSrc.minValue
cDst.maxValue = cSrc.maxValue
cDst.dictValues = cSrc.dictValues
cDst.newValuesEncodedFunc = func(br *blockResult) []string {
valuesEncodedSrc := cSrc.getValuesEncoded(brSrc)
valuesBuf := br.valuesBuf
valuesBufLen := len(valuesBuf)
bm.forEachSetBitReadonly(func(idx int) {
valuesBuf = append(valuesBuf, valuesEncodedSrc[idx])
})
br.valuesBuf = valuesBuf
return valuesBuf[valuesBufLen:]
}
}
br.csBuf = append(br.csBuf, cDst)
br.csInitialized = false
}
// cloneValues clones the given values into br and returns the cloned values. // cloneValues clones the given values into br and returns the cloned values.
func (br *blockResult) cloneValues(values []string) []string { func (br *blockResult) cloneValues(values []string) []string {
if values == nil { if values == nil {

View file

@ -12,8 +12,8 @@ import (
type filterRange struct { type filterRange struct {
fieldName string fieldName string
minValue float64 minValue float64
maxValue float64 maxValue float64
stringRepr string stringRepr string
} }

View file

@ -241,6 +241,16 @@ func (q *Query) AddPipeLimit(n uint64) {
func (q *Query) Optimize() { func (q *Query) Optimize() {
q.pipes = optimizeSortOffsetPipes(q.pipes) q.pipes = optimizeSortOffsetPipes(q.pipes)
q.pipes = optimizeSortLimitPipes(q.pipes) q.pipes = optimizeSortLimitPipes(q.pipes)
q.pipes = optimizeFilterPipes(q.pipes)
// Merge `q | filter ...` into q.
if len(q.pipes) > 0 {
pf, ok := q.pipes[0].(*pipeFilter)
if ok {
q.f = mergeFiltersAnd(q.f, pf.f)
q.pipes = append(q.pipes[:0], q.pipes[1:]...)
}
}
} }
func optimizeSortOffsetPipes(pipes []pipe) []pipe { func optimizeSortOffsetPipes(pipes []pipe) []pipe {
@ -287,6 +297,48 @@ func optimizeSortLimitPipes(pipes []pipe) []pipe {
return pipes return pipes
} }
func optimizeFilterPipes(pipes []pipe) []pipe {
// Merge multiple `| filter ...` pipes into a single `filter ...` pipe
i := 1
for i < len(pipes) {
pf1, ok := pipes[i-1].(*pipeFilter)
if !ok {
i++
continue
}
pf2, ok := pipes[i].(*pipeFilter)
if !ok {
i++
continue
}
pf1.f = mergeFiltersAnd(pf1.f, pf2.f)
pipes = append(pipes[:i], pipes[i+1:]...)
}
return pipes
}
func mergeFiltersAnd(f1, f2 filter) filter {
fa1, ok := f1.(*filterAnd)
if ok {
fa1.filters = append(fa1.filters, f2)
return fa1
}
fa2, ok := f2.(*filterAnd)
if ok {
filters := make([]filter, len(fa2.filters)+1)
filters[0] = f1
copy(filters[1:], fa2.filters)
fa2.filters = filters
return fa2
}
return &filterAnd{
filters: []filter{f1, f2},
}
}
func (q *Query) getNeededColumns() ([]string, []string) { func (q *Query) getNeededColumns() ([]string, []string) {
neededFields := newFieldsSet() neededFields := newFieldsSet()
neededFields.add("*") neededFields.add("*")

View file

@ -975,6 +975,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | uniq (f1,f2) limit 10`, `* | uniq by (f1, f2) limit 10`) f(`* | uniq (f1,f2) limit 10`, `* | uniq by (f1, f2) limit 10`)
f(`* | uniq limit 10`, `* | uniq limit 10`) f(`* | uniq limit 10`, `* | uniq limit 10`)
// filter pipe
f(`* | filter error ip:12.3.4.5 or warn`, `* | filter error ip:12.3.4.5 or warn`)
f(`foo | stats by (host) count() logs | filter logs:>50 | sort by (logs desc) | limit 10`, `foo | stats by (host) count(*) as logs | filter logs:>50 | sort by (logs desc) | limit 10`)
// multiple different pipes // multiple different pipes
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`) f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`) f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@ -1341,6 +1345,12 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | uniq by(a) bar`) f(`foo | uniq by(a) bar`)
f(`foo | uniq by(a) limit -10`) f(`foo | uniq by(a) limit -10`)
f(`foo | uniq by(a) limit foo`) f(`foo | uniq by(a) limit foo`)
// invalid filter pipe
f(`foo | filter`)
f(`foo | filter | sort by (x)`)
f(`foo | filter (`)
f(`foo | filter )`)
} }
func TestQueryGetNeededColumns(t *testing.T) { func TestQueryGetNeededColumns(t *testing.T) {

View file

@ -71,24 +71,30 @@ func parsePipes(lex *lexer) ([]pipe, error) {
return nil, fmt.Errorf("missing token after '|'") return nil, fmt.Errorf("missing token after '|'")
} }
switch { switch {
case lex.isKeyword("stats"): case lex.isKeyword("copy", "cp"):
ps, err := parsePipeStats(lex) pc, err := parsePipeCopy(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
} }
pipes = append(pipes, ps) pipes = append(pipes, pc)
case lex.isKeyword("sort"): case lex.isKeyword("delete", "del", "rm"):
ps, err := parsePipeSort(lex) pd, err := parsePipeDelete(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
} }
pipes = append(pipes, ps) pipes = append(pipes, pd)
case lex.isKeyword("uniq"): case lex.isKeyword("fields"):
pu, err := parsePipeUniq(lex) pf, err := parsePipeFields(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
} }
pipes = append(pipes, pu) pipes = append(pipes, pf)
case lex.isKeyword("filter"):
pf, err := parsePipeFilter(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)
}
pipes = append(pipes, pf)
case lex.isKeyword("limit", "head"): case lex.isKeyword("limit", "head"):
pl, err := parsePipeLimit(lex) pl, err := parsePipeLimit(lex)
if err != nil { if err != nil {
@ -101,30 +107,30 @@ func parsePipes(lex *lexer) ([]pipe, error) {
return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err)
} }
pipes = append(pipes, ps) pipes = append(pipes, ps)
case lex.isKeyword("fields"):
pf, err := parsePipeFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
}
pipes = append(pipes, pf)
case lex.isKeyword("copy", "cp"):
pc, err := parsePipeCopy(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
}
pipes = append(pipes, pc)
case lex.isKeyword("rename", "mv"): case lex.isKeyword("rename", "mv"):
pr, err := parsePipeRename(lex) pr, err := parsePipeRename(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err)
} }
pipes = append(pipes, pr) pipes = append(pipes, pr)
case lex.isKeyword("delete", "del", "rm"): case lex.isKeyword("sort"):
pd, err := parsePipeDelete(lex) ps, err := parsePipeSort(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
} }
pipes = append(pipes, pd) pipes = append(pipes, ps)
case lex.isKeyword("stats"):
ps, err := parsePipeStats(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
}
pipes = append(pipes, ps)
case lex.isKeyword("uniq"):
pu, err := parsePipeUniq(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err)
}
pipes = append(pipes, pu)
default: default:
return nil, fmt.Errorf("unexpected pipe %q", lex.token) return nil, fmt.Errorf("unexpected pipe %q", lex.token)
} }

View file

@ -0,0 +1,109 @@
package logstorage
import (
"fmt"
"unsafe"
)
// pipeFilter processes '| filter ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe
type pipeFilter struct {
// f is a filter to apply to the written rows.
f filter
}
func (pf *pipeFilter) String() string {
return "filter " + pf.f.String()
}
func (pf *pipeFilter) updateNeededFields(neededFields, unneededFields fieldsSet) {
pf.f.updateNeededFields(neededFields)
}
func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeFilterProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeFilterProcessorShard{
pipeFilterProcessorShardNopad: pipeFilterProcessorShardNopad{
pf: pf,
},
}
}
pfp := &pipeFilterProcessor{
pf: pf,
ppBase: ppBase,
shards: shards,
}
return pfp
}
type pipeFilterProcessor struct {
pf *pipeFilter
ppBase pipeProcessor
shards []pipeFilterProcessorShard
}
type pipeFilterProcessorShard struct {
pipeFilterProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeFilterProcessorShardNopad{})%128]byte
}
type pipeFilterProcessorShardNopad struct {
pf *pipeFilter
br blockResult
bm bitmap
}
func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pfp.shards[workerID]
bm := &shard.bm
bm.init(len(br.timestamps))
bm.setBits()
shard.pf.f.applyToBlockResult(br, bm)
if bm.areAllBitsSet() {
// Fast path - the filter didn't filter out anything - send br to the base pipe as is.
pfp.ppBase.writeBlock(workerID, br)
return
}
if bm.isZero() {
// Nothing to send
return
}
// Slow path - copy the remaining rows from br to shard.br before sending them to base pipe.
shard.br.initFromFilterAllColumns(br, bm)
pfp.ppBase.writeBlock(workerID, &shard.br)
}
func (pfp *pipeFilterProcessor) flush() error {
return nil
}
func parsePipeFilter(lex *lexer) (*pipeFilter, error) {
if !lex.isKeyword("filter") {
return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token)
}
lex.nextToken()
f, err := parseFilter(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'filter': %w", err)
}
pf := &pipeFilter{
f: f,
}
return pf, nil
}

View file

@ -320,7 +320,11 @@ func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult
// Store the remaining rows for the needed per-func fields to brDst // Store the remaining rows for the needed per-func fields to brDst
brDst := &shard.brsBuf[i] brDst := &shard.brsBuf[i]
brDst.initFromNeededColumns(brSrc, bm, funcs[i].neededFieldsForFunc) if bm.isZero() {
brDst.reset()
} else {
brDst.initFromFilterNeededColumns(brSrc, bm, funcs[i].neededFieldsForFunc)
}
brs[i] = brDst brs[i] = brDst
} }
return brs return brs