lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-06-05 03:18:12 +02:00
parent f497d3c222
commit 43cf221681
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
37 changed files with 923 additions and 262 deletions

View file

@ -355,19 +355,20 @@ type row struct {
} }
func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) { func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {
q.AddPipeLimit(uint64(limit + 1)) limitUpper := 2 * limit
q.AddPipeLimit(uint64(limitUpper))
q.Optimize() q.Optimize()
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1) rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(rows) <= limit { if len(rows) < limitUpper {
// Fast path - the requested time range contains up to limit rows. // Fast path - the requested time range contains up to limitUpper rows.
sortRowsByTime(rows) rows = getLastNRows(rows, limit)
return rows, nil return rows, nil
} }
// Slow path - search for the time range with the requested limit rows. // Slow path - search for the time range containing up to limitUpper rows.
start, end := q.GetFilterTimeRange() start, end := q.GetFilterTimeRange()
d := end/2 - start/2 d := end/2 - start/2
start += d start += d
@ -376,16 +377,13 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
for { for {
q = qOrig.Clone() q = qOrig.Clone()
q.AddTimeFilter(start, end) q.AddTimeFilter(start, end)
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1) rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(rows) == limit || len(rows) > limit && d < 10e6 || d == 0 { if len(rows) >= limit && len(rows) < limitUpper || d == 0 {
sortRowsByTime(rows) rows = getLastNRows(rows, limit)
if len(rows) > limit {
rows = rows[len(rows)-limit:]
}
return rows, nil return rows, nil
} }
@ -399,10 +397,14 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID,
} }
} }
func sortRowsByTime(rows []row) { func getLastNRows(rows []row, limit int) []row {
sort.Slice(rows, func(i, j int) bool { sort.Slice(rows, func(i, j int) bool {
return rows[i].timestamp < rows[j].timestamp return rows[i].timestamp < rows[j].timestamp
}) })
if len(rows) > limit {
rows = rows[len(rows)-limit:]
}
return rows
} }
func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) { func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {

View file

@ -1,4 +1,4 @@
FROM golang:1.22.3 as build-web-stage FROM golang:1.22.4 as build-web-stage
COPY build /build COPY build /build
WORKDIR /build WORKDIR /build

View file

@ -6,7 +6,7 @@ ROOT_IMAGE ?= alpine:3.20.0
ROOT_IMAGE_SCRATCH ?= scratch ROOT_IMAGE_SCRATCH ?= scratch
CERTS_IMAGE := alpine:3.20.0 CERTS_IMAGE := alpine:3.20.0
GO_BUILDER_IMAGE := golang:1.22.3-alpine GO_BUILDER_IMAGE := golang:1.22.4-alpine
BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1 BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1
BASE_IMAGE := local/base:1.1.4-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __) BASE_IMAGE := local/base:1.1.4-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __)
DOCKER ?= docker DOCKER ?= docker

View file

@ -43,7 +43,7 @@ services:
# storing logs and serving read queries. # storing logs and serving read queries.
victorialogs: victorialogs:
container_name: victorialogs container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
command: command:
- "--storageDataPath=/vlogs" - "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428" - "--httpListenAddr=:9428"

View file

@ -22,7 +22,7 @@ services:
- -beat.uri=http://filebeat-victorialogs:5066 - -beat.uri=http://filebeat-victorialogs:5066
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
volumes: volumes:
- victorialogs-filebeat-docker-vl:/vlogs - victorialogs-filebeat-docker-vl:/vlogs
ports: ports:

View file

@ -13,7 +13,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
volumes: volumes:
- victorialogs-filebeat-syslog-vl:/vlogs - victorialogs-filebeat-syslog-vl:/vlogs
ports: ports:

View file

@ -11,7 +11,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
volumes: volumes:
- victorialogs-fluentbit-vl:/vlogs - victorialogs-fluentbit-vl:/vlogs
ports: ports:

View file

@ -14,7 +14,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
volumes: volumes:
- victorialogs-logstash-vl:/vlogs - victorialogs-logstash-vl:/vlogs
ports: ports:

View file

@ -12,7 +12,7 @@ services:
- "5140:5140" - "5140:5140"
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
volumes: volumes:
- victorialogs-promtail-docker:/vlogs - victorialogs-promtail-docker:/vlogs
ports: ports:

View file

@ -22,7 +22,7 @@ services:
condition: service_healthy condition: service_healthy
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
volumes: volumes:
- victorialogs-vector-docker-vl:/vlogs - victorialogs-vector-docker-vl:/vlogs
ports: ports:

View file

@ -18,7 +18,7 @@ services:
- vlogs - vlogs
generator: generator:
image: golang:1.22.3-alpine image: golang:1.22.4-alpine
restart: always restart: always
working_dir: /go/src/app working_dir: /go/src/app
volumes: volumes:

View file

@ -2,7 +2,7 @@ version: '3'
services: services:
generator: generator:
image: golang:1.22.3-alpine image: golang:1.22.4-alpine
restart: always restart: always
working_dir: /go/src/app working_dir: /go/src/app
volumes: volumes:

View file

@ -3,7 +3,7 @@ version: '3'
services: services:
# Run `make package-victoria-logs` to build victoria-logs image # Run `make package-victoria-logs` to build victoria-logs image
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
volumes: volumes:
- vlogs:/vlogs - vlogs:/vlogs
ports: ports:

View file

