From 43cf2216811c24d008f9c417339d14513c93c734 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 5 Jun 2024 03:18:12 +0200 Subject: [PATCH] lib/logstorage: work-in-progress --- app/vlselect/logsql/logsql.go | 28 ++-- app/vmui/Dockerfile-web | 2 +- deployment/docker/Makefile | 2 +- .../docker/docker-compose-victorialogs.yml | 2 +- .../filebeat-docker/docker-compose.yml | 2 +- .../filebeat-syslog/docker-compose.yml | 2 +- .../fluentbit-docker/docker-compose.yml | 2 +- .../victorialogs/logstash/docker-compose.yml | 2 +- .../victorialogs/promtail/docker-compose.yml | 2 +- .../vector-docker/docker-compose.yml | 2 +- .../logs-benchmark/docker-compose-elk.yml | 2 +- .../logs-benchmark/docker-compose-loki.yml | 2 +- deployment/logs-benchmark/docker-compose.yml | 2 +- docs/VictoriaLogs/CHANGELOG.md | 14 ++ docs/VictoriaLogs/LogsQL.md | 75 +++++++++- docs/VictoriaLogs/QuickStart.md | 6 +- docs/VictoriaLogs/logsql-examples.md | 25 +++- go.mod | 2 +- lib/logstorage/block_result.go | 46 ++++++ lib/logstorage/filter_range.go | 110 +++++++++++---- lib/logstorage/filter_string_range.go | 2 +- lib/logstorage/parser.go | 89 +++++------- lib/logstorage/parser_test.go | 14 +- lib/logstorage/pipe.go | 6 + lib/logstorage/pipe_format.go | 27 +++- lib/logstorage/pipe_format_test.go | 27 ++++ lib/logstorage/pipe_limit.go | 10 +- lib/logstorage/pipe_math.go | 118 ++++++++++++++-- lib/logstorage/pipe_math_test.go | 39 ++++- lib/logstorage/pipe_pack.go | 114 +++++++++++++++ lib/logstorage/pipe_pack_json.go | 101 +------------ lib/logstorage/pipe_pack_logfmt.go | 86 +++++++++++ lib/logstorage/pipe_pack_logfmt_test.go | 133 ++++++++++++++++++ lib/logstorage/pipe_unpack_syslog_test.go | 6 + lib/logstorage/rows.go | 36 ++++- lib/logstorage/syslog_parser.go | 5 + lib/logstorage/syslog_parser_test.go | 42 +++--- 37 files changed, 923 insertions(+), 262 deletions(-) create mode 100644 lib/logstorage/pipe_pack.go create mode 100644 lib/logstorage/pipe_pack_logfmt.go create mode 100644 lib/logstorage/pipe_pack_logfmt_test.go diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index f74c63735..c2c4e8472 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -355,19 +355,20 @@ type row struct { } func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) { - q.AddPipeLimit(uint64(limit + 1)) + limitUpper := 2 * limit + q.AddPipeLimit(uint64(limitUpper)) q.Optimize() - rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1) + rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper) if err != nil { return nil, err } - if len(rows) <= limit { - // Fast path - the requested time range contains up to limit rows. - sortRowsByTime(rows) + if len(rows) < limitUpper { + // Fast path - the requested time range contains up to limitUpper rows. + rows = getLastNRows(rows, limit) return rows, nil } - // Slow path - search for the time range with the requested limit rows. + // Slow path - search for the time range containing up to limitUpper rows. 
start, end := q.GetFilterTimeRange() d := end/2 - start/2 start += d @@ -376,16 +377,13 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, for { q = qOrig.Clone() q.AddTimeFilter(start, end) - rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1) + rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limitUpper) if err != nil { return nil, err } - if len(rows) == limit || len(rows) > limit && d < 10e6 || d == 0 { - sortRowsByTime(rows) - if len(rows) > limit { - rows = rows[len(rows)-limit:] - } + if len(rows) >= limit && len(rows) < limitUpper || d == 0 { + rows = getLastNRows(rows, limit) return rows, nil } @@ -399,10 +397,14 @@ func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, } } -func sortRowsByTime(rows []row) { +func getLastNRows(rows []row, limit int) []row { sort.Slice(rows, func(i, j int) bool { return rows[i].timestamp < rows[j].timestamp }) + if len(rows) > limit { + rows = rows[len(rows)-limit:] + } + return rows } func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) { diff --git a/app/vmui/Dockerfile-web b/app/vmui/Dockerfile-web index 69e692f5f..52dfe7a7e 100644 --- a/app/vmui/Dockerfile-web +++ b/app/vmui/Dockerfile-web @@ -1,4 +1,4 @@ -FROM golang:1.22.3 as build-web-stage +FROM golang:1.22.4 as build-web-stage COPY build /build WORKDIR /build diff --git a/deployment/docker/Makefile b/deployment/docker/Makefile index de92c780a..6f961a9d1 100644 --- a/deployment/docker/Makefile +++ b/deployment/docker/Makefile @@ -6,7 +6,7 @@ ROOT_IMAGE ?= alpine:3.20.0 ROOT_IMAGE_SCRATCH ?= scratch CERTS_IMAGE := alpine:3.20.0 -GO_BUILDER_IMAGE := golang:1.22.3-alpine +GO_BUILDER_IMAGE := golang:1.22.4-alpine BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1 BASE_IMAGE := local/base:1.1.4-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __) DOCKER ?= docker diff --git a/deployment/docker/docker-compose-victorialogs.yml b/deployment/docker/docker-compose-victorialogs.yml index 9dbef797b..d6c0b8464 100644 --- a/deployment/docker/docker-compose-victorialogs.yml +++ b/deployment/docker/docker-compose-victorialogs.yml @@ -43,7 +43,7 @@ services: # storing logs and serving read queries. 
victorialogs: container_name: victorialogs - image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs command: - "--storageDataPath=/vlogs" - "--httpListenAddr=:9428" diff --git a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml index 979ec9dd7..c90a0e049 100644 --- a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: - -beat.uri=http://filebeat-victorialogs:5066 victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs volumes: - victorialogs-filebeat-docker-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml index cbae6b505..d4347e9c4 100644 --- a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml @@ -13,7 +13,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs volumes: - victorialogs-filebeat-syslog-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml index 23c967339..7eb36a44c 100644 --- a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml @@ -11,7 +11,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs volumes: - victorialogs-fluentbit-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/logstash/docker-compose.yml b/deployment/docker/victorialogs/logstash/docker-compose.yml index db46a319d..73a0249c4 100644 --- a/deployment/docker/victorialogs/logstash/docker-compose.yml +++ b/deployment/docker/victorialogs/logstash/docker-compose.yml @@ -14,7 +14,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs volumes: - victorialogs-logstash-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/promtail/docker-compose.yml b/deployment/docker/victorialogs/promtail/docker-compose.yml index 8a628a83c..017691c81 100644 --- a/deployment/docker/victorialogs/promtail/docker-compose.yml +++ b/deployment/docker/victorialogs/promtail/docker-compose.yml @@ -12,7 +12,7 @@ services: - "5140:5140" vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs volumes: - victorialogs-promtail-docker:/vlogs ports: diff --git a/deployment/docker/victorialogs/vector-docker/docker-compose.yml b/deployment/docker/victorialogs/vector-docker/docker-compose.yml index 50ac8e646..c2c7be67b 100644 --- a/deployment/docker/victorialogs/vector-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/vector-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: condition: service_healthy victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + 
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs volumes: - victorialogs-vector-docker-vl:/vlogs ports: diff --git a/deployment/logs-benchmark/docker-compose-elk.yml b/deployment/logs-benchmark/docker-compose-elk.yml index 40b02e50e..2748e1a85 100644 --- a/deployment/logs-benchmark/docker-compose-elk.yml +++ b/deployment/logs-benchmark/docker-compose-elk.yml @@ -18,7 +18,7 @@ services: - vlogs generator: - image: golang:1.22.3-alpine + image: golang:1.22.4-alpine restart: always working_dir: /go/src/app volumes: diff --git a/deployment/logs-benchmark/docker-compose-loki.yml b/deployment/logs-benchmark/docker-compose-loki.yml index c20a8125c..40e4a38e5 100644 --- a/deployment/logs-benchmark/docker-compose-loki.yml +++ b/deployment/logs-benchmark/docker-compose-loki.yml @@ -2,7 +2,7 @@ version: '3' services: generator: - image: golang:1.22.3-alpine + image: golang:1.22.4-alpine restart: always working_dir: /go/src/app volumes: diff --git a/deployment/logs-benchmark/docker-compose.yml b/deployment/logs-benchmark/docker-compose.yml index 9fd53f607..55baca381 100644 --- a/deployment/logs-benchmark/docker-compose.yml +++ b/deployment/logs-benchmark/docker-compose.yml @@ -3,7 +3,7 @@ version: '3' services: # Run `make package-victoria-logs` to build victoria-logs image vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs volumes: - vlogs:/vlogs ports: diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 51ef8d8cc..a94a2ec18 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,20 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +## [v0.17.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.17.0-victorialogs) + +Released at 2024-06-05 + +* FEATURE: add [`pack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_logfmt-pipe) for formatting [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) messages. +* FEATURE: allow using IPv4 addresses in [range comparison filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filter). For example, `ip:>'12.34.56.78'` is valid filter now. +* FEATURE: add `ceil()` and `floor()` functions to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). +* FEATURE: add support for bitwise `and`, `or` and `xor` operations at [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). +* FEATURE: add support for automatic conversion of [RFC3339 time](https://www.rfc-editor.org/rfc/rfc3339) and IPv4 addresses into numeric representation at [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). +* FEATURE: add ability to format numeric fields into string representation of time, duration and IPv4 with [`format` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe). +* FEATURE: set `format` field to `rfc3164` or `rfc5424` depending on the [Syslog format](https://en.wikipedia.org/wiki/Syslog) parsed via [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe). + +* BUGFIX: always respect the limit set in [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe). Previously the limit could be exceeded in some cases. 
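The new features above compose in a single query. A minimal sketch, assuming hypothetical `client_ip` (an IPv4 address) and `level` fields, which combines the IPv4 [range comparison filter](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filter) with the bitwise operations added to the [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe):

```logsql
client_ip:>'10.0.0.0' | math (client_ip & 0xffffff00) as subnet | stats by (subnet) count() hits
```

Here the `client_ip` field is converted into its `uint32` representation automatically before the bitwise `and` is applied.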
+ ## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs) Released at 2024-06-04 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 67315b07a..9532bb94d 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -584,7 +584,7 @@ See also: ### Range comparison filter LogsQL supports `field:>X`, `field:>=X`, `field:10KiB @@ -1167,6 +1167,7 @@ LogsQL supports the following pipes: - [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`offset`](#offset-pipe) skips the given number of selected logs. - [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object. +- [`pack_logfmt`](#pack_logfmt-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) message. - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. @@ -1554,6 +1555,18 @@ and stores it into `my_json` output field: _time:5m | format '{"_msg":,"stacktrace":}' as my_json ``` +Numeric fields can be transformed into the following string representation at `format` pipe: + +- [RFC3339 time](https://www.rfc-editor.org/rfc/rfc3339) - by adding `time:` in front of the corresponding field name + containing [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) in nanoseconds. + For example, `format "time="`. The timestamp can be converted into nanoseconds with the [`math` pipe](#math-pipe). + +- Human-readable duration - by adding `duration:` in front of the corresponding numeric field name containing duration in nanoseconds. + For example, `format "duration="`. The duration can be converted into nanoseconds with the [`math` pipe](#math-pipe). + +- IPv4 - by adding `ipv4:` in front of the corresponding field name containing `uint32` representation of the IPv4 address. + For example, `format "ip="`. + Add `keep_original_fields` to the end of `format ... as result_field` when the original non-empty value of the `result_field` must be preserved instead of overwriting it with the `format` results. For example, the following query adds formatted result to `foo` field only if it was missing or empty: @@ -1645,9 +1658,14 @@ The following mathematical operations are supported by `math` pipe: - `arg1 / arg2` - divides `arg1` by `arg2` - `arg1 % arg2` - returns the remainder of the division of `arg1` by `arg2` - `arg1 ^ arg2` - returns the power of `arg1` by `arg2` +- `arg1 & arg2` - returns bitwise `and` for `arg1` and `arg2`. It is expected that `arg1` and `arg2` are in the range `[0 .. 2^53-1]` +- `arg1 | arg2` - returns bitwise `or` for `arg1` and `arg2`. It is expected that `arg1` and `arg2` are in the range `[0 .. 2^53-1]` +- `arg1 xor arg2` - returns bitwise `xor` for `arg1` and `arg2`. It is expected that `arg1` and `arg2` are in the range `[0 .. 
2^53-1]` - `arg1 default arg2` - returns `arg2` if `arg1` is non-[numeric](#numeric-values) or equals to `NaN` - `abs(arg)` - returns an absolute value for the given `arg` -- `exp(arg)` - powers [`e`](https://en.wikipedia.org/wiki/E_(mathematical_constant)) by `arg`. +- `ceil(arg)` - returns the least integer value greater than or equal to `arg` +- `exp(arg)` - powers [`e`](https://en.wikipedia.org/wiki/E_(mathematical_constant)) by `arg` +- `floor(arg)` - returns the greatest integer values less than or equal to `arg` - `ln(arg)` - returns [natural logarithm](https://en.wikipedia.org/wiki/Natural_logarithm) for the given `arg` - `max(arg1, ..., argN)` - returns the maximum value among the given `arg1`, ..., `argN` - `min(arg1, ..., argN)` - returns the minimum value among the given `arg1`, ..., `argN` @@ -1657,9 +1675,11 @@ The following mathematical operations are supported by `math` pipe: Every `argX` argument in every mathematical operation can contain one of the following values: - The name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, `errors_total / requests_total`. - If the log field contains value, which cannot be parsed into [supported numeric value](#numeric-values), then it is replaced with `NaN`. -- Any [supported numeric value](#numeric-values). For example, `response_size_bytes / 1MiB`. -- Another mathematical expression. Optionally, it may be put inside `(...)`. For example, `(a + b) * c`. + The log field is parsed into numeric value if it contains [supported numeric value](#numeric-values). The log field is parsed into [Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) + in nanoseconds if it contains [rfc3339 time](https://www.rfc-editor.org/rfc/rfc3339). The log field is parsed into `uint32` number if it contains IPv4 address. + The log field is parsed into `NaN` in other cases. +- Any [supported numeric value](#numeric-values), [rfc3339 time](https://www.rfc-editor.org/rfc/rfc3339) or IPv4 address. For example, `1MiB`, `"2024-05-15T10:20:30.934324Z"` or `"12.34.56.78"`. +- Another mathematical expression, which can be put inside `(...)`. For example, `(a + b) * c`. See also: @@ -1721,9 +1741,48 @@ _time:5m | pack_json as foo | fields foo See also: +- [`pack_logfmt` pipe](#pack_logfmt-pipe) - [`unpack_json` pipe](#unpack_json-pipe) +### pack_logfmt pipe + +`| pack_logfmt as field_name` [pipe](#pipe) packs all [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) message +and stores its as a string in the given `field_name`. + +For example, the following query packs all the fields into [logfmt](https://brandur.org/logfmt) message and stores it +into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) for logs over the last 5 minutes: + +```logsql +_time:5m | pack_logfmt as _msg +``` + +The `as _msg` part can be omitted if packed message is stored into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). +The following query is equivalent to the previous one: + +```logsql +_time:5m | pack_logfmt +``` + +If only a subset of labels must be packed into [logfmt](https://brandur.org/logfmt), then it must be listed inside `fields (...)` after `pack_logfmt`. 
+For example, the following query builds [logfmt](https://brandur.org/logfmt) message with `foo` and `bar` fields only and stores the result in `baz` field: + +```logsql +_time:5m | pack_logfmt fields (foo, bar) as baz +``` + +The `pack_logfmt` doesn't modify or delete other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_logfmt` pipe. For example, the following query +leaves only the `foo` label with the original log fields packed into [logfmt](https://brandur.org/logfmt): + +```logsql +_time:5m | pack_logfmt as foo | fields foo +``` + +See also: + +- [`pack_json` pipe](#pack_json-pipe) +- [`unpack_logfmt` pipe](#unpack_logfmt-pipe) + ### rename pipe If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be renamed, then `| rename src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used. @@ -2200,6 +2259,7 @@ See also: - [`extract` pipe](#extract-pipe) - [`unroll` pipe](#unroll-pipe) - [`pack_json` pipe](#pack_json-pipe) +- [`pack_logfmt` pipe](#pack_logfmt-pipe) #### Conditional unpack_json @@ -2301,13 +2361,14 @@ _time:5m | unpack_logfmt if (ip:"") from foo from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). It understands the following Syslog formats: - [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `MMM DD hh:mm:ss HOSTNAME TAG: MESSAGE` -- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE` +- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE` The following fields are unpacked: - `priority` - it is obtained from `PRI`. - `facility` - it is calculated as `PRI / 8`. - `severity` - it is calculated as `PRI % 8`. +- `format` - either `rfc3164` or `rfc5424` depending on which Syslog format is unpacked. - `timestamp` - timestamp in [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601). The `MMM DD hh:mm:ss` timestamp in [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) is automatically converted into [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) by assuming that the timestamp belongs to the last 12 months. - `hostname` @@ -2316,6 +2377,8 @@ The following fields are unpacked: - `msg_id` - `message` +The `` part is optional. If it is missing, then `priority`, `facility` and `severity` fields aren't set. + The `[STRUCTURED-DATA]` is parsed into fields with the `SD-ID` name and `param1="value1" ... paramN="valueN"` value according to [the specification](https://datatracker.ietf.org/doc/html/rfc5424#section-6.3). The value then can be parsed to separate fields with [`unpack_logfmt` pipe](#unpack_logfmt-pipe). 
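A minimal sketch of the structured-data handling described above (the SD-ID name `exampleSDID@32473` is illustrative): the first pipe extracts the Syslog fields, the second one unpacks the `param="value"` pairs stored under the SD-ID field into separate log fields:

```logsql
_time:5m | unpack_syslog from _msg | unpack_logfmt from "exampleSDID@32473"
```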
diff --git a/docs/VictoriaLogs/QuickStart.md b/docs/VictoriaLogs/QuickStart.md index 85ad0a8b8..3cdec37e3 100644 --- a/docs/VictoriaLogs/QuickStart.md +++ b/docs/VictoriaLogs/QuickStart.md @@ -34,8 +34,8 @@ Just download archive for the needed Operating system and architecture, unpack i For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it: ```sh -curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.16.0-victorialogs/victoria-logs-linux-amd64-v0.16.0-victorialogs.tar.gz -tar xzf victoria-logs-linux-amd64-v0.16.0-victorialogs.tar.gz +curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.17.0-victorialogs/victoria-logs-linux-amd64-v0.17.0-victorialogs.tar.gz +tar xzf victoria-logs-linux-amd64-v0.17.0-victorialogs.tar.gz ./victoria-logs-prod ``` @@ -59,7 +59,7 @@ Here is the command to run VictoriaLogs in a Docker container: ```sh docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \ - docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs + docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs ``` See also: diff --git a/docs/VictoriaLogs/logsql-examples.md b/docs/VictoriaLogs/logsql-examples.md index 29bba01d5..f17b5d6fe 100644 --- a/docs/VictoriaLogs/logsql-examples.md +++ b/docs/VictoriaLogs/logsql-examples.md @@ -36,7 +36,8 @@ _time:5m | sort by (_time desc) | limit 10 See also: -- [How to count the number of matching logs](#how-to-count-the-number-of-matching-logs) +- [How to count the number of matching logs?](#how-to-count-the-number-of-matching-logs) +- [How to return last N logs for the given query?](#how-to-return-last-n-logs-for-the-given-query) ## How to select logs with the given word in log message? @@ -398,3 +399,25 @@ can be passed to it in order to return up to `N` latest log entries. For example ```sh curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10' ``` + +See also: + +- [How to select recently ingested logs?](#how-to-select-recently-ingested-logs) +- [How to return last N logs for the given query?](#how-to-return-last-n-logs-for-the-given-query) + + +## How to calculate the share of error logs to the total number of logs? + +Use the following query: + +```logsql +_time:5m | stats count() logs, count() if (error) errors | math errors / logs +``` + +This query uses the following [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) features: + +- [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) for selecting logs on the given time range (last 5 minutes in the query above). +- [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) with [additional filtering](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters) + for calculating the total number of logs and the number of logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) on the selected time range. +- [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) for calculating the share of logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) + comparing to the total number of logs. 
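The share can also be turned into a rounded percentage with the same [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) - a sketch, assuming the result field name `error_percent` is chosen freely:

```logsql
_time:5m | stats count() logs, count() if (error) errors | math round(errors / logs * 100, 0.01) as error_percent
```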
diff --git a/go.mod b/go.mod index 927e0156f..40d4ff04b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/VictoriaMetrics/VictoriaMetrics -go 1.22.3 +go 1.22.4 require ( cloud.google.com/go/storage v1.41.0 diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index a4858f3ba..033dcac48 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -3,6 +3,8 @@ package logstorage import ( "math" "slices" + "strconv" + "strings" "sync/atomic" "time" "unsafe" @@ -1916,5 +1918,49 @@ func getCanonicalColumnName(columnName string) string { return columnName } +func tryParseNumber(s string) (float64, bool) { + if len(s) == 0 { + return 0, false + } + f, ok := tryParseFloat64(s) + if ok { + return f, true + } + nsecs, ok := tryParseDuration(s) + if ok { + return float64(nsecs), true + } + bytes, ok := tryParseBytes(s) + if ok { + return float64(bytes), true + } + if isLikelyNumber(s) { + f, err := strconv.ParseFloat(s, 64) + if err == nil { + return f, true + } + n, err := strconv.ParseInt(s, 0, 64) + if err == nil { + return float64(n), true + } + } + return 0, false +} + +func isLikelyNumber(s string) bool { + if !isNumberPrefix(s) { + return false + } + if strings.Count(s, ".") > 1 { + // This is likely IP address + return false + } + if strings.IndexByte(s, ':') >= 0 || strings.Count(s, "-") > 2 { + // This is likely a timestamp + return false + } + return true +} + var nan = math.NaN() var inf = math.Inf(1) diff --git a/lib/logstorage/filter_range.go b/lib/logstorage/filter_range.go index 3ca6f0125..3ad8233fb 100644 --- a/lib/logstorage/filter_range.go +++ b/lib/logstorage/filter_range.go @@ -44,7 +44,11 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { - bm.resetBits() + minValueInt, maxValueInt := toInt64Range(minValue, maxValue) + bm.forEachSetBit(func(idx int) bool { + timestamp := br.timestamps[idx] + return timestamp >= minValueInt && timestamp <= maxValueInt + }) return } @@ -129,8 +133,30 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) { f := unmarshalFloat64(v) return f >= minValue && f <= maxValue }) + case valueTypeIPv4: + minValueUint32, maxValueUint32 := toUint32Range(minValue, maxValue) + if maxValue < 0 || uint64(minValueUint32) > c.maxValue || uint64(maxValueUint32) < c.minValue { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + n := unmarshalIPv4(v) + return n >= minValueUint32 && n <= maxValueUint32 + }) case valueTypeTimestampISO8601: - bm.resetBits() + minValueInt, maxValueInt := toInt64Range(minValue, maxValue) + if maxValue < 0 || minValueInt > int64(c.maxValue) || maxValueInt < int64(c.minValue) { + bm.resetBits() + return + } + valuesEncoded := c.getValuesEncoded(br) + bm.forEachSetBit(func(idx int) bool { + v := valuesEncoded[idx] + n := unmarshalTimestampISO8601(v) + return n >= minValueInt && n <= maxValueInt + }) default: logger.Panicf("FATAL: unknown valueType=%d", c.valueType) } @@ -178,9 +204,10 @@ func (fr *filterRange) applyToBlockSearch(bs *blockSearch, bm *bitmap) { case valueTypeFloat64: matchFloat64ByRange(bs, ch, bm, minValue, maxValue) case valueTypeIPv4: - bm.resetBits() + minValueUint32, maxValueUint32 := toUint32Range(minValue, maxValue) + matchIPv4ByRange(bs, ch, bm, minValueUint32, maxValueUint32) case valueTypeTimestampISO8601: - bm.resetBits() + matchTimestampISO8601ByRange(bs, ch, bm, minValue, maxValue) default: 
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType) } @@ -263,7 +290,7 @@ func matchUint32ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { if len(v) != 4 { - logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 4", bs.partPath(), len(v)) + logger.Panicf("FATAL: %s: unexpected length for binary representation of uint32 number: got %d; want 4", bs.partPath(), len(v)) } n := uint64(unmarshalUint32(v)) return n >= minValueUint && n <= maxValueUint @@ -280,7 +307,7 @@ func matchUint64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, bb := bbPool.Get() visitValues(bs, ch, bm, func(v string) bool { if len(v) != 8 { - logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 8", bs.partPath(), len(v)) + logger.Panicf("FATAL: %s: unexpected length for binary representation of uint64 number: got %d; want 8", bs.partPath(), len(v)) } n := unmarshalUint64(v) return n >= minValueUint && n <= maxValueUint @@ -288,31 +315,26 @@ func matchUint64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, bbPool.Put(bb) } -func matchRange(s string, minValue, maxValue float64) bool { - f, ok := tryParseNumber(s) - if !ok { - return false +func matchTimestampISO8601ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue float64) { + minValueInt, maxValueInt := toInt64Range(minValue, maxValue) + if maxValue < 0 || minValueInt > int64(ch.maxValue) || maxValueInt < int64(ch.minValue) { + bm.resetBits() + return } - return f >= minValue && f <= maxValue + bb := bbPool.Get() + visitValues(bs, ch, bm, func(v string) bool { + if len(v) != 8 { + logger.Panicf("FATAL: %s: unexpected length for binary representation of timestampISO8601: got %d; want 8", bs.partPath(), len(v)) + } + n := unmarshalTimestampISO8601(v) + return n >= minValueInt && n <= maxValueInt + }) + bbPool.Put(bb) } -func tryParseNumber(s string) (float64, bool) { - if len(s) == 0 { - return 0, false - } - f, ok := tryParseFloat64(s) - if ok { - return f, true - } - nsecs, ok := tryParseDuration(s) - if ok { - return float64(nsecs), true - } - bytes, ok := tryParseBytes(s) - if ok { - return float64(bytes), true - } - return 0, false +func matchRange(s string, minValue, maxValue float64) bool { + f := parseMathNumber(s) + return f >= minValue && f <= maxValue } func toUint64Range(minValue, maxValue float64) (uint64, uint64) { @@ -330,3 +352,35 @@ func toUint64Clamp(f float64) uint64 { } return uint64(f) } + +func toInt64Range(minValue, maxValue float64) (int64, int64) { + minValue = math.Ceil(minValue) + maxValue = math.Floor(maxValue) + return toInt64Clamp(minValue), toInt64Clamp(maxValue) +} + +func toInt64Clamp(f float64) int64 { + if f < math.MinInt64 { + return math.MinInt64 + } + if f > math.MaxInt64 { + return math.MaxInt64 + } + return int64(f) +} + +func toUint32Range(minValue, maxValue float64) (uint32, uint32) { + minValue = math.Ceil(minValue) + maxValue = math.Floor(maxValue) + return toUint32Clamp(minValue), toUint32Clamp(maxValue) +} + +func toUint32Clamp(f float64) uint32 { + if f < 0 { + return 0 + } + if f > math.MaxUint32 { + return math.MaxUint32 + } + return uint32(f) +} diff --git a/lib/logstorage/filter_string_range.go b/lib/logstorage/filter_string_range.go index 095159715..f7518c723 100644 --- a/lib/logstorage/filter_string_range.go +++ b/lib/logstorage/filter_string_range.go @@ -207,5 +207,5 @@ 
func matchUint64ByStringRange(bs *blockSearch, ch *columnHeader, bm *bitmap, min } func matchStringRange(s, minValue, maxValue string) bool { - return s >= minValue && s < maxValue + return !lessString(s, minValue) && lessString(s, maxValue) } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 2f58a9111..090294507 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -74,11 +74,6 @@ func (lex *lexer) isQuotedToken() bool { return lex.token != lex.rawToken } -func (lex *lexer) isNumber() bool { - s := lex.rawToken + lex.s - return isNumberPrefix(s) -} - func (lex *lexer) isPrevToken(tokens ...string) bool { for _, token := range tokens { if token == lex.prevToken { @@ -1130,18 +1125,15 @@ func parseFilterGT(lex *lexer, fieldName string) (filter, error) { op = ">=" } - if !lex.isNumber() { - lexState := lex.backupState() - fr := tryParseFilterGTString(lex, fieldName, op, includeMinValue) - if fr != nil { - return fr, nil - } - lex.restoreState(lexState) - } - - minValue, fStr, err := parseFloat64(lex) + lexState := lex.backupState() + minValue, fStr, err := parseNumber(lex) if err != nil { - return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err) + lex.restoreState(lexState) + fr := tryParseFilterGTString(lex, fieldName, op, includeMinValue) + if fr == nil { + return nil, fmt.Errorf("cannot parse [%s] as number: %w", fStr, err) + } + return fr, nil } if !includeMinValue { @@ -1168,16 +1160,17 @@ func parseFilterLT(lex *lexer, fieldName string) (filter, error) { op = "<=" } - if !lex.isNumber() { - lexState := lex.backupState() - fr := tryParseFilterLTString(lex, fieldName, op, includeMaxValue) - if fr != nil { - return fr, nil - } + lexState := lex.backupState() + maxValue, fStr, err := parseNumber(lex) + if err != nil { lex.restoreState(lexState) + fr := tryParseFilterLTString(lex, fieldName, op, includeMaxValue) + if fr == nil { + return nil, fmt.Errorf("cannot parse [%s] as number: %w", fStr, err) + } + return fr, nil } - maxValue, fStr, err := parseFloat64(lex) if err != nil { return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err) } @@ -1250,7 +1243,7 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) { if !lex.mustNextToken() { return nil, fmt.Errorf("missing args for %s()", funcName) } - minValue, minValueStr, err := parseFloat64(lex) + minValue, minValueStr, err := parseNumber(lex) if err != nil { return nil, fmt.Errorf("cannot parse minValue in %s(): %w", funcName, err) } @@ -1264,7 +1257,7 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) { } // Parse maxValue - maxValue, maxValueStr, err := parseFloat64(lex) + maxValue, maxValueStr, err := parseNumber(lex) if err != nil { return nil, fmt.Errorf("cannot parse maxValue in %s(): %w", funcName, err) } @@ -1304,23 +1297,18 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) { return fr, nil } -func parseFloat64(lex *lexer) (float64, string, error) { +func parseNumber(lex *lexer) (float64, string, error) { s, err := getCompoundToken(lex) if err != nil { return 0, "", fmt.Errorf("cannot parse float64 from %q: %w", s, err) } - f, err := strconv.ParseFloat(s, 64) - if err == nil { + + f := parseMathNumber(s) + if !math.IsNaN(f) || strings.EqualFold(s, "nan") { return f, s, nil } - // Try parsing s as integer. - // This handles 0x..., 0b... and 0... prefixes, alongside '_' delimiters. 
- n, err := parseInt(s) - if err == nil { - return float64(n), s, nil - } - return 0, "", fmt.Errorf("cannot parse %q as float64: %w", s, err) + return 0, "", fmt.Errorf("cannot parse %q as float64", s) } func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) { @@ -1616,6 +1604,15 @@ func isNumberPrefix(s string) bool { return false } } + if len(s) >= 3 && strings.EqualFold(s, "inf") { + return true + } + if s[0] == '.' { + s = s[1:] + if len(s) == 0 { + return false + } + } return s[0] >= '0' && s[0] <= '9' } @@ -1713,28 +1710,6 @@ func parseUint(s string) (uint64, error) { return uint64(nn), nil } -func parseInt(s string) (int64, error) { - switch { - case strings.EqualFold(s, "inf"), strings.EqualFold(s, "+inf"): - return math.MaxInt64, nil - case strings.EqualFold(s, "-inf"): - return math.MinInt64, nil - } - - n, err := strconv.ParseInt(s, 0, 64) - if err == nil { - return n, nil - } - nn, ok := tryParseBytes(s) - if !ok { - nn, ok = tryParseDuration(s) - if !ok { - return 0, fmt.Errorf("cannot parse %q as integer: %w", s, err) - } - } - return nn, nil -} - func nextafter(f, xInf float64) float64 { if math.IsInf(f, 0) { return f diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 78ee07268..ff603be33 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -356,7 +356,7 @@ func TestParseFilterStringRange(t *testing.T) { f(">foo", ``, "foo\x00", maxStringRangeValue) f("x:>=foo", `x`, "foo", maxStringRangeValue) f("x:=0xffffffff`, `foo`, (1<<32)-1, inf) + f(`foo:>=1_234e3`, `foo`, 1234000, inf) + f(`foo:>=1_234e-3`, `foo`, 1.234, inf) } func TestParseQuerySuccess(t *testing.T) { @@ -811,10 +817,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`string_range(foo, bar)`, `string_range(foo, bar)`) f(`foo:string_range("foo, bar", baz)`, `foo:string_range("foo, bar", baz)`) f(`foo:>bar`, `foo:>bar`) - f(`foo:>"1234"`, `foo:>"1234"`) + f(`foo:>"1234"`, `foo:>1234`) f(`>="abc"`, `>=abc`) f(`foo: math.MaxUint32 { + b = append(b, v...) + continue + } + b = marshalIPv4String(b, uint32(ipNum)) + default: b = append(b, v...) 
} } diff --git a/lib/logstorage/pipe_format_test.go b/lib/logstorage/pipe_format_test.go index 8f74bd3e9..449139080 100644 --- a/lib/logstorage/pipe_format_test.go +++ b/lib/logstorage/pipe_format_test.go @@ -47,6 +47,33 @@ func TestPipeFormat(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + // format time, duration and ipv4 + f(`format 'time=, duration=, ip=' as x`, [][]Field{ + { + {"foo", `1717328141123456789`}, + {"bar", `210123456789`}, + {"baz", "1234567890"}, + }, + { + {"foo", `abc`}, + {"bar", `de`}, + {"baz", "ghkl"}, + }, + }, [][]Field{ + { + {"foo", `1717328141123456789`}, + {"bar", `210123456789`}, + {"baz", "1234567890"}, + {"x", "time=2024-06-02T11:35:41.123456789Z, duration=3m30.123456789s, ip=73.150.2.210"}, + }, + { + {"foo", `abc`}, + {"bar", `de`}, + {"baz", "ghkl"}, + {"x", "time=abc, duration=de, ip=ghkl"}, + }, + }) + // skip_empty_results f(`format '' as x skip_empty_results`, [][]Field{ { diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go index 41f98fded..c45df3325 100644 --- a/lib/logstorage/pipe_limit.go +++ b/lib/logstorage/pipe_limit.go @@ -58,21 +58,25 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { } rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps))) - if rowsProcessed <= plp.pl.limit { + limit := plp.pl.limit + if rowsProcessed <= limit { // Fast path - write all the rows to ppNext. plp.ppNext.writeBlock(workerID, br) + if rowsProcessed == limit { + plp.cancel() + } return } // Slow path - overflow. Write the remaining rows if needed. rowsProcessed -= uint64(len(br.timestamps)) - if rowsProcessed >= plp.pl.limit { + if rowsProcessed >= limit { // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. return } // Write remaining rows. 
- keepRows := plp.pl.limit - rowsProcessed + keepRows := limit - rowsProcessed br.truncateRows(int(keepRows)) plp.ppNext.writeBlock(workerID, br) diff --git a/lib/logstorage/pipe_math.go b/lib/logstorage/pipe_math.go index 11bd5d5cd..1138a38ea 100644 --- a/lib/logstorage/pipe_math.go +++ b/lib/logstorage/pipe_math.go @@ -161,6 +161,18 @@ var mathBinaryOps = map[string]mathBinaryOp{ priority: 3, f: mathFuncMinus, }, + "&": { + priority: 4, + f: mathFuncAnd, + }, + "xor": { + priority: 5, + f: mathFuncXor, + }, + "|": { + priority: 6, + f: mathFuncOr, + }, "default": { priority: 10, f: mathFuncDefault, @@ -294,11 +306,7 @@ func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) var f float64 for i, v := range values { if i == 0 || v != values[i-1] { - var ok bool - f, ok = tryParseFloat64(v) - if !ok { - f = nan - } + f = parseMathNumber(v) } r[i] = f } @@ -489,13 +497,17 @@ func parseMathExprOperand(lex *lexer) (*mathExpr, error) { return parseMathExprMin(lex) case lex.isKeyword("round"): return parseMathExprRound(lex) + case lex.isKeyword("ceil"): + return parseMathExprCeil(lex) + case lex.isKeyword("floor"): + return parseMathExprFloor(lex) case lex.isKeyword("-"): return parseMathExprUnaryMinus(lex) case lex.isKeyword("+"): // just skip unary plus lex.nextToken() return parseMathExprOperand(lex) - case lex.isNumber(): + case isNumberPrefix(lex.token): return parseMathExprConstNumber(lex) default: return parseMathExprFieldName(lex) @@ -568,6 +580,28 @@ func parseMathExprRound(lex *lexer) (*mathExpr, error) { return me, nil } +func parseMathExprCeil(lex *lexer) (*mathExpr, error) { + me, err := parseMathExprGenericFunc(lex, "ceil", mathFuncCeil) + if err != nil { + return nil, err + } + if len(me.args) != 1 { + return nil, fmt.Errorf("'ceil' function needs one arg; got %d args: [%s]", len(me.args), me) + } + return me, nil +} + +func parseMathExprFloor(lex *lexer) (*mathExpr, error) { + me, err := parseMathExprGenericFunc(lex, "floor", mathFuncFloor) + if err != nil { + return nil, err + } + if len(me.args) != 1 { + return nil, fmt.Errorf("'floor' function needs one arg; got %d args: [%s]", len(me.args), me) + } + return me, nil +} + func parseMathExprGenericFunc(lex *lexer, funcName string, f mathFunc) (*mathExpr, error) { if !lex.isKeyword(funcName) { return nil, fmt.Errorf("missing %q keyword", funcName) @@ -637,15 +671,15 @@ func parseMathExprUnaryMinus(lex *lexer) (*mathExpr, error) { } func parseMathExprConstNumber(lex *lexer) (*mathExpr, error) { - if !lex.isNumber() { + if !isNumberPrefix(lex.token) { return nil, fmt.Errorf("cannot parse number from %q", lex.token) } numStr, err := getCompoundMathToken(lex) if err != nil { return nil, fmt.Errorf("cannot parse number: %w", err) } - f, ok := tryParseNumber(numStr) - if !ok { + f := parseMathNumber(numStr) + if math.IsNaN(f) { return nil, fmt.Errorf("cannot parse number from %q", numStr) } me := &mathExpr{ @@ -688,6 +722,42 @@ func getCompoundMathToken(lex *lexer) (string, error) { return rawS + suffix, nil } +func mathFuncAnd(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + if math.IsNaN(a[i]) || math.IsNaN(b[i]) { + result[i] = nan + } else { + result[i] = float64(uint64(a[i]) & uint64(b[i])) + } + } +} + +func mathFuncOr(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + if math.IsNaN(a[i]) || math.IsNaN(b[i]) { + result[i] = nan + } else { + result[i] = float64(uint64(a[i]) | uint64(b[i])) + } + } +} + +func 
mathFuncXor(result []float64, args [][]float64) { + a := args[0] + b := args[1] + for i := range result { + if math.IsNaN(a[i]) || math.IsNaN(b[i]) { + result[i] = nan + } else { + result[i] = float64(uint64(a[i]) ^ uint64(b[i])) + } + } +} + func mathFuncPlus(result []float64, args [][]float64) { a := args[0] b := args[1] @@ -800,6 +870,20 @@ func mathFuncMin(result []float64, args [][]float64) { } } +func mathFuncCeil(result []float64, args [][]float64) { + arg := args[0] + for i := range result { + result[i] = math.Ceil(arg[i]) + } +} + +func mathFuncFloor(result []float64, args [][]float64) { + arg := args[0] + for i := range result { + result[i] = math.Floor(arg[i]) + } +} + func mathFuncRound(result []float64, args [][]float64) { arg := args[0] if len(args) == 1 { @@ -829,3 +913,19 @@ func round(f, nearest float64) float64 { f, _ = math.Modf(f * p10) return f / p10 } + +func parseMathNumber(s string) float64 { + f, ok := tryParseNumber(s) + if ok { + return f + } + nsecs, ok := tryParseTimestampRFC3339Nano(s) + if ok { + return float64(nsecs) + } + ipNum, ok := tryParseIPv4(s) + if ok { + return float64(ipNum) + } + return nan +} diff --git a/lib/logstorage/pipe_math_test.go b/lib/logstorage/pipe_math_test.go index 7795263ac..67d446cc0 100644 --- a/lib/logstorage/pipe_math_test.go +++ b/lib/logstorage/pipe_math_test.go @@ -50,6 +50,33 @@ func TestPipeMath(t *testing.T) { expectPipeResults(t, pipeStr, rows, rowsExpected) } + f(`math + '2024-05-30T01:02:03Z' + 10e9 as time, + 10m5s + 10e9 as duration, + '123.45.67.89' + 1000 as ip, + time - time % time_step as time_rounded, + duration - duration % duration_step as duration_rounded, + (ip & ip_mask | 0x1234) xor 5678 as subnet + `, [][]Field{ + { + {"time_step", "30m"}, + {"duration_step", "30s"}, + {"ip_mask", "0xffffff00"}, + }, + }, [][]Field{ + { + {"time_step", "30m"}, + {"duration_step", "30s"}, + {"ip_mask", "0xffffff00"}, + {"time", "1717030933000000000"}, + {"duration", "615000000000"}, + {"ip", "2066564929"}, + {"time_rounded", "1717030800000000000"}, + {"duration_rounded", "600000000000"}, + {"subnet", "2066563354"}, + }, + }) + f("math b+1 as a, a*2 as b, b-10.5+c as c", [][]Field{ { {"a", "v1"}, @@ -108,7 +135,7 @@ func TestPipeMath(t *testing.T) { }, }) - f("math round(exp(a), 0.01), round(ln(a), 0.01)", [][]Field{ + f("math round(exp(a), 0.01), round(ln(a), 0.01), ceil(exp(a)), floor(exp(a))", [][]Field{ { {"a", "v1"}, }, @@ -129,26 +156,36 @@ func TestPipeMath(t *testing.T) { {"a", "v1"}, {"round(exp(a), 0.01)", "NaN"}, {"round(ln(a), 0.01)", "NaN"}, + {"ceil(exp(a))", "NaN"}, + {"floor(exp(a))", "NaN"}, }, { {"a", "0"}, {"round(exp(a), 0.01)", "1"}, {"round(ln(a), 0.01)", "NaN"}, + {"ceil(exp(a))", "1"}, + {"floor(exp(a))", "1"}, }, { {"a", "1"}, {"round(exp(a), 0.01)", "2.72"}, {"round(ln(a), 0.01)", "0"}, + {"ceil(exp(a))", "3"}, + {"floor(exp(a))", "2"}, }, { {"a", "2"}, {"round(exp(a), 0.01)", "7.39"}, {"round(ln(a), 0.01)", "0.69"}, + {"ceil(exp(a))", "8"}, + {"floor(exp(a))", "7"}, }, { {"a", "3"}, {"round(exp(a), 0.01)", "20.09"}, {"round(ln(a), 0.01)", "1.1"}, + {"ceil(exp(a))", "21"}, + {"floor(exp(a))", "20"}, }, }) diff --git a/lib/logstorage/pipe_pack.go b/lib/logstorage/pipe_pack.go new file mode 100644 index 000000000..e911121a2 --- /dev/null +++ b/lib/logstorage/pipe_pack.go @@ -0,0 +1,114 @@ +package logstorage + +import ( + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +func updateNeededFieldsForPipePack(neededFields, unneededFields fieldsSet, resultField string, fields 
[]string) { + if neededFields.contains("*") { + if !unneededFields.contains(resultField) { + if len(fields) > 0 { + unneededFields.removeFields(fields) + } else { + unneededFields.reset() + } + } + } else { + if neededFields.contains(resultField) { + neededFields.remove(resultField) + if len(fields) > 0 { + neededFields.addFields(fields) + } else { + neededFields.add("*") + } + } + } +} + +func newPipePackProcessor(workersCount int, ppNext pipeProcessor, resultField string, fields []string, marshalFields func(dst []byte, fields []Field) []byte) pipeProcessor { + return &pipePackProcessor{ + ppNext: ppNext, + resultField: resultField, + fields: fields, + marshalFields: marshalFields, + + shards: make([]pipePackProcessorShard, workersCount), + } +} + +type pipePackProcessor struct { + ppNext pipeProcessor + resultField string + fields []string + marshalFields func(dst []byte, fields []Field) []byte + + shards []pipePackProcessorShard +} + +type pipePackProcessorShard struct { + pipePackProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipePackProcessorShardNopad{})%128]byte +} + +type pipePackProcessorShardNopad struct { + rc resultColumn + + buf []byte + fields []Field + + cs []*blockResultColumn +} + +func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &ppp.shards[workerID] + + shard.rc.name = ppp.resultField + + cs := shard.cs[:0] + if len(ppp.fields) == 0 { + csAll := br.getColumns() + cs = append(cs, csAll...) + } else { + for _, f := range ppp.fields { + c := br.getColumnByName(f) + cs = append(cs, c) + } + } + shard.cs = cs + + buf := shard.buf[:0] + fields := shard.fields + for rowIdx := range br.timestamps { + fields = fields[:0] + for _, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: c.name, + Value: v, + }) + } + + bufLen := len(buf) + buf = ppp.marshalFields(buf, fields) + v := bytesutil.ToUnsafeString(buf[bufLen:]) + shard.rc.addValue(v) + } + shard.fields = fields + + br.addResultColumn(&shard.rc) + ppp.ppNext.writeBlock(workerID, br) + + shard.rc.reset() +} + +func (ppp *pipePackProcessor) flush() error { + return nil +} diff --git a/lib/logstorage/pipe_pack_json.go b/lib/logstorage/pipe_pack_json.go index 0a1686f04..502e51cb8 100644 --- a/lib/logstorage/pipe_pack_json.go +++ b/lib/logstorage/pipe_pack_json.go @@ -3,9 +3,6 @@ package logstorage import ( "fmt" "slices" - "unsafe" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipePackJSON processes '| pack_json ...' pipe. 
@@ -29,23 +26,7 @@ func (pp *pipePackJSON) String() string { } func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { - if neededFields.contains("*") { - if !unneededFields.contains(pp.resultField) { - if len(pp.fields) > 0 { - unneededFields.removeFields(pp.fields) - } else { - unneededFields.reset() - } - } - } else { - if neededFields.contains(pp.resultField) { - if len(pp.fields) > 0 { - neededFields.addFields(pp.fields) - } else { - neededFields.add("*") - } - } - } + updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields) } func (pp *pipePackJSON) optimize() { @@ -61,85 +42,7 @@ func (pp *pipePackJSON) initFilterInValues(_ map[string][]string, _ getFieldValu } func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { - return &pipePackJSONProcessor{ - pp: pp, - ppNext: ppNext, - - shards: make([]pipePackJSONProcessorShard, workersCount), - } -} - -type pipePackJSONProcessor struct { - pp *pipePackJSON - ppNext pipeProcessor - - shards []pipePackJSONProcessorShard -} - -type pipePackJSONProcessorShard struct { - pipePackJSONProcessorShardNopad - - // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . - _ [128 - unsafe.Sizeof(pipePackJSONProcessorShardNopad{})%128]byte -} - -type pipePackJSONProcessorShardNopad struct { - rc resultColumn - - buf []byte - fields []Field - - cs []*blockResultColumn -} - -func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { - return - } - - shard := &ppp.shards[workerID] - - shard.rc.name = ppp.pp.resultField - - cs := shard.cs[:0] - if len(ppp.pp.fields) == 0 { - csAll := br.getColumns() - cs = append(cs, csAll...) - } else { - for _, f := range ppp.pp.fields { - c := br.getColumnByName(f) - cs = append(cs, c) - } - } - shard.cs = cs - - buf := shard.buf[:0] - fields := shard.fields - for rowIdx := range br.timestamps { - fields = fields[:0] - for _, c := range cs { - v := c.getValueAtRow(br, rowIdx) - fields = append(fields, Field{ - Name: c.name, - Value: v, - }) - } - - bufLen := len(buf) - buf = MarshalFieldsToJSON(buf, fields) - v := bytesutil.ToUnsafeString(buf[bufLen:]) - shard.rc.addValue(v) - } - shard.fields = fields - - br.addResultColumn(&shard.rc) - ppp.ppNext.writeBlock(workerID, br) - - shard.rc.reset() -} - -func (ppp *pipePackJSONProcessor) flush() error { - return nil + return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToJSON) } func parsePackJSON(lex *lexer) (*pipePackJSON, error) { diff --git a/lib/logstorage/pipe_pack_logfmt.go b/lib/logstorage/pipe_pack_logfmt.go new file mode 100644 index 000000000..082ead30d --- /dev/null +++ b/lib/logstorage/pipe_pack_logfmt.go @@ -0,0 +1,86 @@ +package logstorage + +import ( + "fmt" + "slices" +) + +// pipePackLogfmt processes '| pack_logfmt ...' pipe. 
+// +// See https://docs.victoriametrics.com/victorialogs/logsql/#pack_logfmt-pipe +type pipePackLogfmt struct { + resultField string + + fields []string +} + +func (pp *pipePackLogfmt) String() string { + s := "pack_logfmt" + if len(pp.fields) > 0 { + s += " fields (" + fieldsToString(pp.fields) + ")" + } + if !isMsgFieldName(pp.resultField) { + s += " as " + quoteTokenIfNeeded(pp.resultField) + } + return s +} + +func (pp *pipePackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) { + updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields) +} + +func (pp *pipePackLogfmt) optimize() { + // nothing to do +} + +func (pp *pipePackLogfmt) hasFilterInWithQuery() bool { + return false +} + +func (pp *pipePackLogfmt) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pp, nil +} + +func (pp *pipePackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToLogfmt) +} + +func parsePackLogfmt(lex *lexer) (*pipePackLogfmt, error) { + if !lex.isKeyword("pack_logfmt") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "pack_logfmt") + } + lex.nextToken() + + var fields []string + if lex.isKeyword("fields") { + lex.nextToken() + fs, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse fields: %w", err) + } + if slices.Contains(fs, "*") { + fs = nil + } + fields = fs + } + + // parse optional 'as ...` part + resultField := "_msg" + if lex.isKeyword("as") { + lex.nextToken() + } + if !lex.isKeyword("|", ")", "") { + field, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result field for 'pack_logfmt': %w", err) + } + resultField = field + } + + pp := &pipePackLogfmt{ + resultField: resultField, + fields: fields, + } + + return pp, nil +} diff --git a/lib/logstorage/pipe_pack_logfmt_test.go b/lib/logstorage/pipe_pack_logfmt_test.go new file mode 100644 index 000000000..57c9503c8 --- /dev/null +++ b/lib/logstorage/pipe_pack_logfmt_test.go @@ -0,0 +1,133 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipePackLogfmtSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`pack_logfmt`) + f(`pack_logfmt as x`) + f(`pack_logfmt fields (a, b)`) + f(`pack_logfmt fields (a, b) as x`) +} + +func TestParsePipePackLogfmtFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`pack_logfmt foo bar`) + f(`pack_logfmt fields`) +} + +func TestPipePackLogfmt(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // pack into _msg + f(`pack_logfmt`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde=ab`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `_msg=x foo=abc bar="cde=ab"`}, + {"foo", `abc`}, + {"bar", `cde=ab`}, + }, + { + {"_msg", `a=b c=d`}, + {"a", "b"}, + {"c", "d"}, + }, + }) + + // pack into other field + f(`pack_logfmt as a`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `x`}, + {"foo", `abc`}, + {"bar", `cde`}, + {"a", `_msg=x foo=abc bar=cde`}, + }, + { + {"a", `a=b c=d`}, + {"c", "d"}, + }, + }) + + // pack only the needed 
fields + f(`pack_logfmt fields (foo, baz) a`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `x`}, + {"foo", `abc`}, + {"bar", `cde`}, + {"a", `foo=abc baz=`}, + }, + { + {"a", `foo= baz=`}, + {"c", "d"}, + }, + }) +} + +func TestPipePackLogfmtUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`pack_logfmt as x`, "*", "", "*", "") + f(`pack_logfmt fields (a,b) as x`, "*", "", "*", "") + + // unneeded fields do not intersect with output + f(`pack_logfmt as x`, "*", "f1,f2", "*", "") + f(`pack_logfmt fields(f1,f3) as x`, "*", "f1,f2", "*", "f2") + + // unneeded fields intersect with output + f(`pack_logfmt as f1`, "*", "f1,f2", "*", "f1,f2") + f(`pack_logfmt fields (f2,f3) as f1`, "*", "f1,f2", "*", "f1,f2") + + // needed fields do not intersect with output + f(`pack_logfmt f1`, "x,y", "", "x,y", "") + f(`pack_logfmt fields (x,z) f1`, "x,y", "", "x,y", "") + + // needed fields intersect with output + f(`pack_logfmt as f2`, "f2,y", "", "*", "") + f(`pack_logfmt fields (x,y) as f2`, "f2,y", "", "x,y", "") +} diff --git a/lib/logstorage/pipe_unpack_syslog_test.go b/lib/logstorage/pipe_unpack_syslog_test.go index ed1a1a7d4..6b7ddd948 100644 --- a/lib/logstorage/pipe_unpack_syslog_test.go +++ b/lib/logstorage/pipe_unpack_syslog_test.go @@ -62,6 +62,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"priority", "165"}, {"facility", "20"}, {"severity", "5"}, + {"format", "rfc5424"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"hostname", "mymachine.example.com"}, {"app_name", "appname"}, @@ -86,6 +87,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"priority", "165"}, {"facility", "20"}, {"severity", "5"}, + {"format", "rfc5424"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"hostname", "mymachine.example.com"}, {"app_name", "foobar"}, @@ -106,6 +108,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"priority", "165"}, {"facility", "20"}, {"severity", "5"}, + {"format", "rfc5424"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"hostname", "mymachine.example.com"}, {"app_name", "appname"}, @@ -137,6 +140,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"priority", "165"}, {"facility", "20"}, {"severity", "5"}, + {"format", "rfc5424"}, {"timestamp", "2023-06-03T17:42:32.123456789Z"}, {"hostname", "mymachine.example.com"}, {"app_name", "appname"}, @@ -183,6 +187,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"qwe_priority", "165"}, {"qwe_facility", "20"}, {"qwe_severity", "5"}, + {"qwe_format", "rfc5424"}, {"qwe_timestamp", "2023-06-03T17:42:32.123456789Z"}, {"qwe_hostname", "mymachine.example.com"}, {"qwe_app_name", "appname"}, @@ -196,6 +201,7 @@ func TestPipeUnpackSyslog(t *testing.T) { {"qwe_priority", "163"}, {"qwe_facility", "20"}, {"qwe_severity", "3"}, + {"qwe_format", "rfc5424"}, {"qwe_timestamp", "2024-12-13T18:21:43Z"}, {"qwe_hostname", "mymachine.example.com"}, {"qwe_app_name", "appname2"}, diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index b9e8df98e..21906df49 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -64,7 +64,27 @@ func (f *Field) marshalToJSON(dst []byte) []byte { return dst } -// MarshalFieldsToJSON appends JSON-marshaled fields to dt and returns the result. 
+func (f *Field) marshalToLogfmt(dst []byte) []byte { + dst = append(dst, f.Name...) + dst = append(dst, '=') + if needLogfmtQuoting(f.Value) { + dst = strconv.AppendQuote(dst, f.Value) + } else { + dst = append(dst, f.Value...) + } + return dst +} + +func needLogfmtQuoting(s string) bool { + for _, c := range s { + if !isTokenRune(c) { + return true + } + } + return false +} + +// MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result. func MarshalFieldsToJSON(dst []byte, fields []Field) []byte { dst = append(dst, '{') if len(fields) > 0 { @@ -79,6 +99,20 @@ func MarshalFieldsToJSON(dst []byte, fields []Field) []byte { return dst } +// MarshalFieldsToLogfmt appends logfmt-marshaled fields to dst and returns the result. +func MarshalFieldsToLogfmt(dst []byte, fields []Field) []byte { + if len(fields) == 0 { + return dst + } + dst = fields[0].marshalToLogfmt(dst) + fields = fields[1:] + for i := range fields { + dst = append(dst, ' ') + dst = fields[i].marshalToLogfmt(dst) + } + return dst +} + func appendFields(a *arena, dst, src []Field) []Field { for _, f := range src { dst = append(dst, Field{ diff --git a/lib/logstorage/syslog_parser.go b/lib/logstorage/syslog_parser.go index 6bf6f0082..5e4f67fa7 100644 --- a/lib/logstorage/syslog_parser.go +++ b/lib/logstorage/syslog_parser.go @@ -110,6 +110,8 @@ func (p *syslogParser) parseNoHeader(s string) { func (p *syslogParser) parseRFC5424(s string) { // See https://datatracker.ietf.org/doc/html/rfc5424 + p.addField("format", "rfc5424") + if len(s) == 0 { return } @@ -242,6 +244,9 @@ func (p *syslogParser) parseRFC3164(s string) { if len(s) < n { return } + + p.addField("format", "rfc3164") + t, err := time.Parse(time.Stamp, s[:n]) if err != nil { // TODO: fall back to parsing ISO8601 timestamp? 
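Note: the logfmt quoting rule added to rows.go above (marshalToLogfmt / needLogfmtQuoting) emits a value as-is when it consists solely of token runes and quotes it via strconv.AppendQuote otherwise, which is what produces `bar="cde=ab"` in the pack_logfmt tests. The following is a minimal standalone sketch of that behavior; the isTokenRune stand-in below is an assumption for illustration only (the real helper lives in the lexer and is not part of this patch):

package main

import (
	"fmt"
	"strconv"
	"unicode"
)

// isTokenRune is a simplified stand-in (an assumption for illustration) for the
// lexer helper consulted by needLogfmtQuoting: letters, digits and '_' count as
// token runes; any other rune forces quoting of the whole value.
func isTokenRune(c rune) bool {
	return unicode.IsLetter(c) || unicode.IsDigit(c) || c == '_'
}

func needQuoting(s string) bool {
	for _, c := range s {
		if !isTokenRune(c) {
			return true
		}
	}
	return false
}

func main() {
	fields := [][2]string{
		{"foo", "abc"},       // plain token -> foo=abc
		{"bar", "cde=ab"},    // contains '=' -> bar="cde=ab"
		{"baz", "two words"}, // contains a space -> baz="two words"
	}
	var dst []byte
	for i, f := range fields {
		if i > 0 {
			dst = append(dst, ' ')
		}
		dst = append(dst, f[0]...)
		dst = append(dst, '=')
		if needQuoting(f[1]) {
			dst = strconv.AppendQuote(dst, f[1])
		} else {
			dst = append(dst, f[1]...)
		}
	}
	fmt.Println(string(dst)) // foo=abc bar="cde=ab" baz="two words"
}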
diff --git a/lib/logstorage/syslog_parser_test.go b/lib/logstorage/syslog_parser_test.go index 1a61a1fe6..a82154103 100644 --- a/lib/logstorage/syslog_parser_test.go +++ b/lib/logstorage/syslog_parser_test.go @@ -21,47 +21,47 @@ func TestSyslogParser(t *testing.T) { // RFC 3164 f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", - `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) + `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", - `{"priority":"165","facility":"20","severity":"5","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", - `{"timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) + `{"format":"rfc3164","timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", - `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) + `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", - `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) + `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) // RFC 5424 f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, - `{"timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) + `{"format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" 
eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) // Incomplete RFC 3164 f("", `{}`) - f("Jun 3 12:08:33", `{"timestamp":"2024-06-03T12:08:33.000Z"}`) - f("Jun 3 12:08:33 abcd", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) - f("Jun 3 12:08:33 abcd sudo", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) - f("Jun 3 12:08:33 abcd sudo[123]", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) - f("Jun 3 12:08:33 abcd sudo foobar", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) + f("Jun 3 12:08:33", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`) + f("Jun 3 12:08:33 abcd", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) + f("Jun 3 12:08:33 abcd sudo", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) + f("Jun 3 12:08:33 abcd sudo[123]", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) + f("Jun 3 12:08:33 abcd sudo foobar", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) // Incomplete RFC 5424 f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, - 
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) f(`<165>1 2023-06-03T17:42:32.123456789Z`, - `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424","timestamp":"2023-06-03T17:42:32.123456789Z"}`) f(`<165>1 `, - `{"priority":"165","facility":"20","severity":"5"}`) + `{"priority":"165","facility":"20","severity":"5","format":"rfc5424"}`) }