diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index f706718ef..0f90b3c16 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,9 +19,11 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip -* FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases. +* FEATURE: add support for post-filtering of query results with [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe). * FEATURE: allow applying individual [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) per each [stats function](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters). * FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them. +* FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases. +* FEATURE: allow using more convenient syntax for [`range` filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter) if upper or lower bound isn't needed. For example, it is possible to write `response_size:>=10KiB` instead of `response_size:range[10KiB, inf)`, or `temperature:<42` instead of `temperature:range(-inf, 42)`. * BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`. 
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 1b9fc3ee9..e8f80966d 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -184,7 +184,7 @@ For example, the following query selects all the logs for the last 5 minutes by _time:5m ``` -Additionally to filters, LogQL query may contain arbitrary mix of optional actions for processing the selected logs. These actions are delimited by `|` and are known as `pipes`. +Additionally to filters, LogsQL query may contain arbitrary mix of optional actions for processing the selected logs. These actions are delimited by `|` and are known as [`pipes`](#pipes). For example, the following query uses [`stats` pipe](#stats-pipe) for returning the number of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word) for the last 5 minutes: @@ -213,7 +213,6 @@ single quotes `'` and backticks: If doubt, it is recommended quoting field names and filter args. - The list of LogsQL filters: - [Time filter](#time-filter) - matches logs with [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) in the given time range @@ -850,7 +849,7 @@ Note that the `range()` filter doesn't match [log fields](https://docs.victoriam with non-numeric values alongside numeric values. For example, `range(1, 10)` doesn't match `the request took 4.2 seconds` [log message](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), since the `4.2` number is surrounded by other text. Extract the numeric value from the message with `parse(_msg, "the request took seconds")` [transformation](#transformations) -and then apply the `range()` [post-filter](#post-filters) to the extracted `request_duration` field. +and then apply the `range()` [filter pipe](#filter-pipe) to the extracted `request_duration` field. 
Performance tips: @@ -892,7 +891,7 @@ user.ip:ipv4_range("1.2.3.4") Note that the `ipv4_range()` doesn't match a string with IPv4 address if this string contains other text. For example, `ipv4_range("127.0.0.0/24")` doesn't match `request from 127.0.0.1: done` [log message](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field), since the `127.0.0.1` ip is surrounded by other text. Extract the IP from the message with `parse(_msg, "request from : done")` [transformation](#transformations) -and then apply the `ipv4_range()` [post-filter](#post-filters) to the extracted `ip` field. +and then apply the `ipv4_range()` [filter pipe](#filter-pipe) to the extracted `ip` field. Hints: @@ -1054,6 +1053,7 @@ LogsQL supports the following pipes: - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). +- [`filter`](#filter-pipe) applies additional [filters](#filters) to results. - [`limit`](#limit-pipe) limits the number selected logs. - [`offset`](#offset-pipe) skips the given number of selected logs. - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). @@ -1120,6 +1120,22 @@ See also: - [`rename` pipe](#rename-pipe) - [`delete` pipe](#delete-pipe) +### filter pipe + +Sometimes it is needed to apply additional filters on the calculated results. This can be done with `| filter ...` [pipe](#pipes). +The `filter` pipe can contain arbitrary [filters](#filters). 
+ +For example, the following query returns `host` [field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) values +if the number of log messages with the `error` [word](#word) for them over the last hour exceeds `1_000`: + +```logsql +_time:1h error | stats by (host) count() logs_count | filter logs_count:> 1_000 +``` + +See also: + +- [`stats` pipe](#stats-pipe) + ### limit pipe If only a subset of selected logs must be processed, then `| limit N` [pipe](#pipes) can be used, where `N` can contain any [supported integer numeric value](#numeric-values). @@ -1730,24 +1746,16 @@ LogsQL will support the following transformations for the [selected](#filters) l according to the provided format. - Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - Parsing duration strings into floating-point seconds for further [stats calculations](#stats-pipe). -- Creating a boolean field with the result of arbitrary [post-filters](#post-filters) applied to the current fields. -- Creating an integer field with the length of the given field value. This can be useful for [stats calculations](#stats-pipe). See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details. ## Post-filters -It is possible to perform post-filtering on the [selected log entries](#filters) at client side with `grep` or similar Unix commands +Post-filtering of query results can be performed at any step by using [`filter` pipe](#filter-pipe). + +It is also possible to perform post-filtering of the [selected log entries](#filters) at client side with `grep` and similar Unix commands according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line). 
-LogsQL will support post-filtering on the original [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) -and fields created by various [transformations](#transformations). The following post-filters will be supported: - -- Full-text [filtering](#filters). -- [Logical filtering](#logical-filter). - -See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) for details. - ## Stats Stats over the selected logs can be calculated via [`stats` pipe](#stats-pipe). diff --git a/docs/VictoriaLogs/Roadmap.md b/docs/VictoriaLogs/Roadmap.md index 42b4b06f1..1dc515730 100644 --- a/docs/VictoriaLogs/Roadmap.md +++ b/docs/VictoriaLogs/Roadmap.md @@ -37,7 +37,6 @@ The following functionality is planned in the future versions of VictoriaLogs: - Add missing functionality to [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html): - [Stream context](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#stream-context). - [Transformation functions](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#transformations). - - [Post-filtering](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#post-filters). - The ability to use subqueries inside [in()](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#multi-exact-filter) function. - Live tailing for [LogsQL filters](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#filters) aka `tail -f`. - Web UI with the following abilities: diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index f2921ea10..6508381c9 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -99,10 +99,10 @@ func (br *blockResult) clone() *blockResult { return brNew } -// initFromNeededColumns initializes br from brSrc, by copying only the given neededColumns for rows identified by set bits at bm. +// initFromFilterAllColumns initializes br from brSrc by copying rows identified by set bits at bm.
// -// The br valid until brSrc or bm is updated. -func (br *blockResult) initFromNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) { +// The br is valid until brSrc or bm is updated. +func (br *blockResult) initFromFilterAllColumns(brSrc *blockResult, bm *bitmap) { br.reset() srcTimestamps := brSrc.timestamps @@ -112,47 +112,63 @@ func (br *blockResult) initFromNeededColumns(brSrc *blockResult, bm *bitmap, nee }) br.timestamps = dstTimestamps - if len(br.timestamps) == 0 { - // There is no need in initializing columns for zero rows. - return + for _, cSrc := range brSrc.getColumns() { + br.appendFilteredColumn(brSrc, cSrc, bm) } +} + +// initFromFilterNeededColumns initializes br from brSrc by copying only the given neededColumns for rows identified by set bits at bm. +// +// The br is valid until brSrc or bm is updated. +func (br *blockResult) initFromFilterNeededColumns(brSrc *blockResult, bm *bitmap, neededColumns []string) { + br.reset() + + srcTimestamps := brSrc.timestamps + dstTimestamps := br.timestamps[:0] + bm.forEachSetBitReadonly(func(idx int) { + dstTimestamps = append(dstTimestamps, srcTimestamps[idx]) + }) + br.timestamps = dstTimestamps for _, neededColumn := range neededColumns { cSrc := brSrc.getColumnByName(neededColumn) - - cDst := blockResultColumn{ - name: cSrc.name, - } - - if cSrc.isConst { - cDst.isConst = true - cDst.valuesEncoded = cSrc.valuesEncoded - } else if cSrc.isTime { - cDst.isTime = true - } else { - cDst.valueType = cSrc.valueType - cDst.minValue = cSrc.minValue - cDst.maxValue = cSrc.maxValue - cDst.dictValues = cSrc.dictValues - cDst.newValuesEncodedFunc = func(br *blockResult) []string { - valuesEncodedSrc := cSrc.getValuesEncoded(brSrc) - - valuesBuf := br.valuesBuf - valuesBufLen := len(valuesBuf) - bm.forEachSetBitReadonly(func(idx int) { - valuesBuf = append(valuesBuf, valuesEncodedSrc[idx]) - }) - br.valuesBuf = valuesBuf - - return valuesBuf[valuesBufLen:] - } - } - - br.csBuf = append(br.csBuf, 
cDst) - br.csInitialized = false + br.appendFilteredColumn(brSrc, cSrc, bm) } } +func (br *blockResult) appendFilteredColumn(brSrc *blockResult, cSrc *blockResultColumn, bm *bitmap) { + cDst := blockResultColumn{ + name: cSrc.name, + } + + if cSrc.isConst { + cDst.isConst = true + cDst.valuesEncoded = cSrc.valuesEncoded + } else if cSrc.isTime { + cDst.isTime = true + } else { + cDst.valueType = cSrc.valueType + cDst.minValue = cSrc.minValue + cDst.maxValue = cSrc.maxValue + cDst.dictValues = cSrc.dictValues + cDst.newValuesEncodedFunc = func(br *blockResult) []string { + valuesEncodedSrc := cSrc.getValuesEncoded(brSrc) + + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + bm.forEachSetBitReadonly(func(idx int) { + valuesBuf = append(valuesBuf, valuesEncodedSrc[idx]) + }) + br.valuesBuf = valuesBuf + + return valuesBuf[valuesBufLen:] + } + } + + br.csBuf = append(br.csBuf, cDst) + br.csInitialized = false +} + // cloneValues clones the given values into br and returns the cloned values. func (br *blockResult) cloneValues(values []string) []string { if values == nil { diff --git a/lib/logstorage/filter_range.go b/lib/logstorage/filter_range.go index d982e364c..dad9e0608 100644 --- a/lib/logstorage/filter_range.go +++ b/lib/logstorage/filter_range.go @@ -12,8 +12,8 @@ import ( type filterRange struct { fieldName string - minValue float64 - maxValue float64 + minValue float64 + maxValue float64 stringRepr string } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 1ade23ec1..eb4b1c5ac 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -241,6 +241,16 @@ func (q *Query) AddPipeLimit(n uint64) { func (q *Query) Optimize() { q.pipes = optimizeSortOffsetPipes(q.pipes) q.pipes = optimizeSortLimitPipes(q.pipes) + q.pipes = optimizeFilterPipes(q.pipes) + + // Merge `q | filter ...` into q. 
+ if len(q.pipes) > 0 { + pf, ok := q.pipes[0].(*pipeFilter) + if ok { + q.f = mergeFiltersAnd(q.f, pf.f) + q.pipes = append(q.pipes[:0], q.pipes[1:]...) + } + } } func optimizeSortOffsetPipes(pipes []pipe) []pipe { @@ -287,6 +297,48 @@ func optimizeSortLimitPipes(pipes []pipe) []pipe { return pipes } +func optimizeFilterPipes(pipes []pipe) []pipe { + // Merge multiple `| filter ...` pipes into a single `filter ...` pipe + i := 1 + for i < len(pipes) { + pf1, ok := pipes[i-1].(*pipeFilter) + if !ok { + i++ + continue + } + pf2, ok := pipes[i].(*pipeFilter) + if !ok { + i++ + continue + } + + pf1.f = mergeFiltersAnd(pf1.f, pf2.f) + pipes = append(pipes[:i], pipes[i+1:]...) + } + return pipes +} + +func mergeFiltersAnd(f1, f2 filter) filter { + fa1, ok := f1.(*filterAnd) + if ok { + fa1.filters = append(fa1.filters, f2) + return fa1 + } + + fa2, ok := f2.(*filterAnd) + if ok { + filters := make([]filter, len(fa2.filters)+1) + filters[0] = f1 + copy(filters[1:], fa2.filters) + fa2.filters = filters + return fa2 + } + + return &filterAnd{ + filters: []filter{f1, f2}, + } +} + func (q *Query) getNeededColumns() ([]string, []string) { neededFields := newFieldsSet() neededFields.add("*") diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 6dac4caf5..f42d5e47f 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -975,6 +975,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | uniq (f1,f2) limit 10`, `* | uniq by (f1, f2) limit 10`) f(`* | uniq limit 10`, `* | uniq limit 10`) + // filter pipe + f(`* | filter error ip:12.3.4.5 or warn`, `* | filter error ip:12.3.4.5 or warn`) + f(`foo | stats by (host) count() logs | filter logs:>50 | sort by (logs desc) | limit 10`, `foo | stats by (host) count(*) as logs | filter logs:>50 | sort by (logs desc) | limit 10`) + // multiple different pipes f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, 
bar) count(baz) as qwert`) f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`) @@ -1341,6 +1345,12 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | uniq by(a) bar`) f(`foo | uniq by(a) limit -10`) f(`foo | uniq by(a) limit foo`) + + // invalid filter pipe + f(`foo | filter`) + f(`foo | filter | sort by (x)`) + f(`foo | filter (`) + f(`foo | filter )`) } func TestQueryGetNeededColumns(t *testing.T) { diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 7623b6075..200f48d36 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -71,24 +71,30 @@ func parsePipes(lex *lexer) ([]pipe, error) { return nil, fmt.Errorf("missing token after '|'") } switch { - case lex.isKeyword("stats"): - ps, err := parsePipeStats(lex) + case lex.isKeyword("copy", "cp"): + pc, err := parsePipeCopy(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) + return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err) } - pipes = append(pipes, ps) - case lex.isKeyword("sort"): - ps, err := parsePipeSort(lex) + pipes = append(pipes, pc) + case lex.isKeyword("delete", "del", "rm"): + pd, err := parsePipeDelete(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err) + return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) } - pipes = append(pipes, ps) - case lex.isKeyword("uniq"): - pu, err := parsePipeUniq(lex) + pipes = append(pipes, pd) + case lex.isKeyword("fields"): + pf, err := parsePipeFields(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err) + return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err) } - pipes = append(pipes, pu) + pipes = append(pipes, pf) + case lex.isKeyword("filter"): + pf, err := parsePipeFilter(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err) + } + pipes = append(pipes, pf) case lex.isKeyword("limit", "head"): pl, err := parsePipeLimit(lex) if err != nil { @@ -101,30 +107,30 
@@ func parsePipes(lex *lexer) ([]pipe, error) { return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err) } pipes = append(pipes, ps) - case lex.isKeyword("fields"): - pf, err := parsePipeFields(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err) - } - pipes = append(pipes, pf) - case lex.isKeyword("copy", "cp"): - pc, err := parsePipeCopy(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err) - } - pipes = append(pipes, pc) case lex.isKeyword("rename", "mv"): pr, err := parsePipeRename(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err) } pipes = append(pipes, pr) - case lex.isKeyword("delete", "del", "rm"): - pd, err := parsePipeDelete(lex) + case lex.isKeyword("sort"): + ps, err := parsePipeSort(lex) if err != nil { - return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) + return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err) } - pipes = append(pipes, pd) + pipes = append(pipes, ps) + case lex.isKeyword("stats"): + ps, err := parsePipeStats(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) + } + pipes = append(pipes, ps) + case lex.isKeyword("uniq"): + pu, err := parsePipeUniq(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err) + } + pipes = append(pipes, pu) default: return nil, fmt.Errorf("unexpected pipe %q", lex.token) } diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go new file mode 100644 index 000000000..8be594aa7 --- /dev/null +++ b/lib/logstorage/pipe_filter.go @@ -0,0 +1,109 @@ +package logstorage + +import ( + "fmt" + "unsafe" +) + +// pipeFilter processes '| filter ...' queries. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe +type pipeFilter struct { + // f is a filter to apply to the written rows. 
+ f filter +} + +func (pf *pipeFilter) String() string { + return "filter " + pf.f.String() +} + +func (pf *pipeFilter) updateNeededFields(neededFields, unneededFields fieldsSet) { + pf.f.updateNeededFields(neededFields) +} + +func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + shards := make([]pipeFilterProcessorShard, workersCount) + for i := range shards { + shards[i] = pipeFilterProcessorShard{ + pipeFilterProcessorShardNopad: pipeFilterProcessorShardNopad{ + pf: pf, + }, + } + } + + pfp := &pipeFilterProcessor{ + pf: pf, + ppBase: ppBase, + + shards: shards, + } + return pfp +} + +type pipeFilterProcessor struct { + pf *pipeFilter + ppBase pipeProcessor + + shards []pipeFilterProcessorShard +} + +type pipeFilterProcessorShard struct { + pipeFilterProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeFilterProcessorShardNopad{})%128]byte +} + +type pipeFilterProcessorShardNopad struct { + pf *pipeFilter + + br blockResult + bm bitmap +} + +func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &pfp.shards[workerID] + + bm := &shard.bm + bm.init(len(br.timestamps)) + bm.setBits() + shard.pf.f.applyToBlockResult(br, bm) + if bm.areAllBitsSet() { + // Fast path - the filter didn't filter out anything - send br to the base pipe as is. + pfp.ppBase.writeBlock(workerID, br) + return + } + if bm.isZero() { + // Nothing to send + return + } + + // Slow path - copy the remaining rows from br to shard.br before sending them to base pipe. 
+ shard.br.initFromFilterAllColumns(br, bm) + pfp.ppBase.writeBlock(workerID, &shard.br) +} + +func (pfp *pipeFilterProcessor) flush() error { + return nil +} + +func parsePipeFilter(lex *lexer) (*pipeFilter, error) { + if !lex.isKeyword("filter") { + return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token) + } + lex.nextToken() + + f, err := parseFilter(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'filter': %w", err) + } + + pf := &pipeFilter{ + f: f, + } + return pf, nil +} diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index fbcda809c..e467bbd2a 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -320,7 +320,11 @@ func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(brSrc *blockResult // Store the remaining rows for the needed per-func fields to brDst brDst := &shard.brsBuf[i] - brDst.initFromNeededColumns(brSrc, bm, funcs[i].neededFieldsForFunc) + if bm.isZero() { + brDst.reset() + } else { + brDst.initFromFilterNeededColumns(brSrc, bm, funcs[i].neededFieldsForFunc) + } brs[i] = brDst } return brs