@ -19,6 +19,20 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
## [v0.17.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.17.0-victorialogs)
Released at 2024-06-05
* FEATURE: add [`pack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_logfmt-pipe) for formatting [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) messages.
* FEATURE: allow using IPv4 addresses in [range comparison filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filter). For example, `ip:>'12.34.56.78'` is valid filter now.
* FEATURE: add `ceil()` and `floor()` functions to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
* FEATURE: add support for bitwise `and`, `or` and `xor` operations at [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
* FEATURE: add support for automatic conversion of [RFC3339 time](https://www.rfc-editor.org/rfc/rfc3339) and IPv4 addresses into numeric representation at [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
* FEATURE: add ability to format numeric fields into string representation of time, duration and IPv4 with [`format` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe).
* FEATURE: set `format` field to `rfc3164` or `rfc5424` depending on the [Syslog format](https://en.wikipedia.org/wiki/Syslog) parsed via [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe).
* BUGFIX: always respect the limit set in [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe). Previously the limit could be exceeded in some cases.
## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs) ## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs)
Released at 2024-06-04 Released at 2024-06-04

View file

@ -584,7 +584,7 @@ See also:
### Range comparison filter ### Range comparison filter
LogsQL supports `field:>X`, `field:>=X`, `field:<X` and `field:<=X` filters, where `field` is the name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) LogsQL supports `field:>X`, `field:>=X`, `field:<X` and `field:<=X` filters, where `field` is the name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
and `X` is either [numeric value](#numeric-values) or a string. For example, the following query returns logs containing numeric values for the `response_size` field bigger than `10*1024`: and `X` is [numeric value](#numeric-values), IPv4 address or a string. For example, the following query returns logs containing numeric values for the `response_size` field bigger than `10*1024`:
```logsql ```logsql
response_size:>10KiB response_size:>10KiB
@ -1167,6 +1167,7 @@ LogsQL supports the following pipes:
- [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`offset`](#offset-pipe) skips the given number of selected logs. - [`offset`](#offset-pipe) skips the given number of selected logs.
- [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object. - [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object.
- [`pack_logfmt`](#pack_logfmt-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) message.
- [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. - [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
@ -1554,6 +1555,18 @@ and stores it into `my_json` output field:
_time:5m | format '{"_msg":<q:_msg>,"stacktrace":<q:stacktrace>}' as my_json _time:5m | format '{"_msg":<q:_msg>,"stacktrace":<q:stacktrace>}' as my_json
``` ```
Numeric fields can be transformed into the following string representation at `format` pipe:
- [RFC3339 time](https://www.rfc-editor.org/rfc/rfc3339) - by adding `time:` in front of the corresponding field name
containing [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) in nanoseconds.
For example, `format "time=<time:timestamp_nsecs>"`. The timestamp can be converted into nanoseconds with the [`math` pipe](#math-pipe).
- Human-readable duration - by adding `duration:` in front of the corresponding numeric field name containing duration in nanoseconds.
For example, `format "duration=<duration:duration_nsecs>"`. The duration can be converted into nanoseconds with the [`math` pipe](#math-pipe).
- IPv4 - by adding `ipv4:` in front of the corresponding field name containing `uint32` representation of the IPv4 address.
For example, `format "ip=<ipv4:ip_num>"`.
Add `keep_original_fields` to the end of `format ... as result_field` when the original non-empty value of the `result_field` must be preserved Add `keep_original_fields` to the end of `format ... as result_field` when the original non-empty value of the `result_field` must be preserved
instead of overwriting it with the `format` results. For example, the following query adds formatted result to `foo` field only if it was missing or empty: instead of overwriting it with the `format` results. For example, the following query adds formatted result to `foo` field only if it was missing or empty:
@ -1645,9 +1658,14 @@ The following mathematical operations are supported by `math` pipe:
- `arg1 / arg2` - divides `arg1` by `arg2` - `arg1 / arg2` - divides `arg1` by `arg2`
- `arg1 % arg2` - returns the remainder of the division of `arg1` by `arg2` - `arg1 % arg2` - returns the remainder of the division of `arg1` by `arg2`
- `arg1 ^ arg2` - returns the power of `arg1` by `arg2` - `arg1 ^ arg2` - returns the power of `arg1` by `arg2`
- `arg1 & arg2` - returns bitwise `and` for `arg1` and `arg2`. It is expected that `arg1` and `arg2` are in the range `[0 .. 2^53-1]`
- `arg1 | arg2` - returns bitwise `or` for `arg1` and `arg2`. It is expected that `arg1` and `arg2` are in the range `[0 .. 2^53-1]`
- `arg1 xor arg2` - returns bitwise `xor` for `arg1` and `arg2`. It is expected that `arg1` and `arg2` are in the range `[0 .. 2^53-1]`
- `arg1 default arg2` - returns `arg2` if `arg1` is non-[numeric](#numeric-values) or equals to `NaN` - `arg1 default arg2` - returns `arg2` if `arg1` is non-[numeric](#numeric-values) or equals to `NaN`
- `abs(arg)` - returns an absolute value for the given `arg` - `abs(arg)` - returns an absolute value for the given `arg`
- `exp(arg)` - powers [`e`](https://en.wikipedia.org/wiki/E_(mathematical_constant)) by `arg`. - `ceil(arg)` - returns the least integer value greater than or equal to `arg`
- `exp(arg)` - powers [`e`](https://en.wikipedia.org/wiki/E_(mathematical_constant)) by `arg`
- `floor(arg)` - returns the greatest integer value less than or equal to `arg` - `floor(arg)` - returns the greatest integer value less than or equal to `arg`
- `ln(arg)` - returns [natural logarithm](https://en.wikipedia.org/wiki/Natural_logarithm) for the given `arg` - `ln(arg)` - returns [natural logarithm](https://en.wikipedia.org/wiki/Natural_logarithm) for the given `arg`
- `max(arg1, ..., argN)` - returns the maximum value among the given `arg1`, ..., `argN` - `max(arg1, ..., argN)` - returns the maximum value among the given `arg1`, ..., `argN`
- `min(arg1, ..., argN)` - returns the minimum value among the given `arg1`, ..., `argN` - `min(arg1, ..., argN)` - returns the minimum value among the given `arg1`, ..., `argN`
@ -1657,9 +1675,11 @@ The following mathematical operations are supported by `math` pipe:
Every `argX` argument in every mathematical operation can contain one of the following values: Every `argX` argument in every mathematical operation can contain one of the following values:
- The name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, `errors_total / requests_total`. - The name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, `errors_total / requests_total`.
If the log field contains value, which cannot be parsed into [supported numeric value](#numeric-values), then it is replaced with `NaN`. The log field is parsed into numeric value if it contains [supported numeric value](#numeric-values). The log field is parsed into [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time)
- Any [supported numeric value](#numeric-values). For example, `response_size_bytes / 1MiB`. in nanoseconds if it contains [rfc3339 time](https://www.rfc-editor.org/rfc/rfc3339). The log field is parsed into `uint32` number if it contains IPv4 address.
- Another mathematical expression. Optionally, it may be put inside `(...)`. For example, `(a + b) * c`. The log field is parsed into `NaN` in other cases.
- Any [supported numeric value](#numeric-values), [rfc3339 time](https://www.rfc-editor.org/rfc/rfc3339) or IPv4 address. For example, `1MiB`, `"2024-05-15T10:20:30.934324Z"` or `"12.34.56.78"`.
- Another mathematical expression, which can be put inside `(...)`. For example, `(a + b) * c`.
See also: See also:
@ -1721,9 +1741,48 @@ _time:5m | pack_json as foo | fields foo
See also: See also:
- [`pack_logfmt` pipe](#pack_logfmt-pipe)
- [`unpack_json` pipe](#unpack_json-pipe) - [`unpack_json` pipe](#unpack_json-pipe)
### pack_logfmt pipe
`| pack_logfmt as field_name` [pipe](#pipe) packs all [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) message
and stores it as a string in the given `field_name`.
For example, the following query packs all the fields into [logfmt](https://brandur.org/logfmt) message and stores it
into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) for logs over the last 5 minutes:
```logsql
_time:5m | pack_logfmt as _msg
```
The `as _msg` part can be omitted if packed message is stored into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
The following query is equivalent to the previous one:
```logsql
_time:5m | pack_logfmt
```
If only a subset of labels must be packed into [logfmt](https://brandur.org/logfmt), then it must be listed inside `fields (...)` after `pack_logfmt`.
For example, the following query builds [logfmt](https://brandur.org/logfmt) message with `foo` and `bar` fields only and stores the result in `baz` field:
```logsql
_time:5m | pack_logfmt fields (foo, bar) as baz
```
The `pack_logfmt` pipe doesn't modify or delete other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_logfmt` pipe. For example, the following query
leaves only the `foo` label with the original log fields packed into [logfmt](https://brandur.org/logfmt):
```logsql
_time:5m | pack_logfmt as foo | fields foo
```
See also:
- [`pack_json` pipe](#pack_json-pipe)
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
### rename pipe ### rename pipe
If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be renamed, then `| rename src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used. If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be renamed, then `| rename src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used.
@ -2200,6 +2259,7 @@ See also:
- [`extract` pipe](#extract-pipe) - [`extract` pipe](#extract-pipe)
- [`unroll` pipe](#unroll-pipe) - [`unroll` pipe](#unroll-pipe)
- [`pack_json` pipe](#pack_json-pipe) - [`pack_json` pipe](#pack_json-pipe)
- [`pack_logfmt` pipe](#pack_logfmt-pipe)
#### Conditional unpack_json #### Conditional unpack_json
@ -2301,13 +2361,14 @@ _time:5m | unpack_logfmt if (ip:"") from foo
from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). It understands the following Syslog formats: from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). It understands the following Syslog formats:
- [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `<PRI>MMM DD hh:mm:ss HOSTNAME TAG: MESSAGE` - [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `<PRI>MMM DD hh:mm:ss HOSTNAME TAG: MESSAGE`
- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE` - [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `<PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE`
The following fields are unpacked: The following fields are unpacked:
- `priority` - it is obtained from `PRI`. - `priority` - it is obtained from `PRI`.
- `facility` - it is calculated as `PRI / 8`. - `facility` - it is calculated as `PRI / 8`.
- `severity` - it is calculated as `PRI % 8`. - `severity` - it is calculated as `PRI % 8`.
- `format` - either `rfc3164` or `rfc5424` depending on which Syslog format is unpacked.
- `timestamp` - timestamp in [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601). The `MMM DD hh:mm:ss` timestamp in [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) - `timestamp` - timestamp in [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601). The `MMM DD hh:mm:ss` timestamp in [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164)
is automatically converted into [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) by assuming that the timestamp belongs to the last 12 months. is automatically converted into [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) by assuming that the timestamp belongs to the last 12 months.
- `hostname` - `hostname`
@ -2316,6 +2377,8 @@ The following fields are unpacked:
- `msg_id` - `msg_id`
- `message` - `message`
The `<PRI>` part is optional. If it is missing, then `priority`, `facility` and `severity` fields aren't set.
The `[STRUCTURED-DATA]` is parsed into fields with the `SD-ID` name and `param1="value1" ... paramN="valueN"` value The `[STRUCTURED-DATA]` is parsed into fields with the `SD-ID` name and `param1="value1" ... paramN="valueN"` value
according to [the specification](https://datatracker.ietf.org/doc/html/rfc5424#section-6.3). The value then can be parsed to separate fields with [`unpack_logfmt` pipe](#unpack_logfmt-pipe). according to [the specification](https://datatracker.ietf.org/doc/html/rfc5424#section-6.3). The value then can be parsed to separate fields with [`unpack_logfmt` pipe](#unpack_logfmt-pipe).

View file

@ -34,8 +34,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it: For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
```sh ```sh
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.16.0-victorialogs/victoria-logs-linux-amd64-v0.16.0-victorialogs.tar.gz curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.17.0-victorialogs/victoria-logs-linux-amd64-v0.17.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.16.0-victorialogs.tar.gz tar xzf victoria-logs-linux-amd64-v0.17.0-victorialogs.tar.gz
./victoria-logs-prod ./victoria-logs-prod
``` ```
@ -59,7 +59,7 @@ Here is the command to run VictoriaLogs in a Docker container:
```sh ```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \ docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs
``` ```
See also: See also:

View file

@ -36,7 +36,8 @@ _time:5m | sort by (_time desc) | limit 10
See also: See also:
- [How to count the number of matching logs](#how-to-count-the-number-of-matching-logs) - [How to count the number of matching logs?](#how-to-count-the-number-of-matching-logs)
- [How to return last N logs for the given query?](#how-to-return-last-n-logs-for-the-given-query)
## How to select logs with the given word in log message? ## How to select logs with the given word in log message?
@ -398,3 +399,25 @@ can be passed to it in order to return up to `N` latest log entries. For example
```sh ```sh
curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10' curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'
``` ```
See also:
- [How to select recently ingested logs?](#how-to-select-recently-ingested-logs)
- [How to return last N logs for the given query?](#how-to-return-last-n-logs-for-the-given-query)
## How to calculate the share of error logs to the total number of logs?
Use the following query:
```logsql
_time:5m | stats count() logs, count() if (error) errors | math errors / logs
```
This query uses the following [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) features:
- [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) for selecting logs on the given time range (last 5 minutes in the query above).
- [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) with [additional filtering](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters)
for calculating the total number of logs and the number of logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) on the selected time range.
- [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) for calculating the share of logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
comparing to the total number of logs.

2
go.mod
View file

@ -1,6 +1,6 @@
module github.com/VictoriaMetrics/VictoriaMetrics module github.com/VictoriaMetrics/VictoriaMetrics
go 1.22.3 go 1.22.4
require ( require (
cloud.google.com/go/storage v1.41.0 cloud.google.com/go/storage v1.41.0

View file

@ -3,6 +3,8 @@ package logstorage
import ( import (
"math" "math"
"slices" "slices"
"strconv"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
"unsafe" "unsafe"
@ -1916,5 +1918,49 @@ func getCanonicalColumnName(columnName string) string {
return columnName return columnName
} }
// tryParseNumber tries to parse the given string as a float64 number.
//
// It accepts, in order: plain floats, duration values (converted to
// nanoseconds) and byte sizes (converted to bytes). As a fallback it parses
// number-like strings with strconv, which additionally covers forms such as
// hex/octal integer literals. It returns (0, false) if s cannot be parsed.
func tryParseNumber(s string) (float64, bool) {
	if s == "" {
		return 0, false
	}
	if f, ok := tryParseFloat64(s); ok {
		return f, true
	}
	if nsecs, ok := tryParseDuration(s); ok {
		return float64(nsecs), true
	}
	if size, ok := tryParseBytes(s); ok {
		return float64(size), true
	}
	if !isLikelyNumber(s) {
		return 0, false
	}
	// Fallback to strconv for the remaining number-like forms.
	if f, err := strconv.ParseFloat(s, 64); err == nil {
		return f, true
	}
	// base=0 also accepts 0x/0o/0b prefixed integer literals.
	if n, err := strconv.ParseInt(s, 0, 64); err == nil {
		return float64(n), true
	}
	return 0, false
}
// isLikelyNumber reports whether s looks like a number rather than
// an IP address or a timestamp.
//
// It only performs a cheap heuristic check; the caller is expected
// to do the actual parsing.
func isLikelyNumber(s string) bool {
	if !isNumberPrefix(s) {
		return false
	}
	switch {
	case strings.Count(s, ".") > 1:
		// Multiple dots - this is likely an IP address.
		return false
	case strings.Contains(s, ":"), strings.Count(s, "-") > 2:
		// Colons or many dashes - this is likely a timestamp.
		return false
	}
	return true
}
var nan = math.NaN() var nan = math.NaN()
var inf = math.Inf(1) var inf = math.Inf(1)

View file

@ -44,7 +44,11 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) {
return return
} }
if c.isTime { if c.isTime {
bm.resetBits() minValueInt, maxValueInt := toInt64Range(minValue, maxValue)
bm.forEachSetBit(func(idx int) bool {
timestamp := br.timestamps[idx]
return timestamp >= minValueInt && timestamp <= maxValueInt
})
return return
} }
@ -129,8 +133,30 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) {
f := unmarshalFloat64(v) f := unmarshalFloat64(v)
return f >= minValue && f <= maxValue return f >= minValue && f <= maxValue
}) })
case valueTypeIPv4:
minValueUint32, maxValueUint32 := toUint32Range(minValue, maxValue)
if maxValue < 0 || uint64(minValueUint32) > c.maxValue || uint64(maxValueUint32) < c.minValue {
bm.resetBits()
return
}
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(idx int) bool {
v := valuesEncoded[idx]
n := unmarshalIPv4(v)
return n >= minValueUint32 && n <= maxValueUint32
})
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
bm.resetBits() minValueInt, maxValueInt := toInt64Range(minValue, maxValue)
if maxValue < 0 || minValueInt > int64(c.maxValue) || maxValueInt < int64(c.minValue) {
bm.resetBits()
return
}
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(idx int) bool {
v := valuesEncoded[idx]
n := unmarshalTimestampISO8601(v)
return n >= minValueInt && n <= maxValueInt
})
default: default:
logger.Panicf("FATAL: unknown valueType=%d", c.valueType) logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
} }
@ -178,9 +204,10 @@ func (fr *filterRange) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
case valueTypeFloat64: case valueTypeFloat64:
matchFloat64ByRange(bs, ch, bm, minValue, maxValue) matchFloat64ByRange(bs, ch, bm, minValue, maxValue)
case valueTypeIPv4: case valueTypeIPv4:
bm.resetBits() minValueUint32, maxValueUint32 := toUint32Range(minValue, maxValue)
matchIPv4ByRange(bs, ch, bm, minValueUint32, maxValueUint32)
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
bm.resetBits() matchTimestampISO8601ByRange(bs, ch, bm, minValue, maxValue)
default: default:
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType) logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
} }
@ -263,7 +290,7 @@ func matchUint32ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue,
bb := bbPool.Get() bb := bbPool.Get()
visitValues(bs, ch, bm, func(v string) bool { visitValues(bs, ch, bm, func(v string) bool {
if len(v) != 4 { if len(v) != 4 {
logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 4", bs.partPath(), len(v)) logger.Panicf("FATAL: %s: unexpected length for binary representation of uint32 number: got %d; want 4", bs.partPath(), len(v))
} }
n := uint64(unmarshalUint32(v)) n := uint64(unmarshalUint32(v))
return n >= minValueUint && n <= maxValueUint return n >= minValueUint && n <= maxValueUint
@ -280,7 +307,7 @@ func matchUint64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue,
bb := bbPool.Get() bb := bbPool.Get()
visitValues(bs, ch, bm, func(v string) bool { visitValues(bs, ch, bm, func(v string) bool {
if len(v) != 8 { if len(v) != 8 {
logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 8", bs.partPath(), len(v)) logger.Panicf("FATAL: %s: unexpected length for binary representation of uint64 number: got %d; want 8", bs.partPath(), len(v))
} }
n := unmarshalUint64(v) n := unmarshalUint64(v)
return n >= minValueUint && n <= maxValueUint return n >= minValueUint && n <= maxValueUint
@ -288,31 +315,26 @@ func matchUint64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue,
bbPool.Put(bb) bbPool.Put(bb)
} }
func matchRange(s string, minValue, maxValue float64) bool { func matchTimestampISO8601ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue float64) {
f, ok := tryParseNumber(s) minValueInt, maxValueInt := toInt64Range(minValue, maxValue)
if !ok { if maxValue < 0 || minValueInt > int64(ch.maxValue) || maxValueInt < int64(ch.minValue) {
return false bm.resetBits()
return
} }
return f >= minValue && f <= maxValue bb := bbPool.Get()
visitValues(bs, ch, bm, func(v string) bool {
if len(v) != 8 {
logger.Panicf("FATAL: %s: unexpected length for binary representation of timestampISO8601: got %d; want 8", bs.partPath(), len(v))
}
n := unmarshalTimestampISO8601(v)
return n >= minValueInt && n <= maxValueInt
})
bbPool.Put(bb)
} }
func tryParseNumber(s string) (float64, bool) { func matchRange(s string, minValue, maxValue float64) bool {
if len(s) == 0 { f := parseMathNumber(s)
return 0, false return f >= minValue && f <= maxValue
}
f, ok := tryParseFloat64(s)
if ok {
return f, true
}
nsecs, ok := tryParseDuration(s)
if ok {
return float64(nsecs), true
}
bytes, ok := tryParseBytes(s)
if ok {
return float64(bytes), true
}
return 0, false
} }
func toUint64Range(minValue, maxValue float64) (uint64, uint64) { func toUint64Range(minValue, maxValue float64) (uint64, uint64) {
@ -330,3 +352,35 @@ func toUint64Clamp(f float64) uint64 {
} }
return uint64(f) return uint64(f)
} }
// toInt64Range converts the given [minValue, maxValue] float range to the
// narrowest enclosed int64 range, clamping to the int64 limits.
func toInt64Range(minValue, maxValue float64) (int64, int64) {
	lo := toInt64Clamp(math.Ceil(minValue))
	hi := toInt64Clamp(math.Floor(maxValue))
	return lo, hi
}
// toInt64Clamp converts f to int64, clamping values outside the int64 range.
//
// The upper bound check uses >= because math.MaxInt64 rounds up to 2^63 when
// converted to float64; with the previous strict `>` comparison, f == 2^63
// fell through to int64(f), whose result is implementation-defined per the Go
// spec (it yields math.MinInt64 on gc/amd64 - the opposite of the intent).
// math.MinInt64 (-2^63) is exactly representable as float64, so the strict `<`
// comparison remains correct for the lower bound.
//
// NOTE: NaN input still reaches int64(f) and remains implementation-defined,
// matching the previous behavior.
func toInt64Clamp(f float64) int64 {
	if f < math.MinInt64 {
		return math.MinInt64
	}
	if f >= math.MaxInt64 {
		return math.MaxInt64
	}
	return int64(f)
}
// toUint32Range converts the given [minValue, maxValue] float range to the
// narrowest enclosed uint32 range, clamping to [0, math.MaxUint32].
func toUint32Range(minValue, maxValue float64) (uint32, uint32) {
	lo := toUint32Clamp(math.Ceil(minValue))
	hi := toUint32Clamp(math.Floor(maxValue))
	return lo, hi
}
// toUint32Clamp converts f to uint32, clamping values outside [0, math.MaxUint32].
func toUint32Clamp(f float64) uint32 {
	switch {
	case f < 0:
		return 0
	case f > math.MaxUint32:
		// math.MaxUint32 is exactly representable as float64,
		// so a strict comparison is sufficient here.
		return math.MaxUint32
	default:
		return uint32(f)
	}
}

View file

@ -207,5 +207,5 @@ func matchUint64ByStringRange(bs *blockSearch, ch *columnHeader, bm *bitmap, min
} }
func matchStringRange(s, minValue, maxValue string) bool { func matchStringRange(s, minValue, maxValue string) bool {
return s >= minValue && s < maxValue return !lessString(s, minValue) && lessString(s, maxValue)
} }

View file

@ -74,11 +74,6 @@ func (lex *lexer) isQuotedToken() bool {
return lex.token != lex.rawToken return lex.token != lex.rawToken
} }
func (lex *lexer) isNumber() bool {
s := lex.rawToken + lex.s
return isNumberPrefix(s)
}
func (lex *lexer) isPrevToken(tokens ...string) bool { func (lex *lexer) isPrevToken(tokens ...string) bool {
for _, token := range tokens { for _, token := range tokens {
if token == lex.prevToken { if token == lex.prevToken {
@ -1130,18 +1125,15 @@ func parseFilterGT(lex *lexer, fieldName string) (filter, error) {
op = ">=" op = ">="
} }
if !lex.isNumber() { lexState := lex.backupState()
lexState := lex.backupState() minValue, fStr, err := parseNumber(lex)
fr := tryParseFilterGTString(lex, fieldName, op, includeMinValue)
if fr != nil {
return fr, nil
}
lex.restoreState(lexState)
}
minValue, fStr, err := parseFloat64(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err) lex.restoreState(lexState)
fr := tryParseFilterGTString(lex, fieldName, op, includeMinValue)
if fr == nil {
return nil, fmt.Errorf("cannot parse [%s] as number: %w", fStr, err)
}
return fr, nil
} }
if !includeMinValue { if !includeMinValue {
@ -1168,16 +1160,17 @@ func parseFilterLT(lex *lexer, fieldName string) (filter, error) {
op = "<=" op = "<="
} }
if !lex.isNumber() { lexState := lex.backupState()
lexState := lex.backupState() maxValue, fStr, err := parseNumber(lex)
fr := tryParseFilterLTString(lex, fieldName, op, includeMaxValue) if err != nil {
if fr != nil {
return fr, nil
}
lex.restoreState(lexState) lex.restoreState(lexState)
fr := tryParseFilterLTString(lex, fieldName, op, includeMaxValue)
if fr == nil {
return nil, fmt.Errorf("cannot parse [%s] as number: %w", fStr, err)
}
return fr, nil
} }
maxValue, fStr, err := parseFloat64(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err) return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err)
} }
@ -1250,7 +1243,7 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) {
if !lex.mustNextToken() { if !lex.mustNextToken() {
return nil, fmt.Errorf("missing args for %s()", funcName) return nil, fmt.Errorf("missing args for %s()", funcName)
} }
minValue, minValueStr, err := parseFloat64(lex) minValue, minValueStr, err := parseNumber(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse minValue in %s(): %w", funcName, err) return nil, fmt.Errorf("cannot parse minValue in %s(): %w", funcName, err)
} }
@ -1264,7 +1257,7 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) {
} }
// Parse maxValue // Parse maxValue
maxValue, maxValueStr, err := parseFloat64(lex) maxValue, maxValueStr, err := parseNumber(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse maxValue in %s(): %w", funcName, err) return nil, fmt.Errorf("cannot parse maxValue in %s(): %w", funcName, err)
} }
@ -1304,23 +1297,18 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) {
return fr, nil return fr, nil
} }
func parseFloat64(lex *lexer) (float64, string, error) { func parseNumber(lex *lexer) (float64, string, error) {
s, err := getCompoundToken(lex) s, err := getCompoundToken(lex)
if err != nil { if err != nil {
return 0, "", fmt.Errorf("cannot parse float64 from %q: %w", s, err) return 0, "", fmt.Errorf("cannot parse float64 from %q: %w", s, err)
} }
f, err := strconv.ParseFloat(s, 64)
if err == nil { f := parseMathNumber(s)
if !math.IsNaN(f) || strings.EqualFold(s, "nan") {
return f, s, nil return f, s, nil
} }
// Try parsing s as integer. return 0, "", fmt.Errorf("cannot parse %q as float64", s)
// This handles 0x..., 0b... and 0... prefixes, alongside '_' delimiters.
n, err := parseInt(s)
if err == nil {
return float64(n), s, nil
}
return 0, "", fmt.Errorf("cannot parse %q as float64: %w", s, err)
} }
func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) { func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) {
@ -1616,6 +1604,15 @@ func isNumberPrefix(s string) bool {
return false return false
} }
} }
if len(s) >= 3 && strings.EqualFold(s, "inf") {
return true
}
if s[0] == '.' {
s = s[1:]
if len(s) == 0 {
return false
}
}
return s[0] >= '0' && s[0] <= '9' return s[0] >= '0' && s[0] <= '9'
} }
@ -1713,28 +1710,6 @@ func parseUint(s string) (uint64, error) {
return uint64(nn), nil return uint64(nn), nil
} }
func parseInt(s string) (int64, error) {
switch {
case strings.EqualFold(s, "inf"), strings.EqualFold(s, "+inf"):
return math.MaxInt64, nil
case strings.EqualFold(s, "-inf"):
return math.MinInt64, nil
}
n, err := strconv.ParseInt(s, 0, 64)
if err == nil {
return n, nil
}
nn, ok := tryParseBytes(s)
if !ok {
nn, ok = tryParseDuration(s)
if !ok {
return 0, fmt.Errorf("cannot parse %q as integer: %w", s, err)
}
}
return nn, nil
}
func nextafter(f, xInf float64) float64 { func nextafter(f, xInf float64) float64 {
if math.IsInf(f, 0) { if math.IsInf(f, 0) {
return f return f

View file

@ -356,7 +356,7 @@ func TestParseFilterStringRange(t *testing.T) {
f(">foo", ``, "foo\x00", maxStringRangeValue) f(">foo", ``, "foo\x00", maxStringRangeValue)
f("x:>=foo", `x`, "foo", maxStringRangeValue) f("x:>=foo", `x`, "foo", maxStringRangeValue)
f("x:<foo", `x`, ``, `foo`) f("x:<foo", `x`, ``, `foo`)
f(`<="123"`, ``, ``, "123\x00") f(`<="123.456.789"`, ``, ``, "123.456.789\x00")
} }
func TestParseFilterRegexp(t *testing.T) { func TestParseFilterRegexp(t *testing.T) {
@ -496,7 +496,7 @@ func TestParseRangeFilter(t *testing.T) {
} }
fr, ok := q.f.(*filterRange) fr, ok := q.f.(*filterRange)
if !ok { if !ok {
t.Fatalf("unexpected filter type; got %T; want *filterIPv4Range; filter: %s", q.f, q.f) t.Fatalf("unexpected filter type; got %T; want *filterRange; filter: %s", q.f, q.f)
} }
if fr.fieldName != fieldNameExpected { if fr.fieldName != fieldNameExpected {
t.Fatalf("unexpected fieldName; got %q; want %q", fr.fieldName, fieldNameExpected) t.Fatalf("unexpected fieldName; got %q; want %q", fr.fieldName, fieldNameExpected)
@ -535,6 +535,12 @@ func TestParseRangeFilter(t *testing.T) {
f(`foo: < -10.43`, `foo`, -inf, nextafter(-10.43, -inf)) f(`foo: < -10.43`, `foo`, -inf, nextafter(-10.43, -inf))
f(`foo:<=10.43ms`, `foo`, -inf, 10_430_000) f(`foo:<=10.43ms`, `foo`, -inf, 10_430_000)
f(`foo: <= 10.43`, `foo`, -inf, 10.43) f(`foo: <= 10.43`, `foo`, -inf, 10.43)
f(`foo:<=1.2.3.4`, `foo`, -inf, 16909060)
f(`foo:<='1.2.3.4'`, `foo`, -inf, 16909060)
f(`foo:>=0xffffffff`, `foo`, (1<<32)-1, inf)
f(`foo:>=1_234e3`, `foo`, 1234000, inf)
f(`foo:>=1_234e-3`, `foo`, 1.234, inf)
} }
func TestParseQuerySuccess(t *testing.T) { func TestParseQuerySuccess(t *testing.T) {
@ -811,10 +817,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`string_range(foo, bar)`, `string_range(foo, bar)`) f(`string_range(foo, bar)`, `string_range(foo, bar)`)
f(`foo:string_range("foo, bar", baz)`, `foo:string_range("foo, bar", baz)`) f(`foo:string_range("foo, bar", baz)`, `foo:string_range("foo, bar", baz)`)
f(`foo:>bar`, `foo:>bar`) f(`foo:>bar`, `foo:>bar`)
f(`foo:>"1234"`, `foo:>"1234"`) f(`foo:>"1234"`, `foo:>1234`)
f(`>="abc"`, `>=abc`) f(`>="abc"`, `>=abc`)
f(`foo:<bar`, `foo:<bar`) f(`foo:<bar`, `foo:<bar`)
f(`foo:<"-12.34"`, `foo:<"-12.34"`) f(`foo:<"-12.34"`, `foo:<-12.34`)
f(`<="abc < de"`, `<="abc < de"`) f(`<="abc < de"`, `<="abc < de"`)
// reserved field names // reserved field names

View file

@ -178,6 +178,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'pack_json' pipe: %w", err) return nil, fmt.Errorf("cannot parse 'pack_json' pipe: %w", err)
} }
return pp, nil return pp, nil
case lex.isKeyword("pack_logfmt"):
pp, err := parsePackLogfmt(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'pack_logfmt' pipe: %w", err)
}
return pp, nil
case lex.isKeyword("rename", "mv"): case lex.isKeyword("rename", "mv"):
pr, err := parsePipeRename(lex) pr, err := parsePipeRename(lex)
if err != nil { if err != nil {

View file

@ -2,6 +2,7 @@ package logstorage
import ( import (
"fmt" "fmt"
"math"
"strconv" "strconv"
"unsafe" "unsafe"
@ -185,9 +186,31 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult
if step.field != "" { if step.field != "" {
c := br.getColumnByName(step.field) c := br.getColumnByName(step.field)
v := c.getValueAtRow(br, rowIdx) v := c.getValueAtRow(br, rowIdx)
if step.fieldOpt == "q" { switch step.fieldOpt {
case "q":
b = strconv.AppendQuote(b, v) b = strconv.AppendQuote(b, v)
} else { case "time":
nsecs, ok := tryParseInt64(v)
if !ok {
b = append(b, v...)
continue
}
b = marshalTimestampRFC3339NanoString(b, nsecs)
case "duration":
nsecs, ok := tryParseInt64(v)
if !ok {
b = append(b, v...)
continue
}
b = marshalDurationString(b, nsecs)
case "ipv4":
ipNum, ok := tryParseUint64(v)
if !ok || ipNum > math.MaxUint32 {
b = append(b, v...)
continue
}
b = marshalIPv4String(b, uint32(ipNum))
default:
b = append(b, v...) b = append(b, v...)
} }
} }

View file

@ -47,6 +47,33 @@ func TestPipeFormat(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected) expectPipeResults(t, pipeStr, rows, rowsExpected)
} }
// format time, duration and ipv4
f(`format 'time=<time:foo>, duration=<duration:bar>, ip=<ipv4:baz>' as x`, [][]Field{
{
{"foo", `1717328141123456789`},
{"bar", `210123456789`},
{"baz", "1234567890"},
},
{
{"foo", `abc`},
{"bar", `de`},
{"baz", "ghkl"},
},
}, [][]Field{
{
{"foo", `1717328141123456789`},
{"bar", `210123456789`},
{"baz", "1234567890"},
{"x", "time=2024-06-02T11:35:41.123456789Z, duration=3m30.123456789s, ip=73.150.2.210"},
},
{
{"foo", `abc`},
{"bar", `de`},
{"baz", "ghkl"},
{"x", "time=abc, duration=de, ip=ghkl"},
},
})
// skip_empty_results // skip_empty_results
f(`format '<foo><bar>' as x skip_empty_results`, [][]Field{ f(`format '<foo><bar>' as x skip_empty_results`, [][]Field{
{ {

View file

@ -58,21 +58,25 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
} }
rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps))) rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps)))
if rowsProcessed <= plp.pl.limit { limit := plp.pl.limit
if rowsProcessed <= limit {
// Fast path - write all the rows to ppNext. // Fast path - write all the rows to ppNext.
plp.ppNext.writeBlock(workerID, br) plp.ppNext.writeBlock(workerID, br)
if rowsProcessed == limit {
plp.cancel()
}
return return
} }
// Slow path - overflow. Write the remaining rows if needed. // Slow path - overflow. Write the remaining rows if needed.
rowsProcessed -= uint64(len(br.timestamps)) rowsProcessed -= uint64(len(br.timestamps))
if rowsProcessed >= plp.pl.limit { if rowsProcessed >= limit {
// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
return return
} }
// Write remaining rows. // Write remaining rows.
keepRows := plp.pl.limit - rowsProcessed keepRows := limit - rowsProcessed
br.truncateRows(int(keepRows)) br.truncateRows(int(keepRows))
plp.ppNext.writeBlock(workerID, br) plp.ppNext.writeBlock(workerID, br)

View file

@ -161,6 +161,18 @@ var mathBinaryOps = map[string]mathBinaryOp{
priority: 3, priority: 3,
f: mathFuncMinus, f: mathFuncMinus,
}, },
"&": {
priority: 4,
f: mathFuncAnd,
},
"xor": {
priority: 5,
f: mathFuncXor,
},
"|": {
priority: 6,
f: mathFuncOr,
},
"default": { "default": {
priority: 10, priority: 10,
f: mathFuncDefault, f: mathFuncDefault,
@ -294,11 +306,7 @@ func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult)
var f float64 var f float64
for i, v := range values { for i, v := range values {
if i == 0 || v != values[i-1] { if i == 0 || v != values[i-1] {
var ok bool f = parseMathNumber(v)
f, ok = tryParseFloat64(v)
if !ok {
f = nan
}
} }
r[i] = f r[i] = f
} }
@ -489,13 +497,17 @@ func parseMathExprOperand(lex *lexer) (*mathExpr, error) {
return parseMathExprMin(lex) return parseMathExprMin(lex)
case lex.isKeyword("round"): case lex.isKeyword("round"):
return parseMathExprRound(lex) return parseMathExprRound(lex)
case lex.isKeyword("ceil"):
return parseMathExprCeil(lex)
case lex.isKeyword("floor"):
return parseMathExprFloor(lex)
case lex.isKeyword("-"): case lex.isKeyword("-"):
return parseMathExprUnaryMinus(lex) return parseMathExprUnaryMinus(lex)
case lex.isKeyword("+"): case lex.isKeyword("+"):
// just skip unary plus // just skip unary plus
lex.nextToken() lex.nextToken()
return parseMathExprOperand(lex) return parseMathExprOperand(lex)
case lex.isNumber(): case isNumberPrefix(lex.token):
return parseMathExprConstNumber(lex) return parseMathExprConstNumber(lex)
default: default:
return parseMathExprFieldName(lex) return parseMathExprFieldName(lex)
@ -568,6 +580,28 @@ func parseMathExprRound(lex *lexer) (*mathExpr, error) {
return me, nil return me, nil
} }
// parseMathExprCeil parses the single-arg 'ceil(...)' math function
// at the current lexer position.
func parseMathExprCeil(lex *lexer) (*mathExpr, error) {
	me, err := parseMathExprGenericFunc(lex, "ceil", mathFuncCeil)
	if err != nil {
		return nil, err
	}
	if argsCount := len(me.args); argsCount != 1 {
		return nil, fmt.Errorf("'ceil' function needs one arg; got %d args: [%s]", argsCount, me)
	}
	return me, nil
}
// parseMathExprFloor parses the single-arg 'floor(...)' math function
// at the current lexer position.
func parseMathExprFloor(lex *lexer) (*mathExpr, error) {
	me, err := parseMathExprGenericFunc(lex, "floor", mathFuncFloor)
	if err != nil {
		return nil, err
	}
	if argsCount := len(me.args); argsCount != 1 {
		return nil, fmt.Errorf("'floor' function needs one arg; got %d args: [%s]", argsCount, me)
	}
	return me, nil
}
func parseMathExprGenericFunc(lex *lexer, funcName string, f mathFunc) (*mathExpr, error) { func parseMathExprGenericFunc(lex *lexer, funcName string, f mathFunc) (*mathExpr, error) {
if !lex.isKeyword(funcName) { if !lex.isKeyword(funcName) {
return nil, fmt.Errorf("missing %q keyword", funcName) return nil, fmt.Errorf("missing %q keyword", funcName)
@ -637,15 +671,15 @@ func parseMathExprUnaryMinus(lex *lexer) (*mathExpr, error) {
} }
func parseMathExprConstNumber(lex *lexer) (*mathExpr, error) { func parseMathExprConstNumber(lex *lexer) (*mathExpr, error) {
if !lex.isNumber() { if !isNumberPrefix(lex.token) {
return nil, fmt.Errorf("cannot parse number from %q", lex.token) return nil, fmt.Errorf("cannot parse number from %q", lex.token)
} }
numStr, err := getCompoundMathToken(lex) numStr, err := getCompoundMathToken(lex)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot parse number: %w", err) return nil, fmt.Errorf("cannot parse number: %w", err)
} }
f, ok := tryParseNumber(numStr) f := parseMathNumber(numStr)
if !ok { if math.IsNaN(f) {
return nil, fmt.Errorf("cannot parse number from %q", numStr) return nil, fmt.Errorf("cannot parse number from %q", numStr)
} }
me := &mathExpr{ me := &mathExpr{
@ -688,6 +722,42 @@ func getCompoundMathToken(lex *lexer) (string, error) {
return rawS + suffix, nil return rawS + suffix, nil
} }
// mathFuncAnd writes the bitwise AND of the uint64 representations of the
// corresponding elements of args[0] and args[1] into result; a NaN in either
// operand yields NaN.
func mathFuncAnd(result []float64, args [][]float64) {
	xs := args[0]
	ys := args[1]
	for i := range result {
		x := xs[i]
		y := ys[i]
		if math.IsNaN(x) || math.IsNaN(y) {
			result[i] = nan
			continue
		}
		// NOTE(review): uint64 conversion of negative or out-of-range floats
		// is implementation-defined in Go; assumes non-negative integer operands.
		result[i] = float64(uint64(x) & uint64(y))
	}
}
// mathFuncOr writes the bitwise OR of the uint64 representations of the
// corresponding elements of args[0] and args[1] into result; a NaN in either
// operand yields NaN.
func mathFuncOr(result []float64, args [][]float64) {
	xs := args[0]
	ys := args[1]
	for i := range result {
		x := xs[i]
		y := ys[i]
		if math.IsNaN(x) || math.IsNaN(y) {
			result[i] = nan
			continue
		}
		// NOTE(review): uint64 conversion of negative or out-of-range floats
		// is implementation-defined in Go; assumes non-negative integer operands.
		result[i] = float64(uint64(x) | uint64(y))
	}
}
// mathFuncXor writes the bitwise XOR of the uint64 representations of the
// corresponding elements of args[0] and args[1] into result; a NaN in either
// operand yields NaN.
func mathFuncXor(result []float64, args [][]float64) {
	xs := args[0]
	ys := args[1]
	for i := range result {
		x := xs[i]
		y := ys[i]
		if math.IsNaN(x) || math.IsNaN(y) {
			result[i] = nan
			continue
		}
		// NOTE(review): uint64 conversion of negative or out-of-range floats
		// is implementation-defined in Go; assumes non-negative integer operands.
		result[i] = float64(uint64(x) ^ uint64(y))
	}
}
func mathFuncPlus(result []float64, args [][]float64) { func mathFuncPlus(result []float64, args [][]float64) {
a := args[0] a := args[0]
b := args[1] b := args[1]
@ -800,6 +870,20 @@ func mathFuncMin(result []float64, args [][]float64) {
} }
} }
// mathFuncCeil writes math.Ceil of every element of args[0] into result.
func mathFuncCeil(result []float64, args [][]float64) {
	values := args[0]
	for i, v := range values {
		result[i] = math.Ceil(v)
	}
}
// mathFuncFloor writes math.Floor of every element of args[0] into result.
func mathFuncFloor(result []float64, args [][]float64) {
	values := args[0]
	for i, v := range values {
		result[i] = math.Floor(v)
	}
}
func mathFuncRound(result []float64, args [][]float64) { func mathFuncRound(result []float64, args [][]float64) {
arg := args[0] arg := args[0]
if len(args) == 1 { if len(args) == 1 {
@ -829,3 +913,19 @@ func round(f, nearest float64) float64 {
f, _ = math.Modf(f * p10) f, _ = math.Modf(f * p10)
return f / p10 return f / p10
} }
// parseMathNumber parses s into a float64 for use in math expressions.
//
// Plain numbers are tried first, then RFC3339 timestamps (as unix
// nanoseconds) and IPv4 addresses (as their uint32 value).
// Unparseable input yields NaN.
func parseMathNumber(s string) float64 {
	if f, ok := tryParseNumber(s); ok {
		return f
	}
	if nsecs, ok := tryParseTimestampRFC3339Nano(s); ok {
		return float64(nsecs)
	}
	if ipNum, ok := tryParseIPv4(s); ok {
		return float64(ipNum)
	}
	return nan
}

View file

@ -50,6 +50,33 @@ func TestPipeMath(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected) expectPipeResults(t, pipeStr, rows, rowsExpected)
} }
f(`math
'2024-05-30T01:02:03Z' + 10e9 as time,
10m5s + 10e9 as duration,
'123.45.67.89' + 1000 as ip,
time - time % time_step as time_rounded,
duration - duration % duration_step as duration_rounded,
(ip & ip_mask | 0x1234) xor 5678 as subnet
`, [][]Field{
{
{"time_step", "30m"},
{"duration_step", "30s"},
{"ip_mask", "0xffffff00"},
},
}, [][]Field{
{
{"time_step", "30m"},
{"duration_step", "30s"},
{"ip_mask", "0xffffff00"},
{"time", "1717030933000000000"},
{"duration", "615000000000"},
{"ip", "2066564929"},
{"time_rounded", "1717030800000000000"},
{"duration_rounded", "600000000000"},
{"subnet", "2066563354"},
},
})
f("math b+1 as a, a*2 as b, b-10.5+c as c", [][]Field{ f("math b+1 as a, a*2 as b, b-10.5+c as c", [][]Field{
{ {
{"a", "v1"}, {"a", "v1"},
@ -108,7 +135,7 @@ func TestPipeMath(t *testing.T) {
}, },
}) })
f("math round(exp(a), 0.01), round(ln(a), 0.01)", [][]Field{ f("math round(exp(a), 0.01), round(ln(a), 0.01), ceil(exp(a)), floor(exp(a))", [][]Field{
{ {
{"a", "v1"}, {"a", "v1"},
}, },
@ -129,26 +156,36 @@ func TestPipeMath(t *testing.T) {
{"a", "v1"}, {"a", "v1"},
{"round(exp(a), 0.01)", "NaN"}, {"round(exp(a), 0.01)", "NaN"},
{"round(ln(a), 0.01)", "NaN"}, {"round(ln(a), 0.01)", "NaN"},
{"ceil(exp(a))", "NaN"},
{"floor(exp(a))", "NaN"},
}, },
{ {
{"a", "0"}, {"a", "0"},
{"round(exp(a), 0.01)", "1"}, {"round(exp(a), 0.01)", "1"},
{"round(ln(a), 0.01)", "NaN"}, {"round(ln(a), 0.01)", "NaN"},
{"ceil(exp(a))", "1"},
{"floor(exp(a))", "1"},
}, },
{ {
{"a", "1"}, {"a", "1"},
{"round(exp(a), 0.01)", "2.72"}, {"round(exp(a), 0.01)", "2.72"},
{"round(ln(a), 0.01)", "0"}, {"round(ln(a), 0.01)", "0"},
{"ceil(exp(a))", "3"},
{"floor(exp(a))", "2"},
}, },
{ {
{"a", "2"}, {"a", "2"},
{"round(exp(a), 0.01)", "7.39"}, {"round(exp(a), 0.01)", "7.39"},
{"round(ln(a), 0.01)", "0.69"}, {"round(ln(a), 0.01)", "0.69"},
{"ceil(exp(a))", "8"},
{"floor(exp(a))", "7"},
}, },
{ {
{"a", "3"}, {"a", "3"},
{"round(exp(a), 0.01)", "20.09"}, {"round(exp(a), 0.01)", "20.09"},
{"round(ln(a), 0.01)", "1.1"}, {"round(ln(a), 0.01)", "1.1"},
{"ceil(exp(a))", "21"},
{"floor(exp(a))", "20"},
}, },
}) })

114
lib/logstorage/pipe_pack.go Normal file
View file

@ -0,0 +1,114 @@
package logstorage
import (
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// updateNeededFieldsForPipePack propagates needed/unneeded field sets through
// a pack pipe (pack_json / pack_logfmt) which writes resultField from the
// given fields (empty fields means all the fields).
func updateNeededFieldsForPipePack(neededFields, unneededFields fieldsSet, resultField string, fields []string) {
	if neededFields.contains("*") {
		// Wildcard mode: everything is needed unless explicitly unneeded.
		if unneededFields.contains(resultField) {
			// The pipe output is dropped downstream - nothing changes.
			return
		}
		if len(fields) == 0 {
			// Packing all the fields - none of them may be dropped.
			unneededFields.reset()
			return
		}
		unneededFields.removeFields(fields)
		return
	}
	if !neededFields.contains(resultField) {
		// The pipe output isn't consumed downstream - nothing changes.
		return
	}
	// The packed column is produced from the source fields,
	// so require them instead of the result field itself.
	neededFields.remove(resultField)
	if len(fields) == 0 {
		neededFields.add("*")
		return
	}
	neededFields.addFields(fields)
}
// newPipePackProcessor returns a pipeProcessor which serializes the given
// fields of every row into resultField using marshalFields, with one shard
// of scratch state per worker.
func newPipePackProcessor(workersCount int, ppNext pipeProcessor, resultField string, fields []string, marshalFields func(dst []byte, fields []Field) []byte) pipeProcessor {
	ppp := &pipePackProcessor{
		ppNext:        ppNext,
		resultField:   resultField,
		fields:        fields,
		marshalFields: marshalFields,
	}
	ppp.shards = make([]pipePackProcessorShard, workersCount)
	return ppp
}
// pipePackProcessor is the shared pipeProcessor implementation behind the
// pack_json and pack_logfmt pipes; the concrete serialization format is
// pluggable via marshalFields.
type pipePackProcessor struct {
	// ppNext is the next processor in the pipeline, which receives the blocks with the packed column added.
	ppNext pipeProcessor

	// resultField is the name of the output column with the packed value.
	resultField string

	// fields lists the fields to pack; an empty list means all the fields.
	fields []string

	// marshalFields appends the serialized representation of fields to dst and returns the result.
	marshalFields func(dst []byte, fields []Field) []byte

	// shards hold per-worker scratch state, indexed by workerID, in order to avoid synchronization.
	shards []pipePackProcessorShard
}

type pipePackProcessorShard struct {
	pipePackProcessorShardNopad

	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof(pipePackProcessorShardNopad{})%128]byte
}

type pipePackProcessorShardNopad struct {
	// rc is the result column being built for the current block.
	rc resultColumn

	// buf is a scratch buffer holding the serialized row values.
	buf []byte

	// fields is a reusable scratch slice with the per-row field values.
	fields []Field

	// cs caches the pointers to the columns selected for packing.
	cs []*blockResultColumn
}
// writeBlock serializes the selected fields of every row in br into a single
// column named ppp.resultField, adds this column to br and passes the block
// to the next processor.
func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) {
	if len(br.timestamps) == 0 {
		// Nothing to do for an empty block.
		return
	}

	shard := &ppp.shards[workerID]
	shard.rc.name = ppp.resultField

	// Select the columns to pack: either the explicitly configured fields
	// or all the columns present in the block.
	cs := shard.cs[:0]
	if len(ppp.fields) == 0 {
		csAll := br.getColumns()
		cs = append(cs, csAll...)
	} else {
		for _, f := range ppp.fields {
			c := br.getColumnByName(f)
			cs = append(cs, c)
		}
	}
	shard.cs = cs

	// NOTE(review): buf starts from shard.buf but is never stored back into
	// shard.buf below, so the grown capacity isn't reused across blocks -
	// verify whether `shard.buf = buf` was intended at the end.
	buf := shard.buf[:0]
	fields := shard.fields
	for rowIdx := range br.timestamps {
		// Collect the current row's values and serialize them via marshalFields.
		fields = fields[:0]
		for _, c := range cs {
			v := c.getValueAtRow(br, rowIdx)
			fields = append(fields, Field{
				Name:  c.name,
				Value: v,
			})
		}
		bufLen := len(buf)
		buf = ppp.marshalFields(buf, fields)
		// Reference the just-serialized bytes without copying; the backing
		// arrays stay alive while shard.rc holds the values.
		v := bytesutil.ToUnsafeString(buf[bufLen:])
		shard.rc.addValue(v)
	}
	shard.fields = fields

	br.addResultColumn(&shard.rc)
	ppp.ppNext.writeBlock(workerID, br)

	// Release the per-block values after the downstream processor is done with them.
	shard.rc.reset()
}
// flush is a no-op, since pipePackProcessor doesn't buffer rows across writeBlock calls.
func (ppp *pipePackProcessor) flush() error {
	return nil
}

View file

@ -3,9 +3,6 @@ package logstorage
import ( import (
"fmt" "fmt"
"slices" "slices"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
) )
// pipePackJSON processes '| pack_json ...' pipe. // pipePackJSON processes '| pack_json ...' pipe.
@ -29,23 +26,7 @@ func (pp *pipePackJSON) String() string {
} }
func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") { updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields)
if !unneededFields.contains(pp.resultField) {
if len(pp.fields) > 0 {
unneededFields.removeFields(pp.fields)
} else {
unneededFields.reset()
}
}
} else {
if neededFields.contains(pp.resultField) {
if len(pp.fields) > 0 {
neededFields.addFields(pp.fields)
} else {
neededFields.add("*")
}
}
}
} }
func (pp *pipePackJSON) optimize() { func (pp *pipePackJSON) optimize() {
@ -61,85 +42,7 @@ func (pp *pipePackJSON) initFilterInValues(_ map[string][]string, _ getFieldValu
} }
func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
return &pipePackJSONProcessor{ return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToJSON)
pp: pp,
ppNext: ppNext,
shards: make([]pipePackJSONProcessorShard, workersCount),
}
}
type pipePackJSONProcessor struct {
pp *pipePackJSON
ppNext pipeProcessor
shards []pipePackJSONProcessorShard
}
type pipePackJSONProcessorShard struct {
pipePackJSONProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipePackJSONProcessorShardNopad{})%128]byte
}
type pipePackJSONProcessorShardNopad struct {
rc resultColumn
buf []byte
fields []Field
cs []*blockResultColumn
}
func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &ppp.shards[workerID]
shard.rc.name = ppp.pp.resultField
cs := shard.cs[:0]
if len(ppp.pp.fields) == 0 {
csAll := br.getColumns()
cs = append(cs, csAll...)
} else {
for _, f := range ppp.pp.fields {
c := br.getColumnByName(f)
cs = append(cs, c)
}
}
shard.cs = cs
buf := shard.buf[:0]
fields := shard.fields
for rowIdx := range br.timestamps {
fields = fields[:0]
for _, c := range cs {
v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{
Name: c.name,
Value: v,
})
}
bufLen := len(buf)
buf = MarshalFieldsToJSON(buf, fields)
v := bytesutil.ToUnsafeString(buf[bufLen:])
shard.rc.addValue(v)
}
shard.fields = fields
br.addResultColumn(&shard.rc)
ppp.ppNext.writeBlock(workerID, br)
shard.rc.reset()
}
func (ppp *pipePackJSONProcessor) flush() error {
return nil
} }
func parsePackJSON(lex *lexer) (*pipePackJSON, error) { func parsePackJSON(lex *lexer) (*pipePackJSON, error) {

View file

@ -0,0 +1,86 @@
package logstorage
import (
"fmt"
"slices"
)
// pipePackLogfmt processes '| pack_logfmt ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#pack_logfmt-pipe
type pipePackLogfmt struct {
	// resultField is the name of the field receiving the logfmt-packed value.
	resultField string

	// fields lists the fields to pack; an empty list means all the fields.
	fields []string
}
// String returns the canonical LogsQL representation of the pipe.
func (pp *pipePackLogfmt) String() string {
	result := "pack_logfmt"
	if len(pp.fields) > 0 {
		result += " fields (" + fieldsToString(pp.fields) + ")"
	}
	if !isMsgFieldName(pp.resultField) {
		// The default '_msg' destination is omitted from the canonical form.
		result += " as " + quoteTokenIfNeeded(pp.resultField)
	}
	return result
}
// updateNeededFields propagates the needed/unneeded field sets through the pipe;
// the logic is shared with the pack_json pipe via updateNeededFieldsForPipePack.
func (pp *pipePackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
	updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields)
}
// optimize is a no-op - the pack_logfmt pipe has nothing to optimize.
func (pp *pipePackLogfmt) optimize() {
	// nothing to do
}

// hasFilterInWithQuery returns false, since pack_logfmt contains no in(...) subqueries.
func (pp *pipePackLogfmt) hasFilterInWithQuery() bool {
	return false
}

// initFilterInValues returns pp unchanged, since there are no in(...) filters to initialize.
func (pp *pipePackLogfmt) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
	return pp, nil
}
// newPipeProcessor returns a processor which packs the selected fields into
// logfmt format via the shared pipePackProcessor implementation.
func (pp *pipePackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
	return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToLogfmt)
}
// parsePackLogfmt parses the 'pack_logfmt ...' pipe at the current lexer position.
//
// Syntax: pack_logfmt [fields (f1, ..., fN)] [as resultField]
func parsePackLogfmt(lex *lexer) (*pipePackLogfmt, error) {
	if !lex.isKeyword("pack_logfmt") {
		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "pack_logfmt")
	}
	lex.nextToken()

	// Parse the optional 'fields (...)' list; a '*' entry means all the fields.
	var packedFields []string
	if lex.isKeyword("fields") {
		lex.nextToken()
		fs, err := parseFieldNamesInParens(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse fields: %w", err)
		}
		if !slices.Contains(fs, "*") {
			packedFields = fs
		}
	}

	// Parse the optional 'as resultField' part; the destination defaults to '_msg'.
	if lex.isKeyword("as") {
		lex.nextToken()
	}
	resultField := "_msg"
	if !lex.isKeyword("|", ")", "") {
		field, err := parseFieldName(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse result field for 'pack_logfmt': %w", err)
		}
		resultField = field
	}

	return &pipePackLogfmt{
		resultField: resultField,
		fields:      packedFields,
	}, nil
}

View file

@ -0,0 +1,133 @@
package logstorage
import (
"testing"
)
// TestParsePipePackLogfmtSuccess verifies that valid 'pack_logfmt' pipe
// definitions are parsed without errors and round-trip via String().
func TestParsePipePackLogfmtSuccess(t *testing.T) {
	f := func(pipeStr string) {
		t.Helper()
		expectParsePipeSuccess(t, pipeStr)
	}

	f(`pack_logfmt`)
	f(`pack_logfmt as x`)
	f(`pack_logfmt fields (a, b)`)
	f(`pack_logfmt fields (a, b) as x`)
}
// TestParsePipePackLogfmtFailure verifies that malformed 'pack_logfmt' pipe
// definitions are rejected by the parser.
func TestParsePipePackLogfmtFailure(t *testing.T) {
	f := func(pipeStr string) {
		t.Helper()
		expectParsePipeFailure(t, pipeStr)
	}

	f(`pack_logfmt foo bar`)
	f(`pack_logfmt fields`)
}
// TestPipePackLogfmt verifies end-to-end behavior of the pack_logfmt pipe:
// packing all fields into _msg, packing into a custom field, and packing
// only an explicit field list (missing fields produce empty values).
func TestPipePackLogfmt(t *testing.T) {
	f := func(pipeStr string, rows, rowsExpected [][]Field) {
		t.Helper()
		expectPipeResults(t, pipeStr, rows, rowsExpected)
	}

	// pack into _msg; values with non-token chars (such as '=') get quoted
	f(`pack_logfmt`, [][]Field{
		{
			{"_msg", "x"},
			{"foo", `abc`},
			{"bar", `cde=ab`},
		},
		{
			{"a", "b"},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"_msg", `_msg=x foo=abc bar="cde=ab"`},
			{"foo", `abc`},
			{"bar", `cde=ab`},
		},
		{
			{"_msg", `a=b c=d`},
			{"a", "b"},
			{"c", "d"},
		},
	})

	// pack into other field
	f(`pack_logfmt as a`, [][]Field{
		{
			{"_msg", "x"},
			{"foo", `abc`},
			{"bar", `cde`},
		},
		{
			{"a", "b"},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"_msg", `x`},
			{"foo", `abc`},
			{"bar", `cde`},
			{"a", `_msg=x foo=abc bar=cde`},
		},
		{
			{"a", `a=b c=d`},
			{"c", "d"},
		},
	})

	// pack only the needed fields; fields missing in a row are packed with empty values
	f(`pack_logfmt fields (foo, baz) a`, [][]Field{
		{
			{"_msg", "x"},
			{"foo", `abc`},
			{"bar", `cde`},
		},
		{
			{"a", "b"},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"_msg", `x`},
			{"foo", `abc`},
			{"bar", `cde`},
			{"a", `foo=abc baz=`},
		},
		{
			{"a", `foo= baz=`},
			{"c", "d"},
		},
	})
}
// TestPipePackLogfmtUpdateNeededFields verifies the needed/unneeded field-set
// propagation of the pack_logfmt pipe for wildcard and explicit field sets.
func TestPipePackLogfmtUpdateNeededFields(t *testing.T) {
	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
		t.Helper()
		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
	}

	// all the needed fields
	f(`pack_logfmt as x`, "*", "", "*", "")
	f(`pack_logfmt fields (a,b) as x`, "*", "", "*", "")

	// unneeded fields do not intersect with output
	f(`pack_logfmt as x`, "*", "f1,f2", "*", "")
	f(`pack_logfmt fields(f1,f3) as x`, "*", "f1,f2", "*", "f2")

	// unneeded fields intersect with output
	f(`pack_logfmt as f1`, "*", "f1,f2", "*", "f1,f2")
	f(`pack_logfmt fields (f2,f3) as f1`, "*", "f1,f2", "*", "f1,f2")

	// needed fields do not intersect with output
	f(`pack_logfmt f1`, "x,y", "", "x,y", "")
	f(`pack_logfmt fields (x,z) f1`, "x,y", "", "x,y", "")

	// needed fields intersect with output
	f(`pack_logfmt as f2`, "f2,y", "", "*", "")
	f(`pack_logfmt fields (x,y) as f2`, "f2,y", "", "x,y", "")
}

View file

@ -62,6 +62,7 @@ func TestPipeUnpackSyslog(t *testing.T) {
{"priority", "165"}, {"priority", "165"},
{"facility", "20"}, {"facility", "20"},
{"severity", "5"}, {"severity", "5"},
{"format", "rfc5424"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"}, {"hostname", "mymachine.example.com"},
{"app_name", "appname"}, {"app_name", "appname"},
@ -86,6 +87,7 @@ func TestPipeUnpackSyslog(t *testing.T) {
{"priority", "165"}, {"priority", "165"},
{"facility", "20"}, {"facility", "20"},
{"severity", "5"}, {"severity", "5"},
{"format", "rfc5424"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"}, {"hostname", "mymachine.example.com"},
{"app_name", "foobar"}, {"app_name", "foobar"},
@ -106,6 +108,7 @@ func TestPipeUnpackSyslog(t *testing.T) {
{"priority", "165"}, {"priority", "165"},
{"facility", "20"}, {"facility", "20"},
{"severity", "5"}, {"severity", "5"},
{"format", "rfc5424"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"}, {"hostname", "mymachine.example.com"},
{"app_name", "appname"}, {"app_name", "appname"},
@ -137,6 +140,7 @@ func TestPipeUnpackSyslog(t *testing.T) {
{"priority", "165"}, {"priority", "165"},
{"facility", "20"}, {"facility", "20"},
{"severity", "5"}, {"severity", "5"},
{"format", "rfc5424"},
{"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"},
{"hostname", "mymachine.example.com"}, {"hostname", "mymachine.example.com"},
{"app_name", "appname"}, {"app_name", "appname"},
@ -183,6 +187,7 @@ func TestPipeUnpackSyslog(t *testing.T) {
{"qwe_priority", "165"}, {"qwe_priority", "165"},
{"qwe_facility", "20"}, {"qwe_facility", "20"},
{"qwe_severity", "5"}, {"qwe_severity", "5"},
{"qwe_format", "rfc5424"},
{"qwe_timestamp", "2023-06-03T17:42:32.123456789Z"}, {"qwe_timestamp", "2023-06-03T17:42:32.123456789Z"},
{"qwe_hostname", "mymachine.example.com"}, {"qwe_hostname", "mymachine.example.com"},
{"qwe_app_name", "appname"}, {"qwe_app_name", "appname"},
@ -196,6 +201,7 @@ func TestPipeUnpackSyslog(t *testing.T) {
{"qwe_priority", "163"}, {"qwe_priority", "163"},
{"qwe_facility", "20"}, {"qwe_facility", "20"},
{"qwe_severity", "3"}, {"qwe_severity", "3"},
{"qwe_format", "rfc5424"},
{"qwe_timestamp", "2024-12-13T18:21:43Z"}, {"qwe_timestamp", "2024-12-13T18:21:43Z"},
{"qwe_hostname", "mymachine.example.com"}, {"qwe_hostname", "mymachine.example.com"},
{"qwe_app_name", "appname2"}, {"qwe_app_name", "appname2"},

View file

@ -64,7 +64,27 @@ func (f *Field) marshalToJSON(dst []byte) []byte {
return dst return dst
} }
// MarshalFieldsToJSON appends JSON-marshaled fields to dt and returns the result. func (f *Field) marshalToLogfmt(dst []byte) []byte {
dst = append(dst, f.Name...)
dst = append(dst, '=')
if needLogfmtQuoting(f.Value) {
dst = strconv.AppendQuote(dst, f.Value)
} else {
dst = append(dst, f.Value...)
}
return dst
}
// needLogfmtQuoting reports whether s contains at least one rune outside
// the logfmt token alphabet and therefore must be quoted when marshaled.
func needLogfmtQuoting(s string) bool {
	for _, r := range s {
		if isTokenRune(r) {
			continue
		}
		return true
	}
	return false
}
// MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result.
func MarshalFieldsToJSON(dst []byte, fields []Field) []byte { func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
dst = append(dst, '{') dst = append(dst, '{')
if len(fields) > 0 { if len(fields) > 0 {
@ -79,6 +99,20 @@ func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
return dst return dst
} }
// MarshalFieldsToLogfmt appends logfmt-marshaled fields to dst and returns the result.
//
// Fields are separated by a single space; an empty fields slice leaves dst untouched.
func MarshalFieldsToLogfmt(dst []byte, fields []Field) []byte {
	for i := range fields {
		if i > 0 {
			dst = append(dst, ' ')
		}
		dst = fields[i].marshalToLogfmt(dst)
	}
	return dst
}
func appendFields(a *arena, dst, src []Field) []Field { func appendFields(a *arena, dst, src []Field) []Field {
for _, f := range src { for _, f := range src {
dst = append(dst, Field{ dst = append(dst, Field{

View file

@ -110,6 +110,8 @@ func (p *syslogParser) parseNoHeader(s string) {
func (p *syslogParser) parseRFC5424(s string) { func (p *syslogParser) parseRFC5424(s string) {
// See https://datatracker.ietf.org/doc/html/rfc5424 // See https://datatracker.ietf.org/doc/html/rfc5424
p.addField("format", "rfc5424")
if len(s) == 0 { if len(s) == 0 {
return return
} }
@ -242,6 +244,9 @@ func (p *syslogParser) parseRFC3164(s string) {
if len(s) < n { if len(s) < n {
return return
} }
p.addField("format", "rfc3164")
t, err := time.Parse(time.Stamp, s[:n]) t, err := time.Parse(time.Stamp, s[:n])
if err != nil { if err != nil {
// TODO: fall back to parsing ISO8601 timestamp? // TODO: fall back to parsing ISO8601 timestamp?

View file

@ -21,47 +21,47 @@ func TestSyslogParser(t *testing.T) {
// RFC 3164 // RFC 3164
f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...",
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...",
`{"priority":"165","facility":"20","severity":"5","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...",
`{"timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) `{"format":"rfc3164","timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`)
f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...",
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`)
f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", f("Jun 3 12:08:33 - - Starting Update the local ESM caches...",
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`)
// RFC 5424 // RFC 5424
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`,
`{"timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) `{"format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`)
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`)
// Incomplete RFC 3164 // Incomplete RFC 3164
f("", `{}`) f("", `{}`)
f("Jun 3 12:08:33", `{"timestamp":"2024-06-03T12:08:33.000Z"}`) f("Jun 3 12:08:33", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`)
f("Jun 3 12:08:33 abcd", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) f("Jun 3 12:08:33 abcd", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`)
f("Jun 3 12:08:33 abcd sudo", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) f("Jun 3 12:08:33 abcd sudo", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`)
f("Jun 3 12:08:33 abcd sudo[123]", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) f("Jun 3 12:08:33 abcd sudo[123]", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`)
f("Jun 3 12:08:33 abcd sudo foobar", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) f("Jun 3 12:08:33 abcd sudo foobar", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`)
// Incomplete RFC 5424 // Incomplete RFC 5424
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`)
f(`<165>1 2023-06-03T17:42:32.123456789Z`, f(`<165>1 2023-06-03T17:42:32.123456789Z`,
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z"}`)
f(`<165>1 `, f(`<165>1 `,
`{"priority":"165","facility":"20","severity":"5"}`) `{"priority":"165","facility":"20","severity":"5","format":"rfc5424"}`)
} }