lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-05-28 19:29:41 +02:00
parent 31e23c6f6f
commit 0aafca29be
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
32 changed files with 2911 additions and 189 deletions

View file

@ -19,6 +19,18 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs)
Released at 2024-05-28
* FEATURE: add [`extract_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe) for extracting arbitrary substrings from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax).
* FEATURE: add [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) for mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
* FEATURE: add [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe), which returns unique values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
* FEATURE: allow omitting `stats` prefix in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). For example, `_time:5m | count() rows` is a valid query now. It is equivalent to `_time:5m | stats count() as rows`.
* FEATURE: allow omitting `filter` prefix in [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the filter doesn't clash with [pipe names](https://docs.victoriametrics.com/victorialogs/logsql/#pipes). For example, `_time:5m | stats by (host) count() rows | rows:>1000` is a valid query now. It is equivalent to `_time:5m | stats by (host) count() rows | filter rows:>1000`.
* FEATURE: allow [`head` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) without a number. For example, `error | head`. In this case up to 10 matching logs are returned, in the same way as the `head` Unix command does by default.
* FEATURE: allow using [comparison filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filters) with strings. For example, `some_text_field:>="foo"` matches [log entries](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with `some_text_field` field values bigger than or equal to `foo`.
## [v0.12.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.1-victorialogs)
Released at 2024-05-26

View file

@ -255,6 +255,7 @@ The list of LogsQL filters:
- [Phrase filter](#phrase-filter) - matches logs with the given phrase
- [Prefix filter](#prefix-filter) - matches logs with the given word prefix or phrase prefix
- [Substring filter](#substring-filter) - matches logs with the given substring
- [Range comparison filter](#range-comparison-filter) - matches logs with field values in the provided range
- [Empty value filter](#empty-value-filter) - matches logs without the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
- [Any value filter](#any-value-filter) - matches logs with the given non-empty [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
- [Exact filter](#exact-filter) - matches logs with the exact value
@ -576,6 +577,26 @@ See also:
- [Regexp filter](#regexp-filter)
### Range comparison filter
LogsQL supports `field:>X`, `field:>=X`, `field:<X` and `field:<=X` filters, where `field` is the name of the [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
and `X` is either a [numeric value](#numeric-values) or a string. For example, the following query returns logs containing numeric values for the `response_size` field bigger than `10*1024`:
```logsql
response_size:>10KiB
```
The following query returns logs with the `username` field containing string values smaller than `"John"`:
```logsql
username:<"John"
```
See also:
- [String range filter](#string-range-filter)
- [Range filter](#range-filter)
### Empty value filter
Sometimes it is needed to find log entries without the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -906,18 +927,12 @@ for searching for log entries with request durations exceeding 4.2 seconds:
request.duration:range(4.2, Inf)
```
This query can be shortened to:
This query can be shortened by using the [range comparison filter](#range-comparison-filter):
```logsql
request.duration:>4.2
```
The following query returns logs with request durations smaller or equal to 1.5 seconds:
```logsql
request.duration:<=1.5
```
The lower and the upper bounds of the `range(lower, upper)` are excluded by default. If they must be included, then substitute the corresponding
parentheses with square brackets. For example:
@ -941,6 +956,7 @@ Performance tips:
See also:
- [Range comparison filter](#range-comparison-filter)
- [IPv4 range filter](#ipv4-range-filter)
- [String range filter](#string-range-filter)
- [Length range filter](#length-range-filter)
@ -1012,6 +1028,7 @@ For example, the `user.name:string_range(C, E)` would match `user.name` fields,
See also:
- [Range comparison filter](#range-comparison-filter)
- [Range filter](#range-filter)
- [IPv4 range filter](#ipv4-range-filter)
- [Length range filter](#length-range-filter)
@ -1135,11 +1152,14 @@ LogsQL supports the following pipes:
- [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`extract`](#extract-pipe) extracts the specified text into the given log fields.
- [`extract_regexp`](#extract_regexp-pipe) extracts the specified text into the given log fields via [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax).
- [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`field_values`](#field_values-pipe) returns all the values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`filter`](#filter-pipe) applies additional [filters](#filters) to results.
- [`format`](#format-pipe) formats the output field from the input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`limit`](#limit-pipe) limits the number of selected logs.
- [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`offset`](#offset-pipe) skips the given number of selected logs.
- [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object.
- [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -1188,7 +1208,7 @@ For example, the following query deletes `host` and `app` fields from the logs o
_time:5m | delete host, app
```
`del` and `rm` keywords can be used instead of `delete` for convenience. For example, `_time:5m | del host` is equivalent to `_time:5m | rm host` and `_time:5m | delete host`.
`drop`, `del` and `rm` keywords can be used instead of `delete` for convenience. For example, `_time:5m | drop host` is equivalent to `_time:5m | delete host`.
See also:
@ -1251,6 +1271,7 @@ See also:
- [Conditional extract](#conditional-extract)
- [`unpack_json` pipe](#unpack_json-pipe)
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
- [`math` pipe](#math-pipe)
#### Format for extract pipe pattern
@ -1334,6 +1355,34 @@ For example, the following query is equivalent to the previous one:
_time:5m | extract "ip=<ip> " keep_original_fields
```
### extract_regexp pipe
`| extract_regexp "pattern" from field_name` [pipe](#pipes) extracts substrings from the [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
according to the provided `pattern`, and stores them into the log fields named by the capturing groups inside the `pattern`.
The `pattern` must contain [RE2 regular expression](https://github.com/google/re2/wiki/Syntax) with named fields (aka capturing groups) in the form `(?P<capture_field_name>...)`.
Matching substrings are stored to the given `capture_field_name` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
For example, the following query extracts IPv4 addresses from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
and puts them into the `ip` field for logs over the last 5 minutes:
```logsql
_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)" from _msg
```
The `from _msg` part can be omitted if the data extraction is performed from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
So the following query is equivalent to the previous one:
```logsql
_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"
```
Performance tip: it is recommended to use [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance.
See also:
- [`extract` pipe](#extract-pipe)
- [`replace_regexp` pipe](#replace_regexp-pipe)
- [`unpack_json` pipe](#unpack_json-pipe)
### field_names pipe
`| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
@ -1348,8 +1397,33 @@ Field names are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in or
See also:
- [`field_values` pipe](#field_values-pipe)
- [`uniq` pipe](#uniq-pipe)
### field_values pipe
`| field_values field_name` [pipe](#pipes) returns all the values for the given [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the number of logs per each value.
For example, the following query returns all the values with the number of matching logs for the field `level` over logs for the last 5 minutes:
```logsql
_time:5m | field_values level
```
The number of returned values can be limited by adding `limit N` to the end of `field_values ...`. For example, the following query returns
up to 10 values for the field `user_id` over logs for the last 5 minutes:
```logsql
_time:5m | field_values user_id limit 10
```
If the limit is reached, then the set of returned values is random. Also the number of matching logs per each returned value is zeroed for performance reasons.
See also:
- [`field_names` pipe](#field_names-pipe)
- [`uniq` pipe](#uniq-pipe)
### fields pipe
By default all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) are returned in the response.
@ -1374,8 +1448,7 @@ See also:
### filter pipe
Sometimes it is needed to apply additional filters on the calculated results. This can be done with `| filter ...` [pipe](#pipes).
The `filter` pipe can contain arbitrary [filters](#filters).
The `| filter ...` [pipe](#pipes) allows filtering the selected log entries with arbitrary [filters](#filters).
For example, the following query returns `host` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values
if the number of log messages with the `error` [word](#word) for them over the last hour exceeds `1_000`:
@ -1384,6 +1457,13 @@ if the number of log messages with the `error` [word](#word) for them over the l
_time:1h error | stats by (host) count() logs_count | filter logs_count:> 1_000
```
It is allowed to omit `filter` prefix if the used filters do not clash with [pipe names](#pipes).
So the following query is equivalent to the previous one:
```logsql
_time:1h error | stats by (host) count() logs_count | logs_count:> 1_000
```
See also:
- [`stats` pipe](#stats-pipe)
@ -1463,6 +1543,12 @@ _time:5m | limit 100
`head` keyword can be used instead of `limit` for convenience. For example, `_time:5m | head 100` is equivalent to `_time:5m | limit 100`.
The `N` in `head N` can be omitted - in this case up to 10 matching logs are returned:
```logsql
error | head
```
By default rows are selected in arbitrary order for performance reasons, so the query above can return different sets of logs every time it is executed.
[`sort` pipe](#sort-pipe) can be used for making sure the logs are in the same order before applying `limit ...` to them.
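For example, the following query (a sketch combining the documented `sort` and `limit` pipes) should return the same 10 oldest logs with the `error` word over the last hour on every execution:

```logsql
_time:1h error | sort by (_time) | limit 10
```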
@ -1471,6 +1557,50 @@ See also:
- [`sort` pipe](#sort-pipe)
- [`offset` pipe](#offset-pipe)
### math pipe
`| math ...` [pipe](#pipes) performs mathematical calculations over numeric values stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
For example, the following query divides the `duration_msecs` field value by 1000, rounds the result to an integer and stores it in the `duration_secs` field:
```logsql
_time:5m | math round(duration_msecs / 1000) as duration_secs
```
The following mathematical operations are supported by `math` pipe:
- `arg1 + arg2` - returns the sum of `arg1` and `arg2`
- `arg1 - arg2` - returns the difference between `arg1` and `arg2`
- `arg1 * arg2` - multiplies `arg1` by `arg2`
- `arg1 / arg2` - divides `arg1` by `arg2`
- `arg1 % arg2` - returns the remainder of the division of `arg1` by `arg2`
- `arg1 ^ arg2` - returns `arg1` raised to the power of `arg2`
- `abs(arg)` - returns the absolute value of the given `arg`
- `max(arg1, ..., argN)` - returns the maximum value among the given `arg1`, ..., `argN`
- `min(arg1, ..., argN)` - returns the minimum value among the given `arg1`, ..., `argN`
- `round(arg)` - returns the given `arg` rounded to an integer. `round()` accepts an optional `nearest` arg, which allows rounding the number to the given `nearest` multiple.
For example, `round(temperature, 0.1)` rounds the `temperature` field to one decimal digit after the point - see the sketch after this list.
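For example, the following query (the `temperature` field here is hypothetical) rounds it to one decimal digit after the point and stores the result in the `temperature_rounded` field:

```logsql
_time:5m | math round(temperature, 0.1) as temperature_rounded
```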
Every `argX` argument in every mathematical operation can contain one of the following values:
- The name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, `errors_total / requests_total`.
- Any [supported numeric value](#numeric-values). For example, `response_size_bytes / 1MiB`.
- Another mathematical expression. Optionally, it may be put inside `(...)`. For example, `(a + b) * c`.
Multiple distinct results can be calculated in a single `math ...` pipe - just separate them with `,`. For example, the following query calculates the error rate
and the number of successful requests from `errors`, `warnings` and `requests` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
```logsql
_time:5m | math
(errors / requests) as error_rate,
(requests - (errors + warnings)) as success_requests
```
See also:
- [`stats` pipe](#stats-pipe)
- [`extract` pipe](#extract-pipe)
### offset pipe
If some selected logs must be skipped after [`sort`](#sort-pipe), then `| offset N` [pipe](#pipes) can be used, where `N` can contain any [supported integer numeric value](#numeric-values).
@ -1738,6 +1868,12 @@ For example, the following query calculates the following stats for logs over th
_time:5m | stats count() logs_total, count_uniq(_stream) streams_total
```
It is allowed to omit `stats` prefix for convenience. So the following query is equivalent to the previous one:
```logsql
_time:5m | count() logs_total, count_uniq(_stream) streams_total
```
See also:
- [stats by fields](#stats-by-fields)
@ -1747,6 +1883,7 @@ See also:
- [stats by IPv4 buckets](#stats-by-ipv4-buckets)
- [stats with additional filters](#stats-with-additional-filters)
- [stats pipe functions](#stats-pipe-functions)
- [`math` pipe](#math-pipe)
- [`sort` pipe](#sort-pipe)
@ -2462,12 +2599,7 @@ LogsQL supports the following transformations on the log entries selected with [
- Creating a new field from existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) according to the provided format. See [`format` pipe](#format-pipe).
- Replacing substrings in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
See [`replace` pipe](#replace-pipe) and [`replace_regexp` pipe](#replace_regexp-pipe) docs.
LogsQL will support the following transformations in the future:
- Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
See the [Roadmap](https://docs.victoriametrics.com/victorialogs/roadmap/) for details.
- Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [`math` pipe](#math-pipe).
It is also possible to perform various transformations on the [selected log entries](#filters) at client side
with `jq`, `awk`, `cut`, etc. Unix commands according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line).

View file

@ -13,8 +13,13 @@ import (
type filterAnd struct {
filters []filter
msgTokensOnce sync.Once
msgTokens []string
byFieldTokensOnce sync.Once
byFieldTokens []fieldTokens
}
type fieldTokens struct {
field string
tokens []string
}
func (fa *filterAnd) String() string {
@ -22,8 +27,7 @@ func (fa *filterAnd) String() string {
a := make([]string, len(filters))
for i, f := range filters {
s := f.String()
switch f.(type) {
case *filterOr:
if _, ok := f.(*filterOr); ok {
s = "(" + s + ")"
}
a[i] = s
@ -49,8 +53,8 @@ func (fa *filterAnd) applyToBlockResult(br *blockResult, bm *bitmap) {
}
func (fa *filterAnd) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
if !fa.matchMessageBloomFilter(bs) {
// Fast path - fa doesn't match _msg bloom filter.
if !fa.matchBloomFilters(bs) {
// Fast path - fa doesn't match bloom filters.
bm.resetBits()
return
}
@ -66,60 +70,114 @@ func (fa *filterAnd) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
}
}
func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool {
tokens := fa.getMessageTokens()
if len(tokens) == 0 {
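// matchBloomFilters returns false if any per-field token set surely misses the block
// according to its bloom filters - with AND semantics a single mismatch means
// the whole filterAnd cannot match the block.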
func (fa *filterAnd) matchBloomFilters(bs *blockSearch) bool {
byFieldTokens := fa.getByFieldTokens()
if len(byFieldTokens) == 0 {
return true
}
v := bs.csh.getConstColumnValue("_msg")
if v != "" {
return matchStringByAllTokens(v, tokens)
for _, fieldTokens := range byFieldTokens {
fieldName := fieldTokens.field
tokens := fieldTokens.tokens
v := bs.csh.getConstColumnValue(fieldName)
if v != "" {
if !matchStringByAllTokens(v, tokens) {
return false
}
continue
}
ch := bs.csh.getColumnHeader(fieldName)
if ch == nil {
return false
}
if ch.valueType == valueTypeDict {
if !matchDictValuesByAllTokens(ch.valuesDict.values, tokens) {
return false
}
continue
}
if !matchBloomFilterAllTokens(bs, ch, tokens) {
return false
}
}
ch := bs.csh.getColumnHeader("_msg")
if ch == nil {
return false
}
if ch.valueType == valueTypeDict {
return matchDictValuesByAllTokens(ch.valuesDict.values, tokens)
}
return matchBloomFilterAllTokens(bs, ch, tokens)
return true
}
func (fa *filterAnd) getMessageTokens() []string {
fa.msgTokensOnce.Do(fa.initMsgTokens)
return fa.msgTokens
func (fa *filterAnd) getByFieldTokens() []fieldTokens {
fa.byFieldTokensOnce.Do(fa.initByFieldTokens)
return fa.byFieldTokens
}
func (fa *filterAnd) initMsgTokens() {
var a []string
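// initByFieldTokens merges bloom filter tokens of token-based filters on a per-field basis,
// deduplicating the tokens, so every column is checked against its merged token set at most once.
// Fields referenced by a single filter are skipped - their bloom filter check is performed inline.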
func (fa *filterAnd) initByFieldTokens() {
m := make(map[string]map[string]struct{})
byFieldFilters := make(map[string]int)
var fieldNames []string
for _, f := range fa.filters {
fieldName := ""
var tokens []string
switch t := f.(type) {
case *filterPhrase:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
case *filterSequence:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
case *filterExact:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
fieldName = t.fieldName
tokens = t.getTokens()
case *filterExactPrefix:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
}
fieldName = t.fieldName
tokens = t.getTokens()
case *filterPhrase:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterPrefix:
if isMsgFieldName(t.fieldName) {
a = append(a, t.getTokens()...)
fieldName = t.fieldName
tokens = t.getTokens()
case *filterRegexp:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterSequence:
fieldName = t.fieldName
tokens = t.getTokens()
}
fieldName = getCanonicalColumnName(fieldName)
byFieldFilters[fieldName]++
if len(tokens) > 0 {
mTokens, ok := m[fieldName]
if !ok {
fieldNames = append(fieldNames, fieldName)
mTokens = make(map[string]struct{})
m[fieldName] = mTokens
}
for _, token := range tokens {
mTokens[token] = struct{}{}
}
}
}
fa.msgTokens = a
var byFieldTokens []fieldTokens
for _, fieldName := range fieldNames {
if byFieldFilters[fieldName] < 2 {
// It is faster to perform bloom filter match inline when visiting the corresponding column
continue
}
mTokens := m[fieldName]
tokens := make([]string, 0, len(mTokens))
for token := range mTokens {
tokens = append(tokens, token)
}
byFieldTokens = append(byFieldTokens, fieldTokens{
field: fieldName,
tokens: tokens,
})
}
fa.byFieldTokens = byFieldTokens
}
func matchStringByAllTokens(v string, tokens []string) bool {

View file

@ -3,6 +3,7 @@ package logstorage
import (
"fmt"
"math"
"slices"
"strings"
"sync"
@ -27,8 +28,9 @@ type filterIn struct {
// qFieldName must be set to field name for obtaining values from if q is non-nil.
qFieldName string
tokenSetsOnce sync.Once
tokenSets [][]string
tokensOnce sync.Once
commonTokens []string
tokenSets [][]string
stringValuesOnce sync.Once
stringValues map[string]struct{}
@ -74,28 +76,15 @@ func (fi *filterIn) updateNeededFields(neededFields fieldsSet) {
neededFields.add(fi.fieldName)
}
func (fi *filterIn) getTokenSets() [][]string {
fi.tokenSetsOnce.Do(fi.initTokenSets)
return fi.tokenSets
func (fi *filterIn) getTokens() ([]string, [][]string) {
fi.tokensOnce.Do(fi.initTokens)
return fi.commonTokens, fi.tokenSets
}
// It is faster to match every row in the block than to check too many tokenSets against the bloom filter.
const maxTokenSetsToInit = 1000
func (fi *filterIn) initTokens() {
commonTokens, tokenSets := getCommonTokensAndTokenSets(fi.values)
func (fi *filterIn) initTokenSets() {
values := fi.values
tokenSetsLen := len(values)
if tokenSetsLen > maxTokenSetsToInit {
tokenSetsLen = maxTokenSetsToInit
}
tokenSets := make([][]string, 0, tokenSetsLen+1)
for _, v := range values {
tokens := tokenizeStrings(nil, []string{v})
tokenSets = append(tokenSets, tokens)
if len(tokens) > maxTokenSetsToInit {
break
}
}
fi.commonTokens = commonTokens
fi.tokenSets = tokenSets
}
@ -385,47 +374,47 @@ func (fi *filterIn) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
return
}
tokenSets := fi.getTokenSets()
commonTokens, tokenSets := fi.getTokens()
switch ch.valueType {
case valueTypeString:
stringValues := fi.getStringValues()
matchAnyValue(bs, ch, bm, stringValues, tokenSets)
matchAnyValue(bs, ch, bm, stringValues, commonTokens, tokenSets)
case valueTypeDict:
stringValues := fi.getStringValues()
matchValuesDictByAnyValue(bs, ch, bm, stringValues)
case valueTypeUint8:
binValues := fi.getUint8Values()
matchAnyValue(bs, ch, bm, binValues, tokenSets)
matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
case valueTypeUint16:
binValues := fi.getUint16Values()
matchAnyValue(bs, ch, bm, binValues, tokenSets)
matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
case valueTypeUint32:
binValues := fi.getUint32Values()
matchAnyValue(bs, ch, bm, binValues, tokenSets)
matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
case valueTypeUint64:
binValues := fi.getUint64Values()
matchAnyValue(bs, ch, bm, binValues, tokenSets)
matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
case valueTypeFloat64:
binValues := fi.getFloat64Values()
matchAnyValue(bs, ch, bm, binValues, tokenSets)
matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
case valueTypeIPv4:
binValues := fi.getIPv4Values()
matchAnyValue(bs, ch, bm, binValues, tokenSets)
matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
case valueTypeTimestampISO8601:
binValues := fi.getTimestampISO8601Values()
matchAnyValue(bs, ch, bm, binValues, tokenSets)
matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
default:
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
}
}
func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, tokenSets [][]string) {
func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, commonTokens []string, tokenSets [][]string) {
if len(values) == 0 {
bm.resetBits()
return
}
if !matchBloomFilterAnyTokenSet(bs, ch, tokenSets) {
if !matchBloomFilterAnyTokenSet(bs, ch, commonTokens, tokenSets) {
bm.resetBits()
return
}
@ -435,7 +424,10 @@ func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[str
})
}
func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets [][]string) bool {
func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, commonTokens []string, tokenSets [][]string) bool {
if len(commonTokens) > 0 {
return matchBloomFilterAllTokens(bs, ch, commonTokens)
}
if len(tokenSets) == 0 {
return false
}
@ -453,6 +445,9 @@ func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets []
return false
}
// It is faster to match every row in the block than to check too many tokenSets against the bloom filter.
const maxTokenSetsToInit = 1000
func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}) {
bb := bbPool.Get()
for _, v := range ch.valuesDict.values {
@ -465,3 +460,44 @@ func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, va
matchEncodedValuesDict(bs, ch, bm, bb.B)
bbPool.Put(bb)
}
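// getCommonTokensAndTokenSets tokenizes the given values and returns either the tokens
// shared by all the values (commonTokens) or the per-value token sets (tokenSets).
// Exactly one of the returned results is non-nil: when common tokens exist,
// checking them against the bloom filter is enough, so tokenSets are dropped.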
func getCommonTokensAndTokenSets(values []string) ([]string, [][]string) {
tokenSets := make([][]string, len(values))
for i, v := range values {
tokenSets[i] = tokenizeStrings(nil, []string{v})
}
commonTokens := getCommonTokens(tokenSets)
if len(commonTokens) == 0 {
return nil, tokenSets
}
return commonTokens, nil
}
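// getCommonTokens returns the tokens present in every token set of tokenSets.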
func getCommonTokens(tokenSets [][]string) []string {
if len(tokenSets) == 0 {
return nil
}
m := make(map[string]struct{}, len(tokenSets[0]))
for _, token := range tokenSets[0] {
m[token] = struct{}{}
}
for _, tokens := range tokenSets[1:] {
if len(m) == 0 {
return nil
}
for token := range m {
if !slices.Contains(tokens, token) {
delete(m, token)
}
}
}
tokens := make([]string, 0, len(m))
for token := range m {
tokens = append(tokens, token)
}
return tokens
}

View file

@ -1,6 +1,8 @@
package logstorage
import (
"reflect"
"slices"
"testing"
)
@ -688,3 +690,32 @@ func TestFilterIn(t *testing.T) {
testFilterMatchForColumns(t, columns, fi, "_msg", nil)
})
}
func TestGetCommonTokensAndTokenSets(t *testing.T) {
f := func(values []string, commonTokensExpected []string, tokenSetsExpected [][]string) {
t.Helper()
commonTokens, tokenSets := getCommonTokensAndTokenSets(values)
slices.Sort(commonTokens)
if !reflect.DeepEqual(commonTokens, commonTokensExpected) {
t.Fatalf("unexpected commonTokens for values=%q\ngot\n%q\nwant\n%q", values, commonTokens, commonTokensExpected)
}
for i, tokens := range tokenSets {
slices.Sort(tokens)
tokensExpected := tokenSetsExpected[i]
if !reflect.DeepEqual(tokens, tokensExpected) {
t.Fatalf("unexpected tokens for value=%q\ngot\n%q\nwant\n%q", values[i], tokens, tokensExpected)
}
}
}
f(nil, nil, nil)
f([]string{"foo"}, []string{"foo"}, nil)
f([]string{"foo", "foo"}, []string{"foo"}, nil)
f([]string{"foo", "bar", "bar", "foo"}, nil, [][]string{{"foo"}, {"bar"}, {"bar"}, {"foo"}})
f([]string{"foo", "foo bar", "bar foo"}, []string{"foo"}, nil)
f([]string{"a foo bar", "bar abc foo", "foo abc a bar"}, []string{"bar", "foo"}, nil)
f([]string{"a xfoo bar", "xbar abc foo", "foo abc a bar"}, nil, [][]string{{"a", "bar", "xfoo"}, {"abc", "foo", "xbar"}, {"a", "abc", "bar", "foo"}})
}

View file

@ -2,6 +2,7 @@ package logstorage
import (
"strings"
"sync"
)
// filterOr contains filters joined by OR operator.
@ -9,6 +10,9 @@ import (
// It is expressed as `f1 OR f2 ... OR fN` in LogsQL.
type filterOr struct {
filters []filter
byFieldTokensOnce sync.Once
byFieldTokens []fieldTokens
}
func (fo *filterOr) String() string {
@ -51,6 +55,12 @@ func (fo *filterOr) applyToBlockResult(br *blockResult, bm *bitmap) {
}
func (fo *filterOr) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
if !fo.matchBloomFilters(bs) {
// Fast path - fo doesn't match bloom filters.
bm.resetBits()
return
}
bmResult := getBitmap(bm.bitsLen)
bmTmp := getBitmap(bm.bitsLen)
for _, f := range fo.filters {
@ -72,3 +82,105 @@ func (fo *filterOr) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
bm.copyFrom(bmResult)
putBitmap(bmResult)
}
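// matchBloomFilters returns true if at least one per-field common token set may be present
// in the block - with OR semantics the block can be skipped only when none
// of the branches can possibly match it.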
func (fo *filterOr) matchBloomFilters(bs *blockSearch) bool {
byFieldTokens := fo.getByFieldTokens()
if len(byFieldTokens) == 0 {
return true
}
for _, fieldTokens := range byFieldTokens {
fieldName := fieldTokens.field
tokens := fieldTokens.tokens
v := bs.csh.getConstColumnValue(fieldName)
if v != "" {
if matchStringByAllTokens(v, tokens) {
return true
}
continue
}
ch := bs.csh.getColumnHeader(fieldName)
if ch == nil {
continue
}
if ch.valueType == valueTypeDict {
if matchDictValuesByAllTokens(ch.valuesDict.values, tokens) {
return true
}
continue
}
if matchBloomFilterAllTokens(bs, ch, tokens) {
return true
}
}
return false
}
func (fo *filterOr) getByFieldTokens() []fieldTokens {
fo.byFieldTokensOnce.Do(fo.initByFieldTokens)
return fo.byFieldTokens
}
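// initByFieldTokens keeps, for every field referenced by two or more OR-ed filters,
// only the tokens shared by all these filters - a block may match the filterOr
// only if such common tokens pass the bloom filter check.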
func (fo *filterOr) initByFieldTokens() {
m := make(map[string][][]string)
byFieldFilters := make(map[string]int)
var fieldNames []string
for _, f := range fo.filters {
fieldName := ""
var tokens []string
switch t := f.(type) {
case *filterExact:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterExactPrefix:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterPhrase:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterPrefix:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterRegexp:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterSequence:
fieldName = t.fieldName
tokens = t.getTokens()
}
fieldName = getCanonicalColumnName(fieldName)
byFieldFilters[fieldName]++
if len(tokens) > 0 {
if _, ok := m[fieldName]; !ok {
fieldNames = append(fieldNames, fieldName)
}
m[fieldName] = append(m[fieldName], tokens)
}
}
var byFieldTokens []fieldTokens
for _, fieldName := range fieldNames {
if byFieldFilters[fieldName] < 2 {
// It is faster to perform bloom filter match inline when visiting the corresponding column
continue
}
commonTokens := getCommonTokens(m[fieldName])
if len(commonTokens) > 0 {
byFieldTokens = append(byFieldTokens, fieldTokens{
field: fieldName,
tokens: commonTokens,
})
}
}
fo.byFieldTokens = byFieldTokens
}

View file

@ -1,11 +1,11 @@
package logstorage
import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
var maxStringRangeValue = string([]byte{255, 255, 255, 255})
// filterStringRange matches the given string range [minValue..maxValue)
//
// Note that the minValue is included in the range, while the maxValue isn't included in the range.
@ -16,10 +16,12 @@ type filterStringRange struct {
fieldName string
minValue string
maxValue string
stringRepr string
}
func (fr *filterStringRange) String() string {
return fmt.Sprintf("%sstring_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), quoteTokenIfNeeded(fr.minValue), quoteTokenIfNeeded(fr.maxValue))
return quoteFieldNameIfNeeded(fr.fieldName) + fr.stringRepr
}
func (fr *filterStringRange) updateNeededFields(neededFields fieldsSet) {

View file

@ -74,6 +74,11 @@ func (lex *lexer) isQuotedToken() bool {
return lex.token != lex.rawToken
}
func (lex *lexer) isNumber() bool {
s := lex.rawToken + lex.s
return isNumberPrefix(s)
}
func (lex *lexer) isPrevToken(tokens ...string) bool {
for _, token := range tokens {
if token == lex.prevToken {
@ -247,7 +252,7 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
s := fmt.Sprintf("stats by (%s) count() hits", byFieldsStr)
lex := newLexer(s)
ps, err := parsePipeStats(lex)
ps, err := parsePipeStats(lex, true)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing [%s]: %s", s, err)
}
@ -855,6 +860,8 @@ func parseFilterStringRange(lex *lexer, fieldName string) (filter, error) {
fieldName: fieldName,
minValue: args[0],
maxValue: args[1],
stringRepr: fmt.Sprintf("string_range(%s, %s)", quoteTokenIfNeeded(args[0]), quoteTokenIfNeeded(args[1])),
}
return fr, nil
})
@ -1091,6 +1098,15 @@ func parseFilterGT(lex *lexer, fieldName string) (filter, error) {
op = ">="
}
if !lex.isNumber() {
lexState := lex.backupState()
fr := tryParseFilterGTString(lex, fieldName, op, includeMinValue)
if fr != nil {
return fr, nil
}
lex.restoreState(lexState)
}
minValue, fStr, err := parseFloat64(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err)
@ -1120,6 +1136,15 @@ func parseFilterLT(lex *lexer, fieldName string) (filter, error) {
op = "<="
}
if !lex.isNumber() {
lexState := lex.backupState()
fr := tryParseFilterLTString(lex, fieldName, op, includeMaxValue)
if fr != nil {
return fr, nil
}
lex.restoreState(lexState)
}
maxValue, fStr, err := parseFloat64(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err)
@ -1138,6 +1163,43 @@ func parseFilterLT(lex *lexer, fieldName string) (filter, error) {
return fr, nil
}
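// tryParseFilterGTString parses `field:>value` and `field:>=value` string comparisons
// into a filterStringRange. For the exclusive `>` op a zero byte is appended to the lower
// bound, since minValue+"\x00" is the smallest string bigger than minValue.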
func tryParseFilterGTString(lex *lexer, fieldName, op string, includeMinValue bool) filter {
minValueOrig, err := getCompoundToken(lex)
if err != nil {
return nil
}
minValue := minValueOrig
if !includeMinValue {
minValue = string(append([]byte(minValue), 0))
}
fr := &filterStringRange{
fieldName: fieldName,
minValue: minValue,
maxValue: maxStringRangeValue,
stringRepr: op + quoteStringTokenIfNeeded(minValueOrig),
}
return fr
}
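// tryParseFilterLTString parses `field:<value` and `field:<=value` string comparisons
// into a filterStringRange. The upper bound of filterStringRange is exclusive,
// so a zero byte is appended to it for the `<=` op in order to include maxValue itself.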
func tryParseFilterLTString(lex *lexer, fieldName, op string, includeMaxValue bool) filter {
maxValueOrig, err := getCompoundToken(lex)
if err != nil {
return nil
}
maxValue := maxValueOrig
if includeMaxValue {
maxValue = string(append([]byte(maxValue), 0))
}
fr := &filterStringRange{
fieldName: fieldName,
maxValue: maxValue,
stringRepr: op + quoteStringTokenIfNeeded(maxValueOrig),
}
return fr
}
func parseFilterRange(lex *lexer, fieldName string) (filter, error) {
funcName := lex.token
lex.nextToken()
@ -1495,6 +1557,13 @@ func parseTime(lex *lexer) (int64, string, error) {
return int64(math.Round(t*1e3)) * 1e6, s, nil
}
func quoteStringTokenIfNeeded(s string) string {
if !needQuoteStringToken(s) {
return s
}
return strconv.Quote(s)
}
func quoteTokenIfNeeded(s string) string {
if !needQuoteToken(s) {
return s
@ -1502,6 +1571,23 @@ func quoteTokenIfNeeded(s string) string {
return strconv.Quote(s)
}
func needQuoteStringToken(s string) bool {
return isNumberPrefix(s) || needQuoteToken(s)
}
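// isNumberPrefix reports whether s starts like a number - an optional `+` or `-` sign followed by a decimal digit.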
func isNumberPrefix(s string) bool {
if len(s) == 0 {
return false
}
if s[0] == '-' || s[0] == '+' {
s = s[1:]
if len(s) == 0 {
return false
}
}
return s[0] >= '0' && s[0] <= '9'
}
func needQuoteToken(s string) bool {
sLower := strings.ToLower(s)
if _, ok := reservedKeywords[sLower]; ok {

View file

@ -353,6 +353,10 @@ func TestParseFilterStringRange(t *testing.T) {
f("string_range(foo, bar)", ``, "foo", "bar")
f(`abc:string_range("foo,bar", "baz) !")`, `abc`, `foo,bar`, `baz) !`)
f(">foo", ``, "foo\x00", maxStringRangeValue)
f("x:>=foo", `x`, "foo", maxStringRangeValue)
f("x:<foo", `x`, ``, `foo`)
f(`<="123"`, ``, ``, "123\x00")
}
func TestParseFilterRegexp(t *testing.T) {
@ -527,9 +531,9 @@ func TestParseRangeFilter(t *testing.T) {
f(`foo:>=10.43`, `foo`, 10.43, inf)
f(`foo: >= -10.43`, `foo`, -10.43, inf)
f(`foo:<10.43`, `foo`, -inf, nextafter(10.43, -inf))
f(`foo:<10.43K`, `foo`, -inf, nextafter(10_430, -inf))
f(`foo: < -10.43`, `foo`, -inf, nextafter(-10.43, -inf))
f(`foo:<=10.43`, `foo`, -inf, 10.43)
f(`foo:<=10.43ms`, `foo`, -inf, 10_430_000)
f(`foo: <= 10.43`, `foo`, -inf, 10.43)
}
@ -590,6 +594,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`NOT foo AND bar OR baz`, `!foo bar or baz`)
f(`NOT (foo AND bar) OR baz`, `!(foo bar) or baz`)
f(`foo OR bar AND baz`, `foo or bar baz`)
f(`foo bar or baz xyz`, `foo bar or baz xyz`)
f(`foo (bar or baz) xyz`, `foo (bar or baz) xyz`)
f(`foo or bar baz or xyz`, `foo or bar baz or xyz`)
f(`(foo or bar) (baz or xyz)`, `(foo or bar) (baz or xyz)`)
f(`(foo OR bar) AND baz`, `(foo or bar) baz`)
// parens
@ -802,6 +810,12 @@ func TestParseQuerySuccess(t *testing.T) {
// string_range filter
f(`string_range(foo, bar)`, `string_range(foo, bar)`)
f(`foo:string_range("foo, bar", baz)`, `foo:string_range("foo, bar", baz)`)
f(`foo:>bar`, `foo:>bar`)
f(`foo:>"1234"`, `foo:>"1234"`)
f(`>="abc"`, `>=abc`)
f(`foo:<bar`, `foo:<bar`)
f(`foo:<"-12.34"`, `foo:<"-12.34"`)
f(`<="abc < de"`, `<="abc < de"`)
// reserved field names
f(`"_stream"`, `_stream`)
@ -869,8 +883,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | DELETE foo, bar`, `* | delete foo, bar`)
// limit and head pipe
f(`foo | limit 10`, `foo | limit 10`)
f(`foo | head 10`, `foo | limit 10`)
f(`foo | limit`, `foo | limit 10`)
f(`foo | head`, `foo | limit 10`)
f(`foo | limit 20`, `foo | limit 20`)
f(`foo | head 20`, `foo | limit 20`)
f(`foo | HEAD 1_123_432`, `foo | limit 1123432`)
f(`foo | head 10K`, `foo | limit 10000`)
@ -1065,6 +1081,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`foo | # some comment | foo bar
fields x # another comment
|filter "foo#this#isn't a comment"#this is comment`, `foo | fields x | filter "foo#this#isn't a comment"`)
// skip 'stats' and 'filter' prefixes
f(`* | by (host) count() rows | rows:>10`, `* | stats by (host) count(*) as rows | filter rows:>10`)
f(`* | (host) count() rows, count() if (error) errors | rows:>10`, `* | stats by (host) count(*) as rows, count(*) if (error) as errors | filter rows:>10`)
}
func TestParseQueryFailure(t *testing.T) {
@ -1082,7 +1102,7 @@ func TestParseQueryFailure(t *testing.T) {
f("")
f("|")
f("foo|")
f("foo|bar")
f("foo|bar(")
f("foo and")
f("foo OR ")
f("not")
@ -1151,7 +1171,7 @@ func TestParseQueryFailure(t *testing.T) {
f(`very long query with error aaa ffdfd fdfdfd fdfd:( ffdfdfdfdfd`)
// query with unexpected tail
f(`foo | bar`)
f(`foo | bar(`)
// unexpected comma
f(`foo,bar`)
@ -1264,6 +1284,7 @@ func TestParseQueryFailure(t *testing.T) {
f(`string_range(foo, bar`)
f(`string_range(foo)`)
f(`string_range(foo, bar, baz)`)
f(`>(`)
// missing filter
f(`| fields *`)
@ -1271,9 +1292,9 @@ func TestParseQueryFailure(t *testing.T) {
// missing pipe keyword
f(`foo |`)
// unknown pipe keyword
f(`foo | bar`)
f(`foo | fields bar | baz`)
// invalid pipe
f(`foo | bar(`)
f(`foo | fields bar | baz(`)
// missing field in fields pipe
f(`foo | fields`)
@ -1313,10 +1334,6 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | delete foo,`)
f(`foo | delete foo,,`)
// missing limit and head pipe value
f(`foo | limit`)
f(`foo | head`)
// invalid limit pipe value
f(`foo | limit bar`)
f(`foo | limit -123`)

View file

@ -86,6 +86,8 @@ func parsePipes(lex *lexer) ([]pipe, error) {
lex.nextToken()
case lex.isKeyword(")", ""):
return pipes, nil
default:
return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|' or ')'", pipes[len(pipes)-1], lex.token)
}
}
}
@ -98,7 +100,7 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
}
return pc, nil
case lex.isKeyword("delete", "del", "rm"):
case lex.isKeyword("delete", "del", "rm", "drop"):
pd, err := parsePipeDelete(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
@ -110,12 +112,24 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err)
}
return pe, nil
case lex.isKeyword("extract_regexp"):
pe, err := parsePipeExtractRegexp(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'extract_regexp' pipe: %w", err)
}
return pe, nil
case lex.isKeyword("field_names"):
pf, err := parsePipeFieldNames(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("field_values"):
pf, err := parsePipeFieldValues(lex)
if err != nil {
return nil, fmt.Errorf("cannot pase 'field_values' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("fields", "keep"):
pf, err := parsePipeFields(lex)
if err != nil {
@ -123,7 +137,7 @@ func parsePipe(lex *lexer) (pipe, error) {
}
return pf, nil
case lex.isKeyword("filter"):
pf, err := parsePipeFilter(lex)
pf, err := parsePipeFilter(lex, true)
if err != nil {
return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)
}
@ -140,6 +154,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err)
}
return pl, nil
case lex.isKeyword("math"):
pm, err := parsePipeMath(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'math' pipe: %w", err)
}
return pm, nil
case lex.isKeyword("offset", "skip"):
ps, err := parsePipeOffset(lex)
if err != nil {
@ -177,7 +197,7 @@ func parsePipe(lex *lexer) (pipe, error) {
}
return ps, nil
case lex.isKeyword("stats"):
ps, err := parsePipeStats(lex)
ps, err := parsePipeStats(lex, true)
if err != nil {
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
}
@ -207,6 +227,22 @@ func parsePipe(lex *lexer) (pipe, error) {
}
return pu, nil
default:
lexState := lex.backupState()
// Try parsing stats pipe without 'stats' keyword
ps, err := parsePipeStats(lex, false)
if err == nil {
return ps, nil
}
lex.restoreState(lexState)
// Try parsing filter pipe without 'filter' keyword
pf, err := parsePipeFilter(lex, false)
if err == nil {
return pf, nil
}
lex.restoreState(lexState)
return nil, fmt.Errorf("unexpected pipe %q", lex.token)
}
}

View file

@ -70,8 +70,8 @@ func (pdp *pipeDeleteProcessor) flush() error {
}
func parsePipeDelete(lex *lexer) (*pipeDelete, error) {
if !lex.isKeyword("delete", "del", "rm") {
return nil, fmt.Errorf("expecting 'delete', 'del' or 'rm'; got %q", lex.token)
if !lex.isKeyword("delete", "del", "rm", "drop") {
return nil, fmt.Errorf("expecting 'delete', 'del', 'rm' or 'drop'; got %q", lex.token)
}
var fields []string

View file

@ -64,13 +64,14 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
unneededFieldsOrig := unneededFields.clone()
needFromField := false
for _, step := range pe.ptn.steps {
if step.field != "" {
if !unneededFieldsOrig.contains(step.field) {
needFromField = true
}
if !pe.keepOriginalFields && !pe.skipEmptyResults {
unneededFields.add(step.field)
}
if step.field == "" {
continue
}
if !unneededFieldsOrig.contains(step.field) {
needFromField = true
}
if !pe.keepOriginalFields && !pe.skipEmptyResults {
unneededFields.add(step.field)
}
}
if needFromField {
@ -85,7 +86,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
neededFieldsOrig := neededFields.clone()
needFromField := false
for _, step := range pe.ptn.steps {
if step.field != "" && neededFieldsOrig.contains(step.field) {
if step.field == "" {
continue
}
if neededFieldsOrig.contains(step.field) {
needFromField = true
if !pe.keepOriginalFields && !pe.skipEmptyResults {
neededFields.remove(step.field)

View file

@ -0,0 +1,334 @@
package logstorage
import (
"fmt"
"regexp"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeExtractRegexp processes '| extract_regexp ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe
type pipeExtractRegexp struct {
fromField string
re *regexp.Regexp
reFields []string
keepOriginalFields bool
skipEmptyResults bool
// iff is an optional filter for skipping the extract func
iff *ifFilter
}
func (pe *pipeExtractRegexp) String() string {
s := "extract_regexp"
if pe.iff != nil {
s += " " + pe.iff.String()
}
reStr := pe.re.String()
s += " " + quoteTokenIfNeeded(reStr)
if !isMsgFieldName(pe.fromField) {
s += " from " + quoteTokenIfNeeded(pe.fromField)
}
if pe.keepOriginalFields {
s += " keep_original_fields"
}
if pe.skipEmptyResults {
s += " skip_empty_results"
}
return s
}
func (pe *pipeExtractRegexp) optimize() {
pe.iff.optimizeFilterIn()
}
func (pe *pipeExtractRegexp) hasFilterInWithQuery() bool {
return pe.iff.hasFilterInWithQuery()
}
func (pe *pipeExtractRegexp) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
iffNew, err := pe.iff.initFilterInValues(cache, getFieldValuesFunc)
if err != nil {
return nil, err
}
peNew := *pe
peNew.iff = iffNew
return &peNew, nil
}
func (pe *pipeExtractRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
unneededFieldsOrig := unneededFields.clone()
needFromField := false
for _, f := range pe.reFields {
if f == "" {
continue
}
if !unneededFieldsOrig.contains(f) {
needFromField = true
}
if !pe.keepOriginalFields && !pe.skipEmptyResults {
unneededFields.add(f)
}
}
if needFromField {
unneededFields.remove(pe.fromField)
if pe.iff != nil {
unneededFields.removeFields(pe.iff.neededFields)
}
} else {
unneededFields.add(pe.fromField)
}
} else {
neededFieldsOrig := neededFields.clone()
needFromField := false
for _, f := range pe.reFields {
if f == "" {
continue
}
if neededFieldsOrig.contains(f) {
needFromField = true
if !pe.keepOriginalFields && !pe.skipEmptyResults {
neededFields.remove(f)
}
}
}
if needFromField {
neededFields.add(pe.fromField)
if pe.iff != nil {
neededFields.addFields(pe.iff.neededFields)
}
}
}
}
func (pe *pipeExtractRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
return &pipeExtractRegexpProcessor{
pe: pe,
ppNext: ppNext,
shards: make([]pipeExtractRegexpProcessorShard, workersCount),
}
}
type pipeExtractRegexpProcessor struct {
pe *pipeExtractRegexp
ppNext pipeProcessor
shards []pipeExtractRegexpProcessorShard
}
type pipeExtractRegexpProcessorShard struct {
pipeExtractRegexpProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0.
_ [128 - unsafe.Sizeof(pipeExtractRegexpProcessorShardNopad{})%128]byte
}
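// apply matches v against re and stores the submatches for named capturing groups in shard.fields.
// All the fields are left empty when v doesn't match re.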
func (shard *pipeExtractRegexpProcessorShard) apply(re *regexp.Regexp, v string) {
shard.fields = slicesutil.SetLength(shard.fields, len(shard.rcs))
fields := shard.fields
clear(fields)
locs := re.FindStringSubmatchIndex(v)
if locs == nil {
return
}
for i := range fields {
start := locs[2*i]
if start < 0 {
// mismatch
continue
}
end := locs[2*i+1]
fields[i] = v[start:end]
}
}
type pipeExtractRegexpProcessorShardNopad struct {
bm bitmap
resultColumns []*blockResultColumn
resultValues []string
rcs []resultColumn
a arena
fields []string
}
func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
pe := pep.pe
shard := &pep.shards[workerID]
bm := &shard.bm
bm.init(len(br.timestamps))
bm.setBits()
if iff := pe.iff; iff != nil {
iff.f.applyToBlockResult(br, bm)
if bm.isZero() {
pep.ppNext.writeBlock(workerID, br)
return
}
}
reFields := pe.reFields
shard.rcs = slicesutil.SetLength(shard.rcs, len(reFields))
rcs := shard.rcs
for i := range reFields {
rcs[i].name = reFields[i]
}
c := br.getColumnByName(pe.fromField)
values := c.getValues(br)
shard.resultColumns = slicesutil.SetLength(shard.resultColumns, len(rcs))
resultColumns := shard.resultColumns
for i := range resultColumns {
if reFields[i] != "" {
resultColumns[i] = br.getColumnByName(rcs[i].name)
}
}
shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs))
resultValues := shard.resultValues
hadUpdates := false
vPrev := ""
for rowIdx, v := range values {
if bm.isSetBit(rowIdx) {
if !hadUpdates || vPrev != v {
vPrev = v
hadUpdates = true
shard.apply(pe.re, v)
for i, v := range shard.fields {
if reFields[i] == "" {
continue
}
if v == "" && pe.skipEmptyResults || pe.keepOriginalFields {
c := resultColumns[i]
if vOrig := c.getValueAtRow(br, rowIdx); vOrig != "" {
v = vOrig
}
} else {
v = shard.a.copyString(v)
}
resultValues[i] = v
}
}
} else {
for i, c := range resultColumns {
if reFields[i] != "" {
resultValues[i] = c.getValueAtRow(br, rowIdx)
}
}
}
for i, v := range resultValues {
if reFields[i] != "" {
rcs[i].addValue(v)
}
}
}
for i := range rcs {
if reFields[i] != "" {
br.addResultColumn(&rcs[i])
}
}
pep.ppNext.writeBlock(workerID, br)
for i := range rcs {
rcs[i].reset()
}
shard.a.reset()
}
func (pep *pipeExtractRegexpProcessor) flush() error {
return nil
}
func parsePipeExtractRegexp(lex *lexer) (*pipeExtractRegexp, error) {
if !lex.isKeyword("extract_regexp") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "extract_regexp")
}
lex.nextToken()
// parse optional if (...)
var iff *ifFilter
if lex.isKeyword("if") {
f, err := parseIfFilter(lex)
if err != nil {
return nil, err
}
iff = f
}
// parse pattern
patternStr, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot read 'pattern': %w", err)
}
re, err := regexp.Compile(patternStr)
if err != nil {
return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", patternStr, err)
}
reFields := re.SubexpNames()
hasNamedFields := false
for _, f := range reFields {
if f != "" {
hasNamedFields = true
break
}
}
if !hasNamedFields {
return nil, fmt.Errorf("the 'pattern' %q must contain at least a single named group in the form (?P<group_name>...)", patternStr)
}
// parse optional 'from ...' part
fromField := "_msg"
if lex.isKeyword("from") {
lex.nextToken()
f, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'from' field name: %w", err)
}
fromField = f
}
keepOriginalFields := false
skipEmptyResults := false
switch {
case lex.isKeyword("keep_original_fields"):
lex.nextToken()
keepOriginalFields = true
case lex.isKeyword("skip_empty_results"):
lex.nextToken()
skipEmptyResults = true
}
pe := &pipeExtractRegexp{
fromField: fromField,
re: re,
reFields: reFields,
keepOriginalFields: keepOriginalFields,
skipEmptyResults: skipEmptyResults,
iff: iff,
}
return pe, nil
}

View file

@ -0,0 +1,329 @@
package logstorage
import (
"testing"
)
func TestParsePipeExtractRegexpSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`extract_regexp "foo(?P<bar>.*)"`)
f(`extract_regexp "foo(?P<bar>.*)" skip_empty_results`)
f(`extract_regexp "foo(?P<bar>.*)" keep_original_fields`)
f(`extract_regexp "foo(?P<bar>.*)" from x`)
f(`extract_regexp "foo(?P<bar>.*)" from x skip_empty_results`)
f(`extract_regexp "foo(?P<bar>.*)" from x keep_original_fields`)
f(`extract_regexp if (x:y) "foo(?P<bar>.*)" from baz`)
f(`extract_regexp if (x:y) "foo(?P<bar>.*)" from baz skip_empty_results`)
f(`extract_regexp if (x:y) "foo(?P<bar>.*)" from baz keep_original_fields`)
}
func TestParsePipeExtractRegexpFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`extract_regexp`)
f(`extract_regexp keep_original_fields`)
f(`extract_regexp skip_empty_results`)
f(`extract_regexp from`)
f(`extract_regexp from x`)
f(`extract_regexp from x "y(?P<foo>.*)"`)
f(`extract_regexp if (x:y)`)
f(`extract_regexp "a(?P<b>.*)" if (x:y)`)
f(`extract_regexp "a"`)
f(`extract_regexp "(foo)"`)
}
func TestPipeExtractRegexp(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// skip empty results
f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)" skip_empty_results`, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=`},
{"aa", "foobar"},
{"abc", "ippl"},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=`},
{"aa", "foobar"},
{"abc", `"x y=z"`},
},
})
// no skip empty results
f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)"`, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=`},
{"aa", "foobar"},
{"abc", "ippl"},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=`},
{"aa", ""},
{"abc", `"x y=z"`},
},
})
// keep original fields
f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)" keep_original_fields`, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"aa", "foobar"},
{"abc", ""},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"abc", `"x y=z"`},
{"aa", "foobar"},
},
})
// no keep original fields
f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)"`, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"aa", "foobar"},
{"abc", ""},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"abc", `"x y=z"`},
{"aa", "b"},
},
})
// single row, extract from _msg
f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)"`, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"abc", `"x y=z"`},
{"aa", "b"},
},
})
// single row, extract from _msg into _msg
f(`extract_regexp "msg=(?P<_msg>.*)"`, [][]Field{
{
{"_msg", `msg=bar`},
},
}, [][]Field{
{
{"_msg", "bar"},
},
})
// single row, extract from non-existing field
f(`extract_regexp "foo=(?P<bar>.*)" from x`, [][]Field{
{
{"_msg", `foo=bar`},
},
}, [][]Field{
{
{"_msg", `foo=bar`},
{"bar", ""},
},
})
// single row, pattern mismatch
f(`extract_regexp "foo=(?P<bar>.*)" from x`, [][]Field{
{
{"x", `foobar`},
},
}, [][]Field{
{
{"x", `foobar`},
{"bar", ""},
},
})
f(`extract_regexp "foo=(?P<bar>.*) baz=(?P<xx>.*)" from x`, [][]Field{
{
{"x", `a foo="a\"b\\c" cde baz=aa`},
},
}, [][]Field{
{
{"x", `a foo="a\"b\\c" cde baz=aa`},
{"bar", `"a\"b\\c" cde`},
{"xx", "aa"},
},
})
// single row, overwrite existing column
f(`extract_regexp "foo=(?P<bar>.*) baz=(?P<xx>.*)" from x`, [][]Field{
{
{"x", `a foo=cc baz=aa b`},
{"bar", "abc"},
},
}, [][]Field{
{
{"x", `a foo=cc baz=aa b`},
{"bar", `cc`},
{"xx", `aa b`},
},
})
// single row, if match
f(`extract_regexp if (x:baz) "foo=(?P<bar>.*) baz=(?P<xx>.*)" from "x"`, [][]Field{
{
{"x", `a foo=cc baz=aa b`},
{"bar", "abc"},
},
}, [][]Field{
{
{"x", `a foo=cc baz=aa b`},
{"bar", `cc`},
{"xx", `aa b`},
},
})
// single row, if mismatch
f(`extract_regexp if (bar:"") "foo=(?P<bar>.*) baz=(?P<xx>.*)" from 'x'`, [][]Field{
{
{"x", `a foo=cc baz=aa b`},
{"bar", "abc"},
},
}, [][]Field{
{
{"x", `a foo=cc baz=aa b`},
{"bar", `abc`},
},
})
// multiple rows with distinct set of labels
f(`extract_regexp if (!ip:keep) "ip=(?P<ip>([0-9]+[.]){3}[0-9]+) "`, [][]Field{
{
{"foo", "bar"},
{"_msg", "request from ip=1.2.3.4 xxx"},
{"f3", "y"},
},
{
{"foo", "aaa"},
{"_msg", "ip=5.4.3.1 abcd"},
{"ip", "keep"},
{"a", "b"},
},
{
{"foo", "aaa"},
{"_msg", "ip=34.32.11.94 abcd"},
{"ip", "ppp"},
{"a", "b"},
},
{
{"foo", "klkfs"},
{"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"},
{"ip", "bbbsd"},
{"a", "klo2i"},
},
}, [][]Field{
{
{"foo", "bar"},
{"_msg", "request from ip=1.2.3.4 xxx"},
{"f3", "y"},
{"ip", "1.2.3.4"},
},
{
{"foo", "aaa"},
{"_msg", "ip=5.4.3.1 abcd"},
{"ip", "keep"},
{"a", "b"},
},
{
{"foo", "aaa"},
{"_msg", "ip=34.32.11.94 abcd"},
{"ip", "34.32.11.94"},
{"a", "b"},
},
{
{"foo", "klkfs"},
{"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"},
{"ip", ""},
{"a", "klo2i"},
},
})
}
func TestPipeExtractRegexpUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("extract_regexp '(?P<foo>.*)' from x", "*", "", "*", "foo")
f("extract_regexp if (foo:bar) '(?P<foo>.*)' from x", "*", "", "*", "")
f("extract_regexp if (foo:bar) '(?P<foo>.*)' from x keep_original_fields", "*", "", "*", "")
f("extract_regexp if (foo:bar) '(?P<foo>.*)' from x skip_empty_results", "*", "", "*", "")
// unneeded fields do not intersect with pattern and output fields
f("extract_regexp '(?P<foo>.*)' from x", "*", "f1,f2", "*", "f1,f2,foo")
f("extract_regexp '(?P<foo>.*)' from x keep_original_fields", "*", "f1,f2", "*", "f1,f2")
f("extract_regexp '(?P<foo>.*)' from x skip_empty_results", "*", "f1,f2", "*", "f1,f2")
f("extract_regexp if (f1:x) '(?P<foo>.*)' from x", "*", "f1,f2", "*", "f2,foo")
f("extract_regexp if (f1:x) '(?P<foo>.*)' from x keep_original_fields", "*", "f1,f2", "*", "f2")
f("extract_regexp if (f1:x) '(?P<foo>.*)' from x skip_empty_results", "*", "f1,f2", "*", "f2")
f("extract_regexp if (foo:bar f1:x) '(?P<foo>.*)' from x", "*", "f1,f2", "*", "f2")
// unneeded fields intersect with pattern
f("extract_regexp '(?P<foo>.*)' from x", "*", "f2,x", "*", "f2,foo")
f("extract_regexp '(?P<foo>.*)' from x keep_original_fields", "*", "f2,x", "*", "f2")
f("extract_regexp '(?P<foo>.*)' from x skip_empty_results", "*", "f2,x", "*", "f2")
f("extract_regexp if (f1:abc) '(?P<foo>.*)' from x", "*", "f2,x", "*", "f2,foo")
f("extract_regexp if (f2:abc) '(?P<foo>.*)' from x", "*", "f2,x", "*", "foo")
// unneeded fields intersect with output fields
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo", "*", "bar,f2,foo")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "*", "f2,foo", "*", "f2,foo")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "*", "f2,foo", "*", "f2,foo")
f("extract_regexp if (f1:abc) '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo", "*", "bar,f2,foo")
f("extract_regexp if (f2:abc foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo", "*", "bar")
f("extract_regexp if (f2:abc foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "*", "f2,foo", "*", "")
f("extract_regexp if (f2:abc foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "*", "f2,foo", "*", "")
// unneeded fields intersect with all the output fields
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
f("extract_regexp if (a:b f2:q x:y foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
f("extract_regexp if (a:b f2:q x:y foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
f("extract_regexp if (a:b f2:q x:y foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
// needed fields do not intersect with pattern and output fields
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "f1,f2", "", "f1,f2", "")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "f1,f2", "", "f1,f2", "")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "f1,f2", "", "f1,f2", "")
f("extract_regexp if (a:b) '(?P<foo>.*)x(?P<bar>.*)' from x", "f1,f2", "", "f1,f2", "")
f("extract_regexp if (f1:b) '(?P<foo>.*)x(?P<bar>.*)' from x", "f1,f2", "", "f1,f2", "")
// needed fields intersect with pattern field
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,x", "", "f2,x", "")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "f2,x", "", "f2,x", "")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "f2,x", "", "f2,x", "")
f("extract_regexp if (a:b) '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,x", "", "f2,x", "")
// needed fields intersect with output fields
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,foo", "", "f2,x", "")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "f2,foo", "", "foo,f2,x", "")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "f2,foo", "", "foo,f2,x", "")
f("extract_regexp if (a:b) '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,foo", "", "a,f2,x", "")
// needed fields intersect with pattern and output fields
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,foo,x,y", "", "f2,x,y", "")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "f2,foo,x,y", "", "foo,f2,x,y", "")
f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "")
f("extract_regexp if (a:b foo:q) '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "")
}

View file

@@ -353,31 +353,3 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) {
f("extract '<foo>x<bar>' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "")
f("extract if (a:b foo:q) '<foo>x<bar>' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "")
}
func expectParsePipeFailure(t *testing.T, pipeStr string) {
t.Helper()
lex := newLexer(pipeStr)
p, err := parsePipe(lex)
if err == nil && lex.isEnd() {
t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p)
}
}
func expectParsePipeSuccess(t *testing.T, pipeStr string) {
t.Helper()
lex := newLexer(pipeStr)
p, err := parsePipe(lex)
if err != nil {
t.Fatalf("cannot parse [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
t.Fatalf("unexpected tail after parsing [%s]: [%s]", pipeStr, lex.s)
}
pipeStrResult := p.String()
if pipeStrResult != pipeStr {
t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr)
}
}

View file

@@ -0,0 +1,93 @@
package logstorage
import (
"fmt"
)
// pipeFieldValues processes '| field_values ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe
type pipeFieldValues struct {
field string
limit uint64
}
func (pf *pipeFieldValues) String() string {
s := "field_values " + quoteTokenIfNeeded(pf.field)
if pf.limit > 0 {
s += fmt.Sprintf(" limit %d", pf.limit)
}
return s
}
func (pf *pipeFieldValues) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
neededFields.reset()
if !unneededFields.contains(pf.field) {
neededFields.add(pf.field)
}
unneededFields.reset()
} else {
neededFieldsOrig := neededFields.clone()
neededFields.reset()
if neededFieldsOrig.contains(pf.field) {
neededFields.add(pf.field)
}
}
}
func (pf *pipeFieldValues) optimize() {
// nothing to do
}
func (pf *pipeFieldValues) hasFilterInWithQuery() bool {
return false
}
func (pf *pipeFieldValues) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pf, nil
}
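// newPipeProcessor implements field_values on top of the uniq pipe,
// so `field_values x limit N` produces the same results as `uniq by (x) with hits limit N`.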
func (pf *pipeFieldValues) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
hitsFieldName := "hits"
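// If the source field is itself named "hits", pick another name for the hits column in order to avoid a collision.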
if hitsFieldName == pf.field {
hitsFieldName = "hitss"
}
pu := &pipeUniq{
byFields: []string{pf.field},
hitsFieldName: hitsFieldName,
limit: pf.limit,
}
return pu.newPipeProcessor(workersCount, stopCh, cancel, ppNext)
}
func parsePipeFieldValues(lex *lexer) (*pipeFieldValues, error) {
if !lex.isKeyword("field_values") {
return nil, fmt.Errorf("expecting 'field_values'; got %q", lex.token)
}
lex.nextToken()
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field name for 'field_values': %w", err)
}
limit := uint64(0)
if lex.isKeyword("limit") {
lex.nextToken()
n, ok := tryParseUint64(lex.token)
if !ok {
return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token)
}
lex.nextToken()
limit = n
}
pf := &pipeFieldValues{
field: field,
limit: limit,
}
return pf, nil
}

View file

@@ -0,0 +1,148 @@
package logstorage
import (
"testing"
)
func TestParsePipeFieldValuesSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`field_values x`)
f(`field_values x limit 10`)
}
func TestParsePipeFieldValuesFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`field_values`)
f(`field_values a b`)
f(`field_values a limit`)
f(`field_values limit N`)
}
func TestPipeFieldValues(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("field_values a", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"hits", "3"},
},
})
f("field_values b", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"b", "3"},
{"hits", "2"},
},
{
{"b", "54"},
{"hits", "1"},
},
})
f("field_values c", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"c", ""},
{"hits", "2"},
},
{
{"c", "d"},
{"hits", "1"},
},
})
f("field_values d", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"d", ""},
{"hits", "3"},
},
})
}
func TestPipeFieldValuesUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("field_values x", "*", "", "x", "")
// all the needed fields, unneeded fields do not intersect with src
f("field_values x", "*", "f1,f2", "x", "")
// all the needed fields, unneeded fields intersect with src
f("field_values x", "*", "f1,x", "", "")
// needed fields do not intersect with src
f("field_values x", "f1,f2", "", "", "")
// needed fields intersect with src
f("field_values x", "f1,x", "", "x", "")
}

View file

@@ -108,11 +108,13 @@ func (pfp *pipeFilterProcessor) flush() error {
return nil
}
func parsePipeFilter(lex *lexer) (*pipeFilter, error) {
if !lex.isKeyword("filter") {
return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token)
func parsePipeFilter(lex *lexer, needFilterKeyword bool) (*pipeFilter, error) {
if needFilterKeyword {
if !lex.isKeyword("filter") {
return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token)
}
lex.nextToken()
}
lex.nextToken()
f, err := parseFilter(lex)
if err != nil {

View file

@@ -32,6 +32,14 @@ func TestPipeFilter(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// filter mismatch, missing 'filter' prefix
f("abc", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{})
// filter mismatch
f("filter abc", [][]Field{
{
@@ -40,6 +48,19 @@ func TestPipeFilter(t *testing.T) {
},
}, [][]Field{})
// filter match, missing 'filter' prefix
f("foo", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
})
// filter match
f("filter foo", [][]Field{
{

View file

@@ -4,6 +4,8 @@ import (
"fmt"
"strconv"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// pipeFormat processes '| format ...' pipe.
@@ -169,8 +171,8 @@ func (pfp *pipeFormatProcessor) flush() error {
}
func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) string {
bb := bbPool.Get()
b := bb.B
b := shard.a.b
bLen := len(b)
for _, step := range pf.steps {
b = append(b, step.prefix...)
if step.field != "" {
@@ -183,11 +185,9 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult
}
}
}
bb.B = b
shard.a.b = b
v := shard.a.copyBytesToString(b)
bbPool.Put(bb)
return v
return bytesutil.ToUnsafeString(b[bLen:])
}
func parsePipeFormat(lex *lexer) (*pipeFormat, error) {

View file

@@ -88,15 +88,20 @@ func parsePipeLimit(lex *lexer) (*pipeLimit, error) {
if !lex.isKeyword("limit", "head") {
return nil, fmt.Errorf("expecting 'limit' or 'head'; got %q", lex.token)
}
lex.nextToken()
lex.nextToken()
n, err := parseUint(lex.token)
if err != nil {
return nil, fmt.Errorf("cannot parse rows limit from %q: %w", lex.token, err)
limit := uint64(10)
if !lex.isKeyword("|", ")", "") {
n, err := parseUint(lex.token)
if err != nil {
return nil, fmt.Errorf("cannot parse rows limit from %q: %w", lex.token, err)
}
lex.nextToken()
limit = n
}
lex.nextToken()
pl := &pipeLimit{
limit: n,
limit: limit,
}
return pl, nil
}

View file

@@ -20,7 +20,6 @@ func TestParsePipeLimitFailure(t *testing.T) {
expectParsePipeFailure(t, pipeStr)
}
f(`limit`)
f(`limit -10`)
f(`limit foo`)
}
@@ -30,6 +29,17 @@ func TestPipeLimit(t *testing.T) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("limit", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
})
f("limit 100", [][]Field{
{

lib/logstorage/pipe_math.go (new file, 776 lines)
View file

@@ -0,0 +1,776 @@
package logstorage
import (
"fmt"
"math"
"strings"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeMath processes '| math ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe
type pipeMath struct {
entries []*mathEntry
}
type mathEntry struct {
// The calculated expr result is stored in resultField.
resultField string
// expr is the expression to calculate.
expr *mathExpr
}
type mathExpr struct {
// if isConst is set, then the given mathExpr returns the given constValue.
isConst bool
constValue float64
// constValueStr is the original string representation of constValue.
//
// It is used in String() method for returning the original representation of the given constValue.
constValueStr string
// if fieldName isn't empty, then the given mathExpr fetches numeric values from the given fieldName.
fieldName string
// args are args for the given mathExpr.
args []*mathExpr
// op is the operation name (aka function name) for the given mathExpr.
op string
// f is the function for calculating results for the given mathExpr.
f mathFunc
// whether the mathExpr was wrapped in parens.
wrappedInParens bool
}
// mathFunc must fill result with calculated results based on the given args.
type mathFunc func(result []float64, args [][]float64)
func (pm *pipeMath) String() string {
s := "math"
a := make([]string, len(pm.entries))
for i, e := range pm.entries {
a[i] = e.String()
}
s += " " + strings.Join(a, ", ")
return s
}
func (me *mathEntry) String() string {
s := me.expr.String()
if isMathBinaryOp(me.expr.op) {
s = "(" + s + ")"
}
s += " as " + quoteTokenIfNeeded(me.resultField)
return s
}
func (me *mathExpr) String() string {
if me.isConst {
return me.constValueStr
}
if me.fieldName != "" {
return quoteTokenIfNeeded(me.fieldName)
}
args := me.args
if isMathBinaryOp(me.op) {
opPriority := getMathBinaryOpPriority(me.op)
left := me.args[0]
right := me.args[1]
leftStr := left.String()
rightStr := right.String()
if isMathBinaryOp(left.op) && getMathBinaryOpPriority(left.op) > opPriority {
leftStr = "(" + leftStr + ")"
}
if isMathBinaryOp(right.op) && getMathBinaryOpPriority(right.op) > opPriority {
rightStr = "(" + rightStr + ")"
}
return fmt.Sprintf("%s %s %s", leftStr, me.op, rightStr)
}
if me.op == "unary_minus" {
argStr := args[0].String()
if isMathBinaryOp(args[0].op) {
argStr = "(" + argStr + ")"
}
return "-" + argStr
}
a := make([]string, len(args))
for i, arg := range args {
a[i] = arg.String()
}
argsStr := strings.Join(a, ", ")
return fmt.Sprintf("%s(%s)", me.op, argsStr)
}
func isMathBinaryOp(op string) bool {
_, ok := mathBinaryOps[op]
return ok
}
func getMathBinaryOpPriority(op string) int {
bo, ok := mathBinaryOps[op]
if !ok {
logger.Panicf("BUG: unexpected binary op: %q", op)
}
return bo.priority
}
func getMathFuncForBinaryOp(op string) (mathFunc, error) {
bo, ok := mathBinaryOps[op]
if !ok {
return nil, fmt.Errorf("unsupported binary operation: %q", op)
}
return bo.f, nil
}
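// mathBinaryOps maps the supported binary operations to their implementations; a lower priority value means the operation binds tighter.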
var mathBinaryOps = map[string]mathBinaryOp{
"^": {
priority: 1,
f: mathFuncPow,
},
"*": {
priority: 2,
f: mathFuncMul,
},
"/": {
priority: 2,
f: mathFuncDiv,
},
"%": {
priority: 2,
f: mathFuncMod,
},
"+": {
priority: 3,
f: mathFuncPlus,
},
"-": {
priority: 3,
f: mathFuncMinus,
},
}
type mathBinaryOp struct {
priority int
f mathFunc
}
func (pm *pipeMath) updateNeededFields(neededFields, unneededFields fieldsSet) {
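// Walk the entries in reverse order, since the fields needed by later entries must be propagated backwards through earlier calculations.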
for i := len(pm.entries) - 1; i >= 0; i-- {
e := pm.entries[i]
if neededFields.contains("*") {
if !unneededFields.contains(e.resultField) {
unneededFields.add(e.resultField)
entryFields := e.getNeededFields()
unneededFields.removeFields(entryFields)
}
} else {
if neededFields.contains(e.resultField) {
neededFields.remove(e.resultField)
entryFields := e.getNeededFields()
neededFields.addFields(entryFields)
}
}
}
}
func (me *mathEntry) getNeededFields() []string {
neededFields := newFieldsSet()
me.expr.updateNeededFields(neededFields)
return neededFields.getAll()
}
func (me *mathExpr) updateNeededFields(neededFields fieldsSet) {
if me.isConst {
return
}
if me.fieldName != "" {
neededFields.add(me.fieldName)
return
}
for _, arg := range me.args {
arg.updateNeededFields(neededFields)
}
}
func (pm *pipeMath) optimize() {
// nothing to do
}
func (pm *pipeMath) hasFilterInWithQuery() bool {
return false
}
func (pm *pipeMath) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
return pm, nil
}
func (pm *pipeMath) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
pmp := &pipeMathProcessor{
pm: pm,
ppNext: ppNext,
shards: make([]pipeMathProcessorShard, workersCount),
}
return pmp
}
type pipeMathProcessor struct {
pm *pipeMath
ppNext pipeProcessor
shards []pipeMathProcessorShard
}
type pipeMathProcessorShard struct {
pipeMathProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0.
_ [128 - unsafe.Sizeof(pipeMathProcessorShardNopad{})%128]byte
}
type pipeMathProcessorShardNopad struct {
// a holds all the data for rcs.
a arena
// rcs is used for storing calculated results before they are written to ppNext.
rcs []resultColumn
// rs is storage for temporary results
rs [][]float64
// rsBuf is backing storage for rs slices
rsBuf []float64
}
func (shard *pipeMathProcessorShard) executeMathEntry(e *mathEntry, rc *resultColumn, br *blockResult) {
clear(shard.rs)
shard.rs = shard.rs[:0]
shard.rsBuf = shard.rsBuf[:0]
shard.executeExpr(e.expr, br)
r := shard.rs[0]
b := shard.a.b
for _, f := range r {
bLen := len(b)
b = marshalFloat64String(b, f)
v := bytesutil.ToUnsafeString(b[bLen:])
rc.addValue(v)
}
shard.a.b = b
}
func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) {
rIdx := len(shard.rs)
shard.rs = slicesutil.SetLength(shard.rs, len(shard.rs)+1)
shard.rsBuf = slicesutil.SetLength(shard.rsBuf, len(shard.rsBuf)+len(br.timestamps))
shard.rs[rIdx] = shard.rsBuf[len(shard.rsBuf)-len(br.timestamps):]
if me.isConst {
r := shard.rs[rIdx]
for i := range br.timestamps {
r[i] = me.constValue
}
return
}
if me.fieldName != "" {
c := br.getColumnByName(me.fieldName)
values := c.getValues(br)
r := shard.rs[rIdx]
var f float64
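// Column values are frequently repeated within a block, so the value is reparsed only when it differs from the previous one.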
for i, v := range values {
if i == 0 || v != values[i-1] {
var ok bool
f, ok = tryParseFloat64(v)
if !ok {
f = nan
}
}
r[i] = f
}
return
}
rsBufLen := len(shard.rsBuf)
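// Recursively evaluate all the args; every call appends its result to shard.rs, so the operands for me.f end up at shard.rs[rIdx+1:].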
for _, arg := range me.args {
shard.executeExpr(arg, br)
}
result := shard.rs[rIdx]
args := shard.rs[rIdx+1:]
me.f(result, args)
shard.rs = shard.rs[:rIdx+1]
shard.rsBuf = shard.rsBuf[:rsBufLen]
}
func (pmp *pipeMathProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pmp.shards[workerID]
entries := pmp.pm.entries
shard.rcs = slicesutil.SetLength(shard.rcs, len(entries))
rcs := shard.rcs
for i, e := range entries {
rc := &rcs[i]
rc.name = e.resultField
shard.executeMathEntry(e, rc, br)
br.addResultColumn(rc)
}
pmp.ppNext.writeBlock(workerID, br)
for i := range rcs {
rcs[i].resetValues()
}
shard.a.reset()
}
func (pmp *pipeMathProcessor) flush() error {
return nil
}
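// parsePipeMath parses a comma-separated list of math entries, e.g. `math b+1 as a, abs(-min(a,b)) as min`.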
func parsePipeMath(lex *lexer) (*pipeMath, error) {
if !lex.isKeyword("math") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "math")
}
lex.nextToken()
var mes []*mathEntry
for {
me, err := parseMathEntry(lex)
if err != nil {
return nil, err
}
mes = append(mes, me)
switch {
case lex.isKeyword(","):
lex.nextToken()
case lex.isKeyword("|", ")", ""):
if len(mes) == 0 {
return nil, fmt.Errorf("missing 'math' expressions")
}
pm := &pipeMath{
entries: mes,
}
return pm, nil
default:
return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|' or ')'", mes[len(mes)-1], lex.token)
}
}
}
func parseMathEntry(lex *lexer) (*mathEntry, error) {
me, err := parseMathExpr(lex)
if err != nil {
return nil, err
}
// skip optional 'as'
if lex.isKeyword("as") {
lex.nextToken()
}
resultField, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for [%s]: %w", me, err)
}
e := &mathEntry{
resultField: resultField,
expr: me,
}
return e, nil
}
func parseMathExpr(lex *lexer) (*mathExpr, error) {
// parse left operand
left, err := parseMathExprOperand(lex)
if err != nil {
return nil, err
}
for {
if !isMathBinaryOp(lex.token) {
// There is no right operand
return left, nil
}
// parse operator
op := lex.token
lex.nextToken()
f, err := getMathFuncForBinaryOp(op)
if err != nil {
return nil, fmt.Errorf("cannot parse operator after [%s]: %w", left, err)
}
// parse right operand
right, err := parseMathExprOperand(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse operand after [%s %s]: %w", left, op, err)
}
me := &mathExpr{
args: []*mathExpr{left, right},
op: op,
f: f,
}
// balance operands according to their priority
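// For example, `a + b * c` is first assembled as `(a + b) * c` and then rebalanced into `a + (b * c)`, since '*' has a higher priority than '+'.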
if !left.wrappedInParens && isMathBinaryOp(left.op) && getMathBinaryOpPriority(left.op) > getMathBinaryOpPriority(op) {
me.args[0] = left.args[1]
left.args[1] = me
me = left
}
left = me
}
}
func parseMathExprInParens(lex *lexer) (*mathExpr, error) {
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing '('")
}
lex.nextToken()
me, err := parseMathExpr(lex)
if err != nil {
return nil, err
}
me.wrappedInParens = true
if !lex.isKeyword(")") {
return nil, fmt.Errorf("missing ')'; got %q instead", lex.token)
}
lex.nextToken()
return me, nil
}
func parseMathExprOperand(lex *lexer) (*mathExpr, error) {
if lex.isKeyword("(") {
return parseMathExprInParens(lex)
}
switch {
case lex.isKeyword("abs"):
return parseMathExprAbs(lex)
case lex.isKeyword("max"):
return parseMathExprMax(lex)
case lex.isKeyword("min"):
return parseMathExprMin(lex)
case lex.isKeyword("round"):
return parseMathExprRound(lex)
case lex.isKeyword("-"):
return parseMathExprUnaryMinus(lex)
case lex.isKeyword("+"):
// just skip unary plus
lex.nextToken()
return parseMathExprOperand(lex)
case lex.isNumber():
return parseMathExprConstNumber(lex)
default:
return parseMathExprFieldName(lex)
}
}
func parseMathExprAbs(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "abs", mathFuncAbs)
if err != nil {
return nil, err
}
if len(me.args) != 1 {
return nil, fmt.Errorf("'abs' function accepts only one arg; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprMax(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "max", mathFuncMax)
if err != nil {
return nil, err
}
if len(me.args) < 2 {
return nil, fmt.Errorf("'max' function needs at least 2 args; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprMin(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "min", mathFuncMin)
if err != nil {
return nil, err
}
if len(me.args) < 2 {
return nil, fmt.Errorf("'min' function needs at least 2 args; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprRound(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "round", mathFuncRound)
if err != nil {
return nil, err
}
if len(me.args) != 1 && len(me.args) != 2 {
return nil, fmt.Errorf("'round' function needs 1 or 2 args; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprGenericFunc(lex *lexer, funcName string, f mathFunc) (*mathExpr, error) {
if !lex.isKeyword(funcName) {
return nil, fmt.Errorf("missing %q keyword", funcName)
}
lex.nextToken()
args, err := parseMathFuncArgs(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse args for %q function: %w", funcName, err)
}
if len(args) == 0 {
return nil, fmt.Errorf("%q function needs at least one org", funcName)
}
me := &mathExpr{
args: args,
op: funcName,
f: f,
}
return me, nil
}
func parseMathFuncArgs(lex *lexer) ([]*mathExpr, error) {
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing '('")
}
lex.nextToken()
var args []*mathExpr
for {
if lex.isKeyword(")") {
lex.nextToken()
return args, nil
}
me, err := parseMathExpr(lex)
if err != nil {
return nil, err
}
args = append(args, me)
switch {
case lex.isKeyword(")"):
case lex.isKeyword(","):
lex.nextToken()
default:
return nil, fmt.Errorf("unexpected token after [%s]: %q; want ',' or ')'", me, lex.token)
}
}
}
func parseMathExprUnaryMinus(lex *lexer) (*mathExpr, error) {
if !lex.isKeyword("-") {
return nil, fmt.Errorf("missing '-'")
}
lex.nextToken()
expr, err := parseMathExprOperand(lex)
if err != nil {
return nil, err
}
me := &mathExpr{
args: []*mathExpr{expr},
op: "unary_minus",
f: mathFuncUnaryMinus,
}
return me, nil
}
func parseMathExprConstNumber(lex *lexer) (*mathExpr, error) {
if !lex.isNumber() {
return nil, fmt.Errorf("cannot parse number from %q", lex.token)
}
numStr, err := getCompoundMathToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse number: %w", err)
}
f, ok := tryParseNumber(numStr)
if !ok {
return nil, fmt.Errorf("cannot parse number from %q", numStr)
}
me := &mathExpr{
isConst: true,
constValue: f,
constValueStr: numStr,
}
return me, nil
}
func parseMathExprFieldName(lex *lexer) (*mathExpr, error) {
fieldName, err := getCompoundMathToken(lex)
if err != nil {
return nil, err
}
fieldName = getCanonicalColumnName(fieldName)
me := &mathExpr{
fieldName: fieldName,
}
return me, nil
}
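// getCompoundMathToken glues adjacent tokens without spaces between them into a single token,
// so values such as `12.345KB` or `45ms` can be consumed as a single number or field name.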
func getCompoundMathToken(lex *lexer) (string, error) {
stopTokens := []string{"=", "+", "-", "*", "/", "%", "^", ",", ")", "|", ""}
if lex.isKeyword(stopTokens...) {
return "", fmt.Errorf("compound token cannot start with '%s'", lex.token)
}
s := lex.token
lex.nextToken()
for !lex.isSkippedSpace && !lex.isKeyword(stopTokens...) {
s += lex.token
lex.nextToken()
}
return s, nil
}
func mathFuncPlus(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = a[i] + b[i]
}
}
func mathFuncMinus(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = a[i] - b[i]
}
}
func mathFuncMul(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = a[i] * b[i]
}
}
func mathFuncDiv(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = a[i] / b[i]
}
}
func mathFuncMod(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = math.Mod(a[i], b[i])
}
}
func mathFuncPow(result []float64, args [][]float64) {
a := args[0]
b := args[1]
for i := range result {
result[i] = math.Pow(a[i], b[i])
}
}
func mathFuncAbs(result []float64, args [][]float64) {
arg := args[0]
for i := range result {
result[i] = math.Abs(arg[i])
}
}
func mathFuncUnaryMinus(result []float64, args [][]float64) {
arg := args[0]
for i := range result {
result[i] = -arg[i]
}
}
func mathFuncMax(result []float64, args [][]float64) {
for i := range result {
f := nan
for _, arg := range args {
if math.IsNaN(f) || arg[i] > f {
f = arg[i]
}
}
result[i] = f
}
}
func mathFuncMin(result []float64, args [][]float64) {
for i := range result {
f := nan
for _, arg := range args {
if math.IsNaN(f) || arg[i] < f {
f = arg[i]
}
}
result[i] = f
}
}
func mathFuncRound(result []float64, args [][]float64) {
arg := args[0]
if len(args) == 1 {
// Round to integer
for i := range result {
result[i] = math.Round(arg[i])
}
return
}
// Round to the nearest multiple of args[1]
nearest := args[1]
var f float64
for i := range result {
if i == 0 || arg[i-1] != arg[i] || nearest[i-1] != nearest[i] {
f = round(arg[i], nearest[i])
}
result[i] = f
}
}
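// round rounds f to the closest multiple of nearest.
// For example, round(2.567, 0.01) returns 2.57, while round(1234, 100) returns 1200.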
func round(f, nearest float64) float64 {
_, e := decimal.FromFloat(nearest)
p10 := math.Pow10(int(-e))
f += 0.5 * math.Copysign(nearest, f)
f -= math.Mod(f, nearest)
f, _ = math.Modf(f * p10)
return f / p10
}

View file

@@ -0,0 +1,233 @@
package logstorage
import (
"testing"
)
func TestParsePipeMathSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`math b as a`)
f(`math -123 as a`)
f(`math 12.345KB as a`)
f(`math (-2 + 2) as a`)
f(`math x as a, z as y`)
f(`math (foo / bar + baz * abc % -45ms) as a`)
f(`math (foo / (bar + baz) * abc ^ 2) as a`)
f(`math (foo / ((bar + baz) * abc) ^ -2) as a`)
f(`math (foo + bar / baz - abc) as a`)
f(`math min(3, foo, (1 + bar) / baz) as a, max(a, b) as b, (abs(c) + 5) as d`)
f(`math round(foo) as x`)
f(`math round(foo, 0.1) as y`)
}
func TestParsePipeMathFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`math`)
f(`math x`)
f(`math x as`)
f(`math abs() as x`)
f(`math abs(a, b) as x`)
f(`math min() as x`)
f(`math min(a) as x`)
f(`math max() as x`)
f(`math max(a) as x`)
f(`math round() as x`)
f(`math round(a, b, c) as x`)
}
func TestPipeMath(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("math b+1 as a, a*2 as b, b-10.5+c as c", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
}, [][]Field{
{
{"a", "3"},
{"b", "6"},
{"c", "-1.5"},
},
})
f("math 1 as a", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
}, [][]Field{
{
{"a", "1"},
{"b", "2"},
{"c", "3"},
},
})
f("math 10 * 5 - 3 a", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
}, [][]Field{
{
{"a", "47"},
{"b", "2"},
{"c", "3"},
},
})
f("math -1.5K as a", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
}, [][]Field{
{
{"a", "-1500"},
{"b", "2"},
{"c", "3"},
},
})
f("math b as a", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "2"},
{"c", "3"},
},
})
f("math a as a", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
}, [][]Field{
{
{"a", "NaN"},
{"b", "2"},
{"c", "3"},
},
})
f("math 2*c + b as x", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
}, [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
{"x", "8"},
},
})
f("math abs(-min(a,b)) as min, round(max(40*b/30,c)) as max", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
}, [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
{"min", "2"},
{"max", "3"},
},
})
f("math round((2*c + (b%c))/(c-b)^(b-1), -0.001) as a", [][]Field{
{
{"a", "v"},
{"b", "2"},
{"c", "3"},
},
{
{"a", "x"},
{"b", "3"},
{"c", "5"},
},
{
{"b", "3"},
{"c", "6"},
},
}, [][]Field{
{
{"a", "8"},
{"b", "2"},
{"c", "3"},
},
{
{"a", "3.25"},
{"b", "3"},
{"c", "5"},
},
{
{"a", "1.667"},
{"b", "3"},
{"c", "6"},
},
})
}
func TestPipeMathUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("math (x + 1) as y", "*", "", "*", "y")
// all the needed fields, unneeded fields do not intersect with src and dst
f("math (x + 1) as y", "*", "f1,f2", "*", "f1,f2,y")
// all the needed fields, unneeded fields intersect with src
f("math (x + 1) as y", "*", "f1,x", "*", "f1,y")
// all the needed fields, unneeded fields intersect with dst
f("math (x + 1) as y", "*", "f1,y", "*", "f1,y")
// all the needed fields, unneeded fields intersect with src and dst
f("math (x + 1) as y", "*", "f1,x,y", "*", "f1,x,y")
// needed fields do not intersect with src and dst
f("math (x + 1) as y", "f1,f2", "", "f1,f2", "")
// needed fields intersect with src
f("math (x + 1) as y", "f1,x", "", "f1,x", "")
// needed fields intersect with dst
f("math (x + 1) as y", "f1,y", "", "f1,x", "")
// needed fields intersect with src and dst
f("math (x + 1) as y", "f1,x,y", "", "f1,x", "")
}

View file

@@ -3,6 +3,8 @@ package logstorage
import (
"fmt"
"strings"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// pipeReplace processes '| replace ...' pipe.
@@ -59,11 +61,9 @@ func (pr *pipeReplace) initFilterInValues(cache map[string][]string, getFieldVal
func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
updateFunc := func(a *arena, v string) string {
bb := bbPool.Get()
bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit)
result := a.copyBytesToString(bb.B)
bbPool.Put(bb)
return result
bLen := len(a.b)
a.b = appendReplace(a.b, v, pr.oldSubstr, pr.newSubstr, pr.limit)
return bytesutil.ToUnsafeString(a.b[bLen:])
}
return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)

View file

@@ -3,6 +3,8 @@ package logstorage
import (
"fmt"
"regexp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// pipeReplaceRegexp processes '| replace_regexp ...' pipe.
@@ -59,11 +61,9 @@ func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFi
func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
updateFunc := func(a *arena, v string) string {
bb := bbPool.Get()
bb.B = appendReplaceRegexp(bb.B[:0], v, pr.re, pr.replacement, pr.limit)
result := a.copyBytesToString(bb.B)
bbPool.Put(bb)
return result
bLen := len(a.b)
a.b = appendReplaceRegexp(a.b, v, pr.re, pr.replacement, pr.limit)
return bytesutil.ToUnsafeString(a.b[bLen:])
}
return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)

View file

@@ -537,13 +537,14 @@ func (psp *pipeStatsProcessor) flush() error {
return nil
}
func parsePipeStats(lex *lexer) (*pipeStats, error) {
if !lex.isKeyword("stats") {
return nil, fmt.Errorf("expecting 'stats'; got %q", lex.token)
func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
if needStatsKeyword {
if !lex.isKeyword("stats") {
return nil, fmt.Errorf("expecting 'stats'; got %q", lex.token)
}
lex.nextToken()
}
lex.nextToken()
var ps pipeStats
if lex.isKeyword("by", "(") {
if lex.isKeyword("by") {

View file

@@ -39,6 +39,70 @@ func TestPipeStats(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// missing 'stats' keyword
f("count(*) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "3"},
},
})
// missing 'stats' keyword
f("count() as rows, count() if (a:2) rows2", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "3"},
{"rows2", "2"},
},
})
f("stats count() as rows, count() if (a:2) rows2", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "3"},
{"rows2", "2"},
},
})
f("stats count(*) as rows", [][]Field{
{
{"_msg", `abc`},
@@ -141,6 +205,32 @@ func TestPipeStats(t *testing.T) {
},
})
// missing 'stats' keyword
f("by (a) count(*) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"a", "1"},
{"rows", "1"},
},
{
{"a", "2"},
{"rows", "2"},
},
})
f("stats by (a) count(*) as rows", [][]Field{
{
{"_msg", `abc`},

View file

@@ -80,10 +80,12 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) {
nameCopy := ""
fieldPrefix := uctx.fieldPrefix
if fieldPrefix != "" {
nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
copy(nameBuf, fieldPrefix)
copy(nameBuf[len(fieldPrefix):], name)
nameCopy = bytesutil.ToUnsafeString(nameBuf)
b := uctx.a.b
bLen := len(b)
b = append(b, fieldPrefix...)
b = append(b, name...)
uctx.a.b = b
nameCopy = bytesutil.ToUnsafeString(b[bLen:])
} else {
nameCopy = uctx.a.copyString(name)
}

View file

@@ -8,6 +8,34 @@ import (
"testing"
)
func expectParsePipeFailure(t *testing.T, pipeStr string) {
t.Helper()
lex := newLexer(pipeStr)
p, err := parsePipe(lex)
if err == nil && lex.isEnd() {
t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p)
}
}
func expectParsePipeSuccess(t *testing.T, pipeStr string) {
t.Helper()
lex := newLexer(pipeStr)
p, err := parsePipe(lex)
if err != nil {
t.Fatalf("cannot parse [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
t.Fatalf("unexpected tail after parsing [%s]: [%s]", pipeStr, lex.s)
}
pipeStrResult := p.String()
if pipeStrResult != pipeStr {
t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr)
}
}
func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()

View file

@@ -229,12 +229,12 @@ func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID
func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]ValueWithHits, error) {
pipes := append([]pipe{}, q.pipes...)
quotedFieldName := quoteTokenIfNeeded(fieldName)
pipeStr := fmt.Sprintf("uniq by (%s) with hits limit %d", quotedFieldName, limit)
pipeStr := fmt.Sprintf("field_values %s limit %d", quotedFieldName, limit)
lex := newLexer(pipeStr)
pu, err := parsePipeUniq(lex)
pu, err := parsePipeFieldValues(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err)
logger.Panicf("BUG: unexpected error when parsing 'field_values' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {

View file

@@ -3,6 +3,7 @@ package logstorage
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
@@ -311,6 +312,157 @@ func TestStorageRunQuery(t *testing.T) {
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
})
t.Run("field_names-all", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{"_msg", 1155},
{"_stream", 1155},
{"_time", 1155},
{"instance", 1155},
{"job", 1155},
{"source-file", 1155},
{"stream-id", 1155},
{"tenant.id", 1155},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
}
})
t.Run("field_names-some", func(t *testing.T) {
q := mustParseQuery(`_stream:{instance=~"host-1:.+"}`)
names, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{"_msg", 385},
{"_stream", 385},
{"_time", 385},
{"instance", 385},
{"job", 385},
{"source-file", 385},
{"stream-id", 385},
{"tenant.id", 385},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
}
})
t.Run("field_values-nolimit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("field_values-limit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
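// Note: hits are expected to be 0 here, since hits tracking stops once the number of unique values reaches the given limit.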
resultExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 0},
{`{instance="host-1:234",job="foobar"}`, 0},
{`{instance="host-2:234",job="foobar"}`, 0},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("field_values-limit", func(t *testing.T) {
q := mustParseQuery("instance:='host-1:234'")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`{instance="host-1:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("stream_field_names", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{"instance", 1155},
{"job", 1155},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
}
})
t.Run("stream_field_values-nolimit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`host-0:234`, 385},
{`host-1:234`, 385},
{`host-2:234`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("stream_field_values-limit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`host-0:234`, 385},
{`host-1:234`, 385},
{`host-2:234`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
}
})
t.Run("streams", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetStreams(context.Background(), allTenantIDs, q, 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
}
})
// Run more complex tests
f := func(t *testing.T, query string, rowsExpected [][]Field) {