lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-05-30 16:19:23 +02:00
parent 8b6a31d8c6
commit b30e80b071
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
30 changed files with 988 additions and 319 deletions

View file

@ -19,6 +19,18 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
## [v0.15.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.15.0-victorialogs)
Released at 2024-05-30
* FEATURE: add [`row_any`](https://docs.victoriametrics.com/victorialogs/logsql/#row_any-stats) function for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). This function returns a sample log entry per every calculated [group of results](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields).
* FEATURE: add `default` operator to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). It allows overriding `NaN` results with the given default value.
* FEATURE: add `exp()` and `ln()` functions to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
* FEATURE: allow omitting result name in [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) expresions. In this case the result name is automatically set to string representation of the corresponding math expression. For example, `_time:5m | math duration / 1000` is equivalent to `_time:5m | math (duration / 1000) as "duration / 1000"`.
* FEATURE: allow omitting result name in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). In this case the result name is automatically set to string representation of the corresponding [stats function expression](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). For example, `_time:5m | count(*)` is valid [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) now. It is equivalent to `_time:5m | stats count(*) as "count(*)"`.
* BUGFIX: properly calculate the number of matching rows in `* | field_values x | stats count() rows` and in `* | unroll (x) | stats count() rows` queries.
## [v0.14.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.14.0-victorialogs)
Released at 2024-05-29
@ -103,7 +115,7 @@ Released at 2024-05-22
* FEATURE: add ability to extract fields with [`extract` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-extract).
* FEATURE: add ability to unpack JSON fields with [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_json).
* FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt).
* FEATURE: add [`fields_min`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_min-stats) and [`fields_max`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field.
* FEATURE: add [`row_min`](https://docs.victoriametrics.com/victorialogs/logsql/#row_min-stats) and [`row_max`](https://docs.victoriametrics.com/victorialogs/logsql/#row_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field.
* FEATURE: add `/select/logsql/streams` HTTP endpoint for returning [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) for details.
* FEATURE: add `/select/logsql/stream_field_names` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-names) for details.
* FEATURE: add `/select/logsql/stream_field_values` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field values for the given label from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-field-values) for details.

View file

@ -1594,6 +1594,19 @@ See also:
### math pipe
`| math ...` [pipe](#pipes) performs mathematical calculations over numeric values stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
It has the following format:
```
| math
expr1 as resultName1,
...
exprN as resultNameN
```
Where `exprX` is one of the supported math expressions mentioned below, while `resultNameX` is the name of the field to store the calculated result to.
The `as` keyword is optional. The result name can be omitted. In this case the result is stored to a field with the name equal to string represenation
of the corresponding math expression.
For example, the following query divides `duration_msecs` field value by 1000, then rounds it to integer and stores the result in the `duration_secs` field:
```logsql
@ -1608,7 +1621,10 @@ The following mathematical operations are supported by `math` pipe:
- `arg1 / arg2` - divides `arg1` by `arg2`
- `arg1 % arg2` - returns the remainder of the division of `arg1` by `arg2`
- `arg1 ^ arg2` - returns the power of `arg1` by `arg2`
- `arg1 default arg2` - returns `arg2` if `arg1` equals to `NaN`.
- `abs(arg)` - returns an absolute value for the given `arg`
- `exp(arg)` - powers [`e`](https://en.wikipedia.org/wiki/E_(mathematical_constant)) by `arg`.
- `ln(arg)` - returns [natural logarightm](https://en.wikipedia.org/wiki/Natural_logarithm) for the given `arg`.
- `max(arg1, ..., argN)` - returns the maximum value among the given `arg1`, ..., `argN`
- `min(arg1, ..., argN)` - returns the minimum value among the given `arg1`, ..., `argN`
- `round(arg)` - returns rounded to integer value for the given `arg`. The `round()` accepts optional `nearest` arg, which allows rounding the number to the given `nearest` multiple.
@ -1617,18 +1633,10 @@ The following mathematical operations are supported by `math` pipe:
Every `argX` argument in every mathematical operation can contain one of the following values:
- The name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, `errors_total / requests_total`.
If the log field contains value, which cannot be parsed into [supported numeric value](#numeric-values), then it is replaced with `NaN`.
- Any [supported numeric value](#numeric-values). For example, `response_size_bytes / 1MiB`.
- Another mathematical expression. Optionally, it may be put inside `(...)`. For example, `(a + b) * c`.
Multiple distinct results can be calculated in a single `math ...` pipe - just separate them with `,`. For example, the following query calculates the error rate
and the number of successful requests from `errors`, `warnings` and `requests` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
```logsql
_time:5m | math
(errors / requests) as error_rate,
(requests - (errors + warnings)) as success_requests
```
See also:
- [`stats` pipe](#stats-pipe)
@ -1885,7 +1893,7 @@ See also:
uses [`count` stats function](#count-stats) for calculating the number of logs for the last 5 minutes:
```logsql
_time:5m | stats count() logs_total
_time:5m | stats count() as logs_total
```
`| stats ...` pipe has the following basic format:
@ -1909,12 +1917,19 @@ For example, the following query calculates the following stats for logs over th
_time:5m | stats count() logs_total, count_uniq(_stream) streams_total
```
It is allowed to omit `stats` prefix for convenience. So the following query is equivalent to the previous one:
It is allowed omitting `stats` prefix for convenience. So the following query is equivalent to the previous one:
```logsql
_time:5m | count() logs_total, count_uniq(_stream) streams_total
```
It is allowed omitting the result name. In this case the result name equals to the string representation of the used [stats function](#stats-pipe-functions).
For example, the following query returns the same stats as the previous one, but gives uses `count()` and `count_uniq(_stream)` names for the returned fields:
```logsql
_time:5m | count(), count_uniq(_stream)
```
See also:
- [stats by fields](#stats-by-fields)
@ -1954,6 +1969,12 @@ The `by` keyword can be skipped in `stats ...` pipe. For example, the following
_time:5m | stats (host, path) count() logs_total, count_uniq(ip) ips_total
```
See also:
- [`row_min`](#row_min-stats)
- [`row_max`](#row_max-stats)
- [`row_any`](#row_any-stats)
#### Stats by time buckets
The following syntax can be used for calculating stats grouped by time buckets:
@ -2284,12 +2305,13 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
- [`count`](#count-stats) returns the number of log entries.
- [`count_empty`](#count_empty-stats) returns the number logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`count_uniq`](#count_uniq-stats) returns the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`fields_max`](#fields_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field.
- [`fields_min`](#fields_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field.
- [`max`](#max-stats) returns the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`median`](#median-stats) returns the [median](https://en.wikipedia.org/wiki/Median) value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`min`](#min-stats) returns the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`quantile`](#quantile-stats) returns the given quantile for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`row_any`](#row_any-stats) returns a sample [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) per each selected [stats group](#stats-by-fields).
- [`row_max`](#row_max-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the minimum value at the given field.
- [`row_min`](#row_min-stats) returns the [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum value at the given field.
- [`sum`](#sum-stats) returns the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`sum_len`](#sum_len-stats) returns the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -2397,59 +2419,6 @@ See also:
- [`uniq_values`](#uniq_values-stats)
- [`count`](#count-stats)
### fields_max stats
`fields_max(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the maximum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes:
```logsql
_time:5m | stats fields_max(duration) as log_with_max_duration
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only the specific fields are needed from the returned log entry, then they can be enumerated inside `fields_max(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes:
```logsql
_time:5m | stats fields_max(duration, _time, path, duration) as time_and_ip_with_max_duration
```
See also:
- [`max`](#max-stats)
- [`fields_min`](#fields_min-stats)
### fields_min stats
`fields_min(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the minimum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes:
```logsql
_time:5m | stats fields_min(duration) as log_with_min_duration
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only the specific fields are needed from the returned log entry, then they can be enumerated inside `fields_max(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes:
```logsql
_time:5m | stats fields_min(duration, _time, path, duration) as time_and_ip_with_min_duration
```
See also:
- [`min`](#min-stats)
- [`fields_max`](#fields_max-stats)
### max stats
`max(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) returns the maximum value across
@ -2462,11 +2431,11 @@ over logs for the last 5 minutes:
_time:5m | stats max(duration) max_duration
```
[`fields_max`](#fields_max-stats) function can be used for obtaining other fields with the maximum duration.
[`row_max`](#row_max-stats) function can be used for obtaining other fields with the maximum duration.
See also:
- [`fields_max`](#fields_max-stats)
- [`row_max`](#row_max-stats)
- [`min`](#min-stats)
- [`quantile`](#quantile-stats)
- [`avg`](#avg-stats)
@ -2500,11 +2469,11 @@ over logs for the last 5 minutes:
_time:5m | stats min(duration) min_duration
```
[`fields_min`](#fields_min-stats) function can be used for obtaining other fields with the minimum duration.
[`row_min`](#row_min-stats) function can be used for obtaining other fields with the minimum duration.
See also:
- [`fields_min`](#fields_min-stats)
- [`row_min`](#row_min-stats)
- [`max`](#max-stats)
- [`quantile`](#quantile-stats)
- [`avg`](#avg-stats)
@ -2532,6 +2501,86 @@ See also:
- [`median`](#median-stats)
- [`avg`](#avg-stats)
### row_any stats
`row_any()` [stats pipe function](#stats-pipe-functions) returns arbitrary [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
(aka sample) per each selected [stats group](#stats-by-fields). Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns a sample log entry per each [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
across logs for the last 5 minutes:
```logsql
_time:5m | stats by (_stream) row_any() as sample_row
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only the specific fields are needed, then they can be enumerated inside `row_any(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from a sample log entry for logs over the last 5 minutes:
```logsql
_time:5m | stats row_any(_time, path) as time_and_path_sample
```
See also:
- [`row_max`](#row_max-stats)
- [`row_min`](#row_min-stats)
### row_max stats
`row_max(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the maximum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns log entry with the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes:
```logsql
_time:5m | stats row_max(duration) as log_with_max_duration
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the maximum `duration` over the last 5 minutes:
```logsql
_time:5m | stats row_max(duration, _time, path, duration) as time_and_path_with_max_duration
```
See also:
- [`max`](#max-stats)
- [`row_min`](#row_min-stats)
- [`row_any`](#row_any-stats)
### row_min stats
`row_min(field)` [stats pipe function](#stats-pipe-functions) returns [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
with the minimum value for the given `field`. Log entry is returned as JSON-encoded dictionary with all the fields from the original log.
For example, the following query returns log entry with the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs for the last 5 minutes:
```logsql
_time:5m | stats row_min(duration) as log_with_min_duration
```
Fields from the returned values can be decoded with [`unpack_json`](#unpack_json-pipe) or [`extract`](#extract-pipe) pipes.
If only the specific fields are needed from the returned log entry, then they can be enumerated inside `row_max(...)`.
For example, the following query returns only `_time`, `path` and `duration` fields from the log entry with the minimum `duration` over the last 5 minutes:
```logsql
_time:5m | stats row_min(duration, _time, path, duration) as time_and_path_with_min_duration
```
See also:
- [`min`](#min-stats)
- [`row_max`](#row_max-stats)
- [`row_any`](#row_any-stats)
### sum stats
`sum(field1, ..., fieldN)` [stats pipe function](#stats-pipe-functions) calculates the sum of numeric values across

View file

@ -17,7 +17,7 @@ aliases:
[VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) works with both structured and unstructured logs.
Every log entry must contain at least [log message field](#message-field) plus arbitrary number of additional `key=value` fields.
A single log entry can be expressed as a single-level [JSON](https://www.json.org/json-en.html) object with string keys and values.
A single log entry can be expressed as a single-level [JSON](https://www.json.org/json-en.html) object with string keys and string values.
For example:
```json
@ -31,6 +31,30 @@ For example:
}
```
Empty values are treated the same as non-existing values. For example, the following log entries are equivalent,
since they have only one identical non-empty field - [`_msg`](#message-field):
```json
{
"_msg": "foo bar",
"some_field": "",
"another_field": ""
}
```
```json
{
"_msg": "foo bar",
"third_field": "",
}
```
```json
{
"_msg": "foo bar",
}
```
VictoriaLogs automatically transforms multi-level JSON (aka nested JSON) into single-level JSON
during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/) according to the following rules:

View file

@ -28,6 +28,10 @@ func (fs fieldsSet) clone() fieldsSet {
return fsNew
}
func (fs fieldsSet) isEmpty() bool {
return len(fs) == 0
}
func (fs fieldsSet) getAll() []string {
a := make([]string, 0, len(fs))
for f := range fs {

View file

@ -117,35 +117,13 @@ func (fa *filterAnd) initByFieldTokens() {
byFieldFilters := make(map[string]int)
var fieldNames []string
for _, f := range fa.filters {
fieldName := ""
var tokens []string
switch t := f.(type) {
case *filterExact:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterExactPrefix:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterPhrase:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterPrefix:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterRegexp:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterSequence:
fieldName = t.fieldName
tokens = t.getTokens()
}
mergeFieldTokens := func(fieldName string, tokens []string) {
fieldName = getCanonicalColumnName(fieldName)
byFieldFilters[fieldName]++
if len(tokens) == 0 {
return
}
if len(tokens) > 0 {
mTokens, ok := m[fieldName]
if !ok {
fieldNames = append(fieldNames, fieldName)
@ -156,6 +134,33 @@ func (fa *filterAnd) initByFieldTokens() {
mTokens[token] = struct{}{}
}
}
for _, f := range fa.filters {
switch t := f.(type) {
case *filterExact:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterExactPrefix:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterPhrase:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterPrefix:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterRegexp:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterSequence:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterOr:
bfts := t.getByFieldTokens()
for _, bft := range bfts {
mergeFieldTokens(bft.field, bft.tokens)
}
}
}
var byFieldTokens []fieldTokens

View file

@ -426,10 +426,12 @@ func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[str
func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, commonTokens []string, tokenSets [][]string) bool {
if len(commonTokens) > 0 {
return matchBloomFilterAllTokens(bs, ch, commonTokens)
if !matchBloomFilterAllTokens(bs, ch, commonTokens) {
return false
}
}
if len(tokenSets) == 0 {
return false
return len(commonTokens) > 0
}
if len(tokenSets) > maxTokenSetsToInit || uint64(len(tokenSets)) > 10*bs.bsw.bh.rowsCount {
// It is faster to match every row in the block against all the values
@ -471,7 +473,22 @@ func getCommonTokensAndTokenSets(values []string) ([]string, [][]string) {
if len(commonTokens) == 0 {
return nil, tokenSets
}
// remove commonTokens from tokenSets
for i, tokens := range tokenSets {
dstTokens := tokens[:0]
for _, token := range tokens {
if !slices.Contains(commonTokens, token) {
dstTokens = append(dstTokens, token)
}
}
if len(dstTokens) == 0 {
return commonTokens, nil
}
tokenSets[i] = dstTokens
}
return commonTokens, tokenSets
}
func getCommonTokens(tokenSets [][]string) []string {

View file

@ -716,6 +716,6 @@ func TestGetCommonTokensAndTokenSets(t *testing.T) {
f([]string{"foo", "foo"}, []string{"foo"}, nil)
f([]string{"foo", "bar", "bar", "foo"}, nil, [][]string{{"foo"}, {"bar"}, {"bar"}, {"foo"}})
f([]string{"foo", "foo bar", "bar foo"}, []string{"foo"}, nil)
f([]string{"a foo bar", "bar abc foo", "foo abc a bar"}, []string{"bar", "foo"}, nil)
f([]string{"a foo bar", "bar abc foo", "foo abc a bar"}, []string{"bar", "foo"}, [][]string{{"a"}, {"abc"}, {"a", "abc"}})
f([]string{"a xfoo bar", "xbar abc foo", "foo abc a bar"}, nil, [][]string{{"a", "bar", "xfoo"}, {"abc", "foo", "xbar"}, {"a", "abc", "bar", "foo"}})
}

View file

@ -130,40 +130,45 @@ func (fo *filterOr) initByFieldTokens() {
byFieldFilters := make(map[string]int)
var fieldNames []string
for _, f := range fo.filters {
fieldName := ""
var tokens []string
switch t := f.(type) {
case *filterExact:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterExactPrefix:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterPhrase:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterPrefix:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterRegexp:
fieldName = t.fieldName
tokens = t.getTokens()
case *filterSequence:
fieldName = t.fieldName
tokens = t.getTokens()
}
mergeFieldTokens := func(fieldName string, tokens []string) {
fieldName = getCanonicalColumnName(fieldName)
byFieldFilters[fieldName]++
if len(tokens) == 0 {
return
}
if len(tokens) > 0 {
if _, ok := m[fieldName]; !ok {
fieldNames = append(fieldNames, fieldName)
}
m[fieldName] = append(m[fieldName], tokens)
}
for _, f := range fo.filters {
switch t := f.(type) {
case *filterExact:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterExactPrefix:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterPhrase:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterPrefix:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterRegexp:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterSequence:
tokens := t.getTokens()
mergeFieldTokens(t.fieldName, tokens)
case *filterAnd:
bfts := t.getByFieldTokens()
for _, bft := range bfts {
mergeFieldTokens(bft.field, bft.tokens)
}
}
}
var byFieldTokens []fieldTokens

View file

@ -909,15 +909,18 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats count(foo,*,bar) x`, `* | stats count(*) as x`)
f(`* | stats count('') foo`, `* | stats count(_msg) as foo`)
f(`* | stats count(foo) ''`, `* | stats count(foo) as _msg`)
f(`* | count()`, `* | stats count(*) as "count(*)"`)
// stats pipe count_empty
f(`* | stats count_empty() x`, `* | stats count_empty(*) as x`)
f(`* | stats by (x, y) count_empty(a,b,c) x`, `* | stats by (x, y) count_empty(a, b, c) as x`)
f(`* | stats by (x, y) count_empty(a,b,c) z`, `* | stats by (x, y) count_empty(a, b, c) as z`)
f(`* | count_empty()`, `* | stats count_empty(*) as "count_empty(*)"`)
// stats pipe sum
f(`* | stats Sum(foo) bar`, `* | stats sum(foo) as bar`)
f(`* | stats BY(x, y, ) SUM(foo,bar,) bar`, `* | stats by (x, y) sum(foo, bar) as bar`)
f(`* | stats sum() x`, `* | stats sum(*) as x`)
f(`* | sum()`, `* | stats sum(*) as "sum(*)"`)
f(`* | stats sum(*) x`, `* | stats sum(*) as x`)
f(`* | stats sum(foo,*,bar) x`, `* | stats sum(*) as x`)
@ -925,6 +928,7 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats Max(foo) bar`, `* | stats max(foo) as bar`)
f(`* | stats BY(x, y, ) MAX(foo,bar,) bar`, `* | stats by (x, y) max(foo, bar) as bar`)
f(`* | stats max() x`, `* | stats max(*) as x`)
f(`* | max()`, `* | stats max(*) as "max(*)"`)
f(`* | stats max(*) x`, `* | stats max(*) as x`)
f(`* | stats max(foo,*,bar) x`, `* | stats max(*) as x`)
@ -932,22 +936,26 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats Min(foo) bar`, `* | stats min(foo) as bar`)
f(`* | stats BY(x, y, ) MIN(foo,bar,) bar`, `* | stats by (x, y) min(foo, bar) as bar`)
f(`* | stats min() x`, `* | stats min(*) as x`)
f(`* | min()`, `* | stats min(*) as "min(*)"`)
f(`* | stats min(*) x`, `* | stats min(*) as x`)
f(`* | stats min(foo,*,bar) x`, `* | stats min(*) as x`)
// stats pipe fields_min
f(`* | stats fields_Min(foo) bar`, `* | stats fields_min(foo) as bar`)
f(`* | stats BY(x, y, ) fields_MIN(foo,bar,) bar`, `* | stats by (x, y) fields_min(foo, bar) as bar`)
// stats pipe row_min
f(`* | stats row_Min(foo) bar`, `* | stats row_min(foo) as bar`)
f(`* | row_Min(foo)`, `* | stats row_min(foo) as "row_min(foo)"`)
f(`* | stats BY(x, y, ) row_MIN(foo,bar,) bar`, `* | stats by (x, y) row_min(foo, bar) as bar`)
// stats pipe avg
f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`)
f(`* | stats BY(x, y, ) AVG(foo,bar,) bar`, `* | stats by (x, y) avg(foo, bar) as bar`)
f(`* | stats avg() x`, `* | stats avg(*) as x`)
f(`* | avg()`, `* | stats avg(*) as "avg(*)"`)
f(`* | stats avg(*) x`, `* | stats avg(*) as x`)
f(`* | stats avg(foo,*,bar) x`, `* | stats avg(*) as x`)
// stats pipe count_uniq
f(`* | stats count_uniq(foo) bar`, `* | stats count_uniq(foo) as bar`)
f(`* | count_uniq(foo)`, `* | stats count_uniq(foo) as "count_uniq(foo)"`)
f(`* | stats by(x, y) count_uniq(foo,bar) LiMit 10 As baz`, `* | stats by (x, y) count_uniq(foo, bar) limit 10 as baz`)
f(`* | stats by(x) count_uniq(*) z`, `* | stats by (x) count_uniq(*) as z`)
f(`* | stats by(x) count_uniq() z`, `* | stats by (x) count_uniq(*) as z`)
@ -955,6 +963,7 @@ func TestParseQuerySuccess(t *testing.T) {
// stats pipe uniq_values
f(`* | stats uniq_values(foo) bar`, `* | stats uniq_values(foo) as bar`)
f(`* | uniq_values(foo)`, `* | stats uniq_values(foo) as "uniq_values(foo)"`)
f(`* | stats uniq_values(foo) limit 10 bar`, `* | stats uniq_values(foo) limit 10 as bar`)
f(`* | stats by(x, y) uniq_values(foo, bar) as baz`, `* | stats by (x, y) uniq_values(foo, bar) as baz`)
f(`* | stats by(x) uniq_values(*) y`, `* | stats by (x) uniq_values(*) as y`)
@ -963,6 +972,7 @@ func TestParseQuerySuccess(t *testing.T) {
// stats pipe values
f(`* | stats values(foo) bar`, `* | stats values(foo) as bar`)
f(`* | values(foo)`, `* | stats values(foo) as "values(foo)"`)
f(`* | stats values(foo) limit 10 bar`, `* | stats values(foo) limit 10 as bar`)
f(`* | stats by(x, y) values(foo, bar) as baz`, `* | stats by (x, y) values(foo, bar) as baz`)
f(`* | stats by(x) values(*) y`, `* | stats by (x) values(*) as y`)
@ -973,6 +983,7 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats Sum_len(foo) bar`, `* | stats sum_len(foo) as bar`)
f(`* | stats BY(x, y, ) SUM_Len(foo,bar,) bar`, `* | stats by (x, y) sum_len(foo, bar) as bar`)
f(`* | stats sum_len() x`, `* | stats sum_len(*) as x`)
f(`* | sum_len()`, `* | stats sum_len(*) as "sum_len(*)"`)
f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`)
f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`)
@ -981,12 +992,14 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`)
f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`)
f(`* | stats quantile(0.99) bar`, `* | stats quantile(0.99) as bar`)
f(`* | quantile(0.99)`, `* | stats quantile(0.99) as "quantile(0.99)"`)
f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99) as bar`)
// stats pipe median
f(`* | stats Median(foo) bar`, `* | stats median(foo) as bar`)
f(`* | stats BY(x, y, ) MEDIAN(foo,bar,) bar`, `* | stats by (x, y) median(foo, bar) as bar`)
f(`* | stats median() x`, `* | stats median(*) as x`)
f(`* | median()`, `* | stats median(*) as "median(*)"`)
f(`* | stats median(*) x`, `* | stats median(*) as x`)
f(`* | stats median(foo,*,bar) x`, `* | stats median(*) as x`)
@ -995,7 +1008,7 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`)
// stats pipe with grouping buckets
f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as foo`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count(*) as foo`)
f(`* | stats by(_time:1d, response_size:1_000KiB, request_duration:5s, foo) count() as bar`, `* | stats by (_time:1d, response_size:1_000KiB, request_duration:5s, foo) count(*) as bar`)
f(`*|stats by(client_ip:/24, server_ip:/16) count() foo`, `* | stats by (client_ip:/24, server_ip:/16) count(*) as foo`)
f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count(*) as foo`)
f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`)
@ -1357,7 +1370,6 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats count(`)
f(`foo | stats count bar`)
f(`foo | stats count(bar`)
f(`foo | stats count(bar)`)
f(`foo | stats count() as`)
f(`foo | stats count() as |`)
@ -1368,27 +1380,21 @@ func TestParseQueryFailure(t *testing.T) {
// invalid stats sum
f(`foo | stats sum`)
f(`foo | stats sum()`)
// invalid stats max
f(`foo | stats max`)
f(`foo | stats max()`)
// invalid stats min
f(`foo | stats min`)
f(`foo | stats min()`)
// invalid stats min
f(`foo | stats fields_min`)
f(`foo | stats fields_min()`)
f(`foo | stats row_min`)
// invalid stats avg
f(`foo | stats avg`)
f(`foo | stats avg()`)
// invalid stats count_uniq
f(`foo | stats count_uniq`)
f(`foo | stats count_uniq()`)
f(`foo | stats count_uniq() limit`)
f(`foo | stats count_uniq() limit foo`)
f(`foo | stats count_uniq() limit 0.5`)
@ -1396,7 +1402,6 @@ func TestParseQueryFailure(t *testing.T) {
// invalid stats uniq_values
f(`foo | stats uniq_values`)
f(`foo | stats uniq_values()`)
f(`foo | stats uniq_values() limit`)
f(`foo | stats uniq_values(a) limit foo`)
f(`foo | stats uniq_values(a) limit 0.5`)
@ -1404,7 +1409,6 @@ func TestParseQueryFailure(t *testing.T) {
// invalid stats values
f(`foo | stats values`)
f(`foo | stats values()`)
f(`foo | stats values() limit`)
f(`foo | stats values(a) limit foo`)
f(`foo | stats values(a) limit 0.5`)
@ -1412,7 +1416,6 @@ func TestParseQueryFailure(t *testing.T) {
// invalid stats sum_len
f(`foo | stats sum_len`)
f(`foo | stats sum_len()`)
// invalid stats quantile
f(`foo | stats quantile`)
@ -1436,6 +1439,12 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats by(bar,`)
f(`foo | stats by(bar)`)
// duplicate stats result names
f(`foo | stats min() x, max() x`)
// stats result names identical to by fields
f(`foo | stats by (x) count() x`)
// invalid sort pipe
f(`foo | sort bar`)
f(`foo | sort by`)
@ -1513,10 +1522,10 @@ func TestQueryGetNeededColumns(t *testing.T) {
unneededColumns := strings.Join(unneeded, ",")
if neededColumns != neededColumnsExpected {
t.Fatalf("unexpected neededColumns; got %q; want %q", neededColumns, neededColumnsExpected)
t.Fatalf("unexpected neededColumns for [%s]; got %q; want %q", s, neededColumns, neededColumnsExpected)
}
if unneededColumns != unneededColumnsExpected {
t.Fatalf("unexpected unneededColumns; got %q; want %q", unneededColumns, unneededColumnsExpected)
t.Fatalf("unexpected unneededColumns for [%s]; got %q; want %q", s, unneededColumns, unneededColumnsExpected)
}
}
@ -1542,7 +1551,6 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | fields f1, f2 | mv f1 f3, f4 f5`, `f1,f2`, ``)
f(`* | fields f1, f2 | mv f2 f3, f4 f5`, `f1,f2`, ``)
f(`* | fields f1, f2 | mv f2 f3, f4 f1`, `f2`, ``)
f(`* | fields f1, f2 | stats count() r1`, ``, ``)
f(`* | fields f1, f2 | stats count_uniq() r1`, `f1,f2`, ``)
f(`* | fields f1, f2 | stats count(f1) r1`, `f1`, ``)
f(`* | fields f1, f2 | stats count(f1,f2,f3) r1`, `f1,f2`, ``)
@ -1553,7 +1561,7 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | fields f1, f2 | sort by(f3)`, `f1,f2`, ``)
f(`* | fields f1, f2 | sort by(f1,f3)`, `f1,f2`, ``)
f(`* | fields f1, f2 | sort by(f3) | stats count() r1`, ``, ``)
f(`* | fields f1, f2 | sort by(f1) | stats count() r1`, `f1`, ``)
f(`* | fields f1, f2 | sort by(f1) | stats count() r1`, ``, ``)
f(`* | fields f1, f2 | sort by(f1) | stats count(f2,f3) r1`, `f1,f2`, ``)
f(`* | fields f1, f2 | sort by(f3) | fields f2`, `f2`, ``)
f(`* | fields f1, f2 | sort by(f1,f3) | fields f2`, `f1,f2`, ``)
@ -1619,12 +1627,12 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | stats count_uniq() q`, `*`, ``)
f(`* | stats count_uniq(*) q`, `*`, ``)
f(`* | stats count_uniq(x) q`, `x`, ``)
f(`* | stats fields_max(a) q`, `*`, ``)
f(`* | stats fields_max(a, *) q`, `*`, ``)
f(`* | stats fields_max(a, x) q`, `a,x`, ``)
f(`* | stats fields_min(a) q`, `*`, ``)
f(`* | stats fields_min(a, *) q`, `*`, ``)
f(`* | stats fields_min(a, x) q`, `a,x`, ``)
f(`* | stats row_max(a) q`, `*`, ``)
f(`* | stats row_max(a, *) q`, `*`, ``)
f(`* | stats row_max(a, x) q`, `a,x`, ``)
f(`* | stats row_min(a) q`, `*`, ``)
f(`* | stats row_min(a, *) q`, `*`, ``)
f(`* | stats row_min(a, x) q`, `a,x`, ``)
f(`* | stats min() q`, `*`, ``)
f(`* | stats min(*) q`, `*`, ``)
f(`* | stats min(x) q`, `x`, ``)
@ -1776,11 +1784,51 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | rm f1, f2 | mv f2 f3 | sort by(f1)`, `*`, `f1,f2,f3`)
f(`* | rm f1, f2 | fields f3`, `f3`, ``)
f(`* | rm f1, f2 | fields f1,f3`, `f3`, ``)
f(`* | rm f1, f2 | stats count() f1`, ``, ``)
f(`* | rm f1, f2 | stats count(f3) r1`, `f3`, ``)
f(`* | rm f1, f2 | stats count(f1) r1`, ``, ``)
f(`* | rm f1, f2 | stats count(f1,f3) r1`, `f3`, ``)
f(`* | rm f1, f2 | stats by(f1) count(f2) r1`, ``, ``)
f(`* | rm f1, f2 | stats by(f3) count(f2) r1`, `f3`, ``)
f(`* | rm f1, f2 | stats by(f3) count(f4) r1`, `f3,f4`, ``)
// Verify that fields are correctly tracked before count(*)
f(`* | copy a b, c d | count() r1`, ``, ``)
f(`* | delete a, b | count() r1`, ``, ``)
f(`* | extract "<f1>bar" from x | count() r1`, ``, ``)
f(`* | extract if (q:w p:a) "<f1>bar" from x | count() r1`, `p,q`, ``)
f(`* | extract_regexp "(?P<f1>.*)bar" from x | count() r1`, ``, ``)
f(`* | extract_regexp if (q:w p:a) "(?P<f1>.*)bar" from x | count() r1`, `p,q`, ``)
f(`* | field_names | count() r1`, `*`, `_time`)
f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``)
f(`* | fields a, b | count() r1`, ``, ``)
f(`* | field_values a | count() r1`, `a`, ``)
f(`* | limit 10 | filter a:b c:d | count() r1`, `a,c`, ``)
f(`* | limit 10 | count() r1`, ``, ``)
f(`* | format "<a><b>" as c | count() r1`, ``, ``)
f(`* | format if (q:w p:a) "<a><b>" as c | count() r1`, `p,q`, ``)
f(`* | math (a + b) as c, d * 2 as x | count() r1`, ``, ``)
f(`* | offset 10 | count() r1`, ``, ``)
f(`* | pack_json | count() r1`, ``, ``)
f(`* | pack_json fields(a,b) | count() r1`, ``, ``)
f(`* | rename a b, c d | count() r1`, ``, ``)
f(`* | replace ("a", "b") at x | count() r1`, ``, ``)
f(`* | replace if (q:w p:a) ("a", "b") at x | count() r1`, `p,q`, ``)
f(`* | replace_regexp ("a", "b") at x | count() r1`, ``, ``)
f(`* | replace_regexp if (q:w p:a) ("a", "b") at x | count() r1`, `p,q`, ``)
f(`* | sort by (a,b) | count() r1`, ``, ``)
f(`* | stats count_uniq(a, b) as c | count() r1`, ``, ``)
f(`* | stats count_uniq(a, b) if (q:w p:a) as c | count() r1`, ``, ``)
f(`* | stats by (a1,a2) count_uniq(a, b) as c | count() r1`, `a1,a2`, ``)
f(`* | stats by (a1,a2) count_uniq(a, b) if (q:w p:a) as c | count() r1`, `a1,a2`, ``)
f(`* | uniq by (a, b) | count() r1`, `a,b`, ``)
f(`* | unpack_json from x | count() r1`, ``, ``)
f(`* | unpack_json from x fields (a,b) | count() r1`, ``, ``)
f(`* | unpack_json if (q:w p:a) from x | count() r1`, `p,q`, ``)
f(`* | unpack_json if (q:w p:a) from x fields(a,b) | count() r1`, `p,q`, ``)
f(`* | unpack_logfmt from x | count() r1`, ``, ``)
f(`* | unpack_logfmt from x fields (a,b) | count() r1`, ``, ``)
f(`* | unpack_logfmt if (q:w p:a) from x | count() r1`, `p,q`, ``)
f(`* | unpack_logfmt if (q:w p:a) from x fields(a,b) | count() r1`, `p,q`, ``)
f(`* | unroll (a, b) | count() r1`, `a,b`, ``)
f(`* | unroll if (q:w p:a) (a, b) | count() r1`, `a,b,p,q`, ``)
}

View file

@ -24,10 +24,8 @@ func (pd *pipeDelete) String() string {
func (pd *pipeDelete) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
// update only unneeded fields
unneededFields.addFields(pd.fields)
} else {
// update only needed fields
neededFields.removeFields(pd.fields)
}
}

View file

@ -60,6 +60,13 @@ func (pe *pipeExtract) initFilterInValues(cache map[string][]string, getFieldVal
}
func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
if pe.iff != nil {
neededFields.addFields(pe.iff.neededFields)
}
return
}
if neededFields.contains("*") {
unneededFieldsOrig := unneededFields.clone()
needFromField := false

View file

@ -62,6 +62,13 @@ func (pe *pipeExtractRegexp) initFilterInValues(cache map[string][]string, getFi
}
func (pe *pipeExtractRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
if pe.iff != nil {
neededFields.addFields(pe.iff.neededFields)
}
return
}
if neededFields.contains("*") {
unneededFieldsOrig := unneededFields.clone()
needFromField := false

View file

@ -22,6 +22,11 @@ func (pf *pipeFieldValues) String() string {
}
func (pf *pipeFieldValues) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
neededFields.add(pf.field)
return
}
if neededFields.contains("*") {
neededFields.reset()
if !unneededFields.contains(pf.field) {

View file

@ -29,6 +29,7 @@ func (pf *pipeFields) updateNeededFields(neededFields, unneededFields fieldsSet)
if pf.containsStar {
return
}
if neededFields.contains("*") {
// subtract unneeded fields from pf.fields
neededFields.reset()

View file

@ -43,6 +43,13 @@ func (pf *pipeFormat) String() string {
}
func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
if pf.iff != nil {
neededFields.addFields(pf.iff.neededFields)
}
return
}
if neededFields.contains("*") {
if !unneededFields.contains(pf.resultField) {
if !pf.keepOriginalFields && !pf.skipEmptyResults {

View file

@ -161,6 +161,10 @@ var mathBinaryOps = map[string]mathBinaryOp{
priority: 3,
f: mathFuncMinus,
},
"default": {
priority: 10,
f: mathFuncDefault,
},
}
type mathBinaryOp struct {
@ -175,26 +179,19 @@ func (pm *pipeMath) updateNeededFields(neededFields, unneededFields fieldsSet) {
if !unneededFields.contains(e.resultField) {
unneededFields.add(e.resultField)
entryFields := e.getNeededFields()
unneededFields.removeFields(entryFields)
fs := newFieldsSet()
e.expr.updateNeededFields(fs)
unneededFields.removeFields(fs.getAll())
}
} else {
if neededFields.contains(e.resultField) {
neededFields.remove(e.resultField)
entryFields := e.getNeededFields()
neededFields.addFields(entryFields)
e.expr.updateNeededFields(neededFields)
}
}
}
}
func (me *mathEntry) getNeededFields() []string {
neededFields := newFieldsSet()
me.expr.updateNeededFields(neededFields)
return neededFields.getAll()
}
func (me *mathExpr) updateNeededFields(neededFields fieldsSet) {
if me.isConst {
return
@ -387,15 +384,21 @@ func parseMathEntry(lex *lexer) (*mathEntry, error) {
return nil, err
}
// skip optional 'as'
resultField := ""
if lex.isKeyword(",", "|", ")", "") {
resultField = me.String()
} else {
if lex.isKeyword("as") {
// skip optional 'as'
lex.nextToken()
}
resultField, err := parseFieldName(lex)
fieldName, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for [%s]: %w", me, err)
}
resultField = fieldName
}
e := &mathEntry{
resultField: resultField,
@ -476,6 +479,10 @@ func parseMathExprOperand(lex *lexer) (*mathExpr, error) {
switch {
case lex.isKeyword("abs"):
return parseMathExprAbs(lex)
case lex.isKeyword("exp"):
return parseMathExprExp(lex)
case lex.isKeyword("ln"):
return parseMathExprLn(lex)
case lex.isKeyword("max"):
return parseMathExprMax(lex)
case lex.isKeyword("min"):
@ -506,6 +513,28 @@ func parseMathExprAbs(lex *lexer) (*mathExpr, error) {
return me, nil
}
func parseMathExprExp(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "exp", mathFuncExp)
if err != nil {
return nil, err
}
if len(me.args) != 1 {
return nil, fmt.Errorf("'exp' function accepts only one arg; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprLn(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "ln", mathFuncLn)
if err != nil {
return nil, err
}
if len(me.args) != 1 {
return nil, fmt.Errorf("'ln' function accepts only one arg; got %d args: [%s]", len(me.args), me)
}
return me, nil
}
func parseMathExprMax(lex *lexer) (*mathExpr, error) {
me, err := parseMathExprGenericFunc(lex, "max", mathFuncMax)
if err != nil {
@ -707,6 +736,18 @@ func mathFuncPow(result []float64, args [][]float64) {
}
}
func mathFuncDefault(result []float64, args [][]float64) {
values := args[0]
defaultValues := args[1]
for i := range result {
f := values[i]
if math.IsNaN(f) {
f = defaultValues[i]
}
result[i] = f
}
}
func mathFuncAbs(result []float64, args [][]float64) {
arg := args[0]
for i := range result {
@ -714,6 +755,20 @@ func mathFuncAbs(result []float64, args [][]float64) {
}
}
func mathFuncExp(result []float64, args [][]float64) {
arg := args[0]
for i := range result {
result[i] = math.Exp(arg[i])
}
}
func mathFuncLn(result []float64, args [][]float64) {
arg := args[0]
for i := range result {
result[i] = math.Log(arg[i])
}
}
func mathFuncUnaryMinus(result []float64, args [][]float64) {
arg := args[0]
for i := range result {

View file

@ -22,6 +22,8 @@ func TestParsePipeMathSuccess(t *testing.T) {
f(`math min(3, foo, (1 + bar) / baz) as a, max(a, b) as b, (abs(c) + 5) as d`)
f(`math round(foo) as x`)
f(`math round(foo, 0.1) as y`)
f(`math (a / b default 10) as z`)
f(`math (ln(a) + exp(b)) as x`)
}
func TestParsePipeMathFailure(t *testing.T) {
@ -31,7 +33,6 @@ func TestParsePipeMathFailure(t *testing.T) {
}
f(`math`)
f(`math x`)
f(`math x as`)
f(`math abs() as x`)
f(`math abs(a, b) as x`)
@ -63,6 +64,94 @@ func TestPipeMath(t *testing.T) {
},
})
f("math a / b default c", [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
},
{
{"a", "0"},
{"b", "0"},
{"c", "3"},
},
{
{"a", "3"},
{"b", "2"},
},
{
{"a", "3"},
{"b", "foo"},
},
}, [][]Field{
{
{"a", "v1"},
{"b", "2"},
{"c", "3"},
{"a / b default c", "3"},
},
{
{"a", "0"},
{"b", "0"},
{"c", "3"},
{"a / b default c", "3"},
},
{
{"a", "3"},
{"b", "2"},
{"a / b default c", "1.5"},
},
{
{"a", "3"},
{"b", "foo"},
{"a / b default c", "NaN"},
},
})
f("math round(exp(a), 0.01), round(ln(a), 0.01)", [][]Field{
{
{"a", "v1"},
},
{
{"a", "0"},
},
{
{"a", "1"},
},
{
{"a", "2"},
},
{
{"a", "3"},
},
}, [][]Field{
{
{"a", "v1"},
{"round(exp(a), 0.01)", "NaN"},
{"round(ln(a), 0.01)", "NaN"},
},
{
{"a", "0"},
{"round(exp(a), 0.01)", "1"},
{"round(ln(a), 0.01)", "NaN"},
},
{
{"a", "1"},
{"round(exp(a), 0.01)", "2.72"},
{"round(ln(a), 0.01)", "0"},
},
{
{"a", "2"},
{"round(exp(a), 0.01)", "7.39"},
{"round(ln(a), 0.01)", "0.69"},
},
{
{"a", "3"},
{"round(exp(a), 0.01)", "20.09"},
{"round(ln(a), 0.01)", "1.1"},
},
})
f("math 1 as a", [][]Field{
{
{"a", "v1"},

View file

@ -56,6 +56,10 @@ func (ps *pipeSort) String() string {
}
func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
return
}
if len(ps.byFields) == 0 {
neededFields.add("*")
unneededFields.reset()

View file

@ -545,9 +545,17 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
ps.byFields = bfs
}
seenByFields := make(map[string]*byStatsField, len(ps.byFields))
for _, bf := range ps.byFields {
seenByFields[bf.name] = bf
}
seenResultNames := make(map[string]statsFunc)
var funcs []pipeStatsFunc
for {
var f pipeStatsFunc
sf, err := parseStatsFunc(lex)
if err != nil {
return nil, err
@ -557,15 +565,31 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
if lex.isKeyword("if") {
iff, err := parseIfFilter(lex)
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot parse 'if' filter for [%s]: %w", sf, err)
}
f.iff = iff
}
resultName, err := parseResultName(lex)
resultName := ""
if lex.isKeyword(",", "|", ")", "") {
resultName = sf.String()
} else {
if lex.isKeyword("as") {
lex.nextToken()
}
fieldName, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err)
}
resultName = fieldName
}
if bf := seenByFields[resultName]; bf != nil {
return nil, fmt.Errorf("the %q is used as 'by' field [%s], so it cannot be used as result name for [%s]", resultName, bf, sf)
}
if sfPrev := seenResultNames[resultName]; sfPrev != nil {
return nil, fmt.Errorf("cannot use identical result name %q for [%s] and [%s]", resultName, sfPrev, sf)
}
seenResultNames[resultName] = sf
f.resultName = resultName
funcs = append(funcs, f)
@ -575,7 +599,7 @@ func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
return &ps, nil
}
if !lex.isKeyword(",") {
return nil, fmt.Errorf("unexpected token %q; want ',', '|' or ')'", lex.token)
return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", sf, lex.token)
}
lex.nextToken()
}
@ -607,18 +631,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err)
}
return sus, nil
case lex.isKeyword("fields_max"):
sms, err := parseStatsFieldsMax(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_max' func: %w", err)
}
return sms, nil
case lex.isKeyword("fields_min"):
sms, err := parseStatsFieldsMin(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_min' func: %w", err)
}
return sms, nil
case lex.isKeyword("max"):
sms, err := parseStatsMax(lex)
if err != nil {
@ -643,6 +655,24 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err)
}
return sqs, nil
case lex.isKeyword("row_any"):
sas, err := parseStatsRowAny(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'row_any' func: %w", err)
}
return sas, nil
case lex.isKeyword("row_max"):
sms, err := parseStatsRowMax(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'row_max' func: %w", err)
}
return sms, nil
case lex.isKeyword("row_min"):
sms, err := parseStatsRowMin(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'row_min' func: %w", err)
}
return sms, nil
case lex.isKeyword("sum"):
sss, err := parseStatsSum(lex)
if err != nil {
@ -672,17 +702,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
}
}
func parseResultName(lex *lexer) (string, error) {
if lex.isKeyword("as") {
lex.nextToken()
}
resultName, err := parseFieldName(lex)
if err != nil {
return "", err
}
return resultName, nil
}
var zeroByStatsField = &byStatsField{}
// byStatsField represents 'by (...)' part of the pipeStats.

View file

@ -39,8 +39,8 @@ func TestPipeStats(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// missing 'stats' keyword
f("count(*) as rows", [][]Field{
// missing 'stats' keyword and resutl name
f("count(*)", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -56,7 +56,7 @@ func TestPipeStats(t *testing.T) {
},
}, [][]Field{
{
{"rows", "3"},
{`count(*)`, "3"},
},
})

View file

@ -7,6 +7,13 @@ import (
)
func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter, neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
if iff != nil {
neededFields.addFields(iff.neededFields)
}
return
}
if neededFields.contains("*") {
unneededFieldsOrig := unneededFields.clone()
unneededFieldsCount := 0

View file

@ -52,25 +52,15 @@ func (pu *pipeUnroll) initFilterInValues(cache map[string][]string, getFieldValu
func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
unneededFieldsCount := 0
for _, f := range pu.fields {
if unneededFields.contains(f) {
unneededFieldsCount++
}
}
if unneededFieldsCount < len(pu.fields) && pu.iff != nil {
if pu.iff != nil {
unneededFields.removeFields(pu.iff.neededFields)
}
unneededFields.removeFields(pu.fields)
} else {
needIfFields := false
for _, f := range pu.fields {
if neededFields.contains(f) {
needIfFields = true
}
}
if needIfFields && pu.iff != nil {
if pu.iff != nil {
neededFields.addFields(pu.iff.neededFields)
}
neededFields.addFields(pu.fields)
}
}

View file

@ -225,13 +225,13 @@ func TestPipeUnrollUpdateNeededFields(t *testing.T) {
f("unroll if (f1:b) (x)", "*", "f1,f2", "*", "f2")
// all the needed fields, unneeded fields intersect with src
f("unroll (x)", "*", "f2,x", "*", "f2,x")
f("unroll if (a:b) (x)", "*", "f2,x", "*", "f2,x")
f("unroll if (f2:b) (x)", "*", "f2,x", "*", "f2,x")
f("unroll (x)", "*", "f2,x", "*", "f2")
f("unroll if (a:b) (x)", "*", "f2,x", "*", "f2")
f("unroll if (f2:b) (x)", "*", "f2,x", "*", "")
// needed fields do not intersect with src
f("unroll (x)", "f1,f2", "", "f1,f2", "")
f("unroll if (a:b) (x)", "f1,f2", "", "f1,f2", "")
f("unroll (x)", "f1,f2", "", "f1,f2,x", "")
f("unroll if (a:b) (x)", "f1,f2", "", "a,f1,f2,x", "")
// needed fields intersect with src
f("unroll (x)", "f2,x", "", "f2,x", "")

View file

@ -5,6 +5,13 @@ import (
)
func updateNeededFieldsForUpdatePipe(neededFields, unneededFields fieldsSet, field string, iff *ifFilter) {
if neededFields.isEmpty() {
if iff != nil {
neededFields.addFields(iff.neededFields)
}
return
}
if neededFields.contains("*") {
if !unneededFields.contains(field) && iff != nil {
unneededFields.removeFields(iff.neededFields)

View file

@ -0,0 +1,127 @@
package logstorage
import (
"fmt"
"slices"
"strings"
"unsafe"
)
type statsRowAny struct {
fields []string
}
func (sa *statsRowAny) String() string {
return "row_any(" + statsFuncFieldsToString(sa.fields) + ")"
}
func (sa *statsRowAny) updateNeededFields(neededFields fieldsSet) {
if len(sa.fields) == 0 {
neededFields.add("*")
} else {
neededFields.addFields(sa.fields)
}
}
func (sa *statsRowAny) newStatsProcessor() (statsProcessor, int) {
sap := &statsRowAnyProcessor{
sa: sa,
}
return sap, int(unsafe.Sizeof(*sap))
}
type statsRowAnyProcessor struct {
sa *statsRowAny
captured bool
fields []Field
}
func (sap *statsRowAnyProcessor) updateStatsForAllRows(br *blockResult) int {
if len(br.timestamps) == 0 {
return 0
}
if sap.captured {
return 0
}
sap.captured = true
return sap.updateState(br, 0)
}
func (sap *statsRowAnyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
if sap.captured {
return 0
}
sap.captured = true
return sap.updateState(br, rowIdx)
}
func (sap *statsRowAnyProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsRowAnyProcessor)
if !sap.captured {
sap.captured = src.captured
sap.fields = src.fields
}
}
func (sap *statsRowAnyProcessor) updateState(br *blockResult, rowIdx int) int {
stateSizeIncrease := 0
fields := sap.fields
fetchFields := sap.sa.fields
if len(fetchFields) == 0 {
cs := br.getColumns()
for _, c := range cs {
v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
})
stateSizeIncrease += len(c.name) + len(v)
}
} else {
for _, field := range fetchFields {
c := br.getColumnByName(field)
v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{
Name: strings.Clone(c.name),
Value: strings.Clone(v),
})
stateSizeIncrease += len(c.name) + len(v)
}
}
sap.fields = fields
return stateSizeIncrease
}
func (sap *statsRowAnyProcessor) finalizeStats() string {
bb := bbPool.Get()
bb.B = marshalFieldsToJSON(bb.B, sap.fields)
result := string(bb.B)
bbPool.Put(bb)
return result
}
func parseStatsRowAny(lex *lexer) (*statsRowAny, error) {
if !lex.isKeyword("row_any") {
return nil, fmt.Errorf("unexpected func; got %q; want 'row_any'", lex.token)
}
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'row_any' args: %w", err)
}
if slices.Contains(fields, "*") {
fields = nil
}
sa := &statsRowAny{
fields: fields,
}
return sa, nil
}

View file

@ -0,0 +1,182 @@
package logstorage
import (
"testing"
)
func TestParseStatsRowAnySuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncSuccess(t, pipeStr)
}
f(`row_any(*)`)
f(`row_any(foo)`)
f(`row_any(foo, bar)`)
}
func TestParseStatsRowAnyFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncFailure(t, pipeStr)
}
f(`row_any`)
f(`row_any(x) bar`)
}
func TestStatsRowAny(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("row_any()", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
}, [][]Field{
{
{"row_any(*)", `{"_msg":"abc","a":"2","b":"3"}`},
},
})
f("stats row_any(a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
}, [][]Field{
{
{"x", `{"a":"2"}`},
},
})
f("stats row_any(a, x, b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
}, [][]Field{
{
{"x", `{"a":"2","x":"","b":"3"}`},
},
})
f("stats row_any(a) if (b:'') as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
}, [][]Field{
{
{"x", `{"a":"1"}`},
},
})
f("stats by (b) row_any(a) if (b:*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"a", `3`},
{"c", `54`},
},
}, [][]Field{
{
{"b", "3"},
{"x", `{"a":"2"}`},
},
{
{"b", ""},
{"x", `{}`},
},
})
f("stats by (a) row_any(b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"a", `3`},
{"b", `5`},
},
}, [][]Field{
{
{"a", "1"},
{"x", `{"b":"3"}`},
},
{
{"a", "3"},
{"x", `{"b":"5"}`},
},
})
f("stats by (a) row_any(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"a", `3`},
{"c", `foo`},
},
}, [][]Field{
{
{"a", "1"},
{"x", `{"c":""}`},
},
{
{"a", "3"},
{"x", `{"c":"foo"}`},
},
})
f("stats by (a, b) row_any(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
{"c", "foo"},
},
{
{"a", `3`},
{"b", `5`},
{"c", "4"},
},
}, [][]Field{
{
{"a", "1"},
{"b", "3"},
{"x", `{"c":""}`},
},
{
{"a", "1"},
{"b", ""},
{"x", `{"c":"foo"}`},
},
{
{"a", "3"},
{"b", "5"},
{"x", `{"c":"4"}`},
},
})
}

View file

@ -11,14 +11,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type statsFieldsMax struct {
type statsRowMax struct {
srcField string
fetchFields []string
}
func (sm *statsFieldsMax) String() string {
s := "fields_max(" + quoteTokenIfNeeded(sm.srcField)
func (sm *statsRowMax) String() string {
s := "row_max(" + quoteTokenIfNeeded(sm.srcField)
if len(sm.fetchFields) > 0 {
s += ", " + fieldNamesString(sm.fetchFields)
}
@ -26,7 +26,7 @@ func (sm *statsFieldsMax) String() string {
return s
}
func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) {
func (sm *statsRowMax) updateNeededFields(neededFields fieldsSet) {
if len(sm.fetchFields) == 0 {
neededFields.add("*")
} else {
@ -35,22 +35,22 @@ func (sm *statsFieldsMax) updateNeededFields(neededFields fieldsSet) {
neededFields.add(sm.srcField)
}
func (sm *statsFieldsMax) newStatsProcessor() (statsProcessor, int) {
smp := &statsFieldsMaxProcessor{
func (sm *statsRowMax) newStatsProcessor() (statsProcessor, int) {
smp := &statsRowMaxProcessor{
sm: sm,
}
return smp, int(unsafe.Sizeof(*smp))
}
type statsFieldsMaxProcessor struct {
sm *statsFieldsMax
type statsRowMaxProcessor struct {
sm *statsRowMax
max string
fields []Field
}
func (smp *statsFieldsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField)
@ -114,7 +114,7 @@ func (smp *statsFieldsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
return stateSizeIncrease
}
func (smp *statsFieldsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
func (smp *statsRowMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField)
@ -138,27 +138,27 @@ func (smp *statsFieldsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx in
return stateSizeIncrease
}
func (smp *statsFieldsMaxProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsFieldsMaxProcessor)
func (smp *statsRowMaxProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsRowMaxProcessor)
if smp.needUpdateStateString(src.max) {
smp.max = src.max
smp.fields = src.fields
}
}
func (smp *statsFieldsMaxProcessor) needUpdateStateBytes(b []byte) bool {
func (smp *statsRowMaxProcessor) needUpdateStateBytes(b []byte) bool {
v := bytesutil.ToUnsafeString(b)
return smp.needUpdateStateString(v)
}
func (smp *statsFieldsMaxProcessor) needUpdateStateString(v string) bool {
func (smp *statsRowMaxProcessor) needUpdateStateString(v string) bool {
if v == "" {
return false
}
return smp.max == "" || lessString(smp.max, v)
}
func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowIdx int) int {
func (smp *statsRowMaxProcessor) updateState(v string, br *blockResult, rowIdx int) int {
stateSizeIncrease := 0
if !smp.needUpdateStateString(v) {
@ -204,7 +204,7 @@ func (smp *statsFieldsMaxProcessor) updateState(v string, br *blockResult, rowId
return stateSizeIncrease
}
func (smp *statsFieldsMaxProcessor) finalizeStats() string {
func (smp *statsRowMaxProcessor) finalizeStats() string {
bb := bbPool.Get()
bb.B = marshalFieldsToJSON(bb.B, smp.fields)
result := string(bb.B)
@ -213,18 +213,18 @@ func (smp *statsFieldsMaxProcessor) finalizeStats() string {
return result
}
func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) {
if !lex.isKeyword("fields_max") {
return nil, fmt.Errorf("unexpected func; got %q; want 'fields_max'", lex.token)
func parseStatsRowMax(lex *lexer) (*statsRowMax, error) {
if !lex.isKeyword("row_max") {
return nil, fmt.Errorf("unexpected func; got %q; want 'row_max'", lex.token)
}
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_max' args: %w", err)
return nil, fmt.Errorf("cannot parse 'row_max' args: %w", err)
}
if len(fields) == 0 {
return nil, fmt.Errorf("missing first arg for 'fields_max' func - source field")
return nil, fmt.Errorf("missing first arg for 'row_max' func - source field")
}
srcField := fields[0]
@ -233,7 +233,7 @@ func parseStatsFieldsMax(lex *lexer) (*statsFieldsMax, error) {
fetchFields = nil
}
sm := &statsFieldsMax{
sm := &statsRowMax{
srcField: srcField,
fetchFields: fetchFields,
}

View file

@ -4,35 +4,35 @@ import (
"testing"
)
func TestParseStatsFieldsMaxSuccess(t *testing.T) {
func TestParseStatsRowMaxSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncSuccess(t, pipeStr)
}
f(`fields_max(foo)`)
f(`fields_max(foo, bar)`)
f(`fields_max(foo, bar, baz)`)
f(`row_max(foo)`)
f(`row_max(foo, bar)`)
f(`row_max(foo, bar, baz)`)
}
func TestParseStatsFieldsMaxFailure(t *testing.T) {
func TestParseStatsRowMaxFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncFailure(t, pipeStr)
}
f(`fields_max`)
f(`fields_max()`)
f(`fields_max(x) bar`)
f(`row_max`)
f(`row_max()`)
f(`row_max(x) bar`)
}
func TestStatsFieldsMax(t *testing.T) {
func TestStatsRowMax(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("stats fields_max(a) as x", [][]Field{
f("stats row_max(a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -52,7 +52,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats fields_max(foo) as x", [][]Field{
f("stats row_max(foo) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -72,7 +72,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats fields_max(b, a) as x", [][]Field{
f("stats row_max(b, a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -93,7 +93,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats fields_max(b, a, x, b) as x", [][]Field{
f("stats row_max(b, a, x, b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -114,7 +114,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats fields_max(a) if (b:*) as x", [][]Field{
f("stats row_max(a) if (b:*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -134,7 +134,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats by (b) fields_max(a) if (b:*) as x", [][]Field{
f("stats by (b) row_max(a) if (b:*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -160,7 +160,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats by (a) fields_max(b) as x", [][]Field{
f("stats by (a) row_max(b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
@ -189,7 +189,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats by (a) fields_max(c) as x", [][]Field{
f("stats by (a) row_max(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
@ -218,7 +218,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats by (a) fields_max(b, c) as x", [][]Field{
f("stats by (a) row_max(b, c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
@ -250,7 +250,7 @@ func TestStatsFieldsMax(t *testing.T) {
},
})
f("stats by (a, b) fields_max(c) as x", [][]Field{
f("stats by (a, b) row_max(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},

View file

@ -11,14 +11,14 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
type statsFieldsMin struct {
type statsRowMin struct {
srcField string
fetchFields []string
}
func (sm *statsFieldsMin) String() string {
s := "fields_min(" + quoteTokenIfNeeded(sm.srcField)
func (sm *statsRowMin) String() string {
s := "row_min(" + quoteTokenIfNeeded(sm.srcField)
if len(sm.fetchFields) > 0 {
s += ", " + fieldNamesString(sm.fetchFields)
}
@ -26,7 +26,7 @@ func (sm *statsFieldsMin) String() string {
return s
}
func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) {
func (sm *statsRowMin) updateNeededFields(neededFields fieldsSet) {
if len(sm.fetchFields) == 0 {
neededFields.add("*")
} else {
@ -35,22 +35,22 @@ func (sm *statsFieldsMin) updateNeededFields(neededFields fieldsSet) {
neededFields.add(sm.srcField)
}
func (sm *statsFieldsMin) newStatsProcessor() (statsProcessor, int) {
smp := &statsFieldsMinProcessor{
func (sm *statsRowMin) newStatsProcessor() (statsProcessor, int) {
smp := &statsRowMinProcessor{
sm: sm,
}
return smp, int(unsafe.Sizeof(*smp))
}
type statsFieldsMinProcessor struct {
sm *statsFieldsMin
type statsRowMinProcessor struct {
sm *statsRowMin
min string
fields []Field
}
func (smp *statsFieldsMinProcessor) updateStatsForAllRows(br *blockResult) int {
func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField)
@ -114,7 +114,7 @@ func (smp *statsFieldsMinProcessor) updateStatsForAllRows(br *blockResult) int {
return stateSizeIncrease
}
func (smp *statsFieldsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
func (smp *statsRowMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
stateSizeIncrease := 0
c := br.getColumnByName(smp.sm.srcField)
@ -138,27 +138,27 @@ func (smp *statsFieldsMinProcessor) updateStatsForRow(br *blockResult, rowIdx in
return stateSizeIncrease
}
func (smp *statsFieldsMinProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsFieldsMinProcessor)
func (smp *statsRowMinProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsRowMinProcessor)
if smp.needUpdateStateString(src.min) {
smp.min = src.min
smp.fields = src.fields
}
}
func (smp *statsFieldsMinProcessor) needUpdateStateBytes(b []byte) bool {
func (smp *statsRowMinProcessor) needUpdateStateBytes(b []byte) bool {
v := bytesutil.ToUnsafeString(b)
return smp.needUpdateStateString(v)
}
func (smp *statsFieldsMinProcessor) needUpdateStateString(v string) bool {
func (smp *statsRowMinProcessor) needUpdateStateString(v string) bool {
if v == "" {
return false
}
return smp.min == "" || lessString(v, smp.min)
}
func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowIdx int) int {
func (smp *statsRowMinProcessor) updateState(v string, br *blockResult, rowIdx int) int {
stateSizeIncrease := 0
if !smp.needUpdateStateString(v) {
@ -204,7 +204,7 @@ func (smp *statsFieldsMinProcessor) updateState(v string, br *blockResult, rowId
return stateSizeIncrease
}
func (smp *statsFieldsMinProcessor) finalizeStats() string {
func (smp *statsRowMinProcessor) finalizeStats() string {
bb := bbPool.Get()
bb.B = marshalFieldsToJSON(bb.B, smp.fields)
result := string(bb.B)
@ -213,18 +213,18 @@ func (smp *statsFieldsMinProcessor) finalizeStats() string {
return result
}
func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) {
if !lex.isKeyword("fields_min") {
return nil, fmt.Errorf("unexpected func; got %q; want 'fields_min'", lex.token)
func parseStatsRowMin(lex *lexer) (*statsRowMin, error) {
if !lex.isKeyword("row_min") {
return nil, fmt.Errorf("unexpected func; got %q; want 'row_min'", lex.token)
}
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields_min' args: %w", err)
return nil, fmt.Errorf("cannot parse 'row_min' args: %w", err)
}
if len(fields) == 0 {
return nil, fmt.Errorf("missing first arg for 'fields_min' func - source field")
return nil, fmt.Errorf("missing first arg for 'row_min' func - source field")
}
srcField := fields[0]
@ -233,7 +233,7 @@ func parseStatsFieldsMin(lex *lexer) (*statsFieldsMin, error) {
fetchFields = nil
}
sm := &statsFieldsMin{
sm := &statsRowMin{
srcField: srcField,
fetchFields: fetchFields,
}

View file

@ -4,35 +4,35 @@ import (
"testing"
)
func TestParseStatsFieldsMinSuccess(t *testing.T) {
func TestParseStatsRowMinSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncSuccess(t, pipeStr)
}
f(`fields_min(foo)`)
f(`fields_min(foo, bar)`)
f(`fields_min(foo, bar, baz)`)
f(`row_min(foo)`)
f(`row_min(foo, bar)`)
f(`row_min(foo, bar, baz)`)
}
func TestParseStatsFieldsMinFailure(t *testing.T) {
func TestParseStatsRowMinFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParseStatsFuncFailure(t, pipeStr)
}
f(`fields_min`)
f(`fields_min()`)
f(`fields_min(x) bar`)
f(`row_min`)
f(`row_min()`)
f(`row_min(x) bar`)
}
func TestStatsFieldsMin(t *testing.T) {
func TestStatsRowMin(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("stats fields_min(a) as x", [][]Field{
f("stats row_min(a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -52,7 +52,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats fields_min(foo) as x", [][]Field{
f("stats row_min(foo) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -72,7 +72,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats fields_min(b, a) as x", [][]Field{
f("stats row_min(b, a) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -93,7 +93,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats fields_min(b, a, x, b) as x", [][]Field{
f("stats row_min(b, a, x, b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -114,7 +114,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats fields_min(a) if (b:*) as x", [][]Field{
f("stats row_min(a) if (b:*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -134,7 +134,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats by (b) fields_min(a) if (b:*) as x", [][]Field{
f("stats by (b) row_min(a) if (b:*) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
@ -160,7 +160,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats by (a) fields_min(b) as x", [][]Field{
f("stats by (a) row_min(b) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
@ -189,7 +189,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats by (a) fields_min(c) as x", [][]Field{
f("stats by (a) row_min(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
@ -218,7 +218,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats by (a) fields_min(b, c) as x", [][]Field{
f("stats by (a) row_min(b, c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},
@ -249,7 +249,7 @@ func TestStatsFieldsMin(t *testing.T) {
},
})
f("stats by (a, b) fields_min(c) as x", [][]Field{
f("stats by (a, b) row_min(c) as x", [][]Field{
{
{"_msg", `abc`},
{"a", `1`